blob: 6dca7541d9a964d1551bf649f5cefce28c6759b4 [file] [log] [blame]
# Copyright 2021 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Timing utility."""
import contextlib
import datetime
import functools
import logging
import time
import types
from typing import Any, Callable, Generator, Optional, Type, TYPE_CHECKING
from chromite.utils import pformat
# The typing module adds ParamSpec in Python3.10, but we can access it earlier
# via the typing_extensions module.
# The typing_extensions module is available in `mypy` (when we're type checking)
# but it's not available in all of our runtimes.
# Thus, only import typing_extensions and use ParamSpec when type-checking.
if TYPE_CHECKING:
from typing_extensions import ParamSpec
_P = ParamSpec("_P")
class Timer:
    """Lightweight stopwatch for timing an arbitrary block of code.

    This is not a replacement for timeit; it is meant to be dropped around a
    specific section of a script. Timers can be added together and divided by
    an int, which is enough to compute an average, but gathering the
    individual timers is left to the caller.

    Attributes:
        name: Optional label, used as a prefix by str().
        start: time.perf_counter() reading taken on entering the context.
        end: time.perf_counter() reading taken on leaving the context.
        delta: Elapsed time in seconds (end - start).

    Examples:
        with Timer():
            code_to_be_timed()

        with Timer() as t:  -> str(t) == "{formatted_delta}"
        with Timer('name') as t:  -> str(t) == "name: {formatted_delta}"

        To get an average:
            timers = []
            for _ in range(10):
                with Timer() as t:
                    code_to_be_timed()
                timers.append(t)
            avg = sum(timers, start=Timer('Average')) / len(timers)
            str(avg) == "Average: {formatted_delta}"
    """

    def __init__(self, name: Optional[str] = None) -> None:
        """Create a stopped timer with all readings zeroed.

        Args:
            name: A string to identify the timer.
        """
        self.name = name
        self.start = self.end = self.delta = 0.0

    @property
    def timedelta(self) -> datetime.timedelta:
        """The elapsed time (delta seconds) as a datetime.timedelta."""
        return datetime.timedelta(seconds=self.delta)

    def __add__(self, other: Any) -> "Timer":
        """Return a new Timer named after self holding the summed deltas.

        Only the delta is carried over; start/end of the result stay at 0.0.

        Raises:
            NotImplementedError: other is not a Timer.
        """
        if isinstance(other, Timer):
            combined = Timer(self.name)
            combined.delta = self.delta + other.delta
            return combined
        raise NotImplementedError(f"Cannot add {type(other)} to Timer")

    def __truediv__(self, other: Any) -> "Timer":
        """Return a new Timer named after self with the delta divided.

        Raises:
            NotImplementedError: other is not an int.
        """
        if isinstance(other, int):
            quotient = Timer(self.name)
            quotient.delta = self.delta / other
            return quotient
        raise NotImplementedError(
            f"Only int is supported, given {type(other)}"
        )

    def __enter__(self) -> "Timer":
        """Record the start reading and hand back the timer itself."""
        self.start = time.perf_counter()
        return self

    def __exit__(
        self,
        exctype: Optional[Type[BaseException]],
        excinst: Optional[BaseException],
        exctb: Optional[types.TracebackType],
    ) -> None:
        """Record the end reading and compute the elapsed delta.

        Returns None, so an exception raised in the timed block propagates.
        """
        del exctype, excinst, exctb  # Unused.
        # Take the reading once so that end and delta agree exactly.
        stopped_at = time.perf_counter()
        self.end = stopped_at
        self.delta = stopped_at - self.start

    def __str__(self) -> str:
        """Format as "{name}: {delta}", or just the delta when unnamed."""
        prefix = ""
        if self.name:
            prefix = f"{self.name}: "
        return f"{prefix}{pformat.timedelta(self.timedelta)}"
def timed(
    name: Optional[str] = None, output: Callable[[str], Any] = logging.info
) -> Callable[["Callable[_P, Any]"], "Callable[_P, Any]"]:
    """Build a decorator that times every call of the decorated function.

    Args:
        name: Label for the timer; when falsy, the decorated function's
            __name__ is used instead.
        output: Callable forwarded to timer() to report the result.

    Returns:
        A decorator; the function it produces forwards all arguments to the
        original function and returns that function's result.
    """

    def decorate(func: "Callable[_P, Any]") -> "Callable[_P, Any]":
        @functools.wraps(func)
        def timed_call(*args: Any, **kwargs: Any) -> Any:
            # The label is resolved on every call, not once at decoration.
            label = name or func.__name__
            with timer(label, output):
                return func(*args, **kwargs)

        return timed_call

    return decorate
@contextlib.contextmanager
def timer(
    name: Optional[str] = None, output: Callable[[str], Any] = logging.info
) -> Generator[Timer, None, None]:
    """Context manager that times its body and reports the result.

    Args:
        name: Label passed to the underlying Timer.
        output: Called with str(Timer) once the body finishes. It runs from a
            finally clause, so it is also called when the body raises.

    Yields:
        The running Timer instance.
    """
    stopwatch = Timer(name)
    try:
        # Leaving the inner block stops the stopwatch before it is reported.
        with stopwatch:
            yield stopwatch
    finally:
        output(str(stopwatch))