blob: 4d21feeabd964d1a1a1354f0f77bbab14eb355d9 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import copy
import itertools
import threading
import time
from infra_libs.ts_mon.common import errors
"""A light-weight representation of a set or an incr.
Args:
name: The metric name.
fields: The normalized field tuple.
mod_type: Either 'set' or 'incr'. Other values will raise
UnknownModificationTypeError when it's used.
args: (value, enforce_ge) for 'set' or (delta, modify_fn) for 'incr'.
""" # pylint: disable=pointless-string-statement
Modification = collections.namedtuple(
'Modification', ['name', 'fields', 'mod_type', 'args'])
def default_modify_fn(name):
def _modify_fn(value, delta):
if delta < 0:
raise errors.MonitoringDecreasingValueError(name, None, delta)
return value + delta
return _modify_fn
class MetricStore(object):
"""A place to store values for each metric.
Several methods take "a normalized field tuple". This is a tuple of
(key, value) tuples sorted by key. (The reason this is given as a tuple
instead of a dict is because tuples are hashable and can be used as dict keys,
dicts can not).
The MetricStore is also responsible for keeping the start_time of each metric.
This is what goes into the start_timestamp_us field in the MetricsData proto
for cumulative metrics and distributions, and helps Monarch identify when a
counter was reset. This is the MetricStore's job because an implementation
might share counter values across multiple instances of a task (like on
Appengine), so the start time must be associated with that value so that it
can be reset for all tasks at once when the value is reset.
External metric stores (like those backed by memcache) may be cleared (either
wholly or partially) at any time. When this happens the MetricStore *must*
generate a new start_time for all the affected metrics.
Metrics can specify their own explicit start time if they are mirroring the
value of some external counter that started counting at a known time.
Otherwise the MetricStore's time_fn (defaults to time.time()) is called the
first time a metric is set or incremented, or after it is cleared externally.
"""
def __init__(self, state, time_fn=None):
self._state = state
self._time_fn = time_fn or time.time
def get(self, name, fields, target_fields, default=None):
"""Fetches the current value for the metric.
Args:
name (string): the metric's name.
fields (tuple): a normalized field tuple.
target_fields (dict or None): target fields to override.
default: the value to return if the metric has no value of this set of
field values.
"""
raise NotImplementedError
def get_all(self):
"""Returns an iterator over all the metrics present in the store.
The iterator yields 4-tuples:
(target, metric, start_time, field_values)
"""
raise NotImplementedError
def set(self, name, fields, target_fields, value, enforce_ge=False):
"""Sets the metric's value.
Args:
name: the metric's name.
fields: a normalized field tuple.
target_fields (dict or None): target fields to override.
value: the new value for the metric.
enforce_ge: if this is True, raise an exception if the new value is
less than the old value.
Raises:
MonitoringDecreasingValueError: if enforce_ge is True and the new value is
smaller than the old value.
"""
raise NotImplementedError
def incr(self, name, fields, target_fields, delta, modify_fn=None):
"""Increments the metric's value.
Args:
name: the metric's name.
fields: a normalized field tuple.
target_fields (dict or None): target fields to override.
delta: how much to increment the value by.
modify_fn: this function is called with the original value and the delta
as its arguments and is expected to return the new value. The
function must be idempotent as it may be called multiple times.
"""
raise NotImplementedError
def modify_multi(self, modifications):
"""Modifies multiple metrics in one go.
Args:
modifications: an iterable of Modification objects.
"""
raise NotImplementedError
def reset_for_unittest(self, name=None):
"""Clears the values metrics. Useful in unittests.
Args:
name: the name of an individual metric to reset, or if None resets all
metrics.
"""
raise NotImplementedError
def initialize_context(self):
"""Opens a request-local context for deferring metric updates."""
pass # pragma: no cover
def finalize_context(self):
"""Closes a request-local context opened by initialize_context."""
pass # pragma: no cover
def _start_time(self, name):
if name in self._state.metrics:
ret = self._state.metrics[name].start_time
if ret is not None:
return ret
return self._time_fn()
@staticmethod
def _normalize_target_fields(target_fields):
"""Converts target fields into a hashable tuple.
Args:
target_fields (dict): target fields to override the default target.
"""
if not target_fields:
target_fields = {}
return tuple(sorted(target_fields.iteritems()))
class MetricFieldsValues(object):
def __init__(self):
# Map normalized fields to single metric values.
self._values = {}
self._thread_lock = threading.Lock()
def get_value(self, fields, default=None):
return self._values.get(fields, default)
def set_value(self, fields, value):
self._values[fields] = value
def iteritems(self):
# Make a copy of the metric values in case another thread (or this
# generator's consumer) modifies them while we're iterating.
with self._thread_lock:
values = copy.copy(self._values)
for fields, value in values.iteritems():
yield fields, value
class TargetFieldsValues(object):
def __init__(self, store):
# Map normalized target fields to MetricFieldsValues.
self._values = collections.defaultdict(MetricFieldsValues)
self._store = store
self._thread_lock = threading.Lock()
def get_target_values(self, target_fields):
key = self._store._normalize_target_fields(target_fields)
return self._values[key]
def get_value(self, fields, target_fields, default=None):
return self.get_target_values(target_fields).get_value(
fields, default)
def set_value(self, fields, target_fields, value):
self.get_target_values(target_fields).set_value(fields, value)
def iter_targets(self):
# Make a copy of the values in case another thread (or this
# generator's consumer) modifies them while we're iterating.
with self._thread_lock:
values = copy.copy(self._values)
for target_fields, fields_values in values.iteritems():
target = copy.copy(self._store._state.target)
if target_fields:
target.update({k: v for k, v in target_fields})
yield target, fields_values
class MetricValues(object):
def __init__(self, store, start_time):
self._start_time = start_time
self._values = TargetFieldsValues(store)
@property
def start_time(self):
return self._start_time
@property
def values(self):
return self._values
def get_value(self, fields, target_fields, default=None):
return self.values.get_value(fields, target_fields, default)
def set_value(self, fields, target_fields, value):
self.values.set_value(fields, target_fields, value)
class InProcessMetricStore(MetricStore):
"""A thread-safe metric store that keeps values in memory."""
def __init__(self, state, time_fn=None):
super(InProcessMetricStore, self).__init__(state, time_fn=time_fn)
self._values = {}
self._thread_lock = threading.Lock()
def _entry(self, name):
if name not in self._values:
self._reset(name)
return self._values[name]
def get(self, name, fields, target_fields, default=None):
return self._entry(name).get_value(fields, target_fields, default)
def iter_field_values(self, name):
return itertools.chain.from_iterable(
x.iteritems() for _, x in self._entry(name).values.iter_targets())
def get_all(self):
# Make a copy of the metric values in case another thread (or this
# generator's consumer) modifies them while we're iterating.
with self._thread_lock:
values = copy.copy(self._values)
for name, metric_values in values.iteritems():
if name not in self._state.metrics:
continue
start_time = metric_values.start_time
for target, fields_values in metric_values.values.iter_targets():
yield target, self._state.metrics[name], start_time, fields_values
def set(self, name, fields, target_fields, value, enforce_ge=False):
with self._thread_lock:
if enforce_ge:
old_value = self._entry(name).get_value(fields, target_fields, 0)
if value < old_value:
raise errors.MonitoringDecreasingValueError(name, old_value, value)
self._entry(name).set_value(fields, target_fields, value)
def incr(self, name, fields, target_fields, delta, modify_fn=None):
if delta < 0:
raise errors.MonitoringDecreasingValueError(name, None, delta)
if modify_fn is None:
modify_fn = default_modify_fn(name)
with self._thread_lock:
self._entry(name).set_value(fields, target_fields, modify_fn(
self.get(name, fields, target_fields, 0), delta))
def modify_multi(self, modifications):
# This is only used by DeferredMetricStore on top of MemcacheMetricStore,
# but could be implemented here if required in the future.
raise NotImplementedError
def reset_for_unittest(self, name=None):
if name is not None:
self._reset(name)
else:
for name in self._values.keys():
self._reset(name)
def _reset(self, name):
self._values[name] = MetricValues(self, self._start_time(name))