blob: 96f608d829267e8ff86919ced959e7e05eab40b6 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Rdb server module.
"""
import collections
import logging
import common
from django.core import exceptions as django_exceptions
from django.db.models import fields
from django.db.models import Q
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import rdb_hosts
from autotest_lib.scheduler import rdb_requests
from autotest_lib.scheduler import rdb_utils
from autotest_lib.site_utils.graphite import stats
# Module-wide stats timer named 'rdb'. Its `decorate` method is applied to
# AvailableHostRequestHandler.batch_acquire_hosts and batch_validate_hosts
# below (presumably recording how long each call takes -- see stats.Timer).
_timer = stats.Timer('rdb')
# Query managers: Provide a layer of abstraction over the database by
# encapsulating common query patterns used by the rdb.
class BaseHostQueryManager(object):
    """Query manager whose queries run against every host.

    All lookups go through the `host_objects` manager; subclasses narrow the
    set of hosts considered simply by overriding that attribute.
    """

    host_objects = models.Host.objects

    def update_hosts(self, host_ids, **kwargs):
        """Write new column values to a group of hosts.

        @param host_ids: A list of ids of the hosts to update.
        @param kwargs: Column name -> new value pairs, applied to every host
            in host_ids.
        """
        selected = self.host_objects.filter(id__in=host_ids)
        selected.update(**kwargs)

    @rdb_hosts.return_rdb_host
    def get_hosts(self, ids):
        """Fetch the hosts with the given ids.

        @param ids: The ids of the hosts to fetch.

        @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
            The wrapping is done by the return_rdb_host decorator; this
            method itself returns the ordered queryset.
        """
        matching = self.host_objects.filter(id__in=ids)
        return matching.order_by('id')

    @rdb_hosts.return_rdb_host
    def find_hosts(self, deps, acls):
        """Find valid hosts that carry every dep and share at least one acl.

        @param deps: A list of label ids; a host must have all of them.
        @param acls: A list of acl group ids, at least one of which must be
            an acl group the chosen host is in.

        @return: The matching hosts. This method builds a set of Host
            models; the return_rdb_host decorator converts the result before
            it reaches the caller.
        """
        candidates = self.host_objects.filter(invalid=0)
        # One filter() call per label, so a host must match every label
        # rather than any one of them.
        for label_id in deps:
            candidates = candidates.filter(Q(labels__id=label_id))
        candidates = candidates.filter(Q(aclgroup__id__in=acls))
        return set(candidates)
class AvailableHostQueryManager(BaseHostQueryManager):
    """Query manager restricted to hosts that are not leased and not locked.

    Every query inherited from BaseHostQueryManager is issued against
    models.Host.leased_objects instead of models.Host.objects.
    """

    # Swapping the manager is the only difference from the base class.
    host_objects = models.Host.leased_objects
# Request Handlers: Used in conjunction with the requests defined in
# rdb_requests, these handlers acquire hosts for a request and record the
# acquisition in a response_map dictionary keyed on the request itself, with
# the host/hosts as values.
class BaseHostRequestHandler(object):
    """Handler for requests related to hosts, leased or unleased.

    This class is only capable of blindly returning host information; it
    never checks or changes a host's leased state.
    """

    def __init__(self):
        self.host_query_manager = BaseHostQueryManager()
        # Maps each request to its response: a list of host wrappers, or a
        # list of RDBExceptions if processing the request failed.
        self.response_map = {}

    def update_response_map(self, request, response):
        """Record a response for a request.

        The response_map only contains requests that were either satisfied,
        or that ran into an exception. Often this translates to reserving
        hosts against a request. If the rdb hit an exception processing a
        request, the exception gets recorded in the map for the client to
        reraise.

        @param request: The request that has reserved these hosts.
        @param response: A non-empty response (list) for the request.

        @raises RDBException: If the response is empty, or if the request
            already has a response recorded.
        """
        if not response:
            raise rdb_utils.RDBException('response_map dict can only contain '
                    'valid responses. Request %s, response %s is invalid.' %
                    (request, response))
        existing_response = self.response_map.get(request)
        if existing_response:
            # Report the response already on record, not the new one.
            raise rdb_utils.RDBException('Request %s already has response %s '
                                         'the rdb cannot return multiple '
                                         'responses for the same request.' %
                                         (request, existing_response))
        self.response_map[request] = response

    def _record_exceptions(self, request, exceptions):
        """Record a list of exceptions for a request.

        Each exception is wrapped in an RDBException before being stored.

        @param request: The request for which the exceptions were hit.
        @param exceptions: The exceptions hit while processing the request.
        """
        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
        self.update_response_map(request, rdb_exceptions)

    def get_response(self):
        """Convert every recorded reply to its wire format.

        The conversion is done in place on response_map, which is returned.
        NOTE(review): replies can be host wrappers or RDBExceptions (see
        _record_exceptions), so both are assumed to provide wire_format().

        @return: A dictionary mapping requests to a list of matching
            host_infos (or wire-format exceptions).
        """
        for request, response in self.response_map.iteritems():
            # Only existing keys are reassigned, so iterating is still safe.
            self.response_map[request] = [reply.wire_format()
                                          for reply in response]
        return self.response_map

    def update_hosts(self, update_requests):
        """Updates host tables with a payload.

        Payloads for the same host_id are merged, with later requests winning
        on conflicting fields. Hosts whose merged payloads are equal are then
        updated together in a single query. Failed updates are recorded in
        response_map as exceptions; successful updates leave no entry.

        @param update_requests: A list of update requests, as defined in
            rdb_requests.UpdateHostRequest.
        """
        import copy  # Local import keeps the module's import block untouched.

        # Last payload for a host_id wins in the case of conflicting requests.
        unique_host_requests = {}
        for request in update_requests:
            merged_payload = unique_host_requests.get(request.host_id)
            if merged_payload:
                merged_payload.update(request.payload)
            else:
                # Merge into a copy. Updating request.payload in place would
                # mutate the caller's request (changing its hash if the
                # payload hashes by content), and would leak fields into any
                # other host's request that shares the same payload object.
                unique_host_requests[request.host_id] = copy.copy(
                        request.payload)

        # Batch similar payloads so we can do them in one table scan.
        # NOTE(review): payloads are used as dict keys, so they are assumed
        # to be a hashable dict type -- confirm against rdb_requests.
        similar_requests = {}
        for host_id, payload in unique_host_requests.iteritems():
            similar_requests.setdefault(payload, []).append(host_id)

        # If fields of the update don't match columns in the database,
        # record the exception in the response map. This also means later
        # updates will get applied even if previous updates fail.
        for payload, hosts in similar_requests.iteritems():
            try:
                self.host_query_manager.update_hosts(hosts, **payload)
            except (django_exceptions.FieldError,
                    fields.FieldDoesNotExist, ValueError) as e:
                for host in hosts:
                    # Since update requests have a consistent hash this will
                    # map to the same key as the original request.
                    # NOTE(review): that only holds when the host had a single
                    # update request; a merged payload yields a different key.
                    request = rdb_requests.UpdateHostRequest(
                            host_id=host, payload=payload).get_request()
                    self._record_exceptions(request, [e])

    def batch_get_hosts(self, host_requests):
        """Get hosts matching the requests.

        This method does not acquire the hosts, i.e it reserves hosts against
        requests leaving their leased state untouched.

        @param host_requests: A list of requests, each carrying a host_id.
        """
        host_ids = {request.host_id for request in host_requests}
        # This map will not contain unavailable hosts if executed using
        # an AvailableHostQueryManager.
        host_map = {host.id: host
                    for host in self.host_query_manager.get_hosts(host_ids)}
        for request in host_requests:
            host = host_map.get(request.host_id)
            if host is not None:
                self.update_response_map(request, [host])
            else:
                logging.warning('rdb could not get host for request: %s, it '
                                'is already leased or locked', request)
class AvailableHostRequestHandler(BaseHostRequestHandler):
    """Handler for requests related to available (unleased and unlocked) hosts.

    This class is capable of acquiring or validating hosts for requests.
    All host lookups go through an AvailableHostQueryManager.
    """

    def __init__(self):
        # Same attributes as BaseHostRequestHandler.__init__, but with a
        # query manager restricted to available hosts.
        self.host_query_manager = AvailableHostQueryManager()
        self.response_map = {}

    def lease_hosts(self, hosts):
        """Leases a list of hosts.

        The hosts are de-duplicated with set() before leasing. A host whose
        lease() raises RDBException is logged and skipped.

        @param hosts: A list of RDBServerHostWrapper instances to lease.

        @return: The list of RDBServerHostWrappers that were successfully
            leased, in no particular order.
        """
        #TODO(beeps): crbug.com/353183.
        unleased_hosts = set(hosts)
        leased_hosts = set()
        for host in unleased_hosts:
            try:
                host.lease()
            except rdb_utils.RDBException as e:
                logging.error('Unable to lease host %s: %s', host.hostname, e)
            else:
                leased_hosts.add(host)
        return list(leased_hosts)

    @classmethod
    def valid_host_assignment(cls, request, host):
        """Check if a host, request pairing is valid.

        A pairing is valid when the host is not leased, and it shares at
        least one acl with the request (or is an invalid, one-time host), and
        it carries every label in request.deps.

        @param request: The request to match against the host.
        @param host: An RDBServerHostWrapper instance.

        @return: True if the host, request assignment is valid, else False.

        @raises RDBException: If the request is already tied to a different
            host_id.
        """
        if request.host_id and request.host_id != host.id:
            # Report the host the request is tied to, not the candidate.
            raise rdb_utils.RDBException(
                    'Cannot assign a different host for request: %s, it '
                    'already has one: %s ' % (request, request.host_id))
        # Getting all labels and acls might result in large queries, so
        # bail early if the host is already leased.
        if host.leased:
            return False
        # If a host is invalid it must be a one time host added to the
        # afe specifically for this purpose, so it doesn't require acl checking.
        acl_match = (bool(request.acls.intersection(host.acls)) or
                     bool(host.invalid))
        label_match = request.deps.issubset(host.labels)
        return acl_match and label_match

    @classmethod
    def batch_requests(cls, requests):
        """Group similar requests, sort by priority and parent_job_id.

        @param requests: A list of unsorted, unordered requests.

        @return: A list of tuples of the form (request, number of occurrences)
            formed by counting the number of requests with the same
            acls/deps/priority in the input list of requests. The list is
            sorted by descending priority, then ascending parent_job_id.
            The order of this list ensures against priority inversion.
        """
        def sort_key(item):
            request = item[0]
            # Negating a None parent_job_id raises TypeError; treat a request
            # without a parent job as having parent id 0.
            return (request.priority, -(request.parent_job_id or 0))

        return sorted(collections.Counter(requests).items(), key=sort_key,
                      reverse=True)

    @_timer.decorate
    def batch_acquire_hosts(self, host_requests):
        """Acquire hosts for a list of requests.

        The act of acquisition involves finding and leasing a set of
        hosts that match the parameters of a request. Each acquired
        host is added to the response_map dictionary, as an
        RDBServerHostWrapper. Requests for which no host could be leased
        get no entry in response_map.

        @param host_requests: A list of requests to acquire hosts.
        """
        for request, count in self.batch_requests(host_requests):
            # NOTE(review): find_hosts is wrapped by return_rdb_host, which is
            # assumed to return a sliceable list -- confirm in rdb_hosts.
            hosts = self.host_query_manager.find_hosts(
                    request.deps, request.acls)
            num_hosts = min(len(hosts), count)
            if num_hosts:
                # Try leasing num hosts, but only update the response map with
                # those that we could successfully lease.
                leased_hosts = self.lease_hosts(hosts[:num_hosts])
                # update_response_map rejects empty responses by raising, so
                # skip it when every lease failed instead of aborting the
                # whole batch.
                if leased_hosts:
                    self.update_response_map(request, leased_hosts)
                num_hosts = len(leased_hosts)
            if num_hosts < count:
                logging.warning('%s Unsatisfied rdb acquisition request:%s ',
                                count - num_hosts, request)

    @_timer.decorate
    def batch_validate_hosts(self, requests):
        """Validate requests with hosts.

        Reserve all hosts, check each one for validity and discard invalid
        request-host pairings. Lease the remaining hosts.

        @param requests: A list of requests to validate.

        @raises RDBException: If multiple hosts or the wrong host is returned
            for a response.
        """
        # The following cases are possible for frontend requests:
        # 1. Multiple requests for 1 host, with different acls/deps/priority:
        #    These form distinct requests because they hash differently.
        #    The response map will contain entries like: {r1: h1, r2: h1}
        #    after the batch_get_hosts call. There are 2 sub-cases:
        #    a. Same deps/acls, different priority:
        #       Since we sort the requests based on priority, the
        #       higher priority request r1, will lease h1. The
        #       validation of r2, h1 will fail because of the r1 lease.
        #    b. Different deps/acls, only one of which matches the host:
        #       The matching request will lease h1. The other host
        #       pairing will get dropped from the response map.
        # 2. Multiple requests with the same acls/deps/priority and 1 host:
        #    These all have the same request hash, so the response map will
        #    contain: {r: h}, regardless of the number of r's. If this is not
        #    a valid host assignment it will get dropped from the response.
        self.batch_get_hosts(set(requests))
        # sorted() copies the keys, so deleting entries below is safe.
        for request in sorted(self.response_map.keys(),
                              key=lambda request: request.priority,
                              reverse=True):
            hosts = self.response_map[request]
            if len(hosts) > 1:
                raise rdb_utils.RDBException('Got multiple hosts for a single '
                        'request. Hosts: %s, request %s.' % (hosts, request))
            if (self.valid_host_assignment(request, hosts[0]) and
                    self.lease_hosts(hosts)):
                continue
            del self.response_map[request]
            logging.warning('Request %s was not able to lease host %s',
                            request, hosts[0])
# Request dispatchers: Create the appropriate request handler and send a list
# of requests to one of its methods. The corresponding request handler in
# rdb_lib must understand how to match each request with a response from a
# dispatcher; the easiest way to achieve this is to return the response_map
# attribute of the request handler, after making the appropriate requests.
def get_hosts(host_requests):
    """Look up the hosts named by a batch of requests, without leasing them.

    @param host_requests: A list of requests as defined in BaseHostRequest.

    @return: A dictionary mapping each request whose host was found to a
        list of hosts in wire format.
    """
    handler = BaseHostRequestHandler()
    handler.batch_get_hosts(host_requests)
    response = handler.get_response()
    return response
def update_hosts(update_requests):
    """Apply a batch of host-table updates.

    @param update_requests: A list of updates to host tables
        as defined in UpdateHostRequest.

    @return: A dictionary mapping each failed update request to its
        recorded exceptions in wire format; successful updates are absent.
    """
    handler = BaseHostRequestHandler()
    handler.update_hosts(update_requests)
    response = handler.get_response()
    return response
def rdb_host_request_dispatcher(host_requests):
    """Dispatcher for all host acquisition queries.

    @param host_requests: A list of requests for acquiring hosts, as defined in
        AcquireHostRequest.

    @return: A dictionary mapping each satisfied request to a list of hosts
        in wire format. Requests that could not be satisfied are left out of
        the map. Eg: {AcquireHostRequest.template: [host_info_dictionaries]}
    """
    # A request with a host_id was made by a job scheduled against a specific
    # host (eg: through the frontend); the rdb only has to match that host's
    # parameters against the request. A request without one comes from a job
    # that needs hosts (eg: suites), so the rdb must find matching hosts.
    validation_requests, require_hosts_requests = [], []
    for request in host_requests:
        bucket = (validation_requests if request.host_id
                  else require_hosts_requests)
        bucket.append(request)

    handler = AvailableHostRequestHandler()
    handler.batch_validate_hosts(validation_requests)
    handler.batch_acquire_hosts(require_hosts_requests)
    return handler.get_response()