| # Copyright 2015 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Test the image_lib module.""" |
| |
| import collections |
| import gc |
| import glob |
| import os |
| from pathlib import Path |
| import stat |
| from unittest import mock |
| |
| from chromite.lib import chromeos_version |
| from chromite.lib import constants |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_test_lib |
| from chromite.lib import git |
| from chromite.lib import image_lib |
| from chromite.lib import install_mask |
| from chromite.lib import osutils |
| from chromite.lib import partial_mock |
| from chromite.lib import portage_util |
| from chromite.lib import retry_util |
| from chromite.utils import c_loop |
| from chromite.utils import os_util |
| |
| |
| # pylint: disable=protected-access |
| |
| |
class FakeException(Exception):
    """Stand-in exception type for exercising error-handling paths in tests."""
| |
| |
# Image path handed to LoopbackPartitions by the tests below.
FAKE_PATH = "/imaginary/file"
# Loop device name that the mocked `losetup --show` command prints.
LOOP_DEV = "/dev/loop9999"
LOOP_PART_COUNT = 12
# Fake GPT table returned by the patched GetImageDiskPartitionInfo.
# NOTE(review): positional fields look like (number, start, size, file
# system, name) -- confirm against image_lib.PartitionInfo.
LOOP_PARTITION_INFO = [
    image_lib.PartitionInfo(1, 2928640, 28672, "", "STATE"),
    image_lib.PartitionInfo(2, 20480, 32768, "", "KERN-A"),
    image_lib.PartitionInfo(3, 286720, 2641920, "", "ROOT-A"),
    image_lib.PartitionInfo(4, 53248, 32768, "", "KERN-B"),
    image_lib.PartitionInfo(5, 282624, 4096, "", "ROOT-B"),
    image_lib.PartitionInfo(6, 16448, 1, "", "KERN-C"),
    image_lib.PartitionInfo(7, 16449, 1, "", "ROOT-C"),
    image_lib.PartitionInfo(8, 86016, 32768, "", "OEM"),
    image_lib.PartitionInfo(9, 16450, 1, "", "reserved"),
    image_lib.PartitionInfo(10, 16451, 1, "", "reserved"),
    image_lib.PartitionInfo(11, 64, 16384, "", "RWFW"),
    image_lib.PartitionInfo(12, 249856, 32768, "", "EFI-SYSTEM"),
]
# Partition number -> partition device node (e.g. 3 -> "/dev/loop9999p3").
LOOP_PARTS_DICT = {
    p.number: f"{LOOP_DEV}p{p.number}" for p in LOOP_PARTITION_INFO
}
# Used as the return value of the patched glob.glob(), which returns a list in
# the real API, so materialize a list instead of handing out a live dict view.
LOOP_PARTS_LIST = list(LOOP_PARTS_DICT.values())
# Value returned by the patched chromeos_version.VersionInfo._GetDateTime.
FAKE_DATE_STRING = "2022_07_20_203326"
| |
| |
class LoopbackPartitionsMock(image_lib.LoopbackPartitions):
    """LoopbackPartitions double that records calls instead of doing work."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # (part_id, offset) pairs seen by the stubbed rw-mount toggles.
        self.disable_rw_called = set()
        self.enable_rw_called = set()

    def _InitGpt(self) -> None:
        """Use the canned partition table as the GPT info."""
        self._gpt_table = LOOP_PARTITION_INFO

    def Attach(self) -> None:
        """Pretend to attach: set the device, destination and part names."""
        self.dev = LOOP_DEV
        if not self.destination:
            self.destination = osutils.TempDir()
        device_nodes = {}
        for entry in self._gpt_table:
            device_nodes[entry.number] = f"{self.dev}p{entry.number}"
        self.parts = device_nodes

    def EnableRwMount(self, part_id, offset=0) -> None:
        """Record the request rather than enabling rw mount."""
        self.enable_rw_called.add((part_id, offset))

    def DisableRwMount(self, part_id, offset=0) -> None:
        """Record the request rather than disabling rw mount."""
        self.disable_rw_called.add((part_id, offset))

    def _Mount(self, part, mount_opts):
        """Mark |part| as mounted and return its mount point; no real mount."""
        mount_point, _symlink = self._GetMountPointAndSymlink(part)
        # Nothing is actually mounted; only the bookkeeping is updated.
        self._mounted.add(part)
        return mount_point

    def _Unmount(self, part) -> None:
        """Drop |part| from the mounted set; no real unmount."""
        self._mounted.remove(part)

    def close(self) -> None:
        """Do nothing on close."""
| |
| |
class LoopbackPartitionsTest(cros_test_lib.MockTempDirTestCase):
    """Test the loopback partitions class.

    Commands, glob, mount/umount, retry, partition add/delete and loop detach
    are all mocked in setUp.
    """

    def setUp(self) -> None:
        # Any command succeeds by default; a command containing "--show"
        # prints the fake loop device name.
        self.rc_mock = cros_test_lib.RunCommandMock()
        self.StartPatcher(self.rc_mock)
        self.rc_mock.SetDefaultCmdResult()
        self.rc_mock.AddCmdResult(partial_mock.In("--show"), stdout=LOOP_DEV)

        self.PatchObject(
            image_lib,
            "GetImageDiskPartitionInfo",
            return_value=LOOP_PARTITION_INFO,
        )
        self.PatchObject(glob, "glob", return_value=LOOP_PARTS_LIST)
        self.mount_mock = self.PatchObject(osutils, "MountDir")
        self.umount_mock = self.PatchObject(osutils, "UmountDir")
        self.retry_mock = self.PatchObject(retry_util, "RetryException")
        self.addpart_mock = self.PatchObject(
            image_lib.LoopbackPartitions, "_AddPartitions"
        )
        self.delpart_mock = self.PatchObject(
            image_lib.LoopbackPartitions, "_DeletePartitions"
        )
        self.detach_mock = self.PatchObject(c_loop, "detach")

        # Make osutils.Which echo back whatever name it was asked for.
        def fake_which(val, *_arg, **_kwargs):
            return val

        self.PatchObject(osutils, "Which", side_effect=fake_which)

        # Patch os_util.is_root_user() to pretend running as root so we attempt
        # to do all the setup directly instead of falling back to our sudo
        # helper.
        self.PatchObject(os_util, "is_root_user", return_value=True)

    def testContextManager(self) -> None:
        """Test using the loopback class as a context manager."""
        with image_lib.LoopbackPartitions(FAKE_PATH) as lb:
            self.rc_mock.assertCommandContains(
                ["losetup", "--show", "-f", FAKE_PATH]
            )
            self.delpart_mock.assert_called_once()
            # Reset so the call made on exit can be counted on its own.
            self.delpart_mock.reset_mock()
            self.addpart_mock.assert_called_once()
            self.detach_mock.assert_not_called()
            self.assertEqual(lb.parts, LOOP_PARTS_DICT)
            self.assertEqual(lb._gpt_table, LOOP_PARTITION_INFO)
        # Leaving the context deletes the partitions and detaches the device.
        self.delpart_mock.assert_called_once()
        self.detach_mock.assert_called_once()

    def testContextManagerWithMounts(self) -> None:
        """Test using the loopback class as a context manager with mounts."""
        syml = self.PatchObject(osutils, "SafeSymlink")
        # Partitions may be selected by number or by name.
        part_ids = (1, "ROOT-A")
        with image_lib.LoopbackPartitions(
            FAKE_PATH, part_ids=part_ids, mount_opts=("ro",)
        ) as lb:
            # Build the expected mounts and symlink calls from the first
            # partition matching each requested id.
            expected_mounts = set()
            expected_calls = []
            for part_id in part_ids:
                for part in LOOP_PARTITION_INFO:
                    if part_id in (part.name, part.number):
                        expected_mounts.add(part)
                        expected_calls.append(
                            mock.call(
                                "dir-%d" % part.number,
                                os.path.join(
                                    lb.destination, "dir-%s" % part.name
                                ),
                                sudo=True,
                            )
                        )
                        break
            self.rc_mock.assertCommandContains(
                ["losetup", "--show", "-f", FAKE_PATH]
            )
            self.delpart_mock.assert_called_once()
            # Reset so the call made on exit can be counted on its own.
            self.delpart_mock.reset_mock()
            self.addpart_mock.assert_called_once()
            self.detach_mock.assert_not_called()
            self.assertEqual(lb.parts, LOOP_PARTS_DICT)
            self.assertEqual(lb._gpt_table, LOOP_PARTITION_INFO)
            self.assertEqual(expected_calls, syml.call_args_list)
            self.assertEqual(expected_mounts, lb._mounted)
        self.delpart_mock.assert_called_once()
        self.detach_mock.assert_called_once()

    def testManual(self) -> None:
        """Test using the loopback class closed manually."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH)
        lb.Attach()
        self.rc_mock.assertCommandContains(
            ["losetup", "--show", "-f", FAKE_PATH]
        )
        self.delpart_mock.assert_called_once()
        # Reset so the call made by close() can be counted on its own.
        self.delpart_mock.reset_mock()
        self.addpart_mock.assert_called_once()
        self.detach_mock.assert_not_called()
        self.assertEqual(lb.parts, LOOP_PARTS_DICT)
        self.assertEqual(lb._gpt_table, LOOP_PARTITION_INFO)
        lb.close()
        self.delpart_mock.assert_called_once()
        self.detach_mock.assert_called_once()

    def gcFunc(self) -> None:
        """Isolates a local variable so it'll be garbage collected."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH)
        lb.Attach()
        self.rc_mock.assertCommandContains(
            ["losetup", "--show", "-f", FAKE_PATH]
        )
        self.delpart_mock.assert_called_once()
        # Reset so the caller can count only the cleanup-time call.
        self.delpart_mock.reset_mock()
        self.addpart_mock.assert_called_once()
        self.detach_mock.assert_not_called()
        self.assertEqual(lb.parts, LOOP_PARTS_DICT)
        self.assertEqual(lb._gpt_table, LOOP_PARTITION_INFO)

    def testGarbageCollected(self) -> None:
        """Test using the loopback class closed by garbage collection."""
        self.gcFunc()
        # Force garbage collection in case python didn't already clean up the
        # loopback object.
        gc.collect()
        self.delpart_mock.assert_called_once()
        self.detach_mock.assert_called_once()

    def testMountUnmount(self) -> None:
        """Test Mount() and Unmount() entry points."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH, destination=self.tempdir)
        lb.Attach()
        # Mount four partitions.
        lb.Mount((1, 3, "ROOT-B", "ROOT-C"))
        # ROOT-B and ROOT-C are partitions 5 and 7 in LOOP_PARTITION_INFO.
        for p in (1, 3, 5, 7):
            self.mount_mock.assert_any_call(
                "%sp%d" % (LOOP_DEV, p),
                "%s/dir-%d" % (self.tempdir, p),
                makedirs=True,
                skip_mtab=False,
                sudo=True,
                mount_opts=("ro",),
            )
            # A by-name symlink should exist on disk next to the mount point.
            linkname = "%s/dir-%s" % (
                self.tempdir,
                LOOP_PARTITION_INFO[p - 1].name,
            )
            self.assertTrue(stat.S_ISLNK(os.lstat(linkname).st_mode))
        self.assertEqual(4, self.mount_mock.call_count)
        self.umount_mock.assert_not_called()

        # Check that Mounted provides the right info.
        mounts = lb.Mounted()
        self.assertDictEqual(
            mounts,
            {
                "STATE": f"{self.tempdir}/dir-1",
                "ROOT-A": f"{self.tempdir}/dir-3",
                "ROOT-B": f"{self.tempdir}/dir-5",
                "ROOT-C": f"{self.tempdir}/dir-7",
            },
        )

        # Unmount half of them, confirm that they were unmounted.
        lb.Unmount((1, "ROOT-B"))
        for p in (1, 5):
            self.umount_mock.assert_any_call(
                "%s/dir-%d" % (self.tempdir, p), cleanup=False
            )
        self.assertEqual(2, self.umount_mock.call_count)
        self.umount_mock.reset_mock()

        # Check that Mounted has been updated
        mounts = lb.Mounted()
        self.assertDictEqual(
            mounts,
            {
                "ROOT-A": f"{self.tempdir}/dir-3",
                "ROOT-C": f"{self.tempdir}/dir-7",
            },
        )

        # Close the object, so that we unmount the other half of them.
        lb.close()
        for p in (3, 7):
            self.umount_mock.assert_any_call(
                "%s/dir-%d" % (self.tempdir, p), cleanup=False
            )
        self.assertEqual(2, self.umount_mock.call_count)

        # Verify that the directories were cleaned up.
        # NOTE(review): only dir-1 and dir-3 are checked although four
        # partitions were mounted -- confirm whether 5 and 7 should be too.
        for p in (1, 3):
            self.retry_mock.assert_any_call(
                cros_build_lib.RunCommandError,
                60,
                osutils.RmDir,
                "%s/dir-%d" % (self.tempdir, p),
                sudo=True,
                sleep=1,
            )

    def testMountingMountedPartReturnsName(self) -> None:
        """Verify Mount returns the directory name even when already mounted."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH, destination=self.tempdir)
        lb.Attach()
        dirname = "%s/dir-%d" % (self.tempdir, lb._gpt_table[0].number)
        # First make sure we get the directory name when we actually mount.
        self.assertEqual(dirname, lb._Mount(lb._gpt_table[0], ("ro",)))
        # Then make sure we get it when we call it again.
        self.assertEqual(dirname, lb._Mount(lb._gpt_table[0], ("ro",)))
        lb.close()

    def testRemountCallsMount(self) -> None:
        """Verify a remount of a mounted partition calls MountDir again."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH, destination=self.tempdir)
        lb.Attach()
        devname = "%sp%d" % (LOOP_DEV, lb._gpt_table[0].number)
        dirname = "%s/dir-%d" % (self.tempdir, lb._gpt_table[0].number)
        # First make sure we get the directory name when we actually mount.
        self.assertEqual(dirname, lb._Mount(lb._gpt_table[0], ("ro",)))
        self.mount_mock.assert_called_once_with(
            devname,
            dirname,
            makedirs=True,
            skip_mtab=False,
            sudo=True,
            mount_opts=("ro",),
        )
        # Then make sure we get it when we call it again.
        self.assertEqual(
            dirname, lb._Mount(lb._gpt_table[0], ("remount", "rw"))
        )
        # The most recent MountDir call must carry the remount options.
        self.assertEqual(
            mock.call(
                devname,
                dirname,
                makedirs=True,
                skip_mtab=False,
                sudo=True,
                mount_opts=("remount", "rw"),
            ),
            self.mount_mock.call_args,
        )
        lb.close()

    def testGetPartitionDevName(self) -> None:
        """Test GetPartitionDevName()."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH)
        lb.Attach()
        for part in LOOP_PARTITION_INFO:
            self.assertEqual(
                "%sp%d" % (LOOP_DEV, part.number),
                lb.GetPartitionDevName(part.number),
            )
            # Two partitions share the name "reserved", so a lookup by that
            # name is skipped.
            if part.name != "reserved":
                self.assertEqual(
                    "%sp%d" % (LOOP_DEV, part.number),
                    lb.GetPartitionDevName(part.name),
                )
        lb.close()

    def test_GetMountPointAndSymlink(self) -> None:
        """Test _GetMountPointAndSymlink()."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH, destination=self.tempdir)
        lb.Attach()
        for part in LOOP_PARTITION_INFO:
            # Expect dir-<number> first, then dir-<name>.
            expected = [
                os.path.join(lb.destination, "dir-%s" % n)
                for n in (part.number, part.name)
            ]
            self.assertEqual(expected, list(lb._GetMountPointAndSymlink(part)))
        lb.close()

    def testIsExt2OnVarious(self) -> None:
        """Test _IsExt2 works with the various partition types."""
        # STATE, ROOT-A, and OEM generally have ext2 filesystems.
        FS_PARTITIONS = (1, 3, 8)

        # Report ext2 only for device paths ending in one of FS_PARTITIONS.
        def ext_mock(path, offset=0):  # pylint: disable=unused-argument
            for num in FS_PARTITIONS:
                if path.endswith(f"p{num}"):
                    return True
            return False

        self.PatchObject(image_lib, "IsExt2Image", side_effect=ext_mock)

        lb = image_lib.LoopbackPartitions(FAKE_PATH, destination=self.tempdir)
        lb.Attach()
        # We expect that only the partitions in FS_PARTITIONS are ext2.
        self.assertEqual(
            [part.number in FS_PARTITIONS for part in LOOP_PARTITION_INFO],
            [lb._IsExt2(part.name) for part in LOOP_PARTITION_INFO],
        )
        lb.close()
| |
| |
class LsbUtilsTest(cros_test_lib.RunCommandTempDirTestCase):
    """Exercise the lsb-release helpers in image_lib."""

    def setUp(self) -> None:
        # Make os_util.is_root_user() report root, so reading/writing the
        # lsb-release file doesn't require escalated privileges and the test
        # can clean itself up correctly.
        self.PatchObject(os_util, "is_root_user", return_value=True)

    def _AssertSelinuxLabelApplied(self) -> None:
        """Check that setfattr was run to label the lsb-release file."""
        self.rc.assertCommandCalled(
            [
                "setfattr",
                "-n",
                "security.selinux",
                "-v",
                "u:object_r:cros_conf_file:s0",
                os.path.join(self.tempdir, "etc/lsb-release"),
            ]
        )

    def testWriteLsbRelease(self) -> None:
        """Write a fresh lsb-release via WriteLsbRelease(..) and verify it."""
        initial_fields = collections.OrderedDict(
            [("x", "1"), ("y", "2"), ("foo", "bar")]
        )
        image_lib.WriteLsbRelease(self.tempdir, initial_fields)

        self.assertFileContents(
            os.path.join(self.tempdir, "etc", "lsb-release"),
            "x=1\ny=2\nfoo=bar\n",
        )
        self._AssertSelinuxLabelApplied()

    def testOverwriteLsbRelease(self) -> None:
        """Call WriteLsbRelease(..) twice and verify the combined file."""
        target = os.path.join(self.tempdir, "etc", "lsb-release")

        first_fields = collections.OrderedDict(
            [("x", "1"), ("y", "2"), ("foo", "bar")]
        )
        image_lib.WriteLsbRelease(self.tempdir, first_fields)

        # A second write against the now-existing file must keep the old
        # entries and add the new ones after them.
        second_fields = collections.OrderedDict(
            [
                ("newkey1", "value1"),
                ("newkey2", "value2"),
                ("a", "3"),
                ("b", "4"),
            ]
        )
        image_lib.WriteLsbRelease(self.tempdir, second_fields)

        self.assertFileContents(
            target,
            "x=1\ny=2\nfoo=bar\nnewkey1=value1\nnewkey2=value2\na=3\nb=4\n",
        )
        self._AssertSelinuxLabelApplied()
| |
| |
class BuildImagePathTest(cros_test_lib.MockTempDirTestCase):
    """Checks for image_lib.BuildImagePath."""

    def setUp(self) -> None:
        self.board = "board"
        self.board_dir = os.path.join(self.tempdir, self.board)
        self.full_path = os.path.join(self.tempdir, "full_path_image.bin")

        # On-disk layout: <tempdir>/board/{recovery,other}_image.bin plus a
        # standalone <tempdir>/full_path_image.bin.
        layout = (
            cros_test_lib.Directory(
                self.board, ("recovery_image.bin", "other_image.bin")
            ),
            "full_path_image.bin",
        )
        cros_test_lib.CreateOnDiskHierarchy(self.tempdir, layout)

    def testBuildImagePath(self) -> None:
        """Walk through the board/image argument combinations."""
        self.PatchObject(
            image_lib, "GetLatestImageLink", return_value=self.board_dir
        )
        recovery_image = os.path.join(self.board_dir, "recovery_image.bin")
        other_image = os.path.join(self.board_dir, "other_image.bin")

        # An existing full path is returned as-is, with or without a board.
        for board in (self.board, None):
            self.assertEqual(
                self.full_path, image_lib.BuildImagePath(board, self.full_path)
            )

        # A full path that does not exist is rejected, with or without a board.
        for board in (self.board, None):
            with self.assertRaises(image_lib.ImageDoesNotExistError):
                image_lib.BuildImagePath(board, "/does/not/exist")

        # No image given: the default image is used.
        self.assertEqual(
            recovery_image, image_lib.BuildImagePath(self.board, None)
        )

        # A bare basename resolves inside the board directory.
        self.assertEqual(
            other_image,
            image_lib.BuildImagePath(self.board, "other_image.bin"),
        )

        # A basename with no matching file is rejected.
        with self.assertRaises(image_lib.ImageDoesNotExistError):
            image_lib.BuildImagePath(self.board, "does_not_exist.bin")

        # Neither board nor image, and no default board available.
        default_board = self.PatchObject(
            cros_build_lib, "GetDefaultBoard", return_value=None
        )
        with self.assertRaises(image_lib.ImageDoesNotExistError):
            image_lib.BuildImagePath(None, None)

        # Neither board nor image, but a default board is configured.
        default_board.return_value = "board"
        self.assertEqual(recovery_image, image_lib.BuildImagePath(None, None))
| |
| |
class SecurityTestConfigTest(cros_test_lib.RunCommandTempDirTestCase):
    """Checks for the image_lib.SecurityTestConfig class."""

    # pylint: disable=protected-access

    def setUp(self) -> None:
        self.vboot_hash = "abc123"
        self.baselines = "/path/to/baselines"
        self.image = "/path/to/image.bin"
        self.config = image_lib.SecurityTestConfig(
            self.image, self.baselines, self.vboot_hash, self.tempdir
        )

    def testVbootCheckout(self) -> None:
        """Clone + checkout happens once and is skipped afterwards."""
        first_clone = self.PatchObject(git, "Clone")
        self.config._VbootCheckout()
        first_clone.assert_called_once()
        self.assertCommandContains(["git", "checkout", self.vboot_hash])

        # A second request after a successful checkout must not clone again.
        second_clone = self.PatchObject(git, "Clone")
        self.config._VbootCheckout()
        second_clone.assert_not_called()

    def testVbootCheckoutError(self) -> None:
        """A failing git clone surfaces as VbootCheckoutError."""
        self.PatchObject(
            git, "Clone", side_effect=cros_build_lib.RunCommandError("error")
        )
        with self.assertRaises(image_lib.VbootCheckoutError):
            self.config._VbootCheckout()

    def testVbootCheckoutNoDirectory(self) -> None:
        """A missing working directory raises SecurityConfigDirectoryError."""
        # Point the config at a directory that does not exist.
        self.config.directory = "/DOES/NOT/EXIST"
        with self.assertRaises(image_lib.SecurityConfigDirectoryError):
            self.config._VbootCheckout()

    def testRunCheck(self) -> None:
        """RunCheck passes the baseline config only when asked to."""
        # Without the config argument: script and image only.
        self.config.RunCheck("check1", False)
        baseline_one = os.path.join(self.baselines, "ensure_check1.config")
        script_one = os.path.join(self.config._checks_dir, "ensure_check1.sh")
        self.assertCommandContains([script_one, self.image])
        self.assertCommandContains([baseline_one], expected=False)

        # With the config argument: the baseline file is appended.
        self.config.RunCheck("check2", True)
        baseline_two = os.path.join(self.baselines, "ensure_check2.config")
        script_two = os.path.join(self.config._checks_dir, "ensure_check2.sh")
        self.assertCommandContains([script_two, self.image, baseline_two])
| |
| |
class GetImageDiskPartitionInfoTests(cros_test_lib.RunCommandTempDirTestCase):
    """Tests the GetImageDiskPartitionInfo function."""

    # Canned command output used by the tests that run outside the chroot.
    SAMPLE_PARTED = """/foo/chromiumos_qemu_image.bin:\
2271240192B:file:512:512:gpt::;
11:32768B:8421375B:8388608B::RWFW:;
6:8421376B:8421887B:512B::KERN-C:;
7:8421888B:8422399B:512B::ROOT-C:;
9:8422400B:8422911B:512B::reserved:;
10:8422912B:8423423B:512B::reserved:;
2:10485760B:27262975B:16777216B::KERN-A:;
4:27262976B:44040191B:16777216B::KERN-B:;
8:44040192B:60817407B:16777216B:ext4:OEM:msftdata;
12:127926272B:161480703B:33554432B:fat16:EFI-SYSTEM:boot, esp;
5:161480704B:163577855B:2097152B::ROOT-B:;
3:163577856B:2260729855B:2097152000B:ext2:ROOT-A:;
1:2260729856B:2271215615B:10485760B:ext2:STATE:msftdata;
"""

    # Canned command output used by the tests that run inside the chroot.
    SAMPLE_CGPT = """\
start size part contents
0 1 PMBR (Boot GUID: 88FB7EB8-2B3F-B943-B933-\
EEC571FFB6E1)
1 1 Pri GPT header
2 32 Pri GPT table
1921024 2097152 1 Label: "STATE"
Type: 0FC63DAF-8483-4772-8E79-3D69D8477DE4
UUID: EEBD83BE-397E-BD44-878B-0DDDD5A5C510
Attr: [0]
20480 32768 2 Label: "KERN-A"
Type: FE3A2A5D-4F32-41A7-B725-ACCC3285A309
UUID: 7007C2F3-08E5-AB40-A4BC-FF5B01F5460D
Attr: [101]
1101824 819200 3 Label: "ROOT-A"
Type: 3CB8E202-3B7E-47DD-8A3C-7FF2A13CFCEC
UUID: F4C5C3AD-027F-894B-80CD-3DEC57932948
Attr: [0]
53248 32768 4 Label: "KERN-B"
Type: FE3A2A5D-4F32-41A7-B725-ACCC3285A309
UUID: C85FB478-404C-8741-ADB8-11312A35880D
Attr: [0]
282624 819200 5 Label: "ROOT-B"
Type: 3CB8E202-3B7E-47DD-8A3C-7FF2A13CFCEC
UUID: A99F4231-1EC3-C542-AC0C-DF3729F5DB07
Attr: [0]
16448 1 6 Label: "KERN-C"
Type: FE3A2A5D-4F32-41A7-B725-ACCC3285A309
UUID: 81F0E336-FAC9-174D-A08C-864FE627B637
Attr: [0]
16449 1 7 Label: "ROOT-C"
Type: 3CB8E202-3B7E-47DD-8A3C-7FF2A13CFCEC
UUID: 9E127FCA-30C1-044E-A5F2-DF74E6932692
Attr: [0]
86016 32768 8 Label: "OEM"
Type: 0FC63DAF-8483-4772-8E79-3D69D8477DE4
UUID: 72986347-A37C-684F-9A19-4DBAF41C55A9
Attr: [0]
16450 1 9 Label: "reserved"
Type: 2E0A753D-9E48-43B0-8337-B15192CB1B5E
UUID: BA85A0A7-1850-964D-8EF8-6707AC106C3A
Attr: [0]
16451 1 10 Label: "reserved"
Type: 2E0A753D-9E48-43B0-8337-B15192CB1B5E
UUID: 16C9EC9B-50FA-DD46-98DC-F781360817B4
Attr: [0]
64 16384 11 Label: "RWFW"
Type: CAB6E88E-ABF3-4102-A07A-D4BB9BE3C1D3
UUID: BE8AECB9-4F78-7C44-8F23-5A9273B7EC8F
Attr: [0]
249856 32768 12 Label: "EFI-SYSTEM"
Type: C12A7328-F81F-11D2-BA4B-00A0C93EC93B
UUID: 88FB7EB8-2B3F-B943-B933-EEC571FFB6E1
Attr: [0]
4050847 32 Sec GPT table
4050879 1 Sec GPT header
"""

    def setUp(self) -> None:
        # Create a real, non-empty file to act as the image path.
        self.WriteTempFile("foo", "non-empty file")
        self._image_path = os.path.join(self.tempdir, "foo")

    def testCgpt(self) -> None:
        """Tests that we can list all partitions with `cgpt` correctly."""
        self.PatchObject(cros_build_lib, "IsInsideChroot", return_value=True)
        # Every command returns the canned cgpt listing.
        self.rc.AddCmdResult(partial_mock.Ignore(), stdout=self.SAMPLE_CGPT)
        partitions = image_lib.GetImageDiskPartitionInfo(self._image_path)
        part_dict = {p.name: p for p in partitions}
        # Expected values are the listing's start/size columns times 512,
        # e.g. 1921024 * 512 == 983564288.
        self.assertEqual(part_dict["STATE"].start, 983564288)
        self.assertEqual(part_dict["STATE"].size, 1073741824)
        self.assertEqual(part_dict["STATE"].number, 1)
        self.assertEqual(part_dict["STATE"].name, "STATE")
        self.assertEqual(part_dict["EFI-SYSTEM"].start, 249856 * 512)
        self.assertEqual(part_dict["EFI-SYSTEM"].size, 32768 * 512)
        self.assertEqual(part_dict["EFI-SYSTEM"].number, 12)
        self.assertEqual(part_dict["EFI-SYSTEM"].name, "EFI-SYSTEM")
        self.assertEqual(12, len(partitions))

    def testNormalPath(self) -> None:
        """Tests listing partitions from SAMPLE_PARTED outside the chroot."""
        self.PatchObject(cros_build_lib, "IsInsideChroot", return_value=False)
        self.rc.AddCmdResult(partial_mock.Ignore(), stdout=self.SAMPLE_PARTED)
        partitions = image_lib.GetImageDiskPartitionInfo(self._image_path)
        part_dict = {p.name: p for p in partitions}
        self.assertEqual(12, len(partitions))
        self.assertEqual(1, part_dict["STATE"].number)
        self.assertEqual(2097152000, part_dict["ROOT-A"].size)

    def testKeyedByNumber(self) -> None:
        """Tests the parsed partitions when indexed by partition number."""
        self.PatchObject(cros_build_lib, "IsInsideChroot", return_value=False)
        self.rc.AddCmdResult(partial_mock.Ignore(), stdout=self.SAMPLE_PARTED)
        partitions = image_lib.GetImageDiskPartitionInfo(self._image_path)
        # Keying by number keeps both "reserved" partitions distinct.
        part_dict = {p.number: p for p in partitions}
        self.assertEqual(12, len(part_dict))
        self.assertEqual("STATE", part_dict[1].name)
        self.assertEqual(2097152000, part_dict[3].size)
        self.assertEqual("reserved", part_dict[9].name)
        self.assertEqual("reserved", part_dict[10].name)

    def testChangeUnitInsideChroot(self) -> None:
        """Tests STATE start and size from SAMPLE_CGPT inside the chroot."""
        self.PatchObject(cros_build_lib, "IsInsideChroot", return_value=True)
        self.rc.AddCmdResult(partial_mock.Ignore(), stdout=self.SAMPLE_CGPT)
        partitions = image_lib.GetImageDiskPartitionInfo(self._image_path)
        part_dict = {p.name: p for p in partitions}
        self.assertEqual(part_dict["STATE"].start, 983564288)
        self.assertEqual(part_dict["STATE"].size, 1073741824)
| |
class GetImagesToBuildTests(cros_test_lib.MockTestCase):
    """Tests the GetImagesToBuild function."""

    def testExpectedInput(self) -> None:
        """Pass in all expected image types and check expected image names."""
        # Iterate key/value pairs directly instead of re-indexing the dict.
        for image_type, image_name in constants.IMAGE_TYPE_TO_NAME.items():
            images = image_lib.GetImagesToBuild([image_type])
            self.assertEqual(len(images), 1)
            # assertIn reports both operands on failure, unlike assertTrue.
            self.assertIn(image_name, images)

    def testInvalidInput(self) -> None:
        """Pass in an invalid image type and check for ValueError."""
        with self.assertRaises(ValueError):
            image_lib.GetImagesToBuild([constants.IMAGE_TYPE_DEV, "invalid"])

    def testInvalidImageCombination(self) -> None:
        """Verify an invalid image type combination raises a ValueError."""
        # NOTE(review): FACTORY_IMAGE_BIN is used as an image *name* elsewhere
        # in this file. If GetImagesToBuild expects image *types*, this may
        # raise for an unknown type rather than for the combination -- confirm
        # against constants and image_lib.
        with self.assertRaises(ValueError):
            image_lib.GetImagesToBuild(
                [constants.IMAGE_TYPE_DEV, constants.FACTORY_IMAGE_BIN]
            )
| |
| |
class GetBuildImageEnvvarTests(cros_test_lib.MockTestCase):
    """Tests the GetBuildImageEnvvars function."""

    def setUp(self) -> None:
        # Start with a board that reports no USE flags; tests change
        # return_value to simulate a board with systemd enabled.
        self.use_flag_mock = self.PatchObject(
            portage_util, "GetBoardUseFlags", return_value=[]
        )

    def testStandardImage(self) -> None:
        """Test with standard base/dev/test image name."""
        expected_envvar = {
            "INSTALL_MASK": (
                "\n".join(install_mask.DEFAULT)
                + "\n"
                + "\n".join(install_mask.SYSTEMD)
            ),
            "PRISTINE_IMAGE_NAME": constants.BASE_IMAGE_BIN,
            "BASE_PACKAGE": "virtual/target-os",
        }
        image_to_test = [
            constants.BASE_IMAGE_BIN,
            constants.DEV_IMAGE_BIN,
            constants.TEST_IMAGE_BIN,
        ]
        for image in image_to_test:
            envar = image_lib.GetBuildImageEnvvars({image}, "test_board")
            self.assertDictEqual(envar, expected_envvar)

        # Validate scenario with systemd in USE flag: the SYSTEMD entries are
        # no longer expected in the install mask.
        self.use_flag_mock.return_value = ["cros_debug", "systemd"]
        expected_envvar["INSTALL_MASK"] = "\n".join(install_mask.DEFAULT)
        for image in image_to_test:
            envar = image_lib.GetBuildImageEnvvars({image}, "test_board")
            self.assertDictEqual(envar, expected_envvar)

    def testFactoryImage(self) -> None:
        """Test with factory image name."""
        expected_envvar = {
            "INSTALL_MASK": (
                "\n".join(install_mask.FACTORY_SHIM)
                + "\n"
                + "\n".join(install_mask.SYSTEMD)
            ),
            "USE": image_lib._FACTORY_SHIM_USE_FLAGS,
            "PRISTINE_IMAGE_NAME": constants.FACTORY_IMAGE_BIN,
            "BASE_PACKAGE": "virtual/target-os-factory-shim",
        }
        envar = image_lib.GetBuildImageEnvvars(
            {constants.FACTORY_IMAGE_BIN}, "betty"
        )
        self.assertDictEqual(envar, expected_envvar)

        # Validate scenario with systemd in USE flag
        self.use_flag_mock.return_value = ["cros_debug", "systemd"]
        expected_envvar["INSTALL_MASK"] = "\n".join(install_mask.FACTORY_SHIM)
        # NOTE(review): clearing the previous result looks redundant since the
        # name is rebound just below; kept in case GetBuildImageEnvvars shares
        # state between calls -- confirm and drop if not needed.
        envar.clear()
        envar = image_lib.GetBuildImageEnvvars(
            {constants.FACTORY_IMAGE_BIN}, "betty"
        )
        self.assertDictEqual(envar, expected_envvar)

        # Validate if extra environment variable is passed: the caller's USE
        # value is expected in front of the factory shim flags, and unrelated
        # variables are expected to pass through.
        extra_env = {
            "USE": "test test1",
            "ENV": "TEST_VALUE",
        }
        expected_envvar["USE"] = extra_env["USE"] + " " + expected_envvar["USE"]
        expected_envvar["ENV"] = extra_env["ENV"]
        envar.clear()
        envar = image_lib.GetBuildImageEnvvars(
            {constants.FACTORY_IMAGE_BIN}, "betty", env_var_init=extra_env
        )
        self.assertDictEqual(envar, expected_envvar)

    def testChromeOSVersion(self) -> None:
        """Test ChromeOS version environment variable."""
        version_info = chromeos_version.VersionInfo(
            version_string="1.2.3", chrome_branch="4"
        )
        envar = image_lib.GetBuildImageEnvvars(
            {constants.BASE_IMAGE_BIN}, "betty", version_info=version_info
        )

        # "1.2.3" is expected to be split into build, branch and patch.
        self.assertEqual(envar["CHROME_BRANCH"], "4")
        self.assertEqual(envar["CHROMEOS_BUILD"], "1")
        self.assertEqual(envar["CHROMEOS_BRANCH"], "2")
        self.assertEqual(envar["CHROMEOS_PATCH"], "3")
        self.assertEqual(envar["CHROMEOS_VERSION_STRING"], "1.2.3")

    def testBuildAndOutputDir(self) -> None:
        """Test BUILD_DIR and OUTPUT_DIR environment variable."""
        build_dir = "build/dir"
        output_dir = Path("ouput/dir")
        envar = image_lib.GetBuildImageEnvvars(
            {constants.BASE_IMAGE_BIN},
            "betty",
            build_dir=build_dir,
            output_dir=output_dir,
        )

        self.assertEqual(envar["BUILD_DIR"], build_dir)
        # A Path input is expected back in string form.
        self.assertEqual(envar["OUTPUT_DIR"], str(output_dir))
| |
| |
| class CreateBuildDirTests(cros_test_lib.MockTempDirTestCase): |
| """Test CreateBuildDir.""" |
| |
def setUp(self) -> None:
    """Pin the timestamp and precompute the expected directory names."""
    # Freeze the date/time component so dated version strings are stable.
    self.PatchObject(
        chromeos_version.VersionInfo,
        "_GetDateTime",
        return_value=FAKE_DATE_STRING,
    )
    self.testBoard = "TestBoard"
    self.attempt = 5
    self.symlink = "latest"

    self.build_top_dir = self.tempdir / "build"
    self.output_top_dir = self.tempdir / "output"
    self.result_build_dir = self.build_top_dir / self.testBoard
    self.result_output_dir = self.output_top_dir / self.testBoard

    self.version_info = chromeos_version.VersionInfo(
        version_string="1.2.3", chrome_branch="4"
    )
    branch = self.version_info.chrome_branch
    plain_version = self.version_info.VersionString()
    dated_version = self.version_info.VersionStringWithDateTime()

    # Expected names: R<branch>-<version>, optionally suffixed -a<attempt>.
    self.image_dir = f"R{branch}-{plain_version}"
    self.image_dir_attempt = f"{self.image_dir}-a{self.attempt}"
    self.image_dir_date = f"R{branch}-{dated_version}"
    self.image_dir_date_attempt = f"{self.image_dir_date}-a{self.attempt}"
| |
def testChromeBranchVersion(self) -> None:
    """Test with chrome_branch and version string.

    Covers a fresh directory, re-creation with replace=True, and the
    FileExistsError raised when the directory exists and replace is not set.
    """
    build_dir, output_dir, symlink_dir = image_lib.CreateBuildDir(
        self.build_top_dir,
        self.output_top_dir,
        self.version_info.chrome_branch,
        self.version_info.VersionString(),
        self.testBoard,
        self.symlink,
    )

    self.assertExists(build_dir)
    self.assertExists(output_dir)
    self.assertEqual(build_dir, self.result_build_dir / self.image_dir)
    # Compare the path itself; assertExists(path, other) would treat the
    # second argument as a failure message and never compare the two.
    self.assertEqual(output_dir, self.result_output_dir / self.image_dir)
    self.assertTrue(symlink_dir.is_symlink())
    self.assertEqual(self.image_dir, os.readlink(symlink_dir))

    # Now test the case where the build directory already exists.
    build_dir, output_dir, symlink_dir = image_lib.CreateBuildDir(
        self.build_top_dir,
        self.output_top_dir,
        self.version_info.chrome_branch,
        self.version_info.VersionString(),
        self.testBoard,
        self.symlink,
        replace=True,
    )

    self.assertExists(build_dir)
    self.assertExists(output_dir)
    self.assertEqual(build_dir, self.result_build_dir / self.image_dir)
    self.assertEqual(output_dir, self.result_output_dir / self.image_dir)
    self.assertTrue(symlink_dir.is_symlink())
    self.assertEqual(self.image_dir, os.readlink(symlink_dir))

    # Now test the case where the build directory already exists with
    # replace as false. The call raises, so there is no result to bind.
    with self.assertRaises(FileExistsError):
        image_lib.CreateBuildDir(
            self.build_top_dir,
            self.output_top_dir,
            self.version_info.chrome_branch,
            self.version_info.VersionString(),
            self.testBoard,
            self.symlink,
        )
| |
def testChromeBranchVersionDate(self) -> None:
    """Test with chrome_branch and version string with date."""
    expected_build_dir = self.result_build_dir / self.image_dir_date
    expected_output_dir = self.result_output_dir / self.image_dir_date

    build_dir, output_dir, symlink_dir = image_lib.CreateBuildDir(
        self.build_top_dir,
        self.output_top_dir,
        self.version_info.chrome_branch,
        self.version_info.VersionStringWithDateTime(),
        self.testBoard,
        self.symlink,
    )

    self.assertExists(build_dir)
    self.assertExists(output_dir)
    self.assertEqual(build_dir, expected_build_dir)
    # Compare the returned path against the expected one. A two-argument
    # assertExists() call would not do that.
    # NOTE(review): its second parameter looks like a failure message --
    # confirm in cros_test_lib.
    self.assertEqual(output_dir, expected_output_dir)
    self.assertTrue(symlink_dir.is_symlink())
    # The link target is the image directory name, not a full path.
    self.assertEqual(self.image_dir_date, os.readlink(symlink_dir))
| |
def testBuildAttempt(self) -> None:
    """Test with chrome_branch, version string and build attempt."""
    expected_build_dir = self.result_build_dir / self.image_dir_attempt
    expected_output_dir = self.result_output_dir / self.image_dir_attempt

    build_dir, output_dir, symlink_dir = image_lib.CreateBuildDir(
        self.build_top_dir,
        self.output_top_dir,
        self.version_info.chrome_branch,
        self.version_info.VersionString(),
        self.testBoard,
        self.symlink,
        build_attempt=self.attempt,
    )

    self.assertExists(build_dir)
    self.assertExists(output_dir)
    self.assertEqual(build_dir, expected_build_dir)
    # Compare the returned path against the expected one. A two-argument
    # assertExists() call would not do that.
    # NOTE(review): its second parameter looks like a failure message --
    # confirm in cros_test_lib.
    self.assertEqual(output_dir, expected_output_dir)
    self.assertTrue(symlink_dir.is_symlink())
    # The link target is the image directory name, not a full path.
    self.assertEqual(self.image_dir_attempt, os.readlink(symlink_dir))
| |
def testBuildAttemptDate(self) -> None:
    """Test with chrome_branch, version string, date, and build attempt."""
    expected_name = self.image_dir_date_attempt
    expected_build_dir = self.result_build_dir / expected_name
    expected_output_dir = self.result_output_dir / expected_name

    build_dir, output_dir, symlink_dir = image_lib.CreateBuildDir(
        self.build_top_dir,
        self.output_top_dir,
        self.version_info.chrome_branch,
        self.version_info.VersionStringWithDateTime(),
        self.testBoard,
        self.symlink,
        build_attempt=self.attempt,
    )

    self.assertExists(build_dir)
    self.assertExists(output_dir)
    self.assertEqual(build_dir, expected_build_dir)
    # Compare the returned path against the expected one. A two-argument
    # assertExists() call would not do that.
    # NOTE(review): its second parameter looks like a failure message --
    # confirm in cros_test_lib.
    self.assertEqual(output_dir, expected_output_dir)
    self.assertTrue(symlink_dir.is_symlink())
    # The link target is the image directory name, not a full path.
    self.assertEqual(expected_name, os.readlink(symlink_dir))
| |
def testOutputSuffix(self) -> None:
    """Test with output suffix.

    Runs CreateBuildDir twice with the same suffix: once together with a
    build attempt and once without one.
    """
    output_suffix = "test-suffix"

    # Expected directory name when a build attempt is also supplied.
    expected_name = self.image_dir_attempt + "-" + output_suffix
    build_dir, output_dir, symlink_dir = image_lib.CreateBuildDir(
        self.build_top_dir,
        self.output_top_dir,
        self.version_info.chrome_branch,
        self.version_info.VersionString(),
        self.testBoard,
        self.symlink,
        build_attempt=self.attempt,
        output_suffix=output_suffix,
    )

    self.assertExists(build_dir)
    self.assertExists(output_dir)
    self.assertEqual(build_dir, self.result_build_dir / expected_name)
    # Compare the returned path against the expected one. A two-argument
    # assertExists() call would not do that.
    # NOTE(review): its second parameter looks like a failure message --
    # confirm in cros_test_lib.
    self.assertEqual(output_dir, self.result_output_dir / expected_name)
    self.assertTrue(symlink_dir.is_symlink())
    # The link target is the image directory name, not a full path.
    self.assertEqual(expected_name, os.readlink(symlink_dir))

    # Test the case without build_attempt.
    expected_name = self.image_dir + "-" + output_suffix
    build_dir, output_dir, symlink_dir = image_lib.CreateBuildDir(
        self.build_top_dir,
        self.output_top_dir,
        self.version_info.chrome_branch,
        self.version_info.VersionString(),
        self.testBoard,
        self.symlink,
        output_suffix=output_suffix,
    )

    self.assertExists(build_dir)
    self.assertExists(output_dir)
    self.assertEqual(build_dir, self.result_build_dir / expected_name)
    self.assertEqual(output_dir, self.result_output_dir / expected_name)
    self.assertTrue(symlink_dir.is_symlink())
    self.assertEqual(expected_name, os.readlink(symlink_dir))
| |
def testOutputSuffixWithDate(self) -> None:
    """Test with output suffix with date.

    Runs CreateBuildDir twice with the dated version string and the same
    suffix: once together with a build attempt and once without one.
    """
    output_suffix = "test-suffix"

    # Expected directory name when a build attempt is also supplied.
    expected_name = f"{self.image_dir_date_attempt}-{output_suffix}"
    build_dir, output_dir, symlink_dir = image_lib.CreateBuildDir(
        self.build_top_dir,
        self.output_top_dir,
        self.version_info.chrome_branch,
        self.version_info.VersionStringWithDateTime(),
        self.testBoard,
        self.symlink,
        build_attempt=self.attempt,
        output_suffix=output_suffix,
    )

    self.assertExists(build_dir)
    self.assertExists(output_dir)
    self.assertEqual(build_dir, self.result_build_dir / expected_name)
    # Compare the returned path against the expected one. A two-argument
    # assertExists() call would not do that.
    # NOTE(review): its second parameter looks like a failure message --
    # confirm in cros_test_lib.
    self.assertEqual(output_dir, self.result_output_dir / expected_name)
    self.assertTrue(symlink_dir.is_symlink())
    # The link target is the image directory name, not a full path.
    self.assertEqual(expected_name, os.readlink(symlink_dir))

    # Test the case without build_attempt.
    expected_name = f"{self.image_dir_date}-{output_suffix}"
    build_dir, output_dir, symlink_dir = image_lib.CreateBuildDir(
        self.build_top_dir,
        self.output_top_dir,
        self.version_info.chrome_branch,
        self.version_info.VersionStringWithDateTime(),
        self.testBoard,
        self.symlink,
        output_suffix=output_suffix,
    )

    self.assertExists(build_dir)
    self.assertExists(output_dir)
    self.assertEqual(build_dir, self.result_build_dir / expected_name)
    self.assertEqual(output_dir, self.result_output_dir / expected_name)
    self.assertTrue(symlink_dir.is_symlink())
    self.assertEqual(expected_name, os.readlink(symlink_dir))
| |
| |
| class UtilsTests(cros_test_lib.TempDirTestCase): |
| """Test simple util funcs.""" |
| |
def testIsSquashfsImageFails(self) -> None:
    """Verify a file that is not a SquashFS image is rejected."""
    one_mib = 1024 * 1024
    bogus_image = self.tempdir / "img.squashfs"
    # Only the file name suggests SquashFS; the file comes from
    # AllocateFile rather than mksquashfs.
    osutils.AllocateFile(bogus_image, one_mib)

    detected = image_lib.IsSquashfsImage(bogus_image)
    self.assertFalse(detected)
| |
def testIsSquashfsImage(self) -> None:
    """Verify an image built by mksquashfs is recognized as SquashFS."""
    workdir = self.tempdir
    squashfs_root = workdir / "root"
    squashfs_root.mkdir()
    squashfs_image = workdir / "img.squashfs"

    # Pass bare names and run from the temp dir, so the command line
    # carries relative paths only.
    mksquashfs_cmd = ["mksquashfs", squashfs_root.name, squashfs_image.name]
    cros_build_lib.run(mksquashfs_cmd, cwd=workdir)

    detected = image_lib.IsSquashfsImage(squashfs_image)
    self.assertTrue(detected)
| |
def testIsExt4Image(self) -> None:
    """Verify ext2, ext3 and ext4 images are all recognized.

    For each variant, the file is checked once before formatting (must
    not be identified) and once after running the matching mkfs tool
    (must be identified).
    """
    # 2 MiB is big enough for ext3/ext4 specific features.
    image_size = 2 * 1024 * 1024
    # Prepend the sbin directories to PATH for the mkfs.ext* lookup.
    mkfs_env = {"PATH": "/sbin:/usr/sbin:%s" % os.environ["PATH"]}

    for ext_version in (2, 3, 4):
        fs_image = self.tempdir / f"rootfs.ext{ext_version}"
        osutils.AllocateFile(fs_image, image_size)

        # Before formatting, the file must not be identified.
        self.assertFalse(image_lib.IsExt2Image(fs_image))

        # Format it as a real ext2/ext3/ext4 image and check again.
        mkfs_cmd = [f"mkfs.ext{ext_version}", fs_image]
        cros_build_lib.run(mkfs_cmd, extra_env=mkfs_env)
        self.assertTrue(image_lib.IsExt2Image(fs_image))