blob: efd312cd08338cd5cdb57d1f450711362a49961c [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import Queue
import threading
import time
import common
from autotest_lib.site_utils.lxc.container_pool import error
from autotest_lib.site_utils.lxc.container_pool import multi_logger
# The maximum number of workers that may create containers at the same time.
# _Worker enforces this with a class-level semaphore: more workers than this
# can exist, but the extra ones block until a slot becomes free.
# TODO(kenobi): This may need to be adjusted for different hosts (e.g. full
# vs quarter shards).
_MAX_CONCURRENT_WORKERS = 5
# Timeout (in seconds) for container creation. Workers that exceed it are
# cancelled (any container they produce is destroyed) and dropped, and the
# monitor then spawns replacement workers to fill the pool's shortfall.
_CONTAINER_CREATION_TIMEOUT = 600
# Seconds the monitor thread sleeps between iterations of its event loop. This
# drives a number of other factors, e.g. how long the thread may take to
# respond to shutdown requests.
_MIN_MONITOR_PERIOD = 0.1
# The default size of the container pool, i.e. the number of containers a Pool
# tries to keep available when no explicit size is given.
_DEFAULT_POOL_SIZE = 20
# Module logger used by Pool, _Monitor and _Worker. NOTE(review): the 'pool'
# argument presumably tags/names this logger's output; confirm in multi_logger.
_logger = multi_logger.create('pool')
class Pool(object):
    """A fixed-size pool of LXC containers.

    Containers are created using a factory that is passed to the Pool. A pool
    size is passed at construction time - this is the number of containers the
    Pool will attempt to maintain. Whenever the number of containers falls
    below the given size, the Pool will start creating new containers to
    replenish itself.

    In order to avoid overloading the host, the number of simultaneous
    container creations is limited to _MAX_CONCURRENT_WORKERS.

    When container creation produces errors, those errors are saved (see
    Pool.errors). Errors raised while destroying pooled containers during
    cleanup() are saved there as well. It is the client's responsibility to
    periodically check and empty out the error queue.
    """

    def __init__(self, factory, size=_DEFAULT_POOL_SIZE):
        """Creates a new Pool instance and starts its monitor thread.

        @param factory: A factory object that will be called upon to create new
                        containers. The passed object must have a method called
                        "create_container" that takes no arguments and returns
                        an instance of a Container.
        @param size: The size of the Pool. The Pool attempts to keep this many
                     containers running at all times.

        @raise ValueError: If size is less than 2.
        """
        # Pools of size less than 2 don't make sense. Don't allow them.
        if size < 2:
            raise ValueError('Invalid pool size.')
        _logger.debug('Pool.__init__ called. Size: %d', size)
        self._pool = Queue.Queue(size)
        self._monitor = _Monitor(factory, self._pool)
        self._monitor.start()

    def get(self, timeout=0):
        """Gets a container from the pool.

        @param timeout: Number of seconds to wait before returning.
                        - If 0 (the default), return immediately. If a
                          Container is not immediately available, return None.
                        - If a positive number, block at most <timeout> seconds,
                          then return None if a Container was not available
                          within that time.
                        - If None, block indefinitely until a Container is
                          available.

        @return: A container from the pool, or None if none was available in
                 the allotted time.
        """
        _logger.info('Pool.get called.')
        try:
            # Block only if timeout is not zero.
            return self._pool.get(block=(timeout != 0), timeout=timeout)
        except Queue.Empty:
            return None

    def cleanup(self, timeout=0):
        """Cleans up the container pool.

        Stops all worker threads, and destroys all Containers still in the Pool.

        @param timeout: For testing. If this is non-zero, it specifies the
                        number of seconds to wait for each worker to shut down.
                        An error is raised if shutdown has not occurred by then.
                        If zero (the default), don't wait for worker threads to
                        shut down, just return immediately.

        @raise WorkerTimeoutError: If timeout is non-zero and a worker did not
                                   shut down in time. The pool is still drained
                                   before the error propagates.
        """
        _logger.info('Pool.cleanup called.')
        # Stop the monitor thread first so nothing refills the pool while it is
        # being drained. Drain even if stop() raises, so pooled containers are
        # not leaked.
        try:
            self._monitor.stop(timeout)
        finally:
            self._drain()

    def _drain(self):
        """Destroys every container currently in the pool.

        A failure to destroy one container is logged and recorded in the error
        queue; draining then continues with the remaining containers instead of
        leaving them undestroyed.
        """
        destroyed = 0
        _logger.debug('Emptying container pool')
        while True:
            try:
                container = self._pool.get(block=False)
            except Queue.Empty:
                break
            try:
                container.destroy()
            except Exception as e:
                _logger.error('Error destroying container: %s', e)
                self._monitor.errors.put(e)
            else:
                destroyed += 1
        _logger.debug('Done. Destroyed %d containers', destroyed)

    @property
    def errors(self):
        """Returns worker errors.

        @return: A Queue containing all the errors encountered on worker
                 threads (and while destroying containers in cleanup()).
        """
        return self._monitor.errors
class _Monitor(threading.Thread):
    """A thread that manages the creation of containers for the pool.

    Container creation is potentially time-consuming and can hang or crash. The
    Monitor class manages a pool of independent threads, each responsible for
    the creation of a single Container. This provides parallelized container
    creation and ensures that a single Container creation hanging/crashing does
    not starve or crash the Pool.
    """

    def __init__(self, factory, pool):
        """Creates a new monitor.

        @param factory: A container factory.
        @param pool: A pool instance to push created containers into.
        """
        super(_Monitor, self).__init__(name='pool_monitor')
        self._factory = factory
        self._pool = pool
        # List of worker threads. Access this only from the monitor thread.
        self._workers = []
        # Set to ask the monitor to leave its event loop. Deliberately not
        # called "_stop": on Python 3 threading.Thread has a private _stop()
        # method that join() calls, and shadowing it with a bool breaks join().
        self._stop_event = threading.Event()
        # Stores errors from worker threads.
        self.errors = Queue.Queue()

    def run(self):
        """Supplies the container pool with containers until stopped.

        Outstanding workers are always cancelled when the loop exits, even if
        the loop itself raised.
        """
        _logger.debug('Start event loop.')
        try:
            while not self._stop_event.is_set():
                self._create_workers()
                self._poll_workers()
                # Pause between iterations, but wake immediately on stop().
                self._stop_event.wait(_MIN_MONITOR_PERIOD)
        finally:
            _logger.debug('Exit event loop.')
            self._cancel_workers()

    def stop(self, timeout=0):
        """Stops this thread.

        This function blocks until the monitor thread has stopped.

        @param timeout: If this is a non-zero number, wait this long (in
                        seconds) for each worker thread to stop. If zero (the
                        default), don't wait for worker threads to exit.

        @raise WorkerTimeoutError: If a worker thread does not exit within the
                                   specified timeout.
        """
        _logger.info('Stop requested.')
        self._stop_event.set()
        self.join()
        _logger.info('Stopped.')
        # Wait for workers if timeout was requested. The monitor thread has
        # exited, so reading _workers from this thread is safe now.
        if timeout > 0:
            _logger.debug('Waiting for workers to terminate...')
            for worker in self._workers:
                worker.join(timeout)
                if worker.is_alive():
                    raise error.WorkerTimeoutError()

    def _cancel_workers(self):
        """Cancels every outstanding worker.

        Each worker is cancelled independently: an error while cancelling one
        (e.g. while destroying its container) is recorded and logged, and does
        not prevent the remaining workers from being cancelled.
        """
        for worker in self._workers:
            try:
                worker.cancel()
            except Exception as e:
                self.errors.put(e)
                _logger.error('Error cancelling worker: %s', e)

    def _create_workers(self):
        """Spawns workers to handle container requests.

        This method modifies the _workers list and should only be called from
        within run().
        """
        if self._pool.full():
            return

        # Create workers to refill the pool.
        qsize = self._pool.qsize()
        shortfall = self._pool.maxsize - qsize
        old_worker_count = len(self._workers)

        # Avoid spamming - only log if the monitor is taking some action. Log
        # this before creating worker threads, because we are counting live
        # threads and want to avoid race conditions w.r.t. threads actually
        # starting.
        if old_worker_count != shortfall:
            new_workers = shortfall - old_worker_count
            # This can include workers that aren't currently in the
            # self._workers list, e.g. workers that were dropped from the list
            # because they timed out.
            active_workers = sum(1 for t in threading.enumerate()
                                 if type(t) is _Worker)
            # qsize    : Current size of the container pool.
            # shortfall: Number of empty slots currently in the pool.
            # workers  : m+n, where m is the current number of active worker
            #            threads and n is the number of new threads created.
            _logger.debug('qsize:%d shortfall:%d workers:%d+%d',
                          qsize, shortfall, active_workers, new_workers)

        while len(self._workers) < shortfall:
            worker = _Worker(self._factory)
            worker.start()
            self._workers.append(worker)

    def _poll_workers(self):
        """Checks worker states and deals with them.

        This method modifies the _workers list and should only be called from
        within run().

        Completed workers are taken off the worker list and their results/errors
        are logged.
        """
        completed = []
        incomplete = []
        for worker in self._workers:
            if worker.check_health():
                incomplete.append(worker)
            else:
                completed.append(worker)

        self._workers = incomplete

        for worker in completed:
            try:
                container = worker.get_result()
            except Exception as e:
                self.errors.put(e)
                _logger.error('Error: %s', e)
            else:
                if container is not None:
                    self._pool.put(container)
                    _logger.debug('Pool size: %d/%d', self._pool.qsize(),
                                  self._pool.maxsize)
                else:
                    # Workers can return None results if they are cancelled,
                    # but workers are only cancelled here on timeout (which
                    # reports an error instead) or after the event loop exits.
                    # A None result therefore means the factory itself
                    # returned None.
                    _logger.warning('Worker yielded no container.')
class _Worker(threading.Thread):
    """A worker thread tasked with managing the creation of a single container.

    The worker is a daemon thread that calls upon a container factory to create
    a single container. If container creation raises any exceptions, they are
    logged and the worker exits. The worker also provides a mechanism for the
    parent thread to impose timeouts on container creation.
    """

    # Throttles the number of concurrent running workers. Workers attempt to
    # acquire the semaphore at the start of their run() method. Throttled
    # workers are thus alive, but waiting. A grant is released exactly once:
    # by the worker when it finishes, or by cancel() if the worker is cancelled
    # while holding it. Workers cancelled while still waiting release their
    # grant themselves as soon as they obtain it.
    _throttle = threading.BoundedSemaphore(value=_MAX_CONCURRENT_WORKERS)

    def __init__(self, factory):
        """Creates a new Worker.

        @param factory: A factory object that will be called upon to create
                        Containers.
        """
        super(_Worker, self).__init__(name='pool_worker')
        # Hanging worker threads should not keep the pool process alive.
        self.daemon = True
        self._factory = factory
        # Use a single-entry Queue to ensure atomicity when retrieving results.
        # This ensures that the output container has only one owner, and it
        # cannot (for example) accidentally be destroyed by calling
        # Worker.cancel on a Worker whose result has already been retrieved.
        self._result = Queue.Queue(1)
        self._error = None
        self._cancelled = False
        self._start_time = None
        # A lock for breaking race conditions in worker cancellation. Guards
        # _cancelled, _completed and _holds_throttle.
        self._completion_lock = threading.Lock()
        self._completed = False
        # True while this worker owns a grant from _throttle. cancel() may only
        # release the throttle on the worker's behalf in this state; releasing
        # a grant that was never acquired over-releases the bounded semaphore
        # (raising ValueError) and lets extra workers run.
        self._holds_throttle = False

    def run(self):
        """Creates a single container."""
        self._throttle.acquire()
        with self._completion_lock:
            if self._cancelled:
                # Cancelled while waiting for the throttle: skip the expensive
                # container creation and hand the grant straight back.
                self._completed = True
                self._throttle.release()
                return
            self._holds_throttle = True
            self._start_time = time.time()

        container = None
        try:
            container = self._factory.create_container()
        except Exception as e:
            _logger.error('Error: %s', e)
            self._error = e
        finally:
            # All this has to happen atomically, otherwise race conditions can
            # arise w.r.t. cancellation.
            with self._completion_lock:
                self._completed = True
                if self._cancelled:
                    # If the job was cancelled, destroy the container instead
                    # of putting it in the result queue. Do not release the
                    # throttle: cancel() already released it.
                    if container is not None:
                        container.destroy()
                else:
                    # Put the container in the result field. Release the
                    # throttle so another task can be picked up.
                    self._holds_throttle = False
                    self._result.put(container)
                    self._throttle.release()

    def cancel(self):
        """Cancels the work request.

        The container will be destroyed when created, instead of being added to
        the container pool. Calling this more than once is harmless.
        """
        with self._completion_lock:
            try:
                container = self._result.get(block=False)
            except Queue.Empty:
                if self._completed or self._cancelled:
                    # Either the result was already retrieved, or this worker
                    # was already cancelled. There is nothing to do.
                    return
                # The thread is not yet done - set the cancellation flag; when
                # completion occurs, the container will be destroyed.
                self._cancelled = True
                if self._holds_throttle:
                    # The worker is running: give its grant back now so another
                    # worker can proceed. A worker still waiting on the
                    # throttle owns no grant, so nothing is released for it.
                    self._holds_throttle = False
                    self._throttle.release()
            else:
                # If completion occurred, destroy the resulting container.
                # TODO(kenobi): Spawn a thread for this, so that we don't block
                # Monitor._check_health when a worker times out.
                container.destroy()

    def check_health(self):
        """Checks that a worker is alive and has not timed out.

        Checks the run time of the worker to make sure it hasn't timed out.
        Cancels workers that exceed the timeout.

        @return: True if the worker is alive and has not timed out, False
                 otherwise.
        """
        if not self.is_alive() or self._cancelled:
            return False

        # If alive, check the timeout and cancel if necessary. Check for
        # _start_time being None because it is not actually set until the thread
        # acquires a grant from the throttle.
        if (self._start_time is not None and
                time.time() - self._start_time > _CONTAINER_CREATION_TIMEOUT):
            # Set the error first before cancelling, to avoid race conditions
            # where cancelled threads yield no error.
            self._error = error.WorkerTimeoutError()
            self.cancel()
            return False

        return True

    def get_result(self):
        """Returns a container if one is available.

        Note that calling this transfers ownership of the container from the
        Worker to the caller. Calling this more than once will result in an
        error.

        @raises Exception: If some error occurred during container creation, it
                           is raised at this point.
        """
        if self._error is not None:
            # If an error occurred during the operation, raise it here.
            raise self._error
        # get_result should not be called before the worker has finished, or
        # if it is cancelled. In that case, this will raise Queue.Empty.
        return self._result.get(block=False)