# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module containing the various stages that a builder runs."""

import json
import logging
import os

from chromite.cbuildbot import cbuildbot_alerts
from chromite.cbuildbot import commands
from chromite.cbuildbot.stages import artifact_stages
from chromite.cbuildbot.stages import generic_stages
from chromite.lib import config_lib
from chromite.lib import constants
from chromite.lib import failures_lib
from chromite.lib import gs
from chromite.lib import osutils
from chromite.lib import parallel
from chromite.lib import timeout_util
from chromite.lib.paygen import gspaths
from chromite.lib.paygen import paygen_build_lib
from chromite.utils import pformat


class InvalidTestConditionException(Exception):
    """Raised when pre-conditions for a test aren't met."""


class SignerTestStage(artifact_stages.ArchivingStage):
    """Run signer related tests."""

    option_name = "tests"
    config_name = "signer_tests"
    category = constants.CI_INFRA_STAGE

    # If the signer tests take longer than 30 minutes, abort. They usually take
    # five minutes to run.
    SIGNER_TEST_TIMEOUT = 30 * 60

    def PerformStage(self):
        if not self.archive_stage.WaitForRecoveryImage():
            raise InvalidTestConditionException("Missing recovery image.")
        with timeout_util.Timeout(self.SIGNER_TEST_TIMEOUT):
            commands.RunSignerTests(self._build_root, self._current_board)


class SignerResultsTimeout(failures_lib.StepFailure):
    """The signer did not produce any results inside the expected time."""


class SignerFailure(failures_lib.StepFailure):
    """The signer returned an error result."""


class MissingInstructionException(failures_lib.StepFailure):
    """We didn't receive the list of signing instructions PushImage uploaded."""


class MalformedResultsException(failures_lib.StepFailure):
    """The Signer results aren't formatted as we expect."""


class PaygenSigningRequirementsError(failures_lib.StepFailure):
    """Paygen stage can't run if signing failed."""


class PaygenCrostoolsNotAvailableError(failures_lib.StepFailure):
    """Paygen stage can't run if signing failed."""


class PaygenNoPaygenConfigForBoard(failures_lib.StepFailure):
    """Paygen can't run with a release.conf config for the board."""


class SigningStage(generic_stages.BoardSpecificBuilderStage):
    """Stage that waits for image signing.

    This stage waits for values from ArchiveStage (push_image), then waits until
    the signing servers sign the uploaded images.
    """

    option_name = "paygen"
    config_name = "paygen"
    category = constants.CI_INFRA_STAGE

    # Poll for new results every 30 seconds.
    SIGNING_PERIOD = 30

    # Timeout for the signing process. 4 hours in seconds.
    # TODO(crbug.com/1137460): Increased from 2 -> 4 hours.
    SIGNING_TIMEOUT = 4 * 60 * 60

    def __init__(self, builder_run, buildstore, board, **kwargs):
        """Init that accepts the channels argument, if present.

        Args:
          builder_run: See builder_run on ArchivingStage.
          buildstore: BuildStore instance to make DB calls with.
          board: See board on ArchivingStage.
        """
        super().__init__(builder_run, buildstore, board, **kwargs)

        # Used to remember partial results between retries.
        self.signing_results = {}

        # Filled in via WaitUntilReady, Of the form:
        #   {'channel': ['gs://instruction_uri1', 'gs://signer_instruction_uri2']}
        self.instruction_urls_per_channel = None

    def _HandleStageException(self, exc_info):
        """Override and don't set status to FAIL but FORGIVEN instead."""
        exc_type, _exc_value, _exc_tb = exc_info

        # Notify stages blocked on us if we error out.
        self.board_runattrs.SetParallel("signed_images_ready", None)

        # Warn so people look at ArchiveStage for the real error.
        if issubclass(exc_type, MissingInstructionException):
            return self._HandleExceptionAsWarning(exc_info)

        return super()._HandleStageException(exc_info)

    def _JsonFromUrl(self, gs_ctx, url):
        """Fetch a GS Url, and parse it as Json.

        Args:
          gs_ctx: GS Context.
          url: Url to fetch and parse.

        Returns:
          None if the Url doesn't exist.
          Parsed Json structure if it did.

        Raises:
          MalformedResultsException if it failed to parse.
        """
        try:
            signer_txt = gs_ctx.Cat(url)
        except gs.GSNoSuchKey:
            return None

        try:
            return json.loads(signer_txt)
        except ValueError:
            # We should never see malformed Json, even for intermediate statuses.
            raise MalformedResultsException(signer_txt)

    def _SigningStatusFromJson(self, signer_json):
        """Extract a signing status from a signer result Json DOM.

        Args:
          signer_json: The parsed json status from a signer operation.

        Returns:
          string with a simple status: SIGNER_STATUS_PASSED, SIGNER_STATUS_FAILED,
          etc, or '' if the json doesn't contain a status.
        """
        return (signer_json or {}).get("status", {}).get("status", "")

    def _CheckForResults(
        self, gs_ctx, instruction_urls_per_channel, channel_notifier=None
    ):
        """timeout_util.WaitForSuccess func to check a list of signer results.

        Args:
          gs_ctx: Google Storage Context.
          instruction_urls_per_channel: Urls of the signer result files
                                        we're expecting.
          channel_notifier: Method to call when a channel is ready or None.

        Returns:
          Number of results not yet collected.
        """
        COMPLETED_STATUS = (
            constants.SIGNER_STATUS_PASSED,
            constants.SIGNER_STATUS_FAILED,
        )

        # Assume we are done, then try to prove otherwise.
        results_completed = True

        for channel in instruction_urls_per_channel.keys():
            self.signing_results.setdefault(channel, {})

            if len(self.signing_results[channel]) == len(
                instruction_urls_per_channel[channel]
            ):
                continue

            for url in instruction_urls_per_channel[channel]:
                # Convert from instructions URL to instructions result URL.
                url += ".json"

                # We already have a result for this URL.
                if url in self.signing_results[channel]:
                    continue

                try:
                    signer_json = self._JsonFromUrl(gs_ctx, url)
                except MalformedResultsException as e:
                    logging.warning("Received malformed json: %s", e)
                    continue

                if self._SigningStatusFromJson(signer_json) in COMPLETED_STATUS:
                    # If we find a completed result, remember it.
                    self.signing_results[channel][url] = signer_json

            # If we don't have full results for this channel, we aren't done
            # waiting.
            if len(self.signing_results[channel]) != len(
                instruction_urls_per_channel[channel]
            ):
                results_completed = False
                continue

            # If we reach here, the channel has just been completed for the first
            # time.

            # If all results passed the channel was successfully signed.
            channel_success = True
            for signer_result in self.signing_results[channel].values():
                if (
                    self._SigningStatusFromJson(signer_result)
                    != constants.SIGNER_STATUS_PASSED
                ):
                    channel_success = False

            # If we successfully completed the channel, inform someone.
            if channel_success and channel_notifier:
                channel_notifier(channel)

        return results_completed

    def _WaitForSigningResults(
        self, instruction_urls_per_channel, channel_notifier=None
    ):
        """Do the work of waiting for signer results and logging them.

        Args:
          instruction_urls_per_channel: push_image data (see _WaitForPushImage).
          channel_notifier: Method to call with channel name when ready or None.

        Raises:
          ValueError: If the signer result isn't valid json.
          RunCommandError: If we are unable to download signer results.
        """
        gs_ctx = gs.GSContext(dry_run=self._run.options.debug)

        try:
            logging.info("Waiting for signer results.")
            timeout_util.WaitForReturnTrue(
                self._CheckForResults,
                func_args=(
                    gs_ctx,
                    instruction_urls_per_channel,
                    channel_notifier,
                ),
                timeout=self.SIGNING_TIMEOUT,
                period=self.SIGNING_PERIOD,
            )
        except timeout_util.TimeoutError:
            msg = "Image signing timed out."
            logging.error(msg)
            cbuildbot_alerts.PrintBuildbotStepText(msg)
            raise SignerResultsTimeout(msg)

        # Log all signer results, then handle any signing failures.
        failures = []
        for url_results in self.signing_results.values():
            for url, signer_result in url_results.items():
                result_description = os.path.basename(url)
                cbuildbot_alerts.PrintBuildbotStepText(result_description)
                logging.info("Received results for: %s", result_description)
                logging.info(pformat.json(signer_result))

                status = self._SigningStatusFromJson(signer_result)
                if status != constants.SIGNER_STATUS_PASSED:
                    failures.append(result_description)
                    logging.error("Signing failed for: %s", result_description)
                    details = signer_result.get("status", {}).get("details")
                    if details:
                        logging.info("Details:\n%s", details)

        if failures:
            logging.error("Failure summary:")
            for failure in failures:
                logging.error("  %s", failure)
            raise SignerFailure(", ".join([str(f) for f in failures]))

    def WaitUntilReady(self):
        """Block until push_image data is ready.

        Sets self.instruction_urls_per_channel as described in __init__.

        Returns:
          Boolean that tells if we can run this stage.
        """
        # This call will NEVER time out.
        self.instruction_urls_per_channel = self.board_runattrs.GetParallel(
            "instruction_urls_per_channel", timeout=None
        )

        # A value of None signals an error in PushImage.
        if self.instruction_urls_per_channel is None:
            # ArchiveStage PushImage failed. Signing won't run at all.
            self.board_runattrs.SetParallel("signed_images_ready", None)
            return False

        return True

    def PerformStage(self):
        """Do the work of generating our release payloads."""
        # Convert to release tools naming for boards.
        board = self._current_board.replace("_", "-")
        version = self._run.attrs.release_tag

        logging.info("Waiting for image signing for: %s, %s", board, version)
        logging.info("GS errors are a normal part of the polling for results.")
        self._WaitForSigningResults(self.instruction_urls_per_channel)

        # Notify stages blocked on us that images are for the given channel list.
        channels = list(self.instruction_urls_per_channel)
        self.board_runattrs.SetParallel("signed_images_ready", channels)


class PaygenStage(generic_stages.BoardSpecificBuilderStage):
    """Stage that generates release payloads.

    If this stage is created with a 'channels' argument, it can run
    independently. Otherwise, it's dependent on values queued up by
    the SigningStage.
    """

    option_name = "paygen"
    config_name = "paygen"
    category = constants.CI_INFRA_STAGE

    def __init__(self, builder_run, buildstore, board, channels=None, **kwargs):
        """Init that accepts the channels argument, if present.

        Args:
          builder_run: See builder_run on ArchivingStage.
          buildstore: BuildStore instance to make DB calls with.
          board: See board on ArchivingStage.
          channels: Explicit list of channels to generate payloads for.
                    If empty, will instead wait on values from push_image.
                    Channels is normally None in release builds, and normally set
                    for trybot 'payloads' builds.
        """
        super().__init__(builder_run, buildstore, board, **kwargs)
        self.channels = channels

    def _HandleStageException(self, exc_info):
        """Override and don't set status to FAIL but FORGIVEN instead."""
        exc_type, _exc_value, _exc_tb = exc_info

        # If Paygen fails to find anything needed in release.conf, treat it
        # as a warning. This is common during new board bring up.
        if issubclass(exc_type, PaygenNoPaygenConfigForBoard):
            return self._HandleExceptionAsWarning(exc_info)

        # If the SigningStage failed, we warn that we didn't run, but don't fail
        # outright. Let SigningStage decide if this should kill the build.
        if issubclass(exc_type, SignerFailure):
            return self._HandleExceptionAsWarning(exc_info)
        return super()._HandleStageException(exc_info)

    def WaitUntilReady(self):
        """Block until signed images are ready.

        Returns:
          Boolean that tells if we can run this stage.
        """
        # If we did got an explicit channel list, there is no need to wait.
        if self.channels is None:
            # Wait for channels from signing stage.
            self.channels = self.board_runattrs.GetParallel(
                "signed_images_ready", timeout=None
            )

            # If the signing stage errored out for any reason.
            if self.channels is None:
                # SigningStage failed. Payloads can't be generated.
                return False

        return True

    def PerformStage(self):
        """Do the work of generating our release payloads."""
        # Convert to release tools naming for boards.
        board = self._current_board.replace("_", "-")
        version = self._run.attrs.release_tag

        assert version, "We can't generate payloads without a release_tag."
        logging.info("Generating payloads for: %s, %s", board, version)

        # Test to see if the current board has a Paygen configuration. We do
        # this here, not in the sub-process so we don't have to pass back a
        # failure reason.
        try:
            paygen_build_lib.ValidateBoardConfig(board)
        except paygen_build_lib.BoardNotConfigured:
            raise PaygenNoPaygenConfigForBoard(
                "Golden Eye (%s) has no entry for board %s. Get a TPM to fix."
                % (paygen_build_lib.PAYGEN_URI, board)
            )

        with parallel.BackgroundTaskRunner(
            self._RunPaygenInProcess
        ) as per_channel:
            logging.info("Using channels: %s", self.channels)

            # Set an metadata with the channels we've had configured.
            self._run.attrs.metadata.UpdateWithDict(
                {"channels": ",".join(self.channels)}
            )

            # If we have an explicit list of channels, use it.
            for channel in self.channels:
                per_channel.put(
                    (
                        channel,
                        board,
                        version,
                        self._run.options.debug,
                        self._run.config.paygen_skip_testing,
                        self._run.config.paygen_skip_delta_payloads,
                    )
                )

    def _RunPaygenInProcess(
        self, channel, board, version, debug, disable_tests, skip_delta_payloads
    ):
        """Runs the PaygenBuild and PaygenTest stage (if applicable)"""
        PaygenBuildStage(
            self._run,
            self.buildstore,
            board,
            channel,
            version,
            debug,
            disable_tests,
            skip_delta_payloads,
        ).Run()


class PaygenBuildStage(generic_stages.BoardSpecificBuilderStage):
    """Stage that generates payloads and uploads to Google Storage."""

    category = constants.CI_INFRA_STAGE

    def __init__(
        self,
        builder_run,
        buildstore,
        board,
        channel,
        version,
        debug,
        skip_testing,
        skip_delta_payloads,
        **kwargs,
    ):
        """Init that accepts the channels argument, if present.

        Args:
          builder_run: See builder_run on ArchiveStage
          buildstore: BuildStore instance to make DB calls with.
          board: Board of payloads to generate ('x86-mario', 'x86-alex-he', etc)
          channel: Channel of payloads to generate ('stable', 'beta', etc)
          version: Version of payloads to generate.
          debug: Flag telling if this is a real run, or a test run.
          skip_testing: Do not generate test artifacts or run payload tests.
          skip_delta_payloads: Skip generating delta payloads.
        """
        super().__init__(
            builder_run,
            buildstore,
            board,
            suffix=channel.capitalize(),
            **kwargs,
        )
        self._run = builder_run
        self.board = board
        self.channel = channel
        self.version = version
        self.debug = debug
        self.skip_testing = skip_testing
        self.skip_delta_payloads = skip_delta_payloads

    def PerformStage(self):
        """Invoke payload generation. If testing is enabled, schedule tests.

        This method is intended to be safe to invoke inside a process.
        """
        # Convert to release tools naming for channels.
        if not self.channel.endswith("-channel"):
            self.channel += "-channel"

        with osutils.TempDir(sudo_rm=True) as tempdir:
            # Create the definition of the build to generate payloads for.
            build = gspaths.Build(
                channel=self.channel,
                board=self.board,
                version=self.version,
                bucket=gspaths.ChromeosReleases.BUCKET,
            )
            payload_build = gspaths.Build(build)
            if self.debug:
                payload_build.bucket = gspaths.ChromeosReleases.TEST_BUCKET

            try:
                # Generate the payloads.
                self._PrintLoudly(
                    "Starting %s, %s, %s"
                    % (self.channel, self.version, self.board)
                )
                paygen = paygen_build_lib.PaygenBuild(
                    build,
                    payload_build,
                    work_dir=tempdir,
                    site_config=self._run.site_config,
                    dry_run=self.debug,
                    skip_delta_payloads=self.skip_delta_payloads,
                )

                testdata = paygen.CreatePayloads()

                # Now, schedule the payload tests if desired.
                if not self.skip_testing:
                    (
                        suite_name,
                        archive_board,
                        archive_build,
                        payload_test_configs,
                    ) = testdata
                    # For unified builds, only test against the specified models.
                    if self._run.config.models:
                        au_models = []
                        for model in self._run.config.models:
                            # 'au' is a test suite generated in ge_build_config.json
                            if model.test_suites and "au" in model.test_suites:
                                au_models.append(model)

                        if len(au_models) > 1:
                            fsi_configs = set(
                                p
                                for p in payload_test_configs
                                if p.payload_type
                                == paygen_build_lib.PAYLOAD_TYPE_FSI
                            )
                            non_fsi_configs = set(
                                p
                                for p in payload_test_configs
                                if p not in fsi_configs
                            )
                            # Schedule FSI's on every model even those not in the 'au' suite.
                            # This ensures no FSI tests are missed from models being disabled
                            # in the lab.
                            stages = self._ScheduleForApplicableModels(
                                archive_board,
                                archive_build,
                                self._run.config.models,
                                fsi_configs,
                                suite_name,
                            )
                            # Schedule the rest only on models in the 'au' suite.
                            stages += self._ScheduleForApplicableModels(
                                archive_board,
                                archive_build,
                                au_models,
                                non_fsi_configs,
                                suite_name,
                            )
                            steps = [stage.Run for stage in stages]
                            parallel.RunParallelSteps(steps)
                        elif len(au_models) == 1:
                            model = au_models[0]
                            PaygenTestStage(
                                self._run,
                                self.buildstore,
                                suite_name,
                                archive_board,
                                model.name,
                                model.lab_board_name,
                                self.channel,
                                archive_build,
                                self.debug,
                                payload_test_configs,
                                config_lib.GetHWTestEnv(
                                    self._run.config, model_config=model
                                ),
                            ).Run()
                    else:
                        lab_board_name = config_lib.GetNonUniBuildLabBoardName(
                            archive_board
                        )
                        PaygenTestStage(
                            self._run,
                            self.buildstore,
                            suite_name,
                            archive_board,
                            None,
                            lab_board_name,
                            self.channel,
                            archive_build,
                            self.debug,
                            payload_test_configs,
                            config_lib.GetHWTestEnv(self._run.config),
                        ).Run()

            except (paygen_build_lib.BuildLocked) as e:
                # These errors are normal if it's possible that another builder is
                # processing the same build. (perhaps by a trybot generating payloads on
                # request).
                logging.info(
                    "PaygenBuild for %s skipped because: %s", self.channel, e
                )

    def _ScheduleForModels(
        self, archive_board, archive_build, models, payload_configs, suite_name
    ):
        """Schedule AU tests on models.

        Args:
          archive_board: The board we schedule against.
          archive_build: The build of the payload config.
          models: The models with 'au' enabled.
          payload_configs: The list of payload configs.
          suite_name: The name of the suite we are scheduling.
        """
        return [
            PaygenTestStage(
                self._run,
                self.buildstore,
                suite_name,
                archive_board,
                model.name,
                model.lab_board_name,
                self.channel,
                archive_build,
                self.debug,
                payload_configs,
                config_lib.GetHWTestEnv(self._run.config, model_config=model),
            )
            for model in models
        ]

    def _ScheduleForApplicableModels(
        self, archive_board, archive_build, models, payload_configs, suite_name
    ):
        """Schedule AU tests on applicable_models, if any.

        Note that if a config has no applicable models then no tests will be
        scheduled for it. N2N and stepping stone configs never have
        applicable_models so they will run on everything.

        Args:
          archive_board: The board we schedule against.
          archive_build: The build of the payload config.
          models: The list of models to iterate over.
          payload_configs: The list of payload configs.
          suite_name: The name of the suite we are scheduling.
        """
        stages = []
        for payload_config in payload_configs:
            # N2N and stepping stone payloads do not have applicable_models so
            # just schedule on ALL models that should be running tests in the
            # lab.
            if payload_config.payload_type in (
                paygen_build_lib.PAYLOAD_TYPE_N2N,
                paygen_build_lib.PAYLOAD_TYPE_STEPPING_STONE,
            ):
                stages += self._ScheduleForModels(
                    archive_board,
                    archive_build,
                    models,
                    [payload_config],
                    suite_name,
                )
                continue
            applicable_models = []
            for m in models:
                if m.name in (payload_config.applicable_models or []):
                    applicable_models.append(m)
            stages += self._ScheduleForModels(
                archive_board,
                archive_build,
                applicable_models,
                [payload_config],
                suite_name,
            )
        return stages


class PaygenTestStage(generic_stages.BoardSpecificBuilderStage):
    """Stage that schedules the payload tests."""

    category = constants.CI_INFRA_STAGE

    def __init__(
        self,
        builder_run,
        buildstore,
        suite_name,
        board,
        model,
        lab_board_name,
        channel,
        build,
        debug,
        payload_test_configs,
        test_env,
        **kwargs,
    ):
        """Init that accepts the channels argument, if present.

        Args:
          builder_run: See builder_run on ArchiveStage
          buildstore: BuildStore instance to make DB calls with.
          suite_name: See builder_run on ArchiveStage
          board: Board overlay name.
          model: Model that will be tested. ('reef', 'pyro', etc)
          lab_board_name: The actual board label tested against in Autotest
          channel: Channel of payloads to generate ('stable', 'beta', etc)
          build: Version of payloads to generate.
          debug: Boolean indicating if this is a test run or a real run.
          payload_test_configs: A list of test_params.TestConfig objects. Only used
                                for scheduling HWTest with skylab tool.
          test_env: A string to indicate the env that the test should run in. The
                    value could be constants.ENV_SKYLAB or constants.ENV_AUTOTEST.
        """
        self.suite_name = suite_name
        self.board = board
        self.model = model
        self.lab_board_name = lab_board_name

        self.build = build
        self.debug = debug
        self.payload_test_configs = payload_test_configs
        assert test_env in [constants.ENV_SKYLAB, constants.ENV_AUTOTEST]
        self.test_env = test_env
        # We don't need the '-channel'suffix.
        if channel.endswith("-channel"):
            channel = channel[0 : -len("-channel")]
        suffix = channel.capitalize()
        if model:
            suffix += " [%s]" % model

        super().__init__(
            builder_run, buildstore, board, suffix=suffix, **kwargs
        )

    def PerformStage(self):
        """Schedule the tests to run."""
        # Schedule the tests to run and wait for the results.
        paygen_build_lib.ScheduleAutotestTests(
            self.suite_name,
            self.lab_board_name,
            self.model,
            self.build,
            self.payload_test_configs,
        )

    def _HandleStageException(self, exc_info):
        """Override and don't set status to FAIL but FORGIVEN instead."""
        exc_type, exc_value, _exc_tb = exc_info

        # If the exception is a TestLabFailure that means we couldn't schedule the
        # test. We don't fail the build for that. We do the CompoundFailure dance,
        # because that's how we'll get failures from background processes returned
        # to us.
        if issubclass(exc_type, failures_lib.TestLabFailure) or (
            issubclass(exc_type, failures_lib.CompoundFailure)
            and exc_value.MatchesFailureType(failures_lib.TestLabFailure)
        ):
            return self._HandleExceptionAsWarning(exc_info)

        return super()._HandleStageException(exc_info)
