blob: 105bafccc1db75a3a32025281efa59cb707bfb8d [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wrapper library around ts_mon.
This library provides some wrapper functionality around ts_mon, to make it more
friendly to developers. It also provides import safety, in case ts_mon is not
deployed with your code.
"""
from __future__ import print_function
import contextlib
import datetime
import ssl
from functools import wraps
from collections import namedtuple
from chromite.lib import cros_logging as logging
# ts_mon is optional: it stays None when infra_libs cannot be imported, and
# the rest of this module checks for that before touching it.
ts_mon = None
try:
  from infra_libs import ts_mon
# TODO(akeshet): AttributeError only needs to be caught while landing
# https://chromium-review.googlesource.com/#/c/359447/ , due to some issues in
# cbuildbot bootstrapping and backwards compatibility of ts_mon. I believe that
# after it lands, we no longer need to catch AttributeError here.
except (ImportError, RuntimeError, AttributeError):
  pass
# Growth factor for the geometric bucketer used by SecondsDistribution.
# 1.16 ** 100 seconds is roughly 2.8e6 seconds, i.e. about 32 days, which is a
# good compromise between bucket size and dynamic range.
_SECONDS_BUCKET_FACTOR = 1.16

# If None, metrics are created in this process. Otherwise, metric calls are
# sent via this Queue to a dedicated flushing process.
MESSAGE_QUEUE = None

# One metric method call, as packaged by ProxyMetric and put on MESSAGE_QUEUE.
MetricCall = namedtuple(
    'MetricCall',
    ['metric_name', 'metric_args', 'metric_kwargs',
     'method', 'method_args', 'method_kwargs',
     'reset_after'])
class ProxyMetric(object):
  """Stand-in metric that forwards every method call to MESSAGE_QUEUE.

  Rather than talking to ts_mon directly, each call such as
  |proxy.increment(...)| is packaged as a MetricCall and put on the
  module-level MESSAGE_QUEUE for the flushing process to handle.
  """

  def __init__(self, metric, metric_args, metric_kwargs):
    """Remember how to construct the real metric.

    Args:
      metric: Name of the metric constructor (e.g. 'Counter').
      metric_args: Positional args the constructor was called with.
      metric_kwargs: Keyword args the constructor was called with. The
        'reset_after' key, if present, is removed from this dict (in place)
        and stored separately.
    """
    # reset_after is ProxyMetric-only metadata, so keep it out of
    # metric_kwargs.
    reset_flag = metric_kwargs.pop('reset_after', False)
    self.metric = metric
    self.metric_args = metric_args
    self.reset_after = reset_flag
    self.metric_kwargs = metric_kwargs

  def __getattr__(self, method_name):
    """Return a callable that enqueues a call to |method_name|."""
    proxy = self

    def enqueue(*args, **kwargs):
      message = MetricCall(
          metric_name=proxy.metric,
          metric_args=proxy.metric_args,
          metric_kwargs=proxy.metric_kwargs,
          method=method_name,
          method_args=args,
          method_kwargs=kwargs,
          reset_after=proxy.reset_after)
      MESSAGE_QUEUE.put(message)

    return enqueue
def _Indirect(fn):
  """Decorate a metric constructor so it is indirect when MESSAGE_QUEUE is set.

  When MESSAGE_QUEUE is set, calling the decorated function returns a
  ProxyMetric that forwards method calls to the queue. Otherwise |fn| is
  called normally.
  """
  @wraps(fn)
  def AddToQueueIfPresent(*args, **kwargs):
    if not MESSAGE_QUEUE:
      # 'reset_after' (whether to reset the metric after the flush) only
      # matters to ProxyMetric, so drop it before calling |fn| directly.
      kwargs.pop('reset_after', None)
      return fn(*args, **kwargs)
    return ProxyMetric(fn.__name__, args, kwargs)

  return AddToQueueIfPresent
class MockMetric(object):
  """No-op metric handle, returned when ts_mon is not available.

  Any attribute lookup resolves to a callable that accepts arbitrary
  arguments and returns None, so call sites work unchanged.
  """

  def _mock_method(self, *args, **kwargs):
    """Accept and ignore any call."""
    return None

  def __getattr__(self, unused_name):
    # Every unknown attribute maps to the same no-op method.
    return self._mock_method
def _ImportSafe(fn):
  """Make |fn| return a MockMetric instead of running when ts_mon is missing.

  The ts_mon check happens on each call, not at decoration time.
  """
  @wraps(fn)
  def wrapper(*args, **kwargs):
    if not ts_mon:
      return MockMetric()
    return fn(*args, **kwargs)

  return wrapper
def _Metric(fn):
  """Apply the standard decorator pipeline to a metric constructor.

  _Indirect is applied first and _ImportSafe wraps the result. As a result,
  a missing ts_mon yields a MockMetric before the queue is ever consulted.
  """
  indirect_fn = _Indirect(fn)
  return _ImportSafe(indirect_fn)
# This is needed for the reset_after flag used by @Indirect.
# pylint: disable=unused-argument
@_Metric
def Counter(name, reset_after=False):
  """Return a metric handle for a counter named |name|.

  Args:
    name: Name of the counter.
    reset_after: Ignored by this body. When passed by keyword, the @_Metric
      wrapper forwards it to ProxyMetric (queue mode) or discards it.
  """
  counter = ts_mon.CounterMetric(name)
  return counter
@_Metric
def Gauge(name, reset_after=False):
  """Return a metric handle for a gauge named |name|.

  Args:
    name: Name of the gauge.
    reset_after: Ignored by this body. When passed by keyword, the @_Metric
      wrapper forwards it to ProxyMetric (queue mode) or discards it.
  """
  gauge = ts_mon.GaugeMetric(name)
  return gauge
@_Metric
def String(name, reset_after=False):
  """Return a metric handle for a string metric named |name|.

  Args:
    name: Name of the string metric.
    reset_after: Ignored by this body. When passed by keyword, the @_Metric
      wrapper forwards it to ProxyMetric (queue mode) or discards it.
  """
  metric_cls = ts_mon.StringMetric
  return metric_cls(name)
@_Metric
def Boolean(name, reset_after=False):
  """Return a metric handle for a boolean metric named |name|.

  Args:
    name: Name of the boolean metric.
    reset_after: Ignored by this body. When passed by keyword, the @_Metric
      wrapper forwards it to ProxyMetric (queue mode) or discards it.
  """
  metric_cls = ts_mon.BooleanMetric
  return metric_cls(name)
@_Metric
def Float(name, reset_after=False):
  """Return a metric handle for a float metric named |name|.

  Args:
    name: Name of the float metric.
    reset_after: Ignored by this body. When passed by keyword, the @_Metric
      wrapper forwards it to ProxyMetric (queue mode) or discards it.
  """
  float_metric = ts_mon.FloatMetric(name)
  return float_metric
@_Metric
def CumulativeDistribution(name, reset_after=False):
  """Return a metric handle for a cumulative distribution named |name|.

  Uses ts_mon's default bucketer.

  Args:
    name: Name of the distribution.
    reset_after: Ignored by this body. When passed by keyword, the @_Metric
      wrapper forwards it to ProxyMetric (queue mode) or discards it.
  """
  distribution = ts_mon.CumulativeDistributionMetric(name)
  return distribution
@_Metric
def CumulativeSmallIntegerDistribution(name, reset_after=False):
  """Return a metric handle for a cumulative distribution named |name|.

  Unlike CumulativeDistribution, the underlying metric uses a uniform
  (fixed-width 1) bucketer rather than a geometric one. That makes it suitable
  for distributions of nonnegative integers in the range of 0 to 100.

  Args:
    name: Name of the distribution.
    reset_after: Ignored by this body. When passed by keyword, the @_Metric
      wrapper forwards it to ProxyMetric (queue mode) or discards it.
  """
  unit_width_bucketer = ts_mon.FixedWidthBucketer(1)
  return ts_mon.CumulativeDistributionMetric(
      name, bucketer=unit_width_bucketer)
@_Metric
def SecondsDistribution(name, reset_after=False):
  """Return a metric handle for a cumulative distribution named |name|.

  Better suited than the default distribution for recording handling times.
  Its geometric bucketing (growth factor _SECONDS_BUCKET_FACTOR) targets
  intervals in the range of 1 second to 32 days, and values are tagged with
  SECONDS units.

  Args:
    name: Name of the distribution.
    reset_after: Ignored by this body. When passed by keyword, the @_Metric
      wrapper forwards it to ProxyMetric (queue mode) or discards it.
  """
  seconds_bucketer = ts_mon.GeometricBucketer(
      growth_factor=_SECONDS_BUCKET_FACTOR)
  seconds_unit = ts_mon.MetricsDataUnits.SECONDS
  return ts_mon.CumulativeDistributionMetric(
      name, bucketer=seconds_bucketer, units=seconds_unit)
@contextlib.contextmanager
def SecondsTimer(name, fields=None, record_on_exception=True):
  """Record the time of an operation to a SecondsDistribution metric.

  Usage:
    with SecondsTimer('timer/name', fields={'foo': 'bar'}):
      doSomething()

  Records the time spent inside the context block to the SecondsDistribution
  named |name|, with the given fields. By default the duration is recorded
  even if the block raises; the exception is then re-raised unchanged.

  Note that this helper can only be used if the field values are known at
  timer-start time and do not depend on the result of the operation being
  timed.

  Args:
    name: Name of the SecondsDistribution to record to.
    fields: Optional fields passed through to the metric's add() call.
    record_on_exception: If False, skip recording when the block raises.
  """
  m = SecondsDistribution(name)
  # NOTE(review): datetime.now() is wall-clock time, so clock adjustments
  # during the block can skew the measurement.
  t0 = datetime.datetime.now()
  succeeded = False
  try:
    yield
    succeeded = True
  finally:
    # Record in |finally| so that operations which raise are still timed;
    # without it, contextmanager's throw() at the yield would skip this.
    if succeeded or record_on_exception:
      dt = (datetime.datetime.now() - t0).total_seconds()
      m.add(dt, fields=fields)
def SecondsTimerDecorator(name, fields=None):
  """Build a decorator that times every call of the wrapped function.

  Usage:
    @SecondsTimerDecorator('timer/name', fields={'foo': 'bar'})
    def Foo(bar):
      return doStuff()

  is equivalent to

    def Foo(bar):
      with SecondsTimer('timer/name', fields={'foo': 'bar'}):
        return doStuff()

  Args:
    name: Name of the SecondsDistribution to record to.
    fields: Optional fields passed to SecondsTimer on every call.

  Returns:
    A decorator that wraps a function in a SecondsTimer context.
  """
  def decorator(fn):
    @wraps(fn)
    def timed(*args, **kwargs):
      with SecondsTimer(name, fields):
        return fn(*args, **kwargs)

    return timed

  return decorator
def Flush(reset_after=()):
  """Flush metrics, logging rather than raising on errors.

  Does nothing when ts_mon could not be imported.

  Args:
    reset_after: A list of metrics to reset after flushing. The list is
      drained in place (metrics are popped from its end), so the caller's
      list is empty afterwards unless the flush or a reset fails.
  """
  if ts_mon:
    try:
      ts_mon.flush()
      while reset_after:
        metric = reset_after.pop()
        metric.reset()
    except ssl.SSLError as err:
      # SSL failures are treated as transient network errors: warn only.
      logging.warning('Caught transient network error while flushing: %s', err)
    except Exception as err:
      # Flushing is best-effort; log the failure instead of propagating it.
      logging.error('Caught exception while flushing: %s', err)