blob: 3fb95d1b058ca69efb6b256afef7a955ff3b8faf [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wrapper for inframon's command-line flag based configuration."""
from __future__ import print_function
import argparse
import contextlib
import multiprocessing
import os
import socket
import signal
import time
import Queue
from chromite.lib import cros_logging as logging
from chromite.lib import metrics
from chromite.lib import parallel
# ts_mon is an optional dependency: if it (or its google-api transport) is
# unavailable, monitoring is disabled and `config` stays None so callers can
# cheaply test for it (see SetupTsMonGlobalState).
try:
  from infra_libs.ts_mon import config
  import googleapiclient.discovery
except (ImportError, RuntimeError) as e:
  config = None
  logging.warning('Failed to import ts_mon, monitoring is disabled: %s', e)

# Seconds between metric flushes.  Referenced by _WaitToFlush, _FlushIfReady
# and _ConsumeMessages below; it was missing from this file, which made those
# functions raise NameError at runtime.
FLUSH_INTERVAL = 60

# True once SetupTsMonGlobalState has successfully configured ts_mon.
_WasSetup = False
@contextlib.contextmanager
def TrivialContextManager():
  """Context manager with no side effects.

  Returned by SetupTsMonGlobalState when monitoring is disabled or when no
  flushing subprocess is needed, so callers can always use the return value
  in a `with` statement.
  """
  yield
def SetupTsMonGlobalState(service_name,
                          indirect=False,
                          short_lived=False,
                          auto_flush=True,
                          debug_file=None):
  """Uses a dummy argument parser to get the default behavior from ts-mon.

  Args:
    service_name: The name of the task we are sending metrics from.
    indirect: Whether to create a metrics.MESSAGE_QUEUE object and a separate
        process for indirect metrics flushing. Useful for forking,
        because forking would normally create a duplicate ts_mon thread.
    short_lived: Whether this process is short-lived and should use the autogen
        hostname prefix.
    auto_flush: Whether to create a thread to automatically flush metrics every
        flush interval.
    debug_file: If non-none, send metrics to this path instead of to PubSub.

  Returns:
    A context manager: either the flushing-subprocess manager (when
    |indirect|) or a TrivialContextManager.
  """
  if not config:
    # ts_mon failed to import; monitoring is disabled.
    return TrivialContextManager()

  if indirect:
    return _CreateTsMonFlushingProcess([service_name],
                                       {'short_lived': short_lived,
                                        'debug_file': debug_file})

  # google-api-client has too much noisey logging.
  logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING)

  parser = argparse.ArgumentParser()
  config.add_argparse_options(parser)
  args = [
      '--ts-mon-target-type', 'task',
      '--ts-mon-task-service-name', service_name,
      '--ts-mon-task-job-name', service_name,
  ]

  if debug_file:
    args.extend(['--ts-mon-endpoint', 'file://' + debug_file])

  # Short lived processes will have autogen: prepended to their hostname and
  # use task-number=PID to trigger shorter retention policies under
  # chrome-infra@, and used by a Monarch precomputation to group across the
  # task number.
  # Furthermore, we assume they manually call ts_mon.Flush(), because the
  # ts_mon thread will drop messages if the process exits before it flushes.
  if short_lived:
    auto_flush = False
    fqdn = socket.getfqdn().lower()
    host = fqdn.split('.')[0]
    args.extend(['--ts-mon-task-hostname', 'autogen:' + host,
                 '--ts-mon-task-number', str(os.getpid())])

  args.extend(['--ts-mon-flush', 'auto' if auto_flush else 'manual'])

  try:
    config.process_argparse_options(parser.parse_args(args=args))
    logging.notice('ts_mon was set up.')
    global _WasSetup  # pylint: disable=global-statement
    _WasSetup = True
  except Exception as e:
    # Monitoring is best-effort: never let a ts_mon failure break the caller.
    logging.warning('Failed to configure ts_mon, monitoring is disabled: %s', e,
                    exc_info=True)
  return TrivialContextManager()
@contextlib.contextmanager
def _CreateTsMonFlushingProcess(setup_args, setup_kwargs):
  """Creates a separate process to flush ts_mon metrics.

  Useful for multiprocessing scenarios where we don't want multiple ts-mon
  threads send contradictory metrics. Instead, functions in
  chromite.lib.metrics will send their calls to a Queue, which is consumed by a
  dedicated flushing process.

  Args:
    setup_args: Arguments sent to SetupTsMonGlobalState in the child process.
    setup_kwargs: Keyword arguments sent to SetupTsMonGlobalState in the child
        process.

  Side effects:
    Sets chromite.lib.metrics.MESSAGE_QUEUE, which causes the metric functions
    to send their calls to the Queue instead of creating the metrics.
  """
  # If this is nested, we don't need to create another queue and another
  # message consumer. Do nothing to continue to use the existing queue.
  if metrics.MESSAGE_QUEUE:
    yield
    return

  with parallel.Manager() as manager:
    message_q = manager.Queue()

    p = multiprocessing.Process(
        target=lambda: _ConsumeMessages(message_q, setup_args, setup_kwargs))
    p.start()

    # this makes the chromite.lib.metric functions use the queue.
    # note - we have to do this *after* forking the ConsumeMessages process.
    metrics.MESSAGE_QUEUE = message_q

    try:
      yield message_q
    finally:
      # Now that there is no longer a process to listen to the Queue, re-set it
      # to None so that any future metrics are created within this process.
      metrics.MESSAGE_QUEUE = None
      # Send the sentinal value for "flush one more time and exit".
      message_q.put(None)
      logging.info("Waiting for ts_mon flushing process to finish...")
      # Give the consumer up to two flush intervals to drain the queue before
      # giving up and killing it.
      p.join(timeout=FLUSH_INTERVAL * 2)
      if p.is_alive():
        p.terminate()
      if p.exitcode:
        logging.warning("ts_mon_config flushing process did not exit cleanly.")
def _WaitToFlush(last_flush, reset_after=()):
  """Sleeps until the next time we can call metrics.Flush(), then flushes.

  Args:
    last_flush: timestamp of the last flush.
    reset_after: A list of metrics to reset after the flush.
  """
  time_delta = time.time() - last_flush
  # Never sleep a negative amount; if the interval has already elapsed,
  # flush immediately.
  time.sleep(max(0, FLUSH_INTERVAL - time_delta))
  metrics.Flush(reset_after=reset_after)
def _FlushIfReady(pending, last_flush, reset_after=()):
  """Call metrics.Flush() if we are ready and have pending metrics.

  This allows us to only call flush every FLUSH_INTERVAL seconds.

  Args:
    pending: bool indicating whether there are pending metrics to flush.
    last_flush: time stamp of the last time flush() was called.
    reset_after: A list of metrics to reset after the flush.

  Returns:
    A (pending, last_flush, time_delta) tuple with the updated pending flag,
    flush timestamp, and seconds elapsed since the last flush.
  """
  now = time.time()
  time_delta = now - last_flush
  if time_delta > FLUSH_INTERVAL and pending:
    last_flush = now
    time_delta = 0
    metrics.Flush(reset_after=reset_after)
    pending = False
  else:
    # Not time to flush yet (or we just gained metrics); remember that
    # something is waiting for the next flush window.
    pending = True

  return pending, last_flush, time_delta
def _MethodCallRepr(obj, method, args, kwargs):
"""Gives a string representation of |obj|.|method|(*|args|, **|kwargs|)
obj: An object
method: A method name
args: A list of arguments
kwargs: A dict of keyword arguments
args_strings = (map(repr, args) +
[(str(k) + '=' + repr(v))
for (k, v) in kwargs.iteritems()])
return '%s.%s(%s)' % (repr(obj), method, ', '.join(args_strings))
def _ConsumeMessages(message_q, setup_args, setup_kwargs):
  """Configures ts_mon and gets metrics from a message queue.

  Runs until a None sentinel is read from |message_q| (see
  _CreateTsMonFlushingProcess), flushing accumulated metrics at most once
  per FLUSH_INTERVAL seconds.

  Args:
    message_q: A multiprocessing.Queue to read metrics from.
    setup_args: Arguments to pass to SetupTsMonGlobalState.
    setup_kwargs: Keyword arguments to pass to SetupTsMonGlobalState.
  """
  last_flush = 0
  pending = False

  # If our parent dies, finish flushing before exiting.
  reset_after = []
  if parallel.ExitWithParent(signal.SIGHUP):
    signal.signal(signal.SIGHUP,
                  lambda _sig, _stack: _WaitToFlush(last_flush,
                                                    reset_after=reset_after))

  # Configure ts-mon, but don't start up a sending thread.
  setup_kwargs['auto_flush'] = False
  SetupTsMonGlobalState(*setup_args, **setup_kwargs)

  message = message_q.get()
  while message:
    metric = None  # So the except clause below can't hit an unbound name.
    try:
      cls = getattr(metrics, message.metric_name)
      metric = cls(*message.metric_args, **message.metric_kwargs)
      if message.reset_after:
        reset_after.append(metric)
      getattr(metric, message.method)(
          *message.method_args,
          **message.method_kwargs)
    except Exception:
      # A bad metric message must not kill the flushing process.
      logging.exception('Caught an exception while running %s',
                        _MethodCallRepr(metric, message.method,
                                        message.method_args,
                                        message.method_kwargs))

    pending, last_flush, time_delta = _FlushIfReady(True, last_flush,
                                                    reset_after=reset_after)

    try:
      # Only wait until the next flush time if we have pending metrics.
      timeout = FLUSH_INTERVAL - time_delta if pending else None
      message = message_q.get(timeout=timeout)
    except Queue.Empty:
      # We had pending metrics, but we didn't get a new message. Flush and wait
      # indefinitely.
      pending, last_flush, _ = _FlushIfReady(pending, last_flush,
                                             reset_after=reset_after)

      # Wait as long as we need to for the next metric.
      message = message_q.get()

  # Sentinel received: flush anything still pending before exiting.
  if pending:
    _WaitToFlush(last_flush, reset_after=reset_after)