# Lint as: python2, python3
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import json
import logging
import os
import re
import shutil
import time
from six.moves import zip
from six.moves import zip_longest
import six.moves.urllib.parse

from datetime import datetime, timedelta
from xml.etree import ElementTree

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.client.cros.update_engine import dlc_util
from autotest_lib.client.cros.update_engine import update_engine_event as uee
from autotest_lib.client.cros.update_engine import update_engine_util
from autotest_lib.server import autotest
from autotest_lib.server import test
from autotest_lib.server.cros import gsutil_wrapper
from autotest_lib.server.cros.dynamic_suite import tools
from autotest_lib.server.hosts.tls_client import connection
from autotest_lib.server.hosts.tls_client import fake_omaha
from autotest_lib.utils.frozen_chromite.lib import auto_updater
from autotest_lib.utils.frozen_chromite.lib import auto_updater_transfer
from autotest_lib.utils.frozen_chromite.lib import remote_access
from autotest_lib.utils.frozen_chromite.lib import retry_util


class UpdateEngineTest(test.test, update_engine_util.UpdateEngineUtil):
    """Base class for all autoupdate_ server tests.

    Contains useful functions shared between tests like staging payloads
    on devservers, verifying hostlogs, and launching client tests.

    """
    version = 1

    # Timeout periods, given in seconds.
    _INITIAL_CHECK_TIMEOUT = 12 * 60
    _DOWNLOAD_STARTED_TIMEOUT = 4 * 60
    _DOWNLOAD_FINISHED_TIMEOUT = 20 * 60
    _UPDATE_COMPLETED_TIMEOUT = 4 * 60
    _POST_REBOOT_TIMEOUT = 15 * 60

    # Name of the logfile generated by nebraska.py.
    _NEBRASKA_LOG = 'nebraska.log'

    # Version we tell the DUT it is on before update.
    _CUSTOM_LSB_VERSION = '0.0.0.0'

    _CELLULAR_BUCKET = 'gs://chromeos-throw-away-bucket/CrOSPayloads/Cellular/'

    _TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Paygen.json file provides information about all builds on all channels.
    _PAYGEN_JSON_URI = 'gs://chromeos-build-release-console/paygen.json'

    def initialize(self, host=None, hosts=None):
        """
        Sets default variables for the test.

        @param host: The DUT we will be running on.
        @param hosts: If we are running a test with multiple DUTs (eg P2P)
                      we will use hosts instead of host.

        """
        self._current_timestamp = None
        self._host = host
        # Some AU tests use multiple DUTs
        self._hosts = hosts

        # Define functions used in update_engine_util.
        self._run = self._host.run if self._host else None
        self._get_file = self._host.get_file if self._host else None

        # Utilities for DLC management
        self._dlc_util = dlc_util.DLCUtil(self._run)


    def cleanup(self):
        """Clean up update_engine autotests."""
        if self._host:
            self._host.get_file(self._UPDATE_ENGINE_LOG, self.resultsdir)


    def _get_expected_events_for_rootfs_update(self, source_release):
        """
        Creates a list of expected events fired during a rootfs update.

        There are 4 events fired during a rootfs update. We will create these
        in the correct order.

        @param source_release: The source build version.

        """
        return [
            uee.UpdateEngineEvent(
                version=source_release,
                timeout=self._INITIAL_CHECK_TIMEOUT),
            uee.UpdateEngineEvent(
                event_type=uee.EVENT_TYPE_DOWNLOAD_STARTED,
                event_result=uee.EVENT_RESULT_SUCCESS,
                version=source_release,
                timeout=self._DOWNLOAD_STARTED_TIMEOUT),
            uee.UpdateEngineEvent(
                event_type=uee.EVENT_TYPE_DOWNLOAD_FINISHED,
                event_result=uee.EVENT_RESULT_SUCCESS,
                version=source_release,
                timeout=self._DOWNLOAD_FINISHED_TIMEOUT),
            uee.UpdateEngineEvent(
                event_type=uee.EVENT_TYPE_UPDATE_COMPLETE,
                event_result=uee.EVENT_RESULT_SUCCESS,
                version=source_release,
                timeout=self._UPDATE_COMPLETED_TIMEOUT)
        ]


    def _get_expected_event_for_post_reboot_check(self, source_release,
                                                  target_release):
        """
        Creates the expected event fired during post-reboot update check.

        @param source_release: The source build version.
        @param target_release: The target build version.

        """
        return [
            uee.UpdateEngineEvent(
                event_type=uee.EVENT_TYPE_REBOOTED_AFTER_UPDATE,
                event_result=uee.EVENT_RESULT_SUCCESS,
                version=target_release,
                previous_version=source_release,
                timeout = self._POST_REBOOT_TIMEOUT)
        ]


    def _verify_event_with_timeout(self, expected_event, actual_event):
        """
        Verify an expected event occurred before its timeout.

        @param expected_event: an expected event.
        @param actual_event: an actual event from the hostlog.

        @return None if event complies, an error string otherwise.

        """
        logging.info('Expecting %s within %s seconds', expected_event,
                     expected_event._timeout)
        if not actual_event:
            return ('No entry found for %s event.' % uee.get_event_type
                (expected_event._expected_attrs['event_type']))
        logging.info('Consumed new event: %s', actual_event)
        # If this is the first event, set it as the current time
        if self._current_timestamp is None:
            self._current_timestamp = datetime.strptime(
                actual_event['timestamp'], self._TIMESTAMP_FORMAT)

        # Get the time stamp for the current event and convert to datetime
        timestamp = actual_event['timestamp']
        event_timestamp = datetime.strptime(timestamp,
                                            self._TIMESTAMP_FORMAT)

        # If the event happened before the timeout
        difference = event_timestamp - self._current_timestamp

        # We check if the difference > 7 hs. If so we assume that this is due to
        # timezone jump from local to utc caused by log format change
        # crrev.com/c/2652108 and adjust accordingly. Another assumption here is
        # that the DUT are in PST timezone. The jump will be 7 hs with DST and 8
        # hs without. This hack should be removed once reasonable time has
        # passed and we do not need to consider the old log format anymore.
        # TODO(crbug.com/c/1178930): Remove the hack below.
        if difference > timedelta(hours=7):
            logging.info(
                    'Detected a timezone jump with difference %s with event %s',
                    difference,
                    uee.get_event_type(
                            expected_event._expected_attrs['event_type']))
            if difference > timedelta(hours=8):
                difference -= timedelta(hours=8)
            else:
                difference -= timedelta(hours=7)

        if difference < timedelta(seconds=expected_event._timeout):
            logging.info('Event took %s seconds to fire during the '
                         'update', difference.seconds)
            self._current_timestamp = event_timestamp
            mismatched_attrs = expected_event.equals(actual_event)
            if mismatched_attrs is None:
                return None
            else:
                return self._error_incorrect_event(
                    expected_event, actual_event, mismatched_attrs)
        else:
            return self._timeout_error_message(expected_event,
                                               difference.seconds)


    def _error_incorrect_event(self, expected, actual, mismatched_attrs):
        """
        Error message for when an event is not what we expect.

        @param expected: The expected event that did not match the hostlog.
        @param actual: The actual event with the mismatched arg(s).
        @param mismatched_attrs: A list of mismatched attributes.

        """
        et = uee.get_event_type(expected._expected_attrs['event_type'])
        return ('Event %s had mismatched attributes: %s. We expected %s, but '
                'got %s.' % (et, mismatched_attrs, expected, actual))


    def _timeout_error_message(self, expected, time_taken):
        """
        Error message for when an event takes too long to fire.

        @param expected: The expected event that timed out.
        @param time_taken: How long it actually took.

        """
        et = uee.get_event_type(expected._expected_attrs['event_type'])
        return ('Event %s should take less than %ds. It took %ds.'
                % (et, expected._timeout, time_taken))


    def _stage_payload_by_uri(self, payload_uri, properties_file=True):
        """Stage a payload based on its GS URI.

        This infers the build's label, filename and GS archive from the
        provided GS URI.

        @param payload_uri: The full GS URI of the payload.
        @param properties_file: If true, it will stage the update payload
                                properties file too.

        @return URL of the staged payload (and properties file) on the server.

        @raise error.TestError if there's a problem with staging.

        """
        archive_url, _, filename = payload_uri.rpartition('/')
        build_name = six.moves.urllib.parse.urlsplit(archive_url).path.strip(
                '/')
        filenames = [filename]
        if properties_file:
            filenames.append(filename + '.json')
        try:
            self._autotest_devserver.stage_artifacts(image=build_name,
                                                     files=filenames,
                                                     archive_url=archive_url)
            return (self._autotest_devserver.get_staged_file_url(f, build_name)
                    for f in filenames)
        except dev_server.DevServerException as e:
            raise error.TestError('Failed to stage payload: %s' % e)


    def _get_devserver_for_test(self, test_conf):
        """Find a devserver to use.

        We use the payload URI as the hash for ImageServer.resolve. The chosen
        devserver needs to respect the location of the host if
        'prefer_local_devserver' is set to True or 'restricted_subnets' is set.

        @param test_conf: a dictionary of test settings.

        """
        autotest_devserver = dev_server.ImageServer.resolve(
            test_conf['target_payload_uri'], self._host.hostname)
        devserver_hostname = six.moves.urllib.parse.urlparse(
                autotest_devserver.url()).hostname
        logging.info('Devserver chosen for this run: %s', devserver_hostname)
        return autotest_devserver


    def _get_payload_url(self, build=None, full_payload=True, is_dlc=False):
        """
        Gets the GStorage URL of the full or delta payload for this build, for
        either platform or DLC payloads.

        @param build: build string e.g eve-release/R85-13265.0.0.
        @param full_payload: True for full payload. False for delta.
        @param is_dlc: True to get the payload URL for sample-dlc.

        @returns the payload URL.

        """
        if build is None:
            if self._job_repo_url is None:
                self._job_repo_url = self._get_job_repo_url()
            ds_url, build = tools.get_devserver_build_from_package_url(
                self._job_repo_url)
            self._autotest_devserver = dev_server.ImageServer(ds_url)

        gs = dev_server._get_image_storage_server()

        # Example payload names (AU):
        # chromeos_R85-13265.0.0_eve_full_dev.bin
        # chromeos_R85-13265.0.0_R85-13265.0.0_eve_delta_dev.bin
        # Example payload names (DLC):
        # dlc_sample-dlc_package_R85-13265.0.0_eve_full_dev.bin
        # dlc_sample-dlc_package_R85-13265.0.0_R85-13265.0.0_eve_delta_dev.bin
        if is_dlc:
            payload_prefix = 'dlc_*_%s_*.bin'
        else:
            payload_prefix = 'chromeos_*_%s_*.bin'

        regex = payload_prefix % ('full' if full_payload else 'delta')

        payload_url_regex = gs + build + '/' + regex
        logging.debug('Trying to find payloads at %s', payload_url_regex)
        payloads = utils.gs_ls(payload_url_regex)
        if not payloads:
            raise error.TestFail('Could not find payload for %s', build)
        logging.debug('Payloads found: %s', payloads)
        return payloads[0]


    @staticmethod
    def _get_stateful_uri(build_uri):
        """Returns a complete GS URI of a stateful update given a build path."""
        return '/'.join([build_uri.rstrip('/'), 'stateful.tgz'])


    def _get_job_repo_url(self, job_repo_url=None):
        """Gets the job_repo_url argument supplied to the test by the lab."""
        if job_repo_url is not None:
            return job_repo_url
        if self._hosts is not None:
            self._host = self._hosts[0]
        if self._host is None:
            raise error.TestFail('No host specified by AU test.')
        info = self._host.host_info_store.get()
        return info.attributes.get(self._host.job_repo_url_attribute, '')


    def _stage_payloads(self, payload_uri, archive_uri):
        """
        Stages payloads on the devserver.

        @param payload_uri: URI for a GS payload to stage.
        @param archive_uri: URI for GS folder containing payloads. This is used
                            to find the related stateful payload.

        @returns URI of staged payload, URI of staged stateful.

        """
        if not payload_uri:
            return None, None
        staged_uri, _ = self._stage_payload_by_uri(payload_uri)
        logging.info('Staged %s at %s.', payload_uri, staged_uri)

        # Figure out where to get the matching stateful payload.
        if archive_uri:
            stateful_uri = self._get_stateful_uri(archive_uri)
        else:
            stateful_uri = self._payload_to_stateful_uri(payload_uri)
        staged_stateful = self._stage_payload_by_uri(stateful_uri,
                                                     properties_file=False)
        logging.info('Staged stateful from %s at %s.', stateful_uri,
                     staged_stateful)
        return staged_uri, staged_stateful



    def _payload_to_stateful_uri(self, payload_uri):
        """Given a payload GS URI, returns the corresponding stateful URI."""
        build_uri = payload_uri.rpartition('/payloads/')[0]
        return self._get_stateful_uri(build_uri)


    def _copy_payload_to_public_bucket(self, payload_url):
        """
        Copy payload and make link public.

        @param payload_url: Payload URL on Google Storage.

        @returns The payload URL that is now publicly accessible.

        """
        payload_filename = payload_url.rpartition('/')[2]
        utils.run(['gsutil', 'cp', '%s*' % payload_url, self._CELLULAR_BUCKET])
        new_gs_url = self._CELLULAR_BUCKET + payload_filename
        utils.run(['gsutil', 'acl', 'ch', '-u', 'AllUsers:R',
                   '%s*' % new_gs_url])
        return new_gs_url.replace('gs://', 'https://storage.googleapis.com/')


    def _suspend_then_resume(self):
        """Suspends and resumes the host DUT."""
        try:
            self._host.suspend(suspend_time=30)
        except error.AutoservSuspendError:
            logging.exception('Suspend did not last the entire time.')


    def _run_client_test_and_check_result(self, test_name, **kwargs):
        """
        Kicks of a client autotest and checks that it didn't fail.

        @param test_name: client test name
        @param **kwargs: key-value arguments to pass to the test.

        """
        client_at = autotest.Autotest(self._host)
        client_at.run_test(test_name, **kwargs)
        client_at._check_client_test_result(self._host, test_name)


    def _extract_request_logs(self, update_engine_log, is_dlc=False):
        """
        Extracts request logs from an update_engine log.

        @param update_engine_log: The update_engine log as a string.
        @param is_dlc: True to return the request logs for the DLC updates
                       instead of the platform update.
        @returns a list object representing the platform (OS) request logs, or
                 a dictionary of lists representing DLC request logs,
                 keyed by DLC ID, if is_dlc is True.

        """
        # Looking for all request XML strings in the log.
        pattern = re.compile(r'<request.*?</request>', re.DOTALL)
        requests = pattern.findall(update_engine_log)

        # We are looking for patterns like this:
        # "2021-01-28T10:14:33.998217Z INFO update_engine: \
        # [omaha_request_action.cc(794)] Request: <?xml"
        timestamp_pattern = re.compile(
                r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}).* Request:.*xml')
        timestamps = [
                datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S')
                for ts in timestamp_pattern.findall(update_engine_log)
        ]
        if len(timestamps) == 0:
            # We might be reading log in old format so try parsing with another regexp.
            # [0324/151230.562305:INFO:omaha_request_action.cc(501)] Request:
            timestamp_pattern_old = re.compile(
                    r'\[([0-9]+)/([0-9]+).*?\] Request:')

            # Since the old format uses local time, we want to convert it to utc time.
            is_dst = time.daylight and time.localtime().tm_isdst > 0
            utc_offset = timedelta(
                    seconds=(time.altzone if is_dst else time.timezone))
            if utc_offset > timedelta(seconds=0):
                logging.info('Parsing old log format. Adding utc offset of %s',
                             utc_offset)
            else:
                logging.warning(
                        'Local time to UTC conversion might fail. utc_offset=%s. time.altzone=%s, time.timezone=%s',
                        utc_offset, time.altzone, time.timezone)
            timestamps = [
                    # Just use the current year since the logs don't have the year
                    # value. Let's all hope tests don't start to fail on new year's
                    # eve LOL.
                    datetime(
                            datetime.now().year,
                            int(ts[0][0:2]),  # Month
                            int(ts[0][2:4]),  # Day
                            int(ts[1][0:2]),  # Hours
                            int(ts[1][2:4]),  # Minutes
                            int(ts[1][4:6])  # Seconds
                    ) + utc_offset
                    for ts in timestamp_pattern_old.findall(update_engine_log)
            ]

        if len(requests) != len(timestamps):
            raise error.TestFail('Failed to properly parse the update_engine '
                                 'log file.')

        result = []
        dlc_results = {}
        for timestamp, request in zip(timestamps, requests):

            root = ElementTree.fromstring(request)

            # There may be events for multiple apps if DLCs are installed.
            # See below (trimmed) example request including DLC:
            #
            # <request requestid=...>
            #   <os version="Indy" platform=...></os>
            #   <app appid="{DB5199C7-358B-4E1F-B4F6-AF6D2DD01A38}"
            #       version="13265.0.0" track=...>
            #     <event eventtype="13" eventresult="1"></event>
            #   </app>
            #   <app appid="{DB5199C7-358B-4E1F-B4F6-AF6D2DD01A38}_sample-dlc"
            #       version="0.0.0.0" track=...>
            #     <event eventtype="13" eventresult="1"></event>
            #   </app>
            # </request>
            #
            # The first <app> section is for the platform update. The second
            # is for the DLC update.
            #
            # Example without DLC:
            # <request requestid=...>
            #   <os version="Indy" platform=...></os>
            #   <app appid="{DB5199C7-358B-4E1F-B4F6-AF6D2DD01A38}"
            #       version="13265.0.0" track=...>
            #     <event eventtype="13" eventresult="1"></event>
            #   </app>
            # </request>

            apps = root.findall('app')
            for app in apps:
                event = app.find('event')

                event_info = {
                    'version': app.attrib.get('version'),
                    'event_type': (int(event.attrib.get('eventtype'))
                                  if event is not None else None),
                    'event_result': (int(event.attrib.get('eventresult'))
                                    if event is not None else None),
                    'timestamp': timestamp.strftime(self._TIMESTAMP_FORMAT),
                }

                previous_version = (event.attrib.get('previousversion')
                                    if event is not None else None)
                if previous_version:
                    event_info['previous_version'] = previous_version

                # Check if the event is for the platform update or for a DLC
                # by checking the appid. For platform, the appid looks like:
                #     {DB5199C7-358B-4E1F-B4F6-AF6D2DD01A38}
                # For DLCs, it is the platform app ID + _ + the DLC ID:
                #     {DB5199C7-358B-4E1F-B4F6-AF6D2DD01A38}_sample-dlc
                id_segments = app.attrib.get('appid').split('_')
                if len(id_segments) > 1:
                    dlc_id = id_segments[1]
                    if dlc_id in dlc_results:
                        dlc_results[dlc_id].append(event_info)
                    else:
                        dlc_results[dlc_id] = [event_info]
                else:
                    result.append(event_info)

        if is_dlc:
            logging.info('Extracted DLC request logs: %s', dlc_results)
            return dlc_results
        else:
            logging.info('Extracted platform (OS) request log: %s', result)
            return result


    def _create_hostlog_files(self):
        """Create the two hostlog files for the update.

        To ensure the update was successful we need to compare the update
        events against expected update events. There is a hostlog for the
        rootfs update and for the post reboot update check.

        """
        # Check that update logs exist for the update that just happened.
        if len(self._get_update_engine_logs()) < 2:
            err_msg = 'update_engine logs are missing. Cannot verify update.'
            raise error.TestFail(err_msg)

        # Each time we reboot in the middle of an update we ping omaha again
        # for each update event. So parse the list backwards to get the final
        # events.
        rootfs_hostlog = os.path.join(self.resultsdir, 'hostlog_rootfs')
        with open(rootfs_hostlog, 'w') as fp:
            # There are four expected hostlog events during update.
            json.dump(self._extract_request_logs(
                self._get_update_engine_log(1))[-4:], fp)

        reboot_hostlog = os.path.join(self.resultsdir, 'hostlog_reboot')
        with open(reboot_hostlog, 'w') as fp:
            # There is one expected hostlog events after reboot.
            json.dump(self._extract_request_logs(
                self._get_update_engine_log(0))[:1], fp)

        return rootfs_hostlog, reboot_hostlog


    def _create_dlc_hostlog_files(self):
        """Create the rootfs and reboot hostlog files for DLC updates.

        Each DLC has its own set of update requests in the logs together with
        the platform update requests. To ensure the DLC update was successful
        we will compare the update events against the expected events, which
        are the same expected events as for the platform update. There is a
        hostlog for the rootfs update and the post-reboot update check for
        each DLC.

        @returns two dictionaries, one for the rootfs DLC update and one for
                 the post-reboot check. The keys are DLC IDs and the values
                 are the hostlog filenames.

        """
        dlc_rootfs_hostlogs = {}
        dlc_reboot_hostlogs = {}

        dlc_rootfs_request_logs = self._extract_request_logs(
            self._get_update_engine_log(1), is_dlc=True)

        for dlc_id in dlc_rootfs_request_logs:
            dlc_rootfs_hostlog = os.path.join(self.resultsdir,
                                              'hostlog_' + dlc_id)
            dlc_rootfs_hostlogs[dlc_id] = dlc_rootfs_hostlog
            with open(dlc_rootfs_hostlog, 'w') as fp:
                # Same number of events for DLC updates as for platform
                json.dump(dlc_rootfs_request_logs[dlc_id][-4:], fp)

        dlc_reboot_request_logs = self._extract_request_logs(
            self._get_update_engine_log(0), is_dlc=True)

        for dlc_id in dlc_reboot_request_logs:
            dlc_reboot_hostlog = os.path.join(self.resultsdir,
                                              'hostlog_' + dlc_id + '_reboot')
            dlc_reboot_hostlogs[dlc_id] = dlc_reboot_hostlog
            with open(dlc_reboot_hostlog, 'w') as fp:
                # Same number of events for DLC updates as for platform
                json.dump(dlc_reboot_request_logs[dlc_id][:1], fp)

        return dlc_rootfs_hostlogs, dlc_reboot_hostlogs


    def _set_active_p2p_host(self, host):
        """
        Choose which p2p host device to run commands on.

        For P2P tests with multiple DUTs we need to be able to choose which
        host within self._hosts we want to issue commands on.

        @param host: The host to run commands on.

        """
        self._set_util_functions(host.run, host.get_file)


    def _set_update_over_cellular_setting(self, update_over_cellular=True):
        """
        Toggles the update_over_cellular setting in update_engine.

        @param update_over_cellular: True to enable, False to disable.

        """
        answer = 'yes' if update_over_cellular else 'no'
        cmd = [self._UPDATE_ENGINE_CLIENT_CMD,
               '--update_over_cellular=%s' % answer]
        retry_util.RetryException(error.AutoservRunError, 2, self._run, cmd)


    def _copy_generated_nebraska_logs(self, logs_dir, identifier):
        """Copies nebraska logs from logs_dir into job results directory.

        The nebraska process on the device generates logs and stores those logs
        in a /tmp directory. The update engine generates update_engine.log
        during the auto-update which is also stored in the same /tmp directory.
        This method copies these logfiles from the /tmp directory into the job

        @param logs_dir: Directory containing paths to the log files generated
                         by the nebraska process.
        @param identifier: A string that is appended to the logfile when it is
                           saved so that multiple files with the same name can
                           be differentiated.
        """
        partial_filename = '%s_%s_%s' % ('%s', self._host.hostname, identifier)
        src_files = [
            self._NEBRASKA_LOG,
            os.path.basename(self._UPDATE_ENGINE_LOG),
        ]

        for src_fname in src_files:
            source = os.path.join(logs_dir, src_fname)
            dest = os.path.join(self.resultsdir, partial_filename % src_fname)
            logging.debug('Copying logs from %s to %s', source, dest)
            try:
                shutil.copyfile(source, dest)
            except Exception as e:
                logging.error('Could not copy logs from %s into %s due to '
                              'exception: %s', source, dest, e)

    @staticmethod
    def _get_update_parameters_from_uri(payload_uri):
        """Extract vars needed to update with a Google Storage payload URI.

        The two values we need are:
        (1) A build_name string e.g dev-channel/samus/9583.0.0
        (2) A filename of the exact payload file to use for the update. This
        payload needs to have already been staged on the devserver.

        @param payload_uri: Google Storage URI to extract values from

        """

        # gs://chromeos-releases/dev-channel/samus/9334.0.0/payloads/blah.bin
        # build_name = dev-channel/samus/9334.0.0
        # payload_file = payloads/blah.bin
        build_name = payload_uri[:payload_uri.index('payloads/')]
        build_name = six.moves.urllib.parse.urlsplit(build_name).path.strip(
                '/')
        payload_file = payload_uri[payload_uri.index('payloads/'):]

        logging.debug('Extracted build_name: %s, payload_file: %s from %s.',
                      build_name, payload_file, payload_uri)
        return build_name, payload_file


    def _restore_stateful(self):
        """Restore the stateful partition after a destructive test."""
        # Stage stateful payload.
        ds_url, build = tools.get_devserver_build_from_package_url(
                self._job_repo_url)
        self._autotest_devserver = dev_server.ImageServer(ds_url)
        self._autotest_devserver.stage_artifacts(build, ['stateful'])

        logging.info('Restoring stateful partition...')
        # Setup local dir.
        self._run(['mkdir', '-p', '-m', '1777', '/usr/local/tmp'])

        # Download and extract the stateful payload.
        update_url = self._autotest_devserver.get_update_url(build)
        statefuldev_url = update_url.replace('update', 'static')
        statefuldev_url += '/stateful.tgz'
        cmd = [
                'curl', '--silent', '--show-error', '--max-time', '600',
                statefuldev_url, '|', 'tar', '--ignore-command-error',
                '--overwrite', '--directory', '/mnt/stateful_partition', '-xz'
        ]
        try:
            self._run(cmd)
        except error.AutoservRunError as e:
            err_str = 'Failed to restore the stateful partition'
            raise error.TestFail('%s: %s' % (err_str, str(e)))

        # Touch a file so changes are picked up after reboot.
        update_file = '/mnt/stateful_partition/.update_available'
        self._run(['echo', '-n', 'clobber', '>', update_file])
        self._host.reboot()

        # Make sure python is available again.
        try:
            self._run(['python', '--version'])
        except error.AutoservRunError as e:
            err_str = 'Python not available after restoring stateful.'
            raise error.TestFail(err_str)

        logging.info('Stateful restored successfully.')


    def verify_update_events(self, source_release, hostlog_filename,
                             target_release=None):
        """Compares a hostlog file against a set of expected events.

        In this class we build a list of expected events (list of
        UpdateEngineEvent objects), and compare that against a "hostlog"
        returned from update_engine from the update. This hostlog is a json
        list of events fired during the update.

        @param source_release: The source build version.
        @param hostlog_filename: The path to a hotlog returned from nebraska.
        @param target_release: The target build version.

        """
        if target_release is not None:
            expected_events = self._get_expected_event_for_post_reboot_check(
                source_release, target_release)
        else:
            expected_events = self._get_expected_events_for_rootfs_update(
                source_release)
        logging.info('Checking update against hostlog file: %s',
                     hostlog_filename)
        try:
            with open(hostlog_filename, 'r') as fp:
                hostlog_events = json.load(fp)
        except Exception as e:
            raise error.TestFail('Error reading the hostlog file: %s' % e)

        for expected, actual in zip_longest(expected_events, hostlog_events):
            err_msg = self._verify_event_with_timeout(expected, actual)
            if err_msg is not None:
                raise error.TestFail(('Hostlog verification failed: %s ' %
                                     err_msg))


    def get_update_url_for_test(self,
                                job_repo_url=None,
                                full_payload=True,
                                stateful=False,
                                moblab=False,
                                return_noupdate_starting=2):
        """
        Returns a devserver update URL for tests that cannot use a Nebraska
        instance on the DUT for updating.

        This expects the test to set self._host or self._hosts.

        @param job_repo_url: string url containing the current build.
        @param full_payload: bool whether we want a full payload.
        @param stateful: bool whether we want to stage stateful payload too.
        @param moblab: bool whether we are running on moblab.
        @param return_noupdate_starting: (Number of times - 1) we want
                                         FakeOmaha to return an update before
                                         returning noupdate. 1 means never
                                         return noupdate. 2 means return
                                         noupdate after 1 update response.

        @returns a valid devserver update URL.

        """
        if not moblab:
            return self.get_update_url_from_fake_omaha(
                    job_repo_url,
                    full_payload=full_payload,
                    return_noupdate_starting=return_noupdate_starting)

        self._job_repo_url = self._get_job_repo_url(job_repo_url)
        if not self._job_repo_url:
            raise error.TestFail('There was no job_repo_url so we cannot get '
                                 'a payload to use.')
        ds_url, build = tools.get_devserver_build_from_package_url(
            self._job_repo_url)

        # The lab devserver assigned to this test.
        lab_devserver = dev_server.ImageServer(ds_url)

        # Stage payloads on the lab devserver.
        self._autotest_devserver = lab_devserver
        artifacts = ['full_payload' if full_payload else 'delta_payload']
        if stateful:
            artifacts.append('stateful')
        self._autotest_devserver.stage_artifacts(build, artifacts)

        # Use the same lab devserver to also handle the update.
        url = self._autotest_devserver.get_update_url(build)

        logging.info('Update URL: %s', url)
        return url


    def get_update_url_from_fake_omaha(self,
                                       job_repo_url=None,
                                       full_payload=True,
                                       payload_id='ROOTFS',
                                       critical_update=True,
                                       exposed_via_proxy=True,
                                       return_noupdate_starting=0):
        """
        Starts a FakeOmaha instance with TLS and returns is update url.

        Some tests that require an omaha instance to survive a reboot will use
        the FakeOmaha TLS.

        @param job_repo_url: string url containing the current build.
        @param full_payload: bool whether we want a full payload.
        @param payload_id: ROOTFS or DLC
        @param critical_update: True if we want a critical update response.
        @param exposed_via_proxy: True to tell TLS we want the fakeomaha to be
                                  accessible after a reboot.
        @param return_noupdate_starting: int of how many valid update responses
                                         to return before returning noupdate
                                        forever.

        @returns an update url on the FakeOmaha instance for tests to use.

        """
        self._job_repo_url = self._get_job_repo_url(job_repo_url)
        if not self._job_repo_url:
            raise error.TestFail('There was no job_repo_url so we cannot get '
                                 'a payload to use.')
        gs = dev_server._get_image_storage_server()
        _, build = tools.get_devserver_build_from_package_url(
                self._job_repo_url)
        target_build = gs + build
        payload_type = 'FULL' if full_payload else 'DELTA'
        tlsconn = connection.TLSConnection()
        self.fake_omaha = fake_omaha.TLSFakeOmaha(tlsconn)
        fake_omaha_url = self.fake_omaha.start_omaha(
                self._host.hostname,
                target_build=target_build,
                payloads=[{
                        'payload_id': payload_id,
                        'payload_type': payload_type,
                }],
                exposed_via_proxy=True,
                critical_update=critical_update,
                return_noupdate_starting=return_noupdate_starting)
        logging.info('Fake Omaha update URL: %s', fake_omaha_url)
        return fake_omaha_url

    def get_payload_url_on_public_bucket(self, job_repo_url=None,
                                         full_payload=True, is_dlc=False):
        """
        Get the google storage url of the payload in a public bucket.

        We will be copying the payload to a public google storage bucket
        (similar location to updates via autest command).

        @param job_repo_url: string url containing the current build.
        @param full_payload: True for full, False for delta.
        @param is_dlc: True to get the payload URL for sample-dlc.

        """
        self._job_repo_url = self._get_job_repo_url(job_repo_url)
        payload_url = self._get_payload_url(full_payload=full_payload,
                                            is_dlc=is_dlc)
        url = self._copy_payload_to_public_bucket(payload_url)
        logging.info('Public update URL: %s', url)
        return url


    def get_payload_for_nebraska(self, job_repo_url=None, full_payload=True,
                                 public_bucket=False, is_dlc=False):
        """
        Gets a platform or DLC payload URL to be used with a nebraska instance
        on the DUT.

        @param job_repo_url: string url containing the current build.
        @param full_payload: bool whether we want a full payload.
        @param public_bucket: True to return a payload on a public bucket.
        @param is_dlc: True to get the payload URL for sample-dlc.

        @returns string URL of a payload staged on a lab devserver.

        """
        if public_bucket:
            return self.get_payload_url_on_public_bucket(
                job_repo_url, full_payload=full_payload, is_dlc=is_dlc)

        self._job_repo_url = self._get_job_repo_url(job_repo_url)
        payload = self._get_payload_url(full_payload=full_payload,
                                        is_dlc=is_dlc)
        payload_url, _ = self._stage_payload_by_uri(payload)
        logging.info('Payload URL for Nebraska: %s', payload_url)
        return payload_url


    def update_device(self,
                      payload_uri,
                      clobber_stateful=False,
                      tag='source',
                      ignore_appid=False):
        """
        Updates the device.

        Used by autoupdate_EndToEndTest and autoupdate_StatefulCompatibility,
        which use auto_updater to perform updates.

        @param payload_uri: The payload with which the device should be updated.
        @param clobber_stateful: Boolean that determines whether the stateful
                                 of the device should be force updated and the
                                 TPM ownership should be cleared. By default,
                                 set to False.
        @param tag: An identifier string added to each log filename.
        @param ignore_appid: True to tell Nebraska to ignore the App ID field
                             when parsing the update request. This allows
                             the target update to use a different board's
                             image, which is needed for kernelnext updates.

        @raise error.TestFail if anything goes wrong with the update.

        """
        cros_preserved_path = ('/mnt/stateful_partition/unencrypted/'
                               'preserve/cros-update')
        build_name, payload_filename = self._get_update_parameters_from_uri(
            payload_uri)
        logging.info('Installing %s on the DUT', payload_uri)
        with remote_access.ChromiumOSDeviceHandler(
            self._host.hostname, base_dir=cros_preserved_path) as device:
            updater = auto_updater.ChromiumOSUpdater(
                    device,
                    build_name,
                    build_name,
                    yes=True,
                    payload_filename=payload_filename,
                    clobber_stateful=clobber_stateful,
                    clear_tpm_owner=clobber_stateful,
                    do_stateful_update=True,
                    staging_server=self._autotest_devserver.url(),
                    transfer_class=auto_updater_transfer.
                    LabEndToEndPayloadTransfer,
                    ignore_appid=ignore_appid)

            try:
                updater.RunUpdate()
            except Exception as e:
                logging.exception('ERROR: Failed to update device.')
                raise error.TestFail(str(e))
            finally:
                self._copy_generated_nebraska_logs(
                    updater.request_logs_dir, identifier=tag)

    def _get_paygen_json(self):
        """Return the paygen.json file as a json dictionary."""
        bucket, paygen_file = self._PAYGEN_JSON_URI.rsplit('/', 1)
        tmpdir = '/tmp/m2n/'
        self._host.run('mkdir -p %s' % tmpdir)
        gsutil_wrapper.copy_private_bucket(host=self._host,
                                           bucket=bucket,
                                           filename=paygen_file,
                                           destination=tmpdir)
        return json.loads(
                self._host.run('cat %s' %
                               os.path.join(tmpdir, paygen_file)).stdout)

    def _paygen_json_lookup(self, board, channel, delta_type):
        """
        Filters the paygen.json file by board, channel, and payload type.

        @param board: The board name.
        @param channel: The ChromeOS channel.
        @param delta_type: OMAHA, FSI, MILESTONE. STEPPING_STONE.

        @returns json results filtered by the input params.

        """
        paygen_data = self._get_paygen_json()
        result = []
        if channel.endswith('-channel'):
            channel = channel[:-len('-channel')]
        for delta in paygen_data['delta']:
            if ((delta['board']['public_codename'] == board)
                        and (delta.get('channel', None) == channel)
                        and (delta.get('delta_type', None) == delta_type)):
                result.append(delta)
        return result

    def _get_latest_serving_stable_build(self):
        """
      Returns the latest serving stable build on Omaha for the current board.

      It will lookup the paygen.json file and return the build label that can
      be passed to quick_provision. This is useful for M2N tests to easily find
      the first build to provision.

      @returns latest stable serving omaha build.

      """
        board = self._host.get_board().split(':')[1]
        # Boards like auron_paine are auron-paine in paygen.json and builders.
        if '_' in board:
            board = board.replace('_', '-')
        channel = 'stable-channel'
        delta_type = 'OMAHA'
        stable_paygen_data = self._paygen_json_lookup(board, channel,
                                                      delta_type)
        if not stable_paygen_data:
            raise error.TestFail(
                    'No stable build found in paygen.json for %s' % board)
        latest_stable_paygen_data = max(
                stable_paygen_data, key=(lambda key: key['chrome_os_version']))
        return os.path.join(channel, board,
                            latest_stable_paygen_data["chrome_os_version"])
