blob: 4f551e924a09767d510c050ef6f9e9aa2542682e [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wrapper for inframon's command-line flag based configuration."""
from __future__ import print_function
import argparse
import contextlib
import multiprocessing
import os
import socket
import signal
import time
import Queue
from chromite.lib import cros_logging as logging
from chromite.lib import metrics
from chromite.lib import parallel
# ts_mon and google-api-client are optional dependencies. When either import
# fails, |config| is set to None; SetupTsMonGlobalState checks for that and
# returns a no-op context manager instead of configuring monitoring.
try:
  from infra_libs.ts_mon import config
  import googleapiclient.discovery
except (ImportError, RuntimeError) as e:
  config = None
  logging.warning('Failed to import ts_mon, monitoring is disabled: %s', e)
_WasSetup = False
@contextlib.contextmanager
def TrivialContextManager():
  """Context manager with no side effects.

  Used as the return value of setup functions when there is nothing to clean
  up, so callers can always write `with SetupTsMonGlobalState(...):`.

  Yields:
    None. Entering and leaving the context does nothing.
  """
  yield
def SetupTsMonGlobalState(service_name,
"""Uses a dummy argument parser to get the default behavior from ts-mon.
service_name: The name of the task we are sending metrics from.
indirect: Whether to create a metrics.METRICS_QUEUE object and a separate
process for indirect metrics flushing. Useful for forking,
because forking would normally create a duplicate ts_mon thread.
suppress_exception: True to silence any exception during the setup. Default
is set to True.
short_lived: Whether this process is short-lived and should use the autogen
hostname prefix.
auto_flush: Whether to create a thread to automatically flush metrics every
debug_file: If non-none, send metrics to this path instead of to PubSub.
task_num: (Default 0) The task_num target field of the metrics to emit.
if not config:
return TrivialContextManager()
# The flushing subprocess calls .flush manually.
if indirect:
auto_flush = False
# google-api-client has too much noisy logging.
options = _GenerateTsMonArgparseOptions(
service_name, short_lived, auto_flush, debug_file, task_num)
if indirect:
return _CreateTsMonFlushingProcess(options)
_SetupTsMonFromOptions(options, suppress_exception)
return TrivialContextManager()
def _SetupTsMonFromOptions(options, suppress_exception):
  """Sets up ts-mon global state given parsed argparse options.

  Args:
    options: An argparse options object containing ts-mon flags.
    suppress_exception: True to silence any exception during the setup; False
      to re-raise it after the warning has been logged.

  Side effects:
    Sets the module-level _WasSetup flag to True when setup succeeds.
  """
  global _WasSetup  # pylint: disable=global-statement
  try:
    # NOTE(review): the call that hands |options| to ts-mon is not visible in
    # the reviewed copy of this file; process_argparse_options is the
    # counterpart of add_argparse_options in infra_libs.ts_mon.config --
    # confirm against the upstream source.
    config.process_argparse_options(options)
    logging.notice('ts_mon was set up.')
    _WasSetup = True
  except Exception as e:
    # Monitoring is best effort: log with the traceback so a suppressed
    # failure can still be diagnosed.
    logging.warning('Failed to configure ts_mon, monitoring is disabled: %s', e,
                    exc_info=True)
    if not suppress_exception:
      raise
def _GenerateTsMonArgparseOptions(service_name, short_lived,
                                  auto_flush, debug_file, task_num):
  """Generates an arg list for ts-mon to consume.

  Args:
    service_name: The name of the task we are sending metrics from. Used for
      both the ts-mon task service name and the task job name.
    short_lived: Whether this process is short-lived and should use the autogen
      hostname prefix. When set, the task number is the current PID and
      flushing is forced to manual.
    auto_flush: Whether to create a thread to automatically flush metrics
      (--ts-mon-flush auto) instead of requiring manual flushes.
    debug_file: If non-none, send metrics to this path instead of to PubSub.
    task_num: Override the default task num of 0. Ignored when |short_lived|
      is set.

  Returns:
    The argparse options object produced by parsing the generated flags.
  """
  parser = argparse.ArgumentParser()
  # Register the ts-mon flags on the dummy parser; without this the
  # --ts-mon-* arguments built below would be rejected by parse_args().
  # NOTE(review): this call is not visible in the reviewed copy of the file --
  # confirm against infra_libs.ts_mon.config.
  config.add_argparse_options(parser)

  args = [
      '--ts-mon-target-type', 'task',
      '--ts-mon-task-service-name', service_name,
      '--ts-mon-task-job-name', service_name,
  ]

  if debug_file:
    args.extend(['--ts-mon-endpoint', 'file://' + debug_file])

  # Short lived processes will have autogen: prepended to their hostname and
  # use task-number=PID to trigger shorter retention policies under
  # chrome-infra@, and used by a Monarch precomputation to group across the
  # task number.
  # Furthermore, we assume they manually call ts_mon.Flush(), because the
  # ts_mon thread will drop messages if the process exits before it flushes.
  if short_lived:
    auto_flush = False
    fqdn = socket.getfqdn().lower()
    host = fqdn.split('.')[0]
    args.extend(['--ts-mon-task-hostname', 'autogen:' + host,
                 '--ts-mon-task-number', str(os.getpid())])
  elif task_num:
    args.extend(['--ts-mon-task-number', str(task_num)])

  args.extend(['--ts-mon-flush', 'auto' if auto_flush else 'manual'])
  return parser.parse_args(args=args)
def _CreateTsMonFlushingProcess(options):
"""Creates a separate process to flush ts_mon metrics.
Useful for multiprocessing scenarios where we don't want multiple ts-mon
threads send contradictory metrics. Instead, functions in
chromite.lib.metrics will send their calls to a Queue, which is consumed by a
dedicated flushing process.
options: An argparse options object to configure ts-mon with.
Side effects:
Sets chromite.lib.metrics.MESSAGE_QUEUE, which causes the metric functions
to send their calls to the Queue instead of creating the metrics.
# If this is nested, we don't need to create another queue and another
# message consumer. Do nothing to continue to use the existing queue.
with parallel.Manager() as manager:
message_q = manager.Queue()
metrics.FLUSHING_PROCESS = multiprocessing.Process(
target=lambda: _SetupAndConsumeMessages(message_q, options))
# this makes the chromite.lib.metric functions use the queue.
# note - we have to do this *after* forking the ConsumeMessages process.
metrics.MESSAGE_QUEUE = message_q
yield message_q
def _CleanupMetricsFlushingProcess():
"""Sends sentinal value to flushing process and .joins it."""
# Now that there is no longer a process to listen to the Queue, re-set it
# to None so that any future metrics are created within this process.
message_q = metrics.MESSAGE_QUEUE
flushing_process = metrics.FLUSHING_PROCESS
metrics.MESSAGE_QUEUE = None
# If the process has already died, we don't need to try to clean it up.
if not flushing_process.is_alive():
# Send the sentinel value for "flush one more time and exit".
# If the flushing process quits, the message Queue can become full.
except IOError:
if not flushing_process.is_alive():
return"Waiting for ts_mon flushing process to finish...")
if flushing_process.is_alive():
if flushing_process.exitcode:
logging.warning("ts_mon_config flushing process did not exit cleanly.")"Finished waiting for ts_mon process.")
def _SetupAndConsumeMessages(message_q, options):
  """Sets up ts-mon, and starts a MetricConsumer loop.

  Args:
    message_q: The metric multiprocessing.Queue to read from.
    options: An argparse options object to configure ts-mon with.

  Returns:
    The result of MetricConsumer.Consume(), or None when ts-mon could not be
    set up and nothing was consumed.
  """
  # Configure ts-mon, but don't start up a sending thread.
  _SetupTsMonFromOptions(options, suppress_exception=True)
  # Setup failures are suppressed above; if ts-mon is not configured there is
  # nothing to flush metrics to, so exit instead of consuming the queue.
  if not _WasSetup:
    return None

  return MetricConsumer(message_q).Consume()
class MetricConsumer(object):
"""Configures ts_mon and gets metrics from a message queue.
This class is meant to be used in a subprocess. It configures itself
to receive a SIGHUP signal when the parent process dies, and catches the
signal in order to have a chance to flush any pending metrics one more time
before quitting.
def __init__(self, message_q):
# If our parent dies, finish flushing before exiting.
self.reset_after_flush = []
self.last_flush = 0
self.pending = False
self.message_q = message_q
if parallel.ExitWithParent(signal.SIGHUP):
signal.signal(signal.SIGHUP, lambda _sig, _stack: self._WaitToFlush())
def Consume(self):
"""Emits metrics from self.message_q, flushing periodically.
The loop is terminated by a None entry on the Queue, which is a friendly
signal from the parent process that it's time to shut down. Before
returning, we wait to flush one more time to make sure that all the
metrics were sent.
message = self.message_q.get()
while message:
message = self._WaitForNextMessage()
if self.pending:
def _CallMetric(self, message):
"""Calls the metric method from |message|, ignoring exceptions."""
cls = getattr(metrics, message.metric_name)
metric = cls(*message.metric_args, **message.metric_kwargs)
if message.reset_after:
getattr(metric, message.method)(
self.pending = True
except Exception:
logging.exception('Caught an exception while running %s',
def _WaitForNextMessage(self):
"""Waits for a new message, flushing every |FLUSH_INTERVAL| seconds."""
while True:
time_delta = self._FlushIfReady()
timeout = FLUSH_INTERVAL - time_delta
message = self.message_q.get(timeout=timeout)
return message
except Queue.Empty:
def _WaitToFlush(self):
"""Sleeps until the next time we can call metrics.Flush(), then flushes."""
time_delta = time.time() - self.last_flush
time.sleep(max(0, FLUSH_INTERVAL - time_delta))
def _FlushIfReady(self):
"""Call metrics.Flush() if we are ready and have pending metrics.
This allows us to only call flush every FLUSH_INTERVAL seconds.
now = time.time()
time_delta = now - self.last_flush
if time_delta > FLUSH_INTERVAL:
self.last_flush = now
time_delta = 0
self.pending = False
return time_delta
def _MethodCallRepr(message):
"""Gives a string representation of |obj|.|method|(*|args|, **|kwargs|)
message: A MetricCall object.
if not message:
return repr(message)
obj = message.metric_name,
method = message.method,
args = message.method_args,
kwargs = message.method_kwargs
args_strings = (map(repr, args) +
[(str(k) + '=' + repr(v))
for k, v in kwargs.iteritems()])
return '%s.%s(%s)' % (repr(obj), method, ', '.join(args_strings))