blob: 5be99c6b486c840c4dae9302fd3054e93e12cd6b [file] [log] [blame]
# Copyright 2015 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test the image_lib module."""
import collections
import gc
import glob
import os
from pathlib import Path
import stat
from unittest import mock
from chromite.lib import chromeos_version
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import git
from chromite.lib import image_lib
from chromite.lib import install_mask
from chromite.lib import osutils
from chromite.lib import partial_mock
from chromite.lib import portage_util
from chromite.lib import retry_util
from chromite.utils import c_loop
from chromite.utils import os_util
# pylint: disable=protected-access
class FakeException(Exception):
    """Stand-in exception for exercising error-handling paths in tests."""
# Image path handed to LoopbackPartitions in the tests.
FAKE_PATH = "/imaginary/file"
# Loop device name reported by the mocked `losetup --show` command.
LOOP_DEV = "/dev/loop9999"
LOOP_PART_COUNT = 12
# Canned partition table used as the mocked GetImageDiskPartitionInfo() result.
LOOP_PARTITION_INFO = [
    image_lib.PartitionInfo(1, 2928640, 28672, "", "STATE"),
    image_lib.PartitionInfo(2, 20480, 32768, "", "KERN-A"),
    image_lib.PartitionInfo(3, 286720, 2641920, "", "ROOT-A"),
    image_lib.PartitionInfo(4, 53248, 32768, "", "KERN-B"),
    image_lib.PartitionInfo(5, 282624, 4096, "", "ROOT-B"),
    image_lib.PartitionInfo(6, 16448, 1, "", "KERN-C"),
    image_lib.PartitionInfo(7, 16449, 1, "", "ROOT-C"),
    image_lib.PartitionInfo(8, 86016, 32768, "", "OEM"),
    image_lib.PartitionInfo(9, 16450, 1, "", "reserved"),
    image_lib.PartitionInfo(10, 16451, 1, "", "reserved"),
    image_lib.PartitionInfo(11, 64, 16384, "", "RWFW"),
    image_lib.PartitionInfo(12, 249856, 32768, "", "EFI-SYSTEM"),
]
# Maps each partition number to its device node, e.g. 3 -> /dev/loop9999p3.
LOOP_PARTS_DICT = {
    p.number: f"{LOOP_DEV}p{p.number}" for p in LOOP_PARTITION_INFO
}
# Materialized as a real list (not a dict view) because it is used as the
# return value of the glob.glob() stub, and glob.glob() returns a list.
LOOP_PARTS_LIST = list(LOOP_PARTS_DICT.values())
# Fixed timestamp returned by the patched VersionInfo._GetDateTime().
FAKE_DATE_STRING = "2022_07_20_203326"
class LoopbackPartitionsMock(image_lib.LoopbackPartitions):
    """LoopbackPartitions stand-in for unit tests.

    Device attachment and the mount/unmount primitives are stubbed out, and
    requests to toggle rw mounts are recorded instead of being performed.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # (part_id, offset) pairs seen by EnableRwMount / DisableRwMount.
        self.enable_rw_called = set()
        self.disable_rw_called = set()

    def _InitGpt(self) -> None:
        """Load the canned partition table as the GPT info."""
        self._gpt_table = LOOP_PARTITION_INFO

    def Attach(self) -> None:
        """Pretend to attach the image to the fake loopback device."""
        self.dev = LOOP_DEV
        if not self.destination:
            self.destination = osutils.TempDir()
        self.parts = {}
        for entry in self._gpt_table:
            self.parts[entry.number] = f"{self.dev}p{entry.number}"

    def EnableRwMount(self, part_id, offset=0) -> None:
        """Record the request rather than enabling rw mount."""
        self.enable_rw_called.add((part_id, offset))

    def DisableRwMount(self, part_id, offset=0) -> None:
        """Record the request rather than disabling rw mount."""
        self.disable_rw_called.add((part_id, offset))

    def _Mount(self, part, mount_opts):
        """Mark |part| as mounted and return its mount point; mounts nothing."""
        mount_point, _symlink = self._GetMountPointAndSymlink(part)
        # Only the bookkeeping is updated; no mount is attempted.
        self._mounted.add(part)
        return mount_point

    def _Unmount(self, part) -> None:
        """Drop |part| from the mounted set; unmounts nothing."""
        self._mounted.remove(part)

    def close(self) -> None:
        """Do nothing on close."""
class LoopbackPartitionsTest(cros_test_lib.MockTempDirTestCase):
    """Tests for image_lib.LoopbackPartitions with system access mocked."""

    def setUp(self) -> None:
        self.rc_mock = cros_test_lib.RunCommandMock()
        self.StartPatcher(self.rc_mock)
        self.rc_mock.SetDefaultCmdResult()
        # Commands matching In("--show") print the fake loop device.
        self.rc_mock.AddCmdResult(partial_mock.In("--show"), stdout=LOOP_DEV)

        self.PatchObject(
            image_lib,
            "GetImageDiskPartitionInfo",
            return_value=LOOP_PARTITION_INFO,
        )
        self.PatchObject(glob, "glob", return_value=LOOP_PARTS_LIST)
        self.mount_mock = self.PatchObject(osutils, "MountDir")
        self.umount_mock = self.PatchObject(osutils, "UmountDir")
        self.retry_mock = self.PatchObject(retry_util, "RetryException")

        loopback_cls = image_lib.LoopbackPartitions
        self.addpart_mock = self.PatchObject(loopback_cls, "_AddPartitions")
        self.delpart_mock = self.PatchObject(loopback_cls, "_DeletePartitions")
        self.detach_mock = self.PatchObject(c_loop, "detach")

        # Make osutils.Which() hand back whatever name it was asked for.
        self.PatchObject(
            osutils, "Which", side_effect=lambda val, *_arg, **_kwargs: val
        )

        # Patch os_util.is_root_user() to pretend running as root so we
        # attempt to do all the setup directly instead of falling back to our
        # sudo helper.
        self.PatchObject(os_util, "is_root_user", return_value=True)

    def _assertAttached(self, lb) -> None:
        """Check |lb| has been attached to FAKE_PATH and is not yet detached.

        The _DeletePartitions mock is reset afterwards so that the calls made
        during teardown can be counted on their own.
        """
        self.rc_mock.assertCommandContains(
            ["losetup", "--show", "-f", FAKE_PATH]
        )
        self.delpart_mock.assert_called_once()
        self.delpart_mock.reset_mock()
        self.addpart_mock.assert_called_once()
        self.detach_mock.assert_not_called()
        self.assertEqual(lb.parts, LOOP_PARTS_DICT)
        self.assertEqual(lb._gpt_table, LOOP_PARTITION_INFO)

    def _assertDetached(self) -> None:
        """Check partitions were deleted and the device detached once each."""
        self.delpart_mock.assert_called_once()
        self.detach_mock.assert_called_once()

    def testContextManager(self) -> None:
        """Test using the loopback class as a context manager."""
        with image_lib.LoopbackPartitions(FAKE_PATH) as lb:
            self._assertAttached(lb)
        self._assertDetached()

    def testContextManagerWithMounts(self) -> None:
        """Test using the loopback class as a context manager with mounts."""
        syml = self.PatchObject(osutils, "SafeSymlink")
        part_ids = (1, "ROOT-A")
        with image_lib.LoopbackPartitions(
            FAKE_PATH, part_ids=part_ids, mount_opts=("ro",)
        ) as lb:
            expected_mounts = set()
            expected_calls = []
            for part_id in part_ids:
                # Resolve the id (number or label) to its first table entry.
                match = next(
                    (
                        entry
                        for entry in LOOP_PARTITION_INFO
                        if part_id in (entry.name, entry.number)
                    ),
                    None,
                )
                if match is None:
                    continue
                expected_mounts.add(match)
                symlink_path = os.path.join(
                    lb.destination, f"dir-{match.name}"
                )
                expected_calls.append(
                    mock.call(f"dir-{match.number}", symlink_path, sudo=True)
                )

            self._assertAttached(lb)
            self.assertEqual(expected_calls, syml.call_args_list)
            self.assertEqual(expected_mounts, lb._mounted)
        self._assertDetached()

    def testManual(self) -> None:
        """Test using the loopback class closed manually."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH)
        lb.Attach()
        self._assertAttached(lb)
        lb.close()
        self._assertDetached()

    def gcFunc(self) -> None:
        """Isolates a local variable so it'll be garbage collected."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH)
        lb.Attach()
        self._assertAttached(lb)

    def testGarbageCollected(self) -> None:
        """Test using the loopback class closed by garbage collection."""
        self.gcFunc()
        # Force garbage collection in case python didn't already clean up the
        # loopback object.
        gc.collect()
        self._assertDetached()

    def testMountUnmount(self) -> None:
        """Test Mount() and Unmount() entry points."""

        def mount_dir(number):
            """Return the expected mount point for partition |number|."""
            return f"{self.tempdir}/dir-{number}"

        lb = image_lib.LoopbackPartitions(FAKE_PATH, destination=self.tempdir)
        lb.Attach()

        # Mount four partitions.
        lb.Mount((1, 3, "ROOT-B", "ROOT-C"))
        for number in (1, 3, 5, 7):
            self.mount_mock.assert_any_call(
                f"{LOOP_DEV}p{number}",
                mount_dir(number),
                makedirs=True,
                skip_mtab=False,
                sudo=True,
                mount_opts=("ro",),
            )
            label = LOOP_PARTITION_INFO[number - 1].name
            linkname = f"{self.tempdir}/dir-{label}"
            self.assertTrue(stat.S_ISLNK(os.lstat(linkname).st_mode))
        self.assertEqual(4, self.mount_mock.call_count)
        self.umount_mock.assert_not_called()

        # Check that Mounted provides the right info.
        self.assertDictEqual(
            lb.Mounted(),
            {
                "STATE": mount_dir(1),
                "ROOT-A": mount_dir(3),
                "ROOT-B": mount_dir(5),
                "ROOT-C": mount_dir(7),
            },
        )

        # Unmount half of them, confirm that they were unmounted.
        lb.Unmount((1, "ROOT-B"))
        for number in (1, 5):
            self.umount_mock.assert_any_call(mount_dir(number), cleanup=False)
        self.assertEqual(2, self.umount_mock.call_count)
        self.umount_mock.reset_mock()

        # Check that Mounted has been updated.
        self.assertDictEqual(
            lb.Mounted(),
            {
                "ROOT-A": mount_dir(3),
                "ROOT-C": mount_dir(7),
            },
        )

        # Close the object, so that we unmount the other half of them.
        lb.close()
        for number in (3, 7):
            self.umount_mock.assert_any_call(mount_dir(number), cleanup=False)
        self.assertEqual(2, self.umount_mock.call_count)

        # Verify that the directories were cleaned up.
        for number in (1, 3):
            self.retry_mock.assert_any_call(
                cros_build_lib.RunCommandError,
                60,
                osutils.RmDir,
                mount_dir(number),
                sudo=True,
                sleep=1,
            )

    def testMountingMountedPartReturnsName(self) -> None:
        """Verify Mount returns the directory name even when already mounted."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH, destination=self.tempdir)
        lb.Attach()
        first_part = lb._gpt_table[0]
        dirname = f"{self.tempdir}/dir-{first_part.number}"
        # First make sure we get the directory name when we actually mount.
        self.assertEqual(dirname, lb._Mount(first_part, ("ro",)))
        # Then make sure we get it when we call it again.
        self.assertEqual(dirname, lb._Mount(first_part, ("ro",)))
        lb.close()

    def testRemountCallsMount(self) -> None:
        """Verify mounting again with remount options calls MountDir again."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH, destination=self.tempdir)
        lb.Attach()
        first_part = lb._gpt_table[0]
        devname = f"{LOOP_DEV}p{first_part.number}"
        dirname = f"{self.tempdir}/dir-{first_part.number}"
        # Keyword arguments shared by both expected MountDir calls.
        common_kwargs = {"makedirs": True, "skip_mtab": False, "sudo": True}

        # The initial mount goes through MountDir with the ro options.
        self.assertEqual(dirname, lb._Mount(first_part, ("ro",)))
        self.mount_mock.assert_called_once_with(
            devname, dirname, mount_opts=("ro",), **common_kwargs
        )

        # Remounting must reach MountDir again with the new options.
        remount_opts = ("remount", "rw")
        self.assertEqual(dirname, lb._Mount(first_part, remount_opts))
        self.assertEqual(
            mock.call(
                devname, dirname, mount_opts=remount_opts, **common_kwargs
            ),
            self.mount_mock.call_args,
        )
        lb.close()

    def testGetPartitionDevName(self) -> None:
        """Test GetPartitionDevName()."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH)
        lb.Attach()
        for part in LOOP_PARTITION_INFO:
            expected_dev = f"{LOOP_DEV}p{part.number}"
            self.assertEqual(expected_dev, lb.GetPartitionDevName(part.number))
            # Two table entries share the "reserved" label, so those are only
            # looked up by number.
            if part.name != "reserved":
                self.assertEqual(
                    expected_dev, lb.GetPartitionDevName(part.name)
                )
        lb.close()

    def test_GetMountPointAndSymlink(self) -> None:
        """Test _GetMountPointAndSymlink()."""
        lb = image_lib.LoopbackPartitions(FAKE_PATH, destination=self.tempdir)
        lb.Attach()
        for part in LOOP_PARTITION_INFO:
            expected = [
                os.path.join(lb.destination, f"dir-{part.number}"),
                os.path.join(lb.destination, f"dir-{part.name}"),
            ]
            self.assertEqual(expected, list(lb._GetMountPointAndSymlink(part)))
        lb.close()

    def testIsExt2OnVarious(self) -> None:
        """Test _IsExt2 works with the various partition types."""
        # STATE, ROOT-A, and OEM generally have ext2 filesystems.
        FS_PARTITIONS = (1, 3, 8)

        def ext_mock(path, offset=0):  # pylint: disable=unused-argument
            return any(path.endswith(f"p{num}") for num in FS_PARTITIONS)

        self.PatchObject(image_lib, "IsExt2Image", side_effect=ext_mock)
        lb = image_lib.LoopbackPartitions(FAKE_PATH, destination=self.tempdir)
        lb.Attach()
        # We expect that only the partitions in FS_PARTITIONS are ext2.
        expected = [part.number in FS_PARTITIONS for part in LOOP_PARTITION_INFO]
        actual = [lb._IsExt2(part.name) for part in LOOP_PARTITION_INFO]
        self.assertEqual(expected, actual)
        lb.close()
class LsbUtilsTest(cros_test_lib.RunCommandTempDirTestCase):
    """Tests the various LSB utilities."""

    def setUp(self) -> None:
        # Patch os_util.is_root_user() to pretend running as root, so
        # reading/writing the lsb-release file doesn't require escalated
        # privileges and the test can clean itself up correctly.
        self.PatchObject(os_util, "is_root_user", return_value=True)

    def _assertSelinuxLabelSet(self) -> None:
        """Check that setfattr was run on the lsb-release file."""
        self.rc.assertCommandCalled(
            [
                "setfattr",
                "-n",
                "security.selinux",
                "-v",
                "u:object_r:cros_conf_file:s0",
                os.path.join(self.tempdir, "etc/lsb-release"),
            ]
        )

    def testWriteLsbRelease(self) -> None:
        """Tests writing out the lsb_release file using WriteLsbRelease(..)."""
        fields = collections.OrderedDict(
            [("x", "1"), ("y", "2"), ("foo", "bar")]
        )
        image_lib.WriteLsbRelease(self.tempdir, fields)

        lsb_release_file = os.path.join(self.tempdir, "etc", "lsb-release")
        self.assertFileContents(lsb_release_file, "x=1\ny=2\nfoo=bar\n")
        self._assertSelinuxLabelSet()

    def testOverwriteLsbRelease(self) -> None:
        """Tests overwriting the lsb_release file using WriteLsbRelease(..)."""
        lsb_release_file = os.path.join(self.tempdir, "etc", "lsb-release")
        initial_fields = collections.OrderedDict(
            [("x", "1"), ("y", "2"), ("foo", "bar")]
        )
        image_lib.WriteLsbRelease(self.tempdir, initial_fields)

        # Test that WriteLsbRelease(..) correctly handles an existing file.
        extra_fields = collections.OrderedDict(
            [
                ("newkey1", "value1"),
                ("newkey2", "value2"),
                ("a", "3"),
                ("b", "4"),
            ]
        )
        image_lib.WriteLsbRelease(self.tempdir, extra_fields)

        # The new fields are expected after the ones already in the file.
        expected_content = (
            "x=1\ny=2\nfoo=bar\nnewkey1=value1\nnewkey2=value2\na=3\nb=4\n"
        )
        self.assertFileContents(lsb_release_file, expected_content)
        self._assertSelinuxLabelSet()
class BuildImagePathTest(cros_test_lib.MockTempDirTestCase):
    """BuildImagePath tests."""

    def setUp(self) -> None:
        self.board = "board"
        self.board_dir = os.path.join(self.tempdir, self.board)
        # On-disk layout: <tempdir>/board/{recovery,other}_image.bin plus a
        # standalone <tempdir>/full_path_image.bin.
        board_images = cros_test_lib.Directory(
            self.board, ("recovery_image.bin", "other_image.bin")
        )
        cros_test_lib.CreateOnDiskHierarchy(
            self.tempdir, (board_images, "full_path_image.bin")
        )
        self.full_path = os.path.join(self.tempdir, "full_path_image.bin")

    def testBuildImagePath(self) -> None:
        """BuildImagePath tests."""
        self.PatchObject(
            image_lib,
            "GetLatestImageLink",
            return_value=self.board_dir,
        )
        recovery_image = os.path.join(self.board_dir, "recovery_image.bin")

        # An existing full image path is returned as is, with or without a
        # board.
        for board in (self.board, None):
            self.assertEqual(
                self.full_path, image_lib.BuildImagePath(board, self.full_path)
            )

        # Full image path provided that does not exist.
        for board in (self.board, None):
            with self.assertRaises(image_lib.ImageDoesNotExistError):
                image_lib.BuildImagePath(board, "/does/not/exist")

        # Default image is used.
        self.assertEqual(
            recovery_image, image_lib.BuildImagePath(self.board, None)
        )

        # Image basename provided.
        self.assertEqual(
            os.path.join(self.board_dir, "other_image.bin"),
            image_lib.BuildImagePath(self.board, "other_image.bin"),
        )

        # Image basename provided that does not exist.
        with self.assertRaises(image_lib.ImageDoesNotExistError):
            image_lib.BuildImagePath(self.board, "does_not_exist.bin")

        default_mock = self.PatchObject(cros_build_lib, "GetDefaultBoard")

        # Nothing provided, and no default.
        default_mock.return_value = None
        with self.assertRaises(image_lib.ImageDoesNotExistError):
            image_lib.BuildImagePath(None, None)

        # Nothing provided, with default.
        default_mock.return_value = "board"
        self.assertEqual(recovery_image, image_lib.BuildImagePath(None, None))
class SecurityTestConfigTest(cros_test_lib.RunCommandTempDirTestCase):
    """SecurityTestConfig class tests."""

    # pylint: disable=protected-access

    def setUp(self) -> None:
        self.image = "/path/to/image.bin"
        self.baselines = "/path/to/baselines"
        self.vboot_hash = "abc123"
        self.config = image_lib.SecurityTestConfig(
            self.image, self.baselines, self.vboot_hash, self.tempdir
        )

    def testVbootCheckout(self) -> None:
        """Test normal flow - clone and checkout."""
        first_clone = self.PatchObject(git, "Clone")
        self.config._VbootCheckout()
        first_clone.assert_called_once()
        self.assertCommandContains(["git", "checkout", self.vboot_hash])

        # Make sure it doesn't try to clone & checkout again after already
        # having done so successfully.
        second_clone = self.PatchObject(git, "Clone")
        self.config._VbootCheckout()
        second_clone.assert_not_called()

    def testVbootCheckoutError(self) -> None:
        """Test exceptions in a git command."""
        self.PatchObject(
            git, "Clone", side_effect=cros_build_lib.RunCommandError("error")
        )
        with self.assertRaises(image_lib.VbootCheckoutError):
            self.config._VbootCheckout()

    def testVbootCheckoutNoDirectory(self) -> None:
        """Test the error handling when the directory does not exist."""
        # Test directory that does not exist.
        self.config.directory = "/DOES/NOT/EXIST"
        with self.assertRaises(image_lib.SecurityConfigDirectoryError):
            self.config._VbootCheckout()

    def testRunCheck(self) -> None:
        """RunCheck tests."""
        checks_dir = self.config._checks_dir

        # No config argument when running check.
        self.config.RunCheck("check1", False)
        check1_script = os.path.join(checks_dir, "ensure_check1.sh")
        check1_config = os.path.join(self.baselines, "ensure_check1.config")
        self.assertCommandContains([check1_script, self.image])
        self.assertCommandContains([check1_config], expected=False)

        # Include config argument when running check.
        self.config.RunCheck("check2", True)
        check2_script = os.path.join(checks_dir, "ensure_check2.sh")
        check2_config = os.path.join(self.baselines, "ensure_check2.config")
        self.assertCommandContains([check2_script, self.image, check2_config])
class GetImageDiskPartitionInfoTests(cros_test_lib.RunCommandTempDirTestCase):
    """Tests the GetImageDiskPartitionInfo function."""

    # Canned listing used as the mocked command stdout in the tests that patch
    # IsInsideChroot to return False.
    SAMPLE_PARTED = """/foo/chromiumos_qemu_image.bin:\
2271240192B:file:512:512:gpt::;
11:32768B:8421375B:8388608B::RWFW:;
6:8421376B:8421887B:512B::KERN-C:;
7:8421888B:8422399B:512B::ROOT-C:;
9:8422400B:8422911B:512B::reserved:;
10:8422912B:8423423B:512B::reserved:;
2:10485760B:27262975B:16777216B::KERN-A:;
4:27262976B:44040191B:16777216B::KERN-B:;
8:44040192B:60817407B:16777216B:ext4:OEM:msftdata;
12:127926272B:161480703B:33554432B:fat16:EFI-SYSTEM:boot, esp;
5:161480704B:163577855B:2097152B::ROOT-B:;
3:163577856B:2260729855B:2097152000B:ext2:ROOT-A:;
1:2260729856B:2271215615B:10485760B:ext2:STATE:msftdata;
"""
    # Canned listing used as the mocked command stdout in the tests that patch
    # IsInsideChroot to return True.
    SAMPLE_CGPT = """\
start size part contents
0 1 PMBR (Boot GUID: 88FB7EB8-2B3F-B943-B933-\
EEC571FFB6E1)
1 1 Pri GPT header
2 32 Pri GPT table
1921024 2097152 1 Label: "STATE"
Type: 0FC63DAF-8483-4772-8E79-3D69D8477DE4
UUID: EEBD83BE-397E-BD44-878B-0DDDD5A5C510
Attr: [0]
20480 32768 2 Label: "KERN-A"
Type: FE3A2A5D-4F32-41A7-B725-ACCC3285A309
UUID: 7007C2F3-08E5-AB40-A4BC-FF5B01F5460D
Attr: [101]
1101824 819200 3 Label: "ROOT-A"
Type: 3CB8E202-3B7E-47DD-8A3C-7FF2A13CFCEC
UUID: F4C5C3AD-027F-894B-80CD-3DEC57932948
Attr: [0]
53248 32768 4 Label: "KERN-B"
Type: FE3A2A5D-4F32-41A7-B725-ACCC3285A309
UUID: C85FB478-404C-8741-ADB8-11312A35880D
Attr: [0]
282624 819200 5 Label: "ROOT-B"
Type: 3CB8E202-3B7E-47DD-8A3C-7FF2A13CFCEC
UUID: A99F4231-1EC3-C542-AC0C-DF3729F5DB07
Attr: [0]
16448 1 6 Label: "KERN-C"
Type: FE3A2A5D-4F32-41A7-B725-ACCC3285A309
UUID: 81F0E336-FAC9-174D-A08C-864FE627B637
Attr: [0]
16449 1 7 Label: "ROOT-C"
Type: 3CB8E202-3B7E-47DD-8A3C-7FF2A13CFCEC
UUID: 9E127FCA-30C1-044E-A5F2-DF74E6932692
Attr: [0]
86016 32768 8 Label: "OEM"
Type: 0FC63DAF-8483-4772-8E79-3D69D8477DE4
UUID: 72986347-A37C-684F-9A19-4DBAF41C55A9
Attr: [0]
16450 1 9 Label: "reserved"
Type: 2E0A753D-9E48-43B0-8337-B15192CB1B5E
UUID: BA85A0A7-1850-964D-8EF8-6707AC106C3A
Attr: [0]
16451 1 10 Label: "reserved"
Type: 2E0A753D-9E48-43B0-8337-B15192CB1B5E
UUID: 16C9EC9B-50FA-DD46-98DC-F781360817B4
Attr: [0]
64 16384 11 Label: "RWFW"
Type: CAB6E88E-ABF3-4102-A07A-D4BB9BE3C1D3
UUID: BE8AECB9-4F78-7C44-8F23-5A9273B7EC8F
Attr: [0]
249856 32768 12 Label: "EFI-SYSTEM"
Type: C12A7328-F81F-11D2-BA4B-00A0C93EC93B
UUID: 88FB7EB8-2B3F-B943-B933-EEC571FFB6E1
Attr: [0]
4050847 32 Sec GPT table
4050879 1 Sec GPT header
"""

    def setUp(self) -> None:
        # Create a real, non-empty file to pass as the image path.
        self.WriteTempFile("foo", "non-empty file")
        self._image_path = os.path.join(self.tempdir, "foo")

    def testCgpt(self) -> None:
        """Tests that we can list all partitions with `cgpt` correctly."""
        self.PatchObject(cros_build_lib, "IsInsideChroot", return_value=True)
        self.rc.AddCmdResult(partial_mock.Ignore(), stdout=self.SAMPLE_CGPT)
        partitions = image_lib.GetImageDiskPartitionInfo(self._image_path)
        part_dict = {p.name: p for p in partitions}
        # These equal the SAMPLE_CGPT start/size values (1921024 and 2097152)
        # multiplied by 512.
        self.assertEqual(part_dict["STATE"].start, 983564288)
        self.assertEqual(part_dict["STATE"].size, 1073741824)
        self.assertEqual(part_dict["STATE"].number, 1)
        self.assertEqual(part_dict["STATE"].name, "STATE")
        self.assertEqual(part_dict["EFI-SYSTEM"].start, 249856 * 512)
        self.assertEqual(part_dict["EFI-SYSTEM"].size, 32768 * 512)
        self.assertEqual(part_dict["EFI-SYSTEM"].number, 12)
        self.assertEqual(part_dict["EFI-SYSTEM"].name, "EFI-SYSTEM")
        self.assertEqual(12, len(partitions))

    def testNormalPath(self) -> None:
        """Tests listing partitions from SAMPLE_PARTED outside the chroot."""
        self.PatchObject(cros_build_lib, "IsInsideChroot", return_value=False)
        self.rc.AddCmdResult(partial_mock.Ignore(), stdout=self.SAMPLE_PARTED)
        partitions = image_lib.GetImageDiskPartitionInfo(self._image_path)
        part_dict = {p.name: p for p in partitions}
        self.assertEqual(12, len(partitions))
        self.assertEqual(1, part_dict["STATE"].number)
        self.assertEqual(2097152000, part_dict["ROOT-A"].size)

    def testKeyedByNumber(self) -> None:
        """Tests looking partitions up by number from SAMPLE_PARTED output.

        Keying by number keeps both entries labelled "reserved" distinct.
        """
        self.PatchObject(cros_build_lib, "IsInsideChroot", return_value=False)
        self.rc.AddCmdResult(partial_mock.Ignore(), stdout=self.SAMPLE_PARTED)
        partitions = image_lib.GetImageDiskPartitionInfo(self._image_path)
        part_dict = {p.number: p for p in partitions}
        self.assertEqual(12, len(part_dict))
        self.assertEqual("STATE", part_dict[1].name)
        self.assertEqual(2097152000, part_dict[3].size)
        self.assertEqual("reserved", part_dict[9].name)
        self.assertEqual("reserved", part_dict[10].name)

    def testChangeUnitInsideChroot(self) -> None:
        """Tests the STATE start/size reported from SAMPLE_CGPT in the chroot.

        The expected numbers are the SAMPLE_CGPT values multiplied by 512.
        """
        self.PatchObject(cros_build_lib, "IsInsideChroot", return_value=True)
        self.rc.AddCmdResult(partial_mock.Ignore(), stdout=self.SAMPLE_CGPT)
        partitions = image_lib.GetImageDiskPartitionInfo(self._image_path)
        part_dict = {p.name: p for p in partitions}
        self.assertEqual(part_dict["STATE"].start, 983564288)
        self.assertEqual(part_dict["STATE"].size, 1073741824)
class GetImagesToBuildTests(cros_test_lib.MockTestCase):
    """Tests the GetImagesToBuild function."""

    def testExpectedInput(self) -> None:
        """Pass in all expected image types and check expected image names."""
        for image_type, image_name in constants.IMAGE_TYPE_TO_NAME.items():
            images = image_lib.GetImagesToBuild([image_type])
            self.assertEqual(len(images), 1)
            # assertIn reports both operands on failure, unlike assertTrue.
            self.assertIn(image_name, images)

    def testInvalidInput(self) -> None:
        """Pass in an invalid image type and check for ValueError."""
        with self.assertRaises(ValueError):
            image_lib.GetImagesToBuild([constants.IMAGE_TYPE_DEV, "invalid"])

    def testInvalidImageCombination(self) -> None:
        """Verify an invalid image type combination raises a ValueError."""
        # NOTE(review): FACTORY_IMAGE_BIN looks like an image file name rather
        # than an image type -- confirm this exercises the combination check
        # and not just the unknown-type check covered by testInvalidInput.
        with self.assertRaises(ValueError):
            image_lib.GetImagesToBuild(
                [constants.IMAGE_TYPE_DEV, constants.FACTORY_IMAGE_BIN]
            )
class GetBuildImageEnvvarTests(cros_test_lib.MockTestCase):
    """Tests the GetBuildImageEnvvars function."""

    def setUp(self) -> None:
        # The board reports no USE flags unless a test overrides this mock.
        self.use_flag_mock = self.PatchObject(
            portage_util, "GetBoardUseFlags", return_value=[]
        )

    def testStandardImage(self) -> None:
        """Test with standard base/dev/test image name."""
        # Without the systemd USE flag the SYSTEMD mask is appended.
        expected_envvar = {
            "INSTALL_MASK": (
                "\n".join(install_mask.DEFAULT)
                + "\n"
                + "\n".join(install_mask.SYSTEMD)
            ),
            "PRISTINE_IMAGE_NAME": constants.BASE_IMAGE_BIN,
            "BASE_PACKAGE": "virtual/target-os",
        }
        image_to_test = [
            constants.BASE_IMAGE_BIN,
            constants.DEV_IMAGE_BIN,
            constants.TEST_IMAGE_BIN,
        ]
        for image in image_to_test:
            envar = image_lib.GetBuildImageEnvvars({image}, "test_board")
            self.assertDictEqual(envar, expected_envvar)

        # Validate scenario with systemd in USE flag
        self.use_flag_mock.return_value = ["cros_debug", "systemd"]
        expected_envvar["INSTALL_MASK"] = "\n".join(install_mask.DEFAULT)
        for image in image_to_test:
            envar = image_lib.GetBuildImageEnvvars({image}, "test_board")
            self.assertDictEqual(envar, expected_envvar)

    def testFactoryImage(self) -> None:
        """Test with factory image name."""
        factory_images = {constants.FACTORY_IMAGE_BIN}
        expected_envvar = {
            "INSTALL_MASK": (
                "\n".join(install_mask.FACTORY_SHIM)
                + "\n"
                + "\n".join(install_mask.SYSTEMD)
            ),
            "USE": image_lib._FACTORY_SHIM_USE_FLAGS,
            "PRISTINE_IMAGE_NAME": constants.FACTORY_IMAGE_BIN,
            "BASE_PACKAGE": "virtual/target-os-factory-shim",
        }
        envar = image_lib.GetBuildImageEnvvars(factory_images, "betty")
        self.assertDictEqual(envar, expected_envvar)

        # Validate scenario with systemd in USE flag
        self.use_flag_mock.return_value = ["cros_debug", "systemd"]
        expected_envvar["INSTALL_MASK"] = "\n".join(install_mask.FACTORY_SHIM)
        envar = image_lib.GetBuildImageEnvvars(factory_images, "betty")
        self.assertDictEqual(envar, expected_envvar)

        # Validate if extra environment variable is passed
        extra_env = {
            "USE": "test test1",
            "ENV": "TEST_VALUE",
        }
        # The caller's USE value is expected ahead of the factory shim flags.
        expected_envvar["USE"] = extra_env["USE"] + " " + expected_envvar["USE"]
        expected_envvar["ENV"] = extra_env["ENV"]
        envar = image_lib.GetBuildImageEnvvars(
            factory_images, "betty", env_var_init=extra_env
        )
        self.assertDictEqual(envar, expected_envvar)

    def testChromeOSVersion(self) -> None:
        """Test ChromeOS version environment variable."""
        version_info = chromeos_version.VersionInfo(
            version_string="1.2.3", chrome_branch="4"
        )
        envar = image_lib.GetBuildImageEnvvars(
            {constants.BASE_IMAGE_BIN}, "betty", version_info=version_info
        )
        self.assertEqual(envar["CHROME_BRANCH"], "4")
        self.assertEqual(envar["CHROMEOS_BUILD"], "1")
        self.assertEqual(envar["CHROMEOS_BRANCH"], "2")
        self.assertEqual(envar["CHROMEOS_PATCH"], "3")
        self.assertEqual(envar["CHROMEOS_VERSION_STRING"], "1.2.3")

    def testBuildAndOutputDir(self) -> None:
        """Test BUILD_DIR and OUTPUT_DIR environment variable."""
        build_dir = "build/dir"
        output_dir = Path("ouput/dir")
        envar = image_lib.GetBuildImageEnvvars(
            {constants.BASE_IMAGE_BIN},
            "betty",
            build_dir=build_dir,
            output_dir=output_dir,
        )
        self.assertEqual(envar["BUILD_DIR"], build_dir)
        # A Path passed as output_dir is expected back in string form.
        self.assertEqual(envar["OUTPUT_DIR"], str(output_dir))
class CreateBuildDirTests(cros_test_lib.MockTempDirTestCase):
    """Test CreateBuildDir."""

    def setUp(self) -> None:
        # Pin the timestamp so date-based directory names are predictable.
        self.PatchObject(
            chromeos_version.VersionInfo,
            "_GetDateTime",
            return_value=FAKE_DATE_STRING,
        )
        self.build_top_dir = self.tempdir / "build"
        self.output_top_dir = self.tempdir / "output"
        self.testBoard = "TestBoard"
        self.version_info = chromeos_version.VersionInfo(
            version_string="1.2.3", chrome_branch="4"
        )
        self.attempt = 5
        self.result_build_dir = self.build_top_dir / self.testBoard
        self.result_output_dir = self.output_top_dir / self.testBoard
        branch = self.version_info.chrome_branch
        # Expected image directory names for each naming variant.
        self.image_dir = f"R{branch}-{self.version_info.VersionString()}"
        self.image_dir_attempt = f"{self.image_dir}-a{self.attempt}"
        self.image_dir_date = (
            f"R{branch}-{self.version_info.VersionStringWithDateTime()}"
        )
        self.image_dir_date_attempt = f"{self.image_dir_date}-a{self.attempt}"
        self.symlink = "latest"

    def _createBuildDir(self, version, **kwargs):
        """Call CreateBuildDir for |version| with the fixture's common args.

        Args:
            version: Version string to build the directory name from.
            **kwargs: Extra keyword arguments forwarded to CreateBuildDir.

        Returns:
            The (build_dir, output_dir, symlink_dir) tuple from CreateBuildDir.
        """
        return image_lib.CreateBuildDir(
            self.build_top_dir,
            self.output_top_dir,
            self.version_info.chrome_branch,
            version,
            self.testBoard,
            self.symlink,
            **kwargs,
        )

    def _assertBuildDirLayout(self, result, image_dir) -> None:
        """Verify a CreateBuildDir |result| against the expected |image_dir|.

        Args:
            result: The (build_dir, output_dir, symlink_dir) tuple to check.
            image_dir: Expected name of the image directory.
        """
        build_dir, output_dir, symlink_dir = result
        self.assertExists(build_dir)
        self.assertExists(output_dir)
        self.assertEqual(build_dir, self.result_build_dir / image_dir)
        # Compare the path itself: a second argument to assertExists is not
        # checked against the path, so the output directory went unverified.
        self.assertEqual(output_dir, self.result_output_dir / image_dir)
        self.assertTrue(symlink_dir.is_symlink())
        self.assertEqual(image_dir, os.readlink(symlink_dir))

    def testChromeBranchVersion(self) -> None:
        """Test with chrome_branch and version string."""
        version = self.version_info.VersionString()
        self._assertBuildDirLayout(
            self._createBuildDir(version), self.image_dir
        )

        # Now test the case where the build directory already exists.
        self._assertBuildDirLayout(
            self._createBuildDir(version, replace=True), self.image_dir
        )

        # Now test the case where the build directory already exists with
        # replace as false.
        with self.assertRaises(FileExistsError):
            self._createBuildDir(version)

    def testChromeBranchVersionDate(self) -> None:
        """Test with chrome_branch and version string with date."""
        version = self.version_info.VersionStringWithDateTime()
        self._assertBuildDirLayout(
            self._createBuildDir(version), self.image_dir_date
        )

    def testBuildAttempt(self) -> None:
        """Test with chrome_branch, version string and build attempt."""
        version = self.version_info.VersionString()
        self._assertBuildDirLayout(
            self._createBuildDir(version, build_attempt=self.attempt),
            self.image_dir_attempt,
        )

    def testBuildAttemptDate(self) -> None:
        """Test with chrome_branch, version string, date, and build attempt."""
        version = self.version_info.VersionStringWithDateTime()
        self._assertBuildDirLayout(
            self._createBuildDir(version, build_attempt=self.attempt),
            self.image_dir_date_attempt,
        )

    def testOutputSuffix(self) -> None:
        """Test with output suffix."""
        output_suffix = "test-suffix"
        version = self.version_info.VersionString()
        self._assertBuildDirLayout(
            self._createBuildDir(
                version,
                build_attempt=self.attempt,
                output_suffix=output_suffix,
            ),
            f"{self.image_dir_attempt}-{output_suffix}",
        )

        # Test the case without build_attempt.
        self._assertBuildDirLayout(
            self._createBuildDir(version, output_suffix=output_suffix),
            f"{self.image_dir}-{output_suffix}",
        )

    def testOutputSuffixWithDate(self) -> None:
        """Test with output suffix with date."""
        output_suffix = "test-suffix"
        version = self.version_info.VersionStringWithDateTime()
        self._assertBuildDirLayout(
            self._createBuildDir(
                version,
                build_attempt=self.attempt,
                output_suffix=output_suffix,
            ),
            f"{self.image_dir_date_attempt}-{output_suffix}",
        )

        # Test the case without build_attempt.
        self._assertBuildDirLayout(
            self._createBuildDir(version, output_suffix=output_suffix),
            f"{self.image_dir_date}-{output_suffix}",
        )
class UtilsTests(cros_test_lib.TempDirTestCase):
    """Test simple util funcs."""

    def testIsSquashfsImageFails(self) -> None:
        """Test SquashFS identification on non-images."""
        not_an_image = self.tempdir / "img.squashfs"
        osutils.AllocateFile(not_an_image, 1024 * 1024)
        self.assertFalse(image_lib.IsSquashfsImage(not_an_image))

    def testIsSquashfsImage(self) -> None:
        """Tests we correctly identify a SquashFS image."""
        image = self.tempdir / "img.squashfs"
        root = self.tempdir / "root"
        root.mkdir()
        # Both names are relative, so run from the temp dir.
        mksquashfs_cmd = ["mksquashfs", root.name, image.name]
        cros_build_lib.run(mksquashfs_cmd, cwd=self.tempdir)
        self.assertTrue(image_lib.IsSquashfsImage(image))

    def testIsExt4Image(self) -> None:
        """Tests we correctly identify ext2, ext3 and ext4 images."""
        # Prepend the sbin directories to PATH for the mkfs tools.
        mkfs_env = {"PATH": f"/sbin:/usr/sbin:{os.environ['PATH']}"}
        for ver in (2, 3, 4):
            image = self.tempdir / f"rootfs.ext{ver}"
            # 2 MiB is big enough for ext3/ext4 specific features.
            osutils.AllocateFile(image, 2 * 1024 * 1024)
            # A freshly allocated file must not be identified as an image.
            self.assertFalse(image_lib.IsExt2Image(image))
            # Make a real ext2/ext3/ext4 image.
            cros_build_lib.run([f"mkfs.ext{ver}", image], extra_env=mkfs_env)
            self.assertTrue(image_lib.IsExt2Image(image))