# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unittests for the gs.py module."""

import contextlib
import datetime
import functools
import numbers
import os
from pathlib import Path
import string
import sys
from unittest import mock

from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import gs
from chromite.lib import osutils
from chromite.lib import partial_mock
from chromite.lib import retry_stats


GS_PACKAGES_PATH = "gs://test/Packages"
GS_PACKAGES_WRONG_PATH = "gs://test/Pack"

STAT_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %Z"
NOW_RAW = datetime.datetime.now()
NOW_STRING = datetime.datetime.strftime(
    NOW_RAW.replace(tzinfo=datetime.timezone.utc), STAT_DATE_FORMAT
)

# Need to re-convert the date string into a datetime object since timezone
# isn't included in NOW_RAW from datetime.now().
NOW_DATETIME = datetime.datetime.strptime(NOW_STRING, STAT_DATE_FORMAT)
STAT_OUTPUT_NOW = f"""{GS_PACKAGES_PATH}:
    Creation time:    {NOW_STRING}
    Content-Language: en
    Content-Length:   74
    Content-Type:   application/octet-stream
    Hash (crc32c):    BBPMPA==
    Hash (md5):   ms+qSYvgI9SjXn8tW/5UpQ==
    ETag:     CNCgocbmqMACEAE=
    Generation:   1408776800850000
    Metageneration:   1
    """
STAT_OUTPUT_ERR = f"No URLs matched: {GS_PACKAGES_WRONG_PATH}"


def PatchGS(*args, **kwargs):
    """Convenience method for patching GSContext."""
    return mock.patch.object(gs.GSContext, *args, **kwargs)


class GSContextMock(partial_mock.PartialCmdMock):
    """Used to mock out the GSContext class."""

    TARGET = "chromite.lib.gs.GSContext"
    ATTRS = (
        "InitializeCache",
        "DoCommand",
        "_CRCMOD_METHOD",
        "DEFAULT_SLEEP_TIME",
        "DEFAULT_RETRIES",
        "DEFAULT_BOTO_FILE",
        "_DEFAULT_GSUTIL_BIN",
        "_DEFAULT_GSUTIL_BUILDER_BIN",
        "GSUTIL_URL",
    )
    DEFAULT_ATTR = "DoCommand"

    GSResponsePreconditionFailed = """\
Copying file:///dev/null [Content-Type=application/octet-stream]...
Uploading   gs://chromeos-throw-away-bucket/vapier/null:         0 B    \r\
Uploading   gs://chromeos-throw-away-bucket/vapier/null:         0 B    \r\
PreconditionException: 412 Precondition Failed"""

    DEFAULT_SLEEP_TIME = 0
    DEFAULT_RETRIES = 2
    TMP_ROOT = "/tmp/cros_unittest"
    DEFAULT_BOTO_FILE = "%s/boto_file" % TMP_ROOT
    _DEFAULT_GSUTIL_BIN = "%s/gsutil_bin" % TMP_ROOT
    _DEFAULT_GSUTIL_BUILDER_BIN = _DEFAULT_GSUTIL_BIN
    _CRCMOD_METHOD = "missing"
    GSUTIL_URL = None

    def __init__(self):
        partial_mock.PartialCmdMock.__init__(self, create_tempdir=True)
        self.raw_gs_cmds = []

    def _SetGSUtilUrl(self):
        tempfile = os.path.join(self.tempdir, "tempfile")
        osutils.WriteFile(tempfile, "some content")
        gsutil_path = os.path.join(self.tempdir, gs.GSContext.GSUTIL_TAR)
        cros_build_lib.CreateTarball(
            gsutil_path,
            self.tempdir,
            inputs=[os.path.basename(tempfile)],
            compression=cros_build_lib.CompressionType.NONE,
        )
        self.GSUTIL_URL = "file://%s" % gsutil_path

    def PreStart(self):
        os.environ.pop("BOTO_CONFIG", None)
        # Set it here for now, instead of mocking out Cached() directly because
        # python-mock has a bug with mocking out class methods with
        # autospec=True.
        # TODO(rcui): Change this when this is fixed in PartialMock.
        self._SetGSUtilUrl()

    def InitializeCache(self, *_args, **_kwargs):
        self._DEFAULT_GSUTIL_BIN = "gsutil"

    def DoCommand(self, inst, gsutil_cmd, **kwargs):
        result = self._results["DoCommand"].LookupResult(
            (gsutil_cmd,), hook_args=(inst, gsutil_cmd), hook_kwargs=kwargs
        )

        rc_mock = cros_test_lib.RunCommandMock()
        rc_mock.AddCmdResult(
            partial_mock.ListRegex("gsutil"),
            result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
        )

        with rc_mock:
            try:
                return self.backup["DoCommand"](inst, gsutil_cmd, **kwargs)
            finally:
                self.raw_gs_cmds.extend(
                    args[0] for args, _ in rc_mock.call_args_list
                )


class AbstractGSContextTest(cros_test_lib.MockTempDirTestCase):
    """Base class for GSContext tests."""

    def setUp(self):
        self.gs_mock = self.StartPatcher(GSContextMock())
        self.gs_mock.SetDefaultCmdResult()
        self.ctx = gs.GSContext()


class CanonicalizeURLTest(cros_test_lib.TestCase):
    """Tests for the CanonicalizeURL function."""

    def _checkit(self, in_url, exp_url):
        self.assertEqual(gs.CanonicalizeURL(in_url), exp_url)

    def testPublicUrl(self):
        """Test public https URLs."""
        self._checkit(
            "https://commondatastorage.googleapis.com/releases/some/file/t.gz",
            "gs://releases/some/file/t.gz",
        )

    def testPrivateUrl(self):
        """Test private https URLs."""
        self._checkit(
            "https://storage.cloud.google.com/releases/some/file/t.gz",
            "gs://releases/some/file/t.gz",
        )
        self._checkit(
            "https://pantheon.corp.google.com/storage/browser/releases/some/"
            "file/t.gz",
            "gs://releases/some/file/t.gz",
        )
        self._checkit(
            "https://stainless.corp.google.com/browse/releases/some/file/t.gz",
            "gs://releases/some/file/t.gz",
        )

    def testDuplicateBase(self):
        """Test multiple prefixes in a single URL."""
        self._checkit(
            (
                "https://storage.cloud.google.com/releases/some/"
                "https://storage.cloud.google.com/some/file/t.gz"
            ),
            (
                "gs://releases/some/"
                "https://storage.cloud.google.com/some/file/t.gz"
            ),
        )


class PathIsGsTests(cros_test_lib.TestCase):
    """Tests for the PathIsGs function."""

    def testString(self):
        """Test strings!"""
        self.assertTrue(gs.PathIsGs("gs://foo"))
        self.assertFalse(gs.PathIsGs("/tmp/f"))

    def testPath(self):
        """Test Path objects!"""
        self.assertFalse(gs.PathIsGs(Path.cwd()))
        self.assertFalse(gs.PathIsGs(Path("gs://foo")))


class GsUrlToHttpTest(cros_test_lib.TestCase):
    """Tests for the GsUrlToHttp function."""

    def setUp(self):
        self.testUrls = [
            "gs://releases",
            "gs://releases/",
            "gs://releases/path",
            "gs://releases/path/",
            "gs://releases/path/file",
        ]

    def testPublicUrls(self):
        """Test public https URLs."""
        expected = [
            "https://storage.googleapis.com/releases",
            "https://storage.googleapis.com/releases/",
            "https://storage.googleapis.com/releases/path",
            "https://storage.googleapis.com/releases/path/",
            "https://storage.googleapis.com/releases/path/file",
        ]

        for gs_url, http_url in zip(self.testUrls, expected):
            self.assertEqual(gs.GsUrlToHttp(gs_url), http_url)
            self.assertEqual(gs.GsUrlToHttp(gs_url, directory=True), http_url)

    def testPrivateUrls(self):
        """Test private https URLs."""
        expected = [
            "https://storage.cloud.google.com/releases",
            "https://stainless.corp.google.com/browse/releases/",
            "https://storage.cloud.google.com/releases/path",
            "https://stainless.corp.google.com/browse/releases/path/",
            "https://storage.cloud.google.com/releases/path/file",
        ]

        for gs_url, http_url in zip(self.testUrls, expected):
            self.assertEqual(gs.GsUrlToHttp(gs_url, public=False), http_url)

    def testPrivateDirectoryUrls(self):
        """Test private https directory URLs."""
        expected = [
            "https://stainless.corp.google.com/browse/releases",
            "https://stainless.corp.google.com/browse/releases/",
            "https://stainless.corp.google.com/browse/releases/path",
            "https://stainless.corp.google.com/browse/releases/path/",
            "https://stainless.corp.google.com/browse/releases/path/file",
        ]

        for gs_url, http_url in zip(self.testUrls, expected):
            self.assertEqual(
                gs.GsUrlToHttp(gs_url, public=False, directory=True), http_url
            )


class VersionTest(AbstractGSContextTest):
    """Tests GSContext.gsutil_version functionality."""

    LOCAL_PATH = "/tmp/file"
    GIVEN_REMOTE = EXPECTED_REMOTE = "gs://test/path/file"

    def testGetVersionStdout(self):
        """Simple gsutil_version fetch test from stdout."""
        self.gs_mock.AddCmdResult(
            partial_mock.In("version"),
            returncode=0,
            stdout="gsutil version 3.35\n",
        )
        self.assertEqual("3.35", self.ctx.gsutil_version)

    def testGetVersionStderr(self):
        """Simple gsutil_version fetch test from stderr."""
        self.gs_mock.AddCmdResult(
            partial_mock.In("version"),
            returncode=0,
            stderr="gsutil version 3.36\n",
        )
        self.assertEqual("3.36", self.ctx.gsutil_version)

    def testGetVersionCached(self):
        """Simple gsutil_version fetch test from cache."""
        # pylint: disable=protected-access
        self.ctx._gsutil_version = "3.37"
        self.assertEqual("3.37", self.ctx.gsutil_version)

    def testGetVersionNewFormat(self):
        """Simple gsutil_version fetch test for new gsutil output format."""
        self.gs_mock.AddCmdResult(
            partial_mock.In("version"),
            returncode=0,
            stdout="gsutil version: 4.5\n",
        )
        self.assertEqual("4.5", self.ctx.gsutil_version)

    def testGetVersionBadOutput(self):
        """Simple gsutil_version fetch test from cache."""
        self.gs_mock.AddCmdResult(
            partial_mock.In("version"), returncode=0, stdout="gobblety gook\n"
        )
        self.assertRaises(
            gs.GSContextException, getattr, self.ctx, "gsutil_version"
        )


class GetSizeTest(AbstractGSContextTest):
    """Tests GetSize functionality."""

    GETSIZE_PATH = "gs://abc/1"

    def _GetSize(self, ctx, path, **kwargs):
        return ctx.GetSize(path, **kwargs)

    def GetSize(self, ctx=None, **kwargs):
        if ctx is None:
            ctx = self.ctx
        return self._GetSize(ctx, self.GETSIZE_PATH, **kwargs)

    def testBasic(self):
        """Simple test."""
        self.gs_mock.AddCmdResult(
            ["stat", "--", self.GETSIZE_PATH], stdout=StatTest.STAT_OUTPUT
        )
        self.assertEqual(self.GetSize(), 74)


class UnmockedGetSizeTest(cros_test_lib.TempDirTestCase):
    """Tests GetSize functionality w/out mocks."""

    @cros_test_lib.pytestmark_network_test
    def testBasic(self):
        """Simple test."""
        ctx = gs.GSContext()

        local_file = os.path.join(self.tempdir, "foo")
        osutils.WriteFile(local_file, "!" * 5)

        with gs.TemporaryURL("chromite.getsize") as tempuri:
            ctx.Copy(local_file, tempuri)
            self.assertEqual(ctx.GetSize(tempuri), 5)

    def testLocal(self):
        """Test local files."""
        ctx = gs.GSContext()
        f = os.path.join(self.tempdir, "f")

        osutils.Touch(f)
        self.assertEqual(ctx.GetSize(f), 0)

        osutils.WriteFile(f, "f" * 10)
        self.assertEqual(ctx.GetSize(f), 10)


class GetCreationTimeTest(AbstractGSContextTest):
    """Test GetCreationTime functionality."""

    def testBasic(self):
        """Simple test."""
        self.gs_mock.AddCmdResult(
            ["stat", "--", GS_PACKAGES_PATH], stdout=STAT_OUTPUT_NOW
        )
        ctx = gs.GSContext()
        result = ctx.GetCreationTime(GS_PACKAGES_PATH)
        self.gs_mock.assertCommandContains(["stat", "--", GS_PACKAGES_PATH])
        self.assertEqual(result, NOW_DATETIME)

    def testURlNoExist(self):
        self.gs_mock.AddCmdResult(
            ["stat", "--", GS_PACKAGES_WRONG_PATH],
            stderr=STAT_OUTPUT_ERR,
            returncode=1,
        )
        ctx = gs.GSContext()
        self.assertRaises(
            gs.GSNoSuchKey, ctx.GetCreationTime, GS_PACKAGES_WRONG_PATH
        )
        self.gs_mock.assertCommandContains(
            ["stat", "--", GS_PACKAGES_WRONG_PATH]
        )


class UnMockedGetCreationTimeTest(cros_test_lib.TempDirTestCase):
    """Test GetCreationTime functionality without mocks."""

    @cros_test_lib.pytestmark_network_test
    def testGetCreationTime(self):
        """Test getting the creation time of a file."""
        ctx = gs.GSContext()
        with gs.TemporaryURL("testGetCreationTime") as url:
            self.assertRaises(gs.GSNoSuchKey, ctx.GetCreationTime, url)

            ctx.CreateWithContents(url, "test file contents")

            result = ctx.GetCreationTime(url)
        self.assertIsInstance(result, datetime.datetime)


class GetCreationTimeSinceTest(AbstractGSContextTest):
    """Test GetCreationTimeSince functionality."""

    OLDER_RAW = NOW_RAW - datetime.timedelta(days=10)
    OLDER_STRING = datetime.datetime.strftime(
        OLDER_RAW.replace(tzinfo=datetime.timezone.utc), STAT_DATE_FORMAT
    )

    STAT_OUTPUT_OLDER = f"""{GS_PACKAGES_PATH}:
        Creation time:    {OLDER_STRING}
        Content-Language: en
        Content-Length:   74
        Content-Type:   application/octet-stream
        Hash (crc32c):    BBPMPA==
        Hash (md5):   ms+qSYvgI9SjXn8tW/5UpQ==
        ETag:     CNCgocbmqMACEAE=
        Generation:   1408776800850000
        Metageneration:   1
      """

    def testBasic(self):
        """Simple test."""
        self.gs_mock.AddCmdResult(
            ["stat", "--", GS_PACKAGES_PATH], stdout=self.STAT_OUTPUT_OLDER
        )
        ctx = gs.GSContext()
        result = ctx.GetCreationTimeSince(GS_PACKAGES_PATH, NOW_RAW)
        self.gs_mock.assertCommandContains(["stat", "--", GS_PACKAGES_PATH])
        self.assertEqual(result.days, 10)

    def testURlNoExist(self):
        self.gs_mock.AddCmdResult(
            ["stat", "--", GS_PACKAGES_WRONG_PATH],
            stderr=STAT_OUTPUT_ERR,
            returncode=1,
        )
        ctx = gs.GSContext()
        with self.assertRaises(gs.GSNoSuchKey):
            ctx.GetCreationTimeSince(GS_PACKAGES_WRONG_PATH, self.OLDER_RAW)


class UnMockedGetCreationTimeSinceTest(cros_test_lib.TempDirTestCase):
    """Test GetCreationTimeSince functionality without mocks."""

    @cros_test_lib.pytestmark_network_test
    def testGetCreationTimeSince(self):
        """Test getting the creation time of a file."""
        ctx = gs.GSContext()
        with gs.TemporaryURL("testGetCreationTime") as url:
            with self.assertRaises(gs.GSNoSuchKey):
                self.assertRaises(
                    gs.GSNoSuchKey, ctx.GetCreationTimeSince, url, NOW_RAW
                )
                ctx.GetCreationTimeSince(url, NOW_RAW)

            ctx.CreateWithContents(url, "test file contents")

            result = ctx.GetCreationTimeSince(url, NOW_RAW)
        self.assertIsInstance(result, datetime.timedelta)


class LSTest(AbstractGSContextTest):
    """Tests LS/List functionality."""

    LS_PATH = "gs://test/path/to/list"
    LS_OUTPUT_LINES = [
        "%s/foo" % LS_PATH,
        "%s/bar bell" % LS_PATH,
        "%s/nada/" % LS_PATH,
    ]
    LS_OUTPUT = "\n".join(LS_OUTPUT_LINES)

    SIZE1 = 12345
    SIZE2 = 654321
    DT1 = datetime.datetime(2000, 1, 2, 10, 10, 10)
    DT2 = datetime.datetime(2010, 3, 14)
    DT_STR1 = DT1.strftime(gs.DATETIME_FORMAT)
    DT_STR2 = DT2.strftime(gs.DATETIME_FORMAT)
    DETAILED_LS_OUTPUT_LINES = [
        "%10d  %s  %s/foo" % (SIZE1, DT_STR1, LS_PATH),
        "%10d  %s  %s/bar bell" % (SIZE2, DT_STR2, LS_PATH),
        "          %s/nada/" % LS_PATH,
        "TOTAL: 3 objects, XXXXX bytes (X.XX GB)",
    ]
    DETAILED_LS_OUTPUT = "\n".join(DETAILED_LS_OUTPUT_LINES)

    LIST_RESULT = [
        gs.GSListResult(
            content_length=SIZE1,
            creation_time=DT1,
            url="%s/foo" % LS_PATH,
            generation=None,
            metageneration=None,
        ),
        gs.GSListResult(
            content_length=SIZE2,
            creation_time=DT2,
            url="%s/bar bell" % LS_PATH,
            generation=None,
            metageneration=None,
        ),
        gs.GSListResult(
            content_length=None,
            creation_time=None,
            url="%s/nada/" % LS_PATH,
            generation=None,
            metageneration=None,
        ),
    ]

    def _LS(self, ctx, path, **kwargs):
        return ctx.LS(path, **kwargs)

    def LS(self, ctx=None, **kwargs):
        if ctx is None:
            ctx = self.ctx
        return self._LS(ctx, self.LS_PATH, **kwargs)

    def _List(self, ctx, path, **kwargs):
        return ctx.List(path, **kwargs)

    def List(self, ctx=None, **kwargs):
        if ctx is None:
            ctx = self.ctx
        return self._List(ctx, self.LS_PATH, **kwargs)

    def testBasicLS(self):
        """Simple LS test."""
        self.gs_mock.SetDefaultCmdResult(stdout=self.LS_OUTPUT)
        result = self.LS()
        self.gs_mock.assertCommandContains(["ls", "--", self.LS_PATH])

        self.assertEqual(self.LS_OUTPUT_LINES, result)

    def testBasicList(self):
        """Simple List test."""
        self.gs_mock.SetDefaultCmdResult(stdout=self.DETAILED_LS_OUTPUT)
        result = self.List(details=True)
        self.gs_mock.assertCommandContains(["ls", "-l", "--", self.LS_PATH])

        self.assertEqual(self.LIST_RESULT, result)


class UnmockedLSTest(cros_test_lib.TempDirTestCase):
    """Tests LS/List functionality w/out mocks."""

    def testLocalPaths(self):
        """Tests listing local paths."""
        ctx = gs.GSContext()

        # The tempdir should exist, but be empty, by default.
        self.assertEqual([], ctx.LS(self.tempdir))

        # Create a few random files.
        files = ["a", "b", "c!@", "d e f", "k\tj"]
        for f in files:
            osutils.Touch(os.path.join(self.tempdir, f))

        # See what the code finds -- order is not guaranteed.
        found = ctx.LS(self.tempdir)
        files.sort()
        found.sort()
        self.assertEqual(files, found)

    @cros_test_lib.pytestmark_network_test
    def testRemotePath(self):
        """Tests listing remote paths."""
        ctx = gs.GSContext()

        with gs.TemporaryURL("chromite.ls") as tempuri:
            # The path shouldn't exist by default.
            with self.assertRaises(gs.GSNoSuchKey):
                ctx.LS(tempuri)

            # Create some files with known sizes.
            files = ["a", "b", "c!@", "d e f", "k\tj"]
            uris = []
            for f in files:
                filename = os.path.join(self.tempdir, f)
                osutils.WriteFile(filename, f * 10)
                uri = os.path.join(tempuri, f)
                uris.append(uri)
                ctx.Copy(filename, uri)

            # Check the plain listing -- order is not guaranteed.
            found = ctx.LS(tempuri)
            uris.sort()
            found.sort()
            self.assertEqual(uris, found)

            # Check the detailed listing.
            found = ctx.List(tempuri, details=True)
            self.assertEqual(
                files, sorted([os.path.basename(x.url) for x in found])
            )

            # Check the detailed listing with multiple paths.
            found = ctx.List(
                ["%s/%s" % (tempuri, x) for x in files], details=True
            )
            self.assertEqual(
                files, sorted([os.path.basename(x.url) for x in found])
            )

            # Make sure sizes line up.
            for f in found:
                l = len(os.path.basename(f.url)) * 10
                self.assertEqual(f.content_length, l)
                self.assertIsNone(f.generation)
                self.assertIsNone(f.metageneration)

            # Check the generation listing with multiple paths.
            found = ctx.List([f"{tempuri}/{x}" for x in files], generation=True)
            self.assertGreater(len(found), 0)

            for f in found:
                self.assertGreater(f.generation, 0)
                self.assertGreater(f.metageneration, 0)


class CopyTest(AbstractGSContextTest, cros_test_lib.TempDirTestCase):
    """Tests GSContext.Copy() functionality."""

    GIVEN_REMOTE = EXPECTED_REMOTE = "gs://test/path/file"
    ACL = "public-read"

    def setUp(self):
        self.local_path = os.path.join(self.tempdir, "file")
        osutils.WriteFile(self.local_path, "")

    def _Copy(self, ctx, src, dst, **kwargs):
        return ctx.Copy(src, dst, **kwargs)

    def Copy(self, ctx=None, **kwargs):
        if ctx is None:
            ctx = self.ctx
        return self._Copy(ctx, self.local_path, self.GIVEN_REMOTE, **kwargs)

    def testBasic(self):
        """Simple copy test."""
        self.Copy()
        self.gs_mock.assertCommandContains(
            ["cp", "--", self.local_path, self.EXPECTED_REMOTE]
        )

    def testWithACL(self):
        """ACL specified during init."""
        ctx = gs.GSContext(acl=self.ACL)
        self.Copy(ctx=ctx)
        self.gs_mock.assertCommandContains(["cp", "-a", self.ACL])

    def testWithACL2(self):
        """ACL specified during invocation."""
        self.Copy(acl=self.ACL)
        self.gs_mock.assertCommandContains(["cp", "-a", self.ACL])

    def testWithACL3(self):
        """ACL specified during invocation that overrides init."""
        ctx = gs.GSContext(acl=self.ACL)
        self.Copy(ctx=ctx, acl=self.ACL)
        self.gs_mock.assertCommandContains(["cp", "-a", self.ACL])

    def testRunCommandError(self):
        """Test RunCommandError is propagated."""
        self.gs_mock.AddCmdResult(partial_mock.In("cp"), returncode=1)
        self.assertRaises(cros_build_lib.RunCommandError, self.Copy)

    def testGSContextPreconditionFailed(self):
        """GSContextPreconditionFailed is raised properly."""
        self.gs_mock.AddCmdResult(
            partial_mock.In("cp"),
            returncode=1,
            stderr=self.gs_mock.GSResponsePreconditionFailed,
        )
        self.assertRaises(gs.GSContextPreconditionFailed, self.Copy)

    def testNonRecursive(self):
        """Test non-recursive copy."""
        self.Copy(recursive=False)
        self.gs_mock.assertCommandContains(["-r"], expected=False)

    def testRecursive(self):
        """Test recursive copy."""
        self.Copy(recursive=True)
        self.gs_mock.assertCommandContains(["-r"], expected=False)
        self._Copy(self.ctx, self.tempdir, self.GIVEN_REMOTE, recursive=True)
        self.gs_mock.assertCommandContains(["cp", "-r"])

    def testCompress(self):
        """Test auto_compress behavior."""
        path = os.path.join(self.tempdir, "ok.txt")
        self._Copy(self.ctx, path, self.GIVEN_REMOTE, auto_compress=True)
        self.gs_mock.assertCommandContains(["-Z"], expected=True)

    def testGeneration(self):
        """Test generation return value."""
        exp_gen = 1413571271901000
        stderr = (
            "Copying file:///dev/null [Content-Type=application/octet-stream]"
            "...\n"
            "Uploading   %(uri)s:               0 B    \r"
            "Uploading   %(uri)s:               0 B    \r"
            "Created: %(uri)s#%(gen)s\n"
        ) % {"uri": self.GIVEN_REMOTE, "gen": exp_gen}
        self.gs_mock.AddCmdResult(
            partial_mock.In("cp"), returncode=0, stderr=stderr
        )
        gen = self.Copy()
        self.assertEqual(gen, exp_gen)

    def testGeneration404(self):
        """Test behavior when we get weird output."""
        stderr = (
            # This is a bit verbose, but it's from real output, so should be
            # fine.
            "Copying file:///tmp/tmpyUUPg1 [Content-Type=application/"
            "octet-stream]...\n"
            "Uploading   ...recovery-R38-6158.66.0-mccloud.instructions.lock:"
            " 0 B/38 B    \r"
            "Uploading   ...recovery-R38-6158.66.0-mccloud.instructions.lock:"
            " 38 B/38 B    \r"
            'NotFoundException: 404 Attempt to get key for "'
            "gs://chromeos-releases/tobesigned/50,beta-\n"
            "channel,mccloud,6158.66.0,ChromeOS-\n"
            'recovery-R38-6158.66.0-mccloud.instructions.lock" failed. '
            "This can happen if the\n"
            "URI refers to a non-existent object or if you meant to operate on "
            "a directory\n"
            "(e.g., leaving off -R option on gsutil cp, mv, or ls of a "
            "bucket)\n"
        )
        self.gs_mock.AddCmdResult(
            partial_mock.In("cp"), returncode=1, stderr=stderr
        )
        self.assertEqual(self.Copy(), None)


class UnmockedCopyTest(cros_test_lib.TempDirTestCase):
    """Tests Copy functionality w/out mocks."""

    @cros_test_lib.pytestmark_network_test
    def testNormal(self):
        """Test normal upload/download behavior."""
        ctx = gs.GSContext()

        content = "foooooooooooooooo!@!"

        local_src_file = os.path.join(self.tempdir, "src.txt")
        local_dst_file = os.path.join(self.tempdir, "dst.txt")

        osutils.WriteFile(local_src_file, content)

        with gs.TemporaryURL("chromite.cp") as tempuri:
            # Upload the file.
            gen = ctx.Copy(local_src_file, tempuri)

            # Verify the generation is valid.  All we can assume is that it's a
            # valid whole number greater than 0.
            self.assertNotEqual(gen, None)
            self.assertIsInstance(gen, numbers.Integral)
            self.assertGreater(gen, 0)

            # Verify the size is what we expect.
            self.assertEqual(
                ctx.GetSize(tempuri), os.path.getsize(local_src_file)
            )

            # Copy it back down and verify the content is unchanged.
            ctx.Copy(tempuri, local_dst_file)
            new_content = osutils.ReadFile(local_dst_file)
            self.assertEqual(content, new_content)

    @cros_test_lib.pytestmark_network_test
    def testCompress(self):
        """Test auto_compress behavior."""
        ctx = gs.GSContext()

        # Need a string that compresses well.
        content = (
            "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"
            "zzzzzlkasjdf89j2;3o4kqmnioruasddfv89uxdp;foiasjdf0892qn5kln"
        )

        local_src_file = os.path.join(self.tempdir, "src.txt")
        local_dst_file = os.path.join(self.tempdir, "dst.txt")

        osutils.WriteFile(local_src_file, content)

        with gs.TemporaryURL("chromite.cp") as tempuri:
            # Upload & compress the file.
            gen = ctx.Copy(local_src_file, tempuri, auto_compress=True)

            # Verify the generation is valid.  All we can assume is that it's a
            # valid whole number greater than 0.
            self.assertNotEqual(gen, None)
            self.assertGreater(gen, 0)

            # Verify the size is smaller (because it's compressed).
            self.assertLess(
                ctx.GetSize(tempuri), os.path.getsize(local_src_file)
            )

            # Copy it back down and verify the content is decompressed &
            # unchanged.
            ctx.Copy(tempuri, local_dst_file)
            new_content = osutils.ReadFile(local_dst_file)
            self.assertEqual(content, new_content)

    @cros_test_lib.pytestmark_network_test
    def testVersion(self):
        """Test version (generation) behavior."""
        ctx = gs.GSContext()

        local_src_file = os.path.join(self.tempdir, "src.txt")

        with gs.TemporaryURL("chromite.cp") as tempuri:
            # Upload the file.
            osutils.WriteFile(local_src_file, "gen0")
            gen = ctx.Copy(local_src_file, tempuri, version=0)

            # Verify the generation is valid.  All we can assume is that it's a
            # valid whole number greater than 0.
            self.assertNotEqual(gen, None)
            self.assertGreater(gen, 0)

            # The file should exist, so this will die due to wrong generation.
            osutils.WriteFile(local_src_file, "gen-bad")
            self.assertRaises(
                gs.GSContextPreconditionFailed,
                ctx.Copy,
                local_src_file,
                tempuri,
                version=0,
            )

            # Sanity check the content is unchanged.
            self.assertEqual(ctx.Cat(tempuri, encoding="utf-8"), "gen0")

            # Upload the file, but with the right generation.
            osutils.WriteFile(local_src_file, b"gen-new", mode="wb")
            gen = ctx.Copy(local_src_file, tempuri, version=gen)
            self.assertEqual(ctx.Cat(tempuri), b"gen-new")


class CopyIntoTest(CopyTest):
    """Test CopyInto functionality."""

    FILE = "ooga"
    GIVEN_REMOTE = "gs://test/path/file"
    EXPECTED_REMOTE = "%s/%s" % (GIVEN_REMOTE, FILE)

    def _Copy(self, ctx, src, dst, **kwargs):
        return ctx.CopyInto(src, dst, filename=self.FILE, **kwargs)


class RemoveTest(AbstractGSContextTest):
    """Tests GSContext.Remove() functionality."""

    def testNormal(self):
        """Test normal remove behavior."""
        self.assertEqual(self.ctx.Remove("gs://foo/bar"), None)

    def testMissing(self):
        """Test behavior w/missing files."""
        self.gs_mock.AddCmdResult(
            ["rm", "--", "gs://foo/bar"],
            stderr="CommandException: No URLs matched: " "gs://foo/bar",
            returncode=1,
        )
        self.assertRaises(gs.GSNoSuchKey, self.ctx.Remove, "gs://foo/bar")
        # This one should not throw an exception.
        self.ctx.Remove("gs://foo/bar", ignore_missing=True)

    def testRecursive(self):
        """Verify we pass down -R in recursive mode."""
        self.ctx.Remove("gs://foo/bar", recursive=True)
        self.gs_mock.assertCommandContains(["rm", "-R"])

    def testMultiple(self):
        """Test handling of multiple paths."""
        self.ctx.Remove(["gs://foo/bar", "gs://fat/cow"], recursive=True)
        self.gs_mock.assertCommandContains(
            ["rm", "-R", "--", "gs://foo/bar", "gs://fat/cow"]
        )


class UnmockedRemoveTest(cros_test_lib.TestCase):
    """Tests Remove functionality w/out mocks."""

    @cros_test_lib.pytestmark_network_test
    def testNormal(self):
        """Test normal remove behavior."""
        ctx = gs.GSContext()
        with gs.TemporaryURL("chromite.rm") as tempuri:
            ctx.Copy("/dev/null", tempuri)
            self.assertEqual(ctx.Remove(tempuri), None)

    @cros_test_lib.pytestmark_network_test
    def testMissing(self):
        """Test behavior w/missing files."""
        ctx = gs.GSContext()
        with gs.TemporaryURL("chromite.rm") as tempuri:
            self.assertRaises(gs.GSNoSuchKey, ctx.Remove, tempuri)
            # This one should not throw an exception.
            ctx.Remove(tempuri, ignore_missing=True)

    @cros_test_lib.pytestmark_network_test
    def testRecursive(self):
        """Verify recursive mode works."""
        files = ("a", "b/c", "d/e/ffff")
        ctx = gs.GSContext()
        with gs.TemporaryURL("chromite.rm") as tempuri:
            for p in files:
                ctx.Copy("/dev/null", os.path.join(tempuri, p))
            ctx.Remove(tempuri, recursive=True)
            for p in files:
                self.assertFalse(ctx.Exists(os.path.join(tempuri, p)))

    @cros_test_lib.pytestmark_network_test
    def testMultiple(self):
        """Test handling of multiple paths."""
        files = ("a", "b/c", "d/e/ffff")
        ctx = gs.GSContext()
        with gs.TemporaryURL("chromite.rm") as tempuri:
            for p in files:
                ctx.Copy("/dev/null", os.path.join(tempuri, p))
            ctx.Remove(["%s/%s" % (tempuri, x) for x in files])
            for p in files:
                self.assertFalse(ctx.Exists(os.path.join(tempuri, p)))

    @cros_test_lib.pytestmark_network_test
    def testGeneration(self):
        """Test conditional remove behavior."""
        ctx = gs.GSContext()
        with gs.TemporaryURL("chromite.rm") as tempuri:
            ctx.Copy("/dev/null", tempuri)
            gen, _ = ctx.GetGeneration(tempuri)
            self.assertRaises(
                gs.GSContextPreconditionFailed,
                ctx.Remove,
                tempuri,
                version=gen + 1,
            )
            self.assertTrue(ctx.Exists(tempuri))
            ctx.Remove(tempuri, version=gen)
            self.assertFalse(ctx.Exists(tempuri))


class MoveTest(AbstractGSContextTest, cros_test_lib.TempDirTestCase):
    """Tests GSContext.Move() functionality."""

    GIVEN_REMOTE = EXPECTED_REMOTE = "gs://test/path/file"

    def setUp(self):
        self.local_path = os.path.join(self.tempdir, "file")
        osutils.WriteFile(self.local_path, "")

    def _Move(self, ctx, src, dst, **kwargs):
        return ctx.Move(src, dst, **kwargs)

    def Move(self, ctx=None, **kwargs):
        if ctx is None:
            ctx = self.ctx
        return self._Move(ctx, self.local_path, self.GIVEN_REMOTE, **kwargs)

    def testBasic(self):
        """Simple move test."""
        self.Move()
        self.gs_mock.assertCommandContains(
            ["mv", "--", self.local_path, self.EXPECTED_REMOTE]
        )


class GSContextInitTest(cros_test_lib.MockTempDirTestCase):
    """Tests GSContext.__init__() functionality."""

    def setUp(self):
        os.environ.pop("BOTO_CONFIG", None)
        self.bad_path = os.path.join(self.tempdir, "nonexistent")

        file_list = ["gsutil_bin", "boto_file", "acl_file"]
        cros_test_lib.CreateOnDiskHierarchy(self.tempdir, file_list)
        for f in file_list:
            setattr(self, f, os.path.join(self.tempdir, f))
        self.StartPatcher(PatchGS("DEFAULT_BOTO_FILE", new=self.boto_file))
        self.StartPatcher(PatchGS("_DEFAULT_GSUTIL_BIN", new=self.gsutil_bin))

    def testInitGsutilBin(self):
        """Test we use the given gsutil binary, erroring where appropriate."""
        # pylint: disable=protected-access
        gs.GSContext._CRCMOD_METHOD = "missing"
        self.assertEqual(
            gs.GSContext()._gsutil_bin, [sys.executable, self.gsutil_bin]
        )

        gs.GSContext._CRCMOD_METHOD = "vpython"
        self.assertEqual(
            gs.GSContext()._gsutil_bin, ["vpython3", self.gsutil_bin]
        )

        self.assertRaises(
            gs.GSContextException, gs.GSContext, gsutil_bin=self.bad_path
        )

        gs.GSContext._CRCMOD_METHOD = None

    def testBadGSUtilBin(self):
        """Test exception thrown for bad gsutil paths."""
        self.assertRaises(
            gs.GSContextException, gs.GSContext, gsutil_bin=self.bad_path
        )

    def testInitBotoFileEnv(self):
        """Test boto file environment is set correctly."""
        # We use gsutil_bin as a file that already exists and is not the
        # default.
        os.environ["BOTO_CONFIG"] = self.gsutil_bin
        self.assertTrue(gs.GSContext().boto_file, self.gsutil_bin)
        self.assertEqual(
            gs.GSContext(boto_file=self.acl_file).boto_file, self.acl_file
        )
        self.assertEqual(
            gs.GSContext(boto_file=self.bad_path).boto_file, self.bad_path
        )

    def testInitBotoFileEnvError(self):
        """Boto file through env var error."""
        self.assertEqual(gs.GSContext().boto_file, self.boto_file)
        # Check env usage next; no need to cleanup, teardown handles it,
        # and we want the env var to persist for the next part of this test.
        os.environ["BOTO_CONFIG"] = self.bad_path
        self.assertEqual(gs.GSContext().boto_file, self.bad_path)

    def testInitBotoFileError(self):
        """Test bad boto file."""
        self.assertEqual(
            gs.GSContext(boto_file=self.bad_path).boto_file, self.bad_path
        )

    def testDoNotUseDefaultBotoFileIfItDoesNotExist(self):
        """Do not set boto file if the default path does not exist."""
        if "BOTO_CONFIG" in os.environ:
            del os.environ["BOTO_CONFIG"]
        gs.GSContext.DEFAULT_BOTO_FILE = "foo/bar/doesnotexist"
        self.assertEqual(gs.GSContext().boto_file, None)

    def testInitAclFile(self):
        """Test ACL selection logic in __init__."""
        self.assertEqual(gs.GSContext().acl, None)
        self.assertEqual(gs.GSContext(acl=self.acl_file).acl, self.acl_file)

    def _testHTTPProxySettings(self, d):
        flags = gs.GSContext().gsutil_flags
        for key in d:
            flag = "Boto:%s=%s" % (key, d[key])
            error_msg = "%s not in %s" % (flag, " ".join(flags))
            self.assertTrue(flag in flags, error_msg)

    def testHTTPProxy(self):
        """Test we set http proxy correctly."""
        d = {
            "proxy": "fooserver",
            "proxy_user": "foouser",
            "proxy_pass": "foopasswd",
            "proxy_port": "8080",
        }
        os.environ["http_proxy"] = "http://%s:%s@%s:%s/" % (
            d["proxy_user"],
            d["proxy_pass"],
            d["proxy"],
            d["proxy_port"],
        )
        self._testHTTPProxySettings(d)

    def testHTTPProxyNoPort(self):
        """Test we accept http proxy without port number."""
        d = {
            "proxy": "fooserver",
            "proxy_user": "foouser",
            "proxy_pass": "foopasswd",
        }
        os.environ["http_proxy"] = "http://%s:%s@%s/" % (
            d["proxy_user"],
            d["proxy_pass"],
            d["proxy"],
        )
        self._testHTTPProxySettings(d)

    def testHTTPProxyNoUserPasswd(self):
        """Test we accept http proxy without user and password."""
        d = {"proxy": "fooserver", "proxy_port": "8080"}
        os.environ["http_proxy"] = "http://%s:%s/" % (
            d["proxy"],
            d["proxy_port"],
        )
        self._testHTTPProxySettings(d)

    def testHTTPProxyNoPasswd(self):
        """Test we accept http proxy without password."""
        d = {
            "proxy": "fooserver",
            "proxy_user": "foouser",
            "proxy_port": "8080",
        }
        os.environ["http_proxy"] = "http://%s@%s:%s/" % (
            d["proxy_user"],
            d["proxy"],
            d["proxy_port"],
        )
        self._testHTTPProxySettings(d)


class GSDoCommandTest(cros_test_lib.TestCase):
    """Tests of gs.DoCommand behavior.

    This test class inherits from cros_test_lib.TestCase instead of from
    AbstractGSContextTest, because the latter unnecessarily mocks out
    cros_build_lib.run, in a way that breaks _testDoCommand (changing
    cros_build_lib.run to refer to a mock instance after the
    GenericRetry mock has already been set up to expect a reference to the
    original run).
    """

    def setUp(self):
        self.ctx = gs.GSContext()

    def _testDoCommand(
        self,
        ctx,
        headers=(),
        retries=None,
        sleep=None,
        version=None,
        recursive=False,
    ):
        if retries is None:
            retries = ctx.DEFAULT_RETRIES
        if sleep is None:
            sleep = ctx.DEFAULT_SLEEP_TIME

        result = cros_build_lib.CompletedProcess(stderr="")
        with mock.patch.object(
            retry_stats, "RetryWithStats", autospec=True, return_value=result
        ):
            ctx.Copy("/blah", "gs://foon", version=version, recursive=recursive)
            # pylint: disable=protected-access
            cmd = self.ctx._gsutil_bin + self.ctx.gsutil_flags + list(headers)
            cmd += ["cp", "-v"]
            if recursive:
                cmd += ["-r", "-e"]
            cmd += ["--", "/blah", "gs://foon"]

            # pylint: disable=protected-access
            retry_stats.RetryWithStats.assert_called_once_with(
                retry_stats.GSUTIL,
                ctx._RetryFilter,
                retries,
                cros_build_lib.run,
                cmd,
                sleep=sleep,
                stderr=True,
                stdout=True,
                encoding="utf-8",
                extra_env=mock.ANY,
            )

    def testDoCommandDefault(self):
        """Verify the internal DoCommand function works correctly."""
        self._testDoCommand(self.ctx)

    def testDoCommandCustom(self):
        """Test that retries and sleep parameters are honored."""
        ctx = gs.GSContext(retries=4, sleep=1)
        self._testDoCommand(ctx, retries=4, sleep=1)

    def testVersion(self):
        """Test that the version field expands into the header."""
        self._testDoCommand(
            self.ctx, version=3, headers=["-h", "x-goog-if-generation-match:3"]
        )

    def testDoCommandRecursiveCopy(self):
        """Test that recursive copy command is honored."""
        self._testDoCommand(self.ctx, recursive=True)


class GSRetryFilterTest(cros_test_lib.TestCase):
    """Verifies that we filter and process gsutil errors correctly."""

    # pylint: disable=protected-access

    LOCAL_PATH = "/tmp/file"
    REMOTE_PATH = (
        "gs://chromeos-prebuilt/board/beltino/paladin-R33-4926.0.0"
        "-rc2/packages/chromeos-base/autotest-tests-0.0.1-r4679.tbz2"
    )
    GSUTIL_TRACKER_DIR = "/foo"
    UPLOAD_TRACKER_FILE = (
        "upload_TRACKER_9263880a80e4a582aec54eaa697bfcdd9c5621ea.9.tbz2__JSON."
        "url"
    )
    DOWNLOAD_TRACKER_FILE = (
        "download_TRACKER_5a695131f3ef6e4c903f594783412bb996a7f375._file__JSON."
        "etag"
    )
    RETURN_CODE = 3

    def setUp(self):
        self.ctx = gs.GSContext()
        self.ctx.DEFAULT_GSUTIL_TRACKER_DIR = self.GSUTIL_TRACKER_DIR

    def _getException(self, cmd, error, returncode=RETURN_CODE):
        result = cros_build_lib.CompletedProcess(
            args=cmd, stderr=error, returncode=returncode
        )
        return cros_build_lib.RunCommandError("blah", result)

    def assertNoSuchKey(self, error_msg):
        cmd = ["gsutil", "ls", self.REMOTE_PATH]
        e = self._getException(cmd, error_msg)
        self.assertRaises(gs.GSNoSuchKey, self.ctx._RetryFilter, e)

    def assertPreconditionFailed(self, error_msg):
        cmd = ["gsutil", "ls", self.REMOTE_PATH]
        e = self._getException(cmd, error_msg)
        self.assertRaises(
            gs.GSContextPreconditionFailed, self.ctx._RetryFilter, e
        )

    def testRetryOnlyFlakyErrors(self):
        """Test that we retry only flaky errors."""
        cmd = ["gsutil", "ls", self.REMOTE_PATH]
        e = self._getException(cmd, "ServiceException: 503")
        self.assertTrue(self.ctx._RetryFilter(e))

        e = self._getException(cmd, "UnknownException: 603")
        self.assertFalse(self.ctx._RetryFilter(e))

    def testRaiseGSErrors(self):
        """Test that we raise appropriate exceptions."""
        self.assertNoSuchKey("CommandException: No URLs matched.")
        self.assertNoSuchKey("NotFoundException: 404")
        self.assertPreconditionFailed(
            "PreconditionException: 412 Precondition Failed"
        )

    @mock.patch("chromite.lib.osutils.SafeUnlink")
    @mock.patch("chromite.lib.osutils.ReadFile")
    @mock.patch("os.path.exists")
    def testRemoveUploadTrackerFile(
        self, exists_mock, readfile_mock, unlink_mock
    ):
        """Test removal of tracker files for resumable upload failures."""
        cmd = ["gsutil", "cp", self.LOCAL_PATH, self.REMOTE_PATH]
        e = self._getException(cmd, self.ctx.RESUMABLE_UPLOAD_ERROR)
        exists_mock.return_value = True
        readfile_mock.return_value = "foohash"
        self.ctx._RetryFilter(e)
        tracker_file_path = os.path.join(
            self.GSUTIL_TRACKER_DIR, self.UPLOAD_TRACKER_FILE
        )
        unlink_mock.assert_called_once_with(tracker_file_path)

    @mock.patch("chromite.lib.osutils.SafeUnlink")
    @mock.patch("chromite.lib.osutils.ReadFile")
    @mock.patch("os.path.exists")
    def testRemoveDownloadTrackerFile(
        self, exists_mock, readfile_mock, unlink_mock
    ):
        """Test removal of tracker files for resumable download failures."""
        cmd = ["gsutil", "cp", self.REMOTE_PATH, self.LOCAL_PATH]
        e = self._getException(cmd, self.ctx.RESUMABLE_DOWNLOAD_ERROR)
        exists_mock.return_value = True
        readfile_mock.return_value = "foohash"
        self.ctx._RetryFilter(e)
        tracker_file_path = os.path.join(
            self.GSUTIL_TRACKER_DIR, self.DOWNLOAD_TRACKER_FILE
        )
        unlink_mock.assert_called_once_with(tracker_file_path)

    def testRemoveTrackerFileOnlyForCP(self):
        """Test that we remove tracker files only for 'gsutil cp'."""
        cmd = ["gsutil", "ls", self.REMOTE_PATH]
        e = self._getException(cmd, self.ctx.RESUMABLE_DOWNLOAD_ERROR)

        with mock.patch.object(gs.GSContext, "GetTrackerFilenames"):
            self.ctx._RetryFilter(e)
            self.assertFalse(self.ctx.GetTrackerFilenames.called)

    def testNoRemoveTrackerFileOnOtherErrors(self):
        """Verify we do not attempt to delete tracker files for other errors."""
        cmd = ["gsutil", "cp", self.REMOTE_PATH, self.LOCAL_PATH]
        e = self._getException(cmd, "One or more URLs matched no objects")

        with mock.patch.object(gs.GSContext, "GetTrackerFilenames"):
            self.assertRaises(gs.GSNoSuchKey, self.ctx._RetryFilter, e)
            self.assertFalse(self.ctx.GetTrackerFilenames.called)

    def testRetryTransient(self):
        """Verify retry behavior when hitting b/11762375"""
        error = (
            "Removing gs://foo/bar/monkey...\n"
            "GSResponseError: status=403, code=InvalidAccessKeyId, "
            'reason="Forbidden", message="The User Id you provided '
            'does not exist in our records.", detail="GOOGBWPADTH7OV25KJXZ"'
        )
        e = self._getException(["gsutil", "rm", "gs://foo/bar/monkey"], error)
        self.assertEqual(self.ctx._RetryFilter(e), True)

    def testRetrySSLEOF(self):
        """Verify retry behavior on EOF in violation of SSL protocol."""
        error = (
            "ssl.SSLError: [Errno 8] _ssl.c:510: EOF occurred in violation of"
            " protocol"
        )
        e = self._getException(
            ["gsutil", "cat", "gs://totally/legit/uri"], error
        )
        self.assertEqual(self.ctx._RetryFilter(e), True)

    def testRetrySSLTimeout(self):
        """Verify retry behavior when read operation timed out."""
        error = "ssl.SSLError: ('The read operation timed out',)"
        e = self._getException(
            ["gsutil", "cp", self.REMOTE_PATH, self.LOCAL_PATH], error
        )
        self.assertEqual(self.ctx._RetryFilter(e), True)

    def testRetrySSLHandshakeTimeout(self):
        """Verify retry behavior when handshake operation timed out."""
        error = "ssl.SSLError: _ssl.c:495: The handshake operation timed out"
        e = self._getException(
            ["gsutil", "cp", self.REMOTE_PATH, self.LOCAL_PATH], error
        )
        self.assertEqual(self.ctx._RetryFilter(e), True)

    def testRetryAccessDeniedException(self):
        """Verify retry behavior on transient AccessDeniedException."""
        error = (
            "AccessDeniedException: 403 XXX@gmail.com does not have "
            "storage.objects.delete access to XXX"
            "CommandException: 1 file/object could not be transferred."
        )
        e = self._getException(
            ["gsutil", "cp", self.REMOTE_PATH, self.LOCAL_PATH], error
        )
        self.assertEqual(self.ctx._RetryFilter(e), True)


class GSContextTest(AbstractGSContextTest):
    """Tests for GSContext()"""

    URL = "gs://chromeos-image-archive/x86-mario-release/R17-1413.0.0-a1-b1346"
    FILE_NAME = "chromeos_R17-1413.0.0-a1_x86-mario_full_dev.bin"
    GS_PATH = "gs://test/path/to/list"
    DT = datetime.datetime(2000, 1, 2, 10, 10, 10)
    DT_STR = DT.strftime(gs.DATETIME_FORMAT)
    DETAILED_LS_OUTPUT_LINES = [
        "%10d  %s  %s/mock_data" % (100, DT_STR, GS_PATH),
        "%10d  %s  %s/mock_data" % (100, DT_STR, GS_PATH),
        "%10d  %s  %s/%s" % (100, DT_STR, GS_PATH, FILE_NAME),
        "TOTAL: 3 objects, XXXXX bytes (X.XX GB)",
    ]

    LIST_RESULT = [
        gs.GSListResult(
            content_length=100,
            creation_time=DT,
            url="%s/mock_data" % GS_PATH,
            generation=None,
            metageneration=None,
        ),
        gs.GSListResult(
            content_length=100,
            creation_time=DT,
            url="%s/mock_data" % GS_PATH,
            generation=None,
            metageneration=None,
        ),
        gs.GSListResult(
            content_length=100,
            creation_time=DT,
            url="%s/%s" % (GS_PATH, FILE_NAME),
            generation=None,
            metageneration=None,
        ),
    ]

    def testTemporaryUrl(self):
        """Just verify the url helper generates valid URLs."""
        with gs.TemporaryURL("mock") as url:
            base = url[0 : len(constants.TRASH_BUCKET)]
            self.assertEqual(base, constants.TRASH_BUCKET)

            valid_chars = set(string.ascii_letters + string.digits + "/-")
            used_chars = set(url[len(base) + 1 :])
            self.assertEqual(used_chars - valid_chars, set())

    def testSetAclError(self):
        """Ensure SetACL blows up if the acl isn't specified."""
        self.assertRaises(gs.GSContextException, self.ctx.SetACL, "gs://abc/3")

    def testSetDefaultAcl(self):
        """Test default ACL behavior."""
        self.ctx.SetACL("gs://abc/1", "monkeys")
        self.gs_mock.assertCommandContains(
            ["acl", "set", "--", "monkeys", "gs://abc/1"]
        )

    def testSetAcl(self):
        """Base ACL setting functionality."""
        ctx = gs.GSContext(acl="/my/file/acl")
        ctx.SetACL("gs://abc/1")
        self.gs_mock.assertCommandContains(
            ["acl", "set", "/my/file/acl", "gs://abc/1"]
        )

    def testSetAclMultiple(self):
        """Test multiple paths at once."""
        ctx = gs.GSContext(acl="/my/file/acl")
        ctx.SetACL(["gs://abc/1", "gs://abc/2"])
        self.gs_mock.assertCommandContains(
            ["acl", "set", "--", "/my/file/acl", "gs://abc/1", "gs://abc/2"]
        )

    def testChangeAcl(self):
        """Test changing an ACL."""
        basic_file = """
-g foo:READ

-u bar:FULL_CONTROL"""
        comment_file = """
# Give foo READ permission
-g foo:READ # Now foo can read this
  # This whole line should be removed
-u bar:FULL_CONTROL
# A comment at the end"""
        tempfile = os.path.join(self.tempdir, "tempfile")
        ctx = gs.GSContext()

        osutils.WriteFile(tempfile, basic_file)
        ctx.ChangeACL("gs://abc/1", acl_args_file=tempfile)
        self.gs_mock.assertCommandContains(
            [
                "acl",
                "ch",
                "-g",
                "foo:READ",
                "-u",
                "bar:FULL_CONTROL",
                "gs://abc/1",
            ]
        )

        osutils.WriteFile(tempfile, comment_file)
        ctx.ChangeACL("gs://abc/1", acl_args_file=tempfile)
        self.gs_mock.assertCommandContains(
            [
                "acl",
                "ch",
                "-g",
                "foo:READ",
                "-u",
                "bar:FULL_CONTROL",
                "gs://abc/1",
            ]
        )

        ctx.ChangeACL(
            "gs://abc/1", acl_args=["-g", "foo:READ", "-u", "bar:FULL_CONTROL"]
        )
        self.gs_mock.assertCommandContains(
            [
                "acl",
                "ch",
                "-g",
                "foo:READ",
                "-u",
                "bar:FULL_CONTROL",
                "gs://abc/1",
            ]
        )

        with self.assertRaises(gs.GSContextException):
            ctx.ChangeACL(
                "gs://abc/1", acl_args_file=tempfile, acl_args=["foo"]
            )

        with self.assertRaises(gs.GSContextException):
            ctx.ChangeACL("gs://abc/1")

    def testIncrement(self):
        """Test ability to atomically increment a counter."""
        ctx = gs.GSContext()

        with mock.patch.object(ctx, "GetGeneration", return_value=(0, 0)):
            ctx.Counter("gs://abc/1").Increment()

        self.gs_mock.assertCommandContains(["cp", "gs://abc/1"])

    def testGetGeneration(self):
        """Test ability to get the generation of a file."""
        self.gs_mock.AddCmdResult(
            ["stat", "--", "gs://abc/1"], stdout=StatTest.STAT_OUTPUT
        )
        ctx = gs.GSContext()
        ctx.GetGeneration("gs://abc/1")
        self.gs_mock.assertCommandContains(["stat", "--", "gs://abc/1"])

    def testCreateCached(self):
        """Test that the function runs through."""
        gs.GSContext(cache_dir=self.tempdir)

    def testReuseCached(self):
        """Test that second fetch is a cache hit."""
        gs.GSContext(cache_dir=self.tempdir)
        gs.GSUTIL_URL = None
        gs.GSContext(cache_dir=self.tempdir)

    def testUnknownError(self):
        """Verify when gsutil fails in an unknown way, we do the right thing."""
        self.gs_mock.AddCmdResult(["cat", "/asdf"], returncode=1)

        ctx = gs.GSContext()
        self.assertRaises(gs.GSCommandError, ctx.DoCommand, ["cat", "/asdf"])

    def testWaitForGsPathsAllPresent(self):
        """Test for waiting when all paths exist already."""
        ctx = gs.GSContext()

        with mock.patch.object(ctx, "Exists", return_value=True):
            ctx.WaitForGsPaths(["/path1", "/path2"], 20)

    def testWaitForGsPathsDelayedSuccess(self):
        """Test for waiting, but not all paths exist so we timeout."""
        ctx = gs.GSContext()

        # First they both don't exist, then one does, then remaining does.
        exists = [False, False, True, False, True]
        with mock.patch.object(ctx, "Exists", side_effect=exists):
            ctx.WaitForGsPaths(["/path1", "/path2"], 20, period=0.02)

    def testWaitForGsPathsTimeout(self):
        """Test for waiting, but not all paths exist so we timeout."""
        ctx = gs.GSContext()

        exists = {"/path1": True, "/path2": False}
        with mock.patch.object(ctx, "Exists", side_effect=lambda p: exists[p]):
            self.assertRaises(
                gs.timeout_util.TimeoutError,
                ctx.WaitForGsPaths,
                ["/path1", "/path2"],
                timeout=1,
                period=0.02,
            )

    def testParallelFalse(self):
        """Tests that "-m" is not used by default."""
        ctx = gs.GSContext()
        ctx.Copy("-", "gs://abc/1")
        self.assertFalse(any("-m" in cmd for cmd in self.gs_mock.raw_gs_cmds))

    def testParallelTrue(self):
        """Tests that "-m" is used when you pass parallel=True."""
        ctx = gs.GSContext()
        ctx.Copy("gs://abc/1", "gs://abc/2", parallel=True)
        self.assertTrue(all("-m" in cmd for cmd in self.gs_mock.raw_gs_cmds))

    def testNoParallelOpWithStdin(self):
        """Tests that "-m" is not used when we pipe the input."""
        ctx = gs.GSContext()
        ctx.Copy("gs://abc/1", "gs://abc/2", input="foo", parallel=True)
        self.assertFalse(any("-m" in cmd for cmd in self.gs_mock.raw_gs_cmds))

    def testGetGsNamesWithWait(self):
        """Test that we get the target artifact that is available."""
        pattern = "*_full_*"

        ctx = gs.GSContext()

        # GSUtil ls gs://archive_url_prefix/.
        self.gs_mock.SetDefaultCmdResult(
            stdout="\n".join(self.DETAILED_LS_OUTPUT_LINES)
        )

        # Timeout explicitly set to 0 to test that we always run at least once.
        result = ctx.GetGsNamesWithWait(pattern, self.URL, period=1, timeout=0)
        self.assertEqual([self.FILE_NAME], result)

    def testGetGsNamesWithWaitWithDirectStat(self):
        """Verify direct stat an artifact whose name is fully spelled out."""
        pattern = self.FILE_NAME

        ctx = gs.GSContext()

        exists = {"%s/%s" % (self.URL, self.FILE_NAME): True}
        with mock.patch.object(ctx, "Exists", side_effect=lambda p: exists[p]):
            # Timeout explicitly set to 0 to test that we always run at least
            # once.
            result = ctx.GetGsNamesWithWait(
                pattern, self.URL, period=1, timeout=0
            )
            self.assertEqual([self.FILE_NAME], result)

    def testGetGsNamesWithWaitWithRetry(self):
        """Test that we can poll until all target artifacts are available."""
        pattern = "*_full_*"

        ctx = gs.GSContext()

        # GSUtil ls gs://archive_url_prefix/.
        exists = [[], self.LIST_RESULT]
        with mock.patch.object(ctx, "List", side_effect=exists):
            # Timeout explicitly set to 0 to test that we always run at least
            # once.
            result = ctx.GetGsNamesWithWait(
                pattern, self.URL, period=1, timeout=4
            )
            self.assertEqual([self.FILE_NAME], result)

    def testGetGsNamesWithWaitTimeout(self):
        """Test that we can poll until all target artifacts are available."""
        pattern = "*_full_*"

        ctx = gs.GSContext()

        # GSUtil ls gs://archive_url_prefix/.
        self.gs_mock.SetDefaultCmdResult(stdout=[])

        # Timeout explicitly set to 0 to test that we always run at least once.
        result = ctx.GetGsNamesWithWait(pattern, self.URL, period=2, timeout=1)
        self.assertEqual(None, result)


class UnmockedGSContextTest(cros_test_lib.TempDirTestCase):
    """Tests for GSContext that go over the network."""

    @cros_test_lib.pytestmark_network_test
    def testIncrement(self):
        ctx = gs.GSContext()
        with gs.TemporaryURL("testIncrement") as url:
            counter = ctx.Counter(url)
            self.assertEqual(0, counter.Get())
            for i in range(1, 4):
                self.assertEqual(i, counter.Increment())
                self.assertEqual(i, counter.Get())

    @cros_test_lib.pytestmark_network_test
    def testGetGsNamesWithWait(self):
        """Tests getting files from remote paths."""
        file_name = "chromeos_R17-1413.0.0-a1_x86-mario_full_dev.bin"
        pattern = "*_full_*"

        ctx = gs.GSContext()

        with gs.TemporaryURL("testGetGsNamesWithWait") as url:
            # The path shouldn't exist by default.
            with self.assertRaises(gs.GSNoSuchKey):
                ctx.GetGsNamesWithWait(pattern, url, period=2, timeout=1)

            # Create files in bucket.
            files = ["mock_data", file_name]
            for f in files:
                filename = os.path.join(self.tempdir, f)
                osutils.WriteFile(filename, f * 10)
                uri = os.path.join(url, f)
                ctx.Copy(filename, uri)

            # Use pattern to get google storage names.
            self.assertEqual(
                ctx.GetGsNamesWithWait(pattern, url, period=1, timeout=0),
                [file_name],
            )
            # Use direct file name to get google storage names.
            self.assertEqual(
                ctx.GetGsNamesWithWait(file_name, url, period=1, timeout=0),
                [file_name],
            )

            # Remove the matched file, verify that GetGsNamesWithWait returns
            # None.
            ctx.Remove(os.path.join(url, file_name), ignore_missing=True)

            self.assertEqual(
                ctx.GetGsNamesWithWait(pattern, url, period=1, timeout=3), None
            )
            self.assertEqual(
                ctx.GetGsNamesWithWait(file_name, url, period=1, timeout=3),
                None,
            )


class StatTest(AbstractGSContextTest):
    """Tests Stat functionality."""

    # Convenient constant for mocking Stat results.
    STAT_OUTPUT = b"""gs://abc/1:
        Creation time:    Sat, 23 Aug 2014 06:53:20 GMT
        Content-Language: en
        Content-Length:   74
        Content-Type:   application/octet-stream
        Hash (crc32c):    BBPMPA==
        Hash (md5):   ms+qSYvgI9SjXn8tW/5UpQ==
        ETag:     CNCgocbmqMACEAE=
        Generation:   1408776800850000
        Metageneration:   1
      """

    # Stat output can vary based on how/when the file was created.
    STAT_OUTPUT_OLDER = b"""gs://abc/1:
        Creation time:    Sat, 23 Aug 2014 06:53:20 GMT
        Content-Length:   74
        Content-Type:   application/octet-stream
        Hash (crc32c):    BBPMPA==
        Hash (md5):   ms+qSYvgI9SjXn8tW/5UpQ==
        ETag:     CNCgocbmqMACEAE=
        Generation:   1408776800850000
        Metageneration:   1
      """

    # Stat output with no MD5 (this is not guaranteed by GS, and
    # can be omitted if files are uploaded as composite objects).
    STAT_OUTPUT_NO_MD5 = b"""gs://abc/1:
        Creation time:    Sat, 23 Aug 2014 06:53:20 GMT
        Content-Language: en
        Content-Length:   74
        Content-Type:   application/octet-stream
        Hash (crc32c):    BBPMPA==
        ETag:     CNCgocbmqMACEAE=
        Generation:   1408776800850000
        Metageneration:   1
      """

    # When stat throws an error.  It's a special snow flake.
    STAT_ERROR_OUTPUT = b"No URLs matched gs://abc/1"
    RETRY_STAT_ERROR_OUTPUT = (
        b"Retrying request, attempt #1...\nNo URLs matched gs://abc/1"
    )

    def testStat(self):
        """Test ability to get the generation of a file."""
        self.gs_mock.AddCmdResult(
            ["stat", "--", "gs://abc/1"], stdout=self.STAT_OUTPUT
        )
        ctx = gs.GSContext()
        result = ctx.Stat("gs://abc/1")
        self.gs_mock.assertCommandContains(["stat", "--", "gs://abc/1"])

        self.assertEqual(
            result.creation_time, datetime.datetime(2014, 8, 23, 6, 53, 20)
        )
        self.assertEqual(result.content_length, 74)
        self.assertEqual(result.content_type, "application/octet-stream")
        self.assertEqual(result.hash_crc32c, "BBPMPA==")
        self.assertEqual(result.hash_md5, "ms+qSYvgI9SjXn8tW/5UpQ==")
        self.assertEqual(result.etag, "CNCgocbmqMACEAE=")
        self.assertEqual(result.generation, 1408776800850000)
        self.assertEqual(result.metageneration, 1)

    def testStatOlderOutput(self):
        """Test ability to get the generation of a file."""
        self.gs_mock.AddCmdResult(
            ["stat", "--", "gs://abc/1"], stdout=self.STAT_OUTPUT_OLDER
        )
        ctx = gs.GSContext()
        result = ctx.Stat("gs://abc/1")
        self.gs_mock.assertCommandContains(["stat", "--", "gs://abc/1"])

        self.assertEqual(
            result.creation_time, datetime.datetime(2014, 8, 23, 6, 53, 20)
        )
        self.assertEqual(result.content_length, 74)
        self.assertEqual(result.content_type, "application/octet-stream")
        self.assertEqual(result.hash_crc32c, "BBPMPA==")
        self.assertEqual(result.hash_md5, "ms+qSYvgI9SjXn8tW/5UpQ==")
        self.assertEqual(result.etag, "CNCgocbmqMACEAE=")
        self.assertEqual(result.generation, 1408776800850000)
        self.assertEqual(result.metageneration, 1)

    def testStatNoMD5(self):
        """Make sure GSContext works without an MD5."""
        self.gs_mock.AddCmdResult(
            ["stat", "--", "gs://abc/1"], stdout=self.STAT_OUTPUT_NO_MD5
        )
        ctx = gs.GSContext()
        result = ctx.Stat("gs://abc/1")
        self.gs_mock.assertCommandContains(["stat", "--", "gs://abc/1"])

        self.assertEqual(
            result.creation_time, datetime.datetime(2014, 8, 23, 6, 53, 20)
        )
        self.assertEqual(result.content_length, 74)
        self.assertEqual(result.content_type, "application/octet-stream")
        self.assertEqual(result.hash_crc32c, "BBPMPA==")
        self.assertEqual(result.hash_md5, None)
        self.assertEqual(result.etag, "CNCgocbmqMACEAE=")
        self.assertEqual(result.generation, 1408776800850000)
        self.assertEqual(result.metageneration, 1)

    def testStatNoExist(self):
        """Test ability to get the generation of a file."""
        self.gs_mock.AddCmdResult(
            ["stat", "--", "gs://abc/1"],
            stderr=self.STAT_ERROR_OUTPUT,
            returncode=1,
        )
        ctx = gs.GSContext()
        self.assertRaises(gs.GSNoSuchKey, ctx.Stat, "gs://abc/1")
        self.gs_mock.assertCommandContains(["stat", "--", "gs://abc/1"])

    def testStatRetryNoExist(self):
        """Test ability to get the generation of a file."""
        self.gs_mock.AddCmdResult(
            ["stat", "--", "gs://abc/1"],
            stderr=self.RETRY_STAT_ERROR_OUTPUT,
            returncode=1,
        )
        ctx = gs.GSContext()
        self.assertRaises(gs.GSNoSuchKey, ctx.Stat, "gs://abc/1")
        self.gs_mock.assertCommandContains(["stat", "--", "gs://abc/1"])


class UnmockedStatTest(cros_test_lib.TempDirTestCase):
    """Tests Stat functionality w/out mocks."""

    @cros_test_lib.pytestmark_network_test
    def testStat(self):
        """Test ability to get the generation of a file."""
        ctx = gs.GSContext()
        with gs.TemporaryURL("testStat") as url:
            # The URL doesn't exist. Test Stat for this case.
            self.assertRaises(gs.GSNoSuchKey, ctx.Stat, url)

            # Populate the URL.
            ctx.CreateWithContents(url, "test file contents")

            # Stat a URL that exists.
            result = ctx.Stat(url)

        # Verify the Stat results.
        self.assertIsInstance(result.creation_time, datetime.datetime)
        self.assertEqual(result.content_length, 18)
        self.assertEqual(result.content_type, "application/octet-stream")
        self.assertEqual(result.hash_crc32c, "wUc4sQ==")
        self.assertEqual(result.hash_md5, "iRvNNwBhmvUVG/lbg2/5sQ==")
        self.assertIsInstance(result.etag, str)
        self.assertIsInstance(result.generation, int)
        self.assertEqual(result.metageneration, 1)

    @cros_test_lib.pytestmark_network_test
    def testMissing(self):
        """Test exceptions when the file doesn't exist."""
        ctx = gs.GSContext()
        with gs.TemporaryURL("testStat") as url:
            self.assertRaises(gs.GSNoSuchKey, ctx.Stat, url)
            self.assertFalse(ctx.Exists(url))

    def testExists(self):
        """Test Exists behavior with local files."""
        ctx = gs.GSContext()
        f = os.path.join(self.tempdir, "f")

        self.assertFalse(ctx.Exists(f))

        osutils.Touch(f)
        self.assertTrue(ctx.Exists(f))


class CatTest(cros_test_lib.TempDirTestCase):
    """Tests GSContext.Copy() functionality."""

    def testLocalFile(self):
        """Tests catting a local file."""
        ctx = gs.GSContext()
        filename = os.path.join(self.tempdir, "myfile")
        content = "foo"
        osutils.WriteFile(filename, content)
        self.assertEqual(content, ctx.Cat(filename, encoding="utf-8"))
        self.assertEqual(content.encode("utf-8"), ctx.Cat(filename))

    def testLocalMissingFile(self):
        """Tests catting a missing local file."""
        ctx = gs.GSContext()
        with self.assertRaises(gs.GSNoSuchKey):
            ctx.Cat(os.path.join(self.tempdir, "does/not/exist"))

    def testLocalForbiddenFile(self):
        """Tests catting a local file that we don't have access to."""
        ctx = gs.GSContext()
        filename = os.path.join(self.tempdir, "myfile")
        content = "foo"
        osutils.WriteFile(filename, content)
        os.chmod(filename, 0o000)
        with self.assertRaises(gs.GSContextException):
            ctx.Cat(filename)

    @cros_test_lib.pytestmark_network_test
    def testNetworkFile(self):
        """Tests catting a GS file."""
        ctx = gs.GSContext()
        filename = os.path.join(self.tempdir, "myfile")
        content = "fOoOoOoo1\n\thi@!*!(\r\r\nend"
        osutils.WriteFile(filename, content)

        with gs.TemporaryURL("chromite.cat") as tempuri:
            ctx.Copy(filename, tempuri)
            self.assertEqual(content, ctx.Cat(tempuri, encoding="utf-8"))

    @cros_test_lib.pytestmark_network_test
    def testNetworkMissingFile(self):
        """Tests catting a missing GS file."""
        ctx = gs.GSContext()
        with gs.TemporaryURL("chromite.cat") as tempuri:
            with self.assertRaises(gs.GSNoSuchKey):
                ctx.Cat(tempuri)

    @cros_test_lib.pytestmark_network_test
    def testStreamingRemoteFile(self):
        """Test streaming a remote file."""
        ctx = gs.GSContext()
        with gs.TemporaryURL("chromite.cat") as url:
            # The default chunksize is 0x100000 (1MB).
            first_chunk = b"a" * 0x100000
            second_chunk = b"aaaaaaaaabbbbbbccc"
            ctx.CreateWithContents(url, first_chunk + second_chunk)

            result = ctx.StreamingCat(url)
            # Get the 1st chunk.
            self.assertEqual(next(result), first_chunk)
            # Then the second chunk.
            self.assertEqual(next(result), second_chunk)
            # At last, no more.
            with self.assertRaises(StopIteration):
                next(result)


class DryRunTest(cros_test_lib.RunCommandTestCase):
    """Verify dry_run works for all of GSContext."""

    def setUp(self):
        self.ctx = gs.GSContext(dry_run=True)

    def tearDown(self):
        # Verify we don't try to call gsutil at all.
        for call_args in self.rc.call_args_list:
            self.assertNotIn("gsutil", call_args[0][0])

    def testCat(self):
        """Test Cat in dry_run mode."""
        self.assertEqual(self.ctx.Cat("gs://foo/bar"), b"")
        self.assertEqual(self.ctx.Cat("gs://foo/bar", encoding="utf-8"), "")

    def testChangeACL(self):
        """Test ChangeACL in dry_run mode."""
        self.assertEqual(
            self.ctx.ChangeACL("gs://foo/bar", acl_args_file="/dev/null"), None
        )

    def testCopy(self):
        """Test Copy in dry_run mode."""
        self.ctx.Copy("/dev/null", "gs://foo/bar")
        self.ctx.Copy("gs://foo/bar", "/dev/null")

    def testCreateWithContents(self):
        """Test Copy in dry_run mode."""
        self.ctx.CreateWithContents("gs://foo/bar", "My Little Content(tm)")

    def testCopyInto(self):
        """Test CopyInto in dry_run mode."""
        self.ctx.CopyInto("/dev/null", "gs://foo/bar")

    def testDoCommand(self):
        """Test DoCommand in dry_run mode."""
        self.ctx.DoCommand(["a-bad-command"])

    def testExists(self):
        """Test Exists in dry_run mode."""
        self.assertEqual(self.ctx.Exists("gs://foo/bar"), True)

    def testGetGeneration(self):
        """Test GetGeneration in dry_run mode."""
        self.assertEqual(self.ctx.GetGeneration("gs://foo/bar"), (0, 0))

    def testGetSize(self):
        """Test GetSize in dry_run mode."""
        self.assertEqual(self.ctx.GetSize("gs://foo/bar"), 0)

    def testGetTrackerFilenames(self):
        """Test GetTrackerFilenames in dry_run mode."""
        self.ctx.GetTrackerFilenames("foo")

    def testLS(self):
        """Test LS in dry_run mode."""
        self.assertEqual(self.ctx.LS("gs://foo/bar"), [])

    def testList(self):
        """Test List in dry_run mode."""
        self.assertEqual(self.ctx.List("gs://foo/bar"), [])

    def testMove(self):
        """Test Move in dry_run mode."""
        self.ctx.Move("gs://foo/bar", "gs://foo/bar2")

    def testRemove(self):
        """Test Remove in dry_run mode."""
        self.ctx.Remove("gs://foo/bar")

    def testSetACL(self):
        """Test SetACL in dry_run mode."""
        self.assertEqual(self.ctx.SetACL("gs://foo/bar", "bad-acl"), None)

    def testStat(self):
        """Test Stat in dry_run mode."""
        result = self.ctx.Stat("gs://foo/bar")
        self.assertEqual(result.content_length, 0)
        self.assertNotEqual(result.creation_time, None)

    def testStreamingCat(self):
        """Test StreamingCat in dry_run mode."""
        result = self.ctx.StreamingCat("gs://foo/bar")
        self.assertEqual(next(result), "")
        with self.assertRaises(StopIteration):
            next(result)

    def testVersion(self):
        """Test gsutil_version in dry_run mode."""
        self.assertEqual(self.ctx.gsutil_version, gs.GSContext.GSUTIL_VERSION)


class InitBotoTest(AbstractGSContextTest):
    """Test boto file interactive initialization."""

    # pylint: disable=protected-access

    GS_LS_ERROR = """\
You are attempting to access protected data with no configured credentials.
Please see http://code.google.com/apis/storage/docs/signup.html for
details about activating the Google Cloud Storage service and then run the
"gsutil config" command to configure gsutil to use these credentials."""

    GS_LS_ERROR2 = """\
GSResponseError: status=400, code=MissingSecurityHeader, reason=Bad Request, \
detail=Authorization."""

    GS_LS_BENIGN = """\
"GSResponseError: status=400, code=MissingSecurityHeader, reason=Bad Request,
detail=A nonempty x-goog-project-id header is required for this request."""

    def setUp(self):
        self.boto_file = os.path.join(self.tempdir, "boto_file")
        self.ctx = gs.GSContext(boto_file=self.boto_file)
        self.auth_cmd = ["ls", gs.AUTHENTICATION_BUCKET]

    def testGSLsSkippableError(self):
        """Benign GS error."""
        self.gs_mock.AddCmdResult(
            self.auth_cmd, returncode=1, stderr=self.GS_LS_BENIGN
        )
        self.assertTrue(self.ctx._TestGSLs())

    def testGSLsAuthorizationError1(self):
        """GS authorization error 1."""
        self.gs_mock.AddCmdResult(
            self.auth_cmd, returncode=1, stderr=self.GS_LS_ERROR
        )
        self.assertFalse(self.ctx._TestGSLs())

    def testGSLsAuthorizationErrorNoStderrCapture(self):
        """GS authorization error when not capturing stderr"""
        self.gs_mock.AddCmdResult(self.auth_cmd, returncode=1, stderr="")
        self.assertFalse(self.ctx._TestGSLs(stderr=False))

    def testGSLsError2(self):
        """GS authorization error 2."""
        self.gs_mock.AddCmdResult(
            self.auth_cmd, returncode=1, stderr=self.GS_LS_ERROR2
        )
        self.assertFalse(self.ctx._TestGSLs())

    def _WriteBotoFile(self, contents, *_args, **_kwargs):
        osutils.WriteFile(self.ctx.boto_file, contents)

    def testInitGSLsFailButSuccess(self):
        """Invalid GS Config, but we config properly."""
        self.gs_mock.AddCmdResult(
            self.auth_cmd, returncode=1, stderr=self.GS_LS_ERROR
        )
        self.ctx._InitBoto()

    def _AddLsConfigResult(self, side_effect=None):
        self.gs_mock.AddCmdResult(
            self.auth_cmd, returncode=1, stderr=self.GS_LS_ERROR
        )
        self.gs_mock.AddCmdResult(
            ["config"], returncode=1, side_effect=side_effect
        )

    def testGSLsFailAndConfigError(self):
        """Invalid GS Config, and we fail to config."""
        self._AddLsConfigResult(
            side_effect=functools.partial(self._WriteBotoFile, "monkeys")
        )
        self.assertRaises(cros_build_lib.RunCommandError, self.ctx._InitBoto)

    def testGSLsFailAndEmptyConfigFile(self):
        """Invalid GS Config, and we raise error on empty config file."""
        self._AddLsConfigResult(
            side_effect=functools.partial(self._WriteBotoFile, "")
        )
        self.assertRaises(gs.GSContextException, self.ctx._InitBoto)


class GSCounterTest(AbstractGSContextTest):
    """Tests GSCounter functionality."""

    COUNTER_URI = "gs://foo/mock/counter"
    INITIAL_VALUE = 100

    def setUp(self):
        self.counter = gs.GSCounter(self.ctx, self.COUNTER_URI)
        self.cat_mock = self.PatchObject(self.ctx, "Cat")
        self.gen_mock = self.PatchObject(
            self.ctx, "GetGeneration", return_value=(1, 1)
        )
        self._SetCounter(self.INITIAL_VALUE)

    def _SetCounter(self, value):
        """Set the test counter to |value|."""
        self.cat_mock.return_value = str(value)

    def testGetInitial(self):
        """Test Get when the counter doesn't exist."""
        self.cat_mock.side_effect = gs.GSNoSuchKey
        self.assertEqual(self.counter.Get(), 0)

    def testGet(self):
        """Basic Get() test."""
        self.assertEqual(self.counter.Get(), self.INITIAL_VALUE)

    def testIncrement(self):
        """Basic Increment() test."""
        self.assertEqual(self.counter.Increment(), self.INITIAL_VALUE + 1)

    def testDecrement(self):
        """Basic Decrement() test."""
        self.assertEqual(self.counter.Decrement(), self.INITIAL_VALUE - 1)

    def testReset(self):
        """Basic Reset() test."""
        self.assertEqual(self.counter.Reset(), 0)

    def testStreakIncrement(self):
        """Basic StreakIncrement() test."""
        self._SetCounter(10)
        self.assertEqual(self.counter.StreakIncrement(), 11)

    def testStreakIncrementReset(self):
        """Test StreakIncrement() when the counter is negative."""
        self._SetCounter(-10)
        self.assertEqual(self.counter.StreakIncrement(), 1)

    def testStreakDecrement(self):
        """Basic StreakDecrement() test."""
        self._SetCounter(-10)
        self.assertEqual(self.counter.StreakDecrement(), -11)

    def testStreakDecrementReset(self):
        """Test StreakDecrement() when the counter is positive."""
        self._SetCounter(10)
        self.assertEqual(self.counter.StreakDecrement(), -1)


class UnmockedGSCounterTest(cros_test_lib.TestCase):
    """Tests GSCounter functionality w/out mocks."""

    @staticmethod
    @contextlib.contextmanager
    def _Counter():
        ctx = gs.GSContext()
        with gs.TemporaryURL("chromite.counter") as tempuri:
            yield gs.GSCounter(ctx, tempuri)

    @staticmethod
    def _SetCounter(counter, value):
        """Set the test counter to |value|."""
        counter.AtomicCounterOperation(value, lambda x: value)

    @cros_test_lib.pytestmark_network_test
    def testGetInitial(self):
        """Test Get when the counter doesn't exist."""
        with self._Counter() as counter:
            self.assertEqual(counter.Get(), 0)

    @cros_test_lib.pytestmark_network_test
    def testGet(self):
        """Basic Get() test."""
        with self._Counter() as counter:
            self._SetCounter(counter, 100)
            self.assertEqual(counter.Get(), 100)

    @cros_test_lib.pytestmark_network_test
    def testIncrement(self):
        """Basic Increment() test."""
        with self._Counter() as counter:
            self._SetCounter(counter, 100)
            self.assertEqual(counter.Increment(), 101)

    @cros_test_lib.pytestmark_network_test
    def testDecrement(self):
        """Basic Decrement() test."""
        with self._Counter() as counter:
            self._SetCounter(counter, 100)
            self.assertEqual(counter.Decrement(), 99)

    @cros_test_lib.pytestmark_network_test
    def testReset(self):
        """Basic Reset() test."""
        with self._Counter() as counter:
            self._SetCounter(counter, 100)
            self.assertEqual(counter.Reset(), 0)
            self.assertEqual(counter.Get(), 0)

    @cros_test_lib.pytestmark_network_test
    def testStreakIncrement(self):
        """Basic StreakIncrement() test."""
        with self._Counter() as counter:
            self._SetCounter(counter, 100)
            self.assertEqual(counter.StreakIncrement(), 101)

    @cros_test_lib.pytestmark_network_test
    def testStreakIncrementReset(self):
        """Test StreakIncrement() when the counter is negative."""
        with self._Counter() as counter:
            self._SetCounter(counter, -100)
            self.assertEqual(counter.StreakIncrement(), 1)

    @cros_test_lib.pytestmark_network_test
    def testStreakDecrement(self):
        """Basic StreakDecrement() test."""
        with self._Counter() as counter:
            self._SetCounter(counter, -100)
            self.assertEqual(counter.StreakDecrement(), -101)

    @cros_test_lib.pytestmark_network_test
    def testStreakDecrementReset(self):
        """Test StreakDecrement() when the counter is positive."""
        with self._Counter() as counter:
            self._SetCounter(counter, 100)
            self.assertEqual(counter.StreakDecrement(), -1)
