# pylint: disable-msg=C0111
Functions to expose over the RPC interface.
For all modify* and delete* functions that ask for an 'id' parameter to
identify the object to operate on, the id may be either
* the database row ID
* the name of the object (label name, hostname, user login, etc.)
* a dictionary containing a uniquely identifying field (this option should seldom
be used)
When specifying foreign key fields (i.e. adding hosts to a label, or adding
users to an ACL group), the given value may be either the database row ID or the
name of the object.
All get* functions return lists of dictionaries. Each dictionary represents one
object and maps field names to values.
Some examples:
modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
modify_test('sleeptest', test_type='Client', params=', seconds=60')
delete_acl_group(1) # delete by ID
delete_acl_group('Everyone') # delete by name
acl_group_add_users('Everyone', ['mbligh', 'showard'])
get_jobs(owner='showard', status='Queued')
See doctests/001_rpc_test.txt for (lots) more examples.
__author__ = ' (Steve Howard)'
import datetime
import common
from autotest_lib.client.common_lib import priorities
from autotest_lib.frontend.afe import models, model_logic, model_attributes
from autotest_lib.frontend.afe import control_file, rpc_utils
from autotest_lib.frontend.afe import site_rpc_interface
from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
from autotest_lib.server import utils
from autotest_lib.server.cros.dynamic_suite import tools
def get_parameterized_autoupdate_image_url(job):
"""Get the parameterized autoupdate image url from a parameterized job."""
known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj,
para_set = job.parameterized_job.parameterizedjobparameter_set
job_test_para = para_set.get(test_parameter=image_parameter)
return job_test_para.parameter_value
# labels
def add_label(name, kernel_config=None, platform=None, only_if_needed=None):
return models.Label.add_object(
name=name, kernel_config=kernel_config, platform=platform,
def modify_label(id, **data):
def delete_label(id):
def label_add_hosts(id, hosts):
"""Add the label with the given id to the list of hosts.
The given label will be created if it doesn't exist, provided the `id`
supplied is a label name not an int/long id.
@param id: An id or label name. More often a label name.
@param hosts: A list of hostnames or ids. More often hostnames.
@raises models.Label.DoesNotExist: If the id specified is an int/long
and a label with that id doesn't exist.
host_objs = models.Host.smart_get_bulk(hosts)
# In the rare event that we're given an id and not a label name,
# it should already exist.
label = models.Label.smart_get(id)
except models.Label.DoesNotExist:
# This matches the type checks in smart_get, which is a hack
# in and of itself. The aim here is to create any non-existent
# label, which we cannot do if the 'id' specified isn't a label name.
if isinstance(id, basestring):
label = models.Label.smart_get(add_label(id))
if label.platform:
def label_remove_hosts(id, hosts):
host_objs = models.Host.smart_get_bulk(hosts)
def get_labels(exclude_filters=(), **filter_data):
    """Return the labels matching filter_data, after applying exclusions.

    @param exclude_filters: A sequence of dictionaries of filters. Each
            dictionary is expanded into keyword arguments for an exclude()
            call on the label query.
    @param filter_data: Filters handed to models.Label.query_objects().

    @returns A sequence of nested dictionaries of label information.
    """
    labels = models.Label.query_objects(filter_data)
    # Exclusions are chained one at a time on top of the filtered query.
    for exclude_filter in exclude_filters:
        labels = labels.exclude(**exclude_filter)
    return rpc_utils.prepare_rows_as_nested_dicts(labels, ('atomic_group',))
# atomic groups
def add_atomic_group(name, max_number_of_machines=None, description=None):
return models.AtomicGroup.add_object(
name=name, max_number_of_machines=max_number_of_machines,
def modify_atomic_group(id, **data):
def delete_atomic_group(id):
def atomic_group_add_labels(id, labels):
label_objs = models.Label.smart_get_bulk(labels)
def atomic_group_remove_labels(id, labels):
label_objs = models.Label.smart_get_bulk(labels)
def get_atomic_groups(**filter_data):
return rpc_utils.prepare_for_serialization(
# hosts
def add_host(hostname, status=None, locked=None, protection=None):
    """Create a new host and return its id.

    @param hostname: Hostname of the host to add.
    @param status: Forwarded as the `status` field of the new host.
    @param locked: Forwarded as the `locked` field of the new host.
    @param protection: Forwarded as the `protection` field of the new host.

    @returns The `id` of the object created by models.Host.add_object().
    """
    host = models.Host.add_object(hostname=hostname, status=status,
                                  locked=locked, protection=protection)
    return host.id
def modify_host(id, **data):
"""Modify local attributes of a host.
If this is called on the master, but the host is assigned to a shard, this
will also forward the call to the responsible shard. This means, for example, that if a
host is being locked using this function, this change will also propagate to
@param id: id of the host to modify.
@param **data: key=value pairs of values to set on the host.
host = models.Host.smart_get(id)
rpc_utils.check_modify_host_locking(host, data)
def modify_hosts(host_filter_data, update_data):
"""Modify local attributes of multiple hosts.
If this is called on the master, but one of the hosts that match the
filters is assigned to a shard, this will also forward the call to the
responsible shard.
The filters are always applied on the master, not on the shards. This means
if the states of a host differ on the master and a shard, the state on the
master will be used. I.e. this means:
A host was synced to Shard 1. On Shard 1 the status of the host was set to
'Repair Failed'.
- A call to modify_hosts with host_filter_data={'status': 'Ready'} will
update the host (both on the shard and on the master), because the state
of the host as the master knows it is still 'Ready'.
- A call to modify_hosts with host_filter_data={'status': 'Repair Failed'}
will not update the host, because the filter doesn't apply on the master.
@param host_filter_data: Filters out which hosts to modify.
@param update_data: A dictionary with the changes to make to the hosts.
hosts = models.Host.query_objects(host_filter_data)
affected_shard_hostnames = set()
affected_host_ids = []
# Check all hosts before changing data for exception safety.
for host in hosts:
rpc_utils.check_modify_host_locking(host, update_data)
if host.shard:
if not rpc_utils.is_shard():
# Caution: Changing the filter from the original here. See docstring.
'modify_hosts', affected_shard_hostnames,
host_filter_data={'id__in': affected_host_ids},
for host in hosts:
def host_add_labels(id, labels):
labels = models.Label.smart_get_bulk(labels)
host = models.Host.smart_get(id)
platforms = [ for label in labels if label.platform]
if len(platforms) > 1:
raise model_logic.ValidationError(
{'labels': 'Adding more than one platform label: %s' %
', '.join(platforms)})
if len(platforms) == 1:
def host_remove_labels(id, labels):
labels = models.Label.smart_get_bulk(labels)
def get_host_attribute(attribute, **host_filter_data):
@param attribute: string name of attribute
@param host_filter_data: filter data to apply to Hosts to choose hosts to
act upon
hosts = rpc_utils.get_host_query((), False, False, True, host_filter_data)
hosts = list(hosts)
models.Host.objects.populate_relationships(hosts, models.HostAttribute,
host_attr_dicts = []
for host_obj in hosts:
for attr_obj in host_obj.attribute_list:
if attr_obj.attribute == attribute:
return rpc_utils.prepare_for_serialization(host_attr_dicts)
def set_host_attribute(attribute, value, **host_filter_data):
    """Set or delete an attribute on every host matching the filter.

    @param attribute string name of attribute
    @param value string, or None to delete an attribute
    @param host_filter_data filter data to apply to Hosts to choose hosts to
            act upon. Must not be empty.

    @raises AssertionError: If no filter data is supplied.
    """
    # Disallow accidental actions on all hosts. This is raised explicitly
    # instead of using an assert statement so that the guard is still
    # enforced when Python runs with -O (which strips asserts).
    if not host_filter_data:
        raise AssertionError('host_filter_data must not be empty')
    hosts = models.Host.query_objects(host_filter_data)
    for host in hosts:
        host.set_or_delete_attribute(attribute, value)
def delete_host(id):
def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
exclude_atomic_group_hosts=False, valid_only=True, **filter_data):
@param multiple_labels: match hosts in all of the labels given. Should
be a list of label names.
@param exclude_only_if_needed_labels: Exclude hosts with at least one
"only_if_needed" label applied.
@param exclude_atomic_group_hosts: Exclude hosts that have one or more
atomic group labels associated with them.
hosts = rpc_utils.get_host_query(multiple_labels,
valid_only, filter_data)
hosts = list(hosts)
models.Host.objects.populate_relationships(hosts, models.Label,
models.Host.objects.populate_relationships(hosts, models.AclGroup,
models.Host.objects.populate_relationships(hosts, models.HostAttribute,
host_dicts = []
for host_obj in hosts:
host_dict = host_obj.get_object_dict()
host_dict['labels'] = [ for label in host_obj.label_list]
host_dict['platform'], host_dict['atomic_group'] = (rpc_utils.
host_dict['acls'] = [ for acl in host_obj.acl_list]
host_dict['attributes'] = dict((attribute.attribute, attribute.value)
for attribute in host_obj.attribute_list)
return rpc_utils.prepare_for_serialization(host_dicts)
def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
exclude_atomic_group_hosts=False, valid_only=True,
Same parameters as get_hosts().
@returns The number of matching hosts.
hosts = rpc_utils.get_host_query(multiple_labels,
valid_only, filter_data)
return hosts.count()
# tests
def add_test(name, test_type, path, author=None, dependencies=None,
experimental=True, run_verify=None, test_class=None,
test_time=None, test_category=None, description=None,
return models.Test.add_object(name=name, test_type=test_type, path=path,
author=author, dependencies=dependencies,
run_verify=run_verify, test_time=test_time,
def modify_test(id, **data):
def delete_test(id):
def get_tests(**filter_data):
return rpc_utils.prepare_for_serialization(
# profilers
def add_profiler(name, description=None):
    """Create a new profiler and return its id.

    @param name: Name of the profiler to add.
    @param description: Forwarded as the `description` field of the profiler.

    @returns The `id` of the object created by models.Profiler.add_object().
    """
    profiler = models.Profiler.add_object(name=name, description=description)
    return profiler.id
def modify_profiler(id, **data):
def delete_profiler(id):
def get_profilers(**filter_data):
return rpc_utils.prepare_for_serialization(
# users
def add_user(login, access_level=None):
    """Create a new user and return its id.

    @param login: Login name of the user to add.
    @param access_level: Forwarded as the `access_level` field of the user.

    @returns The `id` of the object created by models.User.add_object().
    """
    user = models.User.add_object(login=login, access_level=access_level)
    return user.id
def modify_user(id, **data):
def delete_user(id):
def get_users(**filter_data):
return rpc_utils.prepare_for_serialization(
# acl groups
def add_acl_group(name, description=None):
group = models.AclGroup.add_object(name=name, description=description)
def modify_acl_group(id, **data):
group = models.AclGroup.smart_get(id)
def acl_group_add_users(id, users):
group = models.AclGroup.smart_get(id)
users = models.User.smart_get_bulk(users)
def acl_group_remove_users(id, users):
group = models.AclGroup.smart_get(id)
users = models.User.smart_get_bulk(users)
def acl_group_add_hosts(id, hosts):
group = models.AclGroup.smart_get(id)
hosts = models.Host.smart_get_bulk(hosts)
def acl_group_remove_hosts(id, hosts):
group = models.AclGroup.smart_get(id)
hosts = models.Host.smart_get_bulk(hosts)
def delete_acl_group(id):
def get_acl_groups(**filter_data):
    """Return ACL groups together with their user logins and hostnames.

    @param filter_data: Filters handed to models.AclGroup.list_objects().

    @returns The ACL group rows prepared for serialization. Each row gains a
            'users' list of user logins and a 'hosts' list of hostnames.
    """
    acl_groups = models.AclGroup.list_objects(filter_data)
    for acl_group in acl_groups:
        # The rows are keyed like dictionaries, so the model object is looked
        # up by id to reach its users and hosts relations.
        acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
        acl_group['users'] = [user.login
                              for user in acl_group_obj.users.all()]
        acl_group['hosts'] = [host.hostname
                              for host in acl_group_obj.hosts.all()]
    return rpc_utils.prepare_for_serialization(acl_groups)
# jobs
def generate_control_file(tests=(), kernel=None, label=None, profilers=(),
client_control_file='', use_container=False,
profile_only=None, upload_kernel_config=False):
Generates a client-side control file to load a kernel and run tests.
@param tests List of tests to run.
@param kernel A list of kernel info dictionaries configuring which kernels
to boot for this job and other options for them
@param label Name of label to grab kernel config from.
@param profilers List of profilers to activate during the job.
@param client_control_file The contents of a client-side control file to
run at the end of all tests. If this is supplied, all tests must be
client side.
TODO: in the future we should support server control files directly
to wrap with a kernel. That'll require changing the parameter
name and adding a boolean to indicate if it is a client or server
control file.
@param use_container unused argument today. TODO: Enable containers
on the host during a client side test.
@param profile_only A boolean that indicates what default profile_only
mode to use in the control file. Passing None will generate a
control file that does not explicitly set the default mode at all.
@param upload_kernel_config: if enabled it will generate server control
file code that uploads the kernel config file to the client and
tells the client of the new (local) path when compiling the kernel;
the tests must be server side tests
@returns a dict with the following keys:
control_file: str, The control file text.
is_server: bool, is the control file a server-side control file?
synch_count: How many machines the job uses per autoserv execution.
synch_count == 1 means the job is asynchronous.
dependencies: A list of the names of labels on which the job depends.
if not tests and not client_control_file:
return dict(control_file='', is_server=False, synch_count=1,
cf_info, test_objects, profiler_objects, label = (
rpc_utils.prepare_generate_control_file(tests, kernel, label,
cf_info['control_file'] = control_file.generate_control(
tests=test_objects, kernels=kernel, platform=label,
profilers=profiler_objects, is_server=cf_info['is_server'],
client_control_file=client_control_file, profile_only=profile_only,
return cf_info
def create_parameterized_job(name, priority, test, parameters, kernel=None,
label=None, profilers=(), profiler_parameters=None,
use_container=False, profile_only=None,
upload_kernel_config=False, hosts=(),
meta_hosts=(), one_time_hosts=(),
atomic_group_name=None, synch_count=None,
is_template=False, timeout=None,
timeout_mins=None, max_runtime_mins=None,
run_verify=False, email_list='', dependencies=(),
reboot_before=None, reboot_after=None,
parse_failed_repair=None, hostless=False,
keyvals=None, drone_set=None, run_reset=True):
Creates and enqueues a parameterized job.
Most parameters are a combination of the parameters for generate_control_file()
and create_job(), with the exception of:
@param test name or ID of the test to run
@param parameters a map of parameter name ->
tuple of (param value, param type)
@param profiler_parameters a dictionary of parameters for the profilers:
key: profiler name
value: dict of param name -> tuple of
(param value,
param type)
# Save the values of the passed arguments here. What we're going to do with
# them is pass them all to rpc_utils.get_create_job_common_args(), which
# will extract the subset of these arguments that apply for
# rpc_utils.create_job_common(), which we then pass in to that function.
args = locals()
# Set up the parameterized job configs
test_obj = models.Test.smart_get(test)
control_type = test_obj.test_type
label = models.Label.smart_get(label)
except models.Label.DoesNotExist:
label = None
kernel_objs = models.Kernel.create_kernels(kernel)
profiler_objs = [models.Profiler.smart_get(profiler)
for profiler in profilers]
parameterized_job = models.ParameterizedJob.objects.create(
test=test_obj, label=label, use_container=use_container,
for profiler in profiler_objs:
parameterized_profiler = models.ParameterizedJobProfiler.objects.create(
profiler_params = profiler_parameters.get(, {})
for name, (value, param_type) in profiler_params.iteritems():
for parameter in test_obj.testparameter_set.all():
if in parameters:
param_value, param_type = parameters.pop(
test_parameter=parameter, parameter_value=param_value,
if parameters:
raise Exception('Extra parameters remain: %r' % parameters)
return rpc_utils.create_job_common(,
def create_job_page_handler(name, priority, control_file, control_type,
                            image=None, hostless=False, **kwargs):
    """Create and enqueue a job.

    When both an image and hostless are given, the request is dispatched to
    site_rpc_interface.create_suite_job(); otherwise it goes to create_job().

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param image Forwarded as `build` to create_suite_job, or as `image` to
                 create_job.
    @param hostless Forwarded to create_job; together with image it selects
                    the create_suite_job path.
    @param kwargs extra args that will be required by create_suite_job or
                  create_job.

    @returns The value returned by create_suite_job or create_job.
    @raises model_logic.ValidationError: If the control file is empty.
    """
    control_file = rpc_utils.encode_ascii(control_file)
    if not control_file:
        raise model_logic.ValidationError({
                'control_file' : "Control file cannot be empty"})

    if image and hostless:
        # Note that control_type and hostless are not forwarded on this path.
        return site_rpc_interface.create_suite_job(
                name=name, control_file=control_file, priority=priority,
                build=image, **kwargs)
    return create_job(name, priority, control_file, control_type, image=image,
                      hostless=hostless, **kwargs)
def create_job(name, priority, control_file, control_type,
hosts=(), meta_hosts=(), one_time_hosts=(),
atomic_group_name=None, synch_count=None, is_template=False,
timeout=None, timeout_mins=None, max_runtime_mins=None,
run_verify=False, email_list='', dependencies=(),
reboot_before=None, reboot_after=None, parse_failed_repair=None,
hostless=False, keyvals=None, drone_set=None, image=None,
parent_job_id=None, test_retry=0, run_reset=True, args=(),
Create and enqueue a job.
@param name name of this job
@param priority Integer priority of this job. Higher is more important.
@param control_file String contents of the control file.
@param control_type Type of control file, Client or Server.
@param synch_count How many machines the job uses per autoserv execution.
synch_count == 1 means the job is asynchronous. If an atomic group is
given this value is treated as a minimum.
@param is_template If true then create a template job.
@param timeout Hours after this call returns until the job times out.
@param timeout_mins Minutes after this call returns until the job times
@param max_runtime_mins Minutes from job starting time until job times out
@param run_verify Should the host be verified before running the test?
@param email_list String containing emails to mail when the job is done
@param dependencies List of label names on which this job depends
@param reboot_before Never, If dirty, or Always
@param reboot_after Never, If all tests passed, or Always
@param parse_failed_repair if true, results of failed repairs launched by
this job will be parsed as part of the job.
@param hostless if true, create a hostless job
@param keyvals dict of keyvals to associate with the job
@param hosts List of hosts to run job on.
@param meta_hosts List where each entry is a label name, and for each entry
one host will be chosen from that label to run the job on.
@param one_time_hosts List of hosts not in the database to run the job on.
@param atomic_group_name The name of an atomic group to schedule the job on.
@param drone_set The name of the drone set to run this test on.
@param image OS image to install before running job.
@param parent_job_id id of a job considered to be parent of created job.
@param test_retry Number of times to retry test if the test did not
complete successfully. (optional, default: 0)
@param run_reset Should the host be reset before running the test?
@param args A list of args to be injected into control file.
@param kwargs extra keyword args. NOT USED.
@returns The created Job id number.
if args:
control_file = tools.inject_vars({'args': args}, control_file)
if image is None:
return rpc_utils.create_job_common(
# When image is supplied use a known parameterized test already in the
# database to pass the OS image path from the front end, through the
# scheduler, and finally to autoserv as the --image parameter.
# The test autoupdate_ParameterizedJob is in afe_autotests and used to
# instantiate a Test object and from there a ParameterizedJob.
known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
known_parameterized_job = models.ParameterizedJob.objects.create(
# autoupdate_ParameterizedJob has a single parameter, the image parameter,
# stored in the table afe_test_parameters. We retrieve and set this
# instance of the parameter to the OS image path.
image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj,
test_parameter=image_parameter, parameter_value=image,
# By passing a parameterized_job to create_job_common the job entry in
# the afe_jobs table will have the field parameterized_job_id set.
# The scheduler uses this id in the afe_parameterized_jobs table to
# match this job to our known test, and then with the
# afe_parameterized_job_parameters table to get the actual image path.
return rpc_utils.create_job_common(,
def abort_host_queue_entries(**filter_data):
Abort a set of host queue entries.
query = models.HostQueueEntry.query_objects(filter_data)
# Don't allow aborts on:
# 1. Jobs that have already completed (whether or not they were aborted)
# 2. Jobs that have already been aborted (but may not have completed)
query = query.filter(complete=False).filter(aborted=False)
host_queue_entries = list(query.select_related())
def abort_special_tasks(**filter_data):
Abort the special task, or tasks, specified in the filter.
query = models.SpecialTask.query_objects(filter_data)
special_tasks = query.filter(is_active=True)
for task in special_tasks:
def _call_special_tasks_on_hosts(task, hosts):
    """Schedules a set of hosts for a special task.

    @param task: The task passed to
            models.SpecialTask.schedule_special_task() for each host.
    @param hosts: Iterable of host objects, each exposing `hostname`.

    @returns A sorted list of hostnames that a special task was created for.
    """
    # Materialize once: the hosts are walked twice below, and a one-shot
    # iterable (e.g. a generator) would otherwise yield no hostnames.
    hosts = list(hosts)
    for host in hosts:
        models.SpecialTask.schedule_special_task(host, task)
    # sorted() already returns a list.
    return sorted(host.hostname for host in hosts)
def reverify_hosts(**filter_data):
Schedules a set of hosts for verify.
@returns A list of hostnames that a verify task was created for.
return _call_special_tasks_on_hosts(models.SpecialTask.Task.VERIFY,
def repair_hosts(**filter_data):
Schedules a set of hosts for repair.
@returns A list of hostnames that a repair task was created for.
return _call_special_tasks_on_hosts(models.SpecialTask.Task.REPAIR,
def get_jobs(not_yet_run=False, running=False, finished=False,
suite=False, sub=False, standalone=False, **filter_data):
Extra status filter args for get_jobs:
-not_yet_run: Include only jobs that have not yet started running.
-running: Include only jobs that have started running but for which not
all hosts have completed.
-finished: Include only jobs for which all hosts have completed (or
At most one of these three fields should be specified.
Extra type filter args for get_jobs:
-suite: Include only jobs with child jobs.
-sub: Include only jobs with a parent job.
-standalone: Include only jobs with no child or parent jobs.
At most one of these three fields should be specified.
extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
job_dicts = []
jobs = list(models.Job.query_objects(filter_data))
models.Job.objects.populate_relationships(jobs, models.Label,
models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
for job in jobs:
job_dict = job.get_object_dict()
job_dict['dependencies'] = ','.join(
for label in job.dependencies)
job_dict['keyvals'] = dict((keyval.key, keyval.value)
for keyval in job.keyvals)
if job.parameterized_job:
job_dict['image'] = get_parameterized_autoupdate_image_url(job)
return rpc_utils.prepare_for_serialization(job_dicts)
def get_num_jobs(not_yet_run=False, running=False, finished=False,
suite=False, sub=False, standalone=False,
See get_jobs() for documentation of extra filter parameters.
extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
return models.Job.query_count(filter_data)
def get_jobs_summary(**filter_data):
    """Like get_jobs(), but adds 'status_counts' and 'result_counts' fields.

    'status_counts' field is a dictionary mapping status strings to the number
    of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}.

    'result_counts' field is piped to tko's rpc_interface and has the return
    format specified under get_group_counts.

    @param filter_data: Filters forwarded unchanged to get_jobs().

    @returns The job dictionaries from get_jobs(), extended with the two
            extra fields and prepared for serialization.
    """
    jobs = get_jobs(**filter_data)
    ids = [job['id'] for job in jobs]
    # Status counts are fetched for all jobs in one call and looked up per job.
    all_status_counts = models.Job.objects.get_status_counts(ids)
    for job in jobs:
        job['status_counts'] = all_status_counts[job['id']]
        job['result_counts'] = tko_rpc_interface.get_status_counts(
                ['afe_job_id', 'afe_job_id'],
                header_groups=[['afe_job_id'], ['afe_job_id']],
                **{'afe_job_id': job['id']})
    return rpc_utils.prepare_for_serialization(jobs)
def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
Retrieves all the information needed to clone a job.
job = models.Job.objects.get(id=id)
job_info = rpc_utils.get_job_info(job,
host_dicts = []
for host in job_info['hosts']:
host_dict = get_hosts([0]
other_labels = host_dict['labels']
if host_dict['platform']:
host_dict['other_labels'] = ', '.join(other_labels)
for host in job_info['one_time_hosts']:
host_dict = dict(hostname=host.hostname,,
platform='(one-time host)',
# convert keys from Label objects to strings (names of labels)
meta_host_counts = dict((, count) for meta_host, count
in job_info['meta_host_counts'].iteritems())
info = dict(job=job.get_object_dict(),
info['job']['dependencies'] = job_info['dependencies']
if job_info['atomic_group']:
info['atomic_group_name'] = (job_info['atomic_group']).name
info['atomic_group_name'] = None
info['hostless'] = job_info['hostless']
info['drone_set'] = job.drone_set and
if job.parameterized_job:
info['job']['image'] = get_parameterized_autoupdate_image_url(job)
return rpc_utils.prepare_for_serialization(info)
# host queue entries
def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
@returns A sequence of nested dictionaries of host and job information.
filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
return rpc_utils.prepare_rows_as_nested_dicts(
('host', 'atomic_group', 'job'))
def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
Get the number of host queue entries associated with this job.
filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
return models.HostQueueEntry.query_count(filter_data)
def get_hqe_percentage_complete(**filter_data):
    """Compute the fraction of matching host queue entries that are complete.

    @param filter_data: Filters handed to
            models.HostQueueEntry.query_objects().

    @returns complete / total as a float, or 1 when no entries match.
    """
    query = models.HostQueueEntry.query_objects(filter_data)
    total_count = query.count()
    if total_count == 0:
        # Nothing matched: report as fully complete and avoid dividing by
        # zero. The completed-count query is skipped in this case.
        return 1
    complete_count = query.filter(complete=True).count()
    # float() keeps this a true division under Python 2 as well.
    return float(complete_count) / total_count
# special tasks
def get_special_tasks(**filter_data):
return rpc_utils.prepare_rows_as_nested_dicts(
('host', 'queue_entry'))
# support for host detail view
def get_host_queue_entries_and_special_tasks(host_id, query_start=None,
query_limit=None, start_time=None,
@returns an interleaved list of HostQueueEntries and SpecialTasks,
in approximate run order. each dict contains keys for type, host,
job, status, started_on, execution_path, and ID.
total_limit = None
if query_limit is not None:
total_limit = query_start + query_limit
filter_data_common = {'host': host_id,
'query_limit': total_limit,
'sort_by': ['-id']}
filter_data_queue_entries, filter_data_special_tasks = (
filter_data_common, start_time, end_time))
queue_entries = list(models.HostQueueEntry.query_objects(
special_tasks = list(models.SpecialTask.query_objects(
interleaved_entries = rpc_utils.interleave_entries(queue_entries,
if query_start is not None:
interleaved_entries = interleaved_entries[query_start:]
if query_limit is not None:
interleaved_entries = interleaved_entries[:query_limit]
return rpc_utils.prepare_for_serialization(interleaved_entries)
def get_num_host_queue_entries_and_special_tasks(host_id, start_time=None,
filter_data_common = {'host': host_id}
filter_data_queue_entries, filter_data_special_tasks = (
filter_data_common, start_time, end_time))
return (models.HostQueueEntry.query_count(filter_data_queue_entries)
+ models.SpecialTask.query_count(filter_data_special_tasks))
# recurring run
def get_recurring(**filter_data):
return rpc_utils.prepare_rows_as_nested_dicts(
('job', 'owner'))
def get_num_recurring(**filter_data):
    """Return the number of recurring runs matching the filter.

    @param filter_data: Filters handed to models.RecurringRun.query_count().

    @returns The value of models.RecurringRun.query_count(filter_data).
    """
    return models.RecurringRun.query_count(filter_data)
def delete_recurring_runs(**filter_data):
to_delete = models.RecurringRun.query_objects(filter_data)
def create_recurring_run(job_id, start_date, loop_period, loop_count):
owner = models.User.current_user().login
job = models.Job.objects.get(id=job_id)
return job.create_recurring_job(start_date=start_date,
# other
def echo(data=""):
    """Return the value passed in, unchanged.

    For doing a basic test to see if RPC calls can successfully be made.

    @param data: The value to send back; defaults to an empty string.

    @returns The `data` argument.
    """
    return data
def get_motd():
    """Return the message of the day as a string.

    @returns The value of rpc_utils.get_motd().
    """
    return rpc_utils.get_motd()
def get_static_data():
Returns a dictionary containing a bunch of data that shouldn't change
often and is otherwise inaccessible. This includes:
priorities: List of job priority choices.
default_priority: Default priority value for new jobs.
users: Sorted list of all users.
labels: Sorted list of labels not start with 'cros-version' and
atomic_groups: Sorted list of all atomic groups.
tests: Sorted list of all tests.
profilers: Sorted list of all profilers.
current_user: Logged-in username.
host_statuses: Sorted list of possible Host statuses.
job_statuses: Sorted list of possible HostQueueEntry statuses.
job_timeout_mins_default: The default job timeout length in minutes.
parse_failed_repair_default: Default value for the parse_failed_repair job
reboot_before_options: A list of valid RebootBefore string enums.
reboot_after_options: A list of valid RebootAfter string enums.
motd: Server's message of the day.
status_dictionary: A mapping from one word job status names to a more
informative description.
job_fields = models.Job.get_field_dict()
default_drone_set_name = models.DroneSet.default_drone_set_name()
drone_sets = ([default_drone_set_name] +
sorted( for drone_set in
result = {}
result['priorities'] = priorities.Priority.choices()
default_priority = priorities.Priority.DEFAULT
result['default_priority'] = 'Default'
result['max_schedulable_priority'] = priorities.Priority.DEFAULT
result['users'] = get_users(sort_by=['login'])
label_exclude_filters = [{'name__startswith': 'cros-version'},
{'name__startswith': 'fw-version'}]
result['labels'] = get_labels(
sort_by=['-platform', 'name'])
result['atomic_groups'] = get_atomic_groups(sort_by=['name'])
result['tests'] = get_tests(sort_by=['name'])
result['profilers'] = get_profilers(sort_by=['name'])
result['current_user'] = rpc_utils.prepare_for_serialization(
result['host_statuses'] = sorted(models.Host.Status.names)
result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
result['job_max_runtime_mins_default'] = (
result['parse_failed_repair_default'] = bool(
result['reboot_before_options'] = model_attributes.RebootBefore.names
result['reboot_after_options'] = model_attributes.RebootAfter.names
result['motd'] = rpc_utils.get_motd()
result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
result['drone_sets'] = drone_sets
result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled()
result['status_dictionary'] = {"Aborted": "Aborted",
"Verifying": "Verifying Host",
"Provisioning": "Provisioning Host",
"Pending": "Waiting on other hosts",
"Running": "Running autoserv",
"Completed": "Autoserv completed",
"Failed": "Failed to complete",
"Queued": "Queued",
"Starting": "Next in host's queue",
"Stopped": "Other host(s) failed verify",
"Parsing": "Awaiting parse of final results",
"Gathering": "Gathering log files",
"Template": "Template job for recurring run",
"Waiting": "Waiting for scheduler action",
"Archiving": "Archiving results",
"Resetting": "Resetting hosts"}
result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
result['is_moblab'] = bool(utils.is_moblab())
return result
def get_server_time():
return"%Y-%m-%d %H:%M")