utils/timer: Add a small timer utility class A timer context manager for timing blocks of code that don't lend themselves well to timing with timeit. BUG=b:199404652 TEST=None Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/chromite/+/3183322 Tested-by: Alex Klein <saklein@chromium.org> Commit-Queue: Alex Klein <saklein@chromium.org> Reviewed-by: Sergey Frolov <sfrolov@google.com> (cherry picked from commit 5fcfa9ec5907eab1a3b78611a4f86ae01117c34d) [rkolchmeyer: patch needed for compatibility with updated repohooks] Change-Id: I7c19beddfd4ff669179b1e9e13f47d25e92eb3cb
diff --git a/utils/timer.py b/utils/timer.py new file mode 100644 index 0000000..c2e9d37 --- /dev/null +++ b/utils/timer.py
@@ -0,0 +1,103 @@ +# Copyright 2021 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Timing utility.""" + +import datetime +import functools +import logging +import time +from typing import Any, Callable, Optional + +from chromite.lib import pformat + + +class Timer(object): + """Simple timer class to make timing blocks of code easy. + + It does not have the features of timeit, but can be added anywhere, e.g. to + time a specific section of a script. The Timer class implements __add__ and + __truediv__ to allow averaging multiple timers, but the collection must be + done manually. + + Examples: + with Timer(): + code_to_be_timed() + + with Timer(): -> logs "{formatted_delta}" at info level + with Timer('name'): -> logs "name: {formatted_delta}" at info + with Timer('name', print): -> prints "name: {formatted_delta}" + + If you don't want it to output itself on exit, you can do it manually, + e.g. to get an average: + + timers = [] + for _ in range(10): + with Timer(output=None) as t: + ... + timers.append(t) + avg = sum(timers, start=Timer('Average')) / len(times) + avg.output() -> prints "Average: {formatted_delta}" + """ + + def __init__(self, + name: Optional[str] = None, + output: Optional[Callable[[str], Any]] = logging.info): + """Init. + + Args: + name: A string to identify the timer, especially when using multiple. + output: A function that takes only a string to output it somewhere. + """ + self.name = name + # Make output always callable, but do nothing when no output. + self.output = lambda: output(str(self)) if output else None + self.start = 0.0 + self.end = 0.0 + self.delta = 0.0 + + def __add__(self, other): + if not isinstance(other, Timer): + raise NotImplementedError(f'Cannot add {type(other)} to Timer') + result = Timer(self.name, self.output) + result.delta = self.delta + other.delta + + return result + + def __truediv__(self, other): + if not isinstance(other, int): + raise NotImplementedError(f'Only int is supported, given {type(other)}') + result = Timer(self.name, self.output) + result.delta = self.delta / other + + return result + + def __enter__(self): + self.start = time.perf_counter() + return self + + def __exit__(self, *args): + self.end = time.perf_counter() + self.delta = self.end - self.start + self.output() + + def __str__(self): + name = f'{self.name}: ' if self.name else '' + return f'{name}{pformat.timedelta(datetime.timedelta(seconds=self.delta))}' + + +def timer(name: Optional[str] = None, + output: Callable[[str], Any] = logging.info): + """Timer decorator.""" + + def decorator(func): + + @functools.wraps(func) + def wrapper(*args, **kwargs): + with Timer(name, output): + return func(*args, **kwargs) + + return wrapper + + return decorator
diff --git a/utils/timer_unittest.py b/utils/timer_unittest.py new file mode 100644 index 0000000..22e8c16 --- /dev/null +++ b/utils/timer_unittest.py
@@ -0,0 +1,47 @@ +# Copyright 2021 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Tests for timer.""" + +import time + +from chromite.utils import timer + + +def test_timer_delta(monkeypatch): + """Test basic usage of a Timer.""" + last_t = 0.0 + + def time_mock(): + nonlocal last_t + last_t += 1.0 + return last_t + + monkeypatch.setattr(time, 'perf_counter', time_mock) + + with timer.Timer() as t: + pass + + assert t.delta == 1.0 + + +def test_timer_average(monkeypatch): + """Test the timer __add__ and __truediv__ functions.""" + last_t = 0.0 + + def time_mock(): + nonlocal last_t + last_t += 1.0 + return last_t + + monkeypatch.setattr(time, 'perf_counter', time_mock) + + timers = [] + for x in range(10): + with timer.Timer(str(x)) as t: + pass + timers.append(t) + + assert sum(timers, start=timer.Timer()).delta == 10.0 + assert (sum(timers, start=timer.Timer()) / len(timers)).delta == 1.0