blob: b293d4a52305e623c1d81389768e7a24bfecea8a [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import Queue
import logging
import os
import socket
import threading
from multiprocessing import connection
import common
class AsyncListener(object):
"""A class for asynchronous listening on a unix socket.
This class opens a unix socket with the given address and auth key.
Connections are listened for on a separate thread, and queued up to be dealt
with.
"""
def __init__(self, address):
"""Opens a socket with the given address and key.
@param address: The socket address.
@raises socket.error: If the address is already in use or is not a valid
path.
@raises TypeError: If the address is not a valid unix domain socket
address.
"""
self._socket = connection.Listener(address, family='AF_UNIX')
# This is done mostly for local testing/dev purposes - the easiest/most
# reliable way to run the container pool locally is as root, but then
# only other processes owned by root can connect to the container.
# Setting open permissions on the socket makes it so that other users
# can connect, which enables developers to then run tests without sudo.
os.chmod(address, 0777)
self._address = address
self._queue = Queue.Queue()
self._thread = None
self._running = False
def start(self):
"""Starts listening for connections.
Starts a child thread that listens asynchronously for connections.
After calling this function, incoming connections may be retrieved by
calling the get_connection method.
"""
logging.debug('Starting connection listener.')
self._running = True
self._thread = threading.Thread(name='connection_listener',
target=self._poll)
self._thread.start()
def is_running(self):
"""Returns whether the listener is currently running."""
return self._running
def stop(self):
"""Stop listening for connections.
Stops the listening thread. After this is called, connections will no
longer be received by the socket. Note, however, that the socket is not
destroyed and that calling start again, will resume listening for
connections.
This function is expected to be called when the container pool service
is being killed/restarted, so it doesn't make an extraordinary effort to
ensure that the listener thread is cleanly destroyed.
@return: True if the listener thread was successfully killed, False
otherwise.
"""
if not self._running:
return False
logging.debug('Stopping connection listener.')
# Setting this to false causes the thread's event loop to exit on the
# next iteration.
self._running = False
# Initiate a connection to force a trip through the event loop. Use raw
# sockets because the connection module's convenience classes don't
# support timeouts, which leads to deadlocks.
try:
fake_connection = socket.socket(socket.AF_UNIX)
fake_connection.settimeout(0) # non-blocking
fake_connection.connect(self._address)
fake_connection.close()
except socket.timeout:
logging.error('Timeout while attempting to close socket listener.')
return False
logging.debug('Socket closed. Waiting for thread to terminate.')
self._thread.join(1)
return not self._thread.isAlive()
def close(self):
"""Closes and destroys the socket.
If the listener thread is running, it is first stopped.
"""
logging.debug('AsyncListener.close called.')
if self._running:
self.stop()
self._socket.close()
def get_connection(self, timeout=0):
"""Returns a connection, if one is pending.
The listener thread queues up connections for the main process to
handle. This method returns a pending connection on the queue. If no
connections are pending, None is returned.
@param timeout: Optional timeout. If set to 0 (the default), the method
will return instantly if no connections are awaiting.
Otherwise, the method will wait the specified number of
seconds before returning.
@return: A pending connection, or None of no connections are pending.
"""
try:
return self._queue.get(block=timeout>0, timeout=timeout)
except Queue.Empty:
return None
def _poll(self):
"""Polls the socket for incoming connections.
This function is intended to be run on the listener thread. It accepts
incoming socket connections, and queues them up to be handled.
"""
logging.debug('Start event loop.')
while self._running:
try:
self._queue.put(self._socket.accept())
logging.debug('Received incoming connection.')
except IOError:
# The stop method uses a fake connection to unblock the polling
# thread. This results in an IOError but this is an expected
# outcome.
logging.debug('Connection aborted.')
logging.debug('Exit event loop.')