blob: 47321e910e358b16e512f1ec6681db860ee40f13 [file] [log] [blame]
# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test signer_payloads_client library."""
import base64
import os
import shutil
import tempfile
from unittest import mock
from chromite.api.gen.chromiumos import build_report_pb2
from chromite.api.gen.chromiumos import common_pb2
from chromite.api.gen.chromiumos import signing_pb2
from chromite.lib import chroot_lib
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import gs
from chromite.lib import gs_unittest
from chromite.lib import osutils
from chromite.lib import remote_access
from chromite.lib.paygen import gslock
from chromite.lib.paygen import gspaths
from chromite.lib.paygen import signer_payloads_client
from chromite.service import image
pytestmark = cros_test_lib.pytestmark_inside_only
# pylint: disable=protected-access
SIGNED_BUILD_METADATA = build_report_pb2.BuildReport.SignedBuildMetadata
class SignerPayloadsClientGoogleStorageTest(
gs_unittest.AbstractGSContextTest, cros_test_lib.TempDirTestCase
):
"""Test suite for the class SignerPayloadsClientGoogleStorage."""
orig_timeout = (
signer_payloads_client.DELAY_CHECKING_FOR_SIGNER_RESULTS_SECONDS
)
def setUp(self) -> None:
"""Setup for tests, and store off some standard expected values."""
self.hash_names = ["1.payload.hash", "2.payload.hash", "3.payload.hash"]
self.build_uri = (
"gs://foo-bucket/foo-channel/foo-board/foo-version/"
"payloads/signing/foo-unique"
)
# Some tests depend on this timeout. Make it smaller, then restore.
signer_payloads_client.DELAY_CHECKING_FOR_SIGNER_RESULTS_SECONDS = 0.01
def tearDown(self) -> None:
"""Teardown after tests, and restore values test might adjust."""
# Some tests modify this timeout. Restore the original value.
signer_payloads_client.DELAY_CHECKING_FOR_SIGNER_RESULTS_SECONDS = (
self.orig_timeout
)
def createStandardClient(self):
"""Test helper method to create a client with standard arguments."""
client = signer_payloads_client.SignerPayloadsClientGoogleStorage(
chroot=chroot_lib.Chroot(),
build=gspaths.Build(
channel="foo-channel",
board="foo-board",
version="foo-version",
bucket="foo-bucket",
),
work_dir=self.tempdir,
unique="foo-unique",
ctx=self.ctx,
)
return client
def testUris(self) -> None:
"""Test that the URIs on the client are correct."""
client = self.createStandardClient()
expected_build_uri = self.build_uri
self.assertEqual(client.signing_base_dir, expected_build_uri)
self.assertEqual(
client.archive_uri, expected_build_uri + "/payload.hash.tar.bz2"
)
def testWorkDir(self) -> None:
"""Test that the work_dir is generated/passed correctly."""
client = self.createStandardClient()
self.assertIsNotNone(client._work_dir)
client = signer_payloads_client.SignerPayloadsClientGoogleStorage(
chroot=chroot_lib.Chroot(),
build=gspaths.Build(),
work_dir="/foo-dir",
)
self.assertEqual(client._work_dir, "/foo-dir")
def testCleanSignerFilesByKeyset(self) -> None:
"""Test the keyset specific cleanup works as expected."""
hashes = ("hash-1", "hash-2")
keyset = "foo-keys"
lock_uri = (
"gs://foo-bucket/tobesigned/45,foo-channel,foo-board,"
"foo-version,payloads,signing,foo-unique,"
"foo-keys.payload.signer.instructions.lock"
)
signing_dir = (
"gs://foo-bucket/foo-channel/foo-board/foo-version/"
"payloads/signing/foo-unique"
)
expected_removals = (
# Signing Request
"gs://foo-bucket/tobesigned/45,foo-channel,foo-board,foo-version,"
"payloads,signing,foo-unique,"
"foo-keys.payload.signer.instructions",
# Signing Instructions
signing_dir + "/foo-keys.payload.signer.instructions",
# Signed Results`
signing_dir + "/1.payload.hash.foo-keys.signed.bin",
signing_dir + "/1.payload.hash.foo-keys.signed.bin.md5",
signing_dir + "/2.payload.hash.foo-keys.signed.bin",
signing_dir + "/2.payload.hash.foo-keys.signed.bin.md5",
)
client = self.createStandardClient()
# Fake lock failed then acquired.
lock = self.PatchObject(gslock, "Lock", autospec=True)
lock.Acquire.side_effect = [gslock.LockNotAcquired(), mock.MagicMock()]
# Do the work.
client._CleanSignerFilesByKeyset(hashes, keyset)
# Assert locks created with expected lock_uri.
lock.assert_called_with(lock_uri)
# Verify all expected files were removed.
for uri in expected_removals:
self.gs_mock.assertCommandContains(["rm", uri])
def testCleanSignerFiles(self) -> None:
"""Test that GS cleanup works as expected."""
hashes = ("hash-1", "hash-2")
keysets = ("foo-keys-1", "foo-keys-2")
lock_uri1 = (
"gs://foo-bucket/tobesigned/45,foo-channel,foo-board,"
"foo-version,payloads,signing,foo-unique,"
"foo-keys-1.payload.signer.instructions.lock"
)
lock_uri2 = (
"gs://foo-bucket/tobesigned/45,foo-channel,foo-board,"
"foo-version,payloads,signing,foo-unique,"
"foo-keys-2.payload.signer.instructions.lock"
)
signing_dir = (
"gs://foo-bucket/foo-channel/foo-board/foo-version/"
"payloads/signing/foo-unique"
)
expected_removals = (
# Signing Request
"gs://foo-bucket/tobesigned/45,foo-channel,foo-board,foo-version,"
"payloads,signing,foo-unique,"
"foo-keys-1.payload.signer.instructions",
"gs://foo-bucket/tobesigned/45,foo-channel,foo-board,foo-version,"
"payloads,signing,foo-unique,"
"foo-keys-2.payload.signer.instructions",
# Signing Instructions
signing_dir + "/foo-keys-1.payload.signer.instructions",
signing_dir + "/foo-keys-2.payload.signer.instructions",
# Signed Results
signing_dir + "/1.payload.hash.foo-keys-1.signed.bin",
signing_dir + "/1.payload.hash.foo-keys-1.signed.bin.md5",
signing_dir + "/2.payload.hash.foo-keys-1.signed.bin",
signing_dir + "/2.payload.hash.foo-keys-1.signed.bin.md5",
signing_dir + "/1.payload.hash.foo-keys-2.signed.bin",
signing_dir + "/1.payload.hash.foo-keys-2.signed.bin.md5",
signing_dir + "/2.payload.hash.foo-keys-2.signed.bin",
signing_dir + "/2.payload.hash.foo-keys-2.signed.bin.md5",
)
client = self.createStandardClient()
# Fake lock failed then acquired.
lock = self.PatchObject(gslock, "Lock", autospec=True)
# Do the work.
client._CleanSignerFiles(hashes, keysets)
# Check created with lock_uri1, lock_uri2.
self.assertEqual(
lock.call_args_list, [mock.call(lock_uri1), mock.call(lock_uri2)]
)
# Verify expected removals.
for uri in expected_removals:
self.gs_mock.assertCommandContains(["rm", uri])
self.gs_mock.assertCommandContains(["rm", signing_dir])
def testCreateInstructionsUri(self) -> None:
"""Test that the expected instructions URI is correct."""
client = self.createStandardClient()
signature_uri = client._CreateInstructionsURI("keyset_foo")
expected_signature_uri = (
self.build_uri + "/keyset_foo.payload.signer.instructions"
)
self.assertEqual(signature_uri, expected_signature_uri)
def testCreateHashNames(self) -> None:
"""Test that the expected hash names are generated."""
client = self.createStandardClient()
hash_names = client._CreateHashNames(3)
expected_hash_names = self.hash_names
self.assertEqual(hash_names, expected_hash_names)
def testCreateSignatureURIs(self) -> None:
"""Test that the expected signature URIs are generated."""
client = self.createStandardClient()
signature_uris = client._CreateSignatureURIs(
self.hash_names, "keyset_foo"
)
expected_signature_uris = [
self.build_uri + "/1.payload.hash.keyset_foo.signed.bin",
self.build_uri + "/2.payload.hash.keyset_foo.signed.bin",
self.build_uri + "/3.payload.hash.keyset_foo.signed.bin",
]
self.assertEqual(signature_uris, expected_signature_uris)
def testCreateArchive(self) -> None:
"""Test that we can correctly archive up hash values for the signer."""
client = self.createStandardClient()
tmp_dir = None
hashes = [b"Hash 1", b"Hash 2", b"Hash 3"]
try:
with tempfile.NamedTemporaryFile() as archive_file:
client._CreateArchive(
archive_file.name, hashes, self.hash_names
)
# Make sure the archive file created exists
self.assertExists(archive_file.name)
tmp_dir = tempfile.mkdtemp()
cmd = ["tar", "-xjf", archive_file.name]
cros_build_lib.run(cmd, stdout=True, stderr=True, cwd=tmp_dir)
# Check that the expected (and only the expected) contents are
# present.
extracted_file_names = os.listdir(tmp_dir)
self.assertEqual(
len(extracted_file_names), len(self.hash_names)
)
for name in self.hash_names:
self.assertTrue(name in extracted_file_names)
# Make sure each file has the expected contents
for h, hash_name in zip(hashes, self.hash_names):
with open(os.path.join(tmp_dir, hash_name), "rb") as f:
self.assertEqual([h], f.readlines())
finally:
# Clean up at the end of the test
if tmp_dir:
shutil.rmtree(tmp_dir)
def testCreateInstructions(self) -> None:
"""Test that we can correctly create signer instructions."""
client = self.createStandardClient()
instructions = client._CreateInstructions(self.hash_names, "keyset_foo")
expected_instructions = """
# Auto-generated instruction file for signing payload hashes.
[insns]
generate_metadata = false
keyset = keyset_foo
channel = foo
input_files = %s
output_names = @BASENAME@.@KEYSET@.signed
[general]
type = update_payload
board = foo-board
archive = payload.hash.tar.bz2
# We reuse version for version rev because we may not know the
# correct versionrev "R24-1.2.3"
version = foo-version
versionrev = foo-version
""" % " ".join(
["1.payload.hash", "2.payload.hash", "3.payload.hash"]
)
self.assertEqual(instructions, expected_instructions)
def testSignerRequestUri(self) -> None:
"""Test that we can create signer request URI."""
client = self.createStandardClient()
instructions_uri = client._CreateInstructionsURI("foo_keyset")
signer_request_uri = client._SignerRequestUri(instructions_uri)
expected = (
"gs://foo-bucket/tobesigned/45,foo-channel,foo-board,"
"foo-version,payloads,signing,foo-unique,"
"foo_keyset.payload.signer.instructions"
)
self.assertEqual(signer_request_uri, expected)
def testWaitForSignaturesInstant(self) -> None:
"""Test that we can correctly wait for a list of URIs to be created."""
uris = ["foo", "bar", "is"]
# All Urls exist.
exists = self.PatchObject(self.ctx, "Exists", return_value=True)
client = self.createStandardClient()
self.assertTrue(client._WaitForSignatures(uris, timeout=0.02))
# Make sure it really looked for every URL listed.
self.assertEqual(exists.call_args_list, [mock.call(u) for u in uris])
def testWaitForSignaturesNever(self) -> None:
"""Test that we can correctly timeout waiting for a list of URIs."""
uris = ["foo", "bar", "is"]
# Default mock GSContext behavior is nothing Exists.
client = self.createStandardClient()
self.assertFalse(client._WaitForSignatures(uris, timeout=0.02))
# We don't care which URLs it checked, since it doesn't have to check
# them all in this case.
class SignerPayloadsClientIntegrationTest(cros_test_lib.MockTempDirTestCase):
"""Test suite integration with live signer servers."""
def setUp(self) -> None:
# This is in the real production chromeos-releases, but the listed
# build has never, and will never exist.
self.client = signer_payloads_client.SignerPayloadsClientGoogleStorage(
chroot=chroot_lib.Chroot(),
build=gspaths.Build(
channel="test-channel",
board="crostools-client",
version="Rxx-Ryy",
bucket="chromeos-releases",
),
work_dir=self.tempdir,
)
def testDownloadSignatures(self) -> None:
"""Test that we can correctly download a list of URIs."""
def fake_copy(uri, sig) -> None:
"""Just write the uri address to the content of the file."""
osutils.WriteFile(sig, uri, mode="wb")
self.PatchObject(self.client._ctx, "Copy", side_effect=fake_copy)
uris = [
b"gs://chromeos-releases-test/sigining-test/foo",
b"gs://chromeos-releases-test/sigining-test/bar",
]
downloads = self.client._DownloadSignatures(uris)
self.assertEqual(downloads, uris)
@cros_test_lib.pytestmark_network_test
def testGetHashSignatures(self) -> None:
"""Integration test that talks to the real signer with test hashes."""
ctx = gs.GSContext()
clean_uri = (
"gs://chromeos-releases/test-channel/%s/crostools-client/**"
% (cros_build_lib.GetRandomString(),)
)
# Cleanup before we start.
ctx.Remove(clean_uri, ignore_missing=True)
try:
hashes = [
b"0" * 32,
b"1" * 32,
(
b"29834370e415b3124a926c903906f18b"
b"3d52e955147f9e6accd67e9512185a63"
),
]
keysets = ["update_signer"]
expected_sigs_hex = (
(
"ba4c7a86b786c609bf6e4c5fb9c47525608678caa532bea8acc457aa6"
"dd32b435f094b331182f2e167682916990c40ff7b6b0128de3fa45ad0"
"fd98041ec36d6f63b867bcf219804200616590a41a727c2685b48340e"
"fb4b480f1ef448fc7bc3fb1c4b53209e950ecc721b07a52a41d9c025f"
"d25602340c93d5295211308caa29a03ed18516cf61411c508097d5b47"
"620d643ed357b05213b2b9fa3a3f938d6c4f52b85c3f9774edc376902"
"458344d1c1cd72bc932f033c076c76fee2400716fe652306871ba9230"
"21ce245e0c778ad9e0e50e87a169b2aea338c4dc8b5c0c716aabfb613"
"3482e8438b084a09503db27ca546e910f8938f7805a8a76a3b0d0241",
),
(
"2d909ca5b33a7fb6f2323ca0bf9de2e4f2266c73da4b6948a517dffa9"
"6783e08ca36411d380f6e8a20011f599d8d73576b2a141a57c0873d08"
"9726e24f62c7e0346ba5fbde68414b0f874b627fb1557a6e9658c8fac"
"96c54f458161ea770982bfa9fe514120635e5ccb32e8219b9069cb0bf"
"8063fba48d60d649c5af203cccefca5dbc2191f81f0215edbdee4ec8c"
"1553e69b83036aca3e840227d317ff6cf8b968c973f698db1ce59f687"
"1303dcdbe839400c5df4d2e6e505d68890010a44596ca9fee77f4db6e"
"a3448d98018437c319fc8c5f4603ef94b04e3a4eafa206b7391a2640d"
"43128310285bc0f1c7e5060d37c433d663b1c6f01110b9a43f2a74f4",
),
(
"23791c99ab937f1ae5d4988afc9ceca39c290ac90e3da9f243f9a0b1c"
"86c3c32ab7241d43dfc233da412bab989cf02f15a01fe9ea4b2dc7dc9"
"182117547836d69310af3aa005ee3a6deb9602bc676dcc103bf3f7831"
"d64ab844b4785c5c8b4b14467e6b5ab6bf34c12f7534e0d5140151c8f"
"28e8276e703dd6332c2bab9e7f4a495215998ff56e476b81bd6b8d765"
"e1f87da50c22cd52c9afa8c43a6528ab8986d7a273d9136d5aff5c4d9"
"5985d16eeec7380539ef963e0784a0de42b42890dfc83702179f69f5c"
"6eca4630807fbc4ab6241017e0942b15feada0b240e9729bf33bf456b"
"d419da63302477e147963550a45c6cf60925ff48ad7b309fa158dcb2",
),
)
expected_sigs = [
[base64.b16decode(x[0], True)] for x in expected_sigs_hex
]
all_signatures = self.client.GetHashSignatures(hashes, keysets)
self.assertEqual(all_signatures, expected_sigs)
self.assertRaises(gs.GSNoSuchKey, ctx.List, clean_uri)
finally:
# Cleanup when we are over.
ctx.Remove(clean_uri, ignore_missing=True)
@cros_test_lib.pytestmark_network_test
def testGetHashSignaturesDuplicates(self) -> None:
"""Integration test with real signer with duplicate test hashes."""
ctx = gs.GSContext()
clean_uri = (
"gs://chromeos-releases/test-channel/%s/crostools-client/**"
% (cros_build_lib.GetRandomString(),)
)
# Cleanup before we start.
ctx.Remove(clean_uri, ignore_missing=True)
try:
hashes = [b"0" * 32, b"0" * 32]
keysets = ["update_signer"]
expected_sigs_hex = (
(
"ba4c7a86b786c609bf6e4c5fb9c47525608678caa532bea8acc457aa6"
"dd32b435f094b331182f2e167682916990c40ff7b6b0128de3fa45ad0"
"fd98041ec36d6f63b867bcf219804200616590a41a727c2685b48340e"
"fb4b480f1ef448fc7bc3fb1c4b53209e950ecc721b07a52a41d9c025f"
"d25602340c93d5295211308caa29a03ed18516cf61411c508097d5b47"
"620d643ed357b05213b2b9fa3a3f938d6c4f52b85c3f9774edc376902"
"458344d1c1cd72bc932f033c076c76fee2400716fe652306871ba9230"
"21ce245e0c778ad9e0e50e87a169b2aea338c4dc8b5c0c716aabfb613"
"3482e8438b084a09503db27ca546e910f8938f7805a8a76a3b0d0241",
),
(
"ba4c7a86b786c609bf6e4c5fb9c47525608678caa532bea8acc457aa6"
"dd32b435f094b331182f2e167682916990c40ff7b6b0128de3fa45ad0"
"fd98041ec36d6f63b867bcf219804200616590a41a727c2685b48340e"
"fb4b480f1ef448fc7bc3fb1c4b53209e950ecc721b07a52a41d9c025f"
"d25602340c93d5295211308caa29a03ed18516cf61411c508097d5b47"
"620d643ed357b05213b2b9fa3a3f938d6c4f52b85c3f9774edc376902"
"458344d1c1cd72bc932f033c076c76fee2400716fe652306871ba9230"
"21ce245e0c778ad9e0e50e87a169b2aea338c4dc8b5c0c716aabfb613"
"3482e8438b084a09503db27ca546e910f8938f7805a8a76a3b0d0241",
),
)
expected_sigs = [
[base64.b16decode(x[0], True)] for x in expected_sigs_hex
]
all_signatures = self.client.GetHashSignatures(hashes, keysets)
self.assertEqual(all_signatures, expected_sigs)
self.assertRaises(gs.GSNoSuchKey, ctx.List, clean_uri)
finally:
# Cleanup when we are over.
ctx.Remove(clean_uri, ignore_missing=True)
class UnofficialPayloadSignerTest(cros_test_lib.TempDirTestCase):
"""Test suit for testing unofficial local payload signer."""
def setUp(self) -> None:
# UnofficialSignerPayloadsClient need a temporary directory inside
# chroot so cros_test_lib.TempDirTestCase will not work if we run this
# unittest outside the chroot.
chroot = chroot_lib.Chroot()
with chroot.tempdir(delete=False) as dir_in_chroot:
self._temp_dir = dir_in_chroot
self._client = signer_payloads_client.UnofficialSignerPayloadsClient(
chroot=chroot,
private_key=remote_access.TEST_PRIVATE_KEY,
work_dir=self._temp_dir,
)
def cleanUp(self) -> None:
shutil.rmtree(self._temp_dir)
def testExtractPublicKey(self) -> None:
"""Tests the correct command is run to extract the public key."""
with tempfile.NamedTemporaryFile() as public_key:
self._client.ExtractPublicKey(public_key.name)
self.assertIn(b"BEGIN PUBLIC KEY", public_key.read())
def testGetHashSignatures(self) -> None:
"""Tests we correctly sign given hashes."""
hashes = (b"0" * 32, b"1" * 32)
keyset = "foo-keys"
expected_sigs_hex = (
"4732bf3c12b5795d5f4dd015cf8a65d8294186710f71e1530aa3b10d364ed15dc"
"71cef3bed312fd3f805d1b4ee79c1b868f7b8e175199d1c145838044fa3d037d6"
"7b142140a7187cb18cd8fb6897cc88481cb258e9ba87e6508a3eb2670b1354210"
"7dfea51417abb3ee30c8ea81242d1b69c92b8c531e7b3799a28285c26ce5e9834"
"648cf9601bdaf04257ffe97111b7497e14e4530ef9d4e9b6cdbf473304fee948a"
"f68fb6c992340e20bcf78f0c6c28d7ea7d8f35322bc61d5d1456b3b868c16bfca"
"9747887750e2734544b2dc0d22f68866e6456243ad53fc847d957b7fc1e87d0b4"
"eafbb98b61810a86bf5b587b7241d0f92ba4323da3fc57ccd883fdc4d",
"63cefceefb45688c5af557949fd0ec408245f5867e1453d2037c2125511a0ef5d"
"7ead88cfebe04f6cda176a91707a6002a3a618fbb0ae8c956c0e7b56e1f29fb50"
"c3ec3d47e786131c07b14d15cf768bbaceefd8d900526d79f08a985df910f31d3"
"3c97ec9f159902e8478f4d0a1766bb21ea81677c67d11b2b17b0f4cf41599dcad"
"b76549ee0b69badca94ba5fae3b54cea75468a7a670991ba595f622d21ccb1f47"
"edf0366503200e4ee7fec686908f9099e4fc53f4a963769b42b856d2a8c94a153"
"18d0620b7ed0b425989c599ffd363390a82c175f4ebab80f46d2f07585f5924fe"
"1014233ba76ca6a2b047baee023141ab38a027f56c963dd70550204ad",
)
expected_sigs = [[base64.b16decode(x, True)] for x in expected_sigs_hex]
signatures = self._client.GetHashSignatures(hashes, keyset)
self.assertEqual(signatures, expected_sigs)
class LocalSignerPayloadsClientTest(cros_test_lib.TempDirTestCase):
"""Test suite for the class LocalSignerPayloadsClient."""
def setUp(self) -> None:
"""Setup for tests, and store off some standard expected values."""
self._docker_image = (
"us-docker.pkg.dev/chromeos-bot/signing/signing:16963491"
)
def createStandardClient(self):
"""Test helper method to create a client with standard arguments."""
client = signer_payloads_client.LocalSignerPayloadsClient(
docker_image=self._docker_image,
build=gspaths.Build(
channel="dev-channel",
board="foo-board",
version="foo-version",
bucket="foo-bucket",
),
work_dir=self.tempdir,
)
return client
def testWorkDir(self) -> None:
"""Test that the work_dir is generated/passed correctly."""
client = self.createStandardClient()
self.assertIsNotNone(client._work_dir)
def testCreateArchive(self) -> None:
"""Test that we can correctly archive up hash values for the signer."""
client = self.createStandardClient()
tmp_dir = None
hashes = [b"Hash 1", b"Hash 2", b"Hash 3"]
try:
with tempfile.NamedTemporaryFile() as archive_file:
hash_filenames = client._CreateArchive(
archive_file.name, hashes
)
# Make sure the archive file created exists
self.assertExists(archive_file.name)
tmp_dir = tempfile.mkdtemp()
cmd = ["tar", "-xjf", archive_file.name]
cros_build_lib.run(cmd, stdout=True, stderr=True, cwd=tmp_dir)
# Check that the expected (and only the expected) contents are
# present.
extracted_file_names = os.listdir(tmp_dir)
self.assertEqual(len(extracted_file_names), len(hash_filenames))
for name in hash_filenames:
self.assertTrue(name in extracted_file_names)
# Make sure each file has the expected contents
for h, hash_name in zip(hashes, hash_filenames):
with open(os.path.join(tmp_dir, hash_name), "rb") as f:
self.assertEqual([h], f.readlines())
finally:
# Clean up at the end of the test
if tmp_dir:
shutil.rmtree(tmp_dir)
def testReadSignatures(self) -> None:
client = self.createStandardClient()
keysets = ["keyseta", "keysetb"]
for keyset in keysets:
for i in range(3):
signature_file = f"{i}.payload.hash.{keyset}.signed.bin"
with open(
os.path.join(self.tempdir, signature_file), mode="wb+"
) as f:
f.write(bytes(f"{i}-{keyset}", "utf-8"))
artifact_name = lambda n: f"{n}.payload.hash.{keyset}.signed.bin"
archive_artifacts = []
for keyset in keysets:
archive_artifacts.append(
signing_pb2.ArchiveArtifacts(
keyset=keyset,
signed_artifacts=[
signing_pb2.SignedArtifact(
status=signing_pb2.STATUS_SUCCESS,
signed_artifact_name=artifact_name(i),
)
for i in range(3)
],
)
)
signing_response = signing_pb2.BuildTargetSignedArtifacts(
archive_artifacts=archive_artifacts
)
signatures = client._ReadSignatures(
self.tempdir, keysets, signing_response
)
b = lambda s: bytes(s, "utf-8")
self.assertEqual(
signatures,
[
[b("0-keyseta"), b("0-keysetb")],
[b("1-keyseta"), b("1-keysetb")],
[b("2-keyseta"), b("2-keysetb")],
],
)
@mock.patch.object(image, "SignImage")
def testGetHashSignaturesMockSignImage(
self, mock_sign_image: mock.MagicMock
) -> None:
client = self.createStandardClient()
keyset = "DevPreMPKeys"
expected_signature_files = [
f"{i}.payload.hash.{keyset}.signed.bin" for i in range(3)
]
os.mkdir(os.path.join(self.tempdir, "result_dir"))
for i, signature_file in enumerate(expected_signature_files):
with open(
os.path.join(self.tempdir, "result_dir", signature_file),
mode="wb+",
) as f:
f.write(bytes("abcd" * (i + 1), "utf-8"))
artifact_name = lambda n: f"{n}.payload.hash.{keyset}.signed.bin"
mock_sign_image.return_value = signing_pb2.BuildTargetSignedArtifacts(
archive_artifacts=[
signing_pb2.ArchiveArtifacts(
keyset=keyset,
signed_artifacts=[
signing_pb2.SignedArtifact(
signed_artifact_name=artifact_name(i),
)
for i in range(3)
],
signing_status=SIGNED_BUILD_METADATA.SIGNING_STATUS_PASSED,
)
]
)
# Normally the public key would be placed by the signer.
with open(
os.path.join(
client._work_dir, "result_dir", "update-payload-key-pub.pem"
),
mode="wb",
) as f:
f.write(bytes("abcd", "utf-8"))
hashes = [b"Hash 1", b"Hash 2", b"Hash 3"]
signatures = client.GetHashSignatures(hashes, (keyset,))
self.assertEqual(
signatures,
[
[bytes("abcd", "utf-8")],
[bytes("abcdabcd", "utf-8")],
[bytes("abcdabcdabcd", "utf-8")],
],
)
self.assertEqual(
client.public_key,
os.path.join(
client._work_dir, "result_dir", "update-payload-key-pub.pem"
),
)
expected_signing_config = signing_pb2.BuildTargetSigningConfigs(
build_target_signing_configs=[
signing_pb2.BuildTargetSigningConfig(
build_target="foo-board",
signing_configs=[
signing_pb2.SigningConfig(
image_type=common_pb2.IMAGE_TYPE_UPDATE_PAYLOAD,
keyset=keyset,
channel=common_pb2.CHANNEL_DEV,
input_files=[
"0.payload.hash",
"1.payload.hash",
"2.payload.hash",
],
output_names=["@BASENAME@.@KEYSET_VER@.signed"],
archive_path="hashes.tar.bz2",
)
],
)
]
)
mock_sign_image.assert_called_with(
expected_signing_config,
client._work_dir,
os.path.join(client._work_dir, "result_dir"),
self._docker_image,
)
@mock.patch.object(image, "SignImage")
def testGetHashSignaturesMockSignImageFailure(
self, mock_sign_image: mock.MagicMock
):
"""Test that GetHashSignatures raises an exception if signing fails."""
client = self.createStandardClient()
keyset = "DevPreMPKeys"
expected_signature_files = [
f"{i}.payload.hash.{keyset}.signed.bin" for i in range(3)
]
os.mkdir(os.path.join(self.tempdir, "result_dir"))
for i, signature_file in enumerate(expected_signature_files):
with open(
os.path.join(self.tempdir, "result_dir", signature_file),
mode="wb+",
) as f:
f.write(bytes("abcd" * (i + 1), "utf-8"))
artifact_name = lambda n: f"{n}.payload.hash.{keyset}.signed.bin"
mock_sign_image.return_value = signing_pb2.BuildTargetSignedArtifacts(
archive_artifacts=[
signing_pb2.ArchiveArtifacts(
keyset=keyset,
signed_artifacts=[
signing_pb2.SignedArtifact(
signed_artifact_name=artifact_name(i),
)
for i in range(3)
],
signing_status=SIGNED_BUILD_METADATA.SIGNING_STATUS_FAILED,
)
]
)
# Normally the public key would be placed by the signer.
with open(
os.path.join(
client._work_dir, "result_dir", "update-payload-key-pub.pem"
),
mode="wb",
) as f:
f.write(bytes("abcd", "utf-8"))
with self.assertRaises(signer_payloads_client.PaygenSigningError):
client.GetHashSignatures(
[b"Hash 1", b"Hash 2", b"Hash 3"], (keyset,)
)
@mock.patch.object(image, "SignImage")
def testGetHashSignaturesMockSignImageFailureMissingPublicKey(
self, mock_sign_image: mock.MagicMock
) -> None:
client = self.createStandardClient()
keyset = "DevPreMPKeys"
expected_signature_files = [
f"{i}.payload.hash.{keyset}.signed.bin" for i in range(3)
]
os.mkdir(os.path.join(self.tempdir, "result_dir"))
for i, signature_file in enumerate(expected_signature_files):
with open(
os.path.join(self.tempdir, "result_dir", signature_file),
mode="wb+",
) as f:
f.write(bytes("abcd" * (i + 1), "utf-8"))
artifact_name = lambda n: f"{n}.payload.hash.{keyset}.signed.bin"
mock_sign_image.return_value = signing_pb2.BuildTargetSignedArtifacts(
archive_artifacts=[
signing_pb2.ArchiveArtifacts(
keyset=keyset,
signed_artifacts=[
signing_pb2.SignedArtifact(
signed_artifact_name=artifact_name(i),
)
for i in range(3)
],
signing_status=SIGNED_BUILD_METADATA.SIGNING_STATUS_PASSED,
)
]
)
with self.assertRaises(signer_payloads_client.PaygenSigningError):
client.GetHashSignatures(
[b"Hash 1", b"Hash 2", b"Hash 3"], (keyset,)
)