# blob: 84797d1c7f1c9c4aeb95b6804e68ccce35f7f98d [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module of benchmark runs."""
import datetime
import threading
import time
import traceback
from cros_utils import command_executer
from cros_utils import timeline
from results_cache import MockResult
from results_cache import MockResultsCache
from results_cache import Result
from results_cache import ResultsCache
from suite_runner import SuiteRunner
# Timeline states recorded by BenchmarkRun over the lifetime of a run.

# Recorded when the test suite returns a non-zero value, when run() catches
# an exception, or when the run is terminated.
STATUS_FAILED = "FAILED"
# Recorded by run() when the result's return value is falsy (zero).
STATUS_SUCCEEDED = "SUCCEEDED"
# Recorded at the start of RunTest, before the machine is imaged.
STATUS_IMAGING = "IMAGING"
# Recorded by RunTest just before the suite runner is invoked.
STATUS_RUNNING = "RUNNING"
# Recorded by run() on a cache miss, before trying to acquire a machine.
STATUS_WAITING = "WAITING"
# Initial state, recorded when a BenchmarkRun is constructed.
STATUS_PENDING = "PENDING"
class BenchmarkRun(threading.Thread):
    """Thread that obtains the result of one benchmark iteration for a label.

    The result comes from the results cache when possible; otherwise a
    machine is acquired, the benchmark is run on it through the suite runner
    and the fresh result is stored back into the cache.  Progress is tracked
    in `self.timeline` using the STATUS_* constants of this module.
    """

    def __init__(
        self,
        name,
        benchmark,
        label,
        iteration,
        cache_conditions,
        machine_manager,
        logger_to_use,
        log_level,
        share_cache,
        dut_config,
    ):
        """Set up the run; nothing is executed until the thread is started.

        Args:
            name: Thread name, also used as prefix in log messages.
            benchmark: Benchmark description (test name, suite, args, ...).
            label: Label (image/board description) the benchmark runs against.
            iteration: Iteration number, forwarded to the results cache.
            cache_conditions: Conditions forwarded to the results cache.
            machine_manager: Used to acquire, image and release machines.
            logger_to_use: Logger used for all output of this run.
            log_level: Log level handed to the helpers created here.
            share_cache: Shared cache setting forwarded to the results cache.
            dut_config: Device configuration handed to the SuiteRunner.
        """
        super().__init__()

        # What is being run.
        self.name = name
        self.benchmark = benchmark
        self.label = label
        self.iteration = iteration

        # Logging.
        self._logger = logger_to_use
        self.log_level = log_level

        # Result caching.
        self.cache = None
        self.cache_conditions = cache_conditions
        self.share_cache = share_cache
        self.cache_hit = False
        self.cache_has_been_read = False

        # Machine handling.
        self.machine_manager = machine_manager
        self.machine = None
        # This is used by schedv2.
        self.owner_thread = None

        # Outcome bookkeeping.
        self.result = None
        self.retval = None
        self.failure_reason = ""
        self.terminated = False
        self.run_completed = False
        self.runs_complete = 0

        # Helpers are created last, in a fixed order, because some of them
        # read the attributes assigned above.
        self.suite_runner = SuiteRunner(
            dut_config, self._logger, self.log_level
        )
        self.test_args = benchmark.test_args
        self.profiler_args = self.GetExtraAutotestArgs()
        self._ce = command_executer.GetCommandExecuter(
            self._logger, log_level=self.log_level
        )
        self.timeline = timeline.Timeline()
        self.timeline.Record(STATUS_PENDING)

    def ReadCache(self):
        """Create the results cache and try to read a cached result.

        Sets `self.cache`, `self.result`, `self.cache_hit` and marks the
        cache as read so that run() does not read it a second time.
        """
        # Just use the first machine for running the cached version,
        # without locking it.
        cache = ResultsCache()
        self.cache = cache
        cache.Init(
            self.label.chromeos_image,
            self.label.chromeos_root,
            self.benchmark.test_name,
            self.iteration,
            self.test_args,
            self.profiler_args,
            self.machine_manager,
            self.machine,
            self.label.board,
            self.cache_conditions,
            self._logger,
            self.log_level,
            self.label,
            self.share_cache,
            self.benchmark.suite,
            self.benchmark.show_all_results,
            self.benchmark.run_local,
            self.benchmark.cwp_dso,
        )
        cached_result = cache.ReadResult()
        self.result = cached_result
        self.cache_hit = cached_result is not None
        self.cache_has_been_read = True

    def run(self):
        """Thread body: obtain a result and record the final status.

        Any exception is logged and turned into a STATUS_FAILED timeline
        entry instead of propagating.  A machine held by this run is released
        afterwards unless an owner thread (schedv2) manages it.
        """
        try:
            if not self.cache_has_been_read:
                self.ReadCache()

            if self.result:
                cached = self.result
                self._logger.LogOutput("%s: Cache hit." % self.name)
                self._logger.LogOutput(cached.out, print_to_console=False)
                self._logger.LogError(cached.err, print_to_console=False)
            elif self.label.cache_only:
                # Cache-only labels never touch a machine: synthesize a
                # failing result instead.
                self._logger.LogOutput("%s: No cache hit." % self.name)
                self.result = Result.CreateFromRun(
                    self._logger,
                    self.log_level,
                    self.label,
                    self.machine,
                    "%s: No Cache hit." % self.name,
                    "No cache hit.",
                    1,
                    self.benchmark.test_name,
                    self.benchmark.suite,
                    self.benchmark.cwp_dso,
                )
            else:
                self._RunUncached()

            # Fill in the chrome version if nothing above provided one.
            if not self.label.chrome_version:
                if self.machine:
                    self.label.chrome_version = (
                        self.machine_manager.GetChromeVersion(self.machine)
                    )
                elif self.result.chrome_version:
                    self.label.chrome_version = self.result.chrome_version

            if self.terminated:
                return

            if self.result.retval:
                if self.timeline.GetLastEvent() != STATUS_FAILED:
                    self.failure_reason = (
                        "Return value of test suite was non-zero."
                    )
                    self.timeline.Record(STATUS_FAILED)
            else:
                self.timeline.Record(STATUS_SUCCEEDED)
        except Exception as err:
            self._logger.LogError(
                "Benchmark run: '%s' failed: %s" % (self.name, err)
            )
            traceback.print_exc()
            if self.timeline.GetLastEvent() != STATUS_FAILED:
                self.timeline.Record(STATUS_FAILED)
                self.failure_reason = str(err)
        finally:
            self._ReleaseLocalMachine()

    def _RunUncached(self):
        """Acquire a machine, run the test on it and store the result."""
        self._logger.LogOutput("%s: No cache hit." % self.name)
        self.timeline.Record(STATUS_WAITING)
        # Try to acquire a machine now.
        self.machine = self.AcquireMachine()
        self.cache.machine = self.machine
        self.result = self.RunTest(self.machine)
        self.cache.remote = self.machine.name
        self.label.chrome_version = self.machine_manager.GetChromeVersion(
            self.machine
        )
        self.cache.StoreResult(self.result)

    def _ReleaseLocalMachine(self):
        """Release the machine held by this run, if it was acquired locally."""
        # In schedv2 mode, we do not lock machine locally. So noop here.
        if self.owner_thread is not None or not self.machine:
            return

        if not self.machine.IsReachable():
            self._logger.LogOutput(
                "Machine %s is not reachable, removing it."
                % self.machine.name
            )
            self.machine_manager.RemoveMachine(self.machine.name)
        self._logger.LogOutput("Releasing machine: %s" % self.machine.name)
        self.machine_manager.ReleaseMachine(self.machine)
        self._logger.LogOutput("Released machine: %s" % self.machine.name)

    def Terminate(self):
        """Flag the run as terminated, stop the suite runner and mark failure."""
        self.terminated = True
        self.suite_runner.Terminate()
        if self.timeline.GetLastEvent() == STATUS_FAILED:
            return
        self.timeline.Record(STATUS_FAILED)
        self.failure_reason = "Thread terminated."

    def AcquireMachine(self):
        """Return a machine to run on, polling until one is available.

        Returns:
            The owner thread's DUT when an owner thread is set, otherwise the
            first machine handed out by the machine manager for this label.

        Raises:
            RuntimeError: The run was terminated while waiting for a machine.
        """
        owner = self.owner_thread
        if owner is not None:
            # No need to lock machine locally, DutWorker, which is a thread, is
            # responsible for running br.
            return owner.dut()

        while not self.terminated:
            acquired = self.machine_manager.AcquireMachine(self.label)
            if acquired:
                self._logger.LogOutput(
                    "%s: Machine %s acquired at %s"
                    % (self.name, acquired.name, datetime.datetime.now())
                )
                return acquired
            # Nothing free yet; wait before asking again.
            time.sleep(10)

        raise RuntimeError(
            "Thread terminated while trying to acquire machine."
        )

    def GetExtraAutotestArgs(self):
        """Build the extra profiler arguments from the benchmark's perf_args.

        For suites other than "telemetry_Crosperf" an error is logged and
        the benchmark's perf_args are cleared.  Otherwise "-a" is inserted
        right after the perf subcommand.

        Returns:
            The profiler argument string, or "" when no perf_args are set.

        Raises:
            SyntaxError: perf_args does not start with "record" or "stat".
        """
        bench = self.benchmark
        if bench.perf_args and bench.suite != "telemetry_Crosperf":
            self._logger.LogError(
                "Non-telemetry benchmark does not support profiler."
            )
            bench.perf_args = ""

        if not bench.perf_args:
            return ""

        subcommand, *options = bench.perf_args.split(" ")
        if subcommand not in ("record", "stat"):
            raise SyntaxError(
                "perf_args must start with either record or stat"
            )

        perf_args = " ".join([subcommand, "-a"] + options)
        return " ".join(
            [
                "--profiler=custom_perf",
                "--profiler_args='perf_options=\"%s\"'" % perf_args,
            ]
        )

    def RunTest(self, machine):
        """Image `machine` if needed, run the benchmark and wrap the output.

        Args:
            machine: The machine to image and run the benchmark on.

        Returns:
            The object produced by Result.CreateFromRun for this run.
        """
        self.timeline.Record(STATUS_IMAGING)
        # In schedv2 mode, do not even call ImageMachine. Machine image is
        # guaranteed.
        if self.owner_thread is None:
            self.machine_manager.ImageMachine(machine, self.label)

        self.timeline.Record(STATUS_RUNNING)
        run_output = self.suite_runner.Run(
            machine,
            self.label,
            self.benchmark,
            self.test_args,
            self.profiler_args,
        )
        retval, out, err = run_output
        self.run_completed = True
        # NOTE(review): self.machine (not the `machine` argument) is passed
        # on here; the two are the same object when called from run().
        return Result.CreateFromRun(
            self._logger,
            self.log_level,
            self.label,
            self.machine,
            out,
            err,
            retval,
            self.benchmark.test_name,
            self.benchmark.suite,
            self.benchmark.cwp_dso,
        )

    def SetCacheConditions(self, cache_conditions):
        """Replace the cache conditions used by the next ReadCache call."""
        self.cache_conditions = cache_conditions

    def logger(self):
        """Expose the logger of this run; only used by unittest.

        Returns:
            self._logger
        """
        return self._logger

    def __str__(self):
        """Short description containing the run name, for better debugging."""
        return 'BenchmarkRun[name="{}"]'.format(self.name)
class MockBenchmarkRun(BenchmarkRun):
    """BenchmarkRun variant for tests, built on mock cache/result objects.

    It replaces the results cache with MockResultsCache and wraps the suite
    runner output in a MockResult instead of calling Result.CreateFromRun.
    """

    def ReadCache(self):
        """Create a MockResultsCache and try to read a cached result from it.

        Sets `self.cache`, `self.result` and `self.cache_hit`, and marks the
        cache as read, matching what BenchmarkRun.ReadCache does.
        """
        # Just use the first machine for running the cached version,
        # without locking it.
        self.cache = MockResultsCache()
        self.cache.Init(
            self.label.chromeos_image,
            self.label.chromeos_root,
            self.benchmark.test_name,
            self.iteration,
            self.test_args,
            self.profiler_args,
            self.machine_manager,
            self.machine,
            self.label.board,
            self.cache_conditions,
            self._logger,
            self.log_level,
            self.label,
            self.share_cache,
            self.benchmark.suite,
            self.benchmark.show_all_results,
            self.benchmark.run_local,
            self.benchmark.cwp_dso,
        )
        self.result = self.cache.ReadResult()
        self.cache_hit = self.result is not None
        # Keep in step with BenchmarkRun.ReadCache: without this flag run()
        # would read the cache a second time after an explicit ReadCache().
        self.cache_has_been_read = True

    def RunTest(self, machine):
        """Image `machine`, run the benchmark and return a MockResult.

        Unlike BenchmarkRun.RunTest this always images the machine and does
        not call Result.CreateFromRun.

        Args:
            machine: The machine to image and run the benchmark on.

        Returns:
            A MockResult carrying the suite runner's out, err and retval.
        """
        self.timeline.Record(STATUS_IMAGING)
        self.machine_manager.ImageMachine(machine, self.label)
        self.timeline.Record(STATUS_RUNNING)
        retval, out, err = self.suite_runner.Run(
            machine,
            self.label,
            self.benchmark,
            self.test_args,
            self.profiler_args,
        )
        self.run_completed = True
        # The mock result receives a placeholder string instead of a logger.
        rr = MockResult("logger", self.label, self.log_level, machine)
        rr.out = out
        rr.err = err
        rr.retval = retval
        return rr