# -*- coding: utf-8 -*-
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module containing the various stages that a builder runs."""

from __future__ import print_function

import json
import os

from chromite.cbuildbot import commands
from chromite.lib import failures_lib
from chromite.lib import config_lib
from chromite.cbuildbot.stages import artifact_stages
from chromite.cbuildbot.stages import generic_stages
from chromite.lib import constants
from chromite.lib import cros_logging as logging
from chromite.lib import gs
from chromite.lib import osutils
from chromite.lib import parallel
from chromite.lib import timeout_util
from chromite.lib.paygen import gspaths
from chromite.lib.paygen import paygen_build_lib


class InvalidTestConditionException(Exception):
  """Signals that a test could not run because a pre-condition was unmet."""


class SignerTestStage(artifact_stages.ArchivingStage):
  """Run the signer related tests once the recovery image is available."""

  option_name = 'tests'
  config_name = 'signer_tests'
  category = constants.CI_INFRA_STAGE

  # Upper bound, in seconds, on how long the signer tests may run. A normal
  # run takes about five minutes, so 30 minutes means something is stuck.
  SIGNER_TEST_TIMEOUT = 30 * 60

  def PerformStage(self):
    """Wait for the recovery image, then run the signer tests with a timeout.

    Raises:
      InvalidTestConditionException: If the archive stage reports that no
        recovery image is available.
    """
    recovery_image_ready = self.archive_stage.WaitForRecoveryImage()
    if not recovery_image_ready:
      raise InvalidTestConditionException('Missing recovery image.')

    build_root = self._build_root
    board = self._current_board
    with timeout_util.Timeout(self.SIGNER_TEST_TIMEOUT):
      commands.RunSignerTests(build_root, board)


class SignerResultsTimeout(failures_lib.StepFailure):
  """The signer did not produce all expected results inside the expected time.

  Raised by SigningStage._WaitForSigningResults when polling for signer
  results exceeds SigningStage.SIGNING_TIMEOUT.
  """


class SignerFailure(failures_lib.StepFailure):
  """The signer returned an error result.

  Raised by SigningStage._WaitForSigningResults when at least one collected
  result has a status other than SIGNER_STATUS_PASSED. PaygenStage reports
  this exception type as a warning rather than a failure.
  """


class MissingInstructionException(failures_lib.StepFailure):
  """We didn't receive the list of signing instructions PushImage uploaded.

  SigningStage._HandleStageException reports this exception type as a warning
  so that people look at ArchiveStage for the real error.
  """


class MalformedResultsException(failures_lib.StepFailure):
  """The Signer results aren't formatted as we expect.

  Raised by SigningStage._JsonFromUrl, with the raw downloaded text as the
  message, when that text can't be parsed as JSON.
  """


class PaygenSigningRequirementsError(failures_lib.StepFailure):
  """Paygen stage can't run if signing failed."""
  # NOTE(review): nothing in the code visible here raises this exception --
  # confirm whether it is still used elsewhere.


class PaygenCrostoolsNotAvailableError(failures_lib.StepFailure):
  """Paygen stage can't run if crostools is not available."""
  # NOTE(review): the previous docstring was a verbatim copy of the one on
  # PaygenSigningRequirementsError. The wording above is inferred from the
  # class name only; nothing in the code visible here raises it -- confirm.


class PaygenNoPaygenConfigForBoard(failures_lib.StepFailure):
  """Paygen can't run without a paygen config for the board.

  Raised by PaygenStage.PerformStage when paygen_build_lib.ValidateBoardConfig
  reports that the board is not configured. PaygenStage reports this exception
  type as a warning, since it is common during new board bring up.
  """


class SigningStage(generic_stages.BoardSpecificBuilderStage):
  """Stage that waits for image signing.

  This stage waits for values from ArchiveStage (push_image), then waits until
  the signing servers sign the uploaded images. Whether it succeeds or fails,
  it publishes the 'signed_images_ready' board run attribute so that stages
  blocked on signing can proceed.
  """
  option_name = 'paygen'
  config_name = 'paygen'
  category = constants.CI_INFRA_STAGE

  # Poll for new results every 30 seconds.
  SIGNING_PERIOD = 30

  # Timeout for the signing process. 2 hours in seconds.
  SIGNING_TIMEOUT = 2 * 60 * 60

  def __init__(self, builder_run, buildstore, board, **kwargs):
    """Init that sets up the state used while polling the signer.

    Args:
      builder_run: See builder_run on ArchivingStage.
      buildstore: BuildStore instance to make DB calls with.
      board: See board on ArchivingStage.
    """
    super(SigningStage, self).__init__(builder_run, buildstore, board, **kwargs)

    # Used to remember partial results between retries. Of the form:
    #   {'channel': {'gs://instruction_uri1.json': <parsed signer json>}}
    self.signing_results = {}

    # Filled in via WaitUntilReady. Of the form:
    #   {'channel': ['gs://instruction_uri1', 'gs://signer_instruction_uri2']}
    self.instruction_urls_per_channel = None

  def _HandleStageException(self, exc_info):
    """Unblock waiting stages, and treat missing instructions as a warning.

    Args:
      exc_info: A (type, value, traceback) tuple describing the exception.

    Returns:
      Whatever the selected handler (warning or default) returns.
    """
    exc_type, _exc_value, _exc_tb = exc_info

    # Notify stages blocked on us if we error out.
    self.board_runattrs.SetParallel('signed_images_ready', None)

    # Warn so people look at ArchiveStage for the real error.
    if issubclass(exc_type, MissingInstructionException):
      return self._HandleExceptionAsWarning(exc_info)

    return super(SigningStage, self)._HandleStageException(exc_info)

  def _JsonFromUrl(self, gs_ctx, url):
    """Fetch a GS Url, and parse it as Json.

    Args:
      gs_ctx: GS Context.
      url: Url to fetch and parse.

    Returns:
      None if the Url doesn't exist.
      Parsed Json structure if it did.

    Raises:
      MalformedResultsException if it failed to parse.
    """
    try:
      signer_txt = gs_ctx.Cat(url)
    except gs.GSNoSuchKey:
      return None

    try:
      return json.loads(signer_txt)
    except ValueError:
      # We should never see malformed Json, even for intermediate statuses.
      raise MalformedResultsException(signer_txt)

  def _SigningStatusFromJson(self, signer_json):
    """Extract a signing status from a signer result Json DOM.

    The status is read from signer_json['status']['status']. Anything that
    doesn't have that shape (None, a non-object document, or a missing, null
    or non-object 'status' member) is treated as having no status rather than
    raising, so that an unexpected intermediate result can't abort polling.

    Args:
      signer_json: The parsed json status from a signer operation.

    Returns:
      string with a simple status: SIGNER_STATUS_PASSED, SIGNER_STATUS_FAILED,
      etc, or '' if the json doesn't contain a status.
    """
    if not isinstance(signer_json, dict):
      return ''

    status = signer_json.get('status')
    if not isinstance(status, dict):
      return ''

    return status.get('status', '')

  def _CheckForResults(self, gs_ctx, instruction_urls_per_channel,
                       channel_notifier=None):
    """timeout_util.WaitForReturnTrue func to check a list of signer results.

    Completed results are cached in self.signing_results, so each call only
    fetches the results that are still outstanding.

    Args:
      gs_ctx: Google Storage Context.
      instruction_urls_per_channel: Dict mapping each channel to the list of
                                    instruction urls whose signer result
                                    files ('<url>.json') we're expecting.
      channel_notifier: Method to call when a channel is ready or None.

    Returns:
      True if a completed (passed or failed) result has been collected for
      every expected url, False if any result is still outstanding.
    """
    COMPLETED_STATUS = (constants.SIGNER_STATUS_PASSED,
                        constants.SIGNER_STATUS_FAILED)

    # Assume we are done, then try to prove otherwise.
    results_completed = True

    for channel, instruction_urls in instruction_urls_per_channel.items():
      channel_results = self.signing_results.setdefault(channel, {})

      # This channel was fully collected (and reported) by an earlier poll.
      if len(channel_results) == len(instruction_urls):
        continue

      for instruction_url in instruction_urls:
        # Convert from instructions URL to instructions result URL.
        result_url = instruction_url + '.json'

        # We already have a result for this URL.
        if result_url in channel_results:
          continue

        try:
          signer_json = self._JsonFromUrl(gs_ctx, result_url)
        except MalformedResultsException as e:
          logging.warning('Received malformed json: %s', e)
          continue

        if self._SigningStatusFromJson(signer_json) in COMPLETED_STATUS:
          # If we find a completed result, remember it.
          channel_results[result_url] = signer_json

      # If we don't have full results for this channel, we aren't done
      # waiting.
      if len(channel_results) != len(instruction_urls):
        results_completed = False
        continue

      # If we reach here, the channel has just been completed for the first
      # time.

      # If all results passed the channel was successfully signed.
      passed = constants.SIGNER_STATUS_PASSED
      channel_success = all(
          self._SigningStatusFromJson(signer_result) == passed
          for signer_result in channel_results.values())

      # If we successfully completed the channel, inform someone.
      if channel_success and channel_notifier:
        channel_notifier(channel)

    return results_completed

  def _WaitForSigningResults(self,
                             instruction_urls_per_channel,
                             channel_notifier=None):
    """Do the work of waiting for signer results and logging them.

    Args:
      instruction_urls_per_channel: Dict mapping each channel to its list of
                                    instruction urls (same form as
                                    self.instruction_urls_per_channel).
      channel_notifier: Method to call with channel name when ready or None.

    Raises:
      SignerResultsTimeout: If results are still outstanding after
        SIGNING_TIMEOUT seconds.
      SignerFailure: If any collected result did not pass.
    """
    gs_ctx = gs.GSContext(dry_run=self._run.options.debug)

    try:
      logging.info('Waiting for signer results.')
      timeout_util.WaitForReturnTrue(
          self._CheckForResults,
          func_args=(gs_ctx, instruction_urls_per_channel, channel_notifier),
          timeout=self.SIGNING_TIMEOUT, period=self.SIGNING_PERIOD)
    except timeout_util.TimeoutError:
      msg = 'Image signing timed out.'
      logging.error(msg)
      logging.PrintBuildbotStepText(msg)
      raise SignerResultsTimeout(msg)

    # Log all signer results, then handle any signing failures.
    failures = []
    for url_results in self.signing_results.values():
      for url, signer_result in url_results.items():
        result_description = os.path.basename(url)
        logging.PrintBuildbotStepText(result_description)
        logging.info('Received results for: %s', result_description)
        logging.info(json.dumps(signer_result, indent=4))

        status = self._SigningStatusFromJson(signer_result)
        if status != constants.SIGNER_STATUS_PASSED:
          failures.append(result_description)
          logging.error('Signing failed for: %s', result_description)
          # Only completed results are stored, so 'status' is a dict here.
          details = signer_result.get('status', {}).get('details')
          if details:
            logging.info('Details:\n%s', details)

    if failures:
      logging.error('Failure summary:')
      for failure in failures:
        logging.error('  %s', failure)
      raise SignerFailure(', '.join(str(f) for f in failures))

  def WaitUntilReady(self):
    """Block until push_image data is ready.

    Sets self.instruction_urls_per_channel as described in __init__.

    Returns:
      Boolean that tells if we can run this stage.
    """
    # This call will NEVER time out.
    self.instruction_urls_per_channel = self.board_runattrs.GetParallel(
        'instruction_urls_per_channel', timeout=None)

    # A value of None signals an error in PushImage.
    if self.instruction_urls_per_channel is None:
      # ArchiveStage PushImage failed. Signing won't run at all.
      self.board_runattrs.SetParallel('signed_images_ready', None)
      return False

    return True

  def PerformStage(self):
    """Wait for the signer to finish, then publish the signed channels."""
    # Convert to release tools naming for boards.
    board = self._current_board.replace('_', '-')
    version = self._run.attrs.release_tag

    logging.info('Waiting for image signing for: %s, %s', board, version)
    logging.info('GS errors are a normal part of the polling for results.')
    self._WaitForSigningResults(self.instruction_urls_per_channel)

    # Notify stages blocked on us that images are ready for the given channel
    # list.
    channels = list(self.instruction_urls_per_channel)
    self.board_runattrs.SetParallel('signed_images_ready', channels)


class PaygenStage(generic_stages.BoardSpecificBuilderStage):
  """Stage that generates release payloads.

  When constructed with an explicit 'channels' list the stage can run on its
  own. Without one, it blocks on the channel list that SigningStage publishes.
  """
  option_name = 'paygen'
  config_name = 'paygen'
  category = constants.CI_INFRA_STAGE

  def __init__(self, builder_run, buildstore, board, channels=None, **kwargs):
    """Init that optionally accepts an explicit channel list.

    Args:
      builder_run: See builder_run on ArchivingStage.
      buildstore: BuildStore instance to make DB calls with.
      board: See board on ArchivingStage.
      channels: Explicit list of channels to generate payloads for.
                If None, will instead wait on values from the signing stage.
                Channels is normally None in release builds, and normally set
                for trybot 'payloads' builds.
    """
    super(PaygenStage, self).__init__(builder_run, buildstore, board, **kwargs)
    self.channels = channels

  def _HandleStageException(self, exc_info):
    """Report selected failure types as warnings instead of failures."""
    exc_type, _, _ = exc_info

    # Two kinds of failure are downgraded to warnings:
    #  * PaygenNoPaygenConfigForBoard: Paygen couldn't find what it needs in
    #    release.conf, which is common during new board bring up.
    #  * SignerFailure: signing failed, so we only warn that we didn't run and
    #    let SigningStage decide if this should kill the build.
    warning_types = (PaygenNoPaygenConfigForBoard, SignerFailure)
    if issubclass(exc_type, warning_types):
      return self._HandleExceptionAsWarning(exc_info)

    return super(PaygenStage, self)._HandleStageException(exc_info)

  def WaitUntilReady(self):
    """Block until signed images are ready.

    Returns:
      Boolean that tells if we can run this stage.
    """
    # With an explicit channel list there is nothing to wait for.
    if self.channels is not None:
      return True

    # Otherwise wait (without a timeout) for the channels from the signing
    # stage. A value of None means the signing stage errored out, in which
    # case payloads can't be generated.
    self.channels = self.board_runattrs.GetParallel(
        'signed_images_ready', timeout=None)
    return self.channels is not None

  def PerformStage(self):
    """Validate the board's paygen config, then queue one task per channel."""
    run = self._run

    # Convert to release tools naming for boards.
    board = self._current_board.replace('_', '-')
    version = run.attrs.release_tag

    assert version, "We can't generate payloads without a release_tag."
    logging.info('Generating payloads for: %s, %s', board, version)

    # Check for a Paygen configuration here rather than in the sub-process,
    # so that no failure reason has to be passed back.
    try:
      paygen_build_lib.ValidateBoardConfig(board)
    except paygen_build_lib.BoardNotConfigured:
      message = ('Golden Eye (%s) has no entry for board %s. Get a TPM to fix.'
                 % (paygen_build_lib.PAYGEN_URI, board))
      raise PaygenNoPaygenConfigForBoard(message)

    # The DUTs check is skipped only for canary type builds.
    skip_duts_check = bool(config_lib.IsCanaryType(run.config.build_type))

    with parallel.BackgroundTaskRunner(self._RunPaygenInProcess) as per_channel:
      logging.info('Using channels: %s', self.channels)

      # Record the configured channels in the build metadata.
      channel_metadata = {'channels': ','.join(self.channels)}
      run.attrs.metadata.UpdateWithDict(channel_metadata)

      # Queue one background task per channel.
      for channel in self.channels:
        task_args = (channel, board, version, run.options.debug,
                     run.config.paygen_skip_testing,
                     run.config.paygen_skip_delta_payloads,
                     skip_duts_check)
        per_channel.put(task_args)

  def _RunPaygenInProcess(self, channel, board, version, debug,
                          disable_tests, skip_delta_payloads,
                          skip_duts_check):
    """Runs the PaygenBuild and PaygenTest stage (if applicable)"""
    build_stage = PaygenBuildStage(
        self._run, self.buildstore, board, channel, version, debug,
        disable_tests, skip_delta_payloads, skip_duts_check)
    build_stage.Run()


class PaygenBuildStage(generic_stages.BoardSpecificBuilderStage):
  """Stage that generates payloads and uploads to Google Storage."""

  category = constants.CI_INFRA_STAGE

  def __init__(self, builder_run, buildstore, board, channel, version, debug,
               skip_testing, skip_delta_payloads, skip_duts_check, **kwargs):
    """Init that records the settings for one channel's payload generation.

    The capitalized channel name is passed to the base class as the stage
    suffix.

    Args:
      builder_run: See builder_run on ArchiveStage
      buildstore: BuildStore instance to make DB calls with.
      board: Board of payloads to generate ('x86-mario', 'x86-alex-he', etc)
      channel: Channel of payloads to generate ('stable', 'beta', etc)
      version: Version of payloads to generate.
      debug: Flag telling if this is a real run, or a test run.
      skip_testing: Do not generate test artifacts or run payload tests.
      skip_delta_payloads: Skip generating delta payloads.
      skip_duts_check: Do not check minimum available DUTs before tests.
    """
    super(PaygenBuildStage, self).__init__(
        builder_run, buildstore, board, suffix=channel.capitalize(), **kwargs)
    # NOTE(review): the base class presumably stores builder_run as _run
    # already -- confirm whether this assignment is redundant.
    self._run = builder_run
    self.board = board
    self.channel = channel
    self.version = version
    self.debug = debug
    self.skip_testing = skip_testing
    self.skip_delta_payloads = skip_delta_payloads
    self.skip_duts_check = skip_duts_check

  def PerformStage(self):
    """Invoke payload generation. If testing is enabled, schedule tests.

    This method is intended to be safe to invoke inside a process.

    Note that self.channel is rewritten in place to carry the '-channel'
    suffix. A paygen_build_lib.BuildLocked error is logged and swallowed
    rather than propagated.
    """
    # Convert to release tools naming for channels.
    if not self.channel.endswith('-channel'):
      self.channel += '-channel'

    with osutils.TempDir(sudo_rm=True) as tempdir:
      # Create the definition of the build to generate payloads for.
      build = gspaths.Build(channel=self.channel,
                            board=self.board,
                            version=self.version,
                            bucket=gspaths.ChromeosReleases.BUCKET)
      # presumably a copy of build, so the bucket can differ -- TODO confirm.
      payload_build = gspaths.Build(build)
      if self.debug:
        # Test runs use the test bucket for payload_build.
        payload_build.bucket = gspaths.ChromeosReleases.TEST_BUCKET

      try:
        # Generate the payloads.
        self._PrintLoudly('Starting %s, %s, %s' % (self.channel, self.version,
                                                   self.board))
        paygen = paygen_build_lib.PaygenBuild(
            build,
            payload_build,
            work_dir=tempdir,
            site_config=self._run.site_config,
            dry_run=self.debug,
            skip_delta_payloads=self.skip_delta_payloads,
            skip_duts_check=self.skip_duts_check)

        testdata = paygen.CreatePayloads()

        # Now, schedule the payload tests if desired.
        if not self.skip_testing:
          (suite_name, archive_board, archive_build,
           payload_test_configs) = testdata
          # For unified builds, only test against the specified models.
          if self._run.config.models:
            models = []
            for model in self._run.config.models:
              # 'au' is a test suite generated in ge_build_config.json
              if model.test_suites and 'au' in model.test_suites:
                models.append(model)

            # NOTE: if no model has the 'au' suite, neither branch below runs
            # and no test stage is scheduled.
            if len(models) > 1:
              # Split the configs: FSI configs go to every applicable model,
              # all other configs go to the 'au' models only.
              fsi_configs = set(p for p in payload_test_configs
                                if p.payload_type ==
                                paygen_build_lib.PAYLOAD_TYPE_FSI)
              non_fsi_configs = set(p for p in payload_test_configs
                                    if p not in fsi_configs)
              stages = self._ScheduleForApplicableModels(
                  archive_board, archive_build, fsi_configs, suite_name)
              stages += self._ScheduleForModels(
                  archive_board, archive_build, models, non_fsi_configs,
                  suite_name)
              # Run all of the constructed test stages in parallel.
              steps = [stage.Run for stage in stages]
              parallel.RunParallelSteps(steps)
            elif len(models) == 1:
              # A single 'au' model gets every payload config, FSI or not.
              model = models[0]
              PaygenTestStage(
                  self._run, self.buildstore, suite_name, archive_board,
                  model.name, model.lab_board_name, self.channel,
                  archive_build, self.skip_duts_check, self.debug,
                  payload_test_configs,
                  config_lib.GetHWTestEnv(self._run.config,
                                          model_config=model)).Run()
          else:
            # No models configured: run a single test stage with no model.
            lab_board_name = config_lib.GetNonUniBuildLabBoardName(
                archive_board)
            PaygenTestStage(self._run, self.buildstore, suite_name,
                            archive_board, None, lab_board_name,
                            self.channel, archive_build, self.skip_duts_check,
                            self.debug,
                            payload_test_configs,
                            config_lib.GetHWTestEnv(self._run.config)).Run()



      except (paygen_build_lib.BuildLocked) as e:
        # These errors are normal if it's possible that another builder is
        # processing the same build. (perhaps by a trybot generating payloads on
        # request).
        logging.info('PaygenBuild for %s skipped because: %s', self.channel, e)

  def _ScheduleForModels(self, archive_board, archive_build, models,
                         non_fsi_configs, suite_name):
    """Build (but do not run) one AU PaygenTestStage per given model.

    Args:
      archive_board: The board we schedule against.
      archive_build: The build of the payload config.
      models: The models to create a test stage for.
      non_fsi_configs: The payload configs handed to every created stage.
                       Despite the name, _ScheduleForApplicableModels also
                       passes FSI configs through this parameter.
      suite_name: The name of the suite we are scheduling.

    Returns:
      A list of PaygenTestStage instances, one per entry in models. The
      caller is responsible for running them.
    """
    return [
        PaygenTestStage(
            self._run, self.buildstore, suite_name, archive_board,
            model.name, model.lab_board_name, self.channel,
            archive_build, self.skip_duts_check, self.debug,
            non_fsi_configs,
            config_lib.GetHWTestEnv(self._run.config, model_config=model))
        for model in models
    ]

  def _ScheduleForApplicableModels(self, archive_board, archive_build,
                                   fsi_configs, suite_name):
    """Build (but do not run) FSI AU test stages for every applicable_model.

    We schedule on every model even if it is not in the 'au' suite.
    This ensures no FSI tests are missed from models being disabled in the lab.

    Args:
      archive_board: The board we schedule against.
      archive_build: The build of the payload config.
      fsi_configs: The payload configs of type FSI.
      suite_name: The name of the suite we are scheduling.

    Returns:
      A list of PaygenTestStage instances: for each FSI payload config, one
      stage per configured model whose name is in that config's
      applicable_models. The caller is responsible for running them.
    """
    stages = []
    for payload_config in fsi_configs:
      # Only models named by this config's applicable_models get a stage.
      applicable_models = [m for m in self._run.config.models
                           if m.name in payload_config.applicable_models]
      stages += self._ScheduleForModels(archive_board, archive_build,
                                        applicable_models, [payload_config],
                                        suite_name)
    return stages


class PaygenTestStage(generic_stages.BoardSpecificBuilderStage):
  """Stage that schedules the payload tests."""

  category = constants.CI_INFRA_STAGE

  def __init__(self, builder_run, buildstore, suite_name, board, model,
               lab_board_name, channel, build, skip_duts_check, debug,
               payload_test_configs, test_env, **kwargs):
    """Init that records the test parameters and derives the stage suffix.

    The stage suffix is the capitalized channel name (without any '-channel'
    ending), followed by ' [<model>]' when a model is given.

    Args:
      builder_run: See builder_run on ArchiveStage
      buildstore: BuildStore instance to make DB calls with.
      suite_name: Name of the test suite to schedule.
      board: Board overlay name.
      model: Model that will be tested. ('reef', 'pyro', etc)
      lab_board_name: The actual board label tested against in Autotest
      channel: Channel of payloads to test ('stable', 'beta', etc), with or
               without the '-channel' ending.
      build: Version of payloads to test.
      skip_duts_check: Do not check minimum available DUTs before tests.
      debug: Boolean indicating if this is a test run or a real run.
      payload_test_configs: A list of test_params.TestConfig objects. Only used
                            for scheduling HWTest with skylab tool.
      test_env: A string to indicate the env that the test should run in. The
                value could be constants.ENV_SKYLAB or constants.ENV_AUTOTEST.
    """
    # What to test, and where.
    self.suite_name = suite_name
    self.board = board
    self.model = model
    self.lab_board_name = lab_board_name
    self.build = build

    # How to test it.
    self.skip_duts_check = skip_duts_check
    self.debug = debug
    self.payload_test_configs = payload_test_configs
    assert test_env in (constants.ENV_SKYLAB, constants.ENV_AUTOTEST)
    self.test_env = test_env

    # The stage name doesn't need the '-channel' ending.
    channel_ending = '-channel'
    if channel.endswith(channel_ending):
      channel = channel[:-len(channel_ending)]
    channel_label = channel.capitalize()
    suffix = (channel_label + ' [%s]' % model) if model else channel_label

    super(PaygenTestStage, self).__init__(
        builder_run, buildstore, board, suffix=suffix, **kwargs)

  def PerformStage(self):
    """Schedule the tests to run."""
    # Schedule the tests to run and wait for the results.
    job_keyvals = self.GetJobKeyvals()
    paygen_build_lib.ScheduleAutotestTests(
        self.suite_name,
        self.lab_board_name,
        self.model,
        self.build,
        self.skip_duts_check,
        self.debug,
        self.payload_test_configs,
        self.test_env,
        job_keyvals=job_keyvals)

  def _HandleStageException(self, exc_info):
    """Report test lab failures as warnings instead of failing the build."""
    exc_type, exc_value, _ = exc_info
    lab_failure = failures_lib.TestLabFailure

    # A TestLabFailure means we couldn't schedule the test, and we don't fail
    # the build for that. Failures from background processes come back to us
    # wrapped in a CompoundFailure, so look inside those as well.
    is_lab_failure = issubclass(exc_type, lab_failure)
    if not is_lab_failure and issubclass(exc_type,
                                         failures_lib.CompoundFailure):
      is_lab_failure = exc_value.MatchesFailureType(lab_failure)

    if is_lab_failure:
      return self._HandleExceptionAsWarning(exc_info)

    return super(PaygenTestStage, self)._HandleStageException(exc_info)
