| # -*- coding: utf-8 -*- |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Test paygen_payload_lib library.""" |
| |
| from __future__ import print_function |
| |
| import json |
| import os |
| import shutil |
| import subprocess |
| import tempfile |
| |
| import mock |
| |
| from chromite.lib import dlc_lib |
| from chromite.lib import constants |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_test_lib |
| from chromite.lib import osutils |
| from chromite.lib import partial_mock |
| from chromite.lib.paygen import download_cache |
| from chromite.lib.paygen import gspaths |
| from chromite.lib.paygen import partition_lib |
| from chromite.lib.paygen import paygen_payload_lib |
| from chromite.lib.paygen import signer_payloads_client |
| from chromite.lib.paygen import urilib |
| from chromite.lib.paygen import utils |
| |
| pytestmark = cros_test_lib.pytestmark_inside_only |
| |
| |
| # We access a lot of protected members during testing. |
| # pylint: disable=protected-access |
| |
| |
class PaygenPayloadLibTest(cros_test_lib.RunCommandTempDirTestCase):
  """Common fixtures shared by the PaygenPayloadLib test suites.

  setUpClass provides one temporary download cache for the whole class, and
  setUp rebuilds the build, image and payload descriptors before every test.
  """

  @classmethod
  def setUpClass(cls):
    """Create the temporary download cache used by the tests of this class."""
    cls.cache_dir = tempfile.mkdtemp(prefix='crostools-unittest-cache')
    cls.cache = download_cache.DownloadCache(cls.cache_dir)

  @classmethod
  def tearDownClass(cls):
    """Drop the cache object and delete its backing directory."""
    cls.cache = None
    shutil.rmtree(cls.cache_dir)

  def setUp(self):
    """Build the old (1620.0.0) and new (4171.0.0) descriptors for a test."""
    old_uri_base = 'gs://chromeos-releases/dev-channel/x86-alex/1620.0.0/'
    new_uri_base = 'gs://chromeos-releases/dev-channel/x86-alex/4171.0.0/'

    # Builds.
    self.old_build = gspaths.Build(
        channel='dev-channel',
        board='x86-alex',
        version='1620.0.0',
        bucket='chromeos-releases')
    self.new_build = gspaths.Build(
        channel='dev-channel',
        board='x86-alex',
        version='4171.0.0',
        bucket='chromeos-releases')

    # Signed images.
    self.old_image = gspaths.Image(
        build=self.old_build,
        key='mp-v3',
        uri=(old_uri_base +
             'chromeos_1620.0.0_x86-alex_recovery_dev-channel_mp-v3.bin'))
    self.old_base_image = gspaths.Image(
        build=self.old_build,
        key='mp-v3',
        image_type='base',
        uri=(old_uri_base +
             'chromeos_1620.0.0_x86-alex_base_dev-channel_mp-v3.bin'))
    self.new_image = gspaths.Image(
        build=self.new_build,
        key='mp-v3',
        uri=(new_uri_base +
             'chromeos_4171.0.0_x86-alex_recovery_dev-channel_mp-v3.bin'))

    # DLC images.
    self.new_dlc_image = gspaths.DLCImage(
        build=self.new_build,
        key='dlc',
        dlc_id='sample-dlc',
        dlc_package='sample-package',
        uri=('gs://chromeos-releases/dev-channel/x86-alex/4171.0.0/dlc/'
             'sample-dlc/sample-package/dlc.img'))
    self.old_dlc_image = gspaths.DLCImage(
        build=self.old_build,
        key='dlc',
        dlc_id='sample-dlc',
        dlc_package='sample-package',
        uri=('gs://chromeos-releases/dev-channel/x86-alex/1620.0.0/dlc/'
             'sample-dlc/sample-package/dlc.img'))

    # Test images. Note the old one is an UnsignedImageArchive while the new
    # one is a plain Image without a key.
    self.old_test_image = gspaths.UnsignedImageArchive(
        build=self.old_build,
        uri=(old_uri_base +
             'chromeos_1620.0.0_x86-alex_recovery_dev-channel_test.bin'))
    self.new_test_image = gspaths.Image(
        build=self.new_build,
        uri=(new_uri_base +
             'chromeos_4171.0.0_x86-alex_recovery_dev-channel_test.bin'))

    # Full payloads (no source image).
    self.full_payload = gspaths.Payload(
        tgt_image=self.old_base_image,
        src_image=None,
        uri='gs://full_old_foo/boo')
    self.full_dlc_payload = gspaths.Payload(
        tgt_image=self.new_dlc_image,
        src_image=None,
        uri='gs://full_old_foo/boo')
    self.full_test_payload = gspaths.Payload(
        tgt_image=self.old_test_image,
        src_image=None,
        uri='gs://full_old_foo/boo-test')

    # Delta payloads (old -> new).
    self.delta_payload = gspaths.Payload(
        tgt_image=self.new_image,
        src_image=self.old_image,
        uri='gs://delta_new_old/boo')
    self.delta_dlc_payload = gspaths.Payload(
        tgt_image=self.new_dlc_image,
        src_image=self.old_dlc_image,
        uri='gs://full_old_foo/boo')
    self.delta_test_payload = gspaths.Payload(
        tgt_image=self.new_test_image,
        src_image=self.old_test_image,
        uri='gs://delta_new_old/boo-test')
| |
| |
class PaygenPayloadLibBasicTest(PaygenPayloadLibTest):
  """PaygenPayloadLib basic (and quick) testing."""

  def _GetStdGenerator(self, work_dir=None, payload=None, sign=True):
    """Helper function to create a standardized PayloadGenerator.

    Args:
      work_dir: Working directory for the generator; defaults to self.tempdir.
      payload: Payload to generate; defaults to self.full_payload.
      sign: Passed through as the generator's sign flag.

    Returns:
      A PaygenPayload created with verify=False whose partition names and
      partition paths are preset to fixed fake values.
    """
    if payload is None:
      payload = self.full_payload

    if work_dir is None:
      work_dir = self.tempdir

    gen = paygen_payload_lib.PaygenPayload(
        payload=payload,
        work_dir=work_dir,
        sign=sign,
        verify=False)

    gen.partition_names = ('foo-root', 'foo-kernel')
    gen.tgt_partitions = ('/work/tgt_root.bin', '/work/tgt_kernel.bin')
    gen.src_partitions = ('/work/src_root.bin', '/work/src_kernel.bin')

    return gen

  def testWorkingDirNames(self):
    """Make sure that some of the files we create have the expected names."""
    gen = self._GetStdGenerator(work_dir='/foo')

    self.assertEqual(gen.src_image_file, '/foo/src_image.bin')
    self.assertEqual(gen.tgt_image_file, '/foo/tgt_image.bin')
    self.assertEqual(gen.payload_file, '/foo/delta.bin')
    self.assertEqual(gen.log_file, '/foo/delta.log')

    # Signed image specific values.
    self.assertEqual(gen.signed_payload_file, '/foo/delta.bin.signed')
    self.assertEqual(gen.metadata_signature_file,
                     '/foo/delta.bin.signed.metadata-signature')

  def testUriManipulators(self):
    """Validate _MetadataUri, _LogsUri and _JsonUri on local and GS paths."""
    gen = self._GetStdGenerator(work_dir='/foo')

    self.assertEqual(gen._MetadataUri('/foo/bar'),
                     '/foo/bar.metadata-signature')
    self.assertEqual(gen._MetadataUri('gs://foo/bar'),
                     'gs://foo/bar.metadata-signature')

    self.assertEqual(gen._LogsUri('/foo/bar'),
                     '/foo/bar.log')
    self.assertEqual(gen._LogsUri('gs://foo/bar'),
                     'gs://foo/bar.log')

    self.assertEqual(gen._JsonUri('/foo/bar'),
                     '/foo/bar.json')
    self.assertEqual(gen._JsonUri('gs://foo/bar'),
                     'gs://foo/bar.json')

  def testSetupOfficialSigner(self):
    """Tests that official signer is being setup properly."""
    gen = self._GetStdGenerator(work_dir='/foo', sign=True)
    gen._private_key = 'foo-private-key'

    gen._SetupSigner(self.new_build)
    self.assertIsInstance(
        gen.signer, signer_payloads_client.SignerPayloadsClientGoogleStorage)
    self.assertIsNone(gen._private_key)
    self.assertIsNone(gen._public_key)

  def testSetupUnofficialSigner(self):
    """Tests that unofficial signer is being setup properly.

    And private key is set to the default correctly.
    """
    gen = self._GetStdGenerator(work_dir='/foo', sign=True)

    pubkey_extract_mock = self.PatchObject(
        signer_payloads_client.UnofficialSignerPayloadsClient,
        'ExtractPublicKey')

    build = self.new_build
    build.bucket = 'foo-bucket'
    gen._SetupSigner(build)
    self.assertIsInstance(
        gen.signer, signer_payloads_client.UnofficialSignerPayloadsClient)
    self.assertEqual(gen._private_key, os.path.join(constants.CHROMITE_DIR,
                                                    'ssh_keys', 'testing_rsa'))
    pubkey_extract_mock.assert_called_once_with('/foo/public_key.pem')

  def testSetupUnofficialSignerPassedPrivateKey(self):
    """Tests that setting signers use correct passed private key."""
    gen = self._GetStdGenerator(work_dir='/foo', sign=True)
    gen._private_key = 'some-foo-private-key'

    build = self.new_build
    build.bucket = 'foo-bucket'

    gen._SetupSigner(build)
    self.assertIsInstance(
        gen.signer, signer_payloads_client.UnofficialSignerPayloadsClient)
    self.assertEqual(gen._private_key, 'some-foo-private-key')

  def testRunGeneratorCmd(self):
    """Test the specialized command to run programs in chroot."""
    mock_result = cros_build_lib.CommandResult(stdout=b'foo output')
    run_mock = self.PatchObject(cros_build_lib, 'run',
                                return_value=mock_result)

    expected_cmd = ['cmd', 'bar', 'jo nes']
    gen = self._GetStdGenerator(work_dir=self.tempdir)
    gen._RunGeneratorCmd(expected_cmd)

    run_mock.assert_called_once_with(
        expected_cmd,
        stdout=True,
        enter_chroot=True,
        stderr=subprocess.STDOUT)

    self.assertIn(mock_result.output,
                  osutils.ReadFile(os.path.join(self.tempdir, 'delta.log'),
                                   mode='rb'))

    # Now run with squawk_wrap=true. Re-patch so the call count starts at zero.
    run_mock = self.PatchObject(cros_build_lib, 'run',
                                return_value=mock_result)
    gen._RunGeneratorCmd(expected_cmd, squawk_wrap=True)

    run_mock.assert_called_once_with(
        expected_cmd,
        stdout=True,
        enter_chroot=True,
        stderr=subprocess.STDOUT)

    self.assertIn(mock_result.output,
                  osutils.ReadFile(os.path.join(self.tempdir, 'delta.log'),
                                   mode='rb'))

  def testBuildArg(self):
    """Make sure the function semantics is satisfied."""
    gen = self._GetStdGenerator(work_dir='/work')
    test_dict = {'foo': 'bar'}

    # Value present.
    self.assertEqual(gen._BuildArg('--foo', test_dict, 'foo'), '--foo=bar')

    # Value present, default has no impact.
    self.assertEqual(gen._BuildArg('--foo', test_dict, 'foo', default='baz'),
                     '--foo=bar')

    # Value missing, default kicking in.
    self.assertEqual(gen._BuildArg('--foo2', test_dict, 'foo2', default='baz'),
                     '--foo2=baz')

  def _DoPrepareImageTest(self, image_type):
    """Test _PrepareImage.

    Args:
      image_type: Either 'Image' or 'UnsignedImageArchive'; selects which
        gspaths descriptor is handed to _PrepareImage.

    Raises:
      ValueError: If image_type is not one of the two supported names.
    """
    download_uri = 'gs://bucket/foo/image.bin'
    image_file = '/work/image.bin'
    gen = self._GetStdGenerator(work_dir=self.tempdir)

    if image_type == 'Image':
      image_obj = gspaths.Image(build=self.new_build, key='mp-v3',
                                uri=download_uri)
      test_extract_file = None
    elif image_type == 'UnsignedImageArchive':
      image_obj = gspaths.UnsignedImageArchive(
          build=self.new_build, image_type='test', uri=download_uri)
      test_extract_file = paygen_payload_lib.PaygenPayload.TEST_IMAGE_NAME
    else:
      raise ValueError('invalid image type descriptor (%s)' % image_type)

    # Stub out and Check the expected function calls.
    copy_mock = self.PatchObject(download_cache.DownloadCache, 'GetFileCopy')
    if test_extract_file:
      # The archive download destination is not pinned by this test.
      download_file = mock.ANY
    else:
      download_file = image_file

    if test_extract_file:
      run_mock = self.PatchObject(cros_build_lib, 'run')
      move_mock = self.PatchObject(shutil, 'move')

    # Run the test.
    gen._PrepareImage(image_obj, image_file)

    copy_mock.assert_called_once_with(download_uri, download_file)

    if test_extract_file:
      run_mock.assert_called_once_with(
          ['tar', '-xJf', download_file, test_extract_file], cwd=self.tempdir)
      move_mock.assert_called_once_with(
          os.path.join(self.tempdir, test_extract_file), image_file)

  def testPrepareImageNormal(self):
    """Test preparing a normal image."""
    self._DoPrepareImageTest('Image')

  def testPrepareImageTest(self):
    """Test preparing a test image."""
    self._DoPrepareImageTest('UnsignedImageArchive')

  def testGeneratePostinstConfigTrue(self):
    """Tests creating the postinstall config file with postinstall enabled."""
    gen = self._GetStdGenerator(payload=self.full_payload,
                                work_dir=self.tempdir)

    gen._GeneratePostinstConfig(True)
    self.assertEqual(osutils.ReadFile(gen._postinst_config_file),
                     'RUN_POSTINSTALL_root=true\n')

  def testGeneratePostinstConfigFalse(self):
    """Tests creating the postinstall config file with postinstall disabled."""
    gen = self._GetStdGenerator(payload=self.full_payload,
                                work_dir=self.tempdir)

    gen._GeneratePostinstConfig(False)
    self.assertEqual(osutils.ReadFile(gen._postinst_config_file),
                     'RUN_POSTINSTALL_root=false\n')

  def testPreparePartitionsImageType(self):
    """Tests discrepancy in image types for _PreparePartitions function."""
    gen = self._GetStdGenerator(payload=self.delta_payload,
                                work_dir=self.tempdir)

    def side_effect(image):
      # Report the target as a platform image and anything else as DLC.
      if image == gen.tgt_image_file:
        return partition_lib.CROS_IMAGE
      else:
        return partition_lib.DLC_IMAGE

    self.PatchObject(partition_lib, 'LookupImageType', side_effect=side_effect)

    with self.assertRaisesRegex(paygen_payload_lib.Error, 'different types'):
      gen._PreparePartitions()

  def testPreparePartitionsGptFull(self):
    """Tests _PreparePartitions function for GPT (platform) images.

    This test is for full payloads only.
    """
    gen = self._GetStdGenerator(payload=self.full_payload,
                                work_dir=self.tempdir)
    # Mock out needed functions.
    self.PatchObject(partition_lib, 'LookupImageType',
                     return_value=partition_lib.CROS_IMAGE)
    platform_params_mock = self.PatchObject(gen, '_GetPlatformImageParams',
                                            return_value='foo-appid')
    root_ext_mock = self.PatchObject(partition_lib, 'ExtractRoot')
    kern_ext_mock = self.PatchObject(partition_lib, 'ExtractKernel')
    postinst_mock = self.PatchObject(gen, '_GeneratePostinstConfig')
    # Save the path now; gen.tgt_image_file is None after _PreparePartitions.
    tgt_image_file = gen.tgt_image_file

    gen._PreparePartitions()

    # Check the appid was correctly set.
    self.assertEqual(gen._appid, 'foo-appid')
    # Check extract partition functions are called correctly. These must be
    # assertions on the mocks; merely calling the mocks verifies nothing.
    root_ext_mock.assert_called_once_with(tgt_image_file,
                                          gen.tgt_partitions[0])
    kern_ext_mock.assert_called_once_with(tgt_image_file,
                                          gen.tgt_partitions[1])
    # Checks that postinstall config file was generated.
    postinst_mock.assert_called_once_with(True)
    # Checks we mounted the image and read the lsb-release file correctly.
    platform_params_mock.assert_called_once_with(tgt_image_file)
    # Make sure the tgt_image_file variable is set to None so no one can use it
    # again.
    self.assertIsNone(gen.tgt_image_file)

  def testPreparePartitionsGptDelta(self):
    """Tests _PreparePartitions function for GPT (platform) images.

    This test is for delta payloads only.
    """
    gen = self._GetStdGenerator(payload=self.delta_payload,
                                work_dir=self.tempdir)

    # Mock out needed functions.
    self.PatchObject(partition_lib, 'LookupImageType',
                     return_value=partition_lib.CROS_IMAGE)
    self.PatchObject(gen, '_GetPlatformImageParams', return_value='foo-appid')
    root_ext_mock = self.PatchObject(partition_lib, 'ExtractRoot')
    kern_ext_mock = self.PatchObject(partition_lib, 'ExtractKernel')

    gen._PreparePartitions()

    # Check extract partition functions are called correctly. Two times each
    # partition (kernel, rootfs), once for source partition and once for target
    # partition.
    self.assertEqual(root_ext_mock.call_count, 2)
    self.assertEqual(kern_ext_mock.call_count, 2)

  def testPreparePartitionsDlc(self):
    """Tests _PreparePartitions function for DLC images."""
    gen = self._GetStdGenerator(payload=self.full_payload,
                                work_dir=self.tempdir)
    self.PatchObject(partition_lib, 'LookupImageType',
                     return_value=partition_lib.DLC_IMAGE)
    get_params_mock = self.PatchObject(
        gen, '_GetDlcImageParams',
        return_value=('foo-id', 'foo-package', 'foo-appid'))
    postinst_mock = self.PatchObject(gen, '_GeneratePostinstConfig')

    gen._PreparePartitions()

    self.assertEqual(gen._appid, 'foo-appid')
    self.assertEqual(gen.partition_names, ('dlc/foo-id/foo-package',))
    self.assertFalse(postinst_mock.called)
    get_params_mock.assert_called_once()

  def _TestGetDlcImageParams(self, tgt_id, tgt_package):
    """Utility function for Testing _GetDlcImageParams.

    The mocked lsb-release always reports 'sample-dlc' / 'sample-package', so
    callers choose tgt_id and tgt_package to create a match or a mismatch.

    Args:
      tgt_id: DLC ID to set on the target image descriptor.
      tgt_package: DLC package to set on the target image descriptor.
    """
    payload = self.full_dlc_payload
    tgt_image = self.new_dlc_image
    tgt_image.dlc_id = tgt_id
    tgt_image.dlc_package = tgt_package

    gen = self._GetStdGenerator(payload=payload, work_dir=self.tempdir)
    self.PatchObject(osutils, 'MountDir')
    self.PatchObject(osutils, 'UmountDir')
    lsb_read_mock = self.PatchObject(
        utils, 'ReadLsbRelease',
        return_value={dlc_lib.DLC_APPID_KEY: 'foo-appid',
                      dlc_lib.DLC_ID_KEY: 'sample-dlc',
                      dlc_lib.DLC_PACKAGE_KEY: 'sample-package'})

    dlc_id, dlc_package, dlc_appid = gen._GetDlcImageParams(tgt_image)

    self.assertEqual(dlc_id, 'sample-dlc')
    self.assertEqual(dlc_package, 'sample-package')
    self.assertEqual(dlc_appid, 'foo-appid')
    lsb_read_mock.assert_called_once()

  def testGetDlcImageParamsCorrect(self):
    """Tests _GetDlcImageParams function."""
    self._TestGetDlcImageParams('sample-dlc', 'sample-package')

  def testGetDlcImageParamsMismatchId(self):
    """Tests _GetDlcImageParams function with mismatched DLC ID."""
    with self.assertRaises(paygen_payload_lib.Error):
      self._TestGetDlcImageParams('sample-dlc2', 'sample-package')

  def testGetDlcImageParamsMismatchPackage(self):
    """Tests _GetDlcImageParams function with mismatched DLC package."""
    with self.assertRaises(paygen_payload_lib.Error):
      self._TestGetDlcImageParams('sample-dlc', 'sample-package2')

  def testGenerateUnsignedPayloadFull(self):
    """Test _GenerateUnsignedPayload with full payload."""
    gen = self._GetStdGenerator(payload=self.full_payload,
                                work_dir=self.tempdir)
    osutils.WriteFile(gen._postinst_config_file, 'Fake postinst content')

    # Stub out the required functions.
    run_mock = self.PatchObject(gen, '_RunGeneratorCmd')

    # Run the test.
    gen._GenerateUnsignedPayload()

    # Check the expected function calls.
    cmd = ['delta_generator',
           '--major_version=2',
           '--out_file=' + gen.payload_file,
           '--partition_names=' + ':'.join(gen.partition_names),
           '--new_partitions=' + ':'.join(gen.tgt_partitions),
           '--new_postinstall_config_file=' + gen._postinst_config_file]
    run_mock.assert_called_once_with(cmd, squawk_wrap=True)

  def testGenerateUnsignedPayloadDelta(self):
    """Test _GenerateUnsignedPayload with delta payload."""
    gen = self._GetStdGenerator(payload=self.delta_payload,
                                work_dir=self.tempdir)

    # Stub out the required functions.
    run_mock = self.PatchObject(gen, '_RunGeneratorCmd')

    # Run the test.
    gen._GenerateUnsignedPayload()

    # Check the expected function calls.
    cmd = ['delta_generator',
           '--major_version=2',
           '--out_file=' + gen.payload_file,
           '--partition_names=' + ':'.join(gen.partition_names),
           '--new_partitions=' + ':'.join(gen.tgt_partitions),
           '--old_partitions=' + ':'.join(gen.src_partitions)]
    run_mock.assert_called_once_with(cmd, squawk_wrap=True)

  def testGenerateHashes(self):
    """Test _GenerateHashes."""
    gen = self._GetStdGenerator()

    # Stub out the required functions.
    run_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
                                '_RunGeneratorCmd')
    # The generator command is mocked, so pre-create the files it would write.
    osutils.WriteFile(gen.payload_hash_file, 'payload')
    osutils.WriteFile(gen.metadata_hash_file, 'hash')

    # Run the test.
    self.assertEqual(gen._GenerateHashes(), (b'payload', b'hash'))

    # Check the expected function calls.
    cmd = ['delta_generator',
           '--in_file=' + gen.payload_file,
           '--signature_size=256',
           partial_mock.HasString('--out_hash_file='),
           partial_mock.HasString('--out_metadata_hash_file=')]
    run_mock.assert_called_once_with(cmd)

  def testSignHashes(self):
    """Test _SignHashes."""
    hashes = (b'foo', b'bar')
    signatures = ((b'0' * 256,), (b'1' * 256,))

    gen = self._GetStdGenerator(work_dir='/work')

    # Stub out the required functions.
    hash_mock = self.PatchObject(
        signer_payloads_client.SignerPayloadsClientGoogleStorage,
        'GetHashSignatures',
        return_value=signatures)

    # Run the test.
    self.assertEqual(gen._SignHashes(hashes), signatures)

    # Check the expected function calls.
    hash_mock.assert_called_once_with(
        hashes,
        keysets=gen.PAYLOAD_SIGNATURE_KEYSETS)

  def testWriteSignaturesToFile(self):
    """Test writing signatures into files."""
    gen = self._GetStdGenerator(payload=self.delta_payload)
    signatures = (b'0' * 256, b'1' * 256)

    file_paths = gen._WriteSignaturesToFile(signatures)
    self.assertEqual(len(file_paths), len(signatures))
    self.assertExists(file_paths[0])
    self.assertExists(file_paths[1])
    self.assertEqual(osutils.ReadFile(file_paths[0], mode='rb'), signatures[0])
    self.assertEqual(osutils.ReadFile(file_paths[1], mode='rb'), signatures[1])

  def testInsertSignaturesIntoPayload(self):
    """Test inserting payload and metadata signatures."""
    gen = self._GetStdGenerator(payload=self.delta_payload)
    payload_signatures = (b'0' * 256,)
    metadata_signatures = (b'0' * 256,)

    # Stub out the required functions.
    run_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
                                '_RunGeneratorCmd')

    # Run the test.
    gen._InsertSignaturesIntoPayload(payload_signatures, metadata_signatures)

    # Check the expected function calls.
    cmd = ['delta_generator',
           '--in_file=' + gen.payload_file,
           '--signature_size=256',
           partial_mock.HasString('payload_signature_file'),
           partial_mock.HasString('metadata_signature_file'),
           '--out_file=' + gen.signed_payload_file]
    run_mock.assert_called_once_with(cmd)

  def testStoreMetadataSignatures(self):
    """Test how we store metadata signatures."""
    gen = self._GetStdGenerator(payload=self.delta_payload)
    metadata_signatures = (b'1' * 256,)
    # Expected file content for the 256-byte signature above.
    encoded_metadata_signature = (
        'MTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMT'
        'ExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTEx'
        'MTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMT'
        'ExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTEx'
        'MTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMT'
        'ExMTExMTExMQ==')

    gen._StoreMetadataSignatures(metadata_signatures)

    self.assertEqual(osutils.ReadFile(gen.metadata_signature_file),
                     encoded_metadata_signature)

  def testVerifyPayloadDelta(self):
    """Test _VerifyPayload with delta payload."""
    gen = self._GetStdGenerator(payload=self.delta_test_payload,
                                work_dir='/work')

    # Stub out the required functions.
    run_mock = self.PatchObject(gen, '_RunGeneratorCmd')
    gen.metadata_size = 10

    # Run the test.
    gen._VerifyPayload()

    # Check the expected function calls.
    cmd = ['check_update_payload',
           gen.signed_payload_file,
           '--check',
           '--type', 'delta',
           '--disabled_tests', 'move-same-src-dst-block',
           '--part_names', 'foo-root', 'foo-kernel',
           '--dst_part_paths', '/work/tgt_root.bin', '/work/tgt_kernel.bin',
           '--meta-sig', gen.metadata_signature_file,
           '--metadata-size', '10',
           '--src_part_paths', '/work/src_root.bin', '/work/src_kernel.bin']
    run_mock.assert_called_once_with(cmd)

  def testVerifyPayloadFull(self):
    """Test _VerifyPayload with Full payload."""
    gen = self._GetStdGenerator(payload=self.full_test_payload,
                                work_dir='/work')

    # Stub out the required functions.
    run_mock = self.PatchObject(gen, '_RunGeneratorCmd')
    gen.metadata_size = 10

    # Run the test.
    gen._VerifyPayload()

    # Check the expected function calls.
    cmd = ['check_update_payload',
           gen.signed_payload_file,
           '--check',
           '--type', 'full',
           '--disabled_tests', 'move-same-src-dst-block',
           '--part_names', 'foo-root', 'foo-kernel',
           '--dst_part_paths', '/work/tgt_root.bin', '/work/tgt_kernel.bin',
           '--meta-sig', gen.metadata_signature_file,
           '--metadata-size', '10']
    run_mock.assert_called_once_with(cmd)

  def testVerifyPayloadPublicKey(self):
    """Test _VerifyPayload passes --key with the public key file.

    Uses a full payload on a different bucket with a private key set.
    """
    payload = self.full_test_payload
    payload.build.bucket = 'gs://chromeos-release-test'
    gen = self._GetStdGenerator(payload=payload, work_dir='/work')

    # Stub out the required functions.
    run_mock = self.PatchObject(gen, '_RunGeneratorCmd')
    gen.metadata_size = 10
    gen._private_key = 'foo-private-key'

    # Run the test.
    gen._VerifyPayload()

    # Check the expected function calls. The first 17 arguments are the full
    # payload verification command (see testVerifyPayloadFull) and are not
    # re-checked here; only the trailing --key argument matters.
    cmd = [mock.ANY] * 17
    public_key = os.path.join('/work', 'public_key.pem')
    cmd.extend(['--key', public_key])
    run_mock.assert_called_once_with(cmd)

  def testSignPayload(self):
    """Test the overall payload signature process."""
    payload_hash = b'payload_hash'
    metadata_hash = b'metadata_hash'
    payload_sigs = [b'payload_sig']
    metadata_sigs = [b'metadata_sig']

    gen = self._GetStdGenerator(payload=self.delta_payload, work_dir='/work')

    # Set up stubs.
    gen_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
                                '_GenerateHashes',
                                return_value=(payload_hash, metadata_hash))
    sign_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
                                 '_SignHashes',
                                 return_value=[payload_sigs, metadata_sigs])

    ins_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
                                '_InsertSignaturesIntoPayload')
    store_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
                                  '_StoreMetadataSignatures')

    # Run the test.
    result_payload_sigs, result_metadata_sigs = gen._SignPayload()

    self.assertEqual(payload_sigs, result_payload_sigs)
    self.assertEqual(metadata_sigs, result_metadata_sigs)

    # Check expected calls.
    gen_mock.assert_called_once_with()
    sign_mock.assert_called_once_with([payload_hash, metadata_hash])
    ins_mock.assert_called_once_with(payload_sigs, metadata_sigs)
    store_mock.assert_called_once_with(metadata_sigs)

  def testCreateSignedDelta(self):
    """Test the overall payload generation process."""
    payload = self.delta_payload
    gen = self._GetStdGenerator(payload=payload, work_dir='/work')

    # Set up stubs.
    prep_image_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
                                       '_PrepareImage')
    prep_part_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
                                      '_PreparePartitions')
    gen_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
                                '_GenerateUnsignedPayload')
    sign_mock = self.PatchObject(
        paygen_payload_lib.PaygenPayload, '_SignPayload',
        return_value=(['payload_sigs'], ['metadata_sigs']))
    store_mock = self.PatchObject(paygen_payload_lib.PaygenPayload,
                                  '_StorePayloadJson')

    # Run the test.
    gen._Create()

    # Check expected calls.
    self.assertEqual(prep_image_mock.call_args_list, [
        mock.call(payload.tgt_image, gen.tgt_image_file),
        mock.call(payload.src_image, gen.src_image_file),
    ])
    gen_mock.assert_called_once_with()
    sign_mock.assert_called_once_with()
    store_mock.assert_called_once_with(['metadata_sigs'])
    prep_part_mock.assert_called_once()

  def testUploadResults(self):
    """Test uploading the results of signed and unsigned generators."""
    gen_sign = self._GetStdGenerator(work_dir='/work', sign=True)
    gen_nosign = self._GetStdGenerator(work_dir='/work', sign=False)

    # Set up stubs.
    copy_mock = self.PatchObject(urilib, 'Copy')

    # Run the test.
    gen_sign._UploadResults()
    gen_nosign._UploadResults()

    self.assertEqual(copy_mock.call_args_list, [
        # Check signed calls.
        mock.call('/work/delta.bin.signed', 'gs://full_old_foo/boo'),
        mock.call('/work/delta.log', 'gs://full_old_foo/boo.log'),
        mock.call('/work/delta.json', 'gs://full_old_foo/boo.json'),
        # Check unsigned calls.
        mock.call('/work/delta.bin', 'gs://full_old_foo/boo'),
        mock.call('/work/delta.log', 'gs://full_old_foo/boo.log'),
        mock.call('/work/delta.json', 'gs://full_old_foo/boo.json'),
    ])

  def testFindCacheDir(self):
    """Test calculating the location of the cache directory."""
    gen = self._GetStdGenerator(work_dir='/foo')

    # The correct result is based on the system cache directory, which changes.
    # Ensure it ends with the right directory name.
    self.assertEqual(os.path.basename(gen._FindCacheDir()), 'paygen_cache')

  def testGetPayloadPropertiesMap(self):
    """Tests getting the payload properties as a dict."""
    gen = self._GetStdGenerator(sign=False)
    gen._appid = 'foo-appid'
    run_mock = self.PatchObject(gen, '_RunGeneratorCmd')

    # The generator command is mocked, so pre-create the properties file.
    props_file = os.path.join(self.tempdir, 'properties.json')
    osutils.WriteFile(props_file, json.dumps({'metadata_signature': '',
                                              'metadata_size': 10}))
    payload_path = '/foo'
    props_map = gen.GetPayloadPropertiesMap(payload_path)

    cmd = ['delta_generator',
           '--in_file=' + payload_path,
           '--properties_file=' + props_file,
           '--properties_format=json']
    run_mock.assert_called_once_with(cmd)
    self.assertEqual(props_map,
                     # This tests that if metadata_signature is empty, the code
                     # converts it to None.
                     {'metadata_signature': None,
                      'metadata_size': 10,
                      'appid': 'foo-appid',
                      'target_version': '1620.0.0'})

  def testGetDeltaPayloadPropertiesMap(self):
    """Tests getting the delta payload properties as a dict."""
    gen = self._GetStdGenerator(sign=False, payload=self.delta_payload)
    gen._appid = 'foo-appid'
    run_mock = self.PatchObject(gen, '_RunGeneratorCmd')

    # The generator command is mocked, so pre-create the properties file.
    props_file = os.path.join(self.tempdir, 'properties.json')
    osutils.WriteFile(props_file, json.dumps({'metadata_signature': '',
                                              'metadata_size': 10}))
    payload_path = '/foo'
    props_map = gen.GetPayloadPropertiesMap(payload_path)

    cmd = ['delta_generator',
           '--in_file=' + payload_path,
           '--properties_file=' + props_file,
           '--properties_format=json']
    run_mock.assert_called_once_with(cmd)
    self.assertEqual(props_map,
                     # This tests that if metadata_signature is empty, the code
                     # converts it to None.
                     {'metadata_signature': None,
                      'metadata_size': 10,
                      'appid': 'foo-appid',
                      'target_version': '4171.0.0',
                      'source_version': '1620.0.0'})

  def testGetPayloadPropertiesMapSigned(self):
    """Tests getting the payload properties as a dict for signed payloads."""
    gen = self._GetStdGenerator()

    props_file = os.path.join(self.tempdir, 'properties.json')
    osutils.WriteFile(props_file, json.dumps({'metadata_signature': 'foo-sig',
                                              'metadata_size': 10}))
    gen._public_key = os.path.join(self.tempdir, 'public_key.pem')
    osutils.WriteFile(gen._public_key, 'foo-pubkey')

    payload_path = '/foo'
    props_map = gen.GetPayloadPropertiesMap(payload_path)
    self.assertEqual(props_map,
                     {'metadata_signature': 'foo-sig',
                      'metadata_size': 10,
                      # This tests that appid key is always added to the
                      # properties file even if it is empty.
                      'appid': '',
                      # This is the base64 encode of 'foo-pubkey'.
                      'public_key': 'Zm9vLXB1YmtleQ==',
                      'target_version': '1620.0.0'})
| |
| |
class PaygenPayloadLibEndToEndTest(PaygenPayloadLibTest):
  """End-to-end payload generation tests, marked as network tests."""

  def _EndToEndIntegrationTest(self, tgt_image, src_image, sign):
    """Generate a payload into the temp dir and check which files appear.

    Args:
      tgt_image: Used as the payload's tgt_image.
      src_image: Used as the payload's src_image; None for a full payload.
      sign: Forwarded to CreateAndUploadPayload. The metadata signature file
        must exist exactly when this is True.
    """
    payload_path = os.path.join(self.tempdir, 'expected_payload_out')
    metadata_sig_path = payload_path + '.metadata-signature'
    json_path = payload_path + '.json'

    paygen_payload_lib.CreateAndUploadPayload(
        payload=gspaths.Payload(
            tgt_image=tgt_image, src_image=src_image, uri=payload_path),
        sign=sign)

    self.assertExists(payload_path)
    self.assertEqual(os.path.exists(metadata_sig_path), sign)
    self.assertExists(json_path)

  @cros_test_lib.NetworkTest()
  def testEndToEndIntegrationFull(self):
    """Generate a signed full payload for old_image."""
    self._EndToEndIntegrationTest(self.old_image, None, sign=True)

  @cros_test_lib.NetworkTest()
  def testEndToEndIntegrationDelta(self):
    """Generate an unsigned N -> N delta payload from new_image to itself."""
    self._EndToEndIntegrationTest(self.new_image, self.new_image, sign=False)