| # Copyright 2021 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Timing utility.""" |
| |
| import contextlib |
| import datetime |
| import functools |
| import logging |
| import time |
| from typing import Any, Callable, Optional |
| |
| from chromite.utils import pformat |
| |
| |
| class Timer(object): |
| """Simple timer class to make timing blocks of code easy. |
| |
| It does not have the features of timeit, but can be added anywhere, e.g. to |
| time a specific section of a script. The Timer class implements __add__ and |
| __truediv__ to allow averaging multiple timers, but the collection must be |
| done manually. |
| |
| Examples: |
| with Timer(): |
| code_to_be_timed() |
| |
| with Timer() as t: -> str(t) == "{formatted_delta}" |
| with Timer('name') as t: -> str(t) == "name: {formatted_delta}" |
| |
| To get an average: |
| |
| timers = [] |
| for _ in range(10): |
| with Timer() as t: |
| code_to_be_timed() |
| timers.append(t) |
| avg = sum(timers, start=Timer('Average')) / len(times) |
| avg.output() -> prints "Average: {formatted_delta}" |
| """ |
| |
| def __init__(self, name: Optional[str] = None): |
| """Init. |
| |
| Args: |
| name: A string to identify the timer, especially when using multiple. |
| """ |
| self.name = name |
| self.start = 0.0 |
| self.end = 0.0 |
| self.delta = 0.0 |
| |
| def __add__(self, other): |
| if not isinstance(other, Timer): |
| raise NotImplementedError(f'Cannot add {type(other)} to Timer') |
| result = Timer(self.name) |
| result.delta = self.delta + other.delta |
| |
| return result |
| |
| def __truediv__(self, other): |
| if not isinstance(other, int): |
| raise NotImplementedError(f'Only int is supported, given {type(other)}') |
| result = Timer(self.name) |
| result.delta = self.delta / other |
| |
| return result |
| |
| def __enter__(self): |
| self.start = time.perf_counter() |
| return self |
| |
| def __exit__(self, *args): |
| self.end = time.perf_counter() |
| self.delta = self.end - self.start |
| |
| def __str__(self): |
| name = f'{self.name}: ' if self.name else '' |
| return f'{name}{pformat.timedelta(datetime.timedelta(seconds=self.delta))}' |
| |
| |
| def timed(name: Optional[str] = None, |
| output: Callable[[str], Any] = logging.info): |
| """Timed decorator to add a timer to a function.""" |
| |
| def decorator(func): |
| |
| @functools.wraps(func) |
| def wrapper(*args, **kwargs): |
| with timer(name or func.__name__, output): |
| return func(*args, **kwargs) |
| |
| return wrapper |
| |
| return decorator |
| |
| |
| @contextlib.contextmanager |
| def timer(name: Optional[str] = None, |
| output: Callable[[str], Any] = logging.info): |
| """Timer context manager to automatically output results.""" |
| try: |
| with Timer(name) as t: |
| yield t |
| finally: |
| output(str(t)) |