# blob: ba10d5214ed9e7ba838d92e4624b994c33579baf [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import random
import threading
import time
import unittest
import mock
from infra_libs.ts_mon.common import distribution
from infra_libs.ts_mon.common import interface
from infra_libs.ts_mon.common import errors
from infra_libs.ts_mon.common import metric_store
from infra_libs.ts_mon.common import metrics
from infra_libs.ts_mon.common import targets
class DefaultModifyFnTest(unittest.TestCase):
  """Tests for the callable returned by metric_store.default_modify_fn."""

  def test_adds(self):
    """The returned function sums its two arguments, in either order."""
    fn = metric_store.default_modify_fn('foo')
    self.assertEqual(5, fn(2, 3))
    self.assertEqual(5, fn(3, 2))

  def test_negative(self):
    """A negative second argument raises MonitoringDecreasingValueError.

    The error message is expected to contain the quoted name that was passed
    to default_modify_fn.
    """
    fn = metric_store.default_modify_fn('foo')
    with self.assertRaises(errors.MonitoringDecreasingValueError) as cm:
      fn(2, -1)
    self.assertIn('"foo"', str(cm.exception))
class MetricStoreTestBase(object):
  """Abstract base class for testing MetricStore implementations.

  This class doesn't inherit from unittest.TestCase to prevent it from being
  run automatically by expect_tests.

  Your subclass should inherit from this and unittest.TestCase, and set
  METRIC_STORE_CLASS to the implementation you want to test.  See
  InProcessMetricStoreTest in this file for an example.
  """

  # Store implementation under test; concrete subclasses must override this.
  METRIC_STORE_CLASS = None

  def setUp(self):
    """Creates a State backed by the store under test and a 'foo' metric.

    The store is built through create_store, so it receives self.mock_time
    (fixed at 1234) as its time function.  The module-level
    interface.state is patched to the new State until tearDown.
    """
    super(MetricStoreTestBase, self).setUp()

    self.mock_time = mock.create_autospec(time.time, spec_set=True)
    self.mock_time.return_value = 1234

    target = targets.TaskTarget(
        'test_service', 'test_job', 'test_region', 'test_host')
    self.state = interface.State(store_ctor=self.create_store, target=target)
    mock.patch('infra_libs.ts_mon.common.interface.state',
               new=self.state).start()

    self.store = self.state.store
    self.metric = metrics.Metric('foo', 'desc', None)

  def create_store(self, *args, **kwargs):
    """Instantiates METRIC_STORE_CLASS, forcing time_fn to the mocked clock.

    Args:
      *args: positional arguments forwarded to METRIC_STORE_CLASS.
      **kwargs: keyword arguments forwarded to METRIC_STORE_CLASS; any
          'time_fn' entry is overwritten with self.mock_time.

    Returns:
      Whatever METRIC_STORE_CLASS(*args, **kwargs) returns.
    """
    kwargs['time_fn'] = self.mock_time
    return self.METRIC_STORE_CLASS(*args, **kwargs)

  def tearDown(self):
    """Stops every patch started with mock.patch(...).start()."""
    super(MetricStoreTestBase, self).tearDown()
    mock.patch.stopall()

  def test_sets_start_time(self):
    """With no start time on the metric, the mocked clock value is reported."""
    self.metric._start_time = None
    self.mock_time.return_value = 1234

    self.store.set('foo', ('value',), None, 42)
    self.store.set('foo', ('value2',), None, 43)

    all_metrics = list(self.store.get_all())

    # Both field tuples belong to the same metric, hence a single entry.
    self.assertEqual(1, len(all_metrics))
    self.assertEqual('foo', all_metrics[0][1].name)
    self.assertEqual(1234, all_metrics[0][2])

  def test_uses_start_time_from_metric(self):
    """A start time set on the metric takes precedence over the clock."""
    self.metric._start_time = 5678

    self.store.set('foo', ('value',), None, 42)
    self.store.set('foo', ('value2',), None, 43)

    all_metrics = list(self.store.get_all())

    self.assertEqual(1, len(all_metrics))
    self.assertEqual('foo', all_metrics[0][1].name)
    self.assertEqual(5678, all_metrics[0][2])

  def test_get(self):
    """get() looks values up by metric name, fields and target fields.

    Unknown combinations yield None, or the supplied default.
    """
    fields1 = ('value',)
    fields2 = ('value2',)
    fields3 = ('value3',)
    target_fields1 = {'region': 'rrr'}
    target_fields2 = {'region': 'rrr', 'hostname': 'hhh'}

    self.store.set('foo', fields1, None, 42)
    self.store.set('foo', fields2, None, 43)
    self.store.set('foo', fields1, target_fields1, 24)
    self.store.set('foo', fields2, target_fields2, 34)

    # Values that were set come back unchanged.
    self.assertEqual(42, self.store.get('foo', fields1, None))
    self.assertEqual(43, self.store.get('foo', fields2, None))
    self.assertEqual(24, self.store.get('foo', fields1, target_fields1))
    self.assertEqual(34, self.store.get('foo', fields2, target_fields2))

    # Field / target-field combinations that were never set.
    self.assertIsNone(self.store.get('foo', fields3, None))
    self.assertIsNone(self.store.get('foo', (), None))
    self.assertIsNone(self.store.get('foo', fields1, target_fields2))
    self.assertEqual(44, self.store.get('foo', fields3, None, default=44))

    # A metric name that was never set.
    self.assertIsNone(self.store.get('bar', (), None))

  def test_iter_field_values(self):
    """iter_field_values() yields (fields, value) pairs across all targets."""
    fields1 = ('value',)
    fields2 = ('value2',)
    target_fields1 = {'region': 'rrr'}

    self.store.set('foo', fields1, None, 42)
    self.store.set('foo', fields2, None, 43)
    self.store.set('foo', fields2, target_fields1, 44)

    field_values = list(self.store.iter_field_values('foo'))

    # Sorted before comparing, so iteration order is not asserted.
    self.assertEqual([
        (('value',), 42),
        (('value2',), 43),
        (('value2',), 44),
    ], sorted(field_values))

  def test_set_enforce_ge(self):
    """With enforce_ge=True, setting a smaller value than before raises."""
    self.store.set('foo', ('value',), None, 42, enforce_ge=True)
    self.store.set('foo', ('value',), None, 43, enforce_ge=True)

    with self.assertRaises(errors.MonitoringDecreasingValueError):
      self.store.set('foo', ('value',), None, 42, enforce_ge=True)

  def test_incr(self):
    """incr() adds the delta to the stored value and rejects a negative one."""
    self.store.set('foo', ('value',), None, 42)
    self.store.incr('foo', ('value',), None, 4)
    self.assertEqual(46, self.store.get('foo', ('value',), None))

    with self.assertRaises(errors.MonitoringDecreasingValueError):
      self.store.incr('foo', ('value',), None, -1)

  def test_incr_modify_fn(self):
    """incr() stores the result of a custom modify_fn(current, delta)."""
    def spec_fn(n, i):  # pragma: no cover
      return n+i

    modify_fn = mock.create_autospec(spec_fn, spec_set=True)
    modify_fn.return_value = 7

    self.store.set('foo', ('value',), None, 42)
    self.store.incr('foo', ('value',), None, 3, modify_fn=modify_fn)

    # The mock's return value is stored, not 42 + 3.
    self.assertEqual(7, self.store.get('foo', ('value',), None))
    modify_fn.assert_called_once_with(42, 3)

  def test_reset_for_unittest(self):
    """reset_for_unittest() with no arguments clears stored values."""
    self.store.set('foo', ('value',), None, 42)
    self.store.reset_for_unittest()
    self.assertIsNone(self.store.get('foo', ('value',), None))

  def test_reset_for_unittest_name(self):
    """reset_for_unittest(name=...) only clears the named metric."""
    self.store.set('foo', ('value',), None, 42)

    self.store.reset_for_unittest(name='bar')
    self.assertEqual(42, self.store.get('foo', ('value',), None))

    self.store.reset_for_unittest(name='foo')
    self.assertIsNone(self.store.get('foo', ('value',), None))

  def test_unregister_metric(self):
    """get_all() omits values set under a name with no registered metric."""
    fields = (('field', 'value'),)
    self.store.set('foo', fields, None, 42)  # Registered in setUp().
    self.store.set('bar', fields, None, 24)  # Unregistered.

    all_metrics = list(self.store.get_all())

    self.assertEqual(1, len(all_metrics))
    self.assertEqual('foo', all_metrics[0][1].name)

  def test_copies_distributions(self):
    """Distributions returned by get_all() are not changed by later incr()."""
    def modify_fn(dist, delta):
      # This is the same as the modify_fn in _DistributionMetricBase's add().
      if dist == 0:
        dist = distribution.Distribution(distribution.GeometricBucketer())
      dist.add(delta)
      return dist

    # Increment the metric once to create it in the store.
    self.store.incr('foo', (), None, 42, modify_fn)

    # Get its value from get_all.  We should get a copy of the distribution.
    dist = list(list(self.store.get_all())[0][4].items())[0][1]
    self.assertEqual(1, dist.count)
    self.assertEqual(42, dist.sum)

    # Increment the metric again.
    self.store.incr('foo', (), None, 42, modify_fn)

    # The object we got should not change.
    self.assertEqual(1, dist.count)
    self.assertEqual(42, dist.sum)

  def test_get_all_thread_safe(self):
    """Dumb test to check that setting metrics while calling get_all is ok."""
    start = threading.Event()
    stop = threading.Event()

    def modify_worker():
      start.wait()
      while not stop.is_set():
        self.store.set('foo', (('field', random.random()),), None, 1)

    successful_workers = []

    def get_all_worker():
      start.wait()
      while not stop.is_set():
        for _, _, _, _, fields_values in self.store.get_all():
          list(fields_values.items())
      # Only reached when the loop above finished without raising.
      successful_workers.append(True)

    # Create 10 modify threads and 10 get_all threads.
    threads = (
        [threading.Thread(target=modify_worker) for _ in range(10)] +
        [threading.Thread(target=get_all_worker) for _ in range(10)])

    # Start all the threads at once.
    for thread in threads:
      thread.start()
    start.set()

    # Wait 2 seconds then stop them all.
    time.sleep(2)
    stop.set()
    for thread in threads:
      thread.join()

    # All the threads should've been successful and not raised an exception in
    # get_all.
    self.assertEqual([True] * 10, successful_workers)
class InProcessMetricStoreTest(MetricStoreTestBase, unittest.TestCase):
  """Runs the shared MetricStoreTestBase suite against InProcessMetricStore."""

  METRIC_STORE_CLASS = metric_store.InProcessMetricStore