blob: a1fe5806aed4cdee1c7865d450ca969858c06b5e [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test signer_payloads_client library."""
from __future__ import print_function
import mox
import os
import shutil
import socket
import tempfile
import fixup_path
fixup_path.FixupPath()
from chromite.lib import cros_test_lib
from chromite.lib.paygen import gslib
from chromite.lib.paygen import gslock
from chromite.lib.paygen import signer_payloads_client
from chromite.lib.paygen import utils
# pylint: disable=W0212
class SignerPayloadsClientGoogleStorageTest(mox.MoxTestBase):
  """Test suite for the class SignerPayloadsClientGoogleStorage.

  Every test builds its client through createStandardClient(), so the
  channel/board/version/bucket/unique values baked into the expected URIs
  below are always the 'foo-*' values used there.
  """

  def setUp(self):
    """Setup for tests, and store off some standard expected values."""
    mox.MoxTestBase.setUp(self)

    # Hash file names expected for three hashes.
    self.hash_names = [
        '1.payload.hash',
        '2.payload.hash',
        '3.payload.hash']

    # Signing directory expected for the client from createStandardClient().
    self.build_uri = ('gs://foo-bucket/foo-channel/foo-board/foo-version/'
                      'payloads/signing/foo-unique')

    # To make certain we don't self update while running tests.
    # NOTE(review): this value is never restored, so it stays set for
    # anything that runs later in the same process -- confirm that is
    # intended before adding cleanup.
    os.environ['CROSTOOLS_NO_SOURCE_UPDATE'] = '1'

  def createStandardClient(self):
    """Test helper method to create a client with standard arguments.

    Returns:
      A SignerPayloadsClientGoogleStorage for foo-channel / foo-board /
      foo-version, using bucket 'foo-bucket' and unique id 'foo-unique'.
    """
    return signer_payloads_client.SignerPayloadsClientGoogleStorage(
        'foo-channel',
        'foo-board',
        'foo-version',
        bucket='foo-bucket',
        unique='foo-unique')

  def _ExpectLockAcquired(self, lock_uri):
    """Record a gslock.Lock(lock_uri) call that succeeds as a context manager.

    gslock.Lock must already have been stubbed out with StubOutWithMock
    before this helper is called.

    Args:
      lock_uri: The lock URI the code under test is expected to request.
    """
    # pylint: disable=E1101
    lock = self.mox.CreateMockAnything()
    gslock.Lock(lock_uri).AndReturn(lock)
    lock.__enter__().AndReturn(lock)
    lock.__exit__(
        mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(None)

  def testUris(self):
    """Test that the URIs on the client are correct."""
    client = self.createStandardClient()

    self.assertEqual(client.signing_base_dir, self.build_uri)
    self.assertEqual(client.archive_uri,
                     self.build_uri + '/payload.hash.tar.bz2')

  def testCleanSignerFilesByKeyset(self):
    """Test the keyset specific cleanup works as expected."""
    hashes = ('hash-1', 'hash-2')
    keyset = 'foo-keys'

    lock_uri = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
                'foo-version,payloads,signing,foo-unique,'
                'foo-keys.payload.signer.instructions.lock')

    signing_dir = ('gs://foo-bucket/foo-channel/foo-board/foo-version/'
                   'payloads/signing/foo-unique')

    expected_removals = (
        # Signing Request
        'gs://foo-bucket/tobesigned/45,foo-channel,foo-board,foo-version,'
        'payloads,signing,foo-unique,'
        'foo-keys.payload.signer.instructions',

        # Signing Instructions
        signing_dir + '/foo-keys.payload.signer.instructions',

        # Signed Results
        signing_dir + '/1.payload.hash.foo-keys.signed.bin',
        signing_dir + '/1.payload.hash.foo-keys.signed.bin.md5',
        signing_dir + '/2.payload.hash.foo-keys.signed.bin',
        signing_dir + '/2.payload.hash.foo-keys.signed.bin.md5',
    )

    client = self.createStandardClient()

    self.mox.StubOutWithMock(gslock, 'Lock')
    self.mox.StubOutWithMock(gslib, 'Remove')

    # pylint: disable=E1101
    # Fail to acquire the lock the first time.
    gslock.Lock(lock_uri).AndRaise(gslock.LockNotAcquired())

    # The second request for the same lock is recorded as succeeding.
    self._ExpectLockAcquired(lock_uri)

    for uri in expected_removals:
      gslib.Remove(uri, ignore_no_match=True).InAnyOrder()

    self.mox.ReplayAll()

    client._CleanSignerFilesByKeyset(hashes, keyset)

    self.mox.VerifyAll()

  def testCleanSignerFiles(self):
    """Test that GS cleanup works as expected."""
    hashes = ('hash-1', 'hash-2')
    keysets = ('foo-keys-1', 'foo-keys-2')

    lock_uri1 = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
                 'foo-version,payloads,signing,foo-unique,'
                 'foo-keys-1.payload.signer.instructions.lock')

    lock_uri2 = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
                 'foo-version,payloads,signing,foo-unique,'
                 'foo-keys-2.payload.signer.instructions.lock')

    signing_dir = ('gs://foo-bucket/foo-channel/foo-board/foo-version/'
                   'payloads/signing/foo-unique')

    expected_removals = (
        # Signing Request
        'gs://foo-bucket/tobesigned/45,foo-channel,foo-board,foo-version,'
        'payloads,signing,foo-unique,'
        'foo-keys-1.payload.signer.instructions',

        'gs://foo-bucket/tobesigned/45,foo-channel,foo-board,foo-version,'
        'payloads,signing,foo-unique,'
        'foo-keys-2.payload.signer.instructions',

        # Signing Instructions
        signing_dir + '/foo-keys-1.payload.signer.instructions',
        signing_dir + '/foo-keys-2.payload.signer.instructions',

        # Signed Results
        signing_dir + '/1.payload.hash.foo-keys-1.signed.bin',
        signing_dir + '/1.payload.hash.foo-keys-1.signed.bin.md5',
        signing_dir + '/2.payload.hash.foo-keys-1.signed.bin',
        signing_dir + '/2.payload.hash.foo-keys-1.signed.bin.md5',
        signing_dir + '/1.payload.hash.foo-keys-2.signed.bin',
        signing_dir + '/1.payload.hash.foo-keys-2.signed.bin.md5',
        signing_dir + '/2.payload.hash.foo-keys-2.signed.bin',
        signing_dir + '/2.payload.hash.foo-keys-2.signed.bin.md5',
    )

    client = self.createStandardClient()

    self.mox.StubOutWithMock(gslock, 'Lock')
    self.mox.StubOutWithMock(gslib, 'Remove')

    # One lock per keyset, each acquired on the first attempt.
    self._ExpectLockAcquired(lock_uri1)
    self._ExpectLockAcquired(lock_uri2)

    for uri in expected_removals:
      gslib.Remove(uri, ignore_no_match=True).InAnyOrder()

    # The signing directory itself is also removed, recursively.
    gslib.Remove(signing_dir, ignore_no_match=True, recurse=True).InAnyOrder()

    self.mox.ReplayAll()

    client._CleanSignerFiles(hashes, keysets)

    self.mox.VerifyAll()

  def testCreateInstructionsUri(self):
    """Test that the expected instructions URI is correct."""
    client = self.createStandardClient()

    signature_uri = client._CreateInstructionsURI('keyset_foo')

    expected_signature_uri = (
        self.build_uri +
        '/keyset_foo.payload.signer.instructions')

    self.assertEqual(signature_uri, expected_signature_uri)

  def testCreateHashNames(self):
    """Test that the expected hash names are generated."""
    client = self.createStandardClient()

    hash_names = client._CreateHashNames(3)

    self.assertEqual(hash_names, self.hash_names)

  def testCreateSignatureURIs(self):
    """Test that the expected signature URIs are generated."""
    client = self.createStandardClient()

    signature_uris = client._CreateSignatureURIs(self.hash_names,
                                                 'keyset_foo')

    expected_signature_uris = [
        self.build_uri + '/1.payload.hash.keyset_foo.signed.bin',
        self.build_uri + '/2.payload.hash.keyset_foo.signed.bin',
        self.build_uri + '/3.payload.hash.keyset_foo.signed.bin',
    ]

    self.assertEqual(signature_uris, expected_signature_uris)

  def testCreateArchive(self):
    """Test that we can correctly archive up hash values for the signer.

    The archive is written to a temporary file, unpacked with 'tar -xjf'
    into a temporary directory, and the extracted files are compared
    against the hashes that were passed in.
    """
    client = self.createStandardClient()

    tmp_dir = None
    hashes = ['Hash 1', 'Hash 2', 'Hash 3']

    try:
      with tempfile.NamedTemporaryFile() as archive_file:
        client._CreateArchive(archive_file.name, hashes, self.hash_names)

        # Make sure the archive file created exists
        self.assertTrue(os.path.exists(archive_file.name))

        tmp_dir = tempfile.mkdtemp()

        cmd = ['tar', '-xjf', archive_file.name]
        utils.RunCommand(cmd,
                         redirect_stdout=True,
                         redirect_stderr=True,
                         cwd=tmp_dir)

        # Check that the expected (and only the expected) contents are
        # present. Comparing sorted lists reports the offending names on
        # failure, unlike separate length and membership checks.
        extracted_file_names = os.listdir(tmp_dir)
        self.assertEqual(sorted(extracted_file_names),
                         sorted(self.hash_names))

        # Make sure each file has the expected contents
        for h, hash_name in zip(hashes, self.hash_names):
          with open(os.path.join(tmp_dir, hash_name), 'r') as f:
            self.assertEqual([h], f.readlines())

    finally:
      # Clean up at the end of the test
      if tmp_dir:
        shutil.rmtree(tmp_dir)

  def testCreateInstructions(self):
    """Test that we can correctly create signer instructions."""
    client = self.createStandardClient()

    instructions = client._CreateInstructions(self.hash_names, 'keyset_foo')

    # The template body must start at column 0: its text is compared
    # verbatim against the generated instructions.
    expected_instructions = """
# Auto-generated instruction file for signing payload hashes.
[insns]
generate_metadata = false
keyset = keyset_foo
channel = foo
input_files = %s
output_names = @BASENAME@.@KEYSET@.signed
[general]
archive = metadata-disable.instructions
type = update_payload
board = foo-board
archive = payload.hash.tar.bz2
# We reuse version for version rev because we may not know the
# correct versionrev "R24-1.2.3"
version = foo-version
versionrev = foo-version
""" % " ".join(['1.payload.hash',
                '2.payload.hash',
                '3.payload.hash'])

    self.assertEqual(instructions, expected_instructions)

  def testSignerRequestUri(self):
    """Test that we can create signer request URI."""
    client = self.createStandardClient()

    instructions_uri = client._CreateInstructionsURI('foo_keyset')
    signer_request_uri = client._SignerRequestUri(instructions_uri)

    expected = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
                'foo-version,payloads,signing,foo-unique,'
                'foo_keyset.payload.signer.instructions')

    self.assertEqual(signer_request_uri, expected)

  def testWaitForSignaturesInstant(self):
    """Test that we can correctly wait for a list of URIs to be created."""
    self.mox.StubOutWithMock(gslib, 'Exists')

    # Assert that each uri is tested exactly once.
    gslib.Exists('foo').InAnyOrder().AndReturn(True)
    gslib.Exists('bar').InAnyOrder().AndReturn(True)
    gslib.Exists('is').InAnyOrder().AndReturn(True)

    self.mox.ReplayAll()

    uris = ['foo', 'bar', 'is']
    client = self.createStandardClient()

    self.assertTrue(client._WaitForSignatures(uris))

    self.mox.VerifyAll()

  def testWaitForSignaturesNever(self):
    """Test that we can correctly timeout waiting for a list of URIs."""
    self.mox.StubOutWithMock(gslib, 'Exists')

    # Every existence check reports the URI as missing.
    gslib.Exists(mox.IsA(str)).MultipleTimes().AndReturn(False)

    self.mox.ReplayAll()

    uris = ['foo', 'bar', 'is']
    client = self.createStandardClient()

    self.assertFalse(client._WaitForSignatures(uris, timeout=5))

    self.mox.VerifyAll()

  @cros_test_lib.NetworkTest()
  def testDownloadSignatures(self):
    """Test that we can correctly download a list of URIs."""
    # NOTE(review): 'sigining-test' is spelled this way in the URIs;
    # presumably it matches the real bucket path -- confirm before changing.
    uris = ['gs://chromeos-releases-test/sigining-test/foo',
            'gs://chromeos-releases-test/sigining-test/bar']

    client = self.createStandardClient()

    downloads = client._DownloadSignatures(uris)
    self.assertEqual(downloads, ['FooSig\r\n\r', 'BarSig'])
class SignerPayloadsClientIntegrationTest(mox.MoxTestBase):
  """Test suite integration with live signer servers."""

  @cros_test_lib.NetworkTest()
  def testGetHashSignatures(self):
    """Integration test that talks to the real signer with test hashes.

    Requests signatures for three fixed hashes with the 'update_signer'
    keyset and compares the result against known signatures. The cleanup
    URI is removed both before the run and afterwards, even on failure.
    """
    unique_id = "%s.%d" % (socket.gethostname(), os.getpid())
    # NOTE(review): unique_id sits in the board position of this URI, while
    # the client below is built with board 'crostools-client' and version
    # 'Rxx-Ryy'. Confirm this glob actually covers what the client writes;
    # otherwise the cleanup and the final List() check are no-ops.
    clean_uri = ('gs://chromeos-releases/test-channel/%s/'
                 'crostools-client/**') % unique_id

    # Cleanup before we start
    gslib.Remove(clean_uri, ignore_no_match=True)

    try:
      # This is in the real production chromeos-releases, but the listed
      # build has never, and will never exist.
      client = signer_payloads_client.SignerPayloadsClientGoogleStorage(
          'test-channel',
          'crostools-client',
          'Rxx-Ryy')

      hashes = ['0' * 32,
                '1' * 32,
                ('29834370e415b3124a926c903906f18b'
                 '3d52e955147f9e6accd67e9512185a63')]

      keysets = ['update_signer']

      # One 1-tuple per entry in hashes, holding the hex signature expected
      # for the single keyset requested above.
      expected_sigs_hex = (
          ('ba4c7a86b786c609bf6e4c5fb9c47525608678caa532bea8acc457aa6dd32b43'
           '5f094b331182f2e167682916990c40ff7b6b0128de3fa45ad0fd98041ec36d6f'
           '63b867bcf219804200616590a41a727c2685b48340efb4b480f1ef448fc7bc3f'
           'b1c4b53209e950ecc721b07a52a41d9c025fd25602340c93d5295211308caa29'
           'a03ed18516cf61411c508097d5b47620d643ed357b05213b2b9fa3a3f938d6c4'
           'f52b85c3f9774edc376902458344d1c1cd72bc932f033c076c76fee2400716fe'
           '652306871ba923021ce245e0c778ad9e0e50e87a169b2aea338c4dc8b5c0c716'
           'aabfb6133482e8438b084a09503db27ca546e910f8938f7805a8a76a3b0d0241',),

          ('2d909ca5b33a7fb6f2323ca0bf9de2e4f2266c73da4b6948a517dffa96783e08'
           'ca36411d380f6e8a20011f599d8d73576b2a141a57c0873d089726e24f62c7e0'
           '346ba5fbde68414b0f874b627fb1557a6e9658c8fac96c54f458161ea770982b'
           'fa9fe514120635e5ccb32e8219b9069cb0bf8063fba48d60d649c5af203cccef'
           'ca5dbc2191f81f0215edbdee4ec8c1553e69b83036aca3e840227d317ff6cf8b'
           '968c973f698db1ce59f6871303dcdbe839400c5df4d2e6e505d68890010a4459'
           '6ca9fee77f4db6ea3448d98018437c319fc8c5f4603ef94b04e3a4eafa206b73'
           '91a2640d43128310285bc0f1c7e5060d37c433d663b1c6f01110b9a43f2a74f4',),

          ('23791c99ab937f1ae5d4988afc9ceca39c290ac90e3da9f243f9a0b1c86c3c32'
           'ab7241d43dfc233da412bab989cf02f15a01fe9ea4b2dc7dc9182117547836d6'
           '9310af3aa005ee3a6deb9602bc676dcc103bf3f7831d64ab844b4785c5c8b4b1'
           '4467e6b5ab6bf34c12f7534e0d5140151c8f28e8276e703dd6332c2bab9e7f4a'
           '495215998ff56e476b81bd6b8d765e1f87da50c22cd52c9afa8c43a6528ab898'
           '6d7a273d9136d5aff5c4d95985d16eeec7380539ef963e0784a0de42b42890df'
           'c83702179f69f5c6eca4630807fbc4ab6241017e0942b15feada0b240e9729bf'
           '33bf456bd419da63302477e147963550a45c6cf60925ff48ad7b309fa158dcb2',))

      # Decode each hex string to raw bytes and wrap it in a one-element
      # list, giving a list of lists to compare with the client's result.
      expected_sigs = [[sig[0].decode('hex')] for sig in expected_sigs_hex]

      all_signatures = client.GetHashSignatures(hashes, keysets)

      self.assertEqual(all_signatures, expected_sigs)
      # Nothing may remain under the cleanup URI once signing is done.
      self.assertEqual(gslib.List(clean_uri), [])

    finally:
      # Cleanup when we are over
      gslib.Remove(clean_uri, ignore_no_match=True)
# Hand off to cros_test_lib.main() only when this file is executed directly
# as a script, not when it is imported as a module.
if __name__ == '__main__':
  cros_test_lib.main()