# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper for inframon's command-line flag based configuration."""

from __future__ import print_function

import argparse
import contextlib
import multiprocessing
import os
import socket
import signal
import time
import Queue

from chromite.lib import cros_logging as logging
from chromite.lib import metrics
from chromite.lib import parallel

try:
  from infra_libs.ts_mon import config
  import googleapiclient.discovery
except (ImportError, RuntimeError) as e:
  config = None
  logging.warning('Failed to import ts_mon, monitoring is disabled: %s', e)


# Flipped to True by SetupTsMonGlobalState once ts_mon has been configured
# successfully in this process.
_WasSetup = False

# Minimum number of seconds between two metric flushes in the flushing
# process; twice this value is also how long the parent waits for that
# process to exit.
FLUSH_INTERVAL = 60


@contextlib.contextmanager
def TrivialContextManager():
  """No-op context manager: enters and exits without doing anything.

  Yields:
    None.
  """
  yield None


def SetupTsMonGlobalState(service_name,
                          short_lived=False,
                          indirect=False,
                          auto_flush=True,
                          debug_file=None):
  """Configures ts-mon by feeding synthesized flags to its argparse hooks.

  Args:
    service_name: The name of the task we are sending metrics from.
    short_lived: Whether this process is short-lived and should use the autogen
                 hostname prefix.
    indirect: Whether to create a metrics.METRICS_QUEUE object and a separate
              process for indirect metrics flushing. Useful for forking,
              because forking would normally create a duplicate ts_mon thread.
    auto_flush: Whether to create a thread to automatically flush metrics every
                minute.
    debug_file: If non-none, send metrics to this path instead of to PubSub.

  Returns:
    A context manager. When |indirect| is set it is the one created by
    _CreateTsMonFlushingProcess; in every other case (including when ts_mon
    could not be imported or configured) it is a TrivialContextManager.
  """
  global _WasSetup  # pylint: disable=global-statement

  # ts_mon failed to import; monitoring is disabled.
  if not config:
    return TrivialContextManager()

  # Hand the real setup off to a dedicated flushing process.
  if indirect:
    child_kwargs = {'short_lived': short_lived, 'debug_file': debug_file}
    return _CreateTsMonFlushingProcess([service_name], child_kwargs)

  # google-api-client logs far too noisily; only keep warnings and above.
  googleapiclient.discovery.logger.setLevel(logging.WARNING)

  flag_parser = argparse.ArgumentParser()
  config.add_argparse_options(flag_parser)

  flags = [
      '--ts-mon-target-type', 'task',
      '--ts-mon-task-service-name', service_name,
      '--ts-mon-task-job-name', service_name,
  ]

  if debug_file:
    flags += ['--ts-mon-endpoint', 'file://' + debug_file]

  # Short lived processes get "autogen:" prepended to their hostname and use
  # task-number=PID. This triggers shorter retention policies under
  # chrome-infra@, and is used by a Monarch precomputation to group across the
  # task number.
  if short_lived:
    short_host = socket.getfqdn().lower().split('.')[0]
    flags += ['--ts-mon-task-hostname', 'autogen:' + short_host,
              '--ts-mon-task-number', str(os.getpid())]

  # Short lived processes are assumed to call ts_mon.Flush() themselves,
  # because the ts_mon thread drops messages if the process exits before it
  # flushes; so they never get the automatic flushing thread.
  flush_mode = 'auto' if (auto_flush and not short_lived) else 'manual'
  flags += ['--ts-mon-flush', flush_mode]

  try:
    parsed_flags = flag_parser.parse_args(args=flags)
    config.process_argparse_options(parsed_flags)
    logging.notice('ts_mon was set up.')
    _WasSetup = True
  except Exception as err:
    # Monitoring is best-effort: never let a setup failure break the caller.
    logging.warning('Failed to configure ts_mon, monitoring is disabled: %s',
                    err, exc_info=True)

  return TrivialContextManager()


@contextlib.contextmanager
def _CreateTsMonFlushingProcess(setup_args, setup_kwargs):
  """Creates a separate process to flush ts_mon metrics.

  Useful for multiprocessing scenarios where we don't want multiple ts-mon
  threads to send contradictory metrics. Instead, functions in
  chromite.lib.metrics will send their calls to a Queue, which is consumed by a
  dedicated flushing process.

  Args:
    setup_args: Arguments sent to SetupTsMonGlobalState in the child process
    setup_kwargs: Keyword arguments sent to SetupTsMonGlobalState in the child
      process

  Yields:
    The queue that metric calls are sent to. When this is nested inside
    another flushing context, the already-installed queue is yielded and no
    new process is created.

  Side effects:
    Sets chromite.lib.metrics.MESSAGE_QUEUE, which causes the metric functions
    to send their calls to the Queue instead of creating the metrics. It is
    reset to None when the (outermost) context exits.
  """
  # If this is nested, we don't need to create another queue and another
  # message consumer. Keep using the existing queue. A generator wrapped by
  # contextlib.contextmanager must yield exactly once, so we still have to
  # yield here; a bare return would raise RuntimeError on __enter__.
  if metrics.MESSAGE_QUEUE:
    yield metrics.MESSAGE_QUEUE
    return

  with parallel.Manager() as manager:
    message_q = manager.Queue()

    p = multiprocessing.Process(
        target=lambda: _ConsumeMessages(message_q, setup_args, setup_kwargs))
    p.start()

    # this makes the chromite.lib.metric functions use the queue.
    # note - we have to do this *after* forking the ConsumeMessages process.
    metrics.MESSAGE_QUEUE = message_q

    try:
      yield message_q
    finally:
      # Now that there is no longer a process to listen to the Queue, re-set it
      # to None so that any future metrics are created within this process.
      metrics.MESSAGE_QUEUE = None
      # Send the sentinel value for "flush one more time and exit".
      message_q.put(None)
      logging.info("Waiting for ts_mon flushing process to finish...")
      # Give the consumer up to two flush intervals to drain and exit before
      # killing it.
      p.join(timeout=FLUSH_INTERVAL*2)
      if p.is_alive():
        p.terminate()
      if p.exitcode:
        logging.warning("ts_mon_config flushing process did not exit cleanly.")


def _FlushIfReady(pending, last_flush, reset_after=()):
  """Call metrics.Flush() if we are ready and have pending metrics.

  This allows us to only call flush every FLUSH_INTERVAL seconds.

  Args:
    pending: bool indicating whether there are pending metrics to flush.
    last_flush: time stamp of the last time flush() was called.
    reset_after: A list of metrics to reset after the flush.

  Returns:
    A (pending, last_flush, time_delta) tuple. If a flush happened, |pending|
    is False, |last_flush| is the current time and |time_delta| is 0.
    Otherwise |pending| and |last_flush| are returned unchanged and
    |time_delta| is the number of seconds elapsed since |last_flush|.
  """
  now = time.time()
  time_delta = now - last_flush

  # Flush only when there is something to send and the interval has elapsed.
  if pending and time_delta > FLUSH_INTERVAL:
    last_flush = now
    time_delta = 0
    metrics.Flush(reset_after=reset_after)
    pending = False

  # When nothing was flushed, |pending| is deliberately left as the caller
  # passed it: a call with nothing pending must not claim pending metrics.
  return pending, last_flush, time_delta


def _MethodCallRepr(obj, method, args, kwargs):
  """Gives a string representation of |obj|.|method|(*|args|, **|kwargs|)

  Args:
    obj: An object
    method: A method name
    args: A list of arguments
    kwargs: A dict of keyword arguments

  Returns:
    A string such as "'obj'.method(1, key='value')", built from the repr of
    |obj| and of every argument.
  """
  # Build a real list (rather than relying on map() returning one) and use
  # items() so this works on both Python 2 and Python 3.
  args_strings = [repr(arg) for arg in args]
  args_strings.extend(str(k) + '=' + repr(v) for (k, v) in kwargs.items())
  return '%s.%s(%s)' % (repr(obj), method, ', '.join(args_strings))


def _ConsumeMessages(message_q, setup_args, setup_kwargs):
  """Configures ts_mon and gets metrics from a message queue.

  Reads messages from |message_q| until a falsy message (the None sentinel) is
  received. For each message, the metric class named by message.metric_name is
  looked up in chromite.lib.metrics, instantiated with message.metric_args and
  message.metric_kwargs, and message.method is invoked on it with
  message.method_args and message.method_kwargs. Flushing is throttled through
  _FlushIfReady, and a final flush is done on exit if metrics are still
  pending.

  Note that |setup_kwargs| is modified in place: 'auto_flush' is forced to
  False before it is passed to SetupTsMonGlobalState.

  Args:
    message_q: A multiprocessing.Queue to read metrics from.
    setup_args: Arguments to pass to SetupTsMonGlobalState.
    setup_kwargs: Keyword arguments to pass to SetupTsMonGlobalState.
  """

  # Starting from 0 means the very first _FlushIfReady call sees the flush
  # interval as already elapsed.
  last_flush = 0
  pending = False

  # If our parent dies, finish flushing before exiting.
  # |reset_after| is shared with the signal handler below, so metrics appended
  # to it later in the loop are seen by the handler as well.
  reset_after = []
  if parallel.ExitWithParent(signal.SIGHUP):
    signal.signal(signal.SIGHUP,
                  lambda _sig, _stack: metrics.Flush(reset_after=reset_after))

  # Configure ts-mon, but don't start up a sending thread.
  setup_kwargs['auto_flush'] = False
  SetupTsMonGlobalState(*setup_args, **setup_kwargs)

  # Block until the first message arrives; a falsy message ends the loop.
  message = message_q.get()
  while message:
    try:
      # Re-create the metric in this process and replay the recorded call.
      cls = getattr(metrics, message.metric_name)
      metric = cls(*message.metric_args, **message.metric_kwargs)
      if message.reset_after:
        reset_after.append(metric)
      getattr(metric, message.method)(
          *message.method_args,
          **message.method_kwargs)
    except Exception:
      # A bad message must not kill the consumer: log it and keep going.
      logging.exception('Caught an exception while running %s',
                        _MethodCallRepr(message.metric_name,
                                        message.method,
                                        message.method_args,
                                        message.method_kwargs))

    # A message was just handled, so there is always something pending here.
    pending, last_flush, time_delta = _FlushIfReady(True, last_flush,
                                                    reset_after=reset_after)

    try:
      # Only wait until the next flush time if we have pending metrics.
      # (A timeout of None makes the get() below block without a deadline.)
      timeout = FLUSH_INTERVAL - time_delta if pending else None
      message = message_q.get(timeout=timeout)
    except Queue.Empty:
      # We had pending metrics, but we didn't get a new message. Flush and wait
      # indefinitely.
      pending, last_flush, _ = _FlushIfReady(pending, last_flush,
                                             reset_after=reset_after)
      # Wait as long as we need to for the next metric.
      message = message_q.get()

  # The sentinel was received: send whatever has not been flushed yet.
  if pending:
    metrics.Flush(reset_after=reset_after)
