blob: 80224cba7a3eb6c289dc0f835f12cdc3c5c362e7 [file] [log] [blame]
# Copyright 2019 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Metrics for general consumption.
See infra/proto/metrics.proto for a description of the type of record that this
module will be creating.
"""
import collections
import contextlib
import functools
import logging
import os
from pathlib import Path
import time
from typing import List, NamedTuple, Optional, Union
import uuid
from chromite.lib import constants
from chromite.lib import locking
from chromite.lib import osutils
from chromite.utils import timer as timer_util
# Operation identifiers stored in the "op" term of a metric event.
# Timers come in start/stop pairs.
OP_START_TIMER = "start-timer"
OP_STOP_TIMER = "stop-timer"
# Counters accumulate increments and decrements.
OP_INCREMENT_COUNTER = "increment"
OP_DECREMENT_COUNTER = "decrement"
# Gauges and named events are single records.
OP_GAUGE = "gauge"
OP_NAMED_EVENT = "event"

# For every op, whether a record of that op is expected to carry an "arg".
OP_EXPECTS_ARG = {
    OP_START_TIMER: True,
    OP_STOP_TIMER: True,
    OP_NAMED_EVENT: False,
    OP_GAUGE: True,
    OP_INCREMENT_COUNTER: True,
    OP_DECREMENT_COUNTER: True,
}

# The set of all ops listed in OP_EXPECTS_ARG.
VALID_OPS = set(OP_EXPECTS_ARG)
class MetricEvent(NamedTuple):
    """A single raw metric event.

    Several kinds of event share this one record type. The 'op' field says
    which kind a record is, and decides how the overloaded 'arg' field is
    read: timers use it as a key for disambiguation, while gauges and
    counters use it to carry their value.
    """

    # Event time, in milliseconds since the Epoch.
    timestamp_epoch_millis: int
    # Name of the metric the event belongs to.
    name: str
    # Discriminator for the kind of event.
    op: str
    # Op-dependent payload; see the class docstring.
    arg: Union[int, str, None]
class Metric(NamedTuple):
    """A named integer value together with the time it was recorded."""

    # Time of the measurement, in milliseconds since the Epoch.
    timestamp_epoch_millis: int
    # Name of the metric.
    name: str
    # The measured value.
    value: int
class NamedEvent(NamedTuple):
    """A named occurrence that carries a timestamp but no value."""

    # Time of the event, in milliseconds since the Epoch.
    timestamp_epoch_millis: int
    # Name of the event.
    name: str
class TimerMetric(Metric):
    """A Metric produced by a timer.

    Adds no fields or behavior; the separate type only lets consumers tell
    timer results apart from other metrics where that is useful.
    """
# Any of the record types that deserialize_metrics_log() puts in its result.
METRIC_TYPE = Union[NamedEvent, TimerMetric, Metric]
class Error(Exception):
    """Common base class for the exceptions defined in this module."""
class ParseMetricError(Error):
    """Raised when a metric event line cannot be parsed.

    This represents a coding error rather than bad user input: when it shows
    up, the code that emitted the metric event is the most likely culprit.
    """
def current_milli_time():
    """Return the current time as whole milliseconds since the Epoch."""
    millis = time.time() * 1000
    return int(round(millis))
def parse_timer(terms):
    """Build a MetricEvent from the terms of a timer line.

    Args:
        terms: The four terms of a timer record, in order: timestamp, name,
            op and the key stored as the event's 'arg'.

    Returns:
        A MetricEvent with an integer timestamp and the key as its 'arg'.

    Raises:
        ParseMetricError: The list does not hold exactly four terms.
    """
    term_count = len(terms)
    if term_count != 4:
        raise ParseMetricError(
            "Incorrect number of terms for timer metric. Should "
            "have been 4, instead it is %d. See terms %s." % (term_count, terms)
        )
    timestamp, name, op, key = terms
    # The caller is expected to dispatch only timer ops to this parser.
    assert op in {OP_START_TIMER, OP_STOP_TIMER}
    return MetricEvent(int(timestamp), name, op, arg=key)
def parse_named_event(terms):
    """Build a MetricEvent from the terms of a named event line.

    Args:
        terms: The three terms of a named event record, in order: timestamp,
            name and op. Named events have no "arg" term.

    Returns:
        A MetricEvent with an integer timestamp and 'arg' set to None.

    Raises:
        ParseMetricError: The list does not hold exactly three terms.
    """
    term_count = len(terms)
    if term_count != 3:
        raise ParseMetricError(
            "Incorrect number of terms for event metric. Should "
            "have been 3, instead it is %d. See terms %s." % (term_count, terms)
        )
    timestamp, name, op = terms
    # The caller is expected to dispatch only named events to this parser.
    assert op == OP_NAMED_EVENT
    return MetricEvent(int(timestamp), name, op, arg=None)
def parse_gauge(terms):
    """Build a MetricEvent from the terms of a gauge line.

    A gauge is an event with an associated integer value, which is carried
    in the event's |arg|.

    Args:
        terms: The four terms of a gauge record, in order: timestamp, name,
            op and the gauge value.

    Returns:
        A MetricEvent whose timestamp and 'arg' are both integers.

    Raises:
        ParseMetricError: The list does not hold exactly four terms.
    """
    term_count = len(terms)
    if term_count != 4:
        raise ParseMetricError(
            "Incorrect number of terms for gauge. Should "
            "have been 4, instead it is %d. See terms %s." % (term_count, terms)
        )
    timestamp, name, op, value = terms
    # The caller is expected to dispatch only gauges to this parser.
    assert op == OP_GAUGE
    return MetricEvent(int(timestamp), name, op, arg=int(value))
def parse_counter(terms: List) -> MetricEvent:
    """Parse an increment or decrement counter line.

    Args:
        terms: A list of the subdimensions of the MetricEvent type. The
            fourth term, the integer amount to count by, is optional and
            defaults to 1.

    Returns:
        A MetricEvent from the content of the terms, with the amount as an
        integer |arg|.

    Raises:
        ParseMetricError: An error occurred parsing the data from the list of
            terms.
    """
    # Validate the shape up front, like the other parsers do, so a short
    # list cannot surface as IndexError and extra terms are not ignored.
    if len(terms) not in (3, 4):
        raise ParseMetricError(
            "Incorrect number of terms for counter. Should "
            "have been 3 or 4, instead it is %d. See terms %s."
            % (len(terms), terms)
        )
    arg = int(terms[3]) if len(terms) == 4 else 1
    # The caller is expected to dispatch only counter ops to this parser.
    assert terms[2] in (OP_INCREMENT_COUNTER, OP_DECREMENT_COUNTER)
    return MetricEvent(int(terms[0]), terms[1], terms[2], arg=arg)
def get_metric_parser(op):
    """Look up the function that parses lines carrying the given operator.

    Args:
        op: One of the OP_* values.

    Returns:
        The parse_* function registered for |op|.

    Raises:
        KeyError: |op| has no registered parser.
    """
    parsers_by_op = {
        OP_START_TIMER: parse_timer,
        OP_STOP_TIMER: parse_timer,
        OP_NAMED_EVENT: parse_named_event,
        OP_GAUGE: parse_gauge,
        OP_INCREMENT_COUNTER: parse_counter,
        OP_DECREMENT_COUNTER: parse_counter,
    }
    return parsers_by_op[op]
def parse_metric(line):
    """Take a line and return a MetricEvent.

    Args:
        line: One "|"-separated record; surrounding whitespace is ignored.

    Returns:
        The MetricEvent built by the parser registered for the line's op.

    Raises:
        ParseMetricError: The line does not have 3 or 4 terms, names an op
            with no parser, or holds a non-integer where an integer is
            required.
    """
    terms = line.strip().split("|")
    if 3 <= len(terms) <= 4:
        # Get a parser for this (call the factory). The lookup raises
        # KeyError for an unknown op; treat that like "no parser" so the
        # line is reported as malformed instead of leaking the KeyError.
        try:
            parser = get_metric_parser(terms[2])
        except KeyError:
            parser = None
        if parser:
            try:
                return parser(terms)
            except ValueError as e:
                # int() rejected a timestamp or value term.
                raise ParseMetricError(
                    "Malformed metric line: %s" % line
                ) from e
    raise ParseMetricError("Malformed metric line: %s" % line)
def read_metrics_events():
    """Yield a MetricEvent for each line of the metrics log file.

    Nothing is yielded when the metrics directory environment variable is
    unset or empty, or when the log file does not exist.

    Yields:
        The result of parse_metric() for every line, in file order.
    """
    metrics_dir = os.environ.get(constants.CROS_METRICS_DIR_ENVVAR)
    if not metrics_dir:
        return

    metrics_logfile = Path(metrics_dir) / constants.METRICS_FILE
    if not metrics_logfile.exists():
        return

    logging.info("reading metrics logs from %s", metrics_logfile)
    with open(metrics_logfile, "r", encoding="utf-8") as log:
        # Parse lazily, one line per event requested by the consumer.
        yield from (parse_metric(entry) for entry in log)
def collect_metrics(functor):
    """Decorator that enables metric collection while |functor| runs.

    If the metrics directory environment variable is already set, |functor|
    is simply called. Otherwise the variable points at a fresh temporary
    directory for the duration of the call and is removed again afterwards.

    Args:
        functor: The function to wrap.

    Returns:
        The wrapped function, which returns whatever |functor| returns.
    """

    @functools.wraps(functor)
    def wrapper(*args, **kwargs):
        """Wrapped function which implements collect_metrics behavior."""
        envvar = constants.CROS_METRICS_DIR_ENVVAR
        if os.environ.get(envvar):
            # Reentrant call: an outer frame already owns the metrics
            # directory, so leave the environment alone.
            return functor(*args, **kwargs)

        # This call owns the metrics directory for its whole lifetime.
        with osutils.TempDir() as tmpdir:
            os.environ[envvar] = tmpdir
            logging.info(
                "Setting up metrics collection (%s=%s).", envvar, tmpdir
            )
            try:
                return functor(*args, **kwargs)
            finally:
                del os.environ[envvar]

    return wrapper
def append_metrics_log(timestamp, name, op, arg=None) -> None:
    """Append one "|"-separated record to the metrics log.

    Does nothing when the metrics directory environment variable is unset or
    empty.

    Args:
        timestamp: A millisecond epoch timestamp.
        name: A period-separated string describing the event. Any "|" in it
            is written as "_" so it cannot be mistaken for a separator.
        op: One of the OP_* values, determining which type of event this is.
        arg: An accessory value whose meaning depends on |op|. It is left
            out of the record when None.
    """
    metrics_dir = os.environ.get(constants.CROS_METRICS_DIR_ENVVAR)
    if not metrics_dir:
        return

    metrics_log = Path(metrics_dir) / constants.METRICS_FILE
    fields = [timestamp, name.replace("|", "_"), op]
    if arg is not None:
        fields += [arg]
    # Format the actual line to log.
    record = "|".join(map(str, fields)) + "\n"

    # The append happens inside the FileLock write_lock() context.
    with locking.FileLock(metrics_log).write_lock(), open(
        metrics_log, "a", encoding="utf-8"
    ) as log:
        log.write(record)
@contextlib.contextmanager
def timer(name):
    """Context manager that logs a start and a stop timer event.

    The stop event is logged even when the managed block raises.

    Args:
        name: A name for the timer event.

    Yields:
        The object produced by timer_util.timer(name).
    """
    # Both events share one random |arg| so that concurrent or overlapping
    # timers with the same name can still be told apart.
    key = uuid.uuid4()

    def emit(op):
        """Log one timer event for this timer, stamped with the time now."""
        append_metrics_log(current_milli_time(), name, op, arg=key)

    with timer_util.timer(name) as t:
        try:
            emit(OP_START_TIMER)
            yield t
        finally:
            emit(OP_STOP_TIMER)
def timed(name):
    """Build a decorator that runs a function inside timer(name).

    Args:
        name: The timer name used for every call of the decorated function.

    Returns:
        A decorator. The function it produces returns whatever the decorated
        function returns.
    """

    def decorator(func):
        @functools.wraps(func)
        def timed_call(*args, **kwargs):
            with timer(name):
                result = func(*args, **kwargs)
            return result

        return timed_call

    return decorator
def event(name) -> None:
    """Emit a named event stamped with the current time.

    Args:
        name: A name for the event.
    """
    now = current_milli_time()
    append_metrics_log(now, name, OP_NAMED_EVENT)
def deserialize_metrics_log(
    prefix: Optional[str] = None,
) -> List[METRIC_TYPE]:
    """Turn the events in the metrics file into finished metrics.

    Args:
        prefix: A string to prepend, followed by ".", to all metric names.

    Returns:
        A list holding, in the order their events were read, a TimerMetric
        for each stop timer event with a matching start, a NamedEvent for
        each named event and a Metric for each gauge. These are followed by
        one Metric per counter name carrying the net total of its increments
        and decrements, stamped with the latest time the counter was touched.
    """
    pending_timers = {}
    counter_totals = collections.defaultdict(int)
    counter_stamps = {}
    metrics = []

    def make_name(name):
        """Prepend a closed-over prefix to the given name."""
        return f"{prefix}.{name}" if prefix else name

    # Reduce over the input events to build up the output metrics.
    for ev in read_metrics_events():
        op = ev.op
        stamp = ev.timestamp_epoch_millis
        if op == OP_START_TIMER:
            # Remember the start until the stop with the same key shows up.
            pending_timers[ev.arg] = (ev.name, stamp)
        elif op == OP_STOP_TIMER:
            # TODO(b/187788898): Drop the None fallback.
            started = pending_timers.pop(ev.arg, None)
            if started:
                start_name, start_stamp = started
                assert ev.name == start_name
                metrics.append(
                    TimerMetric(
                        stamp,
                        make_name(ev.name),
                        stamp - start_stamp,
                    )
                )
            else:
                logging.error(
                    "%s: stop timer recorded, but missing start timer!?",
                    ev.arg,
                )
        elif op == OP_NAMED_EVENT:
            metrics.append(NamedEvent(stamp, make_name(ev.name)))
        elif op == OP_GAUGE:
            metrics.append(Metric(stamp, make_name(ev.name), ev.arg))
        elif op in (OP_INCREMENT_COUNTER, OP_DECREMENT_COUNTER):
            # Counters are only totalled here; they are emitted after the
            # loop, once every event has been seen.
            delta = ev.arg if op == OP_INCREMENT_COUNTER else -ev.arg
            counter_totals[ev.name] += delta
            counter_stamps[ev.name] = max(
                stamp, counter_stamps.get(ev.name, 0)
            )
        else:
            logging.error(
                'unexpected op "%s" found in metric event: %s',
                ev.op,
                ev,
            )

    metrics.extend(
        Metric(counter_stamps[counter], make_name(counter), total)
        for counter, total in counter_totals.items()
    )

    # Check for any unhandled timers.
    # TODO(b/187788898): Turn this back into an assert.
    if pending_timers:
        logging.error("excess timer metric data left over: %s", pending_timers)
    return metrics