blob: 1c79bb8eeccb50f5f26a70cd3ec2dc4d2808d286 [file] [log] [blame]
#!/usr/bin/env vpython3
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test gsutil.py."""
import base64
import hashlib
import io
import json
import os
import shutil
import subprocess
import sys
import tempfile
import unittest
import zipfile
import urllib.request
from unittest import mock
# Add depot_tools to path.
# THIS_DIR is the directory containing this test file and DEPOT_TOOLS_DIR is
# its parent directory; the parent is appended to sys.path before the module
# under test is imported.
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR)
sys.path.append(DEPOT_TOOLS_DIR)
import gsutil
class TestError(Exception):
    """Signals that a faked call was unexpected or had mismatched arguments."""
class FakeCall(object):
    """Callable test double that replays a scripted sequence of calls.

    Expectations are consumed strictly in the order they were added. Each
    invocation must match the oldest outstanding expectation exactly, in both
    positional and keyword arguments; otherwise TestError is raised.
    """

    def __init__(self):
        # FIFO list of (args, kwargs, return_value) tuples not yet matched.
        self.expectations = []

    def add_expectation(self, *args, **kwargs):
        """Queues an expected call.

        The reserved keyword `_returns` (default None) is the value handed
        back when the call is matched. It is removed from kwargs before the
        expectation is stored, so it never takes part in the comparison.
        """
        result = kwargs.pop('_returns', None)
        self.expectations.append((args, kwargs, result))

    def __call__(self, *args, **kwargs):
        """Matches this call against the next expectation and returns its value.

        Raises:
            TestError: if no expectation is outstanding, or if the arguments
                differ from the next expected call.
        """
        if not self.expectations:
            raise TestError('Got unexpected\n%s\n%s' % (args, kwargs))

        # The expectation is consumed even if it turns out not to match.
        want_args, want_kwargs, result = self.expectations.pop(0)
        if args == want_args and kwargs == want_kwargs:
            return result

        raise TestError(
            'Expected:\n args: %s\n kwargs: %s\n' % (want_args, want_kwargs) +
            'Got:\n args: %s\n kwargs: %s\n' % (args, kwargs))
class GsutilUnitTests(unittest.TestCase):
    """Unit tests for the download/install helpers in gsutil.py.

    urllib.request.urlopen and subprocess.call are replaced by one shared
    FakeCall, so every such call made by the code under test has to be
    scripted in advance with add_expectation().
    """

    def setUp(self):
        self.fake = FakeCall()
        self.tempdir = tempfile.mkdtemp()
        # Undo everything through addCleanup() rather than in tearDown():
        # cleanups still run when tearDown()'s assertion fails, so a failing
        # test can neither leak the temp directory nor leave the fakes
        # installed for the tests that run after it.
        self.addCleanup(shutil.rmtree, self.tempdir)
        self.old_urlopen = getattr(urllib.request, 'urlopen')
        self.old_call = getattr(subprocess, 'call')
        self.addCleanup(setattr, urllib.request, 'urlopen', self.old_urlopen)
        self.addCleanup(setattr, subprocess, 'call', self.old_call)
        setattr(urllib.request, 'urlopen', self.fake)
        setattr(subprocess, 'call', self.fake)

    def tearDown(self):
        # Every scripted call must have been consumed by the test.
        self.assertEqual(self.fake.expectations, [])

    def test_download_gsutil(self):
        """Covers first download, cache hit and re-download on bad MD5."""
        version = gsutil.VERSION
        filename = 'gsutil_%s.zip' % version
        full_filename = os.path.join(self.tempdir, filename)
        fake_file = b'This is gsutil.zip'
        fake_file2 = b'This is other gsutil.zip'
        url = '%s%s' % (gsutil.GSUTIL_URL, filename)
        metadata_url = gsutil.API_URL + filename

        # 1) The temp directory is empty: exactly one fetch of the zip is
        #    expected and its bytes must land in full_filename.
        self.fake.add_expectation(url, _returns=io.BytesIO(fake_file))
        self.assertEqual(gsutil.download_gsutil(version, self.tempdir),
                         full_filename)
        with open(full_filename, 'rb') as f:
            self.assertEqual(fake_file, f.read())

        # 2) The file now exists: only a metadata lookup is expected. The
        #    scripted md5Hash is the base64 encoding of the hex digest of the
        #    bytes already on disk, so the file must be left untouched.
        hex_digest = hashlib.md5(fake_file).hexdigest()
        b64_md5 = base64.b64encode(hex_digest.encode('utf-8'))
        good_metadata = json.dumps({'md5Hash': b64_md5.decode('utf-8')})
        self.fake.add_expectation(
            metadata_url, _returns=io.BytesIO(good_metadata.encode('utf-8')))
        self.assertEqual(gsutil.download_gsutil(version, self.tempdir),
                         full_filename)
        with open(full_filename, 'rb') as f:
            self.assertEqual(fake_file, f.read())
        self.assertEqual(self.fake.expectations, [])

        # 3) The metadata reports a hash that does not match: a metadata
        #    lookup followed by a fresh download is expected, and the file
        #    content must be replaced by the new payload.
        bad_metadata = json.dumps({
            'md5Hash': base64.b64encode(b'aaaaaaa').decode('utf-8')  # Bad MD5
        })
        self.fake.add_expectation(
            metadata_url, _returns=io.BytesIO(bad_metadata.encode('utf-8')))
        self.fake.add_expectation(url, _returns=io.BytesIO(fake_file2))
        self.assertEqual(gsutil.download_gsutil(version, self.tempdir),
                         full_filename)
        with open(full_filename, 'rb') as f:
            self.assertEqual(fake_file2, f.read())
        self.assertEqual(self.fake.expectations, [])

    def test_ensure_gsutil_full(self):
        """ensure_gsutil downloads and unpacks when no install flag exists."""
        version = gsutil.VERSION
        gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
        gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
        gsutil_flag = os.path.join(gsutil_dir, 'install.flag')
        os.makedirs(gsutil_dir)

        zip_filename = 'gsutil_%s.zip' % version
        url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename)
        fake_gsutil = 'Fake gsutil'
        # Build the archive in memory. Going through tempfile.mkstemp() would
        # leave an open file descriptor and a stray file outside self.tempdir
        # that nothing ever cleans up.
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as zf:
            zf.writestr('gsutil/gsutil', fake_gsutil)
        self.fake.add_expectation(
            url, _returns=io.BytesIO(zip_buffer.getvalue()))

        # This should write the gsutil_bin with 'Fake gsutil'
        gsutil.ensure_gsutil(version, self.tempdir, False)
        self.assertTrue(os.path.exists(gsutil_bin))
        with open(gsutil_bin, 'r') as f:
            self.assertEqual(f.read(), fake_gsutil)
        self.assertTrue(os.path.exists(gsutil_flag))
        self.assertEqual(self.fake.expectations, [])

    def test_ensure_gsutil_short(self):
        """ensure_gsutil returns the binary path without any scripted calls.

        Both the binary and the install flag already exist, and no
        expectation is queued, so any urlopen()/call() would raise.
        """
        version = gsutil.VERSION
        gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
        gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
        gsutil_flag = os.path.join(gsutil_dir, 'install.flag')
        os.makedirs(gsutil_dir)

        with open(gsutil_bin, 'w') as f:
            f.write('Foobar')
        with open(gsutil_flag, 'w') as f:
            f.write('Barbaz')
        self.assertEqual(gsutil.ensure_gsutil(version, self.tempdir, False),
                         gsutil_bin)

    @mock.patch('sys.platform', 'linux')
    def test__is_supported_platform_returns_true_for_supported_platform(self):
        self.assertTrue(gsutil._is_luci_auth_supported_platform())

    @mock.patch('sys.platform', 'aix')
    def test__is_supported_platform_returns_false_for_unsupported_platform(
            self):
        self.assertFalse(gsutil._is_luci_auth_supported_platform())
# Run the test suite only when this file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
    unittest.main()