| # Copyright 2016 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unittests for chromite.lib.metrics.""" |
| |
| import sys |
| import tempfile |
| import unittest |
| from unittest import mock |
| |
| from chromite.third_party.infra_libs import ts_mon |
| |
| from chromite.lib import cros_test_lib |
| from chromite.lib import metrics |
| from chromite.lib import osutils |
| from chromite.lib import parallel |
| from chromite.lib import ts_mon_config |
| |
| |
| class FakeException(Exception): |
| """FakeException to raise during tests.""" |
| |
| |
| class TestIndirectMetrics(cros_test_lib.MockTestCase): |
| """Tests the behavior of _Indirect metrics.""" |
| |
| def testEnqueue(self) -> None: |
| """Test that _Indirect enqueues messages correctly.""" |
| metric = metrics.Boolean |
| |
| with parallel.Manager() as manager: |
| q = manager.Queue() |
| self.PatchObject(metrics, "MESSAGE_QUEUE", q) |
| |
| proxy_metric = metric("foo") |
| proxy_metric.example("arg1", {"field_name": "value"}) |
| |
| message = q.get(timeout=10) |
| |
| expected_metric_kwargs = { |
| "field_spec": [ts_mon.StringField("field_name")], |
| "description": "No description.", |
| } |
| self.assertEqual( |
| message, |
| metrics.MetricCall( |
| metric.__name__, |
| ("foo",), |
| expected_metric_kwargs, |
| "example", |
| ("arg1", {"field_name": "value"}), |
| {}, |
| False, |
| ), |
| ) |
| |
| def patchTime(self) -> None: |
| """Patch time to force a Flush() every time a metric is sent.""" |
| |
| def TimeIterator(): |
| t = 0 |
| while True: |
| t += ts_mon_config.FLUSH_INTERVAL + 1 |
| yield t |
| |
| self.PatchObject( |
| ts_mon_config, |
| "time", |
| mock.Mock(time=mock.Mock(side_effect=TimeIterator())), |
| ) |
| |
| @unittest.skipIf(sys.version_info.major < 3, "Requires py3") |
| def testShortLived(self) -> None: |
| """Tests that configuring ts-mon to use short-lived processes works.""" |
| self.patchTime() |
| with tempfile.NamedTemporaryFile(dir="/var/tmp") as out: |
| with ts_mon_config.SetupTsMonGlobalState( |
| "metrics_unittest", short_lived=True, debug_file=out.name |
| ): |
| # pylint: disable=protected-access |
| self.assertTrue(ts_mon_config._WasSetup) |
| |
| @unittest.skipIf(sys.version_info.major < 3, "Requires py3") |
| def testResetAfter(self) -> None: |
| """Tests that the reset_after flag works to send metrics only once.""" |
| # By mocking out its "time" module, the forked flushing process will |
| # think it should call Flush() whenever we send a metric. |
| self.patchTime() |
| |
| with tempfile.NamedTemporaryFile(dir="/var/tmp") as out: |
| # * The indirect=True flag is required for reset_after to work. |
| # * Using debug_file, we send metrics to the temporary file instead |
| # of sending metrics to production via PubSub. |
| with ts_mon_config.SetupTsMonGlobalState( |
| "metrics_unittest", indirect=True, debug_file=out.name |
| ): |
| |
| def MetricName(i, flushed): |
| return "test/metric/name/%d/%s" % (i, flushed) |
| |
| # Each of these .set() calls will result in a Flush() call. |
| for i in range(7): |
| # any extra streams with different fields and |
| # reset_after=False will be cleared only if the below metric |
| # is cleared. |
| metrics.Boolean(MetricName(i, True), reset_after=False).set( |
| True, fields={"original": False} |
| ) |
| |
| metrics.Boolean(MetricName(i, True), reset_after=True).set( |
| True, fields={"original": True} |
| ) |
| |
| for i in range(7): |
| metrics.Boolean( |
| MetricName(i, False), reset_after=False |
| ).set(True) |
| |
| # By leaving the context, we .join() the flushing process. |
| content = osutils.ReadFile(out.name) |
| |
| # The reset metrics should be sent only three times, because: |
| # * original=False is sent twice |
| # * original=True is sent once. |
| # |
| # The second flush() only results in one occurrence of the string |
| # MetricName(i, True) because both data points are in a |
| # "metrics_data_set" block, like so: |
| # metrics_collection { |
| # ... etc ... |
| # metrics_data_set { |
| # metric_name: "/chrome/infra/test/metric/name/0/True" |
| # data { |
| # bool_value: true |
| # field { |
| # name: "original" |
| # bool_value: false |
| # } |
| # } |
| # data { |
| # bool_value: true |
| # ... etc ... |
| for i in range(7): |
| self.assertEqual(content.count(MetricName(i, True)), 2) |
| |
| # The non-reset metrics are sent once-per-flush. |
| # There are 7 of these metrics, |
| # * The 0th is sent 7 times, |
| # * The 1st is sent 6 times, |
| # ... |
| # * The 6th is sent 1 time. |
| # So the "i"th metric is sent (7-i) times. |
| for i in range(7): |
| self.assertEqual(content.count(MetricName(i, False)), 7 - i) |
| |
| |
| class TestSecondsTimer(cros_test_lib.MockTestCase): |
| """Tests the behavior of SecondsTimer and SecondsTimerDecorator.""" |
| |
| def setUp(self) -> None: |
| self._mockMetric = mock.MagicMock() |
| self.PatchObject( |
| metrics, |
| "CumulativeSecondsDistribution", |
| return_value=self._mockMetric, |
| ) |
| |
| @metrics.SecondsTimerDecorator("fooname", fields={"foo": "bar"}) |
| def _DecoratedFunction(self, *args, **kwargs) -> None: |
| pass |
| |
| def testDecorator(self) -> None: |
| """Test that calling a decorated function ends up emitting metric.""" |
| self._DecoratedFunction(1, 2, 3, foo="bar") |
| self.assertEqual(metrics.CumulativeSecondsDistribution.call_count, 1) |
| self.assertEqual(self._mockMetric.add.call_count, 1) |
| |
| def testContextManager(self) -> None: |
| """Test that timing context manager emits a metric.""" |
| with metrics.SecondsTimer("fooname"): |
| pass |
| self.assertEqual(metrics.CumulativeSecondsDistribution.call_count, 1) |
| self.assertEqual(self._mockMetric.add.call_count, 1) |
| |
| def testContextManagerWithUpdate(self) -> None: |
| """Verify timing context manager with a field update emits metric.""" |
| with metrics.SecondsTimer("fooname", fields={"foo": "bar"}) as c: |
| c["foo"] = "qux" |
| self._mockMetric.add.assert_called_with(mock.ANY, fields={"foo": "qux"}) |
| |
| def testContextManagerWithoutUpdate(self) -> None: |
| """Tests that the default value for fields is used when not updated.""" |
| # pylint: disable=unused-variable |
| with metrics.SecondsTimer("fooname", fields={"foo": "bar"}) as c: |
| pass |
| self._mockMetric.add.assert_called_with(mock.ANY, fields={"foo": "bar"}) |
| |
| def testContextManagerIgnoresInvalidField(self) -> None: |
| """Test that we ignore fields that are set with no default.""" |
| with metrics.SecondsTimer("fooname", fields={"foo": "bar"}) as c: |
| c["qux"] = "qwert" |
| self._mockMetric.add.assert_called_with(mock.ANY, fields={"foo": "bar"}) |
| |
| def testContextManagerWithException(self) -> None: |
| """Tests that we emit metrics if the timed method raised something.""" |
| with self.assertRaises(AssertionError): |
| with metrics.SecondsTimer("fooname", fields={"foo": "bar"}): |
| assert False |
| |
| self._mockMetric.add.assert_called_with(mock.ANY, fields={"foo": "bar"}) |
| |
| |
| class TestSecondsInstanceTimer(cros_test_lib.MockTestCase): |
| """Tests the behavior of SecondsInstanceTimer and decorator.""" |
| |
| def setUp(self) -> None: |
| self._mockMetric = mock.MagicMock() |
| self.PatchObject(metrics, "FloatMetric", return_value=self._mockMetric) |
| |
| @metrics.SecondsInstanceTimerDecorator("fooname", fields={"foo": "bar"}) |
| def _DecoratedFunction(self, *args, **kwargs) -> None: |
| pass |
| |
| def testDecorator(self) -> None: |
| """Test that calling a decorated function ends up emitting metric.""" |
| self._DecoratedFunction(1, 2, 3, foo="bar") |
| self.assertEqual(metrics.FloatMetric.call_count, 1) |
| self.assertEqual(self._mockMetric.set.call_count, 1) |
| |
| def testContextManager(self) -> None: |
| """Test that timing context manager emits a metric.""" |
| with metrics.SecondsInstanceTimer("fooname"): |
| pass |
| self.assertEqual(metrics.FloatMetric.call_count, 1) |
| self.assertEqual(self._mockMetric.set.call_count, 1) |
| |
| def testContextManagerWithUpdate(self) -> None: |
| """Verify timing context manager with a field update emits metric.""" |
| with metrics.SecondsInstanceTimer( |
| "fooname", fields={"foo": "bar"} |
| ) as c: |
| c["foo"] = "qux" |
| self._mockMetric.set.assert_called_with(mock.ANY, fields={"foo": "qux"}) |
| |
| def testContextManagerWithoutUpdate(self) -> None: |
| """Tests that the default value for fields is used when not updated.""" |
| # pylint: disable=unused-variable |
| with metrics.SecondsInstanceTimer( |
| "fooname", fields={"foo": "bar"} |
| ) as c: |
| pass |
| self._mockMetric.set.assert_called_with(mock.ANY, fields={"foo": "bar"}) |
| |
| def testContextManagerIgnoresInvalidField(self) -> None: |
| """Test that we ignore fields that are set with no default.""" |
| with metrics.SecondsInstanceTimer( |
| "fooname", fields={"foo": "bar"} |
| ) as c: |
| c["qux"] = "qwert" |
| self._mockMetric.set.assert_called_with(mock.ANY, fields={"foo": "bar"}) |
| |
| def testContextManagerWithException(self) -> None: |
| """Tests that we emit metrics if the timed method raised something.""" |
| with self.assertRaises(AssertionError): |
| with metrics.SecondsInstanceTimer("fooname", fields={"foo": "bar"}): |
| assert False |
| |
| self._mockMetric.set.assert_called_with(mock.ANY, fields={"foo": "bar"}) |
| |
| |
| class TestSuccessCounter(cros_test_lib.MockTestCase): |
| """Tests the behavior of SecondsTimer.""" |
| |
| def setUp(self) -> None: |
| self._mockMetric = mock.MagicMock() |
| self.PatchObject(metrics, "Counter", return_value=self._mockMetric) |
| |
| def testContextManager(self) -> None: |
| """Test that timing context manager emits a metric.""" |
| with metrics.SuccessCounter("fooname"): |
| pass |
| self._mockMetric.increment.assert_called_with(fields={"success": True}) |
| self.assertEqual(self._mockMetric.increment.call_count, 1) |
| |
| def testContextManagerFailedException(self) -> None: |
| """Test that we fail when an exception is raised.""" |
| with self.assertRaises(FakeException): |
| with metrics.SuccessCounter("fooname"): |
| raise FakeException |
| |
| self._mockMetric.increment.assert_called_with(fields={"success": False}) |
| |
| def testContextManagerFailedExplicit(self) -> None: |
| """Test that we fail when an exception is raised.""" |
| with metrics.SuccessCounter("fooname") as s: |
| s["success"] = False |
| |
| self._mockMetric.increment.assert_called_with(fields={"success": False}) |
| |
| def testContextManagerWithUpdate(self) -> None: |
| """Tests that context manager with a field update emits metric.""" |
| with metrics.SuccessCounter("fooname", fields={"foo": "bar"}) as c: |
| c["foo"] = "qux" |
| self._mockMetric.increment.assert_called_with( |
| fields={"foo": "qux", "success": True} |
| ) |
| |
| def testContextManagerWithoutUpdate(self) -> None: |
| """Tests that the default value for fields is used when not updated.""" |
| # pylint: disable=unused-variable |
| with metrics.SuccessCounter("fooname", fields={"foo": "bar"}) as c: |
| pass |
| self._mockMetric.increment.assert_called_with( |
| fields={"foo": "bar", "success": True} |
| ) |
| |
| def testContextManagerIgnoresInvalidField(self) -> None: |
| """Test that we ignore fields that are set with no default.""" |
| with metrics.SuccessCounter("fooname", fields={"foo": "bar"}) as c: |
| c["qux"] = "qwert" |
| |
| self._mockMetric.increment.assert_called_with( |
| fields={"foo": "bar", "success": True} |
| ) |
| |
| |
| class TestPresence(cros_test_lib.MockTestCase): |
| """Tests the behavior of SecondsTimer.""" |
| |
| def setUp(self) -> None: |
| self._mockMetric = mock.MagicMock() |
| self.PatchObject(metrics, "Boolean", return_value=self._mockMetric) |
| |
| def testContextManager(self) -> None: |
| """Test that timing context manager emits a metric.""" |
| with metrics.Presence("fooname"): |
| self.assertEqual( |
| self._mockMetric.mock_calls, |
| [ |
| mock.call.set(True, fields=None), |
| ], |
| ) |
| |
| self.assertEqual( |
| self._mockMetric.mock_calls, |
| [ |
| mock.call.set(True, fields=None), |
| mock.call.set(False, fields=None), |
| ], |
| ) |
| |
| def testContextManagerException(self) -> None: |
| """Test that we fail when an exception is raised.""" |
| with self.assertRaises(FakeException): |
| with metrics.Presence("fooname"): |
| raise FakeException |
| |
| self.assertEqual( |
| self._mockMetric.mock_calls, |
| [ |
| mock.call.set(True, fields=None), |
| mock.call.set(False, fields=None), |
| ], |
| ) |
| |
| def testContextManagerFields(self) -> None: |
| """Test that we fail when an exception is raised.""" |
| with metrics.Presence("fooname", {"foo": "bar", "c": 3}): |
| pass |
| |
| self.assertEqual( |
| self._mockMetric.mock_calls, |
| [ |
| mock.call.set(True, fields={"c": 3, "foo": "bar"}), |
| mock.call.set(False, fields={"c": 3, "foo": "bar"}), |
| ], |
| ) |
| |
| |
| class ClientException(Exception): |
| """An exception that client of the metrics module raises.""" |
| |
| |
| class TestRuntimeBreakdownTimer(cros_test_lib.MockTestCase): |
| """Tests the behaviour of RuntimeBreakdownTimer.""" |
| |
| def setUp(self) -> None: |
| # Only patch metrics.time because we don't want to affect calls to |
| # functions from the unittest running framework itself. |
| self.time_mock = self.PatchObject(metrics, "time") |
| self.time_mock.time.side_effect = self._GetFakeTime |
| # An arbitrary, but fixed, seed time. |
| self._fake_time = 1234 |
| |
| metric_mock = self.PatchObject( |
| metrics, "CumulativeSecondsDistribution", autospec=True |
| ) |
| self._mockCumulativeSecondsDistribution = metric_mock.return_value |
| metric_mock = self.PatchObject( |
| metrics, "PercentageDistribution", autospec=True |
| ) |
| self._mockPercentageDistribution = metric_mock.return_value |
| metric_mock = self.PatchObject(metrics, "Float", autospec=True) |
| self._mockFloat = metric_mock.return_value |
| |
| metric_mock = self.PatchObject( |
| metrics, "CumulativeMetric", autospec=True |
| ) |
| self._mockCumulativeMetric = metric_mock.return_value |
| |
| def testSucessfulBreakdown(self) -> None: |
| """Tests that the context manager emits expected breakdowns.""" |
| with metrics.RuntimeBreakdownTimer("fubar") as runtime: |
| with runtime.Step("step1"): |
| self._IncrementFakeTime(4) |
| with runtime.Step("step2"): |
| self._IncrementFakeTime(1) |
| self._IncrementFakeTime(5) |
| |
| self.assertEqual(metrics.CumulativeSecondsDistribution.call_count, 1) |
| self.assertEqual( |
| metrics.CumulativeSecondsDistribution.call_args[0][0], |
| "fubar/total_duration", |
| ) |
| self.assertEqual( |
| self._mockCumulativeSecondsDistribution.add.call_count, 1 |
| ) |
| self.assertEqual( |
| self._mockCumulativeSecondsDistribution.add.call_args[0][0], 10.0 |
| ) |
| |
| self.assertEqual(metrics.PercentageDistribution.call_count, 3) |
| breakdown_names = [ |
| x[0][0] for x in metrics.PercentageDistribution.call_args_list |
| ] |
| self.assertEqual( |
| set(breakdown_names), |
| { |
| "fubar/breakdown/step1", |
| "fubar/breakdown/step2", |
| "fubar/breakdown_unaccounted", |
| }, |
| ) |
| breakdown_values = [ |
| x[0][0] for x in self._mockPercentageDistribution.add.call_args_list |
| ] |
| self.assertEqual(set(breakdown_values), {40.0, 10.0, 50.0}) |
| |
| self.assertEqual(metrics.CumulativeMetric.call_count, 1) |
| self.assertEqual( |
| metrics.CumulativeMetric.call_args[0][0], "fubar/bucketing_loss" |
| ) |
| |
| self.assertEqual(metrics.Float.call_count, 2) |
| for call_args in metrics.Float.call_args_list: |
| self.assertEqual(call_args[0][0], "fubar/duration_breakdown") |
| self.assertEqual(self._mockFloat.set.call_count, 2) |
| step_names = [ |
| x[1]["fields"]["step_name"] |
| for x in self._mockFloat.set.call_args_list |
| ] |
| step_ratios = [x[0][0] for x in self._mockFloat.set.call_args_list] |
| self.assertEqual(set(step_names), {"step1", "step2"}) |
| self.assertEqual(set(step_ratios), {0.4, 0.1}) |
| |
| def testBucketingLossApproximately(self) -> None: |
| """Tests that we report the bucketing loss correctly.""" |
| with metrics.RuntimeBreakdownTimer("fubar") as runtime: |
| for i in range(300): |
| with runtime.Step("step%d" % i): |
| self._IncrementFakeTime(1) |
| |
| self.assertEqual(metrics.CumulativeSecondsDistribution.call_count, 1) |
| self.assertEqual( |
| metrics.CumulativeSecondsDistribution.call_args[0][0], |
| "fubar/total_duration", |
| ) |
| self.assertEqual( |
| self._mockCumulativeSecondsDistribution.add.call_count, 1 |
| ) |
| self.assertEqual( |
| self._mockCumulativeSecondsDistribution.add.call_args[0][0], 300.0 |
| ) |
| |
| self.assertEqual(metrics.CumulativeMetric.call_count, 1) |
| self.assertEqual( |
| metrics.CumulativeMetric.call_args[0][0], "fubar/bucketing_loss" |
| ) |
| self.assertEqual(self._mockCumulativeMetric.increment_by.call_count, 1) |
| # Each step is roughly 1/300 ~ .33%. |
| # Our bucket resolution is 0.1 % so we expect to lose ~ 0.033% each |
| # report. Total # of reports = 300. So we'll lose ~9.99% Let's loosely |
| # bound that number to allow for floating point computation errors. |
| error = self._mockCumulativeMetric.increment_by.call_args[0][0] |
| self.assertGreater(error, 9.6) |
| self.assertLess(error, 10.2) |
| |
| def testStepsWithClientCodeException(self) -> None: |
| """Test that breakdown is reported correctly when client code raises.""" |
| with self.assertRaises(ClientException): |
| with metrics.RuntimeBreakdownTimer("fubar") as runtime: |
| with runtime.Step("step1"): |
| self._IncrementFakeTime(1) |
| raise ClientException() |
| |
| self.assertEqual(metrics.PercentageDistribution.call_count, 2) |
| breakdown_names = [ |
| x[0][0] for x in metrics.PercentageDistribution.call_args_list |
| ] |
| self.assertEqual( |
| set(breakdown_names), |
| {"fubar/breakdown/step1", "fubar/breakdown_unaccounted"}, |
| ) |
| |
| self.assertEqual(metrics.Float.call_count, 1) |
| self.assertEqual( |
| metrics.Float.call_args[0][0], "fubar/duration_breakdown" |
| ) |
| self.assertEqual(self._mockFloat.set.call_count, 1) |
| self.assertEqual( |
| self._mockFloat.set.call_args[1]["fields"]["step_name"], "step1" |
| ) |
| |
| def testNestedStepIgnored(self) -> None: |
| """Tests that trying to enter nested .Step contexts raises.""" |
| with metrics.RuntimeBreakdownTimer("fubar") as runtime: |
| with runtime.Step("step1"): |
| with runtime.Step("step2"): |
| self._IncrementFakeTime(1) |
| |
| self.assertEqual(metrics.PercentageDistribution.call_count, 2) |
| breakdown_names = [ |
| x[0][0] for x in metrics.PercentageDistribution.call_args_list |
| ] |
| self.assertEqual( |
| set(breakdown_names), |
| {"fubar/breakdown/step1", "fubar/breakdown_unaccounted"}, |
| ) |
| |
| self.assertEqual(metrics.Float.call_count, 1) |
| self.assertEqual( |
| metrics.Float.call_args[0][0], "fubar/duration_breakdown" |
| ) |
| self.assertEqual(self._mockFloat.set.call_count, 1) |
| self.assertEqual( |
| self._mockFloat.set.call_args[1]["fields"]["step_name"], "step1" |
| ) |
| |
| def testNestedStepsWithClientCodeException(self) -> None: |
| """Test that breakdown is reported correctly when client code raises.""" |
| with self.assertRaises(ClientException): |
| with metrics.RuntimeBreakdownTimer("fubar") as runtime: |
| with runtime.Step("step1"): |
| with runtime.Step("step2"): |
| self._IncrementFakeTime(1) |
| raise ClientException() |
| |
| self.assertEqual(metrics.PercentageDistribution.call_count, 2) |
| breakdown_names = [ |
| x[0][0] for x in metrics.PercentageDistribution.call_args_list |
| ] |
| self.assertEqual( |
| set(breakdown_names), |
| {"fubar/breakdown/step1", "fubar/breakdown_unaccounted"}, |
| ) |
| |
| self.assertEqual(metrics.Float.call_count, 1) |
| self.assertEqual( |
| metrics.Float.call_args[0][0], "fubar/duration_breakdown" |
| ) |
| self.assertEqual(self._mockFloat.set.call_count, 1) |
| self.assertEqual( |
| self._mockFloat.set.call_args[1]["fields"]["step_name"], "step1" |
| ) |
| |
| def _GetFakeTime(self): |
| return self._fake_time |
| |
| def _IncrementFakeTime(self, seconds) -> None: |
| self._fake_time = self._fake_time + seconds |