| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import collections |
| import logging |
| import multiprocessing |
| |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes |
| from autotest_lib.server.cros.network import connection_worker |
| |
| """DUT Control module is used to control all the DUT's in a Clique set. |
| We need to execute a sequence of steps on each DUT in the pool parallely and |
| collect the results from all the executions. |
| |
| Class Hierarchy: |
| ---------------- |
| CliqueDUTControl |
| | |
| ------------------------------------------------------- |
| | | |
| CliqueDUTRole CliqueDUTBatch |
| | | |
| ------------------------------------- --------------------- |
| | | | | |
| DUTRoleConnectDisconnect DUTRoleFileTransfer CliqueDUTSet CliqueDUTPool |
| |
| CliqueDUTControl - Base control class. Stores and retrieves test params used |
| for all control operations. Should never be directly instantiated. |
| |
| CliqueDUTRole - Used to control one single DUT in the test. This is a base class |
| which should be derived to define a role to be performed by the DUT. Should |
| never be directly instantiated. |
| |
| CliqueDUTBatch - Used to control a batch of DUT in the test. It could |
| either be controlling a DUT set or an entire DUT pool. Implements the setup, |
| cleanup and execute functions which spawn off multiple threads to |
| control the execution of each step in the objects controlled. Should |
| never be directly instantiated. |
| |
| CliqueDUTSet - Used to control a set within the DUT pool. It has a number of |
| CliqueDUTRole objects to control. |
| |
| CliqueDUTPool - Used to control the entire DUT pool. It has a number of |
| CliqueDUTSet objects to control. |
| """ |
| |
| |
| # Dummy result error reason to be used when exception is encountered in a role. |
| ROLE_SETUP_EXCEPTION = "Role Setup Exception! " |
| ROLE_EXECUTE_EXCEPTION = "Role Execute Exception! " |
| ROLE_CLEANUP_EXCEPTION = "Role Teardown Exception! " |
| |
| # Dummy result error reason to be used when exception is encountered in a role. |
| POOL_SETUP_EXCEPTION = "Pool Setup Exception! " |
| POOL_CLEANUP_EXCEPTION = "Pool Teardown Exception! " |
| |
| # Result to returned after execution a sequence of steps. |
| ControlResult = collections.namedtuple( |
| 'ControlResult', [ 'uid', 'run_num', 'success', |
| 'error_reason', 'start_time', 'end_time' ]) |
| |
| class CliqueDUTUnknownParamError(error.TestError): |
| """Indicates an error in finding a required param from the |test_params|.""" |
| pass |
| |
| |
| class CliqueControl(object): |
| """CliqueControl is a base class which is used to control the DUT's in the |
| test. Not to be directly instantiated. |
| """ |
| |
| def __init__(self, dut_objs, assoc_params=None, conn_worker=None, |
| test_params=None, uid=""): |
| """Initialize. |
| |
| @param dut_objs: A list of objects that is being controlled by this |
| control object. |
| @param assoc_params: Association paramters to be used for this control |
| object. |
| @param conn_worker: ConnectionWorkerAbstract object, to run extra |
| work after successful connection. |
| @param test_params: A dictionary of params to be used for executing the |
| test. |
| @param uid: UID of this instance of the object. Host name for DUTRole |
| objects, Instance name for DUTBatch objects. |
| """ |
| self._dut_objs = dut_objs |
| self._test_params = test_params |
| self._assoc_params = assoc_params |
| self._conn_worker = conn_worker |
| self._uid = uid |
| |
| def find_param(self, param_key): |
| """Find the relevant param value for a role from internal dictionary. |
| |
| @param param_key: Look for the value of param_key in the dict. |
| |
| @raises CliqueDUTUnknownParamError if there is an error in lookup. |
| """ |
| if not self._test_params.has_key(param_key): |
| raise CliqueDUTUnknownParamError("Param %s not found in %s" % |
| (param_key, self._test_params)) |
| return self._test_params.get(param_key) |
| |
| @property |
| def dut_objs(self): |
| """Returns the dut_objs controlled by the object.""" |
| return self._dut_objs |
| |
| @property |
| def dut_obj(self): |
| """Returns the first dut_obj controlled by the object.""" |
| return self._dut_objs[0] |
| |
| @property |
| def uid(self): |
| """Returns a unique identifier associated with this object. It could |
| be just the hostname of the DUT in DUTRole objects or |
| set-number/pool-number in DUTSet DUTPool objects. |
| """ |
| return self._uid |
| |
| @property |
| def assoc_params(self): |
| """Returns the association params corresponding to the object.""" |
| return self._assoc_params |
| |
| @property |
| def conn_worker(self): |
| """Returns the connection worker corresponding to the object.""" |
| return self._conn_worker |
| |
| |
| def setup(self, run_num): |
| """Setup the DUT/DUT-set in the correct state before the sequence of |
| actions to be taken for the role is executed. |
| |
| @param run_num: Run number of this execution. |
| |
| @returns: An instance of ControlResult corresponding to all the errors |
| that were returned by the DUT/DUT's in the DUT-set which |
| is being controlled. |
| """ |
| pass |
| |
| def cleanup(self, run_num): |
| """Cleanup the DUT/DUT-set state after the sequence of actions to be |
| taken for the role is executed. |
| |
| @param run_num: Run number of this execution. |
| |
| @returns: An instance of ControlResult corresponding to all the errors |
| that were returned by the DUT/DUT's in the DUT-set which |
| is being controlled. |
| """ |
| pass |
| |
| def execute(self, run_num): |
| """Execute the sequence of actions to be taken for the role on the DUT |
| /DUT-set. |
| |
| @param run_num: Run number of this execution. |
| |
| @returns: An instance of ControlResult corresponding to all the errors |
| that were returned by the DUT/DUT's in the DUT-set which |
| is being controlled. |
| |
| """ |
| pass |
| |
| |
| class CliqueDUTRole(CliqueControl): |
| """CliqueDUTRole is a base class which defines the role entrusted to each |
| DUT in the Clique Test. Not to be directly instantiated. |
| """ |
| |
| def __init__(self, dut, assoc_params=None, conn_worker=None, |
| test_params=None): |
| """Initialize. |
| |
| @param dut: A DUTObject representing a DUT in the set. |
| @param assoc_params: Association paramters to be used for this role. |
| @param conn_worker: ConnectionWorkerAbstract object, to run extra |
| work after successful connection. |
| @param test_params: A dictionary of params to be used for executing the |
| test. |
| """ |
| super(CliqueDUTRole, self).__init__( |
| dut_objs=[dut], assoc_params=assoc_params, |
| conn_worker=conn_worker, test_params=test_params, |
| uid=dut.host.hostname) |
| |
| def setup(self, run_num): |
| try: |
| assoc_params = self.assoc_params |
| self.dut_obj.wifi_client.shill.disconnect(assoc_params.ssid) |
| if not self.dut_obj.wifi_client.shill.init_test_network_state(): |
| result = ControlResult(uid=self.uid, |
| run_num=run_num, |
| success=False, |
| error_reason="Failed to set up isolated " |
| "test context profile.", |
| start_time="", |
| end_time="") |
| return result |
| else: |
| return None |
| except Exception as e: |
| result = ControlResult(uid=self.uid, |
| run_num=run_num, |
| success=False, |
| error_reason=ROLE_SETUP_EXCEPTION + str(e), |
| start_time="", |
| end_time="") |
| return result |
| |
| def cleanup(self, run_num): |
| try: |
| self.dut_obj.wifi_client.shill.clean_profiles() |
| return None |
| except Exception as e: |
| result = ControlResult(uid=self.uid, |
| run_num=run_num, |
| success=False, |
| error_reason=ROLE_CLEANUP_EXCEPTION + str(e), |
| start_time="", |
| end_time="") |
| return result |
| |
| def _connect_wifi(self, run_num): |
| """Helper function to make a connection to the associated AP.""" |
| assoc_params = self.assoc_params |
| logging.info('Connection attempt %d', run_num) |
| self.dut_obj.host.syslog('Connection attempt %d' % run_num) |
| start_time = self.dut_obj.host.run("date '+%FT%T.%N%:z'").stdout |
| start_time = start_time.strip() |
| assoc_result = xmlrpc_datatypes.deserialize( |
| self.dut_obj.wifi_client.shill.connect_wifi(assoc_params)) |
| end_time = self.dut_obj.host.run("date '+%FT%T.%N%:z'").stdout |
| end_time = end_time.strip() |
| success = assoc_result.success |
| if not success: |
| logging.error('Connection attempt %d failed; reason: %s', |
| run_num, assoc_result.failure_reason) |
| result = ControlResult(uid=self.uid, |
| run_num=run_num, |
| success=success, |
| error_reason=assoc_result.failure_reason, |
| start_time=start_time, |
| end_time=end_time) |
| return result |
| else: |
| logging.info('Connection attempt %d passed', run_num) |
| return None |
| |
| def _disconnect_wifi(self): |
| """Helper function to disconnect from the associated AP.""" |
| assoc_params = self.assoc_params |
| self.dut_obj.wifi_client.shill.disconnect(assoc_params.ssid) |
| |
| |
| # todo(rpius): Move these role implementations to a separate file since we'll |
| # end up having a lot of roles defined. |
| class DUTRoleConnectDisconnect(CliqueDUTRole): |
| """DUTRoleConnectDisconnect is used to make a DUT connect and disconnect |
| to a given AP repeatedly. |
| """ |
| |
| def execute(self, run_num): |
| try: |
| result = self._connect_wifi(run_num) |
| if result: |
| return result |
| |
| # Now disconnect from the AP. |
| self._disconnect_wifi() |
| |
| return None |
| except Exception as e: |
| result = ControlResult(uid=self.uid, |
| run_num=run_num, |
| success=False, |
| error_reason=ROLE_EXECUTE_EXCEPTION + str(e), |
| start_time="", |
| end_time="") |
| return result |
| |
| |
| class DUTRoleConnectDuration(CliqueDUTRole): |
| """DUTRoleConnectDuration is used to make a DUT connect to a given AP and |
| then check the liveness of the connection from another worker device. |
| """ |
| |
| def setup(self, run_num): |
| result = super(DUTRoleConnectDuration, self).setup(run_num) |
| if result: |
| return result |
| # Let's check for the worker client now. |
| if not self.conn_worker: |
| return ControlResult(uid=self.uid, |
| run_num=run_num, |
| success=False, |
| error_reason="No connection worker found", |
| start_time="", |
| end_time="") |
| |
| def execute(self, run_num): |
| try: |
| result = self._connect_wifi(run_num) |
| if result: |
| return result |
| |
| # Let's start the ping from the worker client. |
| worker = connection_worker.ConnectionDuration.create_from_parent( |
| self.conn_worker) |
| worker.run(self.dut_obj.wifi_client) |
| |
| return None |
| except Exception as e: |
| result = ControlResult(uid=self.uid, |
| run_num=run_num, |
| success=False, |
| error_reason=ROLE_EXECUTE_EXCEPTION + str(e), |
| start_time="", |
| end_time="") |
| return result |
| |
| |
| def dut_batch_worker(dut_control_obj, method, error_results_queue, run_num): |
| """The method called by multiprocessing worker pool for running the DUT |
| control object's setup/execute/cleanup methods. This function is the |
| function which is repeatedly scheduled for each DUT/DUT-set through the |
| multiprocessing worker. This has to be defined outside the class because it |
| needs to be pickleable. |
| |
| @param dut_control_obj: An object corresponding to DUT/DUT-set to control. |
| @param method: Method name to be invoked on the dut_control_obj. |
| it has to be one of setup/execute/teardown. |
| @param error_results_queue: Queue to put the error results after test. |
| @param run_num: Run number of this execution. |
| """ |
| logging.info("%s: Running %s", dut_control_obj.uid, method) |
| run_method = getattr(dut_control_obj, method, None) |
| if callable(run_method): |
| result = run_method(run_num) |
| if result: |
| error_results_queue.put(result) |
| |
| |
| class CliqueDUTBatch(CliqueControl): |
| """CliqueDUTBatch is a base class which is used to control a batch of DUTs. |
| This could either be a DUT set or the entire DUT pool. Not to be directly |
| instantiated. |
| """ |
| # Used to store the instance number of derived classes. |
| BATCH_UID_NUM = {} |
| |
| def __init__(self, dut_objs, test_params=None): |
| """Initialize. |
| |
| @param dut_objs: A list of DUTRole objects representing the DUTs in set. |
| @param test_params: A dictionary of params to be used for executing the |
| test. |
| """ |
| uid_num = self.BATCH_UID_NUM.get(self.__class__.__name__, 1) |
| uid = self.__class__.__name__ + str(uid_num) |
| self.BATCH_UID_NUM[self.__class__.__name__] = uid_num + 1 |
| super(CliqueDUTBatch, self).__init__( |
| dut_objs=dut_objs, test_params=test_params, uid=uid) |
| |
| def _spawn_worker_threads(self, method, run_num): |
| """Spawns multiple threads to run the the |method(run_num)| on all the |
| control objects in parallel. |
| |
| @param method: Method to be invoked on the dut_objs. |
| @param run_num: Run number of this execution. |
| |
| @returns: An instance of ControlResult corresponding to all the errors |
| that were returned by the DUT/DUT's in the DUT-set which |
| is being controlled. |
| """ |
| tasks = [] |
| error_results_queue = multiprocessing.Queue() |
| for dut_obj in self.dut_objs: |
| task = multiprocessing.Process( |
| target=dut_batch_worker, |
| args=(dut_obj, method, error_results_queue, run_num)) |
| tasks.append(task) |
| # Run the tasks in parallel. |
| for task in tasks: |
| task.start() |
| for task in tasks: |
| task.join() |
| error_results = [] |
| while not error_results_queue.empty(): |
| result = error_results_queue.get() |
| # error_results returned at the DUT set level will be a list of |
| # ControlResult objects from each of the DUTs in the set. |
| # error_results returned at the DUT pool level will be a list of |
| # lists from each DUT set. Let's flatten out the list in that case |
| # since there could be ControlResult objects that are generated at |
| # the pool or set level which will make the final error result list |
| # assymetric where some elements are lists of ControlResult objects |
| # and some are just ControlResult objects. |
| if isinstance(result, list): |
| error_results.extend(result) |
| else: |
| error_results.append(result) |
| return error_results |
| |
| def setup(self, run_num): |
| """Setup the DUT-set/pool in the correct state before the sequence of |
| actions to be taken for the role is executed. |
| |
| @param run_num: Run number of this execution. |
| |
| @returns: An instance of ControlResult corresponding to all the errors |
| that were returned by the DUT/DUT's in the DUT-set which |
| is being controlled. |
| """ |
| return self._spawn_worker_threads("setup", run_num) |
| |
| def cleanup(self, run_num): |
| """Cleanup the DUT-set/pool state after the sequence of actions to be |
| taken for the role is executed. |
| |
| @param run_num: Run number of this execution. |
| |
| @returns: An instance of ControlResult corresponding to all the errors |
| that were returned by the DUT/DUT's in the DUT-set which |
| is being controlled. |
| """ |
| return self._spawn_worker_threads("cleanup", run_num) |
| |
| def execute(self, run_num): |
| """Execute the sequence of actions to be taken for the role on the |
| DUT-set/pool. |
| |
| @param run_num: Run number of this execution. |
| |
| @returns: An instance of ControlResult corresponding to all the errors |
| that were returned by the DUT/DUT's in the DUT-set which |
| is being controlled. |
| |
| """ |
| return self._spawn_worker_threads("execute", run_num) |
| |
| |
| class CliqueDUTSet(CliqueDUTBatch): |
| """CliqueDUTSet is an object which is used to control all the DUT's in a DUT |
| set. |
| """ |
| def setup(self, run_num): |
| # Placeholder to add any set specific actions. |
| return super(CliqueDUTSet, self).setup(run_num) |
| |
| def cleanup(self, run_num): |
| # Placeholder to add any set specific actions. |
| return super(CliqueDUTSet, self).cleanup(run_num) |
| |
| def execute(self, run_num): |
| # Placeholder to add any set specific actions. |
| return super(CliqueDUTSet, self).execute(run_num) |
| |
| |
| class CliqueDUTPool(CliqueDUTBatch): |
| """CliqueDUTSet is an object which is used to control all the DUT-sets in a |
| DUT pool. |
| """ |
| |
| def setup(self, run_num): |
| # Let's start the packet capture before we kick off the entire pool |
| # execution. |
| try: |
| capturer = self.find_param('capturer') |
| capturer_frequency = self.find_param('capturer_frequency') |
| capturer_ht_type = self.find_param('capturer_ht_type') |
| capturer.start_capture(capturer_frequency, |
| width_type=capturer_ht_type) |
| except Exception as e: |
| result = ControlResult(uid=self.uid, |
| run_num=run_num, |
| success=False, |
| error_reason=POOL_SETUP_EXCEPTION + str(e), |
| start_time="", |
| end_time="") |
| # We cannot proceed with the test if this failed. |
| return result |
| # Now execute the setup on all the DUT-sets. |
| return super(CliqueDUTPool, self).setup(run_num) |
| |
| def cleanup(self, run_num): |
| # First execute the cleanup on all the DUT-sets. |
| results = super(CliqueDUTPool, self).cleanup(run_num) |
| # Now stop the packet capture. |
| try: |
| capturer = self.find_param('capturer') |
| filename = str('connect_try_%d.trc' % (run_num)), |
| capturer.stop_capture(save_dir=self.outputdir, |
| save_filename=filename) |
| except Exception as e: |
| result = ControlResult(uid=self.uid, |
| run_num=run_num, |
| success=False, |
| error_reason=POOL_CLEANUP_EXCEPTION + str(e), |
| start_time="", |
| end_time="") |
| if results: |
| results.append(result) |
| else: |
| results = result |
| return results |
| |
| def execute(self, run_num): |
| # Placeholder to add any pool specific actions. |
| return super(CliqueDUTPool, self).execute(run_num) |
| |
| |
| def execute_dut_pool(dut_pool, dut_role_classes, assoc_params_list, |
| conn_workers, test_params, num_runs=1): |
| |
| """Controls the DUT's in a given test scenario. The DUT's are assigned a |
| role according to the dut_role_classes provided for each DUT-set and all of |
| the sequence of steps are executed parallely on all the DUT's in the pool. |
| |
| @param dut_pool: 2D list of DUT objects corresponding to the DUT's in the |
| DUT pool. |
| @param dut_role_classes: List of roles to be assigned to each set in the DUT |
| pool. Each element has to be a derived class of |
| CliqueDUTRole. |
| @param assoc_params_list: List of association parameters corrresponding |
| to the AP to test against for each set in the |
| DUT. |
| @param conn_workers: List of ConnectionWorkerAbstract objects, to |
| run extra work after successful connection. |
| @param test_params: List of params to be used for the test. |
| @num_runs: Number of iterations of the test to be run. |
| """ |
| # Every DUT set in the pool needs to have a corresponding DUT role, |
| # association parameters and connection worker assigned from the test. |
| # It is the responsibilty of the test scenario to make sure that there is a |
| # one to one mapping of all these elements since DUT control is going to |
| # be generic. |
| # This might mean that the test needs to duplicate the association |
| # parameters in the list if there is only 1 AP and 2 DUT sets. |
| # Or if there is no connection worker required, then the test should create |
| # a list of 'None' objects with length of 2. |
| # DUT control does not care if the same AP is used for 2 DUT sets or if the |
| # same connection worker is shared across 2 DUT sets as long as the |
| # length of the lists are equal. |
| |
| if ((len(dut_pool) != len(dut_role_classes)) or |
| (len(dut_pool) != len(assoc_params_list)) or |
| (len(dut_pool) != len(conn_workers))): |
| raise error.TestError("Incorrect test configuration. Num DUT sets: %d, " |
| "Num DUT roles: %d, Num association params: %d, " |
| "Num connection workers: %d" % |
| (len(dut_pool), len(dut_role_classes), |
| len(assoc_params_list), len(conn_workers))) |
| |
| dut_set_control_objs = [] |
| for dut_set, dut_role_class, assoc_params, conn_worker in \ |
| zip(dut_pool, dut_role_classes, assoc_params_list, conn_workers): |
| dut_control_objs = [] |
| for dut in dut_set: |
| dut_control_obj = dut_role_class( |
| dut, assoc_params, conn_worker, test_params) |
| dut_control_objs.append(dut_control_obj) |
| dut_set_control_obj = CliqueDUTSet(dut_control_objs, test_params) |
| dut_set_control_objs.append(dut_set_control_obj) |
| dut_pool_control_obj = CliqueDUTPool(dut_set_control_objs, test_params) |
| |
| for run_num in range(0, num_runs): |
| # This setup, execute, cleanup calls on pool object, results in parallel |
| # invocation of call on all the DUT-sets which in turn results in |
| # parallel invocation of call on all the DUTs. |
| error_results = dut_pool_control_obj.setup(run_num) |
| if error_results: |
| return error_results |
| |
| error_results = dut_pool_control_obj.execute(run_num) |
| if error_results: |
| # Try to cleanup before we leave. |
| dut_pool_control_obj.cleanup(run_num) |
| return error_results |
| |
| error_results = dut_pool_control_obj.cleanup(run_num) |
| if error_results: |
| return error_results |
| return None |