# blob: f314fa76db0b807cc991cbfda66561caa022f3da [file] [log] [blame]
import common
import os
from autotest_lib.frontend.afe import models as afe_models, model_logic
from autotest_lib.frontend.planner import models, model_attributes
from autotest_lib.frontend.planner import failure_actions, control_file
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.client.common_lib import global_config, utils, global_config
from autotest_lib.client.common_lib import enum
# Name of the single global atomic group used for all planner-created labels.
PLANNER_ATOMIC_GROUP_NAME = 'planner_global_atomic_group'
# Hostname of this Autotest server, read once at import time from the
# [SERVER] section of the global config.
SERVER = global_config.global_config.get_config_value('SERVER', 'hostname')
def create_plan_label(plan):
# NOTE(review): damaged extraction -- docstring quotes, indentation, the
# arguments to get_or_create(), the definition of `name`, and (presumably)
# the save() calls have been dropped from this view. TODO: restore from
# upstream before use.
Creates the host label to apply on the plan hosts
# Reuse the global planner atomic group, revalidating it if needed.
group, _ = afe_models.AtomicGroup.objects.get_or_create(
if group.invalid:
group.invalid = False
# Refuse to start a plan whose label name collides with an existing label.
if bool(afe_models.Label.valid_objects.filter(name=name)):
raise model_logic.ValidationError('Label %s already exists, '
'cannot start plan' % name)
# `name` is defined on a line missing from this view -- TODO confirm.
label = afe_models.Label(name=name, atomic_group=group)
return label
def start_plan(plan, label):
# NOTE(review): damaged extraction -- docstring quotes, indentation, the
# arguments to _get_execution_engine_control(), and the left operand of
# `+ '_execution_engine'` (presumably plan.name) are missing. TODO:
# restore from upstream before use.
Takes the necessary steps to start a test plan in Autotest
# The execution-engine job timeout comes from the PLANNER config section.
timeout = global_config.global_config.get_config_value(
'PLANNER', 'execution_engine_timeout')
control = _get_execution_engine_control(
# AFE job options for the execution-engine server job; timeout doubles as
# the max runtime, and all verify/reboot steps are disabled.
options = {'name': + '_execution_engine',
'priority': afe_models.Job.Priority.MEDIUM,
'control_file': control,
'control_type': afe_models.Job.ControlType.SERVER,
'synch_count': None,
'timeout': timeout,
'max_runtime_hrs': timeout,
'run_verify': False,
'reboot_before': False,
'reboot_after': False,
'dependencies': ()}
# Schedule the engine as a job owned by the current user, bound to no
# specific hosts.
job = afe_models.Job.create(owner=afe_models.User.current_user().login,
options=options, hosts=())
def _get_execution_engine_control(server, plan_id, label_name, owner):
# NOTE(review): damaged extraction -- docstring quotes, indentation, and
# the control-template filename argument to os.path.join()/lazy_load()
# are missing. TODO: restore from upstream before use.
Gets the control file to run the execution engine
# Load (and cache) the control-file template that lives alongside this
# module, then fill in the parameters via %-formatting.
control = lazy_load(os.path.join(os.path.dirname(__file__),
return control % dict(server=server, plan_id=plan_id,
label_name=label_name, owner=owner)
def lazy_load(path):
    """
    Lazily loads the file indicated by the path given, and caches the result

    @param path: path of the file to read
    @return the file contents as a string (cached after the first read)
    """
    # LAZY_LOADED_FILES is a module-level cache dict; its definition is not
    # visible in this chunk -- presumably declared near the top of the file.
    if path not in LAZY_LOADED_FILES:
        LAZY_LOADED_FILES[path] = utils.read_file(path)
    return LAZY_LOADED_FILES[path]
def update_hosts_table(plan):
# NOTE(review): damaged extraction -- docstring quotes, indentation, the
# get_or_create() keyword arguments, the statements that populate
# label_hosts and save new hosts, and the handling of deleted_hosts are
# missing from this view. TODO: restore from upstream before use.
Resolves the host labels into host objects
Adds or removes hosts from the planner Hosts model based on changes to the
host label
# Collect the AFE hosts currently implied by the plan's host labels.
label_hosts = set()
for label in plan.host_labels.all():
for afe_host in label.host_set.all():
host, created = models.Host.objects.get_or_create(plan=plan,
if created:
# Mark label-derived rows so manually-added hosts are left alone below.
host.added_by_label = True
# Rows added by a label whose AFE host is no longer in any plan label.
deleted_hosts = models.Host.objects.filter(
plan=plan, added_by_label=True).exclude(host__id__in=label_hosts)
def compute_next_test_config(plan, host):
# NOTE(review): damaged extraction -- docstring quotes, indentation, the
# job_set.filter() arguments, and the try: that must pair with the
# `except models.Job.DoesNotExist:` below are missing. Also,
# `.exclude('execution_order')` is not a valid queryset call as written
# (presumably order_by('execution_order')) -- TODO confirm upstream.
Gets the next test config that should be run for this plan and host
Returns None if the host is already running a job. Also sets the host's
complete bit if the host is finished running tests.
# A blocked host never receives new work.
if host.blocked:
return None
test_configs = plan.testconfig_set.exclude('execution_order')
result = None
for test_config in test_configs:
planner_jobs = test_config.job_set.filter(
for planner_job in planner_jobs:
# There is a job active; do not start another one
return None
# A finished job with requires_rerun=False means this config is done;
# if no such job exists, this config is the next candidate.
planner_job = planner_jobs.get(requires_rerun=False)
except models.Job.DoesNotExist:
if not result:
result = test_config
if result:
return result
# All jobs related to this host are complete
host.complete = True
return None
def check_for_completion(plan):
    """
    Checks if a plan is actually complete. Sets complete=True if so

    @param plan: planner models.Plan to check
    """
    # The plan is done when no host on it is still incomplete.
    if not models.Host.objects.filter(plan=plan, complete=False):
        plan.complete = True
        # NOTE(review): no plan.save() is visible in this chunk -- the flag
        # may only be set in memory here. TODO: confirm against upstream.
def compute_test_run_status(status):
    """
    Converts a TKO test status to a Planner test run status

    @param status: TKO status word, e.g. 'GOOD' or 'RUNNING'
    @return the corresponding model_attributes.TestRunStatus value
    """
    Status = model_attributes.TestRunStatus
    if status == 'GOOD':
        return Status.PASSED
    if status == 'RUNNING':
        return Status.ACTIVE
    # Any other TKO status word is treated as a failure.
    return Status.FAILED
def add_test_run(plan, planner_job, tko_test, hostname, status):
# NOTE(review): damaged extraction -- docstring quotes, indentation, the
# remaining get_or_create() keyword arguments (presumably test_job,
# tko_test, host), and a trailing test_run.save() are missing from this
# view. TODO: restore from upstream before use.
Adds a TKO test to the Planner Test Run tables
# Resolve the AFE host by name, then the planner Host row for this plan.
host = afe_models.Host.objects.get(hostname=hostname)
planner_host = models.Host.objects.get(plan=plan, host=host)
test_run, _ = models.TestRun.objects.get_or_create(plan=plan,
test_run.status = status
def process_failure(failure_id, host_action, test_action, labels, keyvals,
bugs, reason, invalidate):
# NOTE(review): damaged extraction -- indentation is stripped and several
# statements are incomplete: _process_host_action() has lost its first
# argument (presumably the failure's planner host), the test-label
# get_or_create() is truncated, the keyval block around
# key='original_' + key is scrambled, and the save() calls are absent.
# TODO: restore from upstream before use.
# Normalize keyvals so the iteration below is safe when None is passed.
if keyvals is None:
keyvals = {}
failure = models.TestRun.objects.get(id=failure_id)
_process_host_action(, host_action)
_process_test_action(failure.test_job, test_action)
# Add the test labels
for label in labels:
tko_test_label, _ = (
# Set the job keyvals
for key, value in keyvals.iteritems():
keyval, created = tko_models.JobKeyval.objects.get_or_create(
job=failure.tko_test.job, key=key)
if not created:
# Presumably the existing value is preserved under an
# 'original_'-prefixed key before being overwritten -- TODO confirm.
key='original_' + key,
keyval.value = value
# Add the bugs
for bug_id in bugs:
bug, _ = models.Bug.objects.get_or_create(external_uid=bug_id)
# Set the failure reason
if reason is not None:
failure.tko_test.reason = reason
# Set 'invalidated', 'seen', and 'triaged'
failure.invalidated = invalidate
failure.seen = True
failure.triaged = True
def _site_process_host_action_dummy(host, action):
return False
def _process_host_action(host, action):
    """
    Takes the specified action on the host

    @param host: planner models.Host to act on
    @param action: one of failure_actions.HostAction.values
    @raises ValueError if action is not a recognized host action
    """
    HostAction = failure_actions.HostAction
    if action not in HostAction.values:
        raise ValueError('Unexpected host action %s' % action)

    site_process = utils.import_site_function(
            __file__, 'autotest_lib.frontend.planner.site_rpc_utils',
            'site_process_host_action', _site_process_host_action_dummy)

    if not site_process(host, action):
        # site_process_host_action returns True if and only if it matched a
        # site-specific processing option
        if action == HostAction.BLOCK:
            host.blocked = True
        elif action == HostAction.UNBLOCK:
            host.blocked = False
        else:
            # Guard the unimplemented case behind else: so BLOCK/UNBLOCK do
            # not hit the assert. Bug fix: NotImplemented is a value, not an
            # exception class -- raise NotImplementedError instead.
            assert action == HostAction.REINSTALL
            raise NotImplementedError('TODO: implement reinstall')
        # NOTE(review): no host.save() is visible in this chunk -- the
        # blocked flag may only change in memory. TODO: confirm upstream.
def _process_test_action(planner_job, action):
    """
    Takes the specified action for this planner job

    @param planner_job: planner models.Job the failure belongs to
    @param action: one of failure_actions.TestAction.values
    @raises ValueError if action is not a recognized test action
    """
    TestAction = failure_actions.TestAction
    if action not in TestAction.values:
        raise ValueError('Unexpected test action %s' % action)

    if action == TestAction.SKIP:
        # Do nothing
        pass
    else:
        assert action == TestAction.RERUN
        planner_job.requires_rerun = True
        # NOTE(review): no planner_job.save() is visible in this chunk --
        # TODO: confirm against upstream.
def set_additional_parameters(plan, additional_parameters):
# NOTE(review): damaged extraction -- indentation, the body of the
# `if not additional_parameters:` guard (presumably `return`), and the
# object-creation call that should wrap the final `key=key,
# value=repr(value)` arguments are missing. TODO: restore from upstream.
if not additional_parameters:
# application_order records the order the parameters were supplied in.
for index, additional_parameter in enumerate(additional_parameters):
hostname_regex = additional_parameter['hostname_regex']
param_type = additional_parameter['param_type']
param_values = additional_parameter['param_values']
additional_param = models.AdditionalParameter.objects.create(
plan=plan, hostname_regex=hostname_regex,
param_type=param_type, application_order=index)
# Values are stored repr()'d; presumably readers parse them back --
# TODO confirm.
for key, value in param_values.iteritems():
key=key, value=repr(value))
def _additional_wrap_arguments_dummy(plan, hostname):
return {}
def get_wrap_arguments(plan, hostname, param_type):
# NOTE(review): damaged extraction -- docstring, indentation, and the
# callable that should open the parenthesized lookup assigned to
# additional_param (the thing receiving plan=..., hostname=...,
# param_type=...) are missing. TODO: restore from upstream.
additional_param = (
plan=plan, hostname=hostname, param_type=param_type))
if not additional_param:
return {}
# Flatten the matched parameter's (key, value) rows into a plain dict.
param_values = additional_param.additionalparametervalue_set.values_list(
'key', 'value')
return dict(param_values)
def wrap_control_file(plan, hostname, run_verify, test_config):
# NOTE(review): damaged extraction -- docstring quotes, indentation, and
# most of the arguments to control_file.wrap_control_file() (the control
# file body and the verify/site wrap arguments computed above) are
# missing. TODO: restore from upstream.
Wraps a control file using the ControlParameters for the plan
# Ask the site hook (or the no-op dummy) for extra wrap arguments.
site_additional_wrap_arguments = utils.import_site_function(
__file__, 'autotest_lib.frontend.planner.site_rpc_utils',
'additional_wrap_arguments', _additional_wrap_arguments_dummy)
additional_wrap_arguments = site_additional_wrap_arguments(plan, hostname)
verify_params = get_wrap_arguments(
plan, hostname, model_attributes.AdditionalParameterType.VERIFY)
return control_file.wrap_control_file(
skip_verify=(not run_verify),
# String-valued enum of the possible results of compute_test_config_status().
ComputeTestConfigStatusResult = enum.Enum('Pass', 'Fail', 'Scheduled',
'Running', string_values=True)
def compute_test_config_status(host, test_config=None):
# NOTE(review): damaged extraction -- docstring quotes, indentation, the
# testconfig_set.exclude() arguments, the job_set.get() arguments (note
# the bare comma after the open paren), the try: pairing with the
# DoesNotExist handler, and whatever condition guards the RUNNING return
# are all missing. TODO: restore from upstream before use.
Returns a value of ComputeTestConfigStatusResult:
Pass: This host passed the test config
Fail: This host failed the test config
Scheduled: This host has not yet run this test config
Running: This host is currently running this test config
A 'pass' means that, for every test configuration in the plan, the machine
had at least one AFE job with no failed tests. 'passed' could also be None,
meaning that this host is still running tests.
@param test_config: A test config to check. None to check all test configs
in the plan
if test_config:
test_configs = [test_config]
# With no specific config given, check every config in the host's plan.
test_configs = host.plan.testconfig_set.exclude(
for test_config in test_configs:
planner_job = test_config.job_set.get(,
except models.Job.DoesNotExist:
return ComputeTestConfigStatusResult.SCHEDULED
return ComputeTestConfigStatusResult.RUNNING
# Any test run with a non-GOOD TKO status word means the config failed.
if planner_job.testrun_set.exclude(tko_test__status__word='GOOD'):
return ComputeTestConfigStatusResult.FAIL
return ComputeTestConfigStatusResult.PASS