blob: b85efb7e976ef4e003bfed5ff13f3efc6bb8de83 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test paygen_payload_lib library."""
from __future__ import print_function
import json
import os
import shutil
import subprocess
import tempfile
import mock
from chromite.lib import dlc_lib
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import osutils
from chromite.lib import partial_mock
from chromite.lib.paygen import download_cache
from chromite.lib.paygen import gspaths
from chromite.lib.paygen import partition_lib
from chromite.lib.paygen import paygen_payload_lib
from chromite.lib.paygen import signer_payloads_client
from chromite.lib.paygen import urilib
from chromite.lib.paygen import utils
pytestmark = cros_test_lib.pytestmark_inside_only
# We access a lot of protected members during testing.
# pylint: disable=protected-access
class PaygenPayloadLibTest(cros_test_lib.RunCommandTempDirTestCase):
"""PaygenPayloadLib tests base class."""
def setUp(self):
self.old_build = gspaths.Build(
channel='dev-channel',
board='x86-alex',
version='1620.0.0',
bucket='chromeos-releases')
self.old_image = gspaths.Image(
build=self.old_build,
key='mp-v3',
uri=('gs://chromeos-releases/dev-channel/x86-alex/1620.0.0/'
'chromeos_1620.0.0_x86-alex_recovery_dev-channel_mp-v3.bin'))
self.old_base_image = gspaths.Image(
build=self.old_build,
key='mp-v3',
image_type='base',
uri=('gs://chromeos-releases/dev-channel/x86-alex/1620.0.0/'
'chromeos_1620.0.0_x86-alex_base_dev-channel_mp-v3.bin'))
self.new_build = gspaths.Build(
channel='dev-channel',
board='x86-alex',
version='4171.0.0',
bucket='chromeos-releases')
self.new_image = gspaths.Image(
build=self.new_build,
key='mp-v3',
uri=('gs://chromeos-releases/dev-channel/x86-alex/4171.0.0/'
'chromeos_4171.0.0_x86-alex_recovery_dev-channel_mp-v3.bin'))
self.new_dlc_image = gspaths.DLCImage(
build=self.new_build,
key='dlc',
dlc_id='dummy-dlc',
dlc_package='dummy-package',
uri=('gs://chromeos-releases/dev-channel/x86-alex/4171.0.0/dlc/'
'dummy-dlc/dummy-package/dlc.img'))
self.old_dlc_image = gspaths.DLCImage(
build=self.old_build,
key='dlc',
dlc_id='dummy-dlc',
dlc_package='dummy-package',
uri=('gs://chromeos-releases/dev-channel/x86-alex/1620.0.0/dlc/'
'dummy-dlc/dummy-package/dlc.img'))
self.old_test_image = gspaths.UnsignedImageArchive(
build=self.old_build,
uri=('gs://chromeos-releases/dev-channel/x86-alex/1620.0.0/'
'chromeos_1620.0.0_x86-alex_recovery_dev-channel_test.bin'))
self.new_test_image = gspaths.Image(
build=self.new_build,
uri=('gs://chromeos-releases/dev-channel/x86-alex/4171.0.0/'
'chromeos_4171.0.0_x86-alex_recovery_dev-channel_test.bin'))
self.full_payload = gspaths.Payload(tgt_image=self.old_base_image,
src_image=None,
uri='gs://full_old_foo/boo')
self.full_dlc_payload = gspaths.Payload(tgt_image=self.new_dlc_image,
src_image=None,
uri='gs://full_old_foo/boo')
self.delta_payload = gspaths.Payload(tgt_image=self.new_image,
src_image=self.old_image,
uri='gs://delta_new_old/boo')
self.delta_dlc_payload = gspaths.Payload(tgt_image=self.new_dlc_image,
src_image=self.old_dlc_image,
uri='gs://full_old_foo/boo')
self.full_test_payload = gspaths.Payload(tgt_image=self.old_test_image,
src_image=None,
uri='gs://full_old_foo/boo-test')
self.delta_test_payload = gspaths.Payload(tgt_image=self.new_test_image,
src_image=self.old_test_image,
uri='gs://delta_new_old/boo-test')
@classmethod
def setUpClass(cls):
cls.cache_dir = tempfile.mkdtemp(prefix='crostools-unittest-cache')
cls.cache = download_cache.DownloadCache(cls.cache_dir)
@classmethod
def tearDownClass(cls):
cls.cache = None
shutil.rmtree(cls.cache_dir)
class PaygenPayloadLibBasicTest(PaygenPayloadLibTest):
"""PaygenPayloadLib basic (and quick) testing."""
def _GetStdGenerator(self, work_dir=None, payload=None, sign=True):
"""Helper function to create a standardized PayloadGenerator."""
if payload is None:
payload = self.full_payload
if work_dir is None:
work_dir = self.tempdir
gen = paygen_payload_lib.PaygenPayload(
payload=payload,
work_dir=work_dir,
sign=sign,
verify=False)
gen.partition_names = ('foo-root', 'foo-kernel')
gen.tgt_partitions = ('/work/tgt_root.bin', '/work/tgt_kernel.bin')
gen.src_partitions = ('/work/src_root.bin', '/work/src_kernel.bin')
return gen
def testWorkingDirNames(self):
"""Make sure that some of the files we create have the expected names."""
gen = self._GetStdGenerator(work_dir='/foo')
self.assertEqual(gen.src_image_file, '/foo/src_image.bin')
self.assertEqual(gen.tgt_image_file, '/foo/tgt_image.bin')
self.assertEqual(gen.payload_file, '/foo/delta.bin')
self.assertEqual(gen.log_file, '/foo/delta.log')
# Siged image specific values.
self.assertEqual(gen.signed_payload_file, '/foo/delta.bin.signed')
self.assertEqual(gen.metadata_signature_file,
'/foo/delta.bin.signed.metadata-signature')
def testUriManipulators(self):
"""Validate _MetadataUri."""
gen = self._GetStdGenerator(work_dir='/foo')
self.assertEqual(gen._MetadataUri('/foo/bar'),
'/foo/bar.metadata-signature')
self.assertEqual(gen._MetadataUri('gs://foo/bar'),
'gs://foo/bar.metadata-signature')
self.assertEqual(gen._LogsUri('/foo/bar'),
'/foo/bar.log')
self.assertEqual(gen._LogsUri('gs://foo/bar'),
'gs://foo/bar.log')
self.assertEqual(gen._JsonUri('/foo/bar'),
'/foo/bar.json')
self.assertEqual(gen._JsonUri('gs://foo/bar'),
'gs://foo/bar.json')
def testSetupOfficialSigner(self):
"""Tests that official signer is being setup properly."""
gen = self._GetStdGenerator(work_dir='/foo', sign=True)
gen._private_key = 'foo-private-key'
gen._SetupSigner(self.new_build)
self.assertIsInstance(
gen.signer, signer_payloads_client.SignerPayloadsClientGoogleStorage)
self.assertIsNone(gen._private_key)
self.assertIsNone(gen._public_key)
def testSetupUnofficialSigner(self):
"""Tests that unofficial signer is being setup properly.
And private key is set to the default correctly.
"""
gen = self._GetStdGenerator(work_dir='/foo', sign=True)
pubkey_extract_mock = self.PatchObject(
signer_payloads_client.UnofficialSignerPayloadsClient,
'ExtractPublicKey')
build = self.new_build
build.bucket = 'foo-bucket'
gen._SetupSigner(build)
self.assertIsInstance(
gen.signer, signer_payloads_client.UnofficialSignerPayloadsClient)
self.assertEqual(gen._private_key, os.path.join(constants.CHROMITE_DIR,
'ssh_keys', 'testing_rsa'))
pubkey_extract_mock.assert_called_once_with('/foo/public_key.pem')
def testSetupUnofficialSignerPassedPrivateKey(self):
"""Tests that setting signers use correct passed private key."""
gen = self._GetStdGenerator(work_dir='/foo', sign=True)
gen._private_key = 'some-foo-private-key'
build = self.new_build
build.bucket = 'foo-bucket'
gen._SetupSigner(build)
self.assertIsInstance(
gen.signer, signer_payloads_client.UnofficialSignerPayloadsClient)
self.assertEqual(gen._private_key, 'some-foo-private-key')
def testRunGeneratorCmd(self):
"""Test the specialized command to run programs in chroot."""
mock_result = cros_build_lib.CommandResult(stdout=b'foo output')
run_mock = self.PatchObject(cros_build_lib, 'run',
return_value=mock_result)
expected_cmd = ['cmd', 'bar', 'jo nes']
gen = self._GetStdGenerator(work_dir=self.tempdir)
gen._RunGeneratorCmd(expected_cmd)
run_mock.assert_called_once_with(
expected_cmd,
stdout=True,
enter_chroot=True,
stderr=subprocess.STDOUT)
self.assertIn(mock_result.output,
osutils.ReadFile(os.path.join(self.tempdir, 'delta.log'),
mode='rb'))
# Now run with squawk_wrap=true.
run_mock = self.PatchObject(cros_build_lib, 'run',
return_value=mock_result)
gen._RunGeneratorCmd(expected_cmd, squawk_wrap=True)
run_mock.assert_called_once_with(
expected_cmd,
stdout=True,
enter_chroot=True,
stderr=subprocess.STDOUT)
self.assertIn(mock_result.output,
osutils.ReadFile(os.path.join(self.tempdir, 'delta.log'),
mode='rb'))
def testBuildArg(self):
"""Make sure the function semantics is satisfied."""
gen = self._GetStdGenerator(work_dir='/work')
test_dict = {'foo': 'bar'}
# Value present.
self.assertEqual(gen._BuildArg('--foo', test_dict, 'foo'), '--foo=bar')
# Value present, default has no impact.
self.assertEqual(gen._BuildArg('--foo', test_dict, 'foo', default='baz'),
'--foo=bar')
# Value missing, default kicking in.
self.assertEqual(gen._BuildArg('--foo2', test_dict, 'foo2', default='baz'),
'--foo2=baz')
def _DoPrepareImageTest(self, image_type):
"""Test _PrepareImage."""
download_uri = 'gs://bucket/foo/image.bin'
image_file = '/work/image.bin'
gen = self._GetStdGenerator(work_dir=self.tempdir)
if image_type == 'Image':
image_obj = gspaths.Image(build=self.new_build, key='mp-v3',
uri=download_uri)
test_extract_file = None
elif image_type == 'UnsignedImageArchive':
image_obj = gspaths.UnsignedImageArchive(
build=self.new_build, image_type='test', uri=download_uri)
test_extract_file = paygen_payload_lib.PaygenPayload.TEST_IMAGE_NAME
else:
raise ValueError('invalid image type descriptor (%s)' % image_type)
# Stub out and Check the expected function calls.
copy_mock = self.PatchObject(download_cache.DownloadCache, 'GetFileCopy')
if test_extract_file:
download_file = mock.ANY
else:
download_file = image_file
if test_extract_file:
run_mock = self.PatchObject(cros_build_lib, 'run')
move_mock = self.PatchObject(shutil, 'move')
# Run the test.
gen._PrepareImage(image_obj, image_file)
copy_mock.assert_called_once_with(download_uri, download_file)
if test_extract_file:
run_mock.assert_called_once_with(
['tar', '-xJf', download_file, test_extract_file], cwd=self.tempdir)
move_mock.assert_called_once_with(
os.path.join(self.tempdir, test_extract_file), image_file)
def testPrepareImageNormal(self):
"""Test preparing a normal image."""
self._DoPrepareImageTest('Image')
def testPrepareImageTest(self):
"""Test preparing a test image."""
self._DoPrepareImageTest('UnsignedImageArchive')
def testGeneratePostinstConfigTrue(self):
"""Tests creating the postinstall config file."""
gen = self._GetStdGenerator(payload=self.full_payload,
work_dir=self.tempdir)
gen._GeneratePostinstConfig(True)
self.assertEqual(osutils.ReadFile(gen._postinst_config_file),
'RUN_POSTINSTALL_root=true\n')
def testGeneratePostinstConfigFalse(self):
"""Tests creating the postinstall config file."""
gen = self._GetStdGenerator(payload=self.full_payload,
work_dir=self.tempdir)
gen._GeneratePostinstConfig(False)
self.assertEqual(osutils.ReadFile(gen._postinst_config_file),
'RUN_POSTINSTALL_root=false\n')
def testPreparePartitionsImageType(self):
"""Tests discrepency in image types for _PreparePartitions function."""
gen = self._GetStdGenerator(payload=self.delta_payload,
work_dir=self.tempdir)
def side_effect(image):
if image == gen.tgt_image_file:
return partition_lib.CROS_IMAGE
else:
return partition_lib.DLC_IMAGE
self.PatchObject(partition_lib, 'LookupImageType', side_effect=side_effect)
with self.assertRaisesRegex(paygen_payload_lib.Error, 'different types'):
gen._PreparePartitions()
def testPreparePartitionsGptFull(self):
"""Tests _PreparePartitions function for GPT (platform) images.
This test is or full payloads only.
"""
gen = self._GetStdGenerator(payload=self.full_payload,
work_dir=self.tempdir)
# Mock out needed functions.
self.PatchObject(partition_lib, 'LookupImageType',
return_value=partition_lib.CROS_IMAGE)
platform_params_mock = self.PatchObject(gen, '_GetPlatformImageParams',
return_value='foo-appid')
root_ext_mock = self.PatchObject(partition_lib, 'ExtractRoot')
kern_ext_mock = self.PatchObject(partition_lib, 'ExtractKernel')
postinst_mock = self.PatchObject(gen, '_GeneratePostinstConfig')
tgt_image_file = gen.tgt_image_file
gen._PreparePartitions()
# Check the appid was correctly set.
self.assertEqual(gen._appid, 'foo-appid')
# Check extract partition functions are called correctly.
root_ext_mock(gen.tgt_image_file, gen.tgt_partitions[0])
kern_ext_mock(gen.tgt_image_file, gen.tgt_partitions[1])
# Checks that postinstall config file was generated.
postinst_mock.assert_called_once_with(True)
# Checks we mounted the image and read the lsb-release file correctly.
platform_params_mock.assert_called_once_with(tgt_image_file)
# Make sure the tgt_image_file variable is set to None so no one can use it
# again.
self.assertIsNone(gen.tgt_image_file)
def testPreparePartitionsGptDelta(self):
"""Tests _PreparePartitions function for GPT (platform) images.
This test is or delta payloads only.
"""
gen = self._GetStdGenerator(payload=self.delta_payload,
work_dir=self.tempdir)
# Mock out needed functions.
self.PatchObject(partition_lib, 'LookupImageType',
return_value=partition_lib.CROS_IMAGE)
self.PatchObject(gen, '_GetPlatformImageParams', return_value='foo-appid')
root_ext_mock = self.PatchObject(partition_lib, 'ExtractRoot')
kern_ext_mock = self.PatchObject(partition_lib, 'ExtractKernel')
gen._PreparePartitions()
# Check extract partition functions are called correctly. Two times each
# partition (kernel, rootfs), once for source partition and once for target
# partition.
self.assertEqual(root_ext_mock.call_count, 2)
self.assertEqual(kern_ext_mock.call_count, 2)
def testPreparePartitionsDlc(self):
"""Tests _PreparePartitions function for DLC images."""
gen = self._GetStdGenerator(payload=self.full_payload,
work_dir=self.tempdir)
self.PatchObject(partition_lib, 'LookupImageType',
return_value=partition_lib.DLC_IMAGE)
get_params_mock = self.PatchObject(
gen, '_GetDlcImageParams',
return_value=('foo-id', 'foo-package', 'foo-appid'))
postinst_mock = self.PatchObject(gen, '_GeneratePostinstConfig')
gen._PreparePartitions()
self.assertEqual(gen._appid, 'foo-appid')
self.assertEqual(gen.partition_names, ('dlc/foo-id/foo-package',))
self.assertFalse(postinst_mock.called)
get_params_mock.assert_called_once()
def _TestGetDlcImageParams(self, tgt_id, tgt_package):
"""Utility function for Testing _GetDlcImageParams."""
payload = self.full_dlc_payload
tgt_image = self.new_dlc_image
tgt_image.dlc_id = tgt_id
tgt_image.dlc_package = tgt_package
gen = self._GetStdGenerator(payload=payload, work_dir=self.tempdir)
self.PatchObject(osutils, 'MountDir')
self.PatchObject(osutils, 'UmountDir')
lsb_read_mock = self.PatchObject(
utils, 'ReadLsbRelease',
return_value={dlc_lib.DLC_APPID_KEY: 'foo-appid',
dlc_lib.DLC_ID_KEY: 'dummy-dlc',
dlc_lib.DLC_PACKAGE_KEY: 'dummy-package'})
dlc_id, dlc_package, dlc_appid = gen._GetDlcImageParams(tgt_image)
self.assertEqual(dlc_id, 'dummy-dlc')
self.assertEqual(dlc_package, 'dummy-package')
self.assertEqual(dlc_appid, 'foo-appid')
lsb_read_mock.assert_called_once()
def testGetDlcImageParamsCorrect(self):
"""Tests _GetDlcImageParams function."""
self._TestGetDlcImageParams('dummy-dlc', 'dummy-package')
def testGetDlcImageParamsMismatchId(self):
"""Tests _GetDlcImageParams function with mismatched DLC ID."""
with self.assertRaises(paygen_payload_lib.Error):
self._TestGetDlcImageParams('dummy-dlc2', 'dummy-package')
def testGetDlcImageParamsMismatchPackage(self):
"""Tests _GetDlcImageParams function with mismatched DLC package."""
with self.assertRaises(paygen_payload_lib.Error):
self._TestGetDlcImageParams('dummy-dlc', 'dummy-package2')
def testGenerateUnsignedPayloadFull(self):
"""Test _GenerateUnsignedPayload with full payload."""
gen = self._GetStdGenerator(payload=self.full_payload,
work_dir=self.tempdir)
osutils.WriteFile(gen._postinst_config_file, 'Fake postinst content')
# Stub out the required functions.
run_mock = self.PatchObject(gen, '_RunGeneratorCmd')
# Run the test.
gen._GenerateUnsignedPayload()
# Check the expected function calls.
cmd = ['delta_generator',
'--major_version=2',
'--out_file=' + gen.payload_file,
'--partition_names=' + ':'.join(gen.partition_names),
'--new_partitions=' + ':'.join(gen.tgt_partitions),
'--new_postinstall_config_file=' + gen._postinst_config_file,
'--new_channel=dev-channel',
'--new_board=x86-alex',
'--new_version=1620.0.0',
'--new_build_channel=dev-channel',
'--new_build_version=1620.0.0',
'--new_key=mp-v3']
run_mock.assert_called_once_with(cmd, squawk_wrap=True)
def testGenerateUnsignedPayloadDelta(self):
"""Test _GenerateUnsignedPayload with delta payload."""
gen = self._GetStdGenerator(payload=self.delta_payload,
work_dir=self.tempdir)
# Stub out the required functions.
run_mock = self.PatchObject(gen, '_RunGeneratorCmd')
# Run the test.
gen._GenerateUnsignedPayload()
# Check the expected function calls.
cmd = ['delta_generator',
'--major_version=2',
'--out_file=' + gen.payload_file,
'--partition_names=' + ':'.join(gen.partition_names),
'--new_partitions=' + ':'.join(gen.tgt_partitions),
'--new_channel=dev-channel',
'--new_board=x86-alex',
'--new_version=4171.0.0',
'--new_build_channel=dev-channel',
'--new_build_version=4171.0.0',
'--new_key=mp-v3',
'--old_partitions=' + ':'.join(gen.src_partitions),
'--old_channel=dev-channel',
'--old_board=x86-alex',
'--old_version=1620.0.0',
'--old_build_channel=dev-channel',
'--old_build_version=1620.0.0',
'--old_key=mp-v3']
run_mock.assert_called_once_with(cmd, squawk_wrap=True)
def testGenerateHashes(self):
"""Test _GenerateHashes."""
gen = self._GetStdGenerator()
# Stub out the required functions.
run_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
'_RunGeneratorCmd')
osutils.WriteFile(gen.payload_hash_file, 'payload')
osutils.WriteFile(gen.metadata_hash_file, 'hash')
# Run the test.
self.assertEqual(gen._GenerateHashes(), (b'payload', b'hash'))
# Check the expected function calls.
cmd = ['delta_generator',
'--in_file=' + gen.payload_file,
'--signature_size=256',
partial_mock.HasString('--out_hash_file='),
partial_mock.HasString('--out_metadata_hash_file=')]
run_mock.assert_called_once_with(cmd)
def testSignHashes(self):
"""Test _SignHashes."""
hashes = (b'foo', b'bar')
signatures = ((b'0' * 256,), (b'1' * 256,))
gen = self._GetStdGenerator(work_dir='/work')
# Stub out the required functions.
hash_mock = self.PatchObject(
signer_payloads_client.SignerPayloadsClientGoogleStorage,
'GetHashSignatures',
return_value=signatures)
# Run the test.
self.assertEqual(gen._SignHashes(hashes), signatures)
# Check the expected function calls.
hash_mock.assert_called_once_with(
hashes,
keysets=gen.PAYLOAD_SIGNATURE_KEYSETS)
def testWriteSignaturesToFile(self):
"""Test writing signatures into files."""
gen = self._GetStdGenerator(payload=self.delta_payload)
signatures = (b'0' * 256, b'1' * 256)
file_paths = gen._WriteSignaturesToFile(signatures)
self.assertEqual(len(file_paths), len(signatures))
self.assertExists(file_paths[0])
self.assertExists(file_paths[1])
self.assertEqual(osutils.ReadFile(file_paths[0], mode='rb'), signatures[0])
self.assertEqual(osutils.ReadFile(file_paths[1], mode='rb'), signatures[1])
def testInsertSignaturesIntoPayload(self):
"""Test inserting payload and metadata signatures."""
gen = self._GetStdGenerator(payload=self.delta_payload)
payload_signatures = (b'0' * 256,)
metadata_signatures = (b'0' * 256,)
# Stub out the required functions.
run_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
'_RunGeneratorCmd')
# Run the test.
gen._InsertSignaturesIntoPayload(payload_signatures, metadata_signatures)
# Check the expected function calls.
cmd = ['delta_generator',
'--in_file=' + gen.payload_file,
partial_mock.HasString('payload_signature_file'),
partial_mock.HasString('metadata_signature_file'),
'--out_file=' + gen.signed_payload_file]
run_mock.assert_called_once_with(cmd)
def testStoreMetadataSignatures(self):
"""Test how we store metadata signatures."""
gen = self._GetStdGenerator(payload=self.delta_payload)
metadata_signatures = (b'1' * 256,)
encoded_metadata_signature = (
'MTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMT'
'ExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTEx'
'MTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMT'
'ExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTEx'
'MTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMT'
'ExMTExMTExMQ==')
gen._StoreMetadataSignatures(metadata_signatures)
self.assertEqual(osutils.ReadFile(gen.metadata_signature_file),
encoded_metadata_signature)
def testVerifyPayloadDelta(self):
"""Test _VerifyPayload with delta payload."""
gen = self._GetStdGenerator(payload=self.delta_test_payload,
work_dir='/work')
# Stub out the required functions.
run_mock = self.PatchObject(gen, '_RunGeneratorCmd')
gen.metadata_size = 10
# Run the test.
gen._VerifyPayload()
# Check the expected function calls.
cmd = ['check_update_payload',
gen.signed_payload_file,
'--check',
'--type', 'delta',
'--disabled_tests', 'move-same-src-dst-block',
'--part_names', 'foo-root', 'foo-kernel',
'--dst_part_paths', '/work/tgt_root.bin', '/work/tgt_kernel.bin',
'--meta-sig', gen.metadata_signature_file,
'--metadata-size', '10',
'--src_part_paths', '/work/src_root.bin', '/work/src_kernel.bin']
run_mock.assert_called_once_with(cmd)
def testVerifyPayloadFull(self):
"""Test _VerifyPayload with Full payload."""
gen = self._GetStdGenerator(payload=self.full_test_payload,
work_dir='/work')
# Stub out the required functions.
run_mock = self.PatchObject(gen, '_RunGeneratorCmd')
gen.metadata_size = 10
# Run the test.
gen._VerifyPayload()
# Check the expected function calls.
cmd = ['check_update_payload',
gen.signed_payload_file,
'--check',
'--type', 'full',
'--disabled_tests', 'move-same-src-dst-block',
'--part_names', 'foo-root', 'foo-kernel',
'--dst_part_paths', '/work/tgt_root.bin', '/work/tgt_kernel.bin',
'--meta-sig', gen.metadata_signature_file,
'--metadata-size', '10']
run_mock.assert_called_once_with(cmd)
def testVerifyPayloadPublicKey(self):
"""Test _VerifyPayload with delta payload."""
payload = self.full_test_payload
payload.build.bucket = 'gs://chromeos-release-test'
gen = self._GetStdGenerator(payload=payload, work_dir='/work')
# Stub out the required functions.
run_mock = self.PatchObject(gen, '_RunGeneratorCmd')
gen.metadata_size = 10
gen._private_key = 'foo-private-key'
# Run the test.
gen._VerifyPayload()
# Check the expected function calls.
cmd = [mock.ANY] * 17
public_key = os.path.join('/work/public_key.pem')
cmd.extend(['--key', public_key])
run_mock.assert_called_once_with(cmd)
def testSignPayload(self):
"""Test the overall payload signature process."""
payload_hash = b'payload_hash'
metadata_hash = b'metadata_hash'
payload_sigs = [b'payload_sig']
metadata_sigs = [b'metadata_sig']
gen = self._GetStdGenerator(payload=self.delta_payload, work_dir='/work')
# Set up stubs.
gen_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
'_GenerateHashes',
return_value=(payload_hash, metadata_hash))
sign_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
'_SignHashes',
return_value=[payload_sigs, metadata_sigs])
ins_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
'_InsertSignaturesIntoPayload')
store_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
'_StoreMetadataSignatures')
# Run the test.
result_payload_sigs, result_metadata_sigs = gen._SignPayload()
self.assertEqual(payload_sigs, result_payload_sigs)
self.assertEqual(metadata_sigs, result_metadata_sigs)
# Check expected calls.
gen_mock.assert_called_once_with()
sign_mock.assert_called_once_with([payload_hash, metadata_hash])
ins_mock.assert_called_once_with(payload_sigs, metadata_sigs)
store_mock.assert_called_once_with(metadata_sigs)
def testCreateSignedDelta(self):
"""Test the overall payload generation process."""
payload = self.delta_payload
gen = self._GetStdGenerator(payload=payload, work_dir='/work')
# Set up stubs.
prep_image_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
'_PrepareImage')
prep_part_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
'_PreparePartitions')
gen_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
'_GenerateUnsignedPayload')
sign_mock = self.PatchObject(
paygen_payload_lib.PaygenPayload, '_SignPayload',
return_value=(['payload_sigs'], ['metadata_sigs']))
store_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
'_StorePayloadJson')
# Run the test.
gen._Create()
# Check expected calls.
self.assertEqual(prep_image_mock.call_args_list, [
mock.call(payload.tgt_image, gen.tgt_image_file),
mock.call(payload.src_image, gen.src_image_file),
])
gen_mock.assert_called_once_with()
sign_mock.assert_called_once_with()
store_mock.assert_called_once_with(['metadata_sigs'])
prep_part_mock.assert_called_once()
def testUploadResults(self):
"""Test the overall payload generation process."""
gen_sign = self._GetStdGenerator(work_dir='/work', sign=True)
gen_nosign = self._GetStdGenerator(work_dir='/work', sign=False)
# Set up stubs.
copy_mock = self.PatchObject(urilib, 'Copy')
# Run the test.
gen_sign._UploadResults()
gen_nosign._UploadResults()
self.assertEqual(copy_mock.call_args_list, [
# Check signed calls.
mock.call('/work/delta.bin.signed', 'gs://full_old_foo/boo'),
mock.call('/work/delta.log', 'gs://full_old_foo/boo.log'),
mock.call('/work/delta.json', 'gs://full_old_foo/boo.json'),
# Check unsigned calls.
mock.call('/work/delta.bin', 'gs://full_old_foo/boo'),
mock.call('/work/delta.log', 'gs://full_old_foo/boo.log'),
mock.call('/work/delta.json', 'gs://full_old_foo/boo.json'),
])
def testFindCacheDir(self):
"""Test calculating the location of the cache directory."""
gen = self._GetStdGenerator(work_dir='/foo')
# The correct result is based on the system cache directory, which changes.
# Ensure it ends with the right directory name.
self.assertEqual(os.path.basename(gen._FindCacheDir()), 'paygen_cache')
def testGetPayloadPropertiesMap(self):
"""Tests getting the payload properties as a dict."""
gen = self._GetStdGenerator(sign=False)
gen._appid = 'foo-appid'
run_mock = self.PatchObject(gen, '_RunGeneratorCmd')
props_file = os.path.join(self.tempdir, 'properties.json')
osutils.WriteFile(props_file, json.dumps({'metadata_signature': '',
'metadata_size': 10}))
payload_path = '/foo'
props_map = gen.GetPayloadPropertiesMap(payload_path)
cmd = ['delta_generator',
'--in_file=' + payload_path,
'--properties_file=' + props_file,
'--properties_format=json']
run_mock.assert_called_once_with(cmd)
self.assertEqual(props_map,
# This tests that if metadata_signature is empty, the code
# converts it to None.
{'metadata_signature': None,
'metadata_size': 10,
'appid': 'foo-appid'})
def testGetPayloadPropertiesMapSigned(self):
"""Tests getting the payload properties as a dict for signed payloads."""
gen = self._GetStdGenerator()
props_file = os.path.join(self.tempdir, 'properties.json')
osutils.WriteFile(props_file, json.dumps({'metadata_signature': 'foo-sig',
'metadata_size': 10}))
gen._public_key = os.path.join(self.tempdir, 'public_key.pem')
osutils.WriteFile(gen._public_key, 'foo-pubkey')
payload_path = '/foo'
props_map = gen.GetPayloadPropertiesMap(payload_path)
self.assertEqual(props_map,
{'metadata_signature': 'foo-sig',
'metadata_size': 10,
# This tests that appid key is always added to the
# properties file even if it is empty.
'appid': '',
# This is the base64 encode of 'foo-pubkey'.
'public_key': 'Zm9vLXB1YmtleQ=='})
class PaygenPayloadLibEndToEndTest(PaygenPayloadLibTest):
"""PaygenPayloadLib end-to-end testing."""
def _EndToEndIntegrationTest(self, tgt_image, src_image, sign):
"""Helper test function for validating end to end payload generation."""
output_uri = os.path.join(self.tempdir, 'expected_payload_out')
output_metadata_uri = output_uri + '.metadata-signature'
output_metadata_json = output_uri + '.json'
payload = gspaths.Payload(tgt_image=tgt_image,
src_image=src_image,
uri=output_uri)
paygen_payload_lib.CreateAndUploadPayload(
payload=payload,
sign=sign)
self.assertExists(output_uri)
self.assertEqual(os.path.exists(output_metadata_uri), sign)
self.assertExists(output_metadata_json)
@cros_test_lib.NetworkTest()
def testEndToEndIntegrationFull(self):
"""Integration test to generate a full payload for old_image."""
self._EndToEndIntegrationTest(self.old_image, None, sign=True)
@cros_test_lib.NetworkTest()
def testEndToEndIntegrationDelta(self):
"""Integration test to generate a delta payload for N -> N."""
self._EndToEndIntegrationTest(self.new_image, self.new_image, sign=False)