| #!/usr/bin/python |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import errno |
| import heapq |
| import logging |
| import os |
| import sys |
| import socket |
| import threading |
| import xmlrpclib |
| |
| import rpm_logging_config |
| from config import rpm_config |
| from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer |
| from rpm_infrastructure_exception import RPMInfrastructureException |
| |
| import common |
| from autotest_lib.server import frontend |
| from autotest_lib.site_utils.rpm_control_system import utils |
| |
| DEFAULT_RPM_COUNT = 0 |
| TERMINATED = -1 |
| |
| # Indexes for accessing heap entries. |
| RPM_COUNT = 0 |
| DISPATCHER_URI = 1 |
| |
| LOG_FILENAME_FORMAT = rpm_config.get('GENERAL','frontend_logname_format') |
| DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id') |
| |
| # Valid state values. |
| VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE'] |
| |
| # Servo-interface mapping file |
| MAPPING_FILE = os.path.join( |
| os.path.dirname(__file__), |
| rpm_config.get('CiscoPOE', 'servo_interface_mapping_file')) |
| |
| # Size of the LRU that holds power management unit information related |
| # to a device, e.g. rpm_hostname, outlet, hydra_hostname, etc. |
| LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size') |
| |
| |
| class DispatcherDownException(Exception): |
| """Raised when a particular RPMDispatcher is down.""" |
| |
| |
| class RPMFrontendServer(object): |
| """ |
| This class is the frontend server of the RPM Infrastructure. All clients |
| will send their power state requests to this central server who will |
| forward the requests to an avaliable or already assigned RPM dispatcher |
| server. |
| |
| Once the dispatcher processes the request it will return the result |
| to this frontend server who will send the result back to the client. |
| |
| All calls to this server are blocking. |
| |
| @var _dispatcher_minheap: Min heap that returns a list of format- |
| [ num_rpm's, dispatcher_uri ] |
| Used to choose the least loaded dispatcher. |
| @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min |
| heap. If a dispatcher server shuts down this allows us to |
| invalidate the entry in the minheap. |
| @var _lock: Used to protect data from multiple running threads all |
| manipulating the same data. |
| @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher |
| server. |
| @var _mapping_last_modified: Last-modified time of the servo-interface |
| mapping file. |
| @var _servo_interface: Maps servo hostname to (switch_hostname, interface). |
| @var _rpm_info: An LRU cache to hold recently visited rpm information |
| so that we don't hit AFE too often. The elements in |
| the cache are instances of PowerUnitInfo indexed by |
| dut hostnames. POE info is not stored in the cache. |
| @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname. |
| @var _email_handler: Email handler to use to control email notifications. |
| """ |
| |
| |
| def __init__(self, email_handler=None): |
| """ |
| RPMFrontendServer constructor. |
| |
| Initializes instance variables. |
| """ |
| self._dispatcher_minheap = [] |
| self._entry_dict = {} |
| self._lock = threading.Lock() |
| self._mapping_last_modified = os.path.getmtime(MAPPING_FILE) |
| self._servo_interface = utils.load_servo_interface_mapping() |
| self._rpm_dict = {} |
| self._afe = frontend.AFE() |
| self._rpm_info = utils.LRUCache(size=LRU_SIZE) |
| self._email_handler = email_handler |
| |
| |
| def set_power_via_poe(self, device_hostname, new_state): |
| """Sets power state of the device to the requested state via POE. |
| |
| @param device_hostname: Hostname of the servo to control. |
| @param new_state: [ON, OFF, CYCLE] State to which we want to set the |
| device's outlet to. |
| |
| @return: True if the attempt to change power state was successful, |
| False otherwise. |
| |
| @raise RPMInfrastructureException: No dispatchers are available or can |
| be reached. |
| """ |
| # Remove any DNS Zone information and simplify down to just the hostname. |
| device_hostname = device_hostname.split('.')[0] |
| new_state = new_state.upper() |
| if new_state not in VALID_STATE_VALUES: |
| logging.error('Received request to set servo %s to invalid ' |
| 'state %s', device_hostname, new_state) |
| return False |
| logging.info('Received request to set servo: %s to state: %s', |
| device_hostname, new_state) |
| powerunit_info = self._get_poe_powerunit_info(device_hostname) |
| try: |
| return self._queue_once(powerunit_info, new_state) |
| except DispatcherDownException: |
| # Retry forwarding the request. |
| return self.set_power_via_poe(device_hostname, new_state) |
| |
| |
| def set_power_via_rpm(self, device_hostname, rpm_hostname, |
| rpm_outlet, hydra_hostname, new_state): |
| """Sets power state of a device to the requested state via RPM. |
| |
| Unlike the special case of POE, powerunit information is not available |
| on the RPM server, so must be provided as arguments. |
| |
| @param device_hostname: Hostname of the servo to control. |
| @param rpm_hostname: Hostname of the RPM to use. |
| @param rpm_outlet: The RPM outlet to control. |
| @param hydra_hostname: If required, the hydra device to SSH through to |
| get to the RPM. |
| @param new_state: [ON, OFF, CYCLE] State to which we want to set the |
| device's outlet to. |
| |
| @return: True if the attempt to change power state was successful, |
| False otherwise. |
| |
| @raise RPMInfrastructureException: No dispatchers are available or can |
| be reached. |
| """ |
| powerunit_info = utils.PowerUnitInfo( |
| device_hostname=device_hostname, |
| powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.RPM, |
| powerunit_hostname=rpm_hostname, |
| outlet=rpm_outlet, |
| hydra_hostname=hydra_hostname, |
| ) |
| try: |
| return self._queue_once(powerunit_info, new_state) |
| except DispatcherDownException: |
| # Retry forwarding the request. |
| return self.set_power_via_rpm(device_hostname, rpm_hostname, |
| rpm_outlet, hydra_hostname, new_state) |
| |
| |
| def queue_request(self, device_hostname, new_state): |
| """ |
| Forwards a request to change a device's (a dut or a servo) power state |
| to the appropriate dispatcher server. |
| |
| This call will block until the forwarded request returns. |
| |
| @param device_hostname: Hostname of the device whose power state we want |
| to change. |
| @param new_state: [ON, OFF, CYCLE] State to which we want to set the |
| device's outlet to. |
| |
| @return: True if the attempt to change power state was successful, |
| False otherwise. |
| |
| @raise RPMInfrastructureException: No dispatchers are available or can |
| be reached. |
| """ |
| # Remove any DNS Zone information and simplify down to just the hostname. |
| device_hostname = device_hostname.split('.')[0] |
| new_state = new_state.upper() |
| # Put new_state in all uppercase letters |
| if new_state not in VALID_STATE_VALUES: |
| logging.error('Received request to set device %s to invalid ' |
| 'state %s', device_hostname, new_state) |
| return False |
| logging.info('Received request to set device: %s to state: %s', |
| device_hostname, new_state) |
| powerunit_info = self._get_powerunit_info(device_hostname) |
| try: |
| return self._queue_once(powerunit_info, new_state) |
| except DispatcherDownException: |
| # Retry forwarding the request. |
| return self.queue_request(device_hostname, new_state) |
| |
| |
| def _queue_once(self, powerunit_info, new_state): |
| """Queue one request to the dispatcher.""" |
| dispatcher_uri = self._get_dispatcher(powerunit_info) |
| if not dispatcher_uri: |
| # No dispatchers available. |
| raise RPMInfrastructureException('No dispatchers available.') |
| client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True) |
| try: |
| # Block on the request and return the result once it arrives. |
| return client.queue_request(powerunit_info, new_state) |
| except socket.error as er: |
| # Dispatcher Server is not reachable. Unregister it and retry. |
| logging.error("Can't reach Dispatch Server: %s. Error: %s", |
| dispatcher_uri, errno.errorcode[er.errno]) |
| if self.is_network_infrastructure_down(): |
| # No dispatchers can handle this request so raise an Exception |
| # to the caller. |
| raise RPMInfrastructureException('No dispatchers can be' |
| 'reached.') |
| logging.info('Will attempt forwarding request to other dispatch ' |
| 'servers.') |
| logging.error('Unregistering %s due to error. Recommend resetting ' |
| 'that dispatch server.', dispatcher_uri) |
| self.unregister_dispatcher(dispatcher_uri) |
| raise DispatcherDownException(dispatcher_uri) |
| |
| |
| def is_network_infrastructure_down(self): |
| """ |
| Check to see if we can communicate with any dispatcher servers. |
| |
| Only called in the situation that queuing a request to a dispatcher |
| server failed. |
| |
| @return: False if any dispatcher server is up and the rpm infrastructure |
| can still function. True otherwise. |
| """ |
| for dispatcher_entry in self._dispatcher_minheap: |
| dispatcher = xmlrpclib.ServerProxy( |
| dispatcher_entry[DISPATCHER_URI], allow_none=True) |
| try: |
| if dispatcher.is_up(): |
| # Atleast one dispatcher is alive so our network is fine. |
| return False |
| except socket.error: |
| # Can't talk to this dispatcher so keep looping. |
| pass |
| logging.error("Can't reach any dispatchers. Check frontend network " |
| 'status or all dispatchers are down.') |
| return True |
| |
| |
| def _get_powerunit_info(self, device_hostname): |
| """Get the power management unit information for a device. |
| |
| A device could be a chromeos dut or a servo. |
| 1) ChromeOS dut |
| Chromeos dut is managed by RPM. The related information |
| we need to know include rpm hostname, rpm outlet, hydra hostname. |
| Such information can be retrieved from afe_host_attributes table |
| from afe. A local LRU cache is used avoid hitting afe too often. |
| |
| 2) Servo |
| Servo is managed by POE. The related information we need to know |
| include poe hostname, poe interface. Such information is |
| stored in a local file and read into memory. |
| |
| @param device_hostname: A string representing the device's hostname. |
| |
| @returns: A PowerUnitInfo object. |
| @raises RPMInfrastructureException if failed to get the power |
| unit info. |
| |
| """ |
| if device_hostname.endswith('servo'): |
| return self._get_poe_powerunit_info(device_hostname) |
| else: |
| return self._get_rpm_powerunit_info(device_hostname) |
| |
| |
| def _get_poe_powerunit_info(self, device_hostname): |
| """Get the power management unit information for a POE controller. |
| |
| Servo is managed by POE. The related information we need to know |
| include poe hostname, poe interface. Such information is |
| stored in a local file and read into memory. |
| |
| @param device_hostname: A string representing the device's hostname. |
| |
| @returns: A PowerUnitInfo object. |
| @raises RPMInfrastructureException if failed to get the power |
| unit info. |
| |
| """ |
| with self._lock: |
| reload_info = utils.reload_servo_interface_mapping_if_necessary( |
| self._mapping_last_modified) |
| if reload_info: |
| self._mapping_last_modified, self._servo_interface = reload_info |
| switch_if_tuple = self._servo_interface.get(device_hostname) |
| if not switch_if_tuple: |
| raise RPMInfrastructureException( |
| 'Could not determine POE hostname for %s. ' |
| 'Please check the servo-interface mapping file.', |
| device_hostname) |
| else: |
| return utils.PowerUnitInfo( |
| device_hostname=device_hostname, |
| powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE, |
| powerunit_hostname=switch_if_tuple[0], |
| outlet=switch_if_tuple[1], |
| hydra_hostname=None) |
| |
| |
| |
| def _get_rpm_powerunit_info(self, device_hostname): |
| """Get the power management unit information for an RPM controller. |
| |
| Chromeos dut is managed by RPM. The related information |
| we need to know include rpm hostname, rpm outlet, hydra hostname. |
| Such information can be retrieved from afe_host_attributes table |
| from afe. A local LRU cache is used avoid hitting afe too often. |
| |
| @param device_hostname: A string representing the device's hostname. |
| |
| @returns: A PowerUnitInfo object. |
| @raises RPMInfrastructureException if failed to get the power |
| unit info. |
| |
| """ |
| with self._lock: |
| # Regular DUTs are managed by RPMs. |
| if device_hostname in self._rpm_info: |
| return self._rpm_info[device_hostname] |
| else: |
| hosts = self._afe.get_hosts(hostname=device_hostname) |
| if not hosts: |
| raise RPMInfrastructureException( |
| 'Can not retrieve rpm information ' |
| 'from AFE for %s, no host found.' % device_hostname) |
| else: |
| info = utils.PowerUnitInfo.get_powerunit_info(hosts[0]) |
| self._rpm_info[device_hostname] = info |
| return info |
| |
| |
| |
| def _get_dispatcher(self, powerunit_info): |
| """ |
| Private method that looks up or assigns a dispatcher server |
| responsible for communicating with the given RPM/POE. |
| |
| Will also call _check_dispatcher to make sure it is up before returning |
| it. |
| |
| @param powerunit_info: A PowerUnitInfo instance. |
| |
| @return: URI of dispatcher server responsible for the rpm/poe. |
| None if no dispatcher servers are available. |
| """ |
| powerunit_type = powerunit_info.powerunit_type |
| powerunit_hostname = powerunit_info.powerunit_hostname |
| with self._lock: |
| if self._rpm_dict.get(powerunit_hostname): |
| return self._rpm_dict[powerunit_hostname] |
| logging.info('No Dispatcher assigned for %s %s.', |
| powerunit_type, powerunit_hostname) |
| # Choose the least loaded dispatcher to communicate with the RPM. |
| try: |
| heap_entry = heapq.heappop(self._dispatcher_minheap) |
| except IndexError: |
| logging.error('Infrastructure Error: Frontend has no' |
| 'registered dispatchers to field out this ' |
| 'request!') |
| return None |
| dispatcher_uri = heap_entry[DISPATCHER_URI] |
| # Put this entry back in the heap with an RPM Count + 1. |
| heap_entry[RPM_COUNT] = heap_entry[RPM_COUNT] + 1 |
| heapq.heappush(self._dispatcher_minheap, heap_entry) |
| logging.info('Assigning %s for %s %s', dispatcher_uri, |
| powerunit_type, powerunit_hostname) |
| self._rpm_dict[powerunit_hostname] = dispatcher_uri |
| return dispatcher_uri |
| |
| |
| def register_dispatcher(self, dispatcher_uri): |
| """ |
| Called by a dispatcher server so that the frontend server knows it is |
| available to field out RPM requests. |
| |
| Adds an entry to the min heap and entry map for this dispatcher. |
| |
| @param dispatcher_uri: Address of dispatcher server we are registering. |
| """ |
| logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri) |
| with self._lock: |
| heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri] |
| heapq.heappush(self._dispatcher_minheap, heap_entry) |
| self._entry_dict[dispatcher_uri] = heap_entry |
| |
| |
| def unregister_dispatcher(self, uri_to_unregister): |
| """ |
| Called by a dispatcher server as it exits so that the frontend server |
| knows that it is no longer available to field out requests. |
| |
| Assigns an rpm count of -1 to this dispatcher so that it will be pushed |
| out of the min heap. |
| |
| Removes from _rpm_dict all entries with the value of this dispatcher so |
| that those RPM's can be reassigned to a new dispatcher. |
| |
| @param uri_to_unregister: Address of dispatcher server we are |
| unregistering. |
| """ |
| logging.info('Unregistering uri: %s as a rpm dispatcher.', |
| uri_to_unregister) |
| with self._lock: |
| heap_entry = self._entry_dict.get(uri_to_unregister) |
| if not heap_entry: |
| logging.warning('%s was not registered.', uri_to_unregister) |
| return |
| # Set this entry's RPM_COUNT to TERMINATED (-1). |
| heap_entry[RPM_COUNT] = TERMINATED |
| # Remove all RPM mappings. |
| for rpm, dispatcher in self._rpm_dict.items(): |
| if dispatcher == uri_to_unregister: |
| self._rpm_dict[rpm] = None |
| self._entry_dict[uri_to_unregister] = None |
| # Re-sort the heap and remove any terminated dispatchers. |
| heapq.heapify(self._dispatcher_minheap) |
| self._remove_terminated_dispatchers() |
| |
| |
| def _remove_terminated_dispatchers(self): |
| """ |
| Peek at the head of the heap and keep popping off values until there is |
| a non-terminated dispatcher at the top. |
| """ |
| # Heapq guarantees the head of the heap is in the '0' index. |
| try: |
| # Peek at the next element in the heap. |
| top_of_heap = self._dispatcher_minheap[0] |
| while top_of_heap[RPM_COUNT] is TERMINATED: |
| # Pop off the top element. |
| heapq.heappop(self._dispatcher_minheap) |
| # Peek at the next element in the heap. |
| top_of_heap = self._dispatcher_minheap[0] |
| except IndexError: |
| # No more values in the heap. Can be thrown by both minheap[0] |
| # statements. |
| pass |
| |
| |
| def suspend_emails(self, hours): |
| """Suspend email notifications. |
| |
| @param hours: How many hours to suspend email notifications. |
| """ |
| if self._email_handler: |
| self._email_handler.suspend_emails(hours) |
| |
| |
| def resume_emails(self): |
| """Resume email notifications.""" |
| if self._email_handler: |
| self._email_handler.resume_emails() |
| |
| |
| if __name__ == '__main__': |
| """ |
| Main function used to launch the frontend server. Creates an instance of |
| RPMFrontendServer and registers it to a MultiThreadedXMLRPCServer instance. |
| """ |
| if len(sys.argv) != 2: |
| print 'Usage: ./%s <log_file_dir>.' % sys.argv[0] |
| sys.exit(1) |
| |
| email_handler = rpm_logging_config.set_up_logging_to_file( |
| sys.argv[1], LOG_FILENAME_FORMAT) |
| frontend_server = RPMFrontendServer(email_handler=email_handler) |
| # We assume that external clients will always connect to us via the |
| # hostname, so listening on the hostname ensures we pick the right network |
| # interface. |
| address = socket.gethostname() |
| port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port') |
| server = MultiThreadedXMLRPCServer((address, port), allow_none=True) |
| server.register_instance(frontend_server) |
| logging.info('Listening on %s port %d', address, port) |
| server.serve_forever() |