blob: 2ccd0e9f0cdeeb8dec5826cbd95e770265d4b771 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unittests for chromite.lib.metrics."""
from __future__ import print_function
import itertools
import multiprocessing
import sys
from six.moves import queue as Queue
from chromite.lib import cros_test_lib
from chromite.lib import metrics
from chromite.lib import ts_mon_config
from infra_libs import ts_mon
assert sys.version_info >= (3, 6), 'This module requires Python 3.6+'
# pylint: disable=protected-access
DEFAULT_OPTIONS = ts_mon_config._GenerateTsMonArgparseOptions(
'unittest', False, False, None, 0)
class TestConsumeMessages(cros_test_lib.MockTestCase):
"""Test that ConsumeMessages works correctly."""
def setUp(self):
# Every call to "time.time()" will look like 1 second has passed.
# Nb. we only want to mock out ts_mon_config's view of time, otherwise
# things like Process.join(10) won't sleep.
self.time_mock = self.PatchObject(ts_mon_config, 'time')
self.time_mock.time.side_effect = itertools.count(0)
self.flush_mock = self.PatchObject(ts_mon_config.metrics, 'Flush')
self.PatchObject(ts_mon_config, '_SetupTsMonFromOptions')
self.PatchObject(ts_mon_config, '_WasSetup', True)
self.mock_metric = self.PatchObject(metrics, 'Boolean')
self.common_metric_fields, _ = ts_mon_config.AddCommonFields({}, [])
def testGetMetricFieldSpec(self):
"""Test each field type gets its FieldSpec."""
fields = {
'int': 12,
'bool': True,
'str': 'string',
}
expected_fieldspec = [ts_mon.IntegerField('int'),
ts_mon.BooleanField('bool'),
ts_mon.StringField('str')]
self.assertEqual(ts_mon_config.GetMetricFieldSpec(fields),
expected_fieldspec)
def testNoneEndsProcess(self):
"""Putting None on the Queue should immediately end the consumption loop."""
q = Queue.Queue()
q.put(None)
ts_mon_config._SetupAndConsumeMessages(q, DEFAULT_OPTIONS)
ts_mon_config._SetupTsMonFromOptions.assert_called_once_with(
DEFAULT_OPTIONS, suppress_exception=True)
self.assertFalse(ts_mon_config.time.time.called)
self.assertFalse(ts_mon_config.metrics.Flush.called)
def testConsumeOneMetric(self):
"""Tests that sending one metric calls flush once."""
q = Queue.Queue()
q.put(metrics.MetricCall('Boolean', [], {},
'mock_name', ['arg1'], {'kwarg1': 'value'},
False))
q.put(None)
ts_mon_config._SetupAndConsumeMessages(q, DEFAULT_OPTIONS)
self.assertEqual(2, ts_mon_config.time.time.call_count)
ts_mon_config.time.sleep.assert_called_once_with(
ts_mon_config.FLUSH_INTERVAL - 1)
ts_mon_config.metrics.Flush.assert_called_once_with(reset_after=[])
self.mock_metric.return_value.mock_name.assert_called_once_with(
'arg1', fields=self.common_metric_fields, kwarg1='value')
def testConsumeTwoMetrics(self):
"""Tests that sending two metrics only calls flush once."""
q = Queue.Queue()
q.put(metrics.MetricCall('Boolean', [], {},
'mock_name1', ['arg1'], {'kwarg1': 'value'},
False))
q.put(metrics.MetricCall('Boolean', [], {},
'mock_name2', ['arg2'], {'kwarg2': 'value'},
False))
q.put(None)
ts_mon_config._SetupAndConsumeMessages(q, DEFAULT_OPTIONS)
self.assertEqual(3, ts_mon_config.time.time.call_count)
ts_mon_config.time.sleep.assert_called_once_with(
ts_mon_config.FLUSH_INTERVAL - 2)
ts_mon_config.metrics.Flush.assert_called_once_with(reset_after=[])
self.mock_metric.return_value.mock_name1.assert_called_once_with(
'arg1', fields=self.common_metric_fields, kwarg1='value')
self.mock_metric.return_value.mock_name2.assert_called_once_with(
'arg2', fields=self.common_metric_fields, kwarg2='value')
def testFlushingProcessExits(self):
"""Tests that _CreateTsMonFlushingProcess cleans up the process."""
processes = []
original_process_function = multiprocessing.Process
def SaveProcess(*args, **kwargs):
p = original_process_function(*args, **kwargs)
processes.append(p)
return p
self.PatchObject(multiprocessing, 'Process', SaveProcess)
with ts_mon_config._CreateTsMonFlushingProcess(DEFAULT_OPTIONS) as q:
q.put(metrics.MetricCall('Boolean', [], {},
'__class__', [], {},
False))
# wait a bit for the process to close, since multiprocessing.Queue and
# Process.join() is not synchronous.
processes[0].join(5)
self.assertEqual(0, processes[0].exitcode)
def testCatchesException(self):
"""Tests that the _SetupAndConsumeMessages loop catches exceptions."""
q = Queue.Queue()
class RaisesException(object):
"""Class to raise an exception"""
def raiseException(self, *_args, **_kwargs):
raise Exception()
metrics.RaisesException = RaisesException
q.put(metrics.MetricCall('RaisesException', [], {},
'raiseException', ['arg1'], {'kwarg1': 'value1'},
False))
q.put(None)
exception_log = self.PatchObject(ts_mon_config.logging, 'exception')
ts_mon_config._SetupAndConsumeMessages(q, DEFAULT_OPTIONS)
self.assertEqual(1, exception_log.call_count)
# time.time is called once because we check if we need to Flush() before
# receiving the None message.
self.assertEqual(1, ts_mon_config.time.time.call_count)
self.assertEqual(0, ts_mon_config.time.sleep.call_count)
self.assertEqual(0, ts_mon_config.metrics.Flush.call_count)
def testResetAfter(self):
"""Tests that metrics with reset_after set are cleared after."""
q = Queue.Queue()
q.put(metrics.MetricCall('Boolean', [], {},
'mock_name', ['arg1'], {'kwarg1': 'value1'},
reset_after=True))
q.put(None)
ts_mon_config._SetupAndConsumeMessages(q, DEFAULT_OPTIONS)
self.assertEqual(
[self.mock_metric.return_value],
ts_mon_config.metrics.Flush.call_args[1]['reset_after'])
self.mock_metric.return_value.mock_name.assert_called_once_with(
'arg1', fields=self.common_metric_fields, kwarg1='value1')
def testSubprocessQuitsWhenNotSetup(self):
self.PatchObject(ts_mon_config.logging, 'exception')
self.PatchObject(ts_mon_config, '_WasSetup', False)
ts_mon_config._SetupAndConsumeMessages(None, DEFAULT_OPTIONS)
self.assertEqual(False, ts_mon_config._WasSetup)
# The entry should not have been consumed by _ConsumeMessages
self.assertEqual(0, ts_mon_config.logging.exception.call_count)
def testSetOnceMetricKeepsEmitting(self):
"""Tests that a metric which is set once emits many times if left alone."""
self.PatchObject(ts_mon_config, 'FLUSH_INTERVAL', 0)
self.time_mock.time.side_effect = [1, 2, 3, 4, 5]
q = Queue.Queue()
q.put(metrics.MetricCall('Boolean', [], {},
'__class__', [], {},
False))
try:
ts_mon_config.MetricConsumer(q).Consume()
except StopIteration:
pass # No more time calls left.
self.assertEqual(self.flush_mock.call_count, 5)
class TestSetupTsMonGlobalState(cros_test_lib.MockTestCase):
"""Test that SetupTsMonGlobalState works correctly."""
def testTaskNumArgument(self):
"""The task_num argument should set the task_num in ts-mon."""
ts_mon_config.SetupTsMonGlobalState('unittest', auto_flush=False,
task_num=42)
self.assertEqual(ts_mon.common.interface.state.target.task_num, 42)
def testTaskNumWithIndirect(self):
"""The task_num argument should propagate to the flushing subprocess."""
create_flushing_process = self.PatchObject(
ts_mon_config, '_CreateTsMonFlushingProcess')
ts_mon_config.SetupTsMonGlobalState('unittest', indirect=True, task_num=42)
options = ts_mon_config._GenerateTsMonArgparseOptions(
'unittest', False, False, None, 42)
create_flushing_process.assert_called_once_with(options)