# Copyright 2016 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unittests for chromite.lib.metrics."""
import sys
import tempfile
import unittest
from unittest import mock

from chromite.third_party.infra_libs import ts_mon

from chromite.lib import cros_test_lib
from chromite.lib import metrics
from chromite.lib import osutils
from chromite.lib import parallel
from chromite.lib import ts_mon_config


class FakeException(Exception):
"""FakeException to raise during tests."""


class TestIndirectMetrics(cros_test_lib.MockTestCase):
"""Tests the behavior of _Indirect metrics."""

    def testEnqueue(self) -> None:
"""Test that _Indirect enqueues messages correctly."""
metric = metrics.Boolean
with parallel.Manager() as manager:
q = manager.Queue()
self.PatchObject(metrics, "MESSAGE_QUEUE", q)
proxy_metric = metric("foo")
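            # Any method call on the proxy should be enqueued onto
            # MESSAGE_QUEUE as a MetricCall rather than emitted directly.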
proxy_metric.example("arg1", {"field_name": "value"})
message = q.get(timeout=10)
expected_metric_kwargs = {
"field_spec": [ts_mon.StringField("field_name")],
"description": "No description.",
}
self.assertEqual(
message,
metrics.MetricCall(
metric.__name__,
("foo",),
expected_metric_kwargs,
"example",
("arg1", {"field_name": "value"}),
{},
False,
),
)

    def patchTime(self) -> None:
"""Patch time to force a Flush() every time a metric is sent."""

        def TimeIterator():
t = 0
while True:
t += ts_mon_config.FLUSH_INTERVAL + 1
yield t
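
        # ts_mon_config reads the clock through its own "time" import, so
        # patching it there makes every time check appear to happen more
        # than FLUSH_INTERVAL after the last one, forcing a Flush() each
        # time a metric is sent.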
self.PatchObject(
ts_mon_config,
"time",
mock.Mock(time=mock.Mock(side_effect=TimeIterator())),
)

    @unittest.skipIf(sys.version_info.major < 3, "Requires py3")
def testShortLived(self) -> None:
"""Tests that configuring ts-mon to use short-lived processes works."""
self.patchTime()
with tempfile.NamedTemporaryFile(dir="/var/tmp") as out:
with ts_mon_config.SetupTsMonGlobalState(
"metrics_unittest", short_lived=True, debug_file=out.name
):
# pylint: disable=protected-access
self.assertTrue(ts_mon_config._WasSetup)

    @unittest.skipIf(sys.version_info.major < 3, "Requires py3")
def testResetAfter(self) -> None:
"""Tests that the reset_after flag works to send metrics only once."""
# By mocking out its "time" module, the forked flushing process will
# think it should call Flush() whenever we send a metric.
self.patchTime()
with tempfile.NamedTemporaryFile(dir="/var/tmp") as out:
# * The indirect=True flag is required for reset_after to work.
# * Using debug_file, we send metrics to the temporary file instead
# of sending metrics to production via PubSub.
with ts_mon_config.SetupTsMonGlobalState(
"metrics_unittest", indirect=True, debug_file=out.name
):

                def MetricName(i, flushed):
return "test/metric/name/%d/%s" % (i, flushed)

                # Each of these .set() calls will result in a Flush() call.
for i in range(7):
                    # Any extra streams with different fields and
                    # reset_after=False will be cleared only if the metric
                    # below is cleared.
metrics.Boolean(MetricName(i, True), reset_after=False).set(
True, fields={"original": False}
)
metrics.Boolean(MetricName(i, True), reset_after=True).set(
True, fields={"original": True}
)
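
                # Unlike the reset metrics above, these streams are never
                # cleared, so the metric created on iteration i is re-sent
                # by every subsequent flush.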
for i in range(7):
metrics.Boolean(
MetricName(i, False), reset_after=False
).set(True)

            # By leaving the context, we .join() the flushing process.
content = osutils.ReadFile(out.name)

            # The reset metrics should be sent only three times, because:
# * original=False is sent twice
# * original=True is sent once.
#
# The second flush() only results in one occurrence of the string
# MetricName(i, True) because both data points are in a
# "metrics_data_set" block, like so:
# metrics_collection {
# ... etc ...
# metrics_data_set {
# metric_name: "/chrome/infra/test/metric/name/0/True"
# data {
# bool_value: true
# field {
# name: "original"
# bool_value: false
# }
# }
# data {
# bool_value: true
# ... etc ...

            for i in range(7):
self.assertEqual(content.count(MetricName(i, True)), 2)

            # The non-reset metrics are sent once-per-flush.
# There are 7 of these metrics,
# * The 0th is sent 7 times,
# * The 1st is sent 6 times,
# ...
# * The 6th is sent 1 time.
# So the "i"th metric is sent (7-i) times.
for i in range(7):
self.assertEqual(content.count(MetricName(i, False)), 7 - i)


class TestSecondsTimer(cros_test_lib.MockTestCase):
"""Tests the behavior of SecondsTimer and SecondsTimerDecorator."""

    def setUp(self) -> None:
self._mockMetric = mock.MagicMock()
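        # Route every CumulativeSecondsDistribution that SecondsTimer
        # creates to a single mock so tests can assert on add() calls.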
self.PatchObject(
metrics,
"CumulativeSecondsDistribution",
return_value=self._mockMetric,
)

    @metrics.SecondsTimerDecorator("fooname", fields={"foo": "bar"})
def _DecoratedFunction(self, *args, **kwargs) -> None:
pass

    def testDecorator(self) -> None:
        """Test that calling a decorated function emits the metric."""
self._DecoratedFunction(1, 2, 3, foo="bar")
self.assertEqual(metrics.CumulativeSecondsDistribution.call_count, 1)
self.assertEqual(self._mockMetric.add.call_count, 1)

    def testContextManager(self) -> None:
        """Test that the timing context manager emits a metric."""
with metrics.SecondsTimer("fooname"):
pass
self.assertEqual(metrics.CumulativeSecondsDistribution.call_count, 1)
self.assertEqual(self._mockMetric.add.call_count, 1)

    def testContextManagerWithUpdate(self) -> None:
        """Verify that a field update in the context manager is emitted."""
with metrics.SecondsTimer("fooname", fields={"foo": "bar"}) as c:
c["foo"] = "qux"
self._mockMetric.add.assert_called_with(mock.ANY, fields={"foo": "qux"})

    def testContextManagerWithoutUpdate(self) -> None:
"""Tests that the default value for fields is used when not updated."""
# pylint: disable=unused-variable
with metrics.SecondsTimer("fooname", fields={"foo": "bar"}) as c:
pass
self._mockMetric.add.assert_called_with(mock.ANY, fields={"foo": "bar"})

    def testContextManagerIgnoresInvalidField(self) -> None:
"""Test that we ignore fields that are set with no default."""
with metrics.SecondsTimer("fooname", fields={"foo": "bar"}) as c:
c["qux"] = "qwert"
self._mockMetric.add.assert_called_with(mock.ANY, fields={"foo": "bar"})

    def testContextManagerWithException(self) -> None:
        """Tests that we emit metrics even if the timed code raises."""
with self.assertRaises(AssertionError):
with metrics.SecondsTimer("fooname", fields={"foo": "bar"}):
assert False
self._mockMetric.add.assert_called_with(mock.ANY, fields={"foo": "bar"})


class TestSecondsInstanceTimer(cros_test_lib.MockTestCase):
"""Tests the behavior of SecondsInstanceTimer and decorator."""

    def setUp(self) -> None:
self._mockMetric = mock.MagicMock()
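        # SecondsInstanceTimer reports through a FloatMetric; capture it so
        # tests can assert on set() calls.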
self.PatchObject(metrics, "FloatMetric", return_value=self._mockMetric)

    @metrics.SecondsInstanceTimerDecorator("fooname", fields={"foo": "bar"})
def _DecoratedFunction(self, *args, **kwargs) -> None:
pass

    def testDecorator(self) -> None:
        """Test that calling a decorated function emits the metric."""
self._DecoratedFunction(1, 2, 3, foo="bar")
self.assertEqual(metrics.FloatMetric.call_count, 1)
self.assertEqual(self._mockMetric.set.call_count, 1)

    def testContextManager(self) -> None:
        """Test that the timing context manager emits a metric."""
with metrics.SecondsInstanceTimer("fooname"):
pass
self.assertEqual(metrics.FloatMetric.call_count, 1)
self.assertEqual(self._mockMetric.set.call_count, 1)

    def testContextManagerWithUpdate(self) -> None:
        """Verify that a field update in the context manager is emitted."""
with metrics.SecondsInstanceTimer(
"fooname", fields={"foo": "bar"}
) as c:
c["foo"] = "qux"
self._mockMetric.set.assert_called_with(mock.ANY, fields={"foo": "qux"})

    def testContextManagerWithoutUpdate(self) -> None:
"""Tests that the default value for fields is used when not updated."""
# pylint: disable=unused-variable
with metrics.SecondsInstanceTimer(
"fooname", fields={"foo": "bar"}
) as c:
pass
self._mockMetric.set.assert_called_with(mock.ANY, fields={"foo": "bar"})

    def testContextManagerIgnoresInvalidField(self) -> None:
"""Test that we ignore fields that are set with no default."""
with metrics.SecondsInstanceTimer(
"fooname", fields={"foo": "bar"}
) as c:
c["qux"] = "qwert"
self._mockMetric.set.assert_called_with(mock.ANY, fields={"foo": "bar"})

    def testContextManagerWithException(self) -> None:
        """Tests that we emit metrics even if the timed code raises."""
with self.assertRaises(AssertionError):
with metrics.SecondsInstanceTimer("fooname", fields={"foo": "bar"}):
assert False
self._mockMetric.set.assert_called_with(mock.ANY, fields={"foo": "bar"})


class TestSuccessCounter(cros_test_lib.MockTestCase):
    """Tests the behavior of SuccessCounter."""

    def setUp(self) -> None:
self._mockMetric = mock.MagicMock()
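        # SuccessCounter reports through a Counter; capture it so tests can
        # assert on increment() calls.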
self.PatchObject(metrics, "Counter", return_value=self._mockMetric)

    def testContextManager(self) -> None:
        """Test that the context manager emits a success metric."""
with metrics.SuccessCounter("fooname"):
pass
self._mockMetric.increment.assert_called_with(fields={"success": True})
self.assertEqual(self._mockMetric.increment.call_count, 1)

    def testContextManagerFailedException(self) -> None:
        """Test that success=False is recorded when an exception is raised."""
with self.assertRaises(FakeException):
with metrics.SuccessCounter("fooname"):
raise FakeException
self._mockMetric.increment.assert_called_with(fields={"success": False})

    def testContextManagerFailedExplicit(self) -> None:
        """Test that success=False is recorded when set explicitly."""
with metrics.SuccessCounter("fooname") as s:
s["success"] = False
self._mockMetric.increment.assert_called_with(fields={"success": False})

    def testContextManagerWithUpdate(self) -> None:
        """Tests that the context manager emits the updated field value."""
with metrics.SuccessCounter("fooname", fields={"foo": "bar"}) as c:
c["foo"] = "qux"
self._mockMetric.increment.assert_called_with(
fields={"foo": "qux", "success": True}
)

    def testContextManagerWithoutUpdate(self) -> None:
"""Tests that the default value for fields is used when not updated."""
# pylint: disable=unused-variable
with metrics.SuccessCounter("fooname", fields={"foo": "bar"}) as c:
pass
self._mockMetric.increment.assert_called_with(
fields={"foo": "bar", "success": True}
)

    def testContextManagerIgnoresInvalidField(self) -> None:
"""Test that we ignore fields that are set with no default."""
with metrics.SuccessCounter("fooname", fields={"foo": "bar"}) as c:
c["qux"] = "qwert"
self._mockMetric.increment.assert_called_with(
fields={"foo": "bar", "success": True}
)


class TestPresence(cros_test_lib.MockTestCase):
    """Tests the behavior of Presence."""

    def setUp(self) -> None:
self._mockMetric = mock.MagicMock()
self.PatchObject(metrics, "Boolean", return_value=self._mockMetric)

    def testContextManager(self) -> None:
        """Test that the context manager emits presence metrics."""
with metrics.Presence("fooname"):
self.assertEqual(
self._mockMetric.mock_calls,
[
mock.call.set(True, fields=None),
],
)
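        # Leaving the context should record one more set(False) sample.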
self.assertEqual(
self._mockMetric.mock_calls,
[
mock.call.set(True, fields=None),
mock.call.set(False, fields=None),
],
)

    def testContextManagerException(self) -> None:
        """Test that the exit metric is emitted when an exception is raised."""
with self.assertRaises(FakeException):
with metrics.Presence("fooname"):
raise FakeException
self.assertEqual(
self._mockMetric.mock_calls,
[
mock.call.set(True, fields=None),
mock.call.set(False, fields=None),
],
)

    def testContextManagerFields(self) -> None:
        """Test that fields are passed through to the emitted metrics."""
with metrics.Presence("fooname", {"foo": "bar", "c": 3}):
pass
self.assertEqual(
self._mockMetric.mock_calls,
[
mock.call.set(True, fields={"c": 3, "foo": "bar"}),
mock.call.set(False, fields={"c": 3, "foo": "bar"}),
],
)


class ClientException(Exception):
"""An exception that client of the metrics module raises."""


class TestRuntimeBreakdownTimer(cros_test_lib.MockTestCase):
    """Tests the behavior of RuntimeBreakdownTimer."""

    def setUp(self) -> None:
# Only patch metrics.time because we don't want to affect calls to
# functions from the unittest running framework itself.
self.time_mock = self.PatchObject(metrics, "time")
self.time_mock.time.side_effect = self._GetFakeTime
# An arbitrary, but fixed, seed time.
self._fake_time = 1234
metric_mock = self.PatchObject(
metrics, "CumulativeSecondsDistribution", autospec=True
)
self._mockCumulativeSecondsDistribution = metric_mock.return_value
metric_mock = self.PatchObject(
metrics, "PercentageDistribution", autospec=True
)
self._mockPercentageDistribution = metric_mock.return_value
metric_mock = self.PatchObject(metrics, "Float", autospec=True)
self._mockFloat = metric_mock.return_value
metric_mock = self.PatchObject(
metrics, "CumulativeMetric", autospec=True
)
self._mockCumulativeMetric = metric_mock.return_value

    def testSuccessfulBreakdown(self) -> None:
"""Tests that the context manager emits expected breakdowns."""
with metrics.RuntimeBreakdownTimer("fubar") as runtime:
with runtime.Step("step1"):
self._IncrementFakeTime(4)
with runtime.Step("step2"):
self._IncrementFakeTime(1)
self._IncrementFakeTime(5)
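        # Total runtime is 10s: step1 accounts for 4s (40%), step2 for 1s
        # (10%), and the remaining 5s (50%) is unaccounted.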
self.assertEqual(metrics.CumulativeSecondsDistribution.call_count, 1)
self.assertEqual(
metrics.CumulativeSecondsDistribution.call_args[0][0],
"fubar/total_duration",
)
self.assertEqual(
self._mockCumulativeSecondsDistribution.add.call_count, 1
)
self.assertEqual(
self._mockCumulativeSecondsDistribution.add.call_args[0][0], 10.0
)
self.assertEqual(metrics.PercentageDistribution.call_count, 3)
breakdown_names = [
x[0][0] for x in metrics.PercentageDistribution.call_args_list
]
self.assertEqual(
set(breakdown_names),
{
"fubar/breakdown/step1",
"fubar/breakdown/step2",
"fubar/breakdown_unaccounted",
},
)
breakdown_values = [
x[0][0] for x in self._mockPercentageDistribution.add.call_args_list
]
self.assertEqual(set(breakdown_values), {40.0, 10.0, 50.0})
self.assertEqual(metrics.CumulativeMetric.call_count, 1)
self.assertEqual(
metrics.CumulativeMetric.call_args[0][0], "fubar/bucketing_loss"
)
self.assertEqual(metrics.Float.call_count, 2)
for call_args in metrics.Float.call_args_list:
self.assertEqual(call_args[0][0], "fubar/duration_breakdown")
self.assertEqual(self._mockFloat.set.call_count, 2)
step_names = [
x[1]["fields"]["step_name"]
for x in self._mockFloat.set.call_args_list
]
step_ratios = [x[0][0] for x in self._mockFloat.set.call_args_list]
self.assertEqual(set(step_names), {"step1", "step2"})
self.assertEqual(set(step_ratios), {0.4, 0.1})

    def testBucketingLossApproximately(self) -> None:
"""Tests that we report the bucketing loss correctly."""
with metrics.RuntimeBreakdownTimer("fubar") as runtime:
for i in range(300):
with runtime.Step("step%d" % i):
self._IncrementFakeTime(1)
self.assertEqual(metrics.CumulativeSecondsDistribution.call_count, 1)
self.assertEqual(
metrics.CumulativeSecondsDistribution.call_args[0][0],
"fubar/total_duration",
)
self.assertEqual(
self._mockCumulativeSecondsDistribution.add.call_count, 1
)
self.assertEqual(
self._mockCumulativeSecondsDistribution.add.call_args[0][0], 300.0
)
self.assertEqual(metrics.CumulativeMetric.call_count, 1)
self.assertEqual(
metrics.CumulativeMetric.call_args[0][0], "fubar/bucketing_loss"
)
self.assertEqual(self._mockCumulativeMetric.increment_by.call_count, 1)
        # Each step is roughly 1/300 ~ 0.33%.
        # Our bucket resolution is 0.1%, so we expect to lose ~0.033% on
        # each report. Total # of reports = 300, so we'll lose ~9.99%.
        # Let's loosely bound that number to allow for floating point
        # computation errors.
error = self._mockCumulativeMetric.increment_by.call_args[0][0]
self.assertGreater(error, 9.6)
self.assertLess(error, 10.2)

    def testStepsWithClientCodeException(self) -> None:
"""Test that breakdown is reported correctly when client code raises."""
with self.assertRaises(ClientException):
with metrics.RuntimeBreakdownTimer("fubar") as runtime:
with runtime.Step("step1"):
self._IncrementFakeTime(1)
raise ClientException()
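        # Even though the client code raised, the completed step and the
        # unaccounted remainder should still be reported.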
self.assertEqual(metrics.PercentageDistribution.call_count, 2)
breakdown_names = [
x[0][0] for x in metrics.PercentageDistribution.call_args_list
]
self.assertEqual(
set(breakdown_names),
{"fubar/breakdown/step1", "fubar/breakdown_unaccounted"},
)
self.assertEqual(metrics.Float.call_count, 1)
self.assertEqual(
metrics.Float.call_args[0][0], "fubar/duration_breakdown"
)
self.assertEqual(self._mockFloat.set.call_count, 1)
self.assertEqual(
self._mockFloat.set.call_args[1]["fields"]["step_name"], "step1"
)

    def testNestedStepIgnored(self) -> None:
        """Tests that a nested .Step context is ignored."""
with metrics.RuntimeBreakdownTimer("fubar") as runtime:
with runtime.Step("step1"):
with runtime.Step("step2"):
self._IncrementFakeTime(1)
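        # Only the outermost step is reported; the nested step2 is ignored.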
self.assertEqual(metrics.PercentageDistribution.call_count, 2)
breakdown_names = [
x[0][0] for x in metrics.PercentageDistribution.call_args_list
]
self.assertEqual(
set(breakdown_names),
{"fubar/breakdown/step1", "fubar/breakdown_unaccounted"},
)
self.assertEqual(metrics.Float.call_count, 1)
self.assertEqual(
metrics.Float.call_args[0][0], "fubar/duration_breakdown"
)
self.assertEqual(self._mockFloat.set.call_count, 1)
self.assertEqual(
self._mockFloat.set.call_args[1]["fields"]["step_name"], "step1"
)

    def testNestedStepsWithClientCodeException(self) -> None:
"""Test that breakdown is reported correctly when client code raises."""
with self.assertRaises(ClientException):
with metrics.RuntimeBreakdownTimer("fubar") as runtime:
with runtime.Step("step1"):
with runtime.Step("step2"):
self._IncrementFakeTime(1)
raise ClientException()
self.assertEqual(metrics.PercentageDistribution.call_count, 2)
breakdown_names = [
x[0][0] for x in metrics.PercentageDistribution.call_args_list
]
self.assertEqual(
set(breakdown_names),
{"fubar/breakdown/step1", "fubar/breakdown_unaccounted"},
)
self.assertEqual(metrics.Float.call_count, 1)
self.assertEqual(
metrics.Float.call_args[0][0], "fubar/duration_breakdown"
)
self.assertEqual(self._mockFloat.set.call_count, 1)
self.assertEqual(
self._mockFloat.set.call_args[1]["fields"]["step_name"], "step1"
)

    def _GetFakeTime(self):
return self._fake_time

    def _IncrementFakeTime(self, seconds) -> None:
self._fake_time = self._fake_time + seconds