blob: 56d52015fe92ad2abeee67606aca57052c4fa933 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test signer_payloads_client library."""
from __future__ import print_function
import mock
import os
import shutil
import socket
import tempfile
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import gs
from chromite.lib import gs_unittest
from chromite.lib.paygen import gslock
from chromite.lib.paygen import signer_payloads_client
# pylint: disable=protected-access
class SignerPayloadsClientGoogleStorageTest(gs_unittest.AbstractGSContextTest):
"""Test suite for the class SignerPayloadsClientGoogleStorage."""
orig_timeout = (
def setUp(self):
"""Setup for tests, and store off some standard expected values."""
# NOTE(review): the hash_names list and the build_uri string below are both
# left unterminated in this copy of the file, and no statement follows the
# timeout comment. Restore the elided lines before relying on this fixture.
# Names that the hash-related tests compare against (see testCreateHashNames).
self.hash_names = [
# Base GS URI that the URI-building tests use as their expected prefix.
self.build_uri = ('gs://foo-bucket/foo-channel/foo-board/foo-version/'
# Some tests depend on this timeout. Make it smaller, then restore.
def tearDown(self):
"""Teardown after tests, and restore values test might adjust."""
# Some tests modify this timeout. Restore the original value.
# NOTE(review): the statement that performs the restore is not present in
# this copy of the file; presumably it reassigns the value saved in the
# class-level orig_timeout attribute -- confirm against the upstream source.
def createStandardClient(self):
"""Test helper method to create a client with standard arguments."""
# NOTE(review): the constructor call below is unterminated in this copy of
# the file, so the "standard" arguments cannot be seen from here.
client = signer_payloads_client.SignerPayloadsClientGoogleStorage(
return client
def testUris(self):
"""Test that the URIs on the client are correct."""
client = self.createStandardClient()
# The expected build URI is the fixture value stored on self by setUp.
expected_build_uri = self.build_uri
# NOTE(review): the assertions are truncated in this copy of the file; only
# the tail of one call (the '/payload.hash.tar.bz2' suffix) survives below.
expected_build_uri + '/payload.hash.tar.bz2')
def testCleanSignerFilesByKeyset(self):
"""Test the keyset specific cleanup works as expected."""
# NOTE(review): lock_uri, signing_dir, the expected_removals tuple and the
# gslock.Lock patch call are all truncated in this copy of the file, and the
# lock assertion announced by the comment further down is missing. The
# bare `signing_dir + '/'` entries have evidently lost their file names.
hashes = ('hash-1', 'hash-2')
keyset = 'foo-keys'
lock_uri = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
signing_dir = ('gs://foo-bucket/foo-channel/foo-board/foo-version/'
expected_removals = (
# Signing Request
# Signing Instructions
signing_dir + '/foo-keys.payload.signer.instructions',
# Signed Results
signing_dir + '/',
signing_dir + '/',
signing_dir + '/',
signing_dir + '/',
client = self.createStandardClient()
# Fake lock failed then acquired.
lock = self.PatchObject(gslock, 'Lock', autospec=True,
# Do the work.
client._CleanSignerFilesByKeyset(hashes, keyset)
# Assert locks created with expected lock_uri.
# Verify all expected files were removed.
for uri in expected_removals:
self.gs_mock.assertCommandContains(['rm', uri])
def testCleanSignerFiles(self):
"""Test that GS cleanup works as expected."""
# NOTE(review): lock_uri1, lock_uri2, signing_dir and the expected_removals
# tuple are truncated in this copy of the file, and the lock check announced
# by the "Check created with ..." comment is missing. The bare
# `signing_dir + '/'` entries have evidently lost their file names.
hashes = ('hash-1', 'hash-2')
keysets = ('foo-keys-1', 'foo-keys-2')
lock_uri1 = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
lock_uri2 = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
signing_dir = ('gs://foo-bucket/foo-channel/foo-board/foo-version/'
expected_removals = (
# Signing Request
# Signing Instructions
signing_dir + '/foo-keys-1.payload.signer.instructions',
signing_dir + '/foo-keys-2.payload.signer.instructions',
# Signed Results
signing_dir + '/',
signing_dir + '/',
signing_dir + '/',
signing_dir + '/',
signing_dir + '/',
signing_dir + '/',
signing_dir + '/',
signing_dir + '/',
client = self.createStandardClient()
# Fake lock failed then acquired.
lock = self.PatchObject(gslock, 'Lock', autospec=True)
# Do the work.
client._CleanSignerFiles(hashes, keysets)
# Check created with lock_uri1, lock_uri2.
# Verify expected removals.
for uri in expected_removals:
self.gs_mock.assertCommandContains(['rm', uri])
# Unlike the per-keyset test, this one also expects an 'rm' of signing_dir
# itself.
self.gs_mock.assertCommandContains(['rm', signing_dir])
def testCreateInstructionsUri(self):
"""Test that the expected instructions URI is correct."""
client = self.createStandardClient()
signature_uri = client._CreateInstructionsURI('keyset_foo')
# NOTE(review): the expected value below is truncated in this copy of the
# file; only the self.build_uri prefix of the concatenation is visible.
expected_signature_uri = (
self.build_uri +
self.assertEqual(signature_uri, expected_signature_uri)
def testCreateHashNames(self):
  """Test that the expected hash names are generated.

  Asks a standard client for three hash names and compares the result with
  the names stored on the fixture as self.hash_names.
  """
  client = self.createStandardClient()
  hash_names = client._CreateHashNames(3)
  # Use assertEqual, as other tests in this file do: assertEquals is a
  # deprecated alias that newer unittest releases no longer provide.
  self.assertEqual(hash_names, self.hash_names)
def testCreateSignatureURIs(self):
"""Test that the expected signature URIs are generated."""
client = self.createStandardClient()
# NOTE(review): the _CreateSignatureURIs call and the expected list below
# are both unterminated in this copy of the file; the expected names
# suggest the elided second argument is the 'keyset_foo' keyset -- confirm.
# NOTE(review): assertEquals is a deprecated alias of assertEqual (which
# testCreateInstructionsUri already uses); prefer assertEqual.
signature_uris = client._CreateSignatureURIs(self.hash_names,
expected_signature_uris = [
self.build_uri + '/1.payload.hash.keyset_foo.signed.bin',
self.build_uri + '/2.payload.hash.keyset_foo.signed.bin',
self.build_uri + '/3.payload.hash.keyset_foo.signed.bin',
self.assertEquals(signature_uris, expected_signature_uris)
def testCreateArchive(self):
"""Test that we can correctly archive up hash values for the signer."""
# NOTE(review): this test is damaged in this copy of the file: the first
# argument of _CreateArchive is empty, the tar command appears to have lost
# its archive path, the call that runs cmd has lost its callee, and the
# final `if tmp_dir:` has no body, so no cleanup of the mkdtemp() directory
# is visible. Restore the elided text before running.
# NOTE(review): assertEquals is a deprecated alias of assertEqual (which
# this test also uses further down); prefer assertEqual throughout.
client = self.createStandardClient()
tmp_dir = None
hashes = ['Hash 1', 'Hash 2', 'Hash 3']
with tempfile.NamedTemporaryFile() as archive_file:
client._CreateArchive(, hashes, self.hash_names)
# Make sure the archive file created exists
tmp_dir = tempfile.mkdtemp()
cmd = ['tar', '-xjf',]
cmd, redirect_stdout=True, redirect_stderr=True, cwd=tmp_dir)
# Check that the expected (and only the expected) contents are present
extracted_file_names = os.listdir(tmp_dir)
self.assertEquals(len(extracted_file_names), len(self.hash_names))
for name in self.hash_names:
self.assertTrue(name in extracted_file_names)
# Make sure each file has the expected contents
# hashes and self.hash_names are paired positionally; each extracted file is
# expected to hold exactly one line equal to its hash.
for h, hash_name in zip(hashes, self.hash_names):
with open(os.path.join(tmp_dir, hash_name), 'r') as f:
self.assertEqual([h], f.readlines())
# Clean up at the end of the test
if tmp_dir:
def testCreateInstructions(self):
"""Test that we can correctly create signer instructions."""
client = self.createStandardClient()
instructions = client._CreateInstructions(self.hash_names, 'keyset_foo')
# The '#' lines inside the literal below are part of the expected
# instruction text, not Python comments; the single %s placeholder is filled
# with a space-joined list of input file names.
# NOTE(review): the list passed to ' '.join() after the literal is
# unterminated in this copy of the file (only '1.payload.hash' is visible).
# NOTE(review): assertEquals is a deprecated alias of assertEqual; prefer
# assertEqual.
expected_instructions = """
# Auto-generated instruction file for signing payload hashes.
generate_metadata = false
keyset = keyset_foo
channel = foo
input_files = %s
output_names = @BASENAME@.@KEYSET@.signed
archive = metadata-disable.instructions
type = update_payload
board = foo-board
archive = payload.hash.tar.bz2
# We reuse version for version rev because we may not know the
# correct versionrev "R24-1.2.3"
version = foo-version
versionrev = foo-version
""" % ' '.join(['1.payload.hash',
self.assertEquals(instructions, expected_instructions)
def testSignerRequestUri(self):
"""Test that we can create signer request URI."""
client = self.createStandardClient()
# The request URI is derived from the instructions URI for the keyset.
instructions_uri = client._CreateInstructionsURI('foo_keyset')
signer_request_uri = client._SignerRequestUri(instructions_uri)
# NOTE(review): the expected string below is unterminated in this copy of
# the file; only its 'tobesigned/45,foo-channel,foo-board,' prefix is visible.
# NOTE(review): assertEquals is a deprecated alias of assertEqual; prefer
# assertEqual.
expected = ('gs://foo-bucket/tobesigned/45,foo-channel,foo-board,'
self.assertEquals(signer_request_uri, expected)
def testWaitForSignaturesInstant(self):
"""Test that we can correctly wait for a list of URIs to be created."""
uris = ['foo', 'bar', 'is']
# All Urls exist.
# NOTE(review): `returns=True` is not a standard mock keyword (the usual one
# is `return_value`). Unless PatchObject translates it, Exists would return
# a truthy mock object rather than True -- confirm against cros_test_lib.
exists = self.PatchObject(self.ctx, 'Exists', returns=True)
client = self.createStandardClient()
self.assertTrue(client._WaitForSignatures(uris, timeout=0.02))
# Make sure it really looked for every URL listed.
# NOTE(review): the assertion on `exists` is truncated in this copy of the
# file; only the tail of a per-URI list comprehension survives below.
[ for u in uris])
def testWaitForSignaturesNever(self):
  """Test that waiting on a list of URIs reports failure after the timeout."""
  # Nothing is patched here: the default mock GSContext behavior is that
  # nothing Exists, so the wait cannot succeed.
  missing_uris = ['foo', 'bar', 'is']
  standard_client = self.createStandardClient()
  found = standard_client._WaitForSignatures(missing_uris, timeout=0.02)
  # Which URLs were checked is deliberately not asserted; the client does not
  # have to check them all in this case.
  self.assertFalse(found)
class SignerPayloadsClientIntegrationTest(cros_test_lib.TestCase):
"""Test suite integration with live signer servers."""
def setUp(self):
# This is in the real production chromeos-releases, but the listed
# build has never, and will never exist.
# NOTE(review): the constructor call below is unterminated in this copy of
# the file, so the build arguments referred to above cannot be seen here.
self.client = signer_payloads_client.SignerPayloadsClientGoogleStorage(
def testDownloadSignatures(self):
"""Test that we can correctly download a list of URIs."""
# NOTE(review): the uris list below is unterminated in this copy of the
# file; the two expected payloads imply a second URI was listed -- confirm.
# NOTE(review): assertEquals is a deprecated alias of assertEqual; prefer
# assertEqual.
uris = ['gs://chromeos-releases-test/sigining-test/foo',
downloads = self.client._DownloadSignatures(uris)
self.assertEquals(downloads, ['FooSig\r\n\r', 'BarSig'])
def testGetHashSignatures(self):
"""Integration test that talks to the real signer with test hashes."""
# NOTE(review): the hashes list and the expected_sigs_hex tuple below are
# unterminated in this copy of the file; restore them before running.
# NOTE(review): assertEquals is a deprecated alias of assertEqual; prefer
# assertEqual.
ctx = gs.GSContext()
# Hostname plus PID gives a per-process identifier that is substituted into
# the cleanup URI below.
unique_id = '%s.%d' % (socket.gethostname(), os.getpid())
clean_uri = ('gs://chromeos-releases/test-channel/%s/'
'crostools-client/**') % unique_id
# Cleanup before we start
ctx.Remove(clean_uri, ignore_missing=True)
hashes = ['0' * 32,
'1' * 32,
keysets = ['update_signer']
expected_sigs_hex = (
# Each expected entry is converted from hex text into a one-element list.
# NOTE(review): str.decode('hex') only exists on Python 2; a Python 3 port
# would need binascii.unhexlify or bytes.fromhex.
expected_sigs = [[sig[0].decode('hex')] for sig in expected_sigs_hex]
all_signatures = self.client.GetHashSignatures(hashes, keysets)
self.assertEquals(all_signatures, expected_sigs)
# Listing the cleanup URI afterwards is expected to raise GSNoSuchKey.
self.assertRaises(gs.GSNoSuchKey, ctx.List, clean_uri)
# Cleanup when we are over
ctx.Remove(clean_uri, ignore_missing=True)