# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module for swarming execution."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import json
import logging
import operator
import os
import urllib
import uuid

from lucifer import autotest
from skylab_suite import errors


SERVICE_ACCOUNT = '/creds/skylab_swarming_bot/skylab_bot_service_account.json'
SKYLAB_DRONE_POOL = 'ChromeOSSkylab'
SKYLAB_SUITE_POOL = 'ChromeOSSkylab-suite'

TASK_COMPLETED = 'COMPLETED'
TASK_COMPLETED_SUCCESS = 'COMPLETED (SUCCESS)'
TASK_COMPLETED_FAILURE = 'COMPLETED (FAILURE)'
TASK_EXPIRED = 'EXPIRED'
TASK_CANCELED = 'CANCELED'
TASK_TIMEDOUT = 'TIMED_OUT'
TASK_RUNNING = 'RUNNING'
TASK_PENDING = 'PENDING'
TASK_BOT_DIED = 'BOT_DIED'
TASK_NO_RESOURCE = 'NO_RESOURCE'
TASK_KILLED = 'KILLED'
TASK_FINISHED_STATUS = [TASK_COMPLETED,
                        TASK_EXPIRED,
                        TASK_CANCELED,
                        TASK_TIMEDOUT,
                        TASK_BOT_DIED,
                        TASK_NO_RESOURCE,
                        TASK_KILLED]
# The swarming task failure status to retry. TASK_CANCELED won't get
# retried since it's intentionally aborted.
TASK_STATUS_TO_RETRY = [TASK_EXPIRED, TASK_TIMEDOUT, TASK_BOT_DIED,
                        TASK_NO_RESOURCE]

DEFAULT_EXPIRATION_SECS = 10 * 60
DEFAULT_TIMEOUT_SECS = 60 * 60

# A mapping of priorities for skylab hwtest tasks. In swarming,
# lower number means high priorities. Priority lower than 48 will
# be special tasks. The upper bound for priority is 255.
# Use the same priorities mapping as chromite/lib/constants.py
SKYLAB_HWTEST_PRIORITIES_MAP = {
    'Weekly': 230,
    'CTS': 215,
    'Daily': 200,
    'PostBuild': 170,
    'Default': 140,
    'Build': 110,
    'PFQ': 80,
    'CQ': 50,
    'Super': 49,
}
SORTED_SKYLAB_HWTEST_PRIORITY = sorted(
        SKYLAB_HWTEST_PRIORITIES_MAP.items(),
        key=operator.itemgetter(1))

SWARMING_DUT_READY_STATUS = 'ready'

# The structure of fallback swarming task request is:
# NewTaskRequest:
#     ...
#     task_slices  ->  NewTaskSlice:
#                          ...
#                          properties  ->  TaskProperties
#                                              ...
TaskProperties = collections.namedtuple(
        'TaskProperties',
        [
                'command',
                'dimensions',
                'execution_timeout_secs',
                'grace_period_secs',
                'io_timeout_secs',
        ])

NewTaskSlice = collections.namedtuple(
        'NewTaskSlice',
        [
                'expiration_secs',
                'properties',
        ])

NewTaskRequest = collections.namedtuple(
        'NewTaskRequest',
        [
                'name',
                'parent_task_id',
                'priority',
                'tags',
                'user',
                'task_slices',
        ])


def _get_client():
    return os.path.join(
            os.path.expanduser('~'),
            'chromiumos/chromite/third_party/swarming.client/swarming.py')


def task_dependencies_from_labels(labels):
    """Parse dependencies from autotest labels.

    @param labels: A list of label string.

    @return a dict [key: value] to represent dependencies.
    """
    translation_autotest = autotest.deps_load(
            'skylab_inventory.translation.autotest')
    translation_swarming = autotest.deps_load(
            'skylab_inventory.translation.swarming')
    dimensions = translation_swarming.labels_to_dimensions(
            translation_autotest.from_autotest_labels(labels))
    dependencies = {}
    for k, v in dimensions.iteritems():
      if isinstance(v, list):
        if len(v) > 1:
          raise ValueError(
              'Invalid dependencies: Multiple value %r for key %s' % (k, v))

        dependencies[k] = v[0]

    return dependencies


def get_basic_swarming_cmd(command):
    return [_get_client(), command,
            '--auth-service-account-json', SERVICE_ACCOUNT,
            '--swarming', get_swarming_server()]


def make_logdog_annotation_url():
    """Return a unique LogDog annotation URL.

    If the appropriate LogDog server cannot be determined, return an
    empty string.
    """
    logdog_server = get_logdog_server()
    if not logdog_server:
        return ''
    return ('logdog://%s/chromeos/skylab/%s/+/annotations'
            % (logdog_server, uuid.uuid4().hex))


def get_swarming_server():
    """Return the swarming server for the current environment."""
    try:
        return os.environ['SWARMING_SERVER']
    except KeyError:
        raise errors.DroneEnvironmentError(
                'SWARMING_SERVER environment variable not set'
        )


def get_logdog_server():
    """Return the LogDog server for the current environment.

    If the appropriate server cannot be determined, return an empty
    string.
    """
    try:
        return os.environ['LOGDOG_SERVER']
    except KeyError:
        raise errors.DroneEnvironmentError(
                'LOGDOG_SERVER environment variable not set'
        )


def get_new_task_swarming_cmd():
    """Return a list of command args for creating a new task."""
    return get_basic_swarming_cmd('post') + ['tasks/new']


def make_fallback_request_dict(cmds, slices_dimensions, slices_expiration_secs,
                               task_name, priority, tags, user,
                               parent_task_id='',
                               expiration_secs=DEFAULT_EXPIRATION_SECS,
                               grace_period_secs=DEFAULT_TIMEOUT_SECS,
                               execution_timeout_secs=DEFAULT_TIMEOUT_SECS,
                               io_timeout_secs=DEFAULT_TIMEOUT_SECS):
    """Form a json-compatible dict for fallback swarming call.

    @param cmds: A list of cmd to run on swarming bots.
    @param slices_dimensions: A list of dict to indicates different tries'
        dimensions.
    @param slices_expiration_secs: A list of Integer to indicates each slice's
        expiration_secs.
    @param task_name: The request's name.
    @param priority: The request's priority. An integer.
    @param grace_period_secs: The seconds to send a task after a SIGTERM before
        sending it a SIGKILL.
    @param execution_timeout_secs: The seconds to run before a task gets
        terminated.
    @param io_timeout_secs: The seconds to wait before a task is considered
        hung.

    @return a json-compatible dict, as a request for swarming call.
    """
    assert len(cmds) == len(slices_dimensions)
    assert len(cmds) == len(slices_expiration_secs)
    task_slices = []
    for cmd, dimensions, expiration_secs in zip(cmds, slices_dimensions,
                                                slices_expiration_secs):
        properties = TaskProperties(
                command=cmd,
                dimensions=dimensions,
                execution_timeout_secs=execution_timeout_secs,
                grace_period_secs=grace_period_secs,
                io_timeout_secs=io_timeout_secs)
        task_slices.append(
                NewTaskSlice(
                        expiration_secs=expiration_secs,
                        properties=properties))

    task_request = NewTaskRequest(
        name=task_name,
        parent_task_id=parent_task_id,
        priority=priority,
        tags=tags,
        user=user,
        task_slices=task_slices)

    return _to_raw_request(task_request)


def _namedtuple_to_dict(value):
    """Recursively converts a namedtuple to a dict.

    Args:
      value: a namedtuple object.

    Returns:
      A dict object with the same value.
    """
    out = dict(value._asdict())
    for k, v in out.iteritems():
      if hasattr(v, '_asdict'):
        out[k] = _namedtuple_to_dict(v)
      elif isinstance(v, (list, tuple)):
        l = []
        for elem in v:
          if hasattr(elem, '_asdict'):
            l.append(_namedtuple_to_dict(elem))
          else:
            l.append(elem)
        out[k] = l

    return out


def _to_raw_request(request):
    """Returns the json-compatible dict expected by the server.

    Args:
      request: a NewTaskRequest object.

    Returns:
      A json-compatible dict, which could be parsed by swarming proxy
      service.
    """
    out = _namedtuple_to_dict(request)
    for task_slice in out['task_slices']:
        task_slice['properties']['dimensions'] = [
                {'key': k, 'value': v}
                for k, v in task_slice['properties']['dimensions'].iteritems()
        ]
        task_slice['properties']['dimensions'].sort(key=lambda x: x['key'])
    return out


def get_task_link(task_id):
    return '%s/user/task/%s' % (os.environ.get('SWARMING_SERVER'), task_id)


def get_task_final_state(task):
    """Get the final state of a swarming task.

    @param task: the json output of a swarming task fetched by API tasks.list.
    """
    state = task['state']
    if state == TASK_COMPLETED:
        state = (TASK_COMPLETED_FAILURE if task['failure'] else
                 TASK_COMPLETED_SUCCESS)

    return state


def get_task_dut_name(task_dimensions):
    """Get the DUT name of running this task.

    @param task_dimensions: a list of dict, e.g. [{'key': k, 'value': v}, ...]
    """
    for dimension in task_dimensions:
        if dimension['key'] == 'dut_name':
            return dimension['value'][0]

    return ''


def query_bots_count(dimensions):
    """Get bots count for given requirements.

    @param dimensions: A dict of dimensions for swarming bots.

    @return a dict, which contains counts for different status of bots.
    """
    basic_swarming_cmd = get_basic_swarming_cmd('query')
    conditions = [('dimensions', '%s:%s' % (k, v))
                  for k, v in dimensions.iteritems()]
    swarming_cmd = basic_swarming_cmd + ['bots/count?%s' %
                                         urllib.urlencode(conditions)]
    cros_build_lib = autotest.chromite_load('cros_build_lib')
    result = cros_build_lib.RunCommand(swarming_cmd, capture_output=True)
    return json.loads(result.output)


def get_idle_bots_count(outputs):
    """Get the idle bots count.

    @param outputs: The outputs of |query_bots_count|.
    """
    return (int(outputs['count']) - int(outputs['busy']) - int(outputs['dead'])
            - int(outputs['quarantined']))


def query_task_by_tags(tags):
    """Get tasks for given tags.

    @param tags: A dict of tags for swarming tasks.

    @return a list, which contains all tasks queried by the given tags.
    """
    basic_swarming_cmd = get_basic_swarming_cmd('query')
    conditions = [('tags', '%s:%s' % (k, v)) for k, v in tags.iteritems()]
    swarming_cmd = basic_swarming_cmd + ['tasks/list?%s' %
                                         urllib.urlencode(conditions)]
    cros_build_lib = autotest.chromite_load('cros_build_lib')
    result = cros_build_lib.RunCommand(swarming_cmd, capture_output=True)
    json_output = json.loads(result.output)
    return json_output.get('items', [])


def query_task_by_id(task_id):
    """Get task for given id.

    @param task_id: A string to indicate a swarming task id.

    @return a dict, which contains the task with the given task_id.
    """
    basic_swarming_cmd = get_basic_swarming_cmd('query')
    swarming_cmd = basic_swarming_cmd + ['task/%s/result' % task_id]
    cros_build_lib = autotest.chromite_load('cros_build_lib')
    result = cros_build_lib.RunCommand(swarming_cmd, capture_output=True)
    return json.loads(result.output)


def abort_task(task_id):
    """Abort a swarming task by its id.

    @param task_id: A string swarming task id.
    """
    basic_swarming_cmd = get_basic_swarming_cmd('cancel')
    swarming_cmd = basic_swarming_cmd + ['--kill-running', task_id]
    cros_build_lib = autotest.chromite_load('cros_build_lib')
    try:
        cros_build_lib.RunCommand(swarming_cmd, log_output=True)
    except cros_build_lib.RunCommandError:
        logging.error('Task %s probably already gone, skip canceling it.',
                      task_id)


def query_bots_list(dimensions):
    """Get bots list for given requirements.

    @param dimensions: A dict of dimensions for swarming bots.

    @return a list of bot dicts.
    """
    basic_swarming_cmd = get_basic_swarming_cmd('query')
    conditions = [('dimensions', '%s:%s' % (k, v))
                  for k, v in dimensions.iteritems()]
    swarming_cmd = basic_swarming_cmd + ['bots/list?%s' %
                                         urllib.urlencode(conditions)]
    cros_build_lib = autotest.chromite_load('cros_build_lib')
    result = cros_build_lib.RunCommand(swarming_cmd, capture_output=True)
    return json.loads(result.output).get('items', [])


def bot_available(bot):
    """Check whether a bot is available.

    @param bot: A dict describes a bot's dimensions, i.e. an element in return
        list of |query_bots_list|.

    @return True if a bot is available to run task, otherwise False.
    """
    return not (bot['is_dead'] or bot['quarantined'])


def get_child_tasks(parent_task_id):
    """Get the child tasks based on a parent swarming task id.

    @param parent_task_id: The parent swarming task id.

    @return a list of dicts, each dict refers to the whole stats of a task,
        keys include 'name', 'bot_dimensions', 'tags', 'bot_id', 'state', etc.
    """
    swarming_cmd = get_basic_swarming_cmd('query')
    swarming_cmd += ['tasks/list?tags=parent_task_id:%s' % parent_task_id]
    timeout_util = autotest.chromite_load('timeout_util')
    cros_build_lib = autotest.chromite_load('cros_build_lib')
    with timeout_util.Timeout(60):
        child_tasks = cros_build_lib.RunCommand(
                swarming_cmd, capture_output=True)
        return json.loads(child_tasks.output)['items']
