blob: ba15ee1874bb872810a164a4df3fa7ba32788d37 [file] [log] [blame]
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module contains communication methods between cbuildbot instances."""
import Queue
import SocketServer
import os
import socket
import sys
import time
from chromite.lib.cros_build_lib import Info, Warning
# TCP port the slave listens on and the master connects to.
_COMM_PORT = 32890
# Maximum number of bytes read from a socket in a single recv() call.
_BUFFER = 4096

# Seconds between status checks; used as the poll interval by both ends.
_HEARTBEAT_TIMEOUT = 60
# Seconds to keep waiting before giving up and assuming failure.
_MAX_TIMEOUT = 30 * 60

# Commands sent from master to slave.
# Asks the slave to report whether it has completed or failed building.
_COMMAND_CHECK_STATUS = 'check-status'

# Statuses produced in reply to commands (self-explanatory).
_STATUS_COMMAND_REJECTED = 'rejected'
_STATUS_TIMEOUT = 'timeout'

# Build results; public so cbuildbot can publish them.
STATUS_BUILD_COMPLETE = 'complete'
STATUS_BUILD_FAILED = 'failure'

# Single-slot global queues used to pass state between PublishStatus and
# the _SlaveCommandHandler that serves the master's requests.
_status_queue, _receive_queue, _command_queue = (
    Queue.Queue(1), Queue.Queue(1), Queue.Queue(1))
class _TCPServerWithReuse(SocketServer.TCPServer):
  """TCPServer that allows re-use of its address and times out idle waits."""

  # Defined on this subclass (not assigned onto SocketServer.TCPServer) so
  # that enabling address re-use does not affect other servers in the process.
  allow_reuse_address = True

  def __init__(self, address, handler, timeout):
    """Binds the server and applies |timeout| to the listening socket.

    Keyword arguments:
    address -- (host, port) tuple to bind to.
    handler -- request handler class used for each connection.
    timeout -- seconds passed to settimeout() on the listening socket, which
               handle_request() uses to bound how long it waits.
    """
    SocketServer.TCPServer.__init__(self, address, handler)
    self.socket.settimeout(timeout)
def _ParseRequest(data):
  """Splits a raw request from the master into a (command, args) tuple.

  The master sends the command and its args separated by a newline (see
  _SendCommand). Surrounding whitespace is stripped first. args is '' when
  the master sent none, and may itself contain newlines.

  Keyword arguments:
  data -- raw string read from the master's socket.
  """
  command, _, args = data.strip().partition('\n')
  return command, args


class _SlaveCommandHandler(SocketServer.BaseRequestHandler):
  """Handles requests from a master pre-flight-queue bot."""

  def _HandleCommand(self, command, args):
    """Checks |command| against the expected one and returns the reply.

    Keyword arguments:
    command -- command name sent by the master.
    args -- argument string sent with the command.

    Returns the published slave status for an accepted check-status command,
    or _STATUS_COMMAND_REJECTED if the command is not the one expected.
    """
    Info('(Slave) - Received command %s with args %s' % (command, args))
    try:
      command_to_expect = _command_queue.get_nowait()
    except Queue.Empty:
      # Nothing was published. A blocking get() here would hang the server
      # thread forever, so reject instead.
      Warning('(Slave) - Rejecting command %s. No command expected.' % command)
      return _STATUS_COMMAND_REJECTED
    slave_status = None
    # Check status also adds an entry on the status queue.
    if command_to_expect == _COMMAND_CHECK_STATUS:
      slave_status = _status_queue.get()
    # Safety check to make sure the server is in a good state.
    if command_to_expect != command:
      Warning(
          '(Slave) - Rejecting command %s. Was expecting %s.' % (command,
          command_to_expect))
      # Put the expectation back so a later, correct command can still be
      # served; otherwise the next request would find the queues empty.
      _command_queue.put(command_to_expect)
      if command_to_expect == _COMMAND_CHECK_STATUS:
        _status_queue.put(slave_status)
      return _STATUS_COMMAND_REJECTED
    # Give slave command with optional args.
    _receive_queue.put(args)
    # slave_status is None unless the command was check-status.
    return slave_status

  def handle(self):
    """Overridden. Reads one command from the master and replies to it."""
    data = self.request.recv(_BUFFER)
    command, args = _ParseRequest(data)
    response = self._HandleCommand(command, args)
    # Commands without a status produce None; never try to send that.
    if response is not None:
      # sendall() retries until the whole reply is written.
      self.request.sendall(response)
def _GetSlaveNames(configuration):
  """Returns the hostnames of every important, non-master builder.

  Keyword arguments:
  configuration -- dict whose values are builder config dicts carrying
                   'master', 'important' and 'hostname' keys.
  """
  return [builder['hostname'] for builder in configuration.values()
          if not builder['master'] and builder['important']]
def _SendCommand(hostname, command, args):
  """Sends |command| with |args| to a slave and returns its reply.

  Keyword arguments:
  hostname -- host running the slave's command server on _COMM_PORT.
  command -- command name, e.g. _COMMAND_CHECK_STATUS.
  args -- argument string sent alongside the command.

  Returns the slave's response string, or _STATUS_TIMEOUT if connecting,
  sending or receiving failed or took longer than _HEARTBEAT_TIMEOUT seconds.
  """
  data = '%s\n%s\n' % (command, args)
  Info('(Master) - Sending %s %s to %s' % (command, args, hostname))
  # Create a socket (SOCK_STREAM means a TCP socket).
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  # Bound every blocking call so an unresponsive slave cannot stall the
  # master beyond the polling budget enforced by HaveSlavesCompleted.
  sock.settimeout(_HEARTBEAT_TIMEOUT)
  try:
    # Connect to server and send the whole request.
    sock.connect((hostname, _COMM_PORT))
    sock.sendall(data)
    # Receive data from the server and shut down.
    received = sock.recv(_BUFFER)
  except Exception as e:
    # Best effort: any failure (e.g. slave not listening yet) is reported as
    # a timeout, but without swallowing KeyboardInterrupt/SystemExit.
    Info('(Master) - No response from %s: %s' % (hostname, e))
    received = _STATUS_TIMEOUT
  finally:
    sock.close()
  return received
def _CheckSlavesLeftStatus(slaves_to_check):
  """Polls each remaining slave once; returns False if any reported failure.

  Slaves that answer STATUS_BUILD_COMPLETE are removed from |slaves_to_check|
  (the list is modified in place). As soon as a slave answers
  STATUS_BUILD_FAILED, polling stops, False is returned, and the list is left
  unmodified. Any other answer leaves the slave in the list.

  Keyword arguments:
  slaves_to_check -- list of hostnames still to be checked.
  """
  finished = []
  for hostname in slaves_to_check:
    reply = _SendCommand(hostname, _COMMAND_CHECK_STATUS, 'empty')
    if reply == STATUS_BUILD_FAILED:
      Warning('(Master) - Slave %s failed' % hostname)
      return False
    if reply == STATUS_BUILD_COMPLETE:
      Info('(Master) - Slave %s completed' % hostname)
      finished.append(hostname)
  # Remove after the loop; mutating the list while iterating it skips items.
  for hostname in finished:
    slaves_to_check.remove(hostname)
  return True
def HaveSlavesCompleted(configuration):
  """Returns True if every important slave reported success.

  Polls the slaves every _HEARTBEAT_TIMEOUT seconds until all of them report
  completion, one of them reports a failure, or _MAX_TIMEOUT seconds' worth
  of polling intervals have elapsed.

  Keyword arguments:
  configuration -- configuration dictionary for slaves.
  """
  pending = _GetSlaveNames(configuration)
  waited = 0
  while pending and waited < _MAX_TIMEOUT:
    if not _CheckSlavesLeftStatus(pending):
      # A slave failed; it stays in |pending|, so the result is False.
      break
    if pending:
      time.sleep(_HEARTBEAT_TIMEOUT)
    waited += _HEARTBEAT_TIMEOUT
  return not pending
def PublishStatus(status):
  """Publishes |status| and returns True if the master received it.

  This call blocks, serving master requests, until the master picks up the
  status with a check-status command or until _MAX_TIMEOUT seconds' worth of
  _HEARTBEAT_TIMEOUT waits have passed without a pickup.

  Keyword arguments:
  status -- should be a string and one of STATUS_BUILD_.*.
  """
  # Clean up queues so state from an earlier call cannot leak into this one.
  for stale_queue in (_command_queue, _status_queue, _receive_queue):
    try:
      stale_queue.get_nowait()
    except Queue.Empty:
      pass
  _command_queue.put(_COMMAND_CHECK_STATUS)
  _status_queue.put(status)
  # Listen on all interfaces: the master connects from its own host using
  # this slave's hostname, which a 'localhost'-only bind would refuse.
  server = _TCPServerWithReuse(('', _COMM_PORT),
                               _SlaveCommandHandler, _HEARTBEAT_TIMEOUT)
  timeout = 0
  response = None
  try:
    # None means "not received yet". The master's args may legitimately be
    # an empty string, so a truthiness test would keep waiting needlessly.
    while response is None and timeout < _MAX_TIMEOUT:
      server.handle_request()
      try:
        response = _receive_queue.get_nowait()
      except Queue.Empty:
        Info('(Slave) - Waiting for master to accept %s' % status)
        timeout += _HEARTBEAT_TIMEOUT
  except Exception as e:
    # Best effort: report the problem and fall through to the result.
    Warning('%s' % e)
  finally:
    server.server_close()
  return response is not None