# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper for inframon's command-line flag based configuration."""

import argparse
import contextlib
import logging
import multiprocessing
import os
import queue as Queue
import signal
import socket
import time

from chromite.third_party.googleapiclient import discovery

from chromite.lib import metrics
from chromite.lib import parallel


try:
  from chromite.third_party.infra_libs.ts_mon import BooleanField
  from chromite.third_party.infra_libs.ts_mon import config
  from chromite.third_party.infra_libs.ts_mon import IntegerField
  from chromite.third_party.infra_libs.ts_mon import StringField
except (ImportError, RuntimeError) as e:
  config = None
  logging.warning('Failed to import ts_mon, monitoring is disabled: %s', e)


_WasSetup = False
_CommonMetricFields = {}

FLUSH_INTERVAL = 60


@contextlib.contextmanager
def TrivialContextManager():
  """Context manager with no side effects."""
  yield


def GetMetricFieldSpec(fields=None):
  """Return the corresponding field_spec for metric fields.

  Args:
    fields: Dictionary containing metric fields.

  Returns:
    field_spec: List containing any *Field object associated with metric.
  """
  field_spec = []
  if fields:
    for key, val in fields.items():
      if isinstance(val, bool):
        field_spec.append(BooleanField(key))
      elif isinstance(val, int):
        field_spec.append(IntegerField(key))
      elif isinstance(val, str):
        field_spec.append(StringField(key))
      else:
        logging.error("Couldn't classify the metric field %s:%s",
                      key, val)

  return field_spec

def AddCommonFields(fields=None, field_spec=None):
  """Add cbuildbot-wide common fields to a given field set.

  Args:
    fields: Dictionary containing metric fields to which common metric fields
            will be added.
    field_spec: List containing any *Field object associated with metric.

  Returns:
    Dictionary containing complete set of metric fields to be applied to
    metric and a list of corresponding field_spec.
  """
  metric_fields = (dict(_CommonMetricFields) if _CommonMetricFields
                   else {})

  if metric_fields:
    metric_fields.update(fields or {})
    return metric_fields, GetMetricFieldSpec(metric_fields)
  else:
    return fields, field_spec


def SetupTsMonGlobalState(service_name,
                          indirect=False,
                          suppress_exception=True,
                          short_lived=False,
                          auto_flush=True,
                          common_metric_fields=None,
                          debug_file=None,
                          task_num=0):
  """Uses a dummy argument parser to get the default behavior from ts-mon.

  Args:
    service_name: The name of the task we are sending metrics from.
    indirect: Whether to create a metrics.METRICS_QUEUE object and a separate
              process for indirect metrics flushing. Useful for forking,
              because forking would normally create a duplicate ts_mon thread.
    suppress_exception: True to silence any exception during the setup. Default
              is set to True.
    short_lived: Whether this process is short-lived and should use the autogen
              hostname prefix.
    auto_flush: Whether to create a thread to automatically flush metrics every
              minute.
    common_metric_fields: Dictionary containing the metric fields that will be
              added to all metrics.
    debug_file: If non-none, send metrics to this path instead of to PubSub.
    task_num: (Default 0) The task_num target field of the metrics to emit.
  """
  if not config:
    return TrivialContextManager()

  # The flushing subprocess calls .flush manually.
  if indirect:
    auto_flush = False

  if common_metric_fields:
    _CommonMetricFields.update(common_metric_fields)

  # google-api-client has too much noisey logging.
  options = _GenerateTsMonArgparseOptions(
      service_name, short_lived, auto_flush, debug_file, task_num)

  if indirect:
    return _CreateTsMonFlushingProcess(options)
  else:
    _SetupTsMonFromOptions(options, suppress_exception)
    return TrivialContextManager()


def _SetupTsMonFromOptions(options, suppress_exception):
  """Sets up ts-mon global state given parsed argparse options.

  Args:
    options: An argparse options object containing ts-mon flags.
    suppress_exception: True to silence any exception during the setup. Default
                        is set to True.
  """
  discovery.logger.setLevel(logging.WARNING)
  try:
    config.process_argparse_options(options)
    logging.notice('ts_mon was set up.')
    global _WasSetup  # pylint: disable=global-statement
    _WasSetup = True
  except Exception as e:
    logging.warning('Failed to configure ts_mon, monitoring is disabled: %s', e,
                    exc_info=True)
    if not suppress_exception:
      raise


def _GenerateTsMonArgparseOptions(service_name, short_lived,
                                  auto_flush, debug_file, task_num):
  """Generates an arg list for ts-mon to consume.

  Args:
    service_name: The name of the task we are sending metrics from.
    short_lived: Whether this process is short-lived and should use the autogen
                 hostname prefix.
    auto_flush: Whether to create a thread to automatically flush metrics every
                minute.
    debug_file: If non-none, send metrics to this path instead of to PubSub.
    task_num: Override the default task num of 0.
  """
  parser = argparse.ArgumentParser()
  config.add_argparse_options(parser)

  args = [
      '--ts-mon-target-type', 'task',
      '--ts-mon-task-service-name', service_name,
      '--ts-mon-task-job-name', service_name,
  ]

  if debug_file:
    args.extend(['--ts-mon-endpoint', 'file://' + debug_file])

  # Short lived processes will have autogen: prepended to their hostname and
  # use task-number=PID to trigger shorter retention policies under
  # chrome-infra@, and used by a Monarch precomputation to group across the
  # task number.
  # Furthermore, we assume they manually call ts_mon.Flush(), because the
  # ts_mon thread will drop messages if the process exits before it flushes.
  if short_lived:
    auto_flush = False
    fqdn = socket.getfqdn().lower()
    host = fqdn.split('.')[0]
    args.extend(['--ts-mon-task-hostname', 'autogen:' + host,
                 '--ts-mon-task-number', str(os.getpid())])
  elif task_num:
    args.extend(['--ts-mon-task-number', str(task_num)])

  args.extend(['--ts-mon-flush', 'auto' if auto_flush else 'manual'])
  return parser.parse_args(args=args)


@contextlib.contextmanager
def _CreateTsMonFlushingProcess(options):
  """Creates a separate process to flush ts_mon metrics.

  Useful for multiprocessing scenarios where we don't want multiple ts-mon
  threads send contradictory metrics. Instead, functions in
  chromite.lib.metrics will send their calls to a Queue, which is consumed by a
  dedicated flushing process.

  Args:
    options: An argparse options object to configure ts-mon with.

  Side effects:
    Sets chromite.lib.metrics.MESSAGE_QUEUE, which causes the metric functions
    to send their calls to the Queue instead of creating the metrics.
  """
  # If this is nested, we don't need to create another queue and another
  # message consumer. Do nothing to continue to use the existing queue.
  if metrics.MESSAGE_QUEUE or metrics.FLUSHING_PROCESS:
    return

  with parallel.Manager() as manager:
    message_q = manager.Queue()

    metrics.FLUSHING_PROCESS = multiprocessing.Process(
        target=lambda: _SetupAndConsumeMessages(message_q, options))
    metrics.FLUSHING_PROCESS.start()

    # this makes the chromite.lib.metric functions use the queue.
    # note - we have to do this *after* forking the ConsumeMessages process.
    metrics.MESSAGE_QUEUE = message_q

    try:
      yield message_q
    finally:
      _CleanupMetricsFlushingProcess()


def _CleanupMetricsFlushingProcess():
  """Sends sentinal value to flushing process and .joins it."""
  # Now that there is no longer a process to listen to the Queue, re-set it
  # to None so that any future metrics are created within this process.
  message_q = metrics.MESSAGE_QUEUE
  flushing_process = metrics.FLUSHING_PROCESS
  metrics.MESSAGE_QUEUE = None
  metrics.FLUSHING_PROCESS = None

  # If the process has already died, we don't need to try to clean it up.
  if not flushing_process.is_alive():
    return

  # Send the sentinal value for "flush one more time and exit".
  try:
    message_q.put(None)
  # If the flushing process quits, the message Queue can become full.
  except IOError:
    if not flushing_process.is_alive():
      return

  logging.info('Waiting for ts_mon flushing process to finish...')
  flushing_process.join(timeout=FLUSH_INTERVAL*2)
  if flushing_process.is_alive():
    flushing_process.terminate()
  if flushing_process.exitcode:
    logging.warning('ts_mon_config flushing process did not exit cleanly.')
  logging.info('Finished waiting for ts_mon process.')


def _SetupAndConsumeMessages(message_q, options):
  """Sets up ts-mon, and starts a MetricConsumer loop.

  Args:
    message_q: The metric multiprocessing.Queue to read from.
    options: An argparse options object to configure ts-mon with.
  """
  # Configure ts-mon, but don't start up a sending thread.
  _SetupTsMonFromOptions(options, suppress_exception=True)
  if not _WasSetup:
    return

  return MetricConsumer(message_q).Consume()


class MetricConsumer(object):
  """Configures ts_mon and gets metrics from a message queue.

  This class is meant to be used in a subprocess. It configures itself
  to receive a SIGHUP signal when the parent process dies, and catches the
  signal in order to have a chance to flush any pending metrics one more time
  before quitting.
  """
  def __init__(self, message_q):
    # If our parent dies, finish flushing before exiting.
    self.reset_after_flush = []
    self.last_flush = 0
    self.pending = False
    self.message_q = message_q

    if parallel.ExitWithParent(signal.SIGHUP):
      signal.signal(signal.SIGHUP, lambda _sig, _stack: self._WaitToFlush())


  def Consume(self):
    """Emits metrics from self.message_q, flushing periodically.

    The loop is terminated by a None entry on the Queue, which is a friendly
    signal from the parent process that it's time to shut down. Before
    returning, we wait to flush one more time to make sure that all the
    metrics were sent.
    """
    message = self.message_q.get()
    while message:
      self._CallMetric(message)
      message = self._WaitForNextMessage()

    if self.pending:
      self._WaitToFlush()


  def _CallMetric(self, message):
    """Calls the metric method from |message|, ignoring exceptions."""
    try:
      cls = getattr(metrics, message.metric_name)
      message.method_kwargs.setdefault('fields', {})
      message.metric_kwargs.setdefault('field_spec', [])
      message.method_kwargs['fields'], message.metric_kwargs['field_spec'] = (
          AddCommonFields(message.method_kwargs['fields'],
                          message.metric_kwargs['field_spec']))
      metric = cls(*message.metric_args, **message.metric_kwargs)
      if message.reset_after:
        self.reset_after_flush.append(metric)
      getattr(metric, message.method)(
          *message.method_args,
          **message.method_kwargs)
      self.pending = True
    except Exception:
      logging.exception('Caught an exception while running %s',
                        _MethodCallRepr(message))


  def _WaitForNextMessage(self):
    """Waits for a new message, flushing every |FLUSH_INTERVAL| seconds."""
    while True:
      time_delta = self._FlushIfReady()
      try:
        timeout = FLUSH_INTERVAL - time_delta
        message = self.message_q.get(timeout=timeout)
        return message
      except Queue.Empty:
        pass


  def _WaitToFlush(self):
    """Sleeps until the next time we can call metrics.Flush(), then flushes."""
    time_delta = time.time() - self.last_flush
    time.sleep(max(0, FLUSH_INTERVAL - time_delta))
    metrics.Flush(reset_after=self.reset_after_flush)


  def _FlushIfReady(self):
    """Call metrics.Flush() if we are ready and have pending metrics.

    This allows us to only call flush every FLUSH_INTERVAL seconds.
    """
    now = time.time()
    time_delta = now - self.last_flush
    if time_delta > FLUSH_INTERVAL:
      self.last_flush = now
      time_delta = 0
      metrics.Flush(reset_after=self.reset_after_flush)
      self.pending = False
    return time_delta


def _MethodCallRepr(message):
  """Gives a string representation of |obj|.|method|(*|args|, **|kwargs|)

  Args:
    message: A MetricCall object.
  """
  if not message:
    return repr(message)
  obj = message.metric_name
  method = message.method
  args = message.method_args
  kwargs = message.method_kwargs

  args_strings = ([repr(x) for x in args] +
                  [(str(k) + '=' + repr(v))
                   for k, v in kwargs.items()])
  return '%s.%s(%s)' % (repr(obj), method, ', '.join(args_strings))
