30 name =
'HostTestPluginPowerCycleResetMethod'
33 capabilities = [
'power_cycle']
34 required_parameters = [
'target_id',
'device_info']
39 HostTestPluginBase.__init__(self)
41 def setup(self, *args, **kwargs):
42 """! Configure plugin, this function should be called before plugin execute() method is used.
46 def execute(self, capability, *args, **kwargs):
47 """! Executes capability by name
49 @param capability Capability name
50 @param args Additional arguments
51 @param kwargs Additional arguments
52 @details Each capability e.g. may directly just call some command line program or execute building pythonic function
53 @return Capability call return value
55 if 'target_id' not in kwargs
or not kwargs[
'target_id']:
59 if 'device_info' not in kwargs
or type(kwargs[
'device_info'])
is not dict:
60 self.
print_plugin_error(
"Error: This plugin requires dict parameter 'device_info' passed by the caller.")
65 if capability
in HostTestPluginPowerCycleResetMethod.capabilities:
66 target_id = kwargs[
'target_id']
67 device_info = kwargs[
'device_info']
71 result = self.
__hw_reset(ip, port, target_id, device_info)
74 def __get_mbed_tas_rm_addr(self):
76 Get IP and Port of mbed tas rm service.
80 ip = os.environ[
'MBED_TAS_RM_IP']
81 port = os.environ[
'MBED_TAS_RM_PORT']
84 self.
print_plugin_error(
"HOST: Failed to read environment variable (" + str(e) +
"). Can't perform hardware reset.")
88 def __hw_reset(self, ip, port, target_id, device_info):
90 Reset target device using TAS RM API
100 "name":
"switchResource",
103 "resource_type":
"mbed_platform",
104 "resource_id": target_id,
105 "switch_command":
"OFF"
112 "name":
"switchResource",
115 "resource_type":
"mbed_platform",
116 "resource_id": target_id,
117 "switch_command":
"ON"
125 switch_off_req = self.
__run_request(ip, port, switch_off_req)
126 if switch_off_req
is None:
130 if "error" in switch_off_req[
'sub_requests'][0]:
131 self.
print_plugin_error(
"HOST: Failed to reset target. error = %s" % switch_off_req[
'sub_requests'][0][
'error'])
134 def poll_state(required_state):
136 "name":
"switchResource",
139 "resource_type":
"mbed_platform",
140 "resource_id": target_id,
141 "switch_command":
"STATE"
147 while resp
and (resp[
'sub_requests'][0][
'state'] != required_state
or (required_state ==
'ON' and
148 resp[
'sub_requests'][0][
"mount_point"] ==
"Not Connected"))
and (time.time() - start) < 300:
156 resp = poll_state(
"ON")
157 if resp
and resp[
'sub_requests'][0][
'state'] ==
'ON' and resp[
'sub_requests'][0][
"mount_point"] !=
"Not Connected":
158 for k, v
in resp[
'sub_requests'][0].viewitems():
167 def __run_request(ip, port, request):
173 headers = {
'Content-type':
'application/json',
'Accept':
'text/plain'}
174 get_resp = requests.get(
"http://%s:%s/" % (ip, port), data=json.dumps(request), headers=headers)
175 resp = get_resp.json()
176 if get_resp.status_code == 200: