#!/usr/bin/env python2

# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Load generator for devserver.

Example usage:

# Find DUTs in suites pool to test with:
atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready

# Lock DUTs:
atest host mod -l -r 'quick provision testing' DUT1 DUT2

# Create config file with DUTs to test and builds to use.
cat >config.json <<EOD
{
  "BOARD": {
    "duts": [
      "chromeosX-rowY-rackZ-hostA",
      "chromeosX-rowY-rackZ-hostB",
    ],
    "versions": [
      "auron_paine-paladin/R65-10208.0.0-rc2",
      "auron_paine-paladin/R65-10208.0.0-rc3",
      "auron_paine-paladin/R65-10209.0.0-rc1"
    ]
  },
}
EOD

# Do 100 total provisions, aiming to have 10 active simultaneously.
loadtest.py $DS config.json --simultaneous 10 --total 100

# Unlock DUTs:
atest host mod -u DUT1 DUT2
"""

import argparse
import collections
import datetime
import json
import logging
import random
import re
import signal
import subprocess
import sys
import time

import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros import dev_server
from chromite.lib import locking
from chromite.lib import parallel

# Paylods to stage.
PAYLOADS = ['quick_provision', 'stateful']

# Number of seconds between full status checks.
STATUS_POLL_SECONDS = 2

# Number of successes/failures to blacklist a DUT.
BLACKLIST_CONSECUTIVE_FAILURE = 2
BLACKLIST_TOTAL_SUCCESS = 0
BLACKLIST_TOTAL_FAILURE = 5

def get_parser():
    """Creates the argparse parser."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('server', type=str, action='store',
                        help='Devserver to load test.')
    parser.add_argument('config', type=str, action='store',
                        help='Path to JSON config file.'
                             'Config file is indexed by board with keys of '
                             '"duts" and "versions", each a list.')
    parser.add_argument('--blacklist-consecutive', '-C', type=int,
                        action='store',
                        help=('Consecutive number of failures before '
                              'blacklisting DUT (default %d).') %
                             BLACKLIST_CONSECUTIVE_FAILURE,
                        default=BLACKLIST_CONSECUTIVE_FAILURE)
    parser.add_argument('--blacklist-success', '-S', type=int, action='store',
                        help=('Total number of successes before blacklisting '
                              'DUT (default %d).') % BLACKLIST_TOTAL_SUCCESS,
                        default=BLACKLIST_TOTAL_SUCCESS)
    parser.add_argument('--blacklist-total', '-T', type=int, action='store',
                        help=('Total number of failures before blacklisting '
                              'DUT (default %d).') % BLACKLIST_TOTAL_FAILURE,
                        default=BLACKLIST_TOTAL_FAILURE)
    parser.add_argument('--boards', '-b', type=str, action='store',
                        help='Comma-separated list of boards to provision.')
    parser.add_argument('--dryrun', '-n', action='store_true', dest='dryrun',
                        help='Do not attempt to provision.')
    parser.add_argument('--duts', '-d', type=str, action='store',
                        help='Comma-separated list of duts to provision.')
    parser.add_argument('--outputlog', '-l', type=str, action='store',
                        help='Path to append JSON entries to.')
    parser.add_argument('--output', '-o', type=str, action='store',
                        help='Path to write JSON file to.')
    parser.add_argument('--ping', '-p', action='store_true',
                        help='Ping DUTs and blacklist unresponsive ones.')
    parser.add_argument('--simultaneous', '-s', type=int, action='store',
                        help='Number of simultaneous provisions to run.',
                        default=1)
    parser.add_argument('--no-stage', action='store_false',
                        dest='stage', default=True,
                        help='Do not attempt to stage builds.')
    parser.add_argument('--total', '-t', type=int, action='store',
                        help='Number of total provisions to run.',
                        default=0)
    return parser

def make_entry(entry_id, name, status, start_time,
               finish_time=None, parent=None, **kwargs):
    """Generate an event log entry to be stored in Cloud Datastore.

    @param entry_id: A (Kind, id) tuple representing the key.
    @param name: A string identifying the event
    @param status: A string identifying the status of the event.
    @param start_time: A datetime of the start of the event.
    @param finish_time: A datetime of the finish of the event.
    @param parent: A (Kind, id) tuple representing the parent key.

    @return A dictionary representing the entry suitable for dumping via JSON.
    """
    entry = {
        'id': entry_id,
        'name': name,
        'status': status,
        'start_time': time_utils.to_epoch_time(start_time),
    }
    if finish_time is not None:
        entry['finish_time'] = time_utils.to_epoch_time(finish_time)
    if parent is not None:
        entry['parent'] = parent
    return entry

class Job(object):
    """Tracks a single provision job."""
    def __init__(self, ds, host_name, build_name,
                 entry_id=0, parent=None, board=None,
                 start_active=0,
                 force_update=False, full_update=False,
                 clobber_stateful=True, quick_provision=True,
                 ping=False, dryrun=False):

        self.ds = ds
        self.host_name = host_name
        self.build_name = build_name

        self.entry_id = ('Job', entry_id)
        self.parent = parent
        self.board = board
        self.start_active = start_active
        self.end_active = None
        self.check_active_sum = 0
        self.check_active_count = 0

        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.trigger_response = None

        self.ping = ping
        self.pre_ping = None
        self.post_ping = None

        self.kwargs = {
            'host_name': host_name,
            'build_name': build_name,
            'force_update': force_update,
            'full_update': full_update,
            'clobber_stateful': clobber_stateful,
            'quick_provision': quick_provision,
        }

        if dryrun:
            self.finish_time = datetime.datetime.now()
            self.raised_error = None
            self.success = True
            self.pid = 0
        else:
            if self.ping:
                self.pre_ping = ping_dut(self.host_name)
            self.trigger_response = ds._trigger_auto_update(**self.kwargs)

    def as_entry(self):
        """Generate an entry for exporting to datastore."""
        entry = make_entry(self.entry_id, self.host_name,
                           'pass' if self.success else 'fail',
                           self.start_time, self.finish_time, self.parent)
        entry.update({
            'build_name': self.build_name,
            'board': self.board,
            'devserver': self.ds.hostname,
            'start_active': self.start_active,
            'end_active': self.end_active,
            'force_update': self.kwargs['force_update'],
            'full_update': self.kwargs['full_update'],
            'clobber_stateful': self.kwargs['clobber_stateful'],
            'quick_provision': self.kwargs['quick_provision'],
            'elapsed': int(self.elapsed().total_seconds()),
            'trigger_response': self.trigger_response,
            'pre_ping': self.pre_ping,
            'post_ping': self.post_ping,
        })
        if self.check_active_count:
            entry['avg_active'] = (self.check_active_sum /
                                   self.check_active_count)
        return entry

    def check(self, active_count):
        """Checks if a job has completed.

        @param active_count: Number of active provisions at time of the check.
        @return: True if the job has completed, False otherwise.
        """
        if self.finish_time is not None:
            return True

        self.check_active_sum += active_count
        self.check_active_count += 1

        finished, raised_error, pid = self.ds.check_for_auto_update_finished(
            self.trigger_response, wait=False, **self.kwargs)
        if finished:
            self.finish_time = datetime.datetime.now()
            self.raised_error = raised_error
            self.success = self.raised_error is None
            self.pid = pid
            self.end_active = active_count
            if self.ping:
                self.post_ping = ping_dut(self.host_name)

        return finished

    def elapsed(self):
        """Determine the elapsed time of the task."""
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time

class Runner(object):
    """Parallel provision load test runner."""
    def __init__(self, ds, duts, config, simultaneous=1, total=0,
                 outputlog=None, ping=False, blacklist_consecutive=None,
                 blacklist_success=None, blacklist_total=None, dryrun=False):
        self.ds = ds
        self.duts = duts
        self.config = config
        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.simultaneous = simultaneous
        self.total = total
        self.outputlog = outputlog
        self.ping = ping
        self.blacklist_consecutive = blacklist_consecutive
        self.blacklist_success = blacklist_success
        self.blacklist_total = blacklist_total
        self.dryrun = dryrun

        self.active = []
        self.started = 0
        self.completed = []
        # Track DUTs which have failed multiple times.
        self.dut_blacklist = set()
        # Track versions of each DUT to provision in order.
        self.last_versions = {}

        # id for the parent entry.
        # TODO: This isn't the most unique.
        self.entry_id = ('Runner',
                         int(time_utils.to_epoch_time(datetime.datetime.now())))

        # ids for the job entries.
        self.next_id = 0

        if self.outputlog:
            dump_entries_as_json([self.as_entry()], self.outputlog)

    def signal_handler(self, signum, frame):
        """Signal handle to dump current status."""
        logging.info('Received signal %s', signum)
        if signum == signal.SIGUSR1:
            now = datetime.datetime.now()
            logging.info('%d active provisions, %d completed provisions, '
                         '%s elapsed:',
                         len(self.active), len(self.completed),
                         now - self.start_time)
            for job in self.active:
                logging.info('  %s -> %s, %s elapsed',
                             job.host_name, job.build_name,
                             now - job.start_time)

    def as_entry(self):
        """Generate an entry for exporting to datastore."""
        entry = make_entry(self.entry_id, 'Runner', 'pass',
                           self.start_time, self.finish_time)
        entry.update({
            'devserver': self.ds.hostname,
        })
        return entry

    def get_completed_entries(self):
        """Retrieves all completed jobs as entries for datastore."""
        entries = [self.as_entry()]
        entries.extend([job.as_entry() for job in self.completed])
        return entries

    def get_next_id(self):
        """Get the next Job id."""
        entry_id = self.next_id
        self.next_id += 1
        return entry_id

    def spawn(self, host_name, build_name):
        """Spawn a single provision job."""
        job = Job(self.ds, host_name, build_name,
                  entry_id=self.get_next_id(), parent=self.entry_id,
                  board=self.get_dut_board_type(host_name),
                  start_active=len(self.active), ping=self.ping,
                  dryrun=self.dryrun)
        self.active.append(job)
        logging.info('Provision (%d) of %s to %s started',
                     job.entry_id[1], job.host_name, job.build_name)
        self.last_versions[host_name] = build_name
        self.started += 1

    def replenish(self):
        """Replenish the number of active provisions to match goals."""
        while ((self.simultaneous == 0 or
                len(self.active) < self.simultaneous) and
               (self.total == 0 or self.started < self.total)):
            host_name = self.find_idle_dut()
            if host_name:
                build_name = self.find_build_for_dut(host_name)
                self.spawn(host_name, build_name)
            elif self.simultaneous:
                logging.warn('Insufficient DUTs to satisfy goal')
                return False
            else:
                return len(self.active) > 0
        return True

    def check_all(self):
        """Check the status of outstanding provisions."""
        still_active = []
        for job in self.active:
            if job.check(len(self.active)):
                logging.info('Provision (%d) of %s to %s %s in %s: %s',
                             job.entry_id[1], job.host_name, job.build_name,
                             'completed' if job.success else 'failed',
                             job.elapsed(), job.raised_error)
                entry = job.as_entry()
                logging.debug(json.dumps(entry))
                if self.outputlog:
                    dump_entries_as_json([entry], self.outputlog)
                self.completed.append(job)
                if self.should_blacklist(job.host_name):
                    logging.error('Blacklisting DUT %s', job.host_name)
                    self.dut_blacklist.add(job.host_name)
            else:
                still_active.append(job)
        self.active = still_active

    def should_blacklist(self, host_name):
        """Determines if a given DUT should be blacklisted."""
        jobs = [job for job in self.completed if job.host_name == host_name]
        total = 0
        consecutive = 0
        successes = 0
        for job in jobs:
            if not job.success:
                total += 1
                consecutive += 1
                if ((self.blacklist_total is not None and
                     total >= self.blacklist_total) or
                    (self.blacklist_consecutive is not None and
                     consecutive >= self.blacklist_consecutive)):
                    return True
            else:
                successes += 1
                if (self.blacklist_success is not None and
                    successes >= self.blacklist_success):
                    return True
                consecutive = 0
        return False

    def find_idle_dut(self):
        """Find an idle DUT to provision.."""
        active_duts = {job.host_name for job in self.active}
        idle_duts = [d for d in self.duts
                     if d not in active_duts | self.dut_blacklist]
        return random.choice(idle_duts) if len(idle_duts) else None

    def get_dut_board_type(self, host_name):
        """Determine the board type of a DUT."""
        return self.duts[host_name]

    def get_board_versions(self, board):
        """Determine the versions to provision for a board."""
        return self.config[board]['versions']

    def find_build_for_dut(self, host_name):
        """Determine a build to provision on a DUT."""
        board = self.get_dut_board_type(host_name)
        versions = self.get_board_versions(board)
        last_version = self.last_versions.get(host_name)
        try:
            last_index = versions.index(last_version)
        except ValueError:
            return versions[0]
        return versions[(last_index + 1) % len(versions)]

    def stage(self, build):
        """Stage artifacts for a given build."""
        logging.debug('Staging %s', build)
        self.ds.stage_artifacts(build, PAYLOADS)

    def stage_all(self):
        """Stage all necessary artifacts."""
        boards = set(self.duts.values())
        logging.info('Staging for %d boards', len(boards))
        funcs = []
        for board in boards:
            for build in self.get_board_versions(board):
                funcs.append(lambda build_=build: self.stage(build_))
        parallel.RunParallelSteps(funcs)

    def loop(self):
        """Run the main provision loop."""
        # Install a signal handler for status updates.
        old_handler = signal.signal(signal.SIGUSR1, self.signal_handler)
        signal.siginterrupt(signal.SIGUSR1, False)

        try:
            while True:
                self.check_all()
                if self.total != 0 and len(self.completed) >= self.total:
                    break
                if not self.replenish() and len(self.active) == 0:
                    logging.error('Unable to replenish with no active '
                                  'provisions')
                    return False
                logging.debug('%d provisions active', len(self.active))
                time.sleep(STATUS_POLL_SECONDS)
            return True
        except KeyboardInterrupt:
            return False
        finally:
            self.finish_time = datetime.datetime.now()
            # Clean up signal handler.
            signal.signal(signal.SIGUSR1, old_handler)

    def elapsed(self):
        """Determine the elapsed time of the task."""
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time

def dump_entries_as_json(entries, output_file):
    """Dump event log entries as json to a file.

    @param entries: A list of event log entries to dump.
    @param output_file: The file to write to.
    """
    # Write the entries out as JSON.
    logging.debug('Dumping %d entries' % len(entries))
    for e in entries:
        json.dump(e, output_file, sort_keys=True)
        output_file.write('\n')
        output_file.flush()

def ping_dut(hostname):
    """Checks if a host is responsive to pings."""
    if re.match('^chromeos\d+-row\d+-rack\d+-host\d+$', hostname):
        hostname += '.cros'

    response = subprocess.call(["/bin/ping", "-c1", "-w2", hostname],
                               stdout=subprocess.PIPE)
    return response == 0

def main(argv):
    """Load generator for a devserver."""
    parser = get_parser()
    options = parser.parse_args(argv)

    # Parse devserver.
    if options.server:
        if re.match(r'^https?://', options.server):
            server = options.server
        else:
            server = 'http://%s/' % options.server
        ds = dev_server.ImageServer(server)
    else:
        parser.print_usage()
        logging.error('Must specify devserver')
        sys.exit(1)

    # Parse config file and determine master list of duts and their board type,
    # filtering by board type if specified.
    duts = {}
    if options.config:
        with open(options.config, 'r') as f:
            config = json.load(f)
            boards = (options.boards.split(',')
                      if options.boards else config.keys())
            duts_specified = (set(options.duts.split(','))
                              if options.duts else None)
            for board in boards:
                duts.update({dut: board for dut in config[board]['duts']
                             if duts_specified is None or
                                dut in duts_specified})
        logging.info('Config file %s: %d boards, %d duts',
                     options.config, len(boards), len(duts))
    else:
        parser.print_usage()
        logging.error('Must specify config file')
        sys.exit(1)

    if options.ping:
        logging.info('Performing ping tests')
        duts_alive = {}
        for dut, board in duts.items():
            if ping_dut(dut):
                duts_alive[dut] = board
            else:
                logging.error('Ignoring DUT %s (%s) for failing initial '
                              'ping check', dut, board)
        duts = duts_alive
        logging.info('After ping tests: %d boards, %d duts', len(boards),
                     len(duts))

    # Set up the test runner and stage all the builds.
    outputlog = open(options.outputlog, 'a') if options.outputlog else None
    runner = Runner(ds, duts, config,
                    simultaneous=options.simultaneous, total=options.total,
                    outputlog=outputlog, ping=options.ping,
                    blacklist_consecutive=options.blacklist_consecutive,
                    blacklist_success=options.blacklist_success,
                    blacklist_total=options.blacklist_total,
                    dryrun=options.dryrun)
    if options.stage:
        runner.stage_all()

    # Run all the provisions.
    with locking.FileLock(options.config, blocking=True).lock():
        completed = runner.loop()
    logging.info('%s in %s', 'Completed' if completed else 'Interrupted',
                 runner.elapsed())
    # Write all entries as JSON.
    entries = runner.get_completed_entries()
    if options.output:
        with open(options.output, 'w') as f:
            dump_entries_as_json(entries, f)
    else:
        dump_entries_as_json(entries, sys.stdout)
    logging.info('Summary: %s',
                 dict(collections.Counter([e['status'] for e in entries
                                           if e['name'] != 'Runner'])))

    # List blacklisted DUTs.
    if runner.dut_blacklist:
        logging.warn('Blacklisted DUTs:')
        for host_name in runner.dut_blacklist:
            logging.warn('  %s', host_name)

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
