| # Copyright 2017 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import os |
| import threading |
| import time |
| |
| import common |
| from autotest_lib.client.common_lib import utils |
| from autotest_lib.site_utils.lxc import base_image |
| from autotest_lib.site_utils.lxc import constants |
| from autotest_lib.site_utils.lxc import container_factory |
| from autotest_lib.site_utils.lxc import zygote |
| from autotest_lib.site_utils.lxc.constants import \ |
| CONTAINER_POOL_METRICS_PREFIX as METRICS_PREFIX |
| from autotest_lib.site_utils.lxc.container_pool import async_listener |
| from autotest_lib.site_utils.lxc.container_pool import error |
| from autotest_lib.site_utils.lxc.container_pool import message |
| from autotest_lib.site_utils.lxc.container_pool import pool |
| |
# Prefer the faster C pickle implementation on Python 2; fall back to the
# plain module where cPickle is unavailable.  Catch only ImportError - a
# bare except would also swallow KeyboardInterrupt/SystemExit at import time.
try:
    import cPickle as pickle
except ImportError:
    import pickle

# The metrics libraries only exist in the infra environment; elsewhere,
# substitute mocks so metric calls become harmless no-ops.
try:
    from chromite.lib import metrics
    from infra_libs import ts_mon
except ImportError:
    import mock
    metrics = utils.metrics_mock
    ts_mon = mock.Mock()
| |
| |
| # The minimum period between polling for new connections, in seconds. |
| _MIN_POLLING_PERIOD = 0.1 |
| |
| |
class Service(object):
    """A manager for a pool of LXC containers.

    The Service class manages client communication with an underlying container
    pool. It listens for incoming client connections, then spawns threads to
    deal with communication with each client.
    """

    def __init__(self, host_dir, pool=None):
        """Sets up a new container pool service.

        @param host_dir: A SharedHostDir. This will be used for Zygote
                         configuration as well as for general pool operation
                         (e.g. opening linux domain sockets for communication).
        @param pool: (for testing) A container pool that the service will
                     maintain. This parameter exists for DI, for testing.
                     Under normal circumstances the service instantiates the
                     container pool internally.
        """
        # Create socket for receiving container pool requests. This also acts
        # as a mutex, preventing multiple container pools from being
        # instantiated.
        self._socket_path = os.path.join(
                host_dir.path, constants.DEFAULT_CONTAINER_POOL_SOCKET)
        self._connection_listener = async_listener.AsyncListener(
                self._socket_path)
        self._client_threads = []
        # _stop_event is None while the service may keep running; stop()
        # replaces it with a threading.Event that start() sets once shutdown
        # has completed.
        self._stop_event = None
        self._running = False
        self._pool = pool


    def start(self, pool_size=constants.DEFAULT_CONTAINER_POOL_SIZE):
        """Starts the service.

        This call blocks the current thread, running the service event loop
        until stop() is called (necessarily from another thread).

        @param pool_size: The desired size of the container pool. This
                          parameter has no effect if a pre-created pool was DI'd
                          into the Service constructor.
        """
        self._running = True

        # Start the container pool.
        if self._pool is None:
            factory = container_factory.ContainerFactory(
                    base_container=base_image.BaseImage().get(),
                    container_class=zygote.Zygote)
            self._pool = pool.Pool(factory=factory, size=pool_size)

        # Start listening asynchronously for incoming connections.
        self._connection_listener.start()

        # Poll for incoming connections, and spawn threads to handle them.
        # The loop runs until stop() replaces _stop_event with an Event.
        logging.debug('Start event loop.')
        while self._stop_event is None:
            self._handle_incoming_connections()
            self._cleanup_closed_connections()
            # TODO(kenobi): Poll for and log errors from pool.
            # Heartbeat metric emitted once per loop iteration; presumably
            # used for liveness monitoring - confirm against the dashboard.
            metrics.Counter(METRICS_PREFIX + '/tick').increment()
            time.sleep(_MIN_POLLING_PERIOD)

        logging.debug('Exit event loop.')

        # Stopped - stop all the client threads, stop listening, then signal
        # that shutdown is complete.
        # The client threads are signalled but deliberately not joined: the
        # thread that requested the shutdown is itself blocked waiting on
        # _stop_event (see _ClientThread._shutdown), so joining here could
        # deadlock.
        for thread in self._client_threads:
            thread.stop()
        try:
            self._connection_listener.close()
        except Exception as e:
            logging.error('Error stopping pool service: %r', e)
            raise
        finally:
            # Clean up the container pool.
            self._pool.cleanup()
            # Make sure state is consistent.
            # Setting the event releases any waiters on stop()'s return
            # value; this runs even if closing the listener raised above.
            self._stop_event.set()
            self._stop_event = None
            self._running = False
            metrics.Counter(METRICS_PREFIX + '/service_stopped').increment()
            logging.debug('Container pool stopped.')


    def stop(self):
        """Stops the service.

        Asynchronously requests that the event loop in start() exit.

        @return: A threading.Event that will be set once shutdown of the
                 service has actually completed.
        """
        self._stop_event = threading.Event()
        return self._stop_event


    def is_running(self):
        """Returns whether or not the service is currently running."""
        return self._running


    def get_status(self):
        """Returns a dictionary of values describing the current status."""
        status = {}
        status['running'] = self._running
        status['socket_path'] = self._socket_path
        # Pool statistics are only available while the pool exists/runs.
        if self._running:
            status['pool capacity'] = self._pool.capacity
            status['pool size'] = self._pool.size
            status['pool worker count'] = self._pool.worker_count
            status['pool errors'] = self._pool.errors.qsize()
            status['client thread count'] = len(self._client_threads)
        return status


    def _handle_incoming_connections(self):
        """Checks for connections, and spawn sub-threads to handle requests."""
        connection = self._connection_listener.get_connection()
        if connection is not None:
            # Spawn a thread to deal with the new connection.
            thread = _ClientThread(self, self._pool, connection)
            thread.start()
            self._client_threads.append(thread)
            thread_count = len(self._client_threads)
            # NOTE(review): this increments the counter by the *current*
            # number of live client threads on every new connection, so the
            # metric accumulates - confirm a gauge (set) was not intended.
            metrics.Counter(METRICS_PREFIX + '/client_threads'
                            ).increment_by(thread_count)
            logging.debug('Client thread count: %d', thread_count)


    def _cleanup_closed_connections(self):
        """Cleans up dead client threads."""
        # We don't need to lock because all operations on self._client_threads
        # take place on the main thread.
        self._client_threads = [t for t in self._client_threads if t.is_alive()]
| |
| |
| class _ClientThread(threading.Thread): |
| """A class that handles communication with a pool client. |
| |
| Use a thread-per-connection model instead of select()/poll() for a few |
| reasons: |
| - the number of simultaneous clients is not expected to be high enough for |
| select or poll to really pay off. |
| - one thread per connection is more robust - if a single client somehow |
| crashes its communication thread, that will not affect the other |
| communication threads or the main pool service. |
| """ |
| |
| def __init__(self, service, pool, connection): |
| self._service = service |
| self._pool = pool |
| self._connection = connection |
| self._running = False |
| super(_ClientThread, self).__init__(name='client_thread') |
| |
| |
| def run(self): |
| """Handles messages coming in from clients. |
| |
| The thread main loop monitors the connection and handles incoming |
| messages. Polling is used so that the loop condition can be checked |
| regularly - this enables the thread to exit cleanly if required. |
| |
| Any kind of error will exit the event loop and close the connection. |
| """ |
| logging.debug('Start event loop.') |
| try: |
| self._running = True |
| while self._running: |
| # Poll and deal with messages every second. The timeout enables |
| # the thread to exit cleanly when stop() is called. |
| if self._connection.poll(1): |
| try: |
| msg = self._connection.recv() |
| except (AttributeError, |
| ImportError, |
| IndexError, |
| pickle.UnpicklingError) as e: |
| # All of these can occur while unpickling data. |
| logging.error('Error while receiving message: %r', e) |
| # Exit if an error occurs |
| break |
| except EOFError: |
| # EOFError means the client closed the connection. This |
| # is not an error - just exit. |
| break |
| |
| try: |
| response = self._handle_message(msg) |
| # Always send the response, even if it's None. This |
| # provides more consistent client-side behaviour. |
| self._connection.send(response) |
| except error.UnknownMessageTypeError as e: |
| # The message received was a valid python object, but |
| # not a valid Message. |
| logging.error('Message error: %s', e) |
| # Exit if an error occurs |
| break |
| except EOFError: |
| # EOFError means the client closed the connection early. |
| # TODO(chromium:794685): Return container to pool. |
| logging.error('Client closed connection before return.') |
| break |
| |
| finally: |
| # Always close the connection. |
| logging.debug('Exit event loop.') |
| self._connection.close() |
| |
| |
| def stop(self): |
| """Stops the client thread.""" |
| self._running = False |
| |
| |
| def _handle_message(self, msg): |
| """Handles incoming messages. |
| |
| @param msg: The incoming message to be handled. |
| |
| @return: A pickleable object (or None) that should be sent back to the |
| client. |
| """ |
| |
| # Only handle Message objects. |
| if not isinstance(msg, message.Message): |
| raise error.UnknownMessageTypeError( |
| 'Invalid message class %s' % type(msg)) |
| |
| # Use a dispatch table to simulate switch/case. |
| handlers = { |
| message.ECHO: self._echo, |
| message.GET: self._get, |
| message.SHUTDOWN: self._shutdown, |
| message.STATUS: self._status, |
| } |
| try: |
| return handlers[msg.type](**msg.args) |
| except KeyError: |
| raise error.UnknownMessageTypeError( |
| 'Invalid message type %s' % msg.type) |
| |
| |
| def _echo(self, msg): |
| """Handles ECHO messages. |
| |
| @param msg: A string that will be echoed back to the client. |
| |
| @return: The message, for echoing back to the client. |
| """ |
| # Just echo the message back, for testing aliveness. |
| logging.debug('Echo: %r', msg) |
| return msg |
| |
| |
| def _shutdown(self): |
| """Handles SHUTDOWN messages. |
| |
| @return: An ACK message. This function is synchronous (i.e. it blocks, |
| and only returns the ACK when shutdown is complete). |
| """ |
| logging.debug('Received shutdown request.') |
| # Request shutdown. Wait for the service to actually stop before |
| # sending the response. |
| self._service.stop().wait() |
| logging.debug('Service shutdown complete.') |
| return message.ack() |
| |
| |
| def _status(self): |
| """Handles STATUS messages. |
| |
| @return: The result of the service status call. |
| """ |
| logging.debug('Received status request.') |
| return self._service.get_status() |
| |
| |
| def _get(self, id, timeout): |
| """Gets a container from the pool. |
| |
| @param id: A ContainerId to assign to the new container. |
| @param timeout: A timeout (in seconds) to wait for the pool. If a |
| container is not available from the pool within the |
| given period, None will be returned. |
| |
| @return: A container from the pool. |
| """ |
| logging.debug('Received get request (id=%s)', id) |
| container = self._pool.get(timeout) |
| # Assign an ID to the container as soon as it is removed from the pool. |
| # This associates the container with the process to which it will be |
| # handed off. |
| if container is not None: |
| logging.debug( |
| 'Assigning container (name=%s, id=%s)', container.name, id) |
| container.id = id |
| else: |
| logging.debug('No container (id=%s)', id) |
| metrics.Counter(METRICS_PREFIX + '/container_requests', |
| field_spec=[ts_mon.BooleanField('success')] |
| ).increment(fields={'success': (container is not None)}) |
| return container |