# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for toolchain_util."""

from __future__ import print_function

import base64
import collections
import datetime
import glob
import io
import json
import os
import re
import shutil
import sys
import time

import mock
from six.moves import builtins

from chromite.lib import chroot_lib
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import git
from chromite.lib import gob_util
from chromite.lib import gs
from chromite.lib import gs_unittest
from chromite.lib import osutils
from chromite.lib import partial_mock
from chromite.lib import portage_util
from chromite.lib import timeout_util
from chromite.lib import toolchain_util

# pylint: disable=protected-access

assert sys.version_info >= (3, 6), 'This module requires Python 3.6+'

# Two-field record type: `name` and `gs_locations`.
_input_artifact = collections.namedtuple(
    '_input_artifact', ('name', 'gs_locations'))


class ProfilesNameHelperTest(cros_test_lib.MockTempDirTestCase):
  """Cover the module-level helpers that parse and build artifact names."""

  # pylint: disable=protected-access
  def testParseBenchmarkProfileName(self):
    """Check _ParseBenchmarkProfileName on a bad and a good name."""
    # A name that cannot be parsed must be rejected with a clear message.
    with self.assertRaises(toolchain_util.ProfilesNameHelperError) as ctx:
      toolchain_util._ParseBenchmarkProfileName('this_is_an_invalid_name')
    self.assertIn('Unparseable benchmark profile name:', str(ctx.exception))

    # A well-formed name is broken down into its version fields.
    parsed = toolchain_util._ParseBenchmarkProfileName(
        'chromeos-chrome-amd64-77.0.3849.0_rc-r1.afdo')
    expected = toolchain_util.BenchmarkProfileVersion(
        major=77, minor=0, build=3849, patch=0, revision=1, is_merged=False)
    self.assertEqual(parsed, expected)

  def testParseCWPProfileName(self):
    """Check _ParseCWPProfileName on a bad and a good name."""
    # A name that cannot be parsed must be rejected with a clear message.
    with self.assertRaises(toolchain_util.ProfilesNameHelperError) as ctx:
      toolchain_util._ParseCWPProfileName('this_is_an_invalid_name')
    self.assertIn('Unparseable CWP profile name:', str(ctx.exception))

    # A well-formed name is broken down into its version fields.
    parsed = toolchain_util._ParseCWPProfileName(
        'R77-3809.38-1562580965.afdo.xz')
    expected = toolchain_util.CWPProfileVersion(
        major=77, build=3809, patch=38, clock=1562580965)
    self.assertEqual(parsed, expected)

  def testParseMergedProfileName(self):
    """Check _ParseMergedProfileName on bad, orderfile and AFDO names."""
    # A name that cannot be parsed must be rejected with a clear message.
    with self.assertRaises(toolchain_util.ProfilesNameHelperError) as ctx:
      toolchain_util._ParseMergedProfileName('this_is_an_invalid_name')
    self.assertIn('Unparseable merged AFDO name:', str(ctx.exception))

    # Both names embed the same benchmark and CWP versions, so each one
    # must parse to the same (benchmark, CWP) pair.
    merged_names = (
        # An orderfile name.
        'chromeos-chrome-orderfile-field-77-3809.38-1562580965'
        '-benchmark-77.0.3849.0-r1.orderfile.xz',
        # A release AFDO name.
        'chromeos-chrome-amd64-airmont-77-3809.38-1562580965'
        '-benchmark-77.0.3849.0-r1-redacted.afdo.xz',
    )
    for merged_name in merged_names:
      parsed = toolchain_util._ParseMergedProfileName(merged_name)
      expected = (
          toolchain_util.BenchmarkProfileVersion(
              major=77, minor=0, build=3849, patch=0, revision=1,
              is_merged=False),
          toolchain_util.CWPProfileVersion(
              major=77, build=3809, patch=38, clock=1562580965),
      )
      self.assertEqual(parsed, expected)

  def testGetArtifactVersionInEbuild(self):
    """Check _GetArtifactVersionInEbuild reads each variable's value."""
    ebuild_path = os.path.join(self.tempdir, 'package.ebuild')
    # (variable, value) assignments written into the fake ebuild.
    assignments = [
        ('variable_name', 'old-afdo-artifact-1.0'),
        ('another_variable_name', 'another-old-afdo-artifact-1.0'),
    ]
    lines = ['Some message before']
    lines.extend('%s="%s"' % pair for pair in assignments)
    lines.append('Some message after')
    osutils.WriteFile(ebuild_path, '\n'.join(lines))

    # Point the ebuild lookup at the file written above.
    self.PatchObject(
        toolchain_util, '_FindEbuildPath', return_value=ebuild_path)
    for variable, value in assignments:
      found = toolchain_util._GetArtifactVersionInEbuild('package', variable)
      self.assertEqual(found, value)

  def testGetOrderfileName(self):
    """Check _GetOrderfileName derives its name from the Chromium profile."""
    chromium_profile = (
        'chromeos-chrome-amd64-silvermont-77-3809.38-1562580965-'
        'benchmark-77.0.3849.0-r1-redacted.afdo.xz')
    self.PatchObject(
        toolchain_util,
        '_GetArtifactVersionInChromium',
        return_value=chromium_profile)

    orderfile = toolchain_util._GetOrderfileName('/path/to/chrome_root')

    cwp_part = 'field-77-3809.38-1562580965'
    benchmark_part = 'benchmark-77.0.3849.0-r1'
    expected = 'chromeos-chrome-orderfile-%s-%s' % (cwp_part, benchmark_part)
    self.assertEqual(orderfile, expected)

  def testCompressAFDOFiles(self):
    """Check _CompressAFDOFiles() for missing and existing inputs."""
    input_dir = '/path/to/inputs'
    output_dir = '/another/path/to/outputs'
    targets = ['input1', '/path/to/inputs/input2']
    suffix = '.xz'
    compress = self.PatchObject(cros_build_lib, 'CompressFile')

    # None of the inputs exist yet, so the first one triggers an error.
    with self.assertRaises(RuntimeError) as ctx:
      toolchain_util._CompressAFDOFiles(targets, input_dir, output_dir, suffix)
    missing = os.path.join(input_dir, targets[0])
    self.assertEqual(
        str(ctx.exception), 'file %s to compress does not exist' % missing)

    # Once every path is reported as existing, each target is compressed
    # from input_dir into output_dir under its basename plus the suffix.
    self.PatchObject(os.path, 'exists', return_value=True)
    toolchain_util._CompressAFDOFiles(targets, input_dir, output_dir, suffix)
    expected_calls = []
    for target in targets:
      base = os.path.basename(target)
      expected_calls.append(
          mock.call(
              os.path.join(input_dir, base),
              os.path.join(output_dir, base + suffix)))
    compress.assert_has_calls(expected_calls)

  def testGetProfileAge(self):
    """Check _GetProfileAge() for bad types and for 0/1 day old profiles."""
    todays_profile = 'R0-0.0-%d' % int(time.time())

    # Any artifact type other than 'kernel_afdo' is refused.
    with self.assertRaises(ValueError) as ctx:
      toolchain_util._GetProfileAge(todays_profile, 'unsupported_type')
    self.assertEqual('Only kernel afdo is supported to check profile age.',
                     str(ctx.exception))

    # A profile stamped with the current time is zero days old.
    age = toolchain_util._GetProfileAge(todays_profile, 'kernel_afdo')
    self.assertEqual(0, age)

    # A profile stamped 86400 seconds (one day) ago is one day old.
    yesterdays_profile = 'R0-0.0-%d' % int(time.time() - 86400)
    age = toolchain_util._GetProfileAge(yesterdays_profile, 'kernel_afdo')
    self.assertEqual(1, age)


class PrepareBundleTest(cros_test_lib.RunCommandTempDirTestCase):
  """Shared fixture for the Prepare/Bundle handler test classes."""

  def setUp(self):
    self.board = 'lulu'
    self.chroot = chroot_lib.Chroot(path=self.tempdir, chrome_root=self.tempdir)
    self.sysroot = '/build/%s' % self.board
    self.chrome_package = 'chromeos-chrome'
    self.kernel_package = 'chromeos-kernel-3_14'
    self.chrome_PV = 'chromeos-base/chromeos-chrome-78.0.3893.0_rc-r1'

    # Path of the chrome ebuild inside the overlay, resolved relative to
    # this test file.
    category, package_version = os.path.split(self.chrome_PV)
    overlay_dir = os.path.join(
        os.path.dirname(__file__), '..', '..', 'src', 'third_party',
        'chromiumos-overlay')
    self.chrome_ebuild = os.path.realpath(
        os.path.join(overlay_dir, category, 'chromeos-chrome',
                     '%s.ebuild' % package_version))
    self.chrome_CPV = portage_util.SplitCPV(self.chrome_PV)

    # Every glob lookup reports exactly the chrome ebuild above.
    self.glob = self.PatchObject(
        glob, 'glob', return_value=[self.chrome_ebuild])
    # Mocked commands containing 'rm' succeed.
    self.rc.AddCmdResult(partial_mock.In('rm'), returncode=0)

    self.obj = toolchain_util._CommonPrepareBundle('None', chroot=self.chroot)
    # Replace the GS context with a mock whose List() returns nothing.
    self.gs_context = self.PatchObject(self.obj, '_gs_context')
    self.gsc_list = self.PatchObject(self.gs_context, 'List', return_value=[])

    self.data = b'data'
    self.arch = 'silvermont'
    # GoB fetches hand back self.data, base64-encoded.
    self.fetch = self.PatchObject(
        gob_util, 'FetchUrl', return_value=base64.encodebytes(self.data))


class CommonPrepareBundleTest(PrepareBundleTest):
  """Cover the methods shared by the Prepare and Bundle handlers."""

  def testGetEbuildInfo(self):
    """Check ebuild lookup succeeds for one match and fails for several."""
    # Reading chrome_branch goes through _GetEbuildInfo, hence one glob call.
    self.assertEqual('78', self.obj.chrome_branch)
    self.glob.assert_called_once()

    # Two glob matches make the lookup raise.
    self.glob.return_value = ['1', '2']
    with self.assertRaises(toolchain_util.PrepareForBuildHandlerError):
      self.obj._GetEbuildInfo('chromeos-kernel-3_14')

  def test_GetArtifactVersionInGob(self):
    """Check the GoB URL queried and the handling of bad input/output."""
    # An unknown architecture is refused.
    with self.assertRaises(ValueError):
      self.obj._GetArtifactVersionInGob('badarch')

    # A known architecture yields the decoded payload from a single fetch.
    self.assertEqual(
        self.data.decode('utf-8'), self.obj._GetArtifactVersionInGob(self.arch))
    chrome_tag = self.chrome_CPV.version_no_rev.split('_')[0]
    self.fetch.assert_called_once_with(
        constants.EXTERNAL_GOB_HOST,
        'chromium/src/+/refs/tags/%s/chromeos/profiles/%s.afdo.newest.txt'
        '?format=text' % (chrome_tag, self.arch))

    # An empty response from the fetch is reported as an error.
    self.fetch.reset_mock()
    self.fetch.return_value = ''
    with self.assertRaises(RuntimeError):
      self.obj._GetArtifactVersionInGob(self.arch)
    self.fetch.assert_called_once()

  def test_GetOrderfileName(self):
    """Check _GetOrderfileName maps the GoB profile to an orderfile name."""
    gob_version = self.PatchObject(
        self.obj,
        '_GetArtifactVersionInGob',
        return_value=('chromeos-chrome-amd64-silvermont-78-1111.0-'
                      '157000000-benchmark-78.0.3893.0-r1-redacted.afdo.xz'))
    expected = ('chromeos-chrome-orderfile-field-78-1111.0-'
                '157000000-benchmark-78.0.3893.0-r1.orderfile')
    self.assertEqual(expected, self.obj._GetOrderfileName())
    gob_version.assert_called_once()

  def test_UpdateEbuildWithArtifacts(self):
    """Check _UpdateEbuildWithArtifacts patches both chrome ebuilds."""
    patch_ebuild = self.PatchObject(self.obj, '_PatchEbuild')
    self.obj._UpdateEbuildWithArtifacts('chromeos-chrome', {'var': 'val'})

    stable_info = toolchain_util._EbuildInfo(
        path=self.chrome_ebuild, CPV=self.chrome_CPV)
    path_9999 = os.path.realpath(
        os.path.join(
            os.path.dirname(__file__), '..', '..', 'src', 'third_party',
            'chromiumos-overlay', 'chromeos-base', 'chromeos-chrome',
            'chromeos-chrome-9999.ebuild'))
    info_9999 = toolchain_util._EbuildInfo(
        path=path_9999,
        CPV=portage_util.SplitCPV('chromeos-base/chromeos-chrome-9999'))

    # The versioned ebuild is patched with uprev=True, the 9999 one without.
    expected_calls = [
        mock.call(stable_info, {'var': 'val'}, uprev=True),
        mock.call(info_9999, {'var': 'val'}, uprev=False),
    ]
    self.assertEqual(expected_calls, patch_ebuild.call_args_list)


class PrepBundLatestAFDOArtifactTest(PrepareBundleTest):
  """Check how _FindLatestAFDOArtifact picks among listed GS files."""

  def setUp(self):
    self.board = 'board'
    self.gs_url = 'gs://path/to/any_gs_url'
    self.current_branch = '78'
    self.current_arch = 'airmont'
    self.MockListResult = collections.namedtuple('MockListResult',
                                                 ('url', 'creation_time'))
    # (file name, creation time) pairs served by the mocked GS listing.
    bucket_contents = [
        # Benchmark profiles
        ('chromeos-chrome-amd64-78.0.3893.0_rc-r1.afdo.bz2', 2.0),
        ('chromeos-chrome-amd64-78.0.3896.0_rc-r1.afdo.bz2', 1.0),  # Latest
        ('chromeos-chrome-amd64-78.0.3897.0_rc-r1-merged.afdo.bz2', 3.0),
        # CWP profiles
        ('R78-3869.38-1562580965.afdo.xz', 2.1),
        ('R78-3866.0-1570000000.afdo.xz', 1.1),  # Latest
        ('R77-3811.0-1580000000.afdo.xz', 3.1),
        # Kernel profiles
        ('R76-3869.38-1562580965.gcov.xz', 1.3),
        ('R76-3866.0-1570000000.gcov.xz', 2.3),  # Latest
        # Orderfiles
        ('chromeos-chrome-orderfile-field-78-3877.0-1567418235-'
         'benchmark-78.0.3893.0-r1.orderfile.xz', 1.2),  # Latest
        ('chromeos-chrome-orderfile-field-78-3877.0-1567418235-'
         'benchmark-78.0.3850.0-r1.orderfile.xz', 2.2),
    ]

    self.gs_list = []
    for file_name, created in bucket_contents:
      self.gs_list.append(
          self.MockListResult(
              url=os.path.join(self.gs_url, file_name),
              creation_time=created))
    self.gsc_list.return_value = self.gs_list

  def testFindLatestAFDOArtifactPassWithBenchmarkAFDO(self):
    """Check the latest benchmark AFDO profile is selected."""
    found = self.obj._FindLatestAFDOArtifact(
        [self.gs_url], self.obj._RankValidBenchmarkProfiles)
    expected = os.path.join(
        self.gs_url, 'chromeos-chrome-amd64-78.0.3896.0_rc-r1.afdo.bz2')
    self.assertEqual(found, expected)

  def testFindLatestAFDOArtifactPassWithOrderfile(self):
    """Check the latest orderfile is selected."""
    found = self.obj._FindLatestAFDOArtifact(
        [self.gs_url], self.obj._RankValidOrderfiles)
    expected = os.path.join(
        self.gs_url, 'chromeos-chrome-orderfile-field-78-3877.0-1567418235-'
        'benchmark-78.0.3893.0-r1.orderfile.xz')
    self.assertEqual(found, expected)

  def testFindLatestAfdoArtifactOnPriorBranch(self):
    """Check a branch-78 file is returned when chrome is on branch 79."""
    newer_chrome = portage_util.SplitCPV(
        'chromeos-base/chromeos-chrome-79.0.3900.0_rc-r1')
    self.obj._ebuild_info['chromeos-chrome'] = toolchain_util._EbuildInfo(
        path='path', CPV=newer_chrome)
    found = self.obj._FindLatestAFDOArtifact(
        [self.gs_url], self.obj._RankValidOrderfiles)
    expected = os.path.join(
        self.gs_url, 'chromeos-chrome-orderfile-field-78-3877.0-1567418235-'
        'benchmark-78.0.3893.0-r1.orderfile.xz')
    self.assertEqual(found, expected)

  def testFindLatestAFDOArtifactFailToFindAnyFiles(self):
    """Check the error raised when the GS listing finds nothing."""
    newer_chrome = portage_util.SplitCPV(
        'chromeos-base/chromeos-chrome-80.0.3950.0_rc-r1')
    self.obj._ebuild_info['chromeos-chrome'] = toolchain_util._EbuildInfo(
        path='path', CPV=newer_chrome)
    # Every listing attempt reports that the key does not exist.
    self.gsc_list.side_effect = gs.GSNoSuchKey('No files')
    with self.assertRaises(RuntimeError) as ctx:
      self.obj._FindLatestAFDOArtifact([self.gs_url],
                                       self.obj._RankValidOrderfiles)
    self.assertEqual('No files for branch 80 found in %s' % self.gs_url,
                     str(ctx.exception))

  def testFindLatestAFDOArtifactsFindMaxFromInvalidFiles(self):
    """Check the error raised when only invalid names are listed."""
    bogus_url = os.path.join(self.gs_url, 'Invalid-name-but-end-in-78.afdo')
    self.gsc_list.return_value = [
        self.MockListResult(url=bogus_url, creation_time=1.0)
    ]
    with self.assertRaises(RuntimeError) as ctx:
      self.obj._FindLatestAFDOArtifact([self.gs_url],
                                       self.obj._RankValidBenchmarkProfiles)
    self.assertIn('No valid latest artifact was found', str(ctx.exception))


class PrepareForBuildHandlerTest(PrepareBundleTest):
  """Cover the methods specific to PrepareForBuildHandler."""

  def setUp(self):
    self.artifact_type = 'Unspecified'
    self.input_artifacts = {}
    self.profile_info = {}
    self.gsc_exists = None
    self.orderfile_name = (
        'chromeos-chrome-orderfile-field-78-3877.0-1567418235-'
        'benchmark-78.0.3893.0-r1.orderfile')
    self.afdo_name = 'chromeos-chrome-amd64-78.0.3893.0-r1.afdo'
    # Class-level stubs, so they also apply to handlers created later.
    common = toolchain_util._CommonPrepareBundle
    self.PatchObject(
        common, '_GetOrderfileName', return_value=self.orderfile_name)
    self.PatchObject(
        common,
        '_FindLatestOrderfileArtifact',
        return_value=self.orderfile_name + toolchain_util.XZ_COMPRESSION_SUFFIX)
    self.patch_ebuild = self.PatchObject(common, '_PatchEbuild')

  def SetUpPrepare(self, artifact_type, input_artifacts):
    """Build the handler under test for _Prepare${artifactType}."""
    self.artifact_type = artifact_type
    self.input_artifacts = input_artifacts
    self.obj = toolchain_util.PrepareForBuildHandler(
        self.artifact_type, self.chroot, self.sysroot, self.board,
        self.input_artifacts, self.profile_info)
    self.obj._gs_context = self.gs_context
    self.PatchObject(self.obj, '_GetOrderfileName', return_value='orderfile')
    # By default every GS existence check answers True.
    self.gsc_exists = self.PatchObject(
        self.gs_context, 'Exists', return_value=True)

  def testPrepareUnverifiedChromeLlvmOrderfileExists(self):
    """Check PrepareUnverifiedChromeLlvmOrderfile returns POINTLESS."""
    inputs = {'UnverifiedChromeLlvmOrderfile': ['gs://publish/location']}
    self.SetUpPrepare('UnverifiedChromeLlvmOrderfile', inputs)
    self.assertEqual(toolchain_util.PrepareForBuildReturn.POINTLESS,
                     self.obj.Prepare())
    self.gs_context.Exists.assert_called_once_with(
        'gs://publish/location/orderfile.xz')

  def testPrepareUnverifiedChromeLlvmOrderfileMissing(self):
    """Check PrepareUnverifiedChromeLlvmOrderfile returns NEEDED."""
    inputs = {'UnverifiedChromeLlvmOrderfile': ['gs://publish/location']}
    self.SetUpPrepare('UnverifiedChromeLlvmOrderfile', inputs)
    self.gsc_exists.return_value = False
    self.assertEqual(toolchain_util.PrepareForBuildReturn.NEEDED,
                     self.obj.Prepare())
    self.gs_context.Exists.assert_called_once_with(
        'gs://publish/location/orderfile.xz')

  def testPrepareVerifiedChromeLlvmOrderfileExists(self):
    """Check PrepareVerifiedChromeLlvmOrderfile returns POINTLESS."""
    inputs = {
        'UnverifiedChromeLlvmOrderfile':
            ['gs://path/to/unvetted', 'gs://other/path/to/unvetted']
    }
    self.SetUpPrepare('VerifiedChromeLlvmOrderfile', inputs)
    self.assertEqual(toolchain_util.PrepareForBuildReturn.POINTLESS,
                     self.obj.Prepare())
    vetted_url = 'gs://path/to/vetted/%s.xz' % self.orderfile_name
    self.gs_context.Exists.assert_called_once_with(vetted_url)
    # The ebuild is still updated.
    self.patch_ebuild.assert_called_once()

  def testPrepareVerifiedChromeLlvmOrderfileMissing(self):
    """Check PrepareVerifiedChromeLlvmOrderfile returns NEEDED."""
    inputs = {
        'UnverifiedChromeLlvmOrderfile':
            ['gs://path/to/unvetted', 'gs://other/path/to/unvetted']
    }
    self.SetUpPrepare('VerifiedChromeLlvmOrderfile', inputs)
    self.gsc_exists.return_value = False
    self.assertEqual(toolchain_util.PrepareForBuildReturn.NEEDED,
                     self.obj.Prepare())
    vetted_url = 'gs://path/to/vetted/%s.xz' % self.orderfile_name
    self.gs_context.Exists.assert_called_once_with(vetted_url)
    self.patch_ebuild.assert_called_once()

  def testPrepareUnverifiedChromeBenchmarkAfdoFile(self):
    """Check PrepareUnverifiedChromeBenchmarkAfdoFile returns NEEDED."""
    inputs = {
        'UnverifiedChromeBenchmarkPerfFile': ['gs://path/to/perfdata'],
        'UnverifiedChromeBenchmarkAfdoFile': ['gs://path/to/unvetted'],
        'ChromeDebugBinary': ['gs://image-archive/path'],
    }
    self.SetUpPrepare('UnverifiedChromeBenchmarkAfdoFile', inputs)
    # Published artifact is missing, debug binary is present, perf.data is
    # missing.
    self.gsc_exists.side_effect = (False, True, False)
    self.assertEqual(toolchain_util.PrepareForBuildReturn.NEEDED,
                     self.obj.Prepare())
    # The three locations are probed in exactly this order.
    expected_checks = [
        mock.call('gs://path/to/unvetted/'
                  'chromeos-chrome-amd64-78.0.3893.0_rc-r1.afdo.bz2'),
        mock.call('gs://image-archive/path/chrome.debug.bz2'),
        mock.call('gs://path/to/perfdata/'
                  'chromeos-chrome-amd64-78.0.3893.0.perf.data.bz2'),
    ]
    self.assertEqual(expected_checks, self.gs_context.Exists.call_args_list)
    # There is no need to patch the ebuild.
    self.patch_ebuild.assert_not_called()


class BundleArtifactHandlerTest(PrepareBundleTest):
  """Test BundleArtifactHandler specific methods."""

  def setUp(self):
    """Stub out orderfile generation, name lookups and file copies."""

    def _Bundle(_self):
      # Stand-in for GenerateChromeOrderfile.Bundle: only drops a file
      # named 'artifact' into the generator's output directory.
      osutils.WriteFile(os.path.join(_self.output_dir, 'artifact'), 'data\n')

    self.artifact_type = 'Unspecified'
    self.outdir = None
    self.afdo_tmp_path = None
    self.profile_info = {}
    self.orderfile_name = (
        'chromeos-chrome-orderfile-field-78-3877.0-1567418235-'
        'benchmark-78.0.3893.0-r1.orderfile')
    self.afdo_name = 'chromeos-chrome-amd64-78.0.3893.0_rc-r1.afdo'
    self.perf_name = 'chromeos-chrome-amd64-78.0.3893.0.perf.data'
    self.merged_afdo_name = (
        'chromeos-chrome-amd64-78.0.3893.0_rc-r1-merged.afdo')

    self.gen_order = self.PatchObject(
        toolchain_util.GenerateChromeOrderfile, 'Bundle', new=_Bundle)
    self.PatchObject(
        toolchain_util._CommonPrepareBundle,
        '_GetArtifactVersionInEbuild',
        return_value=self.orderfile_name)
    self.PatchObject(
        toolchain_util, '_GetOrderfileName', return_value=self.orderfile_name)
    # No real file copies happen; tests only inspect the recorded calls.
    self.copy2 = self.PatchObject(shutil, 'copy2')

  def SetUpBundle(self, artifact_type):
    """Set up to test _Bundle${artifactType}."""
    self.artifact_type = artifact_type
    self.outdir = os.path.join(self.tempdir, 'tmp', 'output_dir')
    osutils.SafeMakedirs(self.outdir)
    self.afdo_tmp_path = '/tmp/benchmark-afdo-generate'
    osutils.SafeMakedirs(self.chroot.full_path(self.afdo_tmp_path))
    self.obj = toolchain_util.BundleArtifactHandler(
        self.artifact_type, self.chroot, self.sysroot, self.board, self.outdir,
        self.profile_info)
    self.obj._gs_context = self.gs_context

  def testBundleUnverifiedChromeLlvmOrderfile(self):
    """Test that BundleUnverifiedChromeLlvmOrderfile works."""
    self.SetUpBundle('UnverifiedChromeLlvmOrderfile')
    artifact = os.path.join(self.outdir, 'artifact')
    self.assertEqual([artifact], self.obj.Bundle())
    self.copy2.assert_called_once_with(mock.ANY, artifact)

  def testBundleVerifiedChromeLlvmOrderfileExists(self):
    """Test that BundleVerifiedChromeLlvmOrderfile works."""
    self.SetUpBundle('VerifiedChromeLlvmOrderfile')
    xz_name = '%s.xz' % self.orderfile_name
    artifact = os.path.join(self.outdir, xz_name)
    self.assertEqual([artifact], self.obj.Bundle())
    self.copy2.assert_called_once_with(
        os.path.join(self.chroot.path, 'build', self.board, 'opt/google/chrome',
                     xz_name), artifact)

  def testBundleChromeClangWarningsFile(self):
    """Test that BundleChromeClangWarningsFile works."""

    class mock_datetime:
      """Class for mocking datetime.datetime."""

      @staticmethod
      def strftime(_when, _fmt):
        return 'DATE'

      @staticmethod
      def now():
        return -1

    # With the fake in place the date part of the artifact name is 'DATE'.
    self.PatchObject(datetime, 'datetime', new=mock_datetime)
    self.SetUpBundle('ChromeClangWarningsFile')
    artifact = os.path.join(self.outdir,
                            '%s.DATE.clang_tidy_warnings.tar.xz' % self.board)
    self.assertEqual([artifact], self.obj.Bundle())
    self.copy2.assert_called_once_with(mock.ANY, artifact)

  def testBundleUnverifiedLlvmPgoFile(self):
    """Test the artifacts returned for UnverifiedLlvmPgoFile."""
    self.SetUpBundle('UnverifiedLlvmPgoFile')
    llvm_version = '10.0_pre377782_p20200113-r14'
    llvm_clang_sha = 'a21beccea2020f950845cbb68db663d0737e174c'
    llvm_cpv = portage_util.SplitCPV('sys-devel/llvm-%s' % llvm_version)
    self.PatchObject(
        self.obj,
        '_GetProfileNames',
        return_value=[
            self.chroot.full_path(self.sysroot, 'build', 'coverage_data',
                                  'sys-libs', 'libcxxabi', 'raw_profiles',
                                  'libcxxabi-10.0_pre3_1673101222_0.profraw')
        ])
    self.PatchObject(
        portage_util, 'FindPackageNameMatches', return_value=[llvm_cpv])
    # Mocked commands containing 'clang' print a version banner that
    # carries the llvm version and sha defined above.
    self.rc.AddCmdResult(
        partial_mock.In('clang'),
        returncode=0,
        stdout=('Chromium OS %s clang version 10.0.0 (/path/to/'
                'llvm-project %s)\n' % (llvm_version, llvm_clang_sha)))
    base = '%s-%s' % (llvm_cpv.pv, llvm_clang_sha)
    artifacts = [
        os.path.join(self.outdir, x)
        for x in ('%s.llvm_metadata.json' % base, 'llvm_metadata.json',
                  '%s.llvm.profdata.tar.xz' % base)
    ]
    self.assertEqual(artifacts, self.obj.Bundle())

  def testBundleUnverifiedChromeBenchmarkPerfFile(self):
    """Test that UnverifiedChromeBenchmarkPerfFile bundles nothing."""
    self.SetUpBundle('UnverifiedChromeBenchmarkPerfFile')
    self.assertEqual([], self.obj.Bundle())

  def testBundleChromeDebugBinary(self):
    """Test that ChromeDebugBinary returns the bz2 name of the binary."""
    self.SetUpBundle('ChromeDebugBinary')
    bin_path = toolchain_util._CHROME_DEBUG_BIN % {
        'root': self.chroot.path,
        'sysroot': self.sysroot
    }
    # An empty file is enough to stand in for the debug binary.
    osutils.WriteFile(bin_path, '', makedirs=True)
    output = os.path.join(
        self.outdir,
        os.path.basename(bin_path) + toolchain_util.BZ2_COMPRESSION_SUFFIX)
    self.assertEqual([output], self.obj.Bundle())

  def testBundleToolchainWarningLogs(self):
    """Test that ToolchainWarningLogs collects the .json warning logs."""
    self.SetUpBundle('ToolchainWarningLogs')
    path = '/tmp/fatal_clang_warnings'
    in_chroot_dirs = [path, '/build/%s%s' % (self.board, path)]
    check_dirs = [self.chroot.full_path(x) for x in in_chroot_dirs]
    # Populate both directories with the same three log file names.
    for d in check_dirs:
      osutils.SafeMakedirs(d)
      for log_name in ('log1.json', 'log2.json', 'log3.notjson'):
        osutils.Touch(os.path.join(d, log_name))
    tarball = self.obj.Bundle()
    self.assertEqual(tarball,
                     [os.path.join(self.outdir, 'fatal_clang_warnings.tar.xz')])

    # Test internal function _CollectFatalClangWarnings, to make sure
    # the duplicate logs are renamed and all logs are captured.
    ret = self.obj._CollectFatalClangWarnings(self.outdir)
    self.assertCountEqual(
        ['log1.json', 'log2.json', 'log10.json', 'log20.json'], ret)

  @mock.patch.object(builtins, 'open')
  def testBundleUnverifiedChromeBenchmarkAfdoFile(self, mock_open):
    """Test the commands run to create and compress the benchmark AFDO."""
    self.SetUpBundle('UnverifiedChromeBenchmarkAfdoFile')
    self.PatchObject(
        self.obj,
        '_GetEbuildInfo',
        return_value=toolchain_util._EbuildInfo(
            path=self.chrome_ebuild, CPV=self.chrome_CPV))
    run_command = self.PatchObject(cros_build_lib, 'run')
    sym_link_command = self.PatchObject(osutils, 'SafeSymlink')
    # Every open() call hands back this in-memory file object.
    mock_file_obj = io.StringIO()
    mock_open.return_value = mock_file_obj

    ret = self.obj.Bundle()
    afdo_path = os.path.join(
        self.outdir, self.afdo_name + toolchain_util.BZ2_COMPRESSION_SUFFIX)
    self.assertEqual([afdo_path], ret)

    binary_inside = os.path.join(self.afdo_tmp_path, 'chrome.unstripped')
    perf_inside = os.path.join(self.afdo_tmp_path, self.perf_name)
    afdo_path_inside = os.path.join(self.afdo_tmp_path, self.afdo_name)
    # Make sure the sym link to debug Chrome is created
    sym_link_command.assert_called_with(
        os.path.basename(toolchain_util._CHROME_DEBUG_BIN),
        self.chroot.full_path(binary_inside))
    # Make sure commands are executed correctly
    expected_calls = [
        mock.call(
            [
                toolchain_util._AFDO_GENERATE_LLVM_PROF,
                '--binary=' + binary_inside,
                '--profile=' + perf_inside,
                '--out=' + afdo_path_inside,
                '--sample_threshold_frac=0',
            ],
            enter_chroot=True,
            print_cmd=True),
        mock.call(
            ['bzip2', '-c', afdo_path_inside],
            stdout=mock_file_obj,
            enter_chroot=True,
            print_cmd=True),
    ]
    run_command.assert_has_calls(expected_calls)

  def testBundleChromeAFDOProfileForAndroidLinuxFailWhenNoBenchmark(self):
    """Test that bundling asserts when no benchmark AFDO was created."""
    self.SetUpBundle('ChromeAFDOProfileForAndroidLinux')
    merge_function = self.PatchObject(self.obj,
                                      '_CreateAndUploadMergedAFDOProfile')
    with self.assertRaises(AssertionError) as context:
      self.obj.Bundle()
    self.assertIn('No new AFDO profile created', str(context.exception))
    merge_function.assert_not_called()

  @mock.patch.object(builtins, 'open')
  def testBundleChromeAFDOProfileForAndroidLinuxPass(self, mock_open):
    """Test that the merged AFDO profile is created and compressed."""
    self.SetUpBundle('ChromeAFDOProfileForAndroidLinux')
    self.PatchObject(os.path, 'exists', return_value=True)
    run_command = self.PatchObject(cros_build_lib, 'run')
    merge_function = self.PatchObject(
        self.obj,
        '_CreateAndUploadMergedAFDOProfile',
        return_value=self.merged_afdo_name)
    # Every open() call hands back this in-memory file object.
    mock_file_obj = io.StringIO()
    mock_open.return_value = mock_file_obj

    ret = self.obj.Bundle()
    merged_path = os.path.join(
        self.outdir,
        self.merged_afdo_name + toolchain_util.BZ2_COMPRESSION_SUFFIX)
    self.assertEqual([merged_path], ret)
    # Make sure merged function is called
    afdo_path_inside = os.path.join(self.afdo_tmp_path, self.afdo_name)
    merged_path_inside = os.path.join(self.afdo_tmp_path, self.merged_afdo_name)
    merge_function.assert_called_with(
        self.chroot.full_path(afdo_path_inside),
        self.chroot.full_path(self.afdo_tmp_path))
    run_command.assert_called_with(
        ['bzip2', '-c', merged_path_inside],
        stdout=mock_file_obj,
        enter_chroot=True,
        print_cmd=True,
    )


class ReleaseChromeAFDOProfileTest(PrepareBundleTest):
  """Test the functions that create a release CrOS profile.

  The functions under test are similar to _UploadReleaseChromeAFDO() and its
  related functions, so these tests are similar to
  UploadReleaseChromeAFDOTest, except that the setup happens within the
  recipe environment.
  """

  @staticmethod
  def _ChrootCall(cmd, **extra):
    """Return a mock.call for |cmd| with enter_chroot and print_cmd set.

    Args:
      cmd: The command list expected as the first positional argument.
      **extra: Any further keyword arguments expected in the call.
    """
    return mock.call(cmd, enter_chroot=True, print_cmd=True, **extra)

  def setUp(self):
    """Set up profile names, URLs, merge inputs and the shared mocks."""
    self.arch = 'silvermont'
    self.cwp_name = 'R77-3809.38-1562580965.afdo'
    self.cwp_full = self.cwp_name + toolchain_util.XZ_COMPRESSION_SUFFIX
    self.benchmark_name = 'chromeos-chrome-amd64-77.0.3849.0_rc-r1.afdo'
    self.benchmark_full = (
        self.benchmark_name + toolchain_util.BZ2_COMPRESSION_SUFFIX)
    self.merged_name = 'chromeos-chrome-amd64-%s-%s' % (
        '%s-77-3809.38-1562580965' % self.arch,
        'benchmark-77.0.3849.0-r1',
    )
    self.redacted_name = self.merged_name + '-redacted.afdo'
    self.cwp_url = os.path.join(
        toolchain_util.CWP_AFDO_GS_URL, self.arch, self.cwp_full)
    self.benchmark_url = os.path.join(
        toolchain_util.BENCHMARK_AFDO_GS_URL, self.benchmark_full)

    # Each merge input is a (local path, weight) pair.
    cwp_local = os.path.join(self.tempdir, self.cwp_name)
    benchmark_local = os.path.join(self.tempdir, self.benchmark_name)
    self.merge_inputs = [
        (cwp_local, toolchain_util.RELEASE_CWP_MERGE_WEIGHT),
        (benchmark_local, toolchain_util.RELEASE_BENCHMARK_MERGE_WEIGHT),
    ]
    self.merge_output = os.path.join(self.tempdir, self.merged_name)

    self.gs_copy = self.PatchObject(self.gs_context, 'Copy')
    self.decompress = self.PatchObject(cros_build_lib, 'UncompressFile')
    self.run_command = self.PatchObject(cros_build_lib, 'run')

  def testMergeAFDOProfiles(self):
    """Test _MergeAFDOProfiles() runs a single llvm-profdata merge."""
    self.obj._MergeAFDOProfiles(self.merge_inputs, self.merge_output)

    expected_cmd = [
        'llvm-profdata',
        'merge',
        '-sample',
        '-output=' + self.chroot.chroot_path(self.merge_output),
    ]
    for path, weight in self.merge_inputs:
      expected_cmd.append(
          '-weighted-input=%d,%s' % (weight, self.chroot.chroot_path(path)))

    self.run_command.assert_called_once_with(
        expected_cmd, enter_chroot=True, print_cmd=True)

  def runProcessAFDOProfileOnce(self,
                                expected_commands,
                                input_path=None,
                                output_path=None,
                                *args,
                                **kwargs):
    """Run _ProcessAFDOProfile() once and check the commands it issued.

    Args:
      expected_commands: List of mock.call objects that must appear, in
        order, among the calls made to the patched cros_build_lib.run.
      input_path: Profile to process; when falsy, 'input.afdo' in tempdir.
      output_path: Output profile; when falsy, 'output.afdo' in tempdir.
      *args: Extra positional arguments passed to _ProcessAFDOProfile().
      **kwargs: Extra keyword arguments passed to _ProcessAFDOProfile().
    """
    input_path = input_path or os.path.join(self.tempdir, 'input.afdo')
    output_path = output_path or os.path.join(self.tempdir, 'output.afdo')
    self.obj._ProcessAFDOProfile(input_path, output_path, *args, **kwargs)

    self.run_command.assert_has_calls(expected_commands)

  def testProcessAFDOProfileForAndroidLinuxProfile(self):
    """Test call on _ProcessAFDOProfile() for Android/Linux profiles."""
    reduce_functions = 70000
    input_path = os.path.join(self.tempdir, 'android.prof.afdo')
    output_path = os.path.join(self.tempdir, 'android.prof.output.afdo')

    # Intermediate files are all named after the in-chroot input path.
    inside = self.chroot.chroot_path(input_path)
    text_temp = inside + '.text.temp'
    removed_temp = inside + '.removed.temp'
    reduced_temp = inside + '.reduced.tmp'

    to_text_cmd = [
        'llvm-profdata', 'merge', '-sample', '-text', inside, '-output',
        text_temp
    ]
    remove_cmd = [
        'remove_indirect_calls',
        '--input=' + text_temp,
        '--output=' + removed_temp,
    ]
    reduce_cmd = [
        'remove_cold_functions',
        '--input=' + removed_temp,
        '--output=' + reduced_temp,
        '--number=' + str(reduce_functions),
    ]
    to_binary_cmd = [
        'llvm-profdata', 'merge', '-sample', reduced_temp, '-output',
        self.chroot.chroot_path(output_path)
    ]
    expected_commands = [
        self._ChrootCall(cmd)
        for cmd in (to_text_cmd, remove_cmd, reduce_cmd, to_binary_cmd)
    ]

    self.runProcessAFDOProfileOnce(
        expected_commands,
        input_path=input_path,
        output_path=output_path,
        remove=True,
        reduce_functions=reduce_functions)

  @mock.patch.object(builtins, 'open')
  def testProcessAFDOProfileForChromeOSReleaseProfile(self, mock_open):
    """Test call on _ProcessAFDOProfile() for CrOS release profiles."""
    reduce_functions = 20000
    input_path = os.path.join(self.tempdir, self.merged_name)
    output_path = os.path.join(self.tempdir, self.redacted_name)

    # Intermediate files are all named after the in-chroot input path.
    inside = self.chroot.chroot_path(input_path)
    text_temp = inside + '.text.temp'
    redacted_temp = inside + '.redacted.temp'
    removed_temp = inside + '.removed.temp'
    reduced_temp = inside + '.reduced.tmp'

    # Every open() call returns this buffer, so it is the object expected as
    # the input of the redaction command.
    fake_file = io.StringIO()
    mock_open.return_value = fake_file

    expected_commands = [
        self._ChrootCall([
            'llvm-profdata', 'merge', '-sample', '-text', inside, '-output',
            text_temp
        ]),
        # The redaction output is named after the untranslated input path.
        self._ChrootCall(
            ['redact_textual_afdo_profile'],
            input=fake_file,
            stdout=input_path + '.redacted.temp'),
        self._ChrootCall([
            'remove_indirect_calls',
            '--input=' + redacted_temp,
            '--output=' + removed_temp,
        ]),
        self._ChrootCall([
            'remove_cold_functions',
            '--input=' + removed_temp,
            '--output=' + reduced_temp,
            '--number=' + str(reduce_functions),
        ]),
        self._ChrootCall([
            'llvm-profdata',
            'merge',
            '-sample',
            reduced_temp,
            '-output',
            self.chroot.chroot_path(output_path),
            '-compbinary',
        ]),
    ]

    self.runProcessAFDOProfileOnce(
        expected_commands,
        input_path=input_path,
        output_path=output_path,
        redact=True,
        remove=True,
        reduce_functions=reduce_functions,
        compbinary=True)

  def testCreateReleaseChromeAFDO(self):
    """Test _CreateReleaseChromeAFDO() downloads, merges and processes."""
    merge_mock = self.PatchObject(self.obj, '_MergeAFDOProfiles')
    process_mock = self.PatchObject(self.obj, '_ProcessAFDOProfile')

    def InTempdir(name):
      return os.path.join(self.tempdir, name)

    result = self.obj._CreateReleaseChromeAFDO(
        self.cwp_url, self.benchmark_url, self.tempdir, self.merged_name)
    self.assertEqual(result, InTempdir(self.redacted_name))

    # Check both compressed profiles are copied into the temp directory.
    self.gs_copy.assert_has_calls([
        mock.call(self.cwp_url, InTempdir(self.cwp_full)),
        mock.call(self.benchmark_url, InTempdir(self.benchmark_full)),
    ])

    # Check decompress files.
    self.decompress.assert_has_calls([
        mock.call(InTempdir(self.cwp_full), InTempdir(self.cwp_name)),
        mock.call(
            InTempdir(self.benchmark_full), InTempdir(self.benchmark_name)),
    ])

    # Check call to merge.
    merge_mock.assert_called_once_with(
        self.merge_inputs, InTempdir(self.merged_name))

    # Check calls to redact.
    process_mock.assert_called_once_with(
        self.merge_output,
        InTempdir(self.redacted_name),
        redact=True,
        remove=True,
        reduce_functions=20000,
        compbinary=True,
    )


class CreateAndUploadMergedAFDOProfileTest(PrepBundLatestAFDOArtifactTest):
  """Test CreateAndUploadMergedAFDOProfile and related functions.

  These tests mostly come from cbuildbot/afdo_unittest.py and were adapted
  to the recipe functions, so that the coverage of those tests is preserved
  here when the legacy builders are removed.
  """

  # Handles to everything runCreateAndUploadMergedAFDOProfileOnce() mocks.
  _Mocks = collections.namedtuple('Mocks', [
      'gs_context',
      'find_artifact',
      'run_command',
      'uncompress_file',
      'compress_file',
      'process_afdo_profile',
  ])

  # Version fields of the profiles reported by the mocked bucket listing,
  # oldest first (creation times are assigned by position in this tuple).
  _BUCKET_PROFILES = (
      {'major': 10, 'build': 9},
      {'major': 10, 'build': 10},
      {'major': 10, 'build': 10, 'merged_suffix': True},
      {'major': 10, 'build': 11},
      {'major': 10, 'build': 12},
      {'major': 10, 'build': 13},
      {'major': 10, 'build': 13, 'merged_suffix': True},
      {'major': 10, 'build': 13, 'patch': 1},
      {'major': 10, 'build': 13, 'patch': 2},
      {'major': 10, 'build': 13, 'patch': 2, 'merged_suffix': True},
      {'major': 11, 'build': 14},
      {'major': 11, 'build': 14, 'merged_suffix': True},
      {'major': 11, 'build': 15},
  )

  @staticmethod
  def _benchmark_afdo_profile_name(major=0,
                                   minor=0,
                                   build=0,
                                   patch=0,
                                   rev=1,
                                   merged_suffix=False,
                                   compression_suffix=True):
    """Build a benchmark AFDO profile file name.

    Args:
      major: Major version number.
      minor: Minor version number.
      build: Build number.
      patch: Patch number.
      rev: Revision number placed after '-r'.
      merged_suffix: Whether to insert '-merged' after the revision.
      compression_suffix: Whether to append the bzip2 compression suffix.

    Returns:
      The profile file name.
    """
    pieces = [
        'chromeos-chrome-amd64-%d.%d.%d.%d_rc-r%d' % (major, minor, build,
                                                      patch, rev)
    ]
    if merged_suffix:
      pieces.append('-merged')
    pieces.append(toolchain_util.AFDO_SUFFIX)
    if compression_suffix:
      pieces.append(toolchain_util.BZ2_COMPRESSION_SUFFIX)
    return ''.join(pieces)

  @classmethod
  def _UncompressedProfileName(cls,
                               major,
                               build=0,
                               patch=0,
                               merged_suffix=False):
    """Return a benchmark profile name without the compression suffix."""
    return cls._benchmark_afdo_profile_name(
        major=major,
        build=build,
        patch=patch,
        merged_suffix=merged_suffix,
        compression_suffix=False)

  def _AssertMergeCommand(self, run_command, output_name, input_names):
    """Check the most recent run() call is the expected profile merge.

    Args:
      run_command: The mock standing in for cros_build_lib.run.
      output_name: File name expected in the '-output=' argument.
      input_names: File names expected as '-weighted-input=1,' arguments.
    """
    expected_ordered_args = ['llvm-profdata', 'merge', '-sample']
    args = run_command.call_args[0][0]
    prefix_len = len(expected_ordered_args)
    self.assertEqual(args[:prefix_len], expected_ordered_args)

    # Note that these should all be in-chroot names. Their relative order is
    # not checked.
    expected_unordered_args = [
        '-output=' + os.path.join(self.output_dir_inchroot, output_name)
    ]
    expected_unordered_args.extend(
        '-weighted-input=1,' + os.path.join(self.output_dir_inchroot, name)
        for name in input_names)
    self.assertCountEqual(args[prefix_len:], expected_unordered_args)

  def setUp(self):
    """Point the object under test at a benchmark URL and an output dir."""
    self.benchmark_url = 'gs://path/to/unvetted'
    self.obj.input_artifacts = {
        'UnverifiedChromeBenchmarkAfdoFile': [self.benchmark_url],
    }
    self.obj.chroot = self.chroot
    self.output_dir = os.path.join(self.chroot.path, 'tmp', 'output_dir')
    osutils.SafeMakedirs(self.output_dir)
    self.output_dir_inchroot = self.chroot.chroot_path(self.output_dir)
    self.now = datetime.datetime.now()

  def runCreateAndUploadMergedAFDOProfileOnce(self, **kwargs):
    """Run _CreateAndUploadMergedAFDOProfile() once against a mocked bucket.

    Args:
      **kwargs: Forwarded to _CreateAndUploadMergedAFDOProfile(), except
        'unmerged_name', which is replaced by an 'unmerged_profile' path
        under self.output_dir (the file is created). 'unmerged_name'
        defaults to a 9999.0.0.0 profile name and 'output_dir' defaults to
        self.output_dir.

    Returns:
      A (merged_name, mocks) tuple: the value returned by the function under
      test, and a _Mocks tuple with the mocks installed for this run.
    """
    # The default name matches everything.
    unmerged_name = kwargs.pop(
        'unmerged_name', self._UncompressedProfileName(major=9999))
    kwargs.setdefault('output_dir', self.output_dir)

    def MockList(*_args, **_kwargs):
      total = len(self._BUCKET_PROFILES)
      return [
          self.MockListResult(
              url=os.path.join(
                  self.benchmark_url,
                  self._benchmark_afdo_profile_name(**fields)),
              creation_time=self.now - datetime.timedelta(days=total - i))
          for i, fields in enumerate(self._BUCKET_PROFILES)
      ]

    self.gs_context.List = MockList
    run_command = self.PatchObject(cros_build_lib, 'run')
    uncompress_file = self.PatchObject(cros_build_lib, 'UncompressFile')
    compress_file = self.PatchObject(cros_build_lib, 'CompressFile')
    process_afdo_profile = self.PatchObject(self.obj, '_ProcessAFDOProfile')

    unmerged_profile = os.path.join(self.output_dir, unmerged_name)
    osutils.Touch(unmerged_profile)
    kwargs['unmerged_profile'] = unmerged_profile

    merged_name = self.obj._CreateAndUploadMergedAFDOProfile(**kwargs)
    return merged_name, self._Mocks(
        gs_context=self.gs_context,
        find_artifact=MockList,
        run_command=run_command,
        uncompress_file=uncompress_file,
        compress_file=compress_file,
        process_afdo_profile=process_afdo_profile,
    )

  def testCreateAndUploadMergedAFDOProfileErrorWhenProfileInBucket(self):
    """Test an AssertionError is raised for a name listed in the bucket."""
    unmerged_name = self._benchmark_afdo_profile_name(major=10, build=13)
    merged_name = None
    with self.assertRaises(AssertionError):
      merged_name, _ = self.runCreateAndUploadMergedAFDOProfileOnce(
          unmerged_name=unmerged_name)
    self.assertIsNone(merged_name)

  def testCreateAndUploadMergedAFDOProfileMergesBranchProfiles(self):
    """Test the merge inputs chosen for a 10.0.13.99 unmerged profile."""
    name = self._UncompressedProfileName
    merged_name, mocks = self.runCreateAndUploadMergedAFDOProfileOnce(
        unmerged_name=name(major=10, build=13, patch=99))
    self.assertIsNotNone(merged_name)

    self._AssertMergeCommand(
        mocks.run_command,
        'raw-' + name(major=10, build=13, patch=99, merged_suffix=True),
        [
            name(major=10, build=12),
            name(major=10, build=13),
            name(major=10, build=13, patch=1),
            name(major=10, build=13, patch=2),
            name(major=10, build=13, patch=99),
        ])
    self.assertEqual(mocks.gs_context.Copy.call_count, 4)

  def testCreateAndUploadMergedAFDOProfileRemovesIndirectCallTargets(self):
    """Test the raw merge output is post-processed into the merged name."""
    name = self._UncompressedProfileName
    merged_name, mocks = self.runCreateAndUploadMergedAFDOProfileOnce(
        recent_to_merge=2,
        unmerged_name=name(major=10, build=13, patch=99))
    self.assertIsNotNone(merged_name)

    merge_output_name = 'raw-' + name(
        major=10, build=13, patch=99, merged_suffix=True)
    self.assertNotEqual(merged_name, merge_output_name)

    self._AssertMergeCommand(
        mocks.run_command,
        merge_output_name,
        [
            name(major=10, build=13, patch=2),
            name(major=10, build=13, patch=99),
        ])

    mocks.process_afdo_profile.assert_called_once_with(
        os.path.join(self.output_dir, merge_output_name),
        os.path.join(self.output_dir, merged_name),
        redact=False,
        remove=True,
        reduce_functions=70000,
        compbinary=False,
    )

  def testCreateAndUploadMergedAFDOProfileWorksInTheHappyCase(self):
    """Test a run with default arguments merges the expected profiles."""
    name = self._UncompressedProfileName
    merged_name, mocks = self.runCreateAndUploadMergedAFDOProfileOnce()
    self.assertIsNotNone(merged_name)

    # Note that we always return the *basename*
    output_afdo_name = name(major=9999, merged_suffix=True)
    self.assertEqual(merged_name, output_afdo_name)

    mocks.run_command.assert_called_once()

    input_afdo_names = [
        name(major=10, build=13, patch=1),
        name(major=10, build=13, patch=2),
        name(major=11, build=14),
        name(major=11, build=15),
        name(major=9999),
    ]
    self._AssertMergeCommand(
        mocks.run_command, 'raw-' + output_afdo_name, input_afdo_names)
    self.assertEqual(mocks.gs_context.Copy.call_count, 4)
    self.assertEqual(mocks.uncompress_file.call_count, 4)

    # The last profile is not compressed, so no need to uncompress it
    expected_uncompress = []
    for afdo_name in input_afdo_names[:-1]:
      basis = os.path.join(self.output_dir, afdo_name)
      expected_uncompress.append(
          mock.call(basis + toolchain_util.BZ2_COMPRESSION_SUFFIX, basis))
    mocks.uncompress_file.assert_has_calls(
        expected_uncompress, any_order=True)

  def testMergeIsOKIfWeFindFewerProfilesThanWeWant(self):
    """Test merging works when fewer profiles exist than were requested."""
    merged_name, mocks = self.runCreateAndUploadMergedAFDOProfileOnce(
        recent_to_merge=1000, max_age_days=1000)
    self.assertIsNotNone(merged_name)
    self.assertEqual(mocks.gs_context.Copy.call_count, 9)

  def testNoFilesAfterUnmergedNameAreIncluded(self):
    """Test the merge inputs chosen for a 10.0.11.2 unmerged profile."""
    name = self._UncompressedProfileName
    merged_name, mocks = self.runCreateAndUploadMergedAFDOProfileOnce(
        unmerged_name=name(major=10, build=11, patch=2))
    self.assertIsNotNone(merged_name)

    expected_merged_name = name(
        major=10, build=11, patch=2, merged_suffix=True)
    self.assertEqual(expected_merged_name, merged_name)

    self._AssertMergeCommand(
        mocks.run_command,
        'raw-' + expected_merged_name,
        [
            name(major=10, build=9),
            name(major=10, build=10),
            name(major=10, build=11),
            name(major=10, build=11, patch=2),
        ])

    self.assertEqual(mocks.gs_context.Copy.call_count, 3)
    self.assertEqual(mocks.uncompress_file.call_count, 3)

  def testMergeDoesntHappenIfNoProfilesAreMerged(self):
    """Test nothing is copied or run when there is nothing to merge."""
    runs = [
        self.runCreateAndUploadMergedAFDOProfileOnce(recent_to_merge=1),
        self.runCreateAndUploadMergedAFDOProfileOnce(max_age_days=0),
    ]

    for merged_name, mocks in runs:
      self.assertIsNone(merged_name)
      self.gs_context.Copy.assert_not_called()
      mocks.run_command.assert_not_called()
      mocks.uncompress_file.assert_not_called()
      mocks.compress_file.assert_not_called()


class FindEbuildPathTest(cros_test_lib.MockTempDirTestCase):
  """Test top-level function _FindEbuildPath()."""

  def setUp(self):
    """Patch cros_build_lib.run to report a fixed chrome ebuild path."""
    self.board = 'lulu'
    self.chrome_package = 'chromeos-chrome'
    self.kernel_package = 'chromeos-kernel-3_14'
    self.chrome_ebuild = (
        '/mnt/host/source/src/path/to/chromeos-chrome-1.0.ebuild')
    self.mock_command = self.PatchObject(
        cros_build_lib,
        'run',
        return_value=cros_build_lib.CommandResult(output=self.chrome_ebuild))

  def _AssertEqueryCalled(self, run_mock, equery, package):
    """Check |run_mock| was last called with '<equery> w <package>'."""
    run_mock.assert_called_with(
        [equery, 'w', package],
        enter_chroot=True,
        stdout=True,
        encoding='utf-8')

  # pylint: disable=protected-access
  def testInvalidPackage(self):
    """Test invalid package name."""
    with self.assertRaises(ValueError) as raised:
      toolchain_util._FindEbuildPath('some-invalid-package')

    self.assertIn('Invalid package name', str(raised.exception))
    self.mock_command.assert_not_called()

  def testChromePackagePass(self):
    """Test finding the chrome ebuild works."""
    found = toolchain_util._FindEbuildPath(self.chrome_package)
    self._AssertEqueryCalled(
        self.mock_command, 'equery', self.chrome_package)
    self.assertEqual(found, self.chrome_ebuild)

  def testKernelPackagePass(self):
    """Test finding the kernel ebuild works."""
    kernel_ebuild = (
        '/mnt/host/source/src/path/to/chromeos-kernel-3_14-3.14-r1.ebuild')
    # Re-patch run() so that it reports the kernel ebuild instead.
    kernel_run = self.PatchObject(
        cros_build_lib,
        'run',
        return_value=cros_build_lib.CommandResult(output=kernel_ebuild))

    found = toolchain_util._FindEbuildPath(self.kernel_package)
    self._AssertEqueryCalled(kernel_run, 'equery', self.kernel_package)
    self.assertEqual(found, kernel_ebuild)

  def testPassWithBoardName(self):
    """Test working with a board name."""
    found = toolchain_util._FindEbuildPath(
        self.chrome_package, board='board')
    self._AssertEqueryCalled(
        self.mock_command, 'equery-board', self.chrome_package)
    self.assertEqual(found, self.chrome_ebuild)

  def testReturnPathOutsideChroot(self):
    """Test returning correct path outside chroot."""
    found = toolchain_util._FindEbuildPath(
        self.chrome_package, buildroot='/path/to/buildroot')
    expected = '/path/to/buildroot/src/path/to/chromeos-chrome-1.0.ebuild'
    self.assertEqual(found, expected)


class LatestAFDOArtifactTest(cros_test_lib.RunCommandTempDirTestCase):
  """Test the functions that compare the freshness of AFDO artifacts."""

  # pylint: disable=protected-access
  def setUp(self):
    """Mock the GS bucket listing and the current Chrome branch."""
    self.board = 'board'
    self.gs_url = 'gs://path/to/any_gs_url'
    self.current_branch = '78'
    self.current_arch = 'airmont'
    self.MockListResult = collections.namedtuple('MockListResult',
                                                 ('url', 'creation_time'))
    # (file name, creation time) pairs returned by the mocked listing.
    files_in_gs_bucket = [
        # Benchmark profiles
        ('chromeos-chrome-amd64-78.0.3893.0_rc-r1.afdo.bz2', 2.0),
        ('chromeos-chrome-amd64-78.0.3896.0_rc-r1.afdo.bz2', 1.0),  # Latest
        ('chromeos-chrome-amd64-78.0.3897.0_rc-r1-merged.afdo.bz2', 3.0),
        # CWP profiles
        ('R78-3869.38-1562580965.afdo.xz', 2.1),
        ('R78-3866.0-1570000000.afdo.xz', 1.1),  # Latest
        ('R77-3811.0-1580000000.afdo.xz', 3.1),
        # Kernel profiles
        ('R76-3869.38-1562580965.gcov.xz', 1.3),
        ('R76-3866.0-1570000000.gcov.xz', 2.3),  # Latest
        # Orderfiles
        ('chromeos-chrome-orderfile-field-78-3877.0-1567418235-'
         'benchmark-78.0.3893.0-r1.orderfile.xz', 1.2),  # Latest
        ('chromeos-chrome-orderfile-field-78-3877.0-1567418235-'
         'benchmark-78.0.3850.0-r1.orderfile.xz', 2.2),
    ]

    self.gs_list = [
        self.MockListResult(
            url=os.path.join(self.gs_url, name), creation_time=created)
        for name, created in files_in_gs_bucket
    ]
    self.PatchObject(gs.GSContext, 'List', return_value=self.gs_list)

    # Keep a handle on the real implementation before it is replaced below:
    # testFindCurrentChromeBranch has to exercise the real code, otherwise it
    # would only check the mock's canned return value.
    self.real_find_current_chrome_branch = (
        toolchain_util._FindCurrentChromeBranch)
    self._PatchCurrentBranch(self.current_branch)

  def _PatchCurrentBranch(self, branch):
    """Make toolchain_util._FindCurrentChromeBranch return |branch|."""
    self.PatchObject(
        toolchain_util, '_FindCurrentChromeBranch', return_value=branch)

  def testFindCurrentChromeBranch(self):
    """Test _FindCurrentChromeBranch() works correctly."""
    chrome_name = 'chromeos-chrome-78.0.3893.0_rc-r1.ebuild'
    self.PatchObject(
        toolchain_util,
        '_FindEbuildPath',
        return_value=os.path.join('/path/to', chrome_name))
    # Call the implementation saved in setUp(); the module attribute is a
    # mock that returns self.current_branch no matter what.
    # NOTE(review): assumes the real function takes the branch from the
    # (patched) _FindEbuildPath result - confirm against toolchain_util.
    ret = self.real_find_current_chrome_branch()
    self.assertEqual(ret, self.current_branch)

  def testFindLatestAFDOArtifactPassWithBenchmarkAFDO(self):
    """Test _FindLatestAFDOArtifact returns the latest benchmark AFDO."""
    latest_afdo = toolchain_util._FindLatestAFDOArtifact(
        self.gs_url, toolchain_util._RankValidBenchmarkProfiles)
    self.assertEqual(latest_afdo,
                     'chromeos-chrome-amd64-78.0.3896.0_rc-r1.afdo.bz2')

  def testFindLatestAFDOArtifactPassWithCWPAFDO(self):
    """Test _FindLatestAFDOArtifact returns the latest CWP AFDO."""
    latest_afdo = toolchain_util._FindLatestAFDOArtifact(
        self.gs_url, toolchain_util._RankValidCWPProfiles)
    self.assertEqual(latest_afdo, 'R78-3866.0-1570000000.afdo.xz')

  def testFindLatestAFDOArtifactPassWithKernelAFDO(self):
    """Test _FindLatestAFDOArtifact returns the latest kernel AFDO."""
    self._PatchCurrentBranch('76')
    latest_afdo = toolchain_util._FindLatestAFDOArtifact(
        self.gs_url, toolchain_util._RankValidCWPProfiles)
    self.assertEqual(latest_afdo, 'R76-3866.0-1570000000.gcov.xz')

  def testFindLatestAFDOArtifactPassWithOrderfile(self):
    """Test _FindLatestAFDOArtifact returns the latest orderfile."""
    latest_orderfile = toolchain_util._FindLatestAFDOArtifact(
        self.gs_url, toolchain_util._RankValidOrderfiles)
    self.assertEqual(
        latest_orderfile,
        'chromeos-chrome-orderfile-field-78-3877.0-1567418235-'
        'benchmark-78.0.3893.0-r1.orderfile.xz')

  def testFindLatestAFDOArtifactPassOnLastBranch(self):
    """Test returns latest file on last branch when current has none."""
    self._PatchCurrentBranch('79')
    self.testFindLatestAFDOArtifactPassWithBenchmarkAFDO()

  def testFindLatestAFDOArtifactFailToFindAnyFiles(self):
    """Test function fails when no files are found for branch 80."""
    self._PatchCurrentBranch('80')
    with self.assertRaises(RuntimeError) as context:
      self.testFindLatestAFDOArtifactPassWithBenchmarkAFDO()

    self.assertEqual('No files found on %s for branch 80' % self.gs_url,
                     str(context.exception))

  def testFindLatestAFDOArtifactsFindMaxFromInvalidFiles(self):
    """Test function fails when searching max from list of invalid files."""
    invalid_url = os.path.join(self.gs_url, 'Invalid-name-but-end-in-78.afdo')
    mock_gs_list = [self.MockListResult(url=invalid_url, creation_time=1.0)]
    self.PatchObject(gs.GSContext, 'List', return_value=mock_gs_list)

    with self.assertRaises(RuntimeError) as context:
      toolchain_util._FindLatestAFDOArtifact(
          self.gs_url, toolchain_util._RankValidBenchmarkProfiles)
    self.assertIn('No valid latest artifact was found', str(context.exception))


class UploadAFDOArtifactToGSBucketTest(gs_unittest.AbstractGSContextTest):
  """Test top-level function _UploadAFDOArtifactToGSBucket."""

  # pylint: disable=protected-access
  def setUp(self):
    """Set up the upload locations and mock out GSContext.Copy."""
    self.gs_url = 'gs://some/random/gs/url'
    self.local_path = '/path/to/file'
    self.rename = 'new_file_name'
    self.url_without_renaming = os.path.join(self.gs_url, 'file')
    self.url_with_renaming = os.path.join(self.gs_url, 'new_file_name')
    self.mock_copy = self.PatchObject(gs.GSContext, 'Copy')

  def _PatchExists(self, in_bucket):
    """Pretend the local file exists; GSContext.Exists gives |in_bucket|.

    Returns:
      The mock installed for gs.GSContext.Exists.
    """
    self.PatchObject(os.path, 'exists', return_value=True)
    return self.PatchObject(gs.GSContext, 'Exists', return_value=in_bucket)

  def testFileToUploadNotExistTriggerException(self):
    """Test the file to upload doesn't exist in the local path."""
    with self.assertRaises(RuntimeError) as raised:
      toolchain_util._UploadAFDOArtifactToGSBucket(self.gs_url, self.local_path)
    self.assertIn('to upload does not exist', str(raised.exception))

  def testFileToUploadAlreadyInBucketSkipsException(self):
    """Test uploading a file that already exists in the bucket."""
    exists_mock = self._PatchExists(in_bucket=True)
    toolchain_util._UploadAFDOArtifactToGSBucket(self.gs_url, self.local_path)
    exists_mock.assert_called_once_with(self.url_without_renaming)
    self.mock_copy.assert_not_called()

  def testFileUploadSuccessWithoutRenaming(self):
    """Test successfully uploading a file without renaming."""
    self._PatchExists(in_bucket=False)
    toolchain_util._UploadAFDOArtifactToGSBucket(self.gs_url, self.local_path)
    self.mock_copy.assert_called_once_with(
        self.local_path, self.url_without_renaming, acl='public-read')

  def testFileUploadSuccessWithRenaming(self):
    """Test successfully uploading a file with renaming."""
    self._PatchExists(in_bucket=False)
    toolchain_util._UploadAFDOArtifactToGSBucket(
        self.gs_url, self.local_path, self.rename)
    self.mock_copy.assert_called_once_with(
        self.local_path, self.url_with_renaming, acl='public-read')


class GenerateChromeOrderfileTest(cros_test_lib.MockTempDirTestCase):
  """Unit tests covering toolchain_util.GenerateChromeOrderfile."""

  # pylint: disable=protected-access
  def setUp(self):
    """Lay out fake directories under tempdir and build the test object."""
    self.board = 'board'
    self.chroot_args = []
    self.orderfile_name = 'chromeos-chrome-orderfile-1.0'
    self.working_dir_inchroot = '/tmp'

    self.out_dir = os.path.join(self.tempdir, 'outdir')
    self.chroot_dir = os.path.join(self.tempdir, 'chroot')
    self.working_dir = os.path.join(self.chroot_dir, 'tmp')
    for directory in (self.out_dir, self.working_dir):
      osutils.SafeMakedirs(directory)

    # Pin the orderfile name so the tests can predict artifact file names.
    self.PatchObject(
        toolchain_util, '_GetOrderfileName', return_value=self.orderfile_name)
    self.test_obj = toolchain_util.GenerateChromeOrderfile(
        self.board, self.out_dir, '/path/to/chrome_root', self.chroot_dir,
        self.chroot_args)

  def testCheckArgumentsFail(self):
    """_CheckArguments() raises when the Chrome binary is not present."""
    expected_error = toolchain_util.GenerateChromeOrderfileError
    with self.assertRaises(expected_error) as raised:
      self.test_obj._CheckArguments()

    self.assertIn('Chrome binary does not exist at', str(raised.exception))

  def testGenerateChromeNM(self):
    """_GenerateChromeNM() invokes llvm-nm with the expected arguments."""
    run_mock = self.PatchObject(cros_build_lib, 'run')
    self.test_obj.tempdir = self.tempdir

    self.test_obj._GenerateChromeNM()

    binary_path = self.test_obj.CHROME_BINARY_PATH.replace(
        '${BOARD}', self.board)
    nm_output = os.path.join(self.working_dir, self.orderfile_name + '.nm')
    run_mock.assert_called_with(
        ['llvm-nm', '-n', binary_path],
        stdout=nm_output,
        enter_chroot=True,
        chroot_args=self.chroot_args)

  def testPostProcessOrderfile(self):
    """_PostProcessOrderfile() invokes the processing script correctly."""
    run_mock = self.PatchObject(cros_build_lib, 'run')
    # All paths handed to the script are expressed relative to the chroot.
    base_path = os.path.join(self.working_dir_inchroot, self.orderfile_name)
    nm_path = base_path + '.nm'
    result_path = base_path + '.orderfile'
    orderfile_input = self.test_obj.INPUT_ORDERFILE_PATH.replace(
        '${BOARD}', self.board)

    self.test_obj._PostProcessOrderfile(nm_path)

    expected_cmd = [
        self.test_obj.PROCESS_SCRIPT,
        '--chrome', nm_path,
        '--input', orderfile_input,
        '--output', result_path,
    ]
    run_mock.assert_called_with(
        expected_cmd, enter_chroot=True, chroot_args=self.chroot_args)

  def testSuccessRun(self):
    """Perform() compresses both artifacts into out_dir and uploads them."""
    generator_cls = toolchain_util.GenerateChromeOrderfile
    # The two generation steps need a real Chrome build, so each is replaced
    # by a stub that returns a small pre-written file in the working dir.
    stubbed_steps = (
        ('.nm', '_GenerateChromeNM', 'Write something in the nm file'),
        ('.orderfile', '_PostProcessOrderfile',
         'Write something in the orderfile'),
    )
    for suffix, method_name, text in stubbed_steps:
      artifact = os.path.join(self.working_dir, self.orderfile_name + suffix)
      with open(artifact, 'w') as f:
        print(text, file=f)
      self.PatchObject(generator_cls, method_name, return_value=artifact)

    self.PatchObject(generator_cls, '_CheckArguments')
    upload_mock = self.PatchObject(toolchain_util,
                                   '_UploadAFDOArtifactToGSBucket')

    self.test_obj.Perform()

    # Both tarballs must have landed in the output directory.
    produced = os.listdir(self.out_dir)
    for suffix in ('.nm', '.orderfile'):
      tarball = (
          self.orderfile_name + suffix + toolchain_util.XZ_COMPRESSION_SUFFIX)
      self.assertIn(tarball, produced)
    self.assertEqual(upload_mock.call_count, 2)


class UpdateEbuildWithAFDOArtifactsTest(cros_test_lib.MockTempDirTestCase):
  """Unit tests covering toolchain_util.UpdateEbuildWithAFDOArtifacts."""

  # pylint: disable=protected-access
  def setUp(self):
    """Build an updater with a single variable-replacement rule."""
    self.board = 'board'
    self.package = 'valid-package'
    self.variable_name = 'VARIABLE_NAME'
    self.variable_value = 'new-afdo-artifact-1.1'
    self.PatchObject(cros_build_lib, 'IsInsideChroot', return_value=True)
    rules = {self.variable_name: self.variable_value}
    self.test_obj = toolchain_util.UpdateEbuildWithAFDOArtifacts(
        self.board, self.package, rules)

  def testPatchEbuildFailWithoutMarkers(self):
    """_PatchEbuild() rejects an ebuild that contains no marker at all."""
    ebuild_path = os.path.join(self.tempdir, self.package + '.ebuild')
    # An empty file cannot contain any of the expected variables.
    osutils.Touch(ebuild_path)

    expected_error = toolchain_util.UpdateEbuildWithAFDOArtifactsError
    with self.assertRaises(expected_error) as raised:
      self.test_obj._PatchEbuild(ebuild_path)

    self.assertEqual(
        'Ebuild file does not have appropriate marker for AFDO/orderfile.',
        str(raised.exception))

  def testPatchEbuildWithOneRule(self):
    """_PatchEbuild() rewrites the variable named by a single rule."""
    ebuild_path = os.path.join(self.tempdir, self.package + '.ebuild')
    original_lines = [
        'Some message before',
        '%s="old-afdo-artifact-1.0"' % self.variable_name,
        'Some message after',
    ]
    osutils.WriteFile(ebuild_path, '\n'.join(original_lines))

    self.test_obj._PatchEbuild(ebuild_path)

    # The intermediate '.new' file must not be left behind.
    self.assertNotIn(self.package + '.ebuild.new', os.listdir(self.tempdir))

    # Every line matching the variable must now carry the new value.
    marker = re.compile(toolchain_util.AFDO_ARTIFACT_EBUILD_REGEX %
                        self.variable_name)
    with open(ebuild_path) as f:
      hits = [m for m in (marker.match(line) for line in f) if m]
    for hit in hits:
      # Drop the first and last character (the surrounding quotes).
      self.assertEqual(hit.group('name')[1:-1], self.variable_value)

    self.assertTrue(bool(hits))

  def testPatchEbuildWithMultipleRulesPass(self):
    """_PatchEbuild() rewrites every variable when given several rules."""
    ebuild_path = os.path.join(self.tempdir, self.package + '.ebuild')
    another_variable_name = 'VARIABLE_NAME2'
    another_variable_value = 'another-new-afdo-artifact-2.0'
    original_lines = [
        'Some message before',
        '%s="old-afdo-artifact-1.0"' % self.variable_name,
        '%s="another-old-afdo-artifact-1.0"' % another_variable_name,
        'Some message after',
    ]
    osutils.WriteFile(ebuild_path, '\n'.join(original_lines))

    rules = {
        self.variable_name: self.variable_value,
        another_variable_name: another_variable_value,
    }
    updater = toolchain_util.UpdateEbuildWithAFDOArtifacts(
        self.board, self.package, rules)

    updater._PatchEbuild(ebuild_path)

    # Pair each variable's pattern with the value it should now hold.
    expectations = [
        (re.compile(toolchain_util.AFDO_ARTIFACT_EBUILD_REGEX % name), value)
        for name, value in (
            (self.variable_name, self.variable_value),
            (another_variable_name, another_variable_value),
        )
    ]

    hit_count = 0
    with open(ebuild_path) as f:
      for line in f:
        for marker, expected_value in expectations:
          hit = marker.match(line)
          if not hit:
            continue
          hit_count += 1
          self.assertEqual(hit.group('name')[1:-1], expected_value)
          # A line is attributed to at most one variable.
          break

    self.assertEqual(hit_count, len(expectations))

  def testPatchEbuildWithMultipleRulesFail(self):
    """_PatchEbuild() fails when one rule's variable is not in the ebuild."""
    ebuild_path = os.path.join(self.tempdir, self.package + '.ebuild')
    original_lines = [
        'Some message before',
        '%s="old-afdo-artifact-1.0"' % self.variable_name,
        'Some message after',
    ]
    osutils.WriteFile(ebuild_path, '\n'.join(original_lines))

    # The second rule names a variable the ebuild does not define.
    rules = {
        self.variable_name: self.variable_value,
        'another_variable_name': 'another_variable_value',
    }
    updater = toolchain_util.UpdateEbuildWithAFDOArtifacts(
        self.board, self.package, rules)

    expected_error = toolchain_util.UpdateEbuildWithAFDOArtifactsError
    with self.assertRaises(expected_error) as raised:
      updater._PatchEbuild(ebuild_path)
    self.assertEqual(
        'Ebuild file does not have appropriate marker for AFDO/orderfile.',
        str(raised.exception))

  def testUpdateManifest(self):
    """_UpdateManifest() runs the board-specific ebuild manifest command."""
    run_mock = self.PatchObject(cros_build_lib, 'run')
    ebuild_path = os.path.join(self.tempdir, self.package + '.ebuild')

    self.test_obj._UpdateManifest(ebuild_path)

    expected_cmd = [
        'ebuild-%s' % self.board, ebuild_path, 'manifest', '--force'
    ]
    run_mock.assert_called_with(expected_cmd, enter_chroot=True)


class CheckAFDOArtifactExistsTest(cros_test_lib.RunCommandTempDirTestCase):
  """Unit tests covering toolchain_util.CheckAFDOArtifactExists()."""

  def setUp(self):
    """Fix the artifact names and the Chrome branch used by every test."""
    self.afdo_name = 'any_name.afdo'
    self.orderfile_name = 'any_orderfile_name'
    self.PatchObject(
        toolchain_util, '_FindCurrentChromeBranch', return_value='78')

  def _CheckExistCall(self, target, url_to_check, board='board'):
    """Verify the GS lookup made for |target| and the value passed through.

    Runs CheckAFDOArtifactExists() once with GSContext.Exists reporting False
    and once reporting True. Each run must return that same value and must
    probe exactly |url_to_check|.

    Args:
      target: The target name passed to CheckAFDOArtifactExists().
      url_to_check: The GS URL that Exists() is expected to receive.
      board: The board name passed to CheckAFDOArtifactExists().
    """
    for gs_answer in (False, True):
      # A fresh mock per iteration keeps the call count at one.
      exists_mock = self.PatchObject(
          gs.GSContext, 'Exists', return_value=gs_answer)
      result = toolchain_util.CheckAFDOArtifactExists(
          buildroot='buildroot',
          chrome_root='chrome_root',
          target=target,
          board=board)
      self.assertEqual(gs_answer, result)
      exists_mock.assert_called_once_with(url_to_check)

  def testOrderfileGenerateAsTarget(self):
    """The orderfile_generate target probes the unvetted orderfile URL."""
    self.PatchObject(
        toolchain_util, '_GetOrderfileName', return_value=self.orderfile_name)
    tarball = (
        self.orderfile_name + '.orderfile' +
        toolchain_util.XZ_COMPRESSION_SUFFIX)
    expected_url = os.path.join(
        toolchain_util.ORDERFILE_GS_URL_UNVETTED, tarball)
    self._CheckExistCall('orderfile_generate', expected_url)

  def testOrderfileVerifyAsTarget(self):
    """The orderfile_verify target probes the vetted orderfile URL."""
    self.PatchObject(
        toolchain_util,
        '_FindLatestAFDOArtifact',
        return_value=self.orderfile_name)
    expected_url = os.path.join(
        toolchain_util.ORDERFILE_GS_URL_VETTED, self.orderfile_name)
    self._CheckExistCall('orderfile_verify', expected_url)

  def testBenchmarkAFDOAsTarget(self):
    """The benchmark_afdo target probes the compressed benchmark profile."""
    self.PatchObject(
        toolchain_util, '_GetBenchmarkAFDOName', return_value=self.afdo_name)
    compressed_name = self.afdo_name + toolchain_util.BZ2_COMPRESSION_SUFFIX
    expected_url = os.path.join(
        toolchain_util.BENCHMARK_AFDO_GS_URL, compressed_name)
    self._CheckExistCall('benchmark_afdo', expected_url)

  def testKernelAFDOAsTarget(self):
    """The kernel_afdo target probes the vetted kernel profile URL."""
    self.PatchObject(
        toolchain_util, '_FindLatestAFDOArtifact', return_value=self.afdo_name)
    # This test expects the '3.14' URL component when the board is 'lulu'.
    expected_url = os.path.join(
        toolchain_util.KERNEL_AFDO_GS_URL_VETTED, '3.14', self.afdo_name)
    self._CheckExistCall('kernel_afdo', expected_url, 'lulu')


class AFDOUpdateEbuildTests(cros_test_lib.RunCommandTempDirTestCase):
  """Test wrapper functions to update ebuilds for different types."""

  # Canned "latest artifact" names returned by mockFindChromeAFDO().
  mock_benchmark_afdo = 'chromeos-chrome-amd64-78.0.3877.0.afdo.bz2'
  mock_cwp_afdo = {
      'silvermont': 'R78-3877.0-1566814872.afdo.xz',
      'airmont': 'R78-3877.0-1566812873.afdo.xz',
      'broadwell': 'R78-3865.35-1566812043.afdo.xz'
  }

  @staticmethod
  def mockFindChromeAFDO(url, _pattern):
    """Mock toolchain_util._FindLatestAFDOArtifact for Chrome AFDO.

    Args:
      url: The GS URL being searched; selects which canned name is returned.
      _pattern: Ignored; present to match the mocked function's signature.

    Returns:
      The benchmark profile name if |url| contains 'llvm', otherwise the CWP
      profile name of the first architecture found in |url|, or None when
      nothing matches.
    """
    if 'llvm' in url:
      return AFDOUpdateEbuildTests.mock_benchmark_afdo

    for arch, profile in AFDOUpdateEbuildTests.mock_cwp_afdo.items():
      if arch in url:
        return profile

    return None

  # pylint: disable=protected-access
  def setUp(self):
    """Patch out the ebuild updater and the helpers shared by all tests."""
    self.board = 'eve'
    self.arch = 'broadwell'
    self.kver = '4_4'
    self.orderfile = 'chrome.orderfile.xz'
    self.orderfile_stripped = 'chrome.orderfile'
    self.kernel = 'R78-12345.0-1564997810.gcov.xz'
    self.kernel_stripped = 'R78-12345.0-1564997810'
    self.mock_obj = self.PatchObject(
        toolchain_util, 'UpdateEbuildWithAFDOArtifacts', autospec=True)
    self.chrome_branch = '78'
    # Patch the branch lookup exactly once so that self.mock_branch is the
    # mock actually installed on toolchain_util. A second patch of the same
    # attribute would replace it and leave this handle stale.
    self.mock_branch = self.PatchObject(
        toolchain_util,
        '_FindCurrentChromeBranch',
        return_value=self.chrome_branch)
    self.mock_warn = self.PatchObject(
        toolchain_util, '_WarnSheriffAboutKernelProfileExpiration')
    # Make every osutils.TempDir() context yield the test's own tempdir.
    self.PatchObject(osutils.TempDir, '__enter__', return_value=self.tempdir)

  def testOrderfileUpdateChromePass(self):
    """Test OrderfileUpdateChromeEbuild() calls other functions correctly."""
    mock_find = self.PatchObject(
        toolchain_util, '_FindLatestAFDOArtifact', return_value=self.orderfile)

    toolchain_util.OrderfileUpdateChromeEbuild(self.board)
    mock_find.assert_called_once_with(toolchain_util.ORDERFILE_GS_URL_UNVETTED,
                                      toolchain_util._RankValidOrderfiles)
    # The rule carries the artifact name without its '.xz' suffix.
    self.mock_obj.assert_called_with(
        board=self.board,
        package='chromeos-chrome',
        update_rules={'UNVETTED_ORDERFILE': self.orderfile_stripped})

  def testAFDOUpdateChromeEbuildPass(self):
    """Test AFDOUpdateChromeEbuild() calls other functions correctly."""
    mock_find = self.PatchObject(
        toolchain_util,
        '_FindLatestAFDOArtifact',
        side_effect=self.mockFindChromeAFDO)

    afdo_name = 'any_name_for_merged.afdo'
    mock_create = self.PatchObject(
        toolchain_util, '_CreateReleaseChromeAFDO', return_value=afdo_name)
    self.PatchObject(os, 'rename')

    ret = toolchain_util.AFDOUpdateChromeEbuild(self.board)
    self.assertTrue(ret)

    # One lookup for the benchmark profile, one for the board's CWP profile.
    calls = [
        mock.call(toolchain_util.BENCHMARK_AFDO_GS_URL,
                  toolchain_util._RankValidBenchmarkProfiles),
        mock.call(
            os.path.join(toolchain_util.CWP_AFDO_GS_URL, self.arch),
            toolchain_util._RankValidCWPProfiles),
    ]
    mock_find.assert_has_calls(calls)
    # Both profile names are passed on without their compression suffix.
    mock_create.assert_called_with(
        os.path.splitext(self.mock_cwp_afdo[self.arch])[0], self.arch,
        os.path.splitext(self.mock_benchmark_afdo)[0], self.tempdir)
    self.mock_obj.assert_called_with(
        board=self.board,
        package='chromeos-chrome',
        update_rules={'UNVETTED_AFDO_FILE': os.path.join('/tmp', afdo_name)})

  def testAFDOUpdateKernelEbuildPass(self):
    """Test AFDOUpdateKernelEbuild() calls other functions correctly."""
    mock_age = self.PatchObject(
        toolchain_util, '_GetProfileAge', return_value=0)
    mock_find = self.PatchObject(
        toolchain_util, '_FindLatestAFDOArtifact', return_value=self.kernel)

    ret = toolchain_util.AFDOUpdateKernelEbuild(self.board)

    self.assertTrue(ret)

    # The GS directory uses a dotted kernel version ('4.4'), while the
    # package name below uses the underscore form ('4_4').
    url = os.path.join(toolchain_util.KERNEL_PROFILE_URL,
                       self.kver.replace('_', '.'))
    mock_find.assert_called_once_with(url, toolchain_util._RankValidCWPProfiles)
    mock_age.assert_called_once_with(self.kernel_stripped, 'kernel_afdo')

    # A profile of age 0 must not trigger the expiration warning.
    self.mock_warn.assert_not_called()

    self.mock_obj.assert_called_with(
        board=self.board,
        package='chromeos-kernel-' + self.kver,
        update_rules={'AFDO_PROFILE_VERSION': self.kernel_stripped})

  def testAFDOUpdateKernelEbuildFailDueToExpire(self):
    """Test AFDOUpdateKernelEbuild() fails when the profile expires."""
    # One day beyond the allowed staleness.
    self.PatchObject(
        toolchain_util,
        '_GetProfileAge',
        return_value=toolchain_util.KERNEL_ALLOWED_STALE_DAYS + 1)
    self.PatchObject(
        toolchain_util, '_FindLatestAFDOArtifact', return_value=self.kernel)

    ret = toolchain_util.AFDOUpdateKernelEbuild(self.board)

    self.assertFalse(ret)

  def testAFDOUpdateKernelEbuildWarnSheriff(self):
    """Test AFDOUpdateKernelEbuild() warns sheriff when profile near expire."""
    # One day short of the allowed staleness: still usable, but warned about.
    self.PatchObject(
        toolchain_util,
        '_GetProfileAge',
        return_value=toolchain_util.KERNEL_ALLOWED_STALE_DAYS - 1)
    self.PatchObject(
        toolchain_util, '_FindLatestAFDOArtifact', return_value=self.kernel)
    ret = toolchain_util.AFDOUpdateKernelEbuild(self.board)
    self.assertTrue(ret)
    self.mock_warn.assert_called_once_with(self.kver, self.kernel_stripped)


class GenerateBenchmarkAFDOProfile(cros_test_lib.MockTempDirTestCase):
  """Test GenerateBenchmarkAFDOProfile class."""

  # pylint: disable=protected-access
  def setUp(self):
    """Create a fake chroot layout and the object under test."""
    self.buildroot = self.tempdir
    self.chroot_dir = os.path.join(self.tempdir, 'chroot')
    osutils.SafeMakedirs(self.chroot_dir)
    self.chroot_args = []
    self.working_dir = os.path.join(self.chroot_dir, 'tmp')
    osutils.SafeMakedirs(self.working_dir)
    self.output_dir = os.path.join(self.tempdir, 'outdir')
    # CPV fields that these tests never read but the constructor is given.
    unused = {
        'pv': None,
        'rev': None,
        'category': None,
        'cpv': None,
        'cp': None,
        'cpf': None
    }
    self.package = 'chromeos-chrome'
    self.version = '77.0.3863.0_rc-r1'
    self.chrome_cpv = portage_util.CPV(
        version_no_rev=self.version.split('_')[0],
        package=self.package,
        version=self.version,
        **unused)
    self.board = 'board'
    self.arch = 'amd64'
    self.PatchObject(
        portage_util, 'PortageqBestVisible', return_value=self.chrome_cpv)
    self.PatchObject(portage_util, 'PortageqEnvvar')
    self.test_obj = toolchain_util.GenerateBenchmarkAFDOProfile(
        board=self.board,
        output_dir=self.output_dir,
        chroot_path=self.chroot_dir,
        chroot_args=self.chroot_args)
    # PortageqEnvvar is mocked above, so set the architecture explicitly.
    self.test_obj.arch = self.arch

  def testDecompressAFDOFile(self):
    """Test _DecompressAFDOFile method."""
    perf_data = 'perf.data.bz2'
    to_decompress = os.path.join(self.working_dir, perf_data)
    mock_uncompress = self.PatchObject(cros_build_lib, 'UncompressFile')
    ret = self.test_obj._DecompressAFDOFile(to_decompress)
    # The destination is the input path minus its compression suffix.
    dest = os.path.join(self.working_dir, 'perf.data')
    mock_uncompress.assert_called_once_with(to_decompress, dest)
    self.assertEqual(ret, dest)

  def testGetPerfAFDOName(self):
    """Test _GetPerfAFDOName method."""
    ret = self.test_obj._GetPerfAFDOName()
    perf_data_name = toolchain_util.CHROME_PERF_AFDO_FILE % {
        'package': self.package,
        'arch': self.arch,
        'versionnorev': self.version.split('_')[0]
    }
    self.assertEqual(ret, perf_data_name)

  def testCheckAFDOPerfDataStatus(self):
    """Test _CheckAFDOPerfDataStatus method."""
    afdo_name = 'chromeos.afdo'
    url = os.path.join(toolchain_util.BENCHMARK_AFDO_GS_URL,
                       afdo_name + toolchain_util.BZ2_COMPRESSION_SUFFIX)
    # The name lookup is the same for both iterations; patch it only once.
    self.PatchObject(
        toolchain_util.GenerateBenchmarkAFDOProfile,
        '_GetPerfAFDOName',
        return_value=afdo_name)
    for exist in [True, False]:
      # A fresh Exists mock per iteration keeps the call count at one.
      mock_exist = self.PatchObject(gs.GSContext, 'Exists', return_value=exist)
      ret_value = self.test_obj._CheckAFDOPerfDataStatus()
      self.assertEqual(exist, ret_value)
      mock_exist.assert_called_once_with(url)

  def testWaitForAFDOPerfDataTimeOut(self):
    """Test _WaitForAFDOPerfData method with timeout."""

    def mock_timeout(*_args, **_kwargs):
      raise timeout_util.TimeoutError

    self.PatchObject(timeout_util, 'WaitForReturnTrue', new=mock_timeout)
    ret = self.test_obj._WaitForAFDOPerfData()
    self.assertFalse(ret)

  def testWaitForAFDOPerfDataSuccess(self):
    """Test method _WaitForAFDOPerfData() passes."""
    mock_wait = self.PatchObject(timeout_util, 'WaitForReturnTrue')
    afdo_name = 'perf.data'
    mock_get = self.PatchObject(
        toolchain_util.GenerateBenchmarkAFDOProfile,
        '_GetPerfAFDOName',
        return_value=afdo_name)
    # TODO(crbug/1065172): Invalid assertion that had previously been mocked.
    # mock_check =
    self.PatchObject(toolchain_util.GenerateBenchmarkAFDOProfile,
                     '_CheckAFDOPerfDataStatus')
    mock_decompress = self.PatchObject(
        toolchain_util.GenerateBenchmarkAFDOProfile, '_DecompressAFDOFile')
    mock_copy = self.PatchObject(gs.GSContext, 'Copy')
    self.test_obj._WaitForAFDOPerfData()
    mock_wait.assert_called_once_with(
        self.test_obj._CheckAFDOPerfDataStatus,
        timeout=constants.AFDO_GENERATE_TIMEOUT,
        period=constants.SLEEP_TIMEOUT)

    # TODO(crbug/1065172): Invalid assertion that had previously been mocked.
    # mock_check.assert_called_once()

    # In the actual program, _GetPerfAFDOName() should be called twice. But
    # since one of its callers, _CheckAFDOPerfDataStatus(), is mocked, it is
    # only called once in this test.
    mock_get.assert_called_once()
    dest = os.path.join(self.working_dir, 'perf.data.bz2')
    mock_decompress.assert_called_once_with(dest)
    mock_copy.assert_called_once()

  def testCreateAFDOFromPerfData(self):
    """Test method _CreateAFDOFromPerfData()."""
    # Intercept the real path to chrome binary. Use PatchObject rather than a
    # plain assignment so the module constant is restored after the test and
    # does not leak into tests that format it (e.g. testGenerateAFDOData).
    mock_chrome_debug = os.path.join(self.working_dir, 'chrome.debug')
    self.PatchObject(
        toolchain_util, '_CHROME_DEBUG_BIN', new=mock_chrome_debug)
    osutils.Touch(mock_chrome_debug)
    perf_name = 'chromeos-chrome-amd64-77.0.3849.0.perf.data'
    self.PatchObject(
        toolchain_util.GenerateBenchmarkAFDOProfile,
        '_GetPerfAFDOName',
        return_value=perf_name)
    afdo_name = 'chromeos-chrome-amd64-77.0.3849.0_rc-r1.afdo'
    self.PatchObject(
        toolchain_util, '_GetBenchmarkAFDOName', return_value=afdo_name)
    mock_command = self.PatchObject(cros_build_lib, 'run')
    self.test_obj._CreateAFDOFromPerfData()
    # All paths in the command are expressed relative to the chroot.
    afdo_cmd = [
        toolchain_util._AFDO_GENERATE_LLVM_PROF,
        '--binary=/tmp/chrome.unstripped', '--profile=/tmp/' + perf_name,
        '--out=/tmp/' + afdo_name
    ]
    mock_command.assert_called_once_with(
        afdo_cmd,
        enter_chroot=True,
        capture_output=True,
        print_cmd=True,
        chroot_args=self.chroot_args)

  def testUploadArtifacts(self):
    """Test member _UploadArtifacts()."""
    chrome_binary = 'chrome.unstripped'
    afdo_name = 'chrome-1.0.afdo'
    mock_upload = self.PatchObject(toolchain_util,
                                   '_UploadAFDOArtifactToGSBucket')
    self.test_obj._UploadArtifacts(chrome_binary, afdo_name)
    chrome_version = toolchain_util.CHROME_ARCH_VERSION % {
        'package': self.package,
        'arch': self.arch,
        'version': self.version
    }
    upload_name = chrome_version + '.debug' + \
        toolchain_util.BZ2_COMPRESSION_SUFFIX
    # The debug binary is uploaded under a versioned name; the AFDO profile
    # keeps its own name.
    calls = [
        mock.call(
            toolchain_util.BENCHMARK_AFDO_GS_URL,
            os.path.join(self.output_dir,
                         chrome_binary + toolchain_util.BZ2_COMPRESSION_SUFFIX),
            rename=upload_name),
        mock.call(
            toolchain_util.BENCHMARK_AFDO_GS_URL,
            os.path.join(self.output_dir,
                         afdo_name + toolchain_util.BZ2_COMPRESSION_SUFFIX))
    ]
    mock_upload.assert_has_calls(calls)

  def testGenerateAFDOData(self):
    """Test main function of _GenerateAFDOData()."""
    chrome_binary = toolchain_util._CHROME_DEBUG_BIN % {
        'root': self.chroot_dir,
        'sysroot': os.path.join('build', self.board)
    }
    afdo_name = 'chrome.afdo'
    mock_create = self.PatchObject(
        self.test_obj, '_CreateAFDOFromPerfData', return_value=afdo_name)
    mock_compress = self.PatchObject(toolchain_util, '_CompressAFDOFiles')
    mock_upload = self.PatchObject(self.test_obj, '_UploadArtifacts')
    ret = self.test_obj._GenerateAFDOData()
    self.assertEqual(ret, afdo_name)
    mock_create.assert_called_once_with()
    # The binary is compressed from its absolute path (no input_dir); the
    # profile is compressed from the working directory.
    calls = [
        mock.call(
            targets=[chrome_binary],
            input_dir=None,
            output_dir=self.output_dir,
            suffix=toolchain_util.BZ2_COMPRESSION_SUFFIX),
        mock.call(
            targets=[afdo_name],
            input_dir=self.working_dir,
            output_dir=self.output_dir,
            suffix=toolchain_util.BZ2_COMPRESSION_SUFFIX)
    ]
    mock_compress.assert_has_calls(calls)
    mock_upload.assert_called_once_with(chrome_binary, afdo_name)


class UploadVettedAFDOArtifactTest(cros_test_lib.MockTempDirTestCase):
  """Unit tests covering toolchain_util._UploadVettedAFDOArtifacts()."""

  # pylint: disable=protected-access
  def setUp(self):
    """Stub the ebuild lookup and all GS access."""
    self.artifact = 'some-artifact-1.0'
    self.kver = '3.18'
    self.cwp_arch = 'broadwell'
    self.mock_get = self.PatchObject(
        toolchain_util,
        '_GetArtifactVersionInEbuild',
        return_value=self.artifact)
    # By default the destination does not exist yet, so uploads proceed.
    self.mock_exist = self.PatchObject(
        gs.GSContext, 'Exists', return_value=False)
    self.mock_upload = self.PatchObject(gs.GSContext, 'Copy')

  def testWrongArtifactType(self):
    """An unsupported artifact type raises ValueError before touching GS."""
    with self.assertRaises(ValueError) as raised:
      toolchain_util._UploadVettedAFDOArtifacts('wrong-type')
    self.assertEqual('Only orderfile and kernel_afdo are supported.',
                     str(raised.exception))
    for gs_mock in (self.mock_exist, self.mock_upload):
      gs_mock.assert_not_called()

  def testArtifactExistInGSBucket(self):
    """Nothing is returned when the artifact is already in the bucket."""
    # Flip the default installed by setUp: the destination now exists.
    self.mock_exist.return_value = True
    result = toolchain_util._UploadVettedAFDOArtifacts('orderfile')
    self.mock_exist.assert_called_once()
    self.assertIsNone(result)

  def testUploadVettedOrderfile(self):
    """An orderfile is copied from the unvetted to the vetted location."""
    tarball = self.artifact + toolchain_util.XZ_COMPRESSION_SUFFIX
    unvetted_url = os.path.join(
        toolchain_util.ORDERFILE_GS_URL_UNVETTED, tarball)
    vetted_url = os.path.join(toolchain_util.ORDERFILE_GS_URL_VETTED, tarball)

    result = toolchain_util._UploadVettedAFDOArtifacts('orderfile')

    self.mock_get.assert_called_once_with('chromeos-chrome',
                                          'UNVETTED_ORDERFILE')
    self.mock_exist.assert_called_once_with(vetted_url)
    self.mock_upload.assert_called_once_with(
        unvetted_url, vetted_url, acl='public-read')
    self.assertEqual(result, self.artifact)

  def testUploadVettedKernelAFDO(self):
    """A kernel profile is copied to the vetted location for its version."""
    tarball = self.artifact + toolchain_util.KERNEL_AFDO_COMPRESSION_SUFFIX
    unvetted_url = os.path.join(
        toolchain_util.KERNEL_PROFILE_URL, self.kver, tarball)
    vetted_url = os.path.join(
        toolchain_util.KERNEL_AFDO_GS_URL_VETTED, self.kver, tarball)

    result = toolchain_util._UploadVettedAFDOArtifacts(
        'kernel_afdo', self.kver)

    # The package name uses underscores where the kernel version has dots.
    kernel_package = 'chromeos-kernel-' + self.kver.replace('.', '_')
    self.mock_get.assert_called_once_with(
        kernel_package, 'AFDO_PROFILE_VERSION')
    self.mock_exist.assert_called_once_with(vetted_url)
    self.mock_upload.assert_called_once_with(
        unvetted_url, vetted_url, acl='public-read')
    self.assertEqual(result, self.artifact)


class PublishVettedAFDOArtifactTest(cros_test_lib.MockTempDirTestCase):
  """Test _PublishVettedAFDOArtifacts()."""

  # pylint: disable=protected-access
  def setUp(self):
    """Write a metadata JSON file into tempdir and stub out git."""
    self.package = 'silvermont'
    self.package2 = 'benchmark'
    # Ordered by the trailing timestamp in each name, oldest first.
    self.afdo_sorted_by_freshness = [
        'R78-3865.0-1560000000.afdo', 'R78-3869.38-1562580965.afdo',
        'R78-3866.0-1570000000.afdo'
    ]
    # Older than the entry recorded in the JSON file, so must be rejected.
    self.uploaded_invalid = {
        self.package: self.afdo_sorted_by_freshness[0],
        self.package2: None
    }
    # Newer than the entry recorded in the JSON file, so must be accepted.
    self.uploaded = {
        self.package: self.afdo_sorted_by_freshness[2],
        self.package2: None
    }
    # Prepare a JSON file containing metadata. The module constant is patched
    # (not assigned) so it is restored once the test finishes instead of
    # pointing at a deleted temporary directory for the rest of the run.
    self.PatchObject(
        toolchain_util, 'TOOLCHAIN_UTILS_PATH', new=self.tempdir)
    osutils.SafeMakedirs(os.path.join(self.tempdir, 'afdo_metadata'))
    self.json_file = os.path.join(self.tempdir,
                                  'afdo_metadata/kernel_afdo.json')
    self.afdo_versions = {
        self.package: {
            'name': self.afdo_sorted_by_freshness[1],
        },
        self.package2: {
            'name': 'R1234',
        },
        'some-package-should-not-change': {
            'name': 'R5678-1234',
        },
    }

    with open(self.json_file, 'w') as f:
      json.dump(self.afdo_versions, f)
    # Stand-in for the git result object; only its .output field is set.
    GitStatus = collections.namedtuple('GitStatus', ['output'])
    self.mock_git = self.PatchObject(
        git, 'RunGit', return_value=GitStatus(output='non-empty'))

  def testPublishOlderArtifactThanInMetadataFailure(self):
    """Test failure when publishing a profile older than the JSON entry."""
    with self.assertRaises(
        toolchain_util.PublishVettedAFDOArtifactsError) as context:
      toolchain_util._PublishVettedAFDOArtifacts(self.json_file,
                                                 self.uploaded_invalid)
    self.assertIn('to update is not newer than the JSON file',
                  str(context.exception))

  def testPublishUploadedProfilesPass(self):
    """Test successfully publish metadata for uploaded profiles."""
    toolchain_util._PublishVettedAFDOArtifacts(self.json_file, self.uploaded)

    # Check changes in JSON file
    new_afdo_versions = json.loads(osutils.ReadFile(self.json_file))
    self.assertEqual(len(self.afdo_versions), len(new_afdo_versions))
    self.assertEqual(new_afdo_versions[self.package]['name'],
                     self.uploaded[self.package])
    for k in self.afdo_versions:
      # Make sure other fields are not changed
      if k != self.package:
        self.assertEqual(self.afdo_versions[k], new_afdo_versions[k])

    # Check the git calls are correct. Only self.package appears in the
    # commit message because self.package2 was uploaded as None.
    message = 'afdo_metadata: Publish new profiles.\n\n'
    message += 'Update %s from %s to %s\n' % (self.package,
                                              self.afdo_sorted_by_freshness[1],
                                              self.afdo_sorted_by_freshness[2])
    calls = [
        mock.call(
            self.tempdir, [
                'pull',
                toolchain_util.TOOLCHAIN_UTILS_REPO,
                'refs/heads/master',
            ],
            print_cmd=True),
        mock.call(
            self.tempdir, ['status', '--porcelain', '-uno'],
            capture_output=True,
            print_cmd=True),
        mock.call(self.tempdir, ['diff'], capture_output=True, print_cmd=True),
        mock.call(
            self.tempdir, ['commit', '-a', '-m', message], print_cmd=True),
        mock.call(
            self.tempdir, [
                'push', toolchain_util.TOOLCHAIN_UTILS_REPO,
                'HEAD:refs/for/master%submit'
            ],
            capture_output=True,
            print_cmd=True)
    ]
    self.mock_git.assert_has_calls(calls)


class UploadReleaseChromeAFDOTest(cros_test_lib.MockTempDirTestCase):
  """Tests for the release Chrome AFDO create, redact and upload helpers."""

  # pylint: disable=protected-access
  def setUp(self):
    """Build the fixture profile names and mock out all external effects."""
    xz_suffix = toolchain_util.XZ_COMPRESSION_SUFFIX

    # Names of the CWP and benchmark input profiles (plain and compressed).
    self.cwp_name = 'R77-3809.38-1562580965.afdo'
    self.cwp_full = self.cwp_name + xz_suffix
    self.arch = 'silvermont'
    self.benchmark_name = 'chromeos-chrome-amd64-77.0.3849.0_rc-r1.afdo'
    self.benchmark_full = (
        self.benchmark_name + toolchain_util.BZ2_COMPRESSION_SUFFIX)

    # Names of the merged and redacted profiles the tests expect.
    cwp_part = '%s-77-3809.38-1562580965' % self.arch
    benchmark_part = 'benchmark-77.0.3849.0-r1'
    self.merged_name = '-'.join(
        ['chromeos-chrome-amd64', cwp_part, benchmark_part])
    self.redacted_name = self.merged_name + '-redacted.afdo'
    self.output = os.path.join(self.tempdir, self.redacted_name + xz_suffix)

    # Replace everything that would touch files, GS or run commands.
    self.decompress = self.PatchObject(cros_build_lib, 'UncompressFile')
    self.compress = self.PatchObject(
        toolchain_util, '_CompressAFDOFiles', return_value=[self.output])
    self.upload = self.PatchObject(toolchain_util,
                                   '_UploadAFDOArtifactToGSBucket')
    self.run_command = self.PatchObject(cros_build_lib, 'run')
    self.gs_copy = self.PatchObject(gs.GSContext, 'Copy')
    # Any osutils.TempDir context entered by the code under test yields the
    # test's own temp dir, so expected paths can be computed up front.
    self.PatchObject(osutils.TempDir, '__enter__', return_value=self.tempdir)

  @mock.patch.object(builtins, 'open')
  def testRedactAFDOProfile(self, mock_open):
    """Verify _RedactAFDOProfile() issues the expected command sequence."""
    source = os.path.join(self.tempdir, self.merged_name)
    destination = os.path.join(self.tempdir, self.redacted_name)
    text_temp, redacted_temp, removed_temp, reduced_temp = [
        source + suffix for suffix in
        ('.text.temp', '.redacted.temp', '.removed.temp', '.reduced.temp')
    ]

    # open() is patched to hand back this in-memory file object, which the
    # redaction command is then expected to receive as its input.
    fake_file = io.StringIO()
    mock_open.return_value = fake_file

    toolchain_util._RedactAFDOProfile(source, destination)

    # Keyword arguments shared by every expected run() call.
    in_chroot = {'enter_chroot': True, 'print_cmd': True}
    expected_runs = [
        mock.call(
            [
                'llvm-profdata',
                'merge',
                '-sample',
                '-text',
                source,
                '-output',
                text_temp,
            ],
            **in_chroot),
        mock.call(
            ['redact_textual_afdo_profile'],
            input=fake_file,
            stdout=redacted_temp,
            **in_chroot),
        mock.call(
            [
                'remove_indirect_calls',
                '--input=' + redacted_temp,
                '--output=' + removed_temp,
            ],
            **in_chroot),
        mock.call(
            [
                'remove_cold_functions',
                '--input=' + removed_temp,
                '--output=' + reduced_temp,
                '--number=20000',
            ],
            **in_chroot),
        mock.call(
            [
                'llvm-profdata',
                'merge',
                '-sample',
                '-compbinary',
                reduced_temp,
                '-output',
                destination,
            ],
            **in_chroot),
    ]
    self.run_command.assert_has_calls(expected_runs)

  def testCreateReleaseChromeAFDOPass(self):
    """Verify _CreateReleaseChromeAFDO() downloads, merges and redacts."""
    redact_mock = self.PatchObject(toolchain_util, '_RedactAFDOProfile')

    toolchain_util._CreateReleaseChromeAFDO(self.cwp_name, self.arch,
                                            self.benchmark_name, self.tempdir)

    def in_tempdir(name):
      """Return the path of |name| inside the test temp dir."""
      return os.path.join(self.tempdir, name)

    cwp_archive = in_tempdir(self.cwp_full)
    cwp_profile = in_tempdir(self.cwp_name)
    benchmark_archive = in_tempdir(self.benchmark_full)
    benchmark_profile = in_tempdir(self.benchmark_name)
    merged_profile = in_tempdir(self.merged_name)

    # Both compressed inputs must be copied from GS into the temp dir.
    self.gs_copy.assert_has_calls([
        mock.call(
            os.path.join(toolchain_util.CWP_AFDO_GS_URL, self.arch,
                         self.cwp_full), cwp_archive),
        mock.call(
            os.path.join(toolchain_util.BENCHMARK_AFDO_GS_URL,
                         self.benchmark_full), benchmark_archive),
    ])

    # Each archive must be decompressed to its uncompressed name.
    self.decompress.assert_has_calls([
        mock.call(cwp_archive, cwp_profile),
        mock.call(benchmark_archive, benchmark_profile),
    ])

    # A single weighted llvm-profdata merge must produce the merged profile.
    weighted = '-weighted-input=%d,%s'
    self.run_command.assert_called_once_with(
        [
            'llvm-profdata',
            'merge',
            '-sample',
            '-output=' + merged_profile,
            weighted % (toolchain_util.RELEASE_CWP_MERGE_WEIGHT, cwp_profile),
            weighted % (toolchain_util.RELEASE_BENCHMARK_MERGE_WEIGHT,
                        benchmark_profile),
        ],
        enter_chroot=True,
        print_cmd=True)

    # The merged profile must then be redacted into the redacted name.
    redact_mock.assert_called_once_with(merged_profile,
                                        in_tempdir(self.redacted_name))

  def testUploadReleaseChromeAFDOPass(self):
    """Verify _UploadReleaseChromeAFDO() compresses and uploads the profile."""
    xz_suffix = toolchain_util.XZ_COMPRESSION_SUFFIX
    verified_afdo = os.path.join(self.tempdir, self.redacted_name)
    self.PatchObject(
        toolchain_util,
        '_GetArtifactVersionInEbuild',
        return_value=verified_afdo)

    result = toolchain_util._UploadReleaseChromeAFDO()

    # The function returns the profile name reported by the ebuild lookup.
    self.assertEqual(verified_afdo, result)
    # That profile is compressed with XZ into the temp dir ...
    self.compress.assert_called_once_with([verified_afdo], None, self.tempdir,
                                          xz_suffix)
    # ... and the compressed file is uploaded to the vetted release bucket.
    compressed_path = os.path.join(self.tempdir,
                                   self.redacted_name + xz_suffix)
    self.upload.assert_called_once_with(
        toolchain_util.RELEASE_AFDO_GS_URL_VETTED, compressed_path)


class UploadAndPublishVettedAFDOArtifactsTest(
    cros_test_lib.MockTempDirTestCase):
  """Tests for toolchain_util.UploadAndPublishVettedAFDOArtifacts()."""

  # Artifact names handed back by the fake uploader below.
  orderfile_name = 'chrome.orderfile'
  kernel_afdo = 'kernel.afdo'

  @staticmethod
  def mockUploadVettedAFDOArtifacts(artifact_type, _subcategory=None):
    """Stand-in for _UploadVettedAFDOArtifacts().

    Args:
      artifact_type: Type of artifact requested by the code under test.
      _subcategory: Accepted for signature compatibility; ignored.

    Returns:
      The canned artifact name for 'orderfile' or 'kernel_afdo', otherwise
      None.
    """
    test_cls = UploadAndPublishVettedAFDOArtifactsTest
    canned_names = {
        'orderfile': test_cls.orderfile_name,
        'kernel_afdo': test_cls.kernel_afdo,
    }
    return canned_names.get(artifact_type)

  def setUp(self):
    """Mock the upload/publish helpers and set up board and JSON paths."""
    self.mock_upload = self.PatchObject(
        toolchain_util,
        '_UploadVettedAFDOArtifacts',
        side_effect=self.mockUploadVettedAFDOArtifacts)
    self.mock_publish = self.PatchObject(toolchain_util,
                                         '_PublishVettedAFDOArtifacts')
    self.mock_merge = self.PatchObject(toolchain_util,
                                       '_UploadReleaseChromeAFDO')
    self.board = 'chell'  # Chose chell to test kernel
    self.kver = '3.18'
    utils_path = toolchain_util.TOOLCHAIN_UTILS_PATH
    self.kernel_json = os.path.join(utils_path,
                                    'afdo_metadata/kernel_afdo.json')
    self.chrome_json = os.path.join(utils_path,
                                    'afdo_metadata/chrome_afdo.json')

  def testReturnFalseWhenNoArtifactUploaded(self):
    """Nothing uploaded: the result is falsy and nothing is published."""
    # Override the setUp mock so the uploader reports no new artifact.
    upload_nothing = self.PatchObject(
        toolchain_util, '_UploadVettedAFDOArtifacts', return_value=None)

    result = toolchain_util.UploadAndPublishVettedAFDOArtifacts(
        'orderfile', self.board)

    self.assertFalse(result)
    upload_nothing.assert_called_once_with('orderfile')
    self.mock_publish.assert_not_called()

  def testChromeAFDOPass(self):
    """chrome_afdo: the release uploader runs once; nothing is published."""
    # setUp already patches this function; a fresh mock is installed here
    # and the assertion below is made on it.
    release_upload = self.PatchObject(toolchain_util,
                                      '_UploadReleaseChromeAFDO')

    result = toolchain_util.UploadAndPublishVettedAFDOArtifacts(
        'chrome_afdo', self.board)

    self.assertTrue(result)
    release_upload.assert_called_once_with()
    self.mock_publish.assert_not_called()

  def testKernelAFDOPass(self):
    """kernel_afdo: upload by kernel version, then publish to kernel JSON."""
    result = toolchain_util.UploadAndPublishVettedAFDOArtifacts(
        'kernel_afdo', self.board)

    self.assertTrue(result)
    # The published mapping is keyed by the kernel package name, which uses
    # underscores in place of the dots in the kernel version.
    package = 'chromeos-kernel-' + self.kver.replace('.', '_')
    expected_uploaded = {package: self.kernel_afdo}
    expected_message = (
        'afdo_metadata: Publish new profiles for kernel %s.' % self.kver)
    self.mock_upload.assert_called_once_with('kernel_afdo', self.kver)
    self.mock_publish.assert_called_once_with(self.kernel_json,
                                              expected_uploaded,
                                              expected_message)

  def testOrderfilePass(self):
    """orderfile: the uploader runs once; nothing is published."""
    result = toolchain_util.UploadAndPublishVettedAFDOArtifacts(
        'orderfile', self.board)

    self.assertTrue(result)
    self.mock_upload.assert_called_once_with('orderfile')
    self.mock_publish.assert_not_called()
