| #!/usr/bin/python -u |
| # |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import fnmatch |
| import logging |
| import os |
| import cPickle as pickle |
| import pipes |
| import re |
| import subprocess |
| import tempfile |
| import threading |
| import time |
| import traceback |
| import unittest |
| import uuid |
| from optparse import OptionParser |
| from StringIO import StringIO |
| |
| import factory_common |
| from autotest_lib.client.cros import factory |
| from autotest_lib.client.cros.factory.event import Event |
| from autotest_lib.client.cros.factory import event_log |
| from autotest_lib.client.cros.factory import TestState |
| from autotest_lib.client.cros.factory import utils |
| |
| |
| # Number of bytes to include from the log of a failed test. |
| ERROR_LOG_TAIL_LENGTH = 8*1024 |
| |
| |
| class PyTestInfo(object): |
| def __init__(self, test_list, path, pytest_name, args, results_path): |
| self.test_list = test_list |
| self.path = path |
| self.pytest_name = pytest_name |
| self.args = args |
| self.results_path = results_path |
| |
| |
| class TestInvocation(object): |
| ''' |
| State for an active test. |
| ''' |
| def __init__(self, goofy, test, on_completion=None): |
| '''Constructor. |
| |
| @param goofy: The controlling Goofy object. |
| @param test: The FactoryTest object to test. |
| @param on_completion: Callback to invoke in the goofy event queue |
| on completion. |
| ''' |
| self.goofy = goofy |
| self.test = test |
| self.thread = threading.Thread(target=self._run, |
| name='TestInvocation-%s' % test.path) |
| self.on_completion = on_completion |
| self.uuid = event_log.TimedUuid() |
| self.env_additions = {'CROS_FACTORY_TEST_PATH': self.test.path, |
| 'CROS_FACTORY_TEST_INVOCATION': self.uuid} |
| self.debug_log_path = None |
| self._lock = threading.Lock() |
| # The following properties are guarded by the lock. |
| self._aborted = False |
| self._completed = False |
| self._process = None |
| |
| def __repr__(self): |
| return 'TestInvocation(_aborted=%s, _completed=%s)' % ( |
| self._aborted, self._completed) |
| |
| def start(self): |
| '''Starts the test thread.''' |
| self.thread.start() |
| |
| def abort_and_join(self): |
| ''' |
| Aborts a test (must be called from the event controller thread). |
| ''' |
| with self._lock: |
| self._aborted = True |
| if self._process: |
| utils.kill_process_tree(self._process, 'autotest') |
| if self.thread: |
| self.thread.join() |
| with self._lock: |
| # Should be set by the thread itself, but just in case... |
| self._completed = True |
| |
| def is_completed(self): |
| ''' |
| Returns true if the test has finished. |
| ''' |
| with self._lock: |
| return self._completed |
| |
| def _invoke_autotest(self): |
| ''' |
| Invokes an autotest test. |
| |
| This method encapsulates all the magic necessary to run a single |
| autotest test using the 'autotest' command-line tool and get a |
| sane pass/fail status and error message out. It may be better |
| to just write our own command-line wrapper for job.run_test |
| instead. |
| |
| @param test: the autotest to run |
| @param dargs: the argument map |
| @return: tuple of status (TestState.PASSED or TestState.FAILED) and |
| error message, if any |
| ''' |
| assert self.test.autotest_name |
| |
| test_tag = '%s_%s' % (self.test.path, self.count) |
| dargs = dict(self.test.dargs) |
| dargs.update({'tag': test_tag, |
| 'test_list_path': self.goofy.options.test_list}) |
| |
| status = TestState.FAILED |
| error_msg = 'Unknown' |
| |
| try: |
| output_dir = '%s/results/%s-%s' % (factory.CLIENT_PATH, |
| self.test.path, |
| self.uuid) |
| self.debug_log_path = os.path.join( |
| output_dir, |
| 'results/default/debug/client.INFO') |
| if not os.path.exists(output_dir): |
| os.makedirs(output_dir) |
| tmp_dir = tempfile.mkdtemp(prefix='tmp', dir=output_dir) |
| |
| control_file = os.path.join(tmp_dir, 'control') |
| result_file = os.path.join(tmp_dir, 'result') |
| args_file = os.path.join(tmp_dir, 'args') |
| |
| with open(args_file, 'w') as f: |
| pickle.dump(dargs, f) |
| |
| # Create a new control file to use to run the test |
| with open(control_file, 'w') as f: |
| print >> f, 'import common, traceback, utils' |
| print >> f, 'import cPickle as pickle' |
| print >> f, ("success = job.run_test(" |
| "'%s', **pickle.load(open('%s')))" % ( |
| self.test.autotest_name, args_file)) |
| |
| print >> f, ( |
| "pickle.dump((success, " |
| "str(job.last_error) if job.last_error else None), " |
| "open('%s', 'w'), protocol=2)" |
| % result_file) |
| |
| args = ['%s/bin/autotest' % factory.CLIENT_PATH, |
| '--output_dir', output_dir, |
| control_file] |
| |
| logging.debug('Test command line: %s', ' '.join( |
| [pipes.quote(arg) for arg in args])) |
| |
| with self._lock: |
| with self.goofy.env.lock: |
| self._process = self.goofy.env.spawn_autotest( |
| self.test.autotest_name, args, self.env_additions, |
| result_file) |
| |
| returncode = self._process.wait() |
| with self._lock: |
| if self._aborted: |
| error_msg = 'Aborted by operator' |
| return |
| |
| if returncode: |
| # Only happens when there is an autotest-level problem (not when |
| # the test actually failed). |
| error_msg = 'autotest returned with code %d' % returncode |
| return |
| |
| with open(result_file) as f: |
| try: |
| success, error_msg = pickle.load(f) |
| except: # pylint: disable=W0702 |
| logging.exception('Unable to retrieve autotest results') |
| error_msg = 'Unable to retrieve autotest results' |
| return |
| |
| if success: |
| status = TestState.PASSED |
| error_msg = '' |
| except Exception: # pylint: disable=W0703 |
| logging.exception('Exception in autotest driver') |
| # Make sure Goofy reports the exception upon destruction |
| # (e.g., for testing) |
| self.goofy.record_exception(traceback.format_exception_only( |
| *sys.exc_info()[:2])) |
| finally: |
| self.clean_autotest_logs(output_dir) |
| return status, error_msg # pylint: disable=W0150 |
| |
| def _invoke_pytest(self): |
| ''' |
| Invokes a pyunittest-based test. |
| ''' |
| assert self.test.pytest_name |
| |
| files_to_delete = [] |
| try: |
| def make_tmp(type): |
| ret = tempfile.mktemp( |
| prefix='%s-%s-' % (self.test.path, type)) |
| files_to_delete.append(ret) |
| return ret |
| |
| info_path = make_tmp('info') |
| results_path = make_tmp('results') |
| |
| log_dir = os.path.join(factory.get_log_root(), |
| 'factory_test_logs') |
| if not os.path.exists(log_dir): |
| os.makedirs(log_dir) |
| log_path = os.path.join(log_dir, |
| '%s.%03d' % (self.test.path, |
| self.count)) |
| |
| with open(info_path, 'w') as info: |
| pickle.dump(PyTestInfo( |
| test_list=self.goofy.options.test_list, |
| path=self.test.path, |
| pytest_name=self.test.pytest_name, |
| args=self.test.dargs, |
| results_path = results_path), |
| info) |
| |
| # Invoke the unittest driver in a separate process. |
| with open(log_path, "w") as log: |
| this_file = os.path.realpath(__file__) |
| this_file = re.sub(r'\.pyc$', '.py', this_file) |
| args = [this_file, '--pytest', info_path] |
| logging.debug('Test command line: %s >& %s', |
| ' '.join([pipes.quote(arg) for arg in args]), |
| log_path) |
| |
| env = dict(os.environ) |
| env.update(self.env_additions) |
| with self._lock: |
| if self._aborted: |
| return TestState.FAILED, 'Aborted before starting' |
| self._process = subprocess.Popen( |
| args, |
| env=env, |
| stdin=open(os.devnull, "w"), |
| stdout=log, |
| stderr=subprocess.STDOUT) |
| self._process.wait() |
| with self._lock: |
| if self._aborted: |
| return TestState.FAILED, 'Aborted by operator' |
| if self._process.returncode: |
| return TestState.FAILED, ( |
| 'Test returned code %d' % pytest.returncode) |
| |
| if not os.path.exists(results_path): |
| return TestState.FAILED, 'pytest did not complete' |
| |
| with open(results_path) as f: |
| return pickle.load(f) |
| except: # pylint: disable=W0702 |
| logging.exception('Unable to retrieve pytest results') |
| return TestState.FAILED, 'Unable to retrieve pytest results' |
| finally: |
| for f in files_to_delete: |
| try: |
| if os.path.exists(f): |
| os.unlink(f) |
| except: |
| logging.exception('Unable to delete temporary file %s', |
| f) |
| |
| def clean_autotest_logs(self, output_dir): |
| globs = self.goofy.test_list.options.preserve_autotest_results |
| if '*' in globs: |
| # Keep everything |
| return |
| |
| deleted_count = 0 |
| preserved_count = 0 |
| for root, dirs, files in os.walk(output_dir, topdown=False): |
| for f in files: |
| if any(fnmatch.fnmatch(f, g) |
| for g in globs): |
| # Keep it |
| preserved_count = 1 |
| else: |
| try: |
| os.unlink(os.path.join(root, f)) |
| deleted_count += 1 |
| except: |
| logging.exception('Unable to remove %s' % |
| os.path.join(root, f)) |
| try: |
| # Try to remove the directory (in case it's empty now) |
| os.rmdir(root) |
| except: |
| # Not empty; that's OK |
| pass |
| logging.info('Preserved %d files matching %s and removed %d', |
| preserved_count, globs, deleted_count) |
| |
| def _run(self): |
| with self._lock: |
| if self._aborted: |
| return |
| |
| self.count = self.test.update_state( |
| status=TestState.ACTIVE, increment_count=1, error_msg='').count |
| |
| factory.console.info('Running test %s' % self.test.path) |
| |
| log_args = dict( |
| path=self.test.path, |
| # Use Python representation for dargs, since some elements |
| # may not be representable in YAML. |
| dargs=repr(self.test.dargs), |
| invocation=self.uuid) |
| if self.test.autotest_name: |
| log_args['autotest_name'] = self.test.autotest_name |
| if self.test.pytest_name: |
| log_args['pytest_name'] = self.test.pytest_name |
| |
| self.goofy.event_log.Log('start_test', **log_args) |
| start_time = time.time() |
| try: |
| if self.test.autotest_name: |
| status, error_msg = self._invoke_autotest() |
| elif self.test.pytest_name: |
| status, error_msg = self._invoke_pytest() |
| else: |
| status = TestState.FAILED |
| error_msg = 'No autotest_name or pytest_name' |
| finally: |
| try: |
| self.goofy.event_client.post_event( |
| Event(Event.Type.DESTROY_TEST, |
| test=self.test.path, |
| invocation=self.uuid)) |
| except: |
| logging.exception('Unable to post END_TEST event') |
| |
| try: |
| # Leave all items in log_args; this duplicates |
| # things but will make it easier to grok the output. |
| log_args.update(dict(status=status, |
| duration=time.time() - start_time)) |
| if error_msg: |
| log_args['error_msg'] = error_msg |
| if (status != TestState.PASSED and |
| self.debug_log_path and |
| os.path.exists(self.debug_log_path)): |
| try: |
| debug_log_size = os.path.getsize(self.debug_log_path) |
| offset = max(0, debug_log_size - ERROR_LOG_TAIL_LENGTH) |
| with open(self.debug_log_path) as f: |
| f.seek(offset) |
| log_args['log_tail'] = f.read() |
| except: |
| logging.exception('Unable to read log tail') |
| self.goofy.event_log.Log('end_test', **log_args) |
| except: |
| logging.exception('Unable to log end_test event') |
| |
| factory.console.info('Test %s %s%s', |
| self.test.path, |
| status, |
| ': %s' % error_msg if error_msg else '') |
| |
| self.test.update_state(status=status, error_msg=error_msg, |
| visible=False) |
| with self._lock: |
| self._completed = True |
| |
| self.goofy.run_queue.put(self.goofy.reap_completed_tests) |
| if self.on_completion: |
| self.goofy.run_queue.put(self.on_completion) |
| |
| |
| def run_pytest(test_info): |
| '''Runs a pytest, saving a pickled (status, error_msg) tuple to the |
| appropriate results file. |
| |
| Args: |
| test_info: A PyTestInfo object containing information about what to |
| run. |
| ''' |
| try: |
| __import__('autotest_lib.client.cros.factory.tests.%s' % |
| test_info.pytest_name) |
| module = getattr(factory.tests, test_info.pytest_name) |
| suite = unittest.TestLoader().loadTestsFromModule(module) |
| |
| # Recursively set |
| def set_test_info(test): |
| if isinstance(test, unittest.TestCase): |
| test.test_info = test_info |
| elif isinstance(test, unittest.TestSuite): |
| for x in test: |
| set_test_info(x) |
| set_test_info(suite) |
| |
| runner = unittest.TextTestRunner() |
| result = runner.run(suite) |
| |
| def format_error_msg(test_name, trace): |
| '''Formats a trace so that the actual error message is in the last |
| line. |
| ''' |
| # The actual error is in the last line. |
| trace, _, error_msg = trace.strip().rpartition('\n') |
| error_msg = error_msg.replace('FactoryTestFailure: ', '') |
| return error_msg + '\n' + trace |
| |
| all_failures = result.failures + result.errors |
| if all_failures: |
| status = TestState.FAILED |
| error_msg = '; '.join(format_error_msg(test_name, trace) |
| for test_name, trace in all_failures) |
| logging.info('pytest failure: %s', error_msg) |
| else: |
| status = TestState.PASSED |
| error_msg = '' |
| except: |
| logging.exception('Unable to run pytest') |
| status = TestState.FAILED |
| error_msg = traceback.format_exc() |
| |
| with open(test_info.results_path, 'w') as results: |
| pickle.dump((status, error_msg), results) |
| |
| def main(): |
| parser = OptionParser() |
| parser.add_option('--pytest', dest='pytest_info', |
| help='Info for pytest to run') |
| (options, args) = parser.parse_args() |
| |
| assert options.pytest_info |
| |
| info = pickle.load(open(options.pytest_info)) |
| factory.init_logging(info.path) |
| run_pytest(info) |
| |
| if __name__ == '__main__': |
| main() |