| #!/usr/bin/env python |
| # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| |
| ''' |
| This module provides both client and server side of a XML RPC based server which |
| can be used to handle factory test states (status) and shared persistent data. |
| ''' |
| |
| |
| import logging |
| import mimetypes |
| import os |
| import shelve |
| import shutil |
| import SocketServer |
| import sys |
| import threading |
| import time |
| |
| from hashlib import sha1 |
| |
| import factory_common |
| |
| from jsonrpclib import jsonclass |
| from jsonrpclib import jsonrpc |
| from jsonrpclib import SimpleJSONRPCServer |
| from autotest_lib.client.cros import factory |
| from autotest_lib.client.cros.factory import TestState |
| from autotest_lib.client.cros.factory import unicode_to_string |
| |
| DEFAULT_FACTORY_STATE_PORT = 0x0FAC |
| DEFAULT_FACTORY_STATE_ADDRESS = 'localhost' |
| DEFAULT_FACTORY_STATE_BIND_ADDRESS = 'localhost' |
| DEFAULT_FACTORY_STATE_FILE_PATH = factory.get_state_root() |
| |
| |
| def _synchronized(f): |
| ''' |
| Decorates a function to grab a lock. |
| ''' |
| def wrapped(self, *args, **kw): |
| with self._lock: # pylint: disable=W0212 |
| return f(self, *args, **kw) |
| return wrapped |
| |
| |
| def clear_state(state_file_path=None): |
| '''Clears test state (removes the state file path). |
| |
| Args: |
| state_file_path: Path to state; uses the default path if None. |
| ''' |
| state_file_path = state_file_path or DEFAULT_FACTORY_STATE_FILE_PATH |
| logging.warn('Clearing state file path %s' % state_file_path) |
| if os.path.exists(state_file_path): |
| shutil.rmtree(state_file_path) |
| |
| |
| class TestHistoryItem(object): |
| def __init__(self, path, state, log, trace=None): |
| self.path = path |
| self.state = state |
| self.log = log |
| self.trace = trace |
| self.time = time.time() |
| |
| |
| @unicode_to_string.UnicodeToStringClass |
| class FactoryState(object): |
| ''' |
| The core implementation for factory state control. |
| The major provided features are: |
| |
| SHARED DATA |
| You can get/set simple data into the states and share between all tests. |
| See get_shared_data(name) and set_shared_data(name, value) for more |
| information. |
| |
| TEST STATUS |
| To track the execution status of factory auto tests, you can use |
| get_test_state, get_test_states methods, and update_test_state |
| methods. |
| |
| All arguments may be provided either as strings, or as Unicode strings in |
| which case they are converted to strings using UTF-8. All returned values |
| are strings (not Unicode). |
| |
| This object is thread-safe. |
| |
| See help(FactoryState.[methodname]) for more information. |
| ''' |
| |
| def __init__(self, state_file_path=None): |
| ''' |
| Initializes the state server. |
| |
| Parameters: |
| state_file_path: External file to store the state information. |
| ''' |
| state_file_path = state_file_path or DEFAULT_FACTORY_STATE_FILE_PATH |
| if not os.path.exists(state_file_path): |
| os.makedirs(state_file_path) |
| self._tests_shelf = shelve.open(state_file_path + '/tests') |
| self._data_shelf = shelve.open(state_file_path + '/data') |
| self._test_history_shelf = shelve.open(state_file_path + |
| '/test_history') |
| self._lock = threading.RLock() |
| self.test_list_struct = None |
| |
| if TestState not in jsonclass.supported_types: |
| jsonclass.supported_types.append(TestState) |
| |
| @_synchronized |
| def close(self): |
| ''' |
| Shuts down the state instance. |
| ''' |
| for shelf in [self._tests_shelf, |
| self._data_shelf, |
| self._test_history_shelf]: |
| try: |
| shelf.close() |
| except: |
| logging.exception('Unable to close shelf') |
| |
| @_synchronized |
| def update_test_state(self, path, **kw): |
| ''' |
| Updates the state of a test. |
| |
| See TestState.update for the allowable keyword arguments. |
| |
| @param path: The path to the test (see FactoryTest for a description |
| of test paths). |
| @param kw: See TestState.update for allowable arguments (e.g., |
| status and increment_count). |
| |
| @return: A tuple containing the new state, and a boolean indicating |
| whether the state was just changed. |
| ''' |
| state = self._tests_shelf.get(path) |
| old_state_repr = repr(state) |
| changed = False |
| |
| if not state: |
| changed = True |
| state = TestState() |
| |
| changed = changed | state.update(**kw) # Don't short-circuit |
| |
| if changed: |
| logging.debug('Updating test state for %s: %s -> %s', |
| path, old_state_repr, state) |
| self._tests_shelf[path] = state |
| self._tests_shelf.sync() |
| |
| return state, changed |
| |
| @_synchronized |
| def get_test_state(self, path): |
| ''' |
| Returns the state of a test. |
| ''' |
| return self._tests_shelf[path] |
| |
| @_synchronized |
| def get_test_paths(self): |
| ''' |
| Returns a list of all tests' paths. |
| ''' |
| return self._tests_shelf.keys() |
| |
| @_synchronized |
| def get_test_states(self): |
| ''' |
| Returns a map of each test's path to its state. |
| ''' |
| return dict(self._tests_shelf) |
| |
| def get_test_list(self): |
| ''' |
| Returns the test list. |
| ''' |
| return self.test_list.to_struct() |
| |
| @_synchronized |
| def set_shared_data(self, *key_value_pairs): |
| ''' |
| Sets shared data items. |
| |
| Args: |
| key_value_pairs: A series of alternating keys and values |
| (k1, v1, k2, v2...). In the simple case this can just |
| be a single key and value. |
| ''' |
| assert len(key_value_pairs) % 2 == 0, repr(key_value_pairs) |
| for i in range(0, len(key_value_pairs), 2): |
| self._data_shelf[key_value_pairs[i]] = key_value_pairs[i + 1] |
| self._data_shelf.sync() |
| |
| @_synchronized |
| def get_shared_data(self, key): |
| ''' |
| Retrieves a shared data item. |
| ''' |
| return self._data_shelf[key] |
| |
| @_synchronized |
| def has_shared_data(self, key): |
| ''' |
| Returns if a shared data item exists. |
| ''' |
| return key in self._data_shelf |
| |
| @_synchronized |
| def del_shared_data(self, key): |
| ''' |
| Deletes a shared data item. |
| ''' |
| del self._data_shelf[key] |
| |
| @_synchronized |
| def add_test_history(self, history_item): |
| path = history_item.path |
| assert path |
| |
| length_key = path + '[length]' |
| num_entries = self._test_history_shelf.get(length_key, 0) |
| self._test_history_shelf[path + '[%d]' % num_entries] = history_item |
| self._test_history_shelf[length_key] = num_entries + 1 |
| |
| @_synchronized |
| def get_test_history(self, paths): |
| if type(paths) != list: |
| paths = [paths] |
| ret = [] |
| |
| for path in paths: |
| i = 0 |
| while True: |
| value = self._test_history_shelf.get(path + '[%d]' % i) |
| |
| i += 1 |
| if not value: |
| break |
| ret.append(value) |
| |
| ret.sort(key=lambda item: item.time) |
| |
| return ret |
| |
| |
| def get_instance(address=DEFAULT_FACTORY_STATE_ADDRESS, |
| port=DEFAULT_FACTORY_STATE_PORT): |
| ''' |
| Gets an instance (for client side) to access the state server. |
| |
| @param address: Address of the server to be connected. |
| @param port: Port of the server to be connected. |
| @return An object with all public functions from FactoryState. |
| See help(FactoryState) for more information. |
| ''' |
| return jsonrpc.ServerProxy('http://%s:%d' % (address, port), |
| verbose=False) |
| |
| |
| class MyJSONRPCRequestHandler(SimpleJSONRPCServer.SimpleJSONRPCRequestHandler): |
| def do_GET(self): |
| logging.warn("HTTP request for path %s" % self.path) |
| |
| handler = self.server.handlers.get(self.path) |
| if handler: |
| return handler(self) |
| |
| if self.path == "/": |
| self.path = "/index.html" |
| |
| if ".." in self.path.split("/"): |
| logging.warn("Invalid path") |
| self.send_response(404) |
| return |
| |
| mime_type = mimetypes.guess_type(self.path) |
| if not mime_type: |
| logging.warn("Unable to guess MIME type") |
| self.send_response(404) |
| return |
| |
| local_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), |
| "static", |
| self.path.lstrip("/")) |
| if not os.path.exists(local_path): |
| logging.warn("File not found: %s" % local_path) |
| self.send_response(404) |
| return |
| |
| self.send_response(200) |
| self.send_header("Content-Type", mime_type[0]) |
| self.send_header("Content-Length", os.path.getsize(local_path)) |
| self.end_headers() |
| with open(local_path) as f: |
| shutil.copyfileobj(f, self.wfile) |
| |
| |
| class ThreadedJSONRPCServer(SocketServer.ThreadingMixIn, |
| SimpleJSONRPCServer.SimpleJSONRPCServer): |
| '''The JSON/RPC server. |
| |
| Properties: |
| handlers: A map from URLs to callbacks handling them. (The callback |
| takes a single argument: the request to handle.) |
| ''' |
| def __init__(self, *args, **kwargs): |
| SimpleJSONRPCServer.SimpleJSONRPCServer.__init__(self, *args, **kwargs) |
| self.handlers = {} |
| |
| def add_handler(self, url, callback): |
| self.handlers[url] = callback |
| |
| |
| def create_server(state_file_path=None, bind_address=None, port=None): |
| ''' |
| Creates a FactoryState object and an JSON/RPC server to serve it. |
| |
| @param state_file_path: The path containing the saved state. |
| @param bind_address: Address to bind to, defaulting to |
| DEFAULT_FACTORY_STATE_BIND_ADDRESS. |
| @param port: Port to bind to, defaulting to DEFAULT_FACTORY_STATE_PORT. |
| @return A tuple of the FactoryState instance and the SimpleJSONRPCServer |
| instance. |
| ''' |
| # We have some icons in SVG format, but this isn't recognized in |
| # the standard Python mimetypes set. |
| mimetypes.add_type('image/svg+xml', '.svg') |
| |
| if not bind_address: |
| bind_address = DEFAULT_FACTORY_STATE_BIND_ADDRESS |
| if not port: |
| port = DEFAULT_FACTORY_STATE_PORT |
| instance = FactoryState(state_file_path) |
| server = ThreadedJSONRPCServer( |
| (bind_address, port), |
| requestHandler=MyJSONRPCRequestHandler, |
| logRequests=False) |
| |
| server.register_introspection_functions() |
| server.register_instance(instance) |
| server.web_socket_handler = None |
| return instance, server |