| # -*- coding: utf-8 -*- |
| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Module to optimize the scheduling of benchmark_run tasks.""" |
| |
| from __future__ import division |
| from __future__ import print_function |
| |
| import sys |
| import traceback |
| |
| from collections import defaultdict |
| from threading import Lock |
| from threading import Thread |
| |
| import test_flag |
| |
| from machine_image_manager import MachineImageManager |
| from cros_utils import command_executer |
| from cros_utils import logger |
| |
| |
class DutWorker(Thread):
  """Working thread for a dut.

  One worker drives one dut: it first drains benchmark runs that had a cache
  hit, then repeatedly asks the scheduler for a benchmark run matching the
  dut's current label, reimaging the dut when the scheduler hands out a new
  label. The scheduler is always notified through dut_worker_finished() when
  the thread leaves run(), whether it finished normally or by an exception.
  """

  def __init__(self, dut, sched):
    """Create a (not yet started) worker.

    Args:
      dut: the machine this worker drives; its 'name' and 'label' attributes
        are read and 'label' is written by this class.
      sched: the scheduler that hands out benchmark runs and labels.
    """
    super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
    self._dut = dut
    self._sched = sched
    # Counters / free-form text reported by status_str().
    self._stat_num_br_run = 0
    self._stat_num_reimage = 0
    self._stat_annotation = ''
    self._logger = logger.GetLogger(self._sched.get_experiment().log_dir)
    self.daemon = True
    self._terminated = False
    self._active_br = None
    # _active_br is written by _execute_benchmark_run (worker thread) and read
    # by terminate (caller's thread), so every access goes through this lock.
    self._active_br_lock = Lock()

  def terminate(self):
    """Ask the worker to stop and terminate its active benchmark run, if any."""
    self._terminated = True
    with self._active_br_lock:
      if self._active_br is not None:
        # BenchmarkRun.Terminate() terminates any running testcase via
        # suite_runner.Terminate and updates timeline.
        self._active_br.Terminate()

  def run(self):
    """Do the "run-test->(optionally reimage)->run-test" chore.

    Note - 'br' below means 'benchmark_run'.

    Everything runs inside one try/finally so that the scheduler is notified
    via dut_worker_finished() even when the cached-run phase or the label
    setup raises an unexpected exception; otherwise the worker would stay in
    the scheduler's active list forever.
    """
    try:
      # Firstly, handle benchmarkruns that have cache hit.
      self._run_cached_benchmark_runs()

      # Secondly, handle benchmarkruns that need to be run on dut.
      self._setup_dut_label()
      self._logger.LogOutput('{} started.'.format(self))
      while not self._terminated:
        br = self._sched.get_benchmark_run(self._dut)
        if br is not None:
          # Execute the br.
          self._execute_benchmark_run(br)
          continue

        # No br left for this label. Considering reimaging.
        label = self._sched.allocate_label(self._dut)
        if label is None:
          # No br even for other labels. We are done.
          self._logger.LogOutput('ImageManager found no label '
                                 'for dut, stopping working '
                                 'thread {}.'.format(self))
          break
        if self._reimage(label):
          # Reimage to run other br fails, dut is doomed, stop
          # this thread.
          self._logger.LogWarning('Re-image failed, dut '
                                  'in an unstable state, stopping '
                                  'working thread {}.'.format(self))
          break
    finally:
      self._stat_annotation = 'finished'
      # Thread finishes. Notify scheduler that I'm done.
      self._sched.dut_worker_finished(self)

  def _run_cached_benchmark_runs(self):
    """Drain and run the scheduler's cache-hit benchmark runs.

    A RuntimeError from one run is printed to stdout and does not stop the
    draining; any other exception propagates to the caller.
    """
    br = self._sched.get_cached_benchmark_run()
    while br:
      try:
        self._stat_annotation = 'finishing cached {}'.format(br)
        br.run()
      except RuntimeError:
        traceback.print_exc(file=sys.stdout)
      br = self._sched.get_cached_benchmark_run()

  def _reimage(self, label):
    """Reimage image to label.

    Args:
      label: the label to reimage onto dut.

    Returns:
      0 if successful, otherwise 1.
    """

    # Termination could happen anywhere, check it.
    if self._terminated:
      return 1

    experiment = self._sched.get_experiment()
    if experiment.skylab:
      # In skylab mode only the bookkeeping label changes, nothing is imaged.
      self._logger.LogOutput('Skylab mode, do not image before testing.')
      self._dut.label = label
      return 0

    self._logger.LogOutput('Reimaging {} using {}'.format(self, label))
    self._stat_num_reimage += 1
    self._stat_annotation = 'reimaging using "{}"'.format(label.name)
    try:
      # Note, only 1 reimage at any given time, this is guaranteed in
      # ImageMachine, so no sync needed below.
      retval = experiment.machine_manager.ImageMachine(self._dut, label)
    except RuntimeError:
      return 1
    if retval:
      return 1

    self._dut.label = label
    return 0

  def _execute_benchmark_run(self, br):
    """Execute a single benchmark_run.

    The experiment is always told the run finished and _active_br is always
    cleared, even on failure. Exceptions raised by the run itself (or by the
    ownership assertion) are not swallowed here; they propagate to run() after
    the cleanup.

    Args:
      br: the benchmark_run to execute; must not be owned by another thread.
    """

    # Termination could happen anywhere, check it.
    if self._terminated:
      return

    self._logger.LogOutput('{} started working on {}'.format(self, br))
    self._stat_num_br_run += 1
    self._stat_annotation = 'executing {}'.format(br)
    try:
      assert br.owner_thread is None
      br.owner_thread = self
      with self._active_br_lock:
        self._active_br = br
      br.run()
    finally:
      try:
        self._sched.get_experiment().BenchmarkRunFinished(br)
      finally:
        # Clear the active br even if the notification above raised, so a
        # later terminate() never touches a stale benchmark run.
        with self._active_br_lock:
          self._active_br = None

  def _setup_dut_label(self):
    """Try to match dut image with a certain experiment label.

    If such match is found, we just skip doing reimage and jump to execute
    some benchmark_runs. Otherwise (command failed, no label matched, or a
    RuntimeError was raised) the dut's label is set to None.
    """

    checksum_file = '/usr/local/osimage_checksum_file'
    try:
      executer = command_executer.GetCommandExecuter()
      rv, checksum, _ = executer.CrosRunCommandWOutput(
          'cat ' + checksum_file,
          chromeos_root=self._sched.get_labels(0).chromeos_root,
          machine=self._dut.name,
          print_to_console=False)
      if rv == 0:
        checksum = checksum.strip()
        for label in self._sched.get_labels():
          if label.checksum == checksum:
            self._logger.LogOutput("Dut '{}' is pre-installed with '{}'".format(
                self._dut.name, label))
            self._dut.label = label
            return
    except RuntimeError:
      traceback.print_exc(file=sys.stdout)
    self._dut.label = None

  def __str__(self):
    return 'DutWorker[dut="{}", label="{}"]'.format(
        self._dut.name, self._dut.label.name if self._dut.label else 'None')

  def dut(self):
    """Return the dut driven by this worker."""
    return self._dut

  def status_str(self):
    """Report thread status."""

    label_name = 'None' if self._dut.label is None else self._dut.label.name
    return ('Worker thread "{}", label="{}", benchmark_run={}, '
            'reimage={}, now {}'.format(self._dut.name, label_name,
                                        self._stat_num_br_run,
                                        self._stat_num_reimage,
                                        self._stat_annotation))
| |
| |
class BenchmarkRunCacheReader(Thread):
  """Thread that probes the cache for a slice of benchmark_runs.

  Every instance receives its own br_list (a subset of
  experiment._benchmark_runs) at construction time; run() walks that list and
  records each run that reports a cache hit in the scheduler's cached-run list.
  """

  def __init__(self, schedv2, br_list):
    """Remember the scheduler, the runs to probe, and the scheduler's logger.

    Args:
      schedv2: scheduler providing get_logger(), lock_on() and
        get_cached_run_list().
      br_list: the benchmark_runs this reader is responsible for.
    """
    super(BenchmarkRunCacheReader, self).__init__()
    self._br_list = br_list
    self._schedv2 = schedv2
    self._logger = schedv2.get_logger()

  def run(self):
    """Read the cache of every assigned benchmark_run.

    A RuntimeError raised while handling one run is printed to stderr and the
    loop moves on to the next run.
    """
    for candidate in self._br_list:
      try:
        candidate.ReadCache()
        if not candidate.cache_hit:
          self._logger.LogOutput('Cache not hit - {}'.format(candidate))
          continue
        self._logger.LogOutput('Cache hit - {}'.format(candidate))
        # The cached-run list is shared with other readers; append under the
        # scheduler-provided lock for it.
        with self._schedv2.lock_on('_cached_br_list'):
          self._schedv2.get_cached_run_list().append(candidate)
      except RuntimeError:
        traceback.print_exc(file=sys.stderr)
| |
| |
class Schedv2(object):
  """New scheduler for crosperf.

  The scheduler reads the cache status of all benchmark runs, groups the
  non-cached ones by label, creates one DutWorker per dut and then serves
  benchmark runs / labels to those workers on request.
  """

  def __init__(self, experiment):
    """Build the scheduler state for an experiment; no thread is started.

    Args:
      experiment: provides log_dir, machine_manager, labels and
        benchmark_runs.
    """
    self._experiment = experiment
    self._logger = logger.GetLogger(experiment.log_dir)

    # Create shortcuts to nested data structure. "_duts" points to a list of
    # locked machines. _labels points to a list of all labels.
    self._duts = self._experiment.machine_manager.GetMachines()
    self._labels = self._experiment.labels

    # Bookkeeping for synchronization. _workers_lock protects the two worker
    # lists; _lock_map_guard makes the lazy creation of per-object locks in
    # lock_on() atomic.
    self._workers_lock = Lock()
    self._lock_map_guard = Lock()
    self._lock_map = defaultdict(Lock)

    # Test mode flag
    self._in_test_mode = test_flag.GetTestMode()

    # Read benchmarkrun cache.
    self._read_br_cache()

    # Mapping from label to a list of benchmark_runs.
    self._label_brl_map = {label: [] for label in self._labels}
    for br in self._experiment.benchmark_runs:
      assert br.label in self._label_brl_map
      # Only put no-cache-hit br into the map.
      if br not in self._cached_br_list:
        self._label_brl_map[br.label].append(br)

    # Use machine image manager to calculate initial label allocation.
    self._mim = MachineImageManager(self._labels, self._duts)
    self._mim.compute_initial_allocation()

    # Create worker thread, 1 per dut.
    self._active_workers = [DutWorker(dut, self) for dut in self._duts]
    self._finished_workers = []

    # Termination flag.
    self._terminated = False

  def _snapshot_active_workers(self):
    """Return a copy of the active worker list taken under the workers lock."""
    with self._workers_lock:
      return list(self._active_workers)

  def run_sched(self):
    """Start all dut worker threads and return immediately."""

    # Iterate over a snapshot: a worker that finishes quickly removes itself
    # from _active_workers, which would make a live iteration skip workers.
    for worker in self._snapshot_active_workers():
      worker.start()

  def _read_br_cache(self):
    """Use multi-threading to read cache for all benchmarkruns.

    We do this by firstly creating a few threads, and then assign each
    thread a segment of all brs. Each thread will check cache status for
    each br and put those with cache into '_cached_br_list'. With 4 or fewer
    benchmark runs the reading is done in the calling thread instead.
    """

    self._cached_br_list = []
    benchmark_runs = self._experiment.benchmark_runs
    n_benchmarkruns = len(benchmark_runs)
    if n_benchmarkruns <= 4:
      # Use single thread to read cache.
      self._logger.LogOutput(('Starting to read cache status for '
                              '{} benchmark runs ...').format(n_benchmarkruns))
      BenchmarkRunCacheReader(self, benchmark_runs).run()
      return

    # Split benchmarkruns set into segments. Each segment will be handled by
    # a thread. Note, we use (x+3)//4 to mimic math.ceil(x/4).
    n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
    self._logger.LogOutput(('Starting {} threads to read cache status for '
                            '{} benchmark runs ...').format(
                                n_threads, n_benchmarkruns))
    benchmarkruns_per_thread = (n_benchmarkruns + n_threads - 1) // n_threads
    benchmarkrun_segments = [
        benchmark_runs[i * benchmarkruns_per_thread:
                       (i + 1) * benchmarkruns_per_thread]
        for i in range(n_threads - 1)
    ]
    # The last segment takes whatever is left.
    benchmarkrun_segments.append(
        benchmark_runs[(n_threads - 1) * benchmarkruns_per_thread:])

    # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
    assert sum(len(x) for x in benchmarkrun_segments) == n_benchmarkruns

    # Create and start all readers.
    cache_readers = [
        BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
    ]

    for reader in cache_readers:
      reader.start()

    # Wait till all readers finish.
    for reader in cache_readers:
      reader.join()

    # Summarize.
    self._logger.LogOutput(
        'Total {} cache hit out of {} benchmark_runs.'.format(
            len(self._cached_br_list), n_benchmarkruns))

  def get_cached_run_list(self):
    """Return the (shared, mutable) list of cache-hit benchmark runs."""
    return self._cached_br_list

  def get_label_map(self):
    """Return the mapping from label to its pending benchmark runs."""
    return self._label_brl_map

  def get_experiment(self):
    """Return the experiment this scheduler serves."""
    return self._experiment

  def get_labels(self, i=None):
    """Return all labels, or the i-th label when i is given."""
    if i is None:
      return self._labels
    return self._labels[i]

  def get_logger(self):
    """Return the scheduler's logger."""
    return self._logger

  def get_cached_benchmark_run(self):
    """Get a benchmark_run with 'cache hit'.

    Returns:
      The benchmark that has cache hit, if any. Otherwise none.
    """

    with self.lock_on('_cached_br_list'):
      if self._cached_br_list:
        return self._cached_br_list.pop()
      return None

  def get_benchmark_run(self, dut):
    """Get a benchmark_run (br) object for a certain dut.

    Args:
      dut: the dut for which a br is returned.

    Returns:
      A br with its label matching that of the dut. If no such br could be
      found, return None (this usually means a reimage is required for the
      dut).
    """

    # If terminated, stop providing any br.
    if self._terminated:
      return None

    # If dut bears an unrecognized label, return None.
    if dut.label is None:
      return None

    # If br list for the dut's label is empty (that means all brs for this
    # label have been done), return None.
    with self.lock_on(dut.label):
      brl = self._label_brl_map[dut.label]
      if not brl:
        return None
      # Return the first br.
      return brl.pop(0)

  def allocate_label(self, dut):
    """Allocate a label to a dut.

    The work is delegated to MachineImageManager.

    The dut_worker calling this method is responsible for reimage the dut to
    this label.

    Args:
      dut: the dut that needs a new label to be reimaged onto it.

    Returns:
      The label or None.
    """

    if self._terminated:
      return None

    return self._mim.allocate(dut, self)

  def dut_worker_finished(self, dut_worker):
    """Notify schedv2 that the dut_worker thread finished.

    Args:
      dut_worker: the thread that is about to end.
    """

    self._logger.LogOutput('{} finished.'.format(dut_worker))
    with self._workers_lock:
      self._active_workers.remove(dut_worker)
      self._finished_workers.append(dut_worker)

  def is_complete(self):
    """Return True when no worker is left in the active list."""
    with self._workers_lock:
      return not self._active_workers

  def lock_on(self, my_object):
    """Return the lock associated with my_object, creating it on first use.

    The lookup is guarded so that concurrent first-time callers for the same
    object always receive the same lock.

    Args:
      my_object: any hashable object identifying what is being protected.

    Returns:
      The Lock dedicated to my_object.
    """
    with self._lock_map_guard:
      return self._lock_map[my_object]

  def terminate(self):
    """Mark flag so we stop providing br/reimages.

    Also terminate each DutWorker, so they refuse to execute br or reimage.
    """

    self._terminated = True
    # Work on a snapshot and do not hold _workers_lock while terminating:
    # workers remove themselves from _active_workers (taking that lock) as
    # they wind down.
    for dut_worker in self._snapshot_active_workers():
      dut_worker.terminate()

  def threads_status_as_string(self):
    """Report the dut worker threads status."""

    with self._workers_lock:
      active = list(self._active_workers)
      finished = list(self._finished_workers)

    parts = [
        '{} active threads, {} finished threads.\n'.format(
            len(active), len(finished)),
        ' Active threads:',
    ]
    parts.extend('\n ' + dw.status_str() for dw in active)
    if finished:
      parts.append('\n Finished threads:')
      parts.extend('\n ' + dw.status_str() for dw in finished)
    return ''.join(parts)