# Copyright 2016 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper library around ts_mon.

This library provides some wrapper functionality around ts_mon, to make it more
friendly to developers. It also provides import safety, in case ts_mon is not
deployed with your code.
"""

from __future__ import division

import collections
import contextlib
from functools import wraps
import logging
import queue as Queue
import ssl
import time


try:
    from chromite.third_party.infra_libs import ts_mon
except (ImportError, RuntimeError):
    ts_mon = None


# This number is chosen because 1.16^100 seconds is about
# 32 days. This is a good compromise between bucket size
# and dynamic range.
_SECONDS_BUCKET_FACTOR = 1.16

# If none, we create metrics in this process. Otherwise, we send metrics via
# this Queue to a dedicated flushing processes.
# These attributes are set by chromite.lib.ts_mon_config.SetupTsMonGlobalState.
FLUSHING_PROCESS = None
MESSAGE_QUEUE = None

_MISSING = object()

MetricCall = collections.namedtuple(
    "MetricCall",
    [
        "metric_name",
        "metric_args",
        "metric_kwargs",
        "method",
        "method_args",
        "method_kwargs",
        "reset_after",
    ],
)


def _FlushingProcessClosed():
    """Returns whether the metrics flushing process has been closed."""
    return (
        FLUSHING_PROCESS is not None and FLUSHING_PROCESS.exitcode is not None
    )


class ProxyMetric:
    """Redirects any method calls to the message queue."""

    def __init__(self, metric, metric_args, metric_kwargs):
        self.metric = metric
        self.metric_args = metric_args
        self.reset_after = metric_kwargs.pop("reset_after", False)
        self.metric_kwargs = metric_kwargs

    def __getattr__(self, method_name):
        """Redirects all method calls to the MESSAGE_QUEUE."""

        def enqueue(*args, **kwargs):
            if not _FlushingProcessClosed():
                try:
                    MESSAGE_QUEUE.put_nowait(
                        MetricCall(
                            metric_name=self.metric,
                            metric_args=self.metric_args,
                            metric_kwargs=self.metric_kwargs,
                            method=method_name,
                            method_args=args,
                            method_kwargs=kwargs,
                            reset_after=self.reset_after,
                        )
                    )
                except Queue.Full:
                    logging.warning(
                        "Metrics queue is full; skipped sending metric '%s'",
                        self.metric,
                    )
            else:
                try:
                    exit_code = FLUSHING_PROCESS.exitcode
                except AttributeError:
                    exit_code = None
                logging.warning(
                    "Flushing process has been closed (exit code %s),"
                    " skipped sending metric '%s'",
                    exit_code,
                    self.metric,
                )

        return enqueue


def _Indirect(fn):
    """Decorates a function to be indirect If MESSAGE_QUEUE is set.

    If MESSAGE_QUEUE is set, the indirect function will return a proxy metrics
    object; otherwise, it behaves normally.
    """

    @wraps(fn)
    def AddToQueueIfPresent(*args, **kwargs):
        if MESSAGE_QUEUE:
            return ProxyMetric(fn.__name__, args, kwargs)
        else:
            # Whether to reset the metric after the flush; this is only used by
            # |ProxyMetric|, so remove this from the kwargs.
            kwargs.pop("reset_after", None)
            return fn(*args, **kwargs)

    return AddToQueueIfPresent


class MockMetric:
    """Mock metric object, to be returned if ts_mon is not set up."""

    def _mock_method(self, *args, **kwargs):
        pass

    def __getattr__(self, _):
        return self._mock_method


def _ImportSafe(fn):
    """Decorator causing |fn| to return MockMetric if ts_mon not imported."""

    @wraps(fn)
    def wrapper(*args, **kwargs):
        if ts_mon:
            return fn(*args, **kwargs)
        else:
            return MockMetric()

    return wrapper


class FieldSpecAdapter:
    """Infers the types of fields values to work around field_spec requirement.

    See: https://chromium-review.googlesource.com/c/432120/ for the change
    which added a required field_spec argument. This class is a temporary
    workaround to allow inferring the field_spec if is not provided.
    """

    FIELD_CLASSES = (
        {}
        if ts_mon is None
        else {
            bool: ts_mon.BooleanField,
            int: ts_mon.IntegerField,
            str: ts_mon.StringField,
            str: ts_mon.StringField,
        }
    )

    def __init__(self, metric_cls, *args, **kwargs):
        self._metric_cls = metric_cls
        self._args = args
        self._kwargs = kwargs
        self._instance = _MISSING

    def __getattr__(self, prop):
        """Return a wrapper which constructs the metric object on demand.

        Args:
            prop: The property name

        Returns:
            If self._instance has been created, the instance's .|prop| property,
            otherwise, a wrapper function which creates the ._instance and then
            calls the |prop| method on the instance.
        """
        if self._instance is not _MISSING:
            return getattr(self._instance, prop)

        def func(*args, **kwargs):
            if self._instance is not _MISSING:
                return getattr(self._instance, prop)(*args, **kwargs)
            fields = FieldSpecAdapter._InferFields(prop, args, kwargs)
            self._kwargs["field_spec"] = FieldSpecAdapter._InferFieldSpec(
                fields
            )
            self._instance = self._metric_cls(*self._args, **self._kwargs)
            return getattr(self._instance, prop)(*args, **kwargs)

        func.__name__ = prop
        return func

    @staticmethod
    def _InferFields(method_name, args, kwargs):
        """Infers the fields argument.

        Args:
            method_name: The method called.
            args: The args list
            kwargs: The keyword args
        """
        if "fields" in kwargs:
            return kwargs["fields"]

        if method_name == "increment" and args:
            return args[0]

        if len(args) >= 2:
            return args[1]

    @staticmethod
    def _InferFieldSpec(fields):
        """Infers the fields types from the given fields.

        Args:
            fields: A dictionary with metric fields.
        """
        if not fields or not ts_mon:
            return None

        return [
            FieldSpecAdapter.FIELD_CLASSES[type(v)](field)
            for (field, v) in sorted(fields.items())
        ]


def _OptionalFieldSpec(fn):
    """Decorates a function to allow an optional description and field_spec."""

    @wraps(fn)
    def wrapper(*args, **kwargs):
        kwargs = dict(**kwargs)  # It's bad practice to mutate **kwargs
        # Slightly different than .setdefault, this line sets a default even
        # when the key is present (as long as the value is not truthy). Empty or
        # None is not allowed for descriptions.
        kwargs["description"] = kwargs.get("description") or "No description."
        if "field_spec" in kwargs and kwargs["field_spec"] is not _MISSING:
            return fn(*args, **kwargs)
        else:
            return FieldSpecAdapter(fn, *args, **kwargs)

    return wrapper


def _Metric(fn):
    """A pipeline of decorators to apply to our metric constructors."""
    return _OptionalFieldSpec(_ImportSafe(_Indirect(fn)))


# This is needed for the reset_after flag used by @Indirect.
# pylint: disable=unused-argument


@_Metric
def CounterMetric(
    name,
    reset_after=False,
    description=None,
    field_spec=_MISSING,
    start_time=None,
):
    """Returns a metric handle for a counter named |name|."""
    return ts_mon.CounterMetric(
        name,
        description=description,
        field_spec=field_spec,
        start_time=start_time,
    )


Counter = CounterMetric


@_Metric
def GaugeMetric(name, reset_after=False, description=None, field_spec=_MISSING):
    """Returns a metric handle for a gauge named |name|."""
    return ts_mon.GaugeMetric(
        name, description=description, field_spec=field_spec
    )


Gauge = GaugeMetric


@_Metric
def CumulativeMetric(
    name, reset_after=False, description=None, field_spec=_MISSING
):
    """Returns a metric handle for a cumulative float named |name|."""
    return ts_mon.CumulativeMetric(
        name, description=description, field_spec=field_spec
    )


@_Metric
def StringMetric(
    name, reset_after=False, description=None, field_spec=_MISSING
):
    """Returns a metric handle for a string named |name|."""
    return ts_mon.StringMetric(
        name, description=description, field_spec=field_spec
    )


String = StringMetric


@_Metric
def BooleanMetric(
    name, reset_after=False, description=None, field_spec=_MISSING
):
    """Returns a metric handle for a boolean named |name|."""
    return ts_mon.BooleanMetric(
        name, description=description, field_spec=field_spec
    )


Boolean = BooleanMetric


@_Metric
def FloatMetric(name, reset_after=False, description=None, field_spec=_MISSING):
    """Returns a metric handle for a float named |name|."""
    return ts_mon.FloatMetric(
        name, description=description, field_spec=field_spec
    )


Float = FloatMetric


@_Metric
def CumulativeDistributionMetric(
    name,
    reset_after=False,
    description=None,
    bucketer=None,
    field_spec=_MISSING,
):
    """Returns a metric handle for a cumulative distribution named |name|."""
    return ts_mon.CumulativeDistributionMetric(
        name, description=description, bucketer=bucketer, field_spec=field_spec
    )


CumulativeDistribution = CumulativeDistributionMetric


@_Metric
def DistributionMetric(
    name,
    reset_after=False,
    description=None,
    bucketer=None,
    field_spec=_MISSING,
):
    """Returns a metric handle for a distribution named |name|."""
    return ts_mon.NonCumulativeDistributionMetric(
        name, description=description, bucketer=bucketer, field_spec=field_spec
    )


Distribution = DistributionMetric


@_Metric
def CumulativeSmallIntegerDistribution(
    name, reset_after=False, description=None, field_spec=_MISSING
):
    """Returns a metric handle for a cumulative distribution named |name|.

    This differs slightly from CumulativeDistribution, in that the underlying
    metric uses a uniform bucketer rather than a geometric one.

    This metric type is suitable for holding a distribution of numbers that are
    nonnegative integers in the range of 0 to 100.
    """
    return ts_mon.CumulativeDistributionMetric(
        name,
        bucketer=ts_mon.FixedWidthBucketer(1),
        description=description,
        field_spec=field_spec,
    )


@_Metric
def CumulativeSecondsDistribution(
    name, scale=1, reset_after=False, description=None, field_spec=_MISSING
):
    """Returns a metric handle for a cumulative distribution named |name|.

    The distribution handle returned by this method is better suited than the
    default one for recording handling times, in seconds.

    This metric handle has bucketing that is optimized for time intervals
    (in seconds) in the range of 1 second to 32 days. Use |scale| to adjust this
    (e.g. scale=0.1 covers a range from .1 seconds to 3.2 days).

    Args:
        name: string name of metric
        scale: scaling factor of buckets, and size of the first bucket.
            default: 1
        reset_after: Should the metric be reset after reporting.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    b = ts_mon.GeometricBucketer(
        growth_factor=_SECONDS_BUCKET_FACTOR, scale=scale
    )
    return ts_mon.CumulativeDistributionMetric(
        name,
        bucketer=b,
        units=ts_mon.MetricsDataUnits.SECONDS,
        description=description,
        field_spec=field_spec,
    )


SecondsDistribution = CumulativeSecondsDistribution


@_Metric
def PercentageDistribution(
    name,
    num_buckets=1000,
    reset_after=False,
    description=None,
    field_spec=_MISSING,
):
    """Returns a metric handle for a cumulative distribution for percentage.

    The distribution handle returned by this method is better suited for
    reporting percentage values than the default one. The bucketing is optimized
    for values in [0,100].

    Args:
        name: The name of this metric.
        num_buckets: This metric buckets the percentage values before
            reporting. This argument controls the number of the bucket the range
            [0,100] is divided in. The default gives you 0.1% resolution.
        reset_after: Should the metric be reset after reporting.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    # The last bucket actually covers [100, 100 + 1.0/num_buckets), so it
    # corresponds to values that exactly match 100%.
    bucket_width = 100 / num_buckets
    b = ts_mon.FixedWidthBucketer(bucket_width, num_buckets)
    return ts_mon.CumulativeDistributionMetric(
        name, bucketer=b, description=description, field_spec=field_spec
    )


@contextlib.contextmanager
def SecondsTimer(
    name,
    fields=None,
    description=None,
    field_spec=_MISSING,
    scale=1,
    record_on_exception=True,
    add_exception_field=False,
):
    """Record the time of an operation to a CumulativeSecondsDistributionMetric.

    Records the time taken inside the context block, to the
    CumulativeSecondsDistribution named |name|, with the given fields.

    Examples:
        # Time the doSomething() call, with field values that are independent of
        # the results of the operation.
        with SecondsTimer(
            "timer/name",
            fields={"foo": "bar"},
            description="My timer",
            field_spec=[
                ts_mon.StringField("foo"),
                ts_mon.BooleanField("success"),
            ],
        ):
            doSomething()

        # Time the doSomethingElse call, with field values that depend on the
        # results of that operation. Note that it is important that a default
        # value is specified for these fields, in case an exception is thrown by
        # doSomethingElse()
        f = {'success': False, 'foo': 'bar'}
        with SecondsTimer(
            "timer/name",
            fields=f,
            description="My timer",
            field_spec=[ts_mon.StringField("foo")],
        ) as c:
            doSomethingElse()
            c["success"] = True

        # Incorrect Usage!
        with SecondsTimer('timer/name', description='My timer') as c:
            doSomething()
            c['foo'] = bar  # 'foo' is not a valid field, because no default
                            # value for it was specified in the context
                            # constructor. It will be silently ignored.

    Args:
        name: The name of the metric to create
        fields: The fields of the metric to create.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
        scale: A float to scale the CumulativeSecondsDistribution buckets by.
        record_on_exception: Whether to record metrics if an exception is
            raised.
        add_exception_field: Whether to add a
            BooleanField('encountered_exception') to the FieldSpec provided, and
            set its value to True iff an exception was raised in the context.
    """
    if field_spec is not None and field_spec is not _MISSING:
        field_spec.append(ts_mon.BooleanField("encountered_exception"))

    m = CumulativeSecondsDistribution(
        name, scale=scale, description=description, field_spec=field_spec
    )
    f = fields or {}
    f = dict(f)
    keys = list(f)
    t0 = _GetSystemClock()

    error = True
    try:
        yield f
        error = False
    finally:
        if record_on_exception and add_exception_field:
            keys.append("encountered_exception")
            f.setdefault("encountered_exception", error)
        # Filter out keys that were not part of the initial key set. This is to
        # avoid inconsistent fields.
        # TODO(akeshet): Doing this filtering isn't super efficient. Would be
        #   better to implement some key-restricted subclass or wrapper around
        #   dict, and just yield that above rather than yielding a regular dict.
        if record_on_exception or not error:
            dt = _GetSystemClock() - t0
            # TODO(ayatane): Handle backward clock jumps.  See _GetSystemClock.
            if dt >= 0:
                m.add(dt, fields={k: f[k] for k in keys})


def SecondsTimerDecorator(
    name,
    fields=None,
    description=None,
    field_spec=_MISSING,
    scale=1,
    record_on_exception=True,
    add_exception_field=False,
):
    """Decorator to time the duration of function calls.

    Examples:
        @SecondsTimerDecorator(
            "timer/name",
            fields={"foo": "bar"},
            description="My timer",
            field_spec=[ts_mon.StringField("foo")],
        )
        def Foo(bar):
            return doStuff()

        is equivalent to

        def Foo(bar):
            with SecondsTimer(
                "timer/name",
                fields={"foo": "bar"},
                description="My timer",
                field_spec=[ts_mon.StringField("foo")],
            ):
                return doStuff()

    Args:
        name: The name of the metric to create.
        fields: The fields of the metric to create.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
        scale: A float to scale the distribution by.
        record_on_exception: Whether to record metrics if an exception is
            raised.
        add_exception_field: Whether to add a
            BooleanField('encountered_exception') to the FieldSpec provided, and
            set its value to True iff an exception was raised in the context.
    """

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            with SecondsTimer(
                name,
                fields=fields,
                description=description,
                field_spec=field_spec,
                scale=scale,
                record_on_exception=record_on_exception,
                add_exception_field=add_exception_field,
            ):
                return fn(*args, **kwargs)

        return wrapper

    return decorator


@contextlib.contextmanager
def SecondsInstanceTimer(
    name,
    fields=None,
    description=None,
    field_spec=_MISSING,
    record_on_exception=True,
    add_exception_field=False,
):
    """Record the time of an operation to a FloatMetric.

    Records the time taken inside of the context block, to the
    Float metric named |name|, with the given fields.  This is
    a non-cumulative metric; this represents the absolute time
    taken for a specific block.  The duration is stored in a float
    to provide flexibility in the future for higher accuracy.

    Examples:
        # Time the doSomething() call, with field values that are independent of
        # the results of the operation.
        with SecondsInstanceTimer(
            "timer/name",
            fields={"foo": "bar"},
            description="My timer",
            field_spec=[
                ts_mon.StringField("foo"),
                ts_mon.BooleanField("success"),
            ],
        ):
            doSomething()

        # Time the doSomethingElse call, with field values that depend on the
        # results of that operation. Note that it is important that a default
        # value is specified for these fields, in case an exception is thrown by
        # doSomethingElse()
        f = {"success": False, "foo": "bar"}
        with SecondsInstanceTimer(
            "timer/name",
            fields=f,
            description="My timer",
            field_spec=[ts_mon.StringField("foo")],
        ) as c:
            doSomethingElse()
            c["success"] = True

        # Incorrect Usage!
        with SecondsInstanceTimer('timer/name', description='My timer') as c:
            doSomething()
            c['foo'] = bar  # 'foo' is not a valid field, because no default
                            # value for it was specified in the context
                            # constructor. It will be silently ignored.

    Args:
        name: The name of the metric to create
        fields: The fields of the metric to create.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
        record_on_exception: Whether to record metrics if an exception is
            raised.
        add_exception_field: Whether to add a
            BooleanField('encountered_exception') to the FieldSpec provided, and
            set its value to True iff an exception was raised in the context.

    Yields:
        Float based metric measuring the duration of execution.
    """
    if field_spec is not None and field_spec is not _MISSING:
        field_spec.append(ts_mon.BooleanField("encountered_exception"))

    m = FloatMetric(name, description=description, field_spec=field_spec)
    f = dict(fields or {})
    keys = list(f)
    t0 = _GetSystemClock()

    error = True
    try:
        yield f
        error = False
    finally:
        if record_on_exception and add_exception_field:
            keys.append("encountered_exception")
            f.setdefault("encountered_exception", error)
        # Filter out keys that were not part of the initial key set. This is to
        # avoid inconsistent fields.
        # TODO(akeshet): Doing this filtering isn't super efficient. Would be
        #   better to implement some key-restricted subclass or wrapper around
        #   dict, and just yield that above rather than yielding a regular dict.
        if record_on_exception or not error:
            dt = _GetSystemClock() - t0
            m.set(dt, fields={k: f[k] for k in keys})


def SecondsInstanceTimerDecorator(
    name,
    fields=None,
    description=None,
    field_spec=_MISSING,
    record_on_exception=True,
    add_exception_field=False,
):
    """Decorator to time the gauge duration of function calls.

    Examples:
        @SecondsInstanceTimerDecorator(
            "timer/name",
            fields={"foo": "bar"},
            description="My timer",
            field_spec=[
                ts_mon.StringField("foo"),
                ts_mon.BooleanField("success"),
            ],
        )
        def Foo(bar):
            return doStuff()

        is equivalent to

        def Foo(bar):
            with SecondsInstanceTimer(
                "timer/name",
                fields={"foo": "bar"},
                description="My timer",
                field_spec=[
                    ts_mon.StringField("foo"),
                    ts_mon.BooleanField("success"),
                ],
            ):
                return doStuff()

    Args:
        name: The name of the metric to create
        fields: The fields of the metric to create
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
        record_on_exception: Whether to record metrics if an exception is
            raised.
        add_exception_field: Whether to add a
            BooleanField('encountered_exception') to the FieldSpec provided, and
            set its value to True iff an exception was raised in the context.

    Returns:
        A SecondsInstanceTimer metric decorator.
    """

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            with SecondsInstanceTimer(
                name,
                fields=fields,
                description=description,
                field_spec=field_spec,
                record_on_exception=record_on_exception,
                add_exception_field=add_exception_field,
            ):
                return fn(*args, **kwargs)

        return wrapper

    return decorator


@contextlib.contextmanager
def SuccessCounter(name, fields=None, description=None, field_spec=_MISSING):
    """Create a counter that tracks if something succeeds.

    Args:
        name: The name of the metric to create
        fields: The fields of the metric
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    c = Counter(name)
    f = fields or {}
    f = f.copy()
    # We add in the additional field success.
    keys = list(f) + ["success"]
    success = False
    try:
        yield f
        success = True
    finally:
        f.setdefault("success", success)
        f = {k: f[k] for k in keys}
        c.increment(fields=f)


@contextlib.contextmanager
def Presence(name, fields=None, description=None, field_spec=_MISSING):
    """A counter of 'active' things.

    This keeps track of how many names are active at any given time. However,
    it's only suitable for long-running tasks, since the initial true value may
    never be written out if the task doesn't run for at least a minute.
    """
    b = Boolean(name, description=None, field_spec=field_spec)
    b.set(True, fields=fields)
    try:
        yield
    finally:
        b.set(False, fields=fields)


class RuntimeBreakdownTimer:
    """Record the time of an operation and the breakdown into sub-steps.

    Examples:
        with RuntimeBreakdownTimer(
            "timer/name",
            fields={"foo": "bar"},
            description="My timer",
            field_spec=[ts_mon.StringField("foo")],
        ) as timer:
            with timer.Step("first_step"):
                doFirstStep()
            with timer.Step("second_step"):
                doSecondStep()
            # The time spent next will show up under
            # .../timer/name/breakdown_no_step
            doSomeNonStepWork()

    This will emit the following metrics:
    - .../timer/name/total_duration - A CumulativeSecondsDistribution metric for
        the time spent inside the outer with block.
    - .../timer/name/breakdown/first_step and
        .../timer/name/breakdown/second_step - PercentageDistribution metrics
        for the fraction of time devoted to each substep.
    - .../timer/name/breakdown_unaccounted - PercentageDistribution metric for
        the fraction of time that is not accounted for in any of the substeps.
    - .../timer/name/bucketing_loss - PercentageDistribution metric buckets
        values before reporting them as distributions. This causes small errors
        in the reported values because they are rounded to the reported buckets
        lower bound. This is a CumulativeMetric measuring the total rounding
        error accrued in reporting all the percentages. The worst case bucketing
        loss for x steps is (x+1)/10. So, if you time across 9 steps, you should
        expect no more than 1% rounding error.
    [experimental]
    - .../timer/name/duration_breakdown - A Float metric, with one stream per
        Step indicating the ratio of time spent in that step. The different
        steps are differentiated via a field with key 'step_name'. Since some of
        the time can be spent outside any steps, these ratios will sum to <= 1.

    NB: This helper can only be used if the field values are known at the
    beginning of the outer context and do not change as a result of any of the
    operations timed.
    """

    PERCENT_BUCKET_COUNT = 1000

    _StepMetrics = collections.namedtuple("_StepMetrics", ["name", "time_s"])

    def __init__(
        self, name, fields=None, description=None, field_spec=_MISSING
    ):
        self._name = name
        self._fields = fields
        self._field_spec = field_spec
        self._description = description
        self._outer_t0 = None
        self._total_time_s = 0
        self._inside_step = False
        self._step_metrics = []

    def __enter__(self):
        self._outer_t0 = _GetSystemClock()
        return self

    def __exit__(self, _type, _value, _traceback):
        self._RecordTotalTime()

        outer_timer = CumulativeSecondsDistribution(
            "%s/total_duration" % (self._name,),
            field_spec=self._field_spec,
            description=self._description,
        )
        outer_timer.add(self._total_time_s, fields=self._fields)

        for name, percent in self._GetStepBreakdowns().items():
            step_metric = PercentageDistribution(
                "%s/breakdown/%s" % (self._name, name),
                num_buckets=self.PERCENT_BUCKET_COUNT,
                field_spec=self._field_spec,
                description=self._description,
            )
            step_metric.add(percent, fields=self._fields)

            fields = dict(self._fields) if self._fields is not None else {}
            fields["step_name"] = name
            # TODO(pprabhu): Convert _GetStepBreakdowns() to return ratios
            # instead of percentage when the old PercentageDistribution
            # reporting is deleted.
            Float("%s/duration_breakdown" % self._name).set(
                percent / 100, fields=fields
            )

        unaccounted_metric = PercentageDistribution(
            "%s/breakdown_unaccounted" % self._name,
            num_buckets=self.PERCENT_BUCKET_COUNT,
            field_spec=self._field_spec,
            description=self._description,
        )
        unaccounted_metric.add(
            self._GetUnaccountedBreakdown(), fields=self._fields
        )

        bucketing_loss_metric = CumulativeMetric(
            "%s/bucketing_loss" % self._name,
            field_spec=self._field_spec,
            description=self._description,
        )
        bucketing_loss_metric.increment_by(
            self._GetBucketingLoss(), fields=self._fields
        )

    @contextlib.contextmanager
    def Step(self, step_name):
        """Start a new step named step_name in the timed operation.

        Note that it is not possible to start a step inside a step. i.e.,

        with RuntimeBreakdownTimer('timer') as timer:
            with timer.Step('outer_step'):
                with timer.Step('inner_step'):
                    # will by design raise an exception.

        Args:
            step_name: The name of the step being timed.
        """
        if self._inside_step:
            logging.error(
                "RuntimeBreakdownTimer.Step is not reentrant. "
                "Dropping step: %s",
                step_name,
            )
            yield
            return

        self._inside_step = True
        t0 = _GetSystemClock()
        try:
            yield
        finally:
            self._inside_step = False
            step_time_s = _GetSystemClock() - t0
            # TODO(ayatane): Handle backward clock jumps.  See _GetSystemClock.
            step_time_s = max(0, step_time_s)
            self._step_metrics.append(self._StepMetrics(step_name, step_time_s))

    def _GetStepBreakdowns(self):
        """Returns percentage of time spent in each step.

        Must be called after |_RecordTotalTime|.
        """
        if not self._total_time_s:
            return {}
        return {
            x.name: (x.time_s * 100) / self._total_time_s
            for x in self._step_metrics
        }

    def _GetUnaccountedBreakdown(self):
        """Returns the percentage time spent outside all steps.

        Must be called after |_RecordTotalTime|.
        """
        breakdown_percentages = sum(self._GetStepBreakdowns().values())
        return max(0, 100 - breakdown_percentages)

    def _GetBucketingLoss(self):
        """Compute the actual loss in reported percentages due to bucketing.

        Must be called after |_RecordTotalTime|.
        """
        reported = list(self._GetStepBreakdowns().values())
        reported.append(self._GetUnaccountedBreakdown())
        bucket_width = 100 / self.PERCENT_BUCKET_COUNT
        return sum(x % bucket_width for x in reported)

    def _RecordTotalTime(self):
        self._total_time_s = _GetSystemClock() - self._outer_t0
        # TODO(ayatane): Handle backward clock jumps.  See _GetSystemClock.
        self._total_time_s = max(0, self._total_time_s)


def _GetSystemClock():
    """Return a clock time.

    The only thing that the return value can be used for is to subtract from
    other instances to determine time elapsed.
    """
    # TODO(ayatane): We should use a monotonic clock to measure this,
    # but Python 2 does not have one.
    return time.time()


def Flush(reset_after=()):
    """Flushes metrics, but warns on transient errors.

    Args:
        reset_after: A list of metrics to reset after flushing.
    """
    if not ts_mon:
        return

    try:
        ts_mon.flush()
        while reset_after:
            reset_after.pop().reset()
    except ssl.SSLError as e:
        logging.warning("Caught transient network error while flushing: %s", e)
    except Exception as e:
        logging.error("Caught exception while flushing: %s", e)
