| #!/usr/bin/python2 |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import atexit |
| import errno |
| import logging |
| import re |
| import sys |
| import socket |
| import threading |
| import xmlrpclib |
| |
| import rpm_controller |
| import rpm_logging_config |
| |
| from config import rpm_config |
| from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer |
| from rpm_infrastructure_exception import RPMInfrastructureException |
| |
| import common |
| from autotest_lib.site_utils.rpm_control_system import utils |
| |
| LOG_FILENAME_FORMAT = rpm_config.get('GENERAL','dispatcher_logname_format') |
| |
| |
| class RPMDispatcher(object): |
| """ |
| This class is the RPM dispatcher server and it is responsible for |
| communicating directly to the RPM devices to change a DUT's outlet status. |
| |
| When an RPMDispatcher is initialized it registers itself with the frontend |
| server, who will field out outlet requests to this dispatcher. |
| |
| Once a request is received the dispatcher looks up the RPMController |
| instance for the given DUT and then queues up the request and blocks until |
| it is processed. |
| |
| @var _address: IP address or Hostname of this dispatcher server. |
| @var _frontend_server: URI of the frontend server. |
| @var _lock: Lock used to synchronize access to _worker_dict. |
| @var _port: Port assigned to this server instance. |
| @var _worker_dict: Dictionary mapping RPM hostname's to RPMController |
| instances. |
| """ |
| |
| |
| def __init__(self, address, port): |
| """ |
| RPMDispatcher constructor. |
| |
| Initialized instance vars and registers this server with the frontend |
| server. |
| |
| @param address: Address of this dispatcher server. |
| @param port: Port assigned to this dispatcher server. |
| |
| @raise RPMInfrastructureException: Raised if the dispatch server is |
| unable to register with the frontend |
| server. |
| """ |
| self._address = address |
| self._port = port |
| self._lock = threading.Lock() |
| self._worker_dict = {} |
| # We assume that the frontend server and dispatchers are running on the |
| # same host, and the frontend server is listening for connections from |
| # the external world. |
| frontend_server_port = rpm_config.getint('RPM_INFRASTRUCTURE', |
| 'frontend_port') |
| self._frontend_server = 'http://%s:%d' % (socket.gethostname(), |
| frontend_server_port) |
| logging.info('Registering this rpm dispatcher with the frontend ' |
| 'server at %s.', self._frontend_server) |
| client = xmlrpclib.ServerProxy(self._frontend_server) |
| # De-register with the frontend when the dispatcher exit's. |
| atexit.register(self._unregister) |
| try: |
| client.register_dispatcher(self._get_serveruri()) |
| except socket.error as er: |
| err_msg = ('Unable to register with frontend server. Error: %s.' % |
| errno.errorcode[er.errno]) |
| logging.error(err_msg) |
| raise RPMInfrastructureException(err_msg) |
| |
| |
| def _worker_dict_put(self, key, value): |
| """ |
| Private method used to synchronize access to _worker_dict. |
| |
| @param key: key value we are using to access _worker_dict. |
| @param value: value we are putting into _worker_dict. |
| """ |
| with self._lock: |
| self._worker_dict[key] = value |
| |
| |
| def _worker_dict_get(self, key): |
| """ |
| Private method used to synchronize access to _worker_dict. |
| |
| @param key: key value we are using to access _worker_dict. |
| @return: value found when accessing _worker_dict |
| """ |
| with self._lock: |
| return self._worker_dict.get(key) |
| |
| |
| def is_up(self): |
| """ |
| Allows the frontend server to see if the dispatcher server is up before |
| attempting to queue requests. |
| |
| @return: True. If connection fails, the client proxy will throw a socket |
| error on the client side. |
| """ |
| return True |
| |
| |
| def queue_request(self, powerunit_info_dict, new_state): |
| """ |
| Looks up the appropriate RPMController instance for the device and queues |
| up the request. |
| |
| @param powerunit_info_dict: A dictionary, containing the attribute/values |
| of an unmarshalled PowerUnitInfo instance. |
| @param new_state: [ON, OFF, CYCLE] state we want to the change the |
| outlet to. |
| @return: True if the attempt to change power state was successful, |
| False otherwise. |
| """ |
| powerunit_info = utils.PowerUnitInfo(**powerunit_info_dict) |
| logging.info('Received request to set device: %s to state: %s', |
| powerunit_info.device_hostname, new_state) |
| rpm_controller = self._get_rpm_controller( |
| powerunit_info.powerunit_hostname, |
| powerunit_info.hydra_hostname) |
| return rpm_controller.queue_request(powerunit_info, new_state) |
| |
| |
| def _get_rpm_controller(self, rpm_hostname, hydra_hostname=None): |
| """ |
| Private method that retreives the appropriate RPMController instance |
| for this RPM Hostname or calls _create_rpm_controller it if it does not |
| already exist. |
| |
| @param rpm_hostname: hostname of the RPM whose RPMController we want. |
| |
| @return: RPMController instance responsible for this RPM. |
| """ |
| if not rpm_hostname: |
| return None |
| rpm_controller = self._worker_dict_get(rpm_hostname) |
| if not rpm_controller: |
| rpm_controller = self._create_rpm_controller( |
| rpm_hostname, hydra_hostname) |
| self._worker_dict_put(rpm_hostname, rpm_controller) |
| return rpm_controller |
| |
| |
| def _create_rpm_controller(self, rpm_hostname, hydra_hostname): |
| """ |
| Determines the type of RPMController required and initializes it. |
| |
| @param rpm_hostname: Hostname of the RPM we need to communicate with. |
| |
| @return: RPMController instance responsible for this RPM. |
| """ |
| hostname_elements = rpm_hostname.split('-') |
| if hostname_elements[-2] == 'poe': |
| # POE switch hostname looks like 'chromeos2-poe-switch1'. |
| logging.info('The controller is a Cisco POE switch.') |
| return rpm_controller.CiscoPOEController(rpm_hostname) |
| else: |
| # The device is an RPM. |
| rack_id = hostname_elements[-2] |
| rpm_typechecker = re.compile('rack[0-9]+[a-z]+') |
| if rpm_typechecker.match(rack_id): |
| logging.info('RPM is a webpowered device.') |
| return rpm_controller.WebPoweredRPMController(rpm_hostname) |
| else: |
| logging.info('RPM is a Sentry CDU device.') |
| return rpm_controller.SentryRPMController( |
| hostname=rpm_hostname, |
| hydra_hostname=hydra_hostname) |
| |
| |
| def _get_serveruri(self): |
| """ |
| Formats the _address and _port into a meaningful URI string. |
| |
| @return: URI of this dispatch server. |
| """ |
| return 'http://%s:%d' % (self._address, self._port) |
| |
| |
| def _unregister(self): |
| """ |
| Tells the frontend server that this dispatch server is shutting down and |
| to unregister it. |
| |
| Called by atexit. |
| |
| @raise RPMInfrastructureException: Raised if the dispatch server is |
| unable to unregister with the |
| frontend server. |
| """ |
| logging.info('Dispatch server shutting down. Unregistering with RPM ' |
| 'frontend server.') |
| client = xmlrpclib.ServerProxy(self._frontend_server) |
| try: |
| client.unregister_dispatcher(self._get_serveruri()) |
| except socket.error as er: |
| err_msg = ('Unable to unregister with frontend server. Error: %s.' % |
| errno.errorcode[er.errno]) |
| logging.error(err_msg) |
| raise RPMInfrastructureException(err_msg) |
| |
| |
| def launch_server_on_unused_port(): |
| """ |
| Looks up an unused port on this host and launches the xmlrpc server. |
| |
| Useful for testing by running multiple dispatch servers on the same host. |
| |
| @return: server,port - server object and the port that which it is listening |
| to. |
| """ |
| address = socket.gethostbyname(socket.gethostname()) |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| # Set this socket to allow reuse. |
| sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| sock.bind(('', 0)) |
| port = sock.getsockname()[1] |
| server = MultiThreadedXMLRPCServer((address, port), |
| allow_none=True) |
| sock.close() |
| return server, port |
| |
| |
| if __name__ == '__main__': |
| """ |
| Main function used to launch the dispatch server. Creates an instance of |
| RPMDispatcher and registers it to a MultiThreadedXMLRPCServer instance. |
| """ |
| if len(sys.argv) != 2: |
| print 'Usage: ./%s <log_file_name>' % sys.argv[0] |
| sys.exit(1) |
| |
| rpm_logging_config.start_log_server(sys.argv[1], LOG_FILENAME_FORMAT) |
| rpm_logging_config.set_up_logging_to_server() |
| |
| # Get the local ip _address and set the server to utilize it. |
| address = socket.gethostbyname(socket.gethostname()) |
| server, port = launch_server_on_unused_port() |
| rpm_dispatcher = RPMDispatcher(address, port) |
| server.register_instance(rpm_dispatcher) |
| server.serve_forever() |