| # -*- coding: utf-8 -*- |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Test the cros_sdk_lib module.""" |
| |
| from __future__ import print_function |
| |
| import os |
| import sys |
| |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_sdk_lib |
| from chromite.lib import cros_test_lib |
| from chromite.lib import osutils |
| from chromite.lib import partial_mock |
| |
| |
| assert sys.version_info >= (3, 6), 'This module requires Python 3.6+' |
| |
| |
| class VersionHookTestCase(cros_test_lib.TempDirTestCase): |
| """Class to set up tests that use the version hooks.""" |
| |
| def setUp(self): |
| # Build set of expected scripts. |
| D = cros_test_lib.Directory |
| filesystem = ( |
| D('hooks', ( |
| '8_invalid_gap', |
| '10_run_success', |
| '11_run_success', |
| '12_run_success', |
| )), |
| 'version_file', |
| ) |
| cros_test_lib.CreateOnDiskHierarchy(self.tempdir, filesystem) |
| |
| self.chroot_path = os.path.join(self.tempdir, 'chroot') |
| self.version_file = os.path.join( |
| self.chroot_path, cros_sdk_lib.CHROOT_VERSION_FILE.lstrip(os.sep)) |
| osutils.WriteFile(self.version_file, '0', makedirs=True) |
| self.hooks_dir = os.path.join(self.tempdir, 'hooks') |
| |
| self.earliest_version = 8 |
| self.latest_version = 12 |
| self.deprecated_versions = (6, 7, 8) |
| self.invalid_versions = (13,) |
| self.success_versions = (9, 10, 11, 12) |
| |
| |
| class TestGetFileSystemDebug(cros_test_lib.RunCommandTestCase): |
| """Tests GetFileSystemDebug functionality.""" |
| |
| def testNoPs(self): |
| """Verify with run_ps=False.""" |
| self.rc.AddCmdResult( |
| ['sudo', '--', 'fuser', '/some/path'], stdout='fuser_output') |
| self.rc.AddCmdResult( |
| ['sudo', '--', 'lsof', '/some/path'], stdout='lsof_output') |
| file_system_debug_tuple = cros_sdk_lib.GetFileSystemDebug( |
| '/some/path', run_ps=False) |
| self.assertEqual(file_system_debug_tuple.fuser, 'fuser_output') |
| self.assertEqual(file_system_debug_tuple.lsof, 'lsof_output') |
| self.assertIsNone(file_system_debug_tuple.ps) |
| |
| def testWithPs(self): |
| """Verify with run_ps=False.""" |
| self.rc.AddCmdResult( |
| ['sudo', '--', 'fuser', '/some/path'], stdout='fuser_output') |
| self.rc.AddCmdResult( |
| ['sudo', '--', 'lsof', '/some/path'], stdout='lsof_output') |
| self.rc.AddCmdResult( |
| ['ps', 'auxf'], stdout='ps_output') |
| file_system_debug_tuple = cros_sdk_lib.GetFileSystemDebug( |
| '/some/path', run_ps=True) |
| self.assertEqual(file_system_debug_tuple.fuser, 'fuser_output') |
| self.assertEqual(file_system_debug_tuple.lsof, 'lsof_output') |
| self.assertEqual(file_system_debug_tuple.ps, 'ps_output') |
| |
| |
| class TestGetChrootVersion(cros_test_lib.MockTestCase): |
| """Tests GetChrootVersion functionality.""" |
| |
| def testNoChroot(self): |
| """Verify we don't blow up when there is no chroot yet.""" |
| self.PatchObject(cros_sdk_lib.ChrootUpdater, 'GetVersion', |
| side_effect=IOError()) |
| self.assertIsNone(cros_sdk_lib.GetChrootVersion('/.$om3/place/nowhere')) |
| |
| |
| class TestChrootVersionValid(VersionHookTestCase): |
| """Test valid chroot version method.""" |
| |
| def testLowerVersionValid(self): |
| """Lower versions are considered valid.""" |
| osutils.WriteFile(self.version_file, str(self.latest_version - 1)) |
| self.assertTrue( |
| cros_sdk_lib.IsChrootVersionValid(self.chroot_path, self.hooks_dir)) |
| |
| def testLatestVersionValid(self): |
| """Test latest version.""" |
| osutils.WriteFile(self.version_file, str(self.latest_version)) |
| self.assertTrue( |
| cros_sdk_lib.IsChrootVersionValid(self.chroot_path, self.hooks_dir)) |
| |
| def testInvalidVersion(self): |
| """Test version higher than latest.""" |
| osutils.WriteFile(self.version_file, str(self.latest_version + 1)) |
| self.assertFalse( |
| cros_sdk_lib.IsChrootVersionValid(self.chroot_path, self.hooks_dir)) |
| |
| |
| class TestLatestChrootVersion(VersionHookTestCase): |
| """LatestChrootVersion tests.""" |
| |
| def testLatest(self): |
| """Test latest version.""" |
| self.assertEqual(self.latest_version, |
| cros_sdk_lib.LatestChrootVersion(self.hooks_dir)) |
| |
| |
| class TestEarliestChrootVersion(VersionHookTestCase): |
| """EarliestChrootVersion tests.""" |
| |
| def testEarliest(self): |
| """Test earliest version.""" |
| self.assertEqual(self.earliest_version, |
| cros_sdk_lib.EarliestChrootVersion(self.hooks_dir)) |
| |
| |
| class TestIsChrootReady(cros_test_lib.MockTestCase): |
| """Tests IsChrootReady functionality.""" |
| |
| def setUp(self): |
| self.version_mock = self.PatchObject(cros_sdk_lib, 'GetChrootVersion') |
| |
| def testMissing(self): |
| """Check behavior w/out a chroot.""" |
| self.version_mock.return_value = None |
| self.assertFalse(cros_sdk_lib.IsChrootReady('/')) |
| |
| def testNotSetup(self): |
| """Check behavior w/an existing uninitialized chroot.""" |
| self.version_mock.return_value = 0 |
| self.assertFalse(cros_sdk_lib.IsChrootReady('/')) |
| |
| def testUpToDate(self): |
| """Check behavior w/a valid chroot.""" |
| self.version_mock.return_value = 123 |
| self.assertTrue(cros_sdk_lib.IsChrootReady('/')) |
| |
| |
| class TestFindVolumeGroupForDevice(cros_test_lib.MockTempDirTestCase): |
| """Tests the FindVolumeGroupForDevice function.""" |
| |
| def testExistingDevice(self): |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.SetDefaultCmdResult(output=""" |
| wrong_vg1\t/dev/sda1 |
| test_vg\t/dev/loop1 |
| wrong_vg2\t/dev/loop0 |
| """) |
| vg = cros_sdk_lib.FindVolumeGroupForDevice('/chroot', '/dev/loop1') |
| self.assertEqual(vg, 'test_vg') |
| |
| def testNoMatchingVolumeGroup(self): |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.SetDefaultCmdResult(output=""" |
| wrong_vg1\t/dev/sda1 |
| wrong_vg2\t/dev/loop0 |
| """) |
| vg = cros_sdk_lib.FindVolumeGroupForDevice('/chroot', '') |
| self.assertEqual(vg, 'cros_chroot_000') |
| |
| def testPhysicalVolumeWithoutVolumeGroup(self): |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.SetDefaultCmdResult(output=""" |
| wrong_vg1\t/dev/sda1 |
| \t/dev/loop0 |
| """) |
| vg = cros_sdk_lib.FindVolumeGroupForDevice('/chroot', '/dev/loop0') |
| self.assertEqual(vg, 'cros_chroot_000') |
| |
| def testMatchingVolumeGroup(self): |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.SetDefaultCmdResult(output=""" |
| wrong_vg1\t/dev/sda1 |
| cros_chroot_000\t/dev/loop1 |
| wrong_vg2\t/dev/loop0 |
| """) |
| vg = cros_sdk_lib.FindVolumeGroupForDevice('/chroot', '') |
| self.assertEqual(vg, 'cros_chroot_001') |
| |
| def testTooManyVolumeGroups(self): |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.SetDefaultCmdResult(output=""" |
| wrong_vg1\t/dev/sda1 |
| %s |
| wrong_vg2\t/dev/loop0 |
| """ % '\n'.join([' cros_chroot_%03d\t/dev/any' % i for i in range(1000)])) |
| vg = cros_sdk_lib.FindVolumeGroupForDevice('/chroot', '') |
| self.assertIsNone(vg) |
| |
| def testInvalidChars(self): |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.SetDefaultCmdResult(output=""" |
| wrong_vg1\t/dev/sda1 |
| cros_chroot_000\t/dev/loop1 |
| wrong_vg2\t/dev/loop0 |
| """) |
| vg = cros_sdk_lib.FindVolumeGroupForDevice( |
| '//full path /to& "my" /chroot', '') |
| self.assertEqual(vg, 'cros_full+path++to+++my+++chroot_000') |
| |
| def testInvalidLines(self): |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.SetDefaultCmdResult(output=""" |
| \t/dev/sda1 |
| |
| wrong_vg2\t/dev/loop0\t |
| """) |
| vg = cros_sdk_lib.FindVolumeGroupForDevice('/chroot', '') |
| self.assertEqual(vg, 'cros_chroot_000') |
| |
| |
| class TestMountChroot(cros_test_lib.MockTempDirTestCase): |
| """Tests various partial setups for MountChroot.""" |
| |
| _VGS_LOOKUP = ['sudo', '--', 'vgs', partial_mock.Ignore()] |
| _VGCREATE = ['sudo', '--', 'vgcreate', '-q', partial_mock.Ignore(), |
| partial_mock.Ignore()] |
| _VGCHANGE = ['sudo', '--', 'vgchange', '-q', '-ay', partial_mock.Ignore()] |
| _LVS_LOOKUP = ['sudo', '--', 'lvs', partial_mock.Ignore()] |
| _LVCREATE = ['sudo', '--', 'lvcreate', '-q', '-L499G', '-T', |
| partial_mock.Ignore(), '-V500G', '-n', partial_mock.Ignore()] |
| _MKE2FS = ['sudo', '--', 'mke2fs', '-q', '-m', '0', '-t', 'ext4', |
| partial_mock.Ignore()] |
| _MOUNT = [] # Set correctly in setUp. |
| _LVM_FAILURE_CODE = 5 # Shell exit code when lvm commands fail. |
| _LVM_SUCCESS_CODE = 0 # Shell exit code when lvm commands succeed. |
| |
| def _makeImageFile(self, chroot_img): |
| with open(chroot_img, 'w') as f: |
| f.seek(2**30) |
| f.write('\0') |
| |
| def _mockFindVolumeGroupForDevice(self): |
| m = self.PatchObject(cros_sdk_lib, 'FindVolumeGroupForDevice') |
| m.return_value = 'cros_test_chroot_000' |
| return m |
| |
| def _mockAttachDeviceToFile(self, loop_dev='loop0'): |
| m = self.PatchObject(cros_sdk_lib, '_AttachDeviceToFile') |
| m.return_value = '/dev/%s' % loop_dev |
| return m |
| |
| def _mockDeviceFromFile(self, dev): |
| m = self.PatchObject(cros_sdk_lib, '_DeviceFromFile') |
| m.return_value = dev |
| return m |
| |
| def setUp(self): |
| self.chroot_path = os.path.join(self.tempdir, 'chroot') |
| osutils.SafeMakedirsNonRoot(self.chroot_path) |
| self.chroot_img = self.chroot_path + '.img' |
| |
| self._MOUNT = ['sudo', '--', 'mount', '-text4', '-onoatime', |
| partial_mock.Ignore(), self.chroot_path] |
| |
| def testFromScratch(self): |
| # Create the whole setup from nothing. |
| # Should call losetup, vgs, vgcreate, lvs, lvcreate, mke2fs, mount |
| m = self._mockFindVolumeGroupForDevice() |
| m2 = self._mockAttachDeviceToFile() |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._VGS_LOOKUP, returncode=self._LVM_FAILURE_CODE) |
| rc_mock.AddCmdResult(self._VGCREATE, output='') |
| rc_mock.AddCmdResult(self._LVS_LOOKUP, returncode=self._LVM_FAILURE_CODE) |
| rc_mock.AddCmdResult(self._LVCREATE) |
| rc_mock.AddCmdResult(self._MKE2FS) |
| rc_mock.AddCmdResult(self._MOUNT) |
| |
| success = cros_sdk_lib.MountChroot(self.chroot_path) |
| self.assertTrue(success) |
| |
| m.assert_called_with(self.chroot_path, '/dev/loop0') |
| m2.assert_called_with(self.chroot_img) |
| |
| def testMissingMount(self): |
| # Re-mount an image that has a loopback and VG active but isn't mounted. |
| # This can happen if the person unmounts the chroot or calls |
| # osutils.UmountTree() on the path. |
| # Should call losetup, vgchange, lvs, mount |
| self._makeImageFile(self.chroot_img) |
| |
| m = self._mockFindVolumeGroupForDevice() |
| m2 = self._mockDeviceFromFile('/dev/loop1') |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._VGS_LOOKUP, returncode=self._LVM_SUCCESS_CODE) |
| rc_mock.AddCmdResult(self._VGCHANGE) |
| rc_mock.AddCmdResult(self._LVS_LOOKUP, returncode=self._LVM_SUCCESS_CODE) |
| rc_mock.AddCmdResult(self._MOUNT) |
| |
| success = cros_sdk_lib.MountChroot(self.chroot_path) |
| self.assertTrue(success) |
| |
| m.assert_called_with(self.chroot_path, '/dev/loop1') |
| m2.assert_called_with(self.chroot_img) |
| |
| def testImageAfterReboot(self): |
| # Re-mount an image that has everything setup, but doesn't have anything |
| # attached, e.g. after reboot. |
| # Should call losetup -j, losetup -f, vgs, vgchange, lvs, lvchange, mount |
| self._makeImageFile(self.chroot_img) |
| |
| m = self._mockFindVolumeGroupForDevice() |
| m2 = self._mockDeviceFromFile('') |
| m3 = self._mockAttachDeviceToFile('loop1') |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._VGS_LOOKUP, returncode=self._LVM_SUCCESS_CODE) |
| rc_mock.AddCmdResult(self._VGCHANGE) |
| rc_mock.AddCmdResult(self._LVS_LOOKUP, returncode=self._LVM_SUCCESS_CODE) |
| rc_mock.AddCmdResult(self._MOUNT) |
| |
| success = cros_sdk_lib.MountChroot(self.chroot_path) |
| self.assertTrue(success) |
| |
| m.assert_called_with(self.chroot_path, '/dev/loop1') |
| m2.assert_called_with(self.chroot_img) |
| m3.assert_called_with(self.chroot_img) |
| |
| def testImagePresentNotSetup(self): |
| # Mount an image that is present but doesn't have anything set up. This |
| # can't arise in normal usage, but could happen if cros_sdk crashes in the |
| # middle of setup. |
| # Should call losetup -j, losetup -f, vgs, vgcreate, lvs, lvcreate, mke2fs, |
| # mount |
| self._makeImageFile(self.chroot_img) |
| |
| m = self._mockFindVolumeGroupForDevice() |
| m2 = self._mockAttachDeviceToFile() |
| m3 = self._mockDeviceFromFile('') |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._VGS_LOOKUP, returncode=self._LVM_FAILURE_CODE) |
| rc_mock.AddCmdResult(self._VGCREATE) |
| rc_mock.AddCmdResult(self._LVS_LOOKUP, returncode=self._LVM_FAILURE_CODE) |
| rc_mock.AddCmdResult(self._LVCREATE) |
| rc_mock.AddCmdResult(self._MKE2FS) |
| rc_mock.AddCmdResult(self._MOUNT) |
| |
| success = cros_sdk_lib.MountChroot(self.chroot_path) |
| self.assertTrue(success) |
| |
| m.assert_called_with(self.chroot_path, '/dev/loop0') |
| m2.assert_called_with(self.chroot_img) |
| m3.assert_called_with(self.chroot_img) |
| |
| def testImagePresentOnlyLoopbackSetup(self): |
| # Mount an image that is present and attached to a loopback device, but |
| # doesn't have anything else set up. This can't arise in normal usage, but |
| # could happen if cros_sdk crashes in the middle of setup. |
| # Should call losetup, vgs, vgcreate, lvs, lvcreate, mke2fs, mount |
| self._makeImageFile(self.chroot_img) |
| |
| m = self._mockFindVolumeGroupForDevice() |
| m2 = self._mockDeviceFromFile('/dev/loop0') |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._VGS_LOOKUP, returncode=self._LVM_FAILURE_CODE) |
| rc_mock.AddCmdResult(self._VGCREATE) |
| rc_mock.AddCmdResult(self._LVS_LOOKUP, returncode=self._LVM_FAILURE_CODE) |
| rc_mock.AddCmdResult(self._LVCREATE) |
| rc_mock.AddCmdResult(self._MKE2FS) |
| rc_mock.AddCmdResult(self._MOUNT) |
| |
| success = cros_sdk_lib.MountChroot(self.chroot_path) |
| self.assertTrue(success) |
| |
| m.assert_called_with(self.chroot_path, '/dev/loop0') |
| m2.assert_called_with(self.chroot_img) |
| |
| def testImagePresentOnlyVgSetup(self): |
| # Mount an image that is present, attached to a loopback device, and has a |
| # VG, but doesn't have anything else set up. This can't arise in normal |
| # usage, but could happen if cros_sdk crashes in the middle of setup. |
| # Should call losetup, vgs, vgchange, lvs, lvcreate, mke2fs, mount |
| self._makeImageFile(self.chroot_img) |
| |
| m = self._mockFindVolumeGroupForDevice() |
| m2 = self._mockDeviceFromFile('/dev/loop0') |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._VGS_LOOKUP, returncode=self._LVM_SUCCESS_CODE) |
| rc_mock.AddCmdResult(self._VGCHANGE) |
| rc_mock.AddCmdResult(self._LVS_LOOKUP, returncode=self._LVM_FAILURE_CODE) |
| rc_mock.AddCmdResult(self._LVCREATE) |
| rc_mock.AddCmdResult(self._MKE2FS) |
| rc_mock.AddCmdResult(self._MOUNT) |
| |
| success = cros_sdk_lib.MountChroot(self.chroot_path) |
| self.assertTrue(success) |
| |
| m.assert_called_with(self.chroot_path, '/dev/loop0') |
| m2.assert_called_with(self.chroot_img) |
| |
| def testMissingNoCreate(self): |
| # Chroot image isn't present, but create is False. |
| # Should return False without running any commands. |
| success = cros_sdk_lib.MountChroot(self.chroot_path, create=False) |
| self.assertFalse(success) |
| |
| def testExistingChroot(self): |
| # Chroot version file exists in the chroot. |
| # Should return True without running any commands. |
| osutils.Touch(os.path.join(self.chroot_path, 'etc', 'cros_chroot_version'), |
| makedirs=True) |
| |
| success = cros_sdk_lib.MountChroot(self.chroot_path, create=False) |
| self.assertTrue(success) |
| |
| success = cros_sdk_lib.MountChroot(self.chroot_path, create=True) |
| self.assertTrue(success) |
| |
| def testEmptyChroot(self): |
| # Chroot mounted from proper image but without the version file present, |
| # e.g. if cros_sdk fails in the middle of populating the chroot. |
| # Should return True without running any commands. |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('/dev/mapper/cros_test_000-chroot %s ext4 rw 0 0\n' % |
| self.chroot_path) |
| |
| success = cros_sdk_lib.MountChroot( |
| self.chroot_path, create=False, proc_mounts=proc_mounts) |
| self.assertTrue(success) |
| |
| success = cros_sdk_lib.MountChroot( |
| self.chroot_path, create=True, proc_mounts=proc_mounts) |
| self.assertTrue(success) |
| |
| def testBadMount(self): |
| # Chroot with something else mounted on it. |
| # Should return False without running any commands. |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('/dev/sda1 %s ext4 rw 0 0\n' % self.chroot_path) |
| |
| success = cros_sdk_lib.MountChroot( |
| self.chroot_path, create=False, proc_mounts=proc_mounts) |
| self.assertFalse(success) |
| |
| success = cros_sdk_lib.MountChroot( |
| self.chroot_path, create=True, proc_mounts=proc_mounts) |
| self.assertFalse(success) |
| |
| |
| class TestFindChrootMountSource(cros_test_lib.MockTempDirTestCase): |
| """Tests the FindChrootMountSource function.""" |
| def testNoMatchingMount(self): |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('sysfs /sys sysfs rw 0 0\n') |
| |
| vg, lv = cros_sdk_lib.FindChrootMountSource('/chroot', |
| proc_mounts=proc_mounts) |
| self.assertIsNone(vg) |
| self.assertIsNone(lv) |
| |
| def testMatchWrongName(self): |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('/dev/sda1 /chroot ext4 rw 0 0\n') |
| |
| vg, lv = cros_sdk_lib.FindChrootMountSource('/chroot', |
| proc_mounts=proc_mounts) |
| self.assertIsNone(vg) |
| self.assertIsNone(lv) |
| |
| def testMatchRightName(self): |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('/dev/mapper/cros_vg_name-lv_name /chroot ext4 rw 0 0\n') |
| |
| vg, lv = cros_sdk_lib.FindChrootMountSource('/chroot', |
| proc_mounts=proc_mounts) |
| self.assertEqual(vg, 'cros_vg_name') |
| self.assertEqual(lv, 'lv_name') |
| |
| def testMatchMultipleMounts(self): |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write("""/dev/mapper/cros_first_mount-lv_name /chroot ext4 rw 0 0 |
| /dev/mapper/cros_inner_mount-lv /chroot/inner ext4 rw 0 0 |
| /dev/mapper/cros_second_mount-lv_name /chroot ext4 rw 0 0 |
| """) |
| |
| vg, lv = cros_sdk_lib.FindChrootMountSource('/chroot', |
| proc_mounts=proc_mounts) |
| self.assertEqual(vg, 'cros_second_mount') |
| self.assertEqual(lv, 'lv_name') |
| |
| |
| class TestCleanupChrootMount(cros_test_lib.MockTempDirTestCase): |
| """Tests the CleanupChrootMount function.""" |
| |
| _VGS_DEV_LOOKUP = ['sudo', '--', 'vgs', '-q', '--noheadings', '-o', 'pv_name', |
| '--unbuffered', partial_mock.Ignore()] |
| _VGS_VG_LOOKUP = ['sudo', '--', 'vgs', partial_mock.Ignore()] |
| _LOSETUP_FIND = ['sudo', '--', 'losetup', '-j', partial_mock.Ignore()] |
| _LOSETUP_DETACH = ['sudo', '--', 'losetup', '-d', partial_mock.Ignore()] |
| _VGCHANGE_N = ['sudo', '--', 'vgchange', '-an', partial_mock.Ignore()] |
| _LVM_FAILURE_CODE = 5 # Shell exit code when lvm commands fail. |
| _LVM_SUCCESS_CODE = 0 # Shell exit code when lvm commands succeed. |
| |
| def setUp(self): |
| self.chroot_path = os.path.join(self.tempdir, 'chroot') |
| osutils.SafeMakedirsNonRoot(self.chroot_path) |
| self.chroot_img = self.chroot_path + '.img' |
| |
| def testMountedCleanup(self): |
| m = self.PatchObject(osutils, 'UmountTree') |
| m2 = self.PatchObject(cros_sdk_lib, '_RescanDeviceLvmMetadata') |
| |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('/dev/mapper/cros_vg_name-chroot %s ext4 rw 0 0\n' % |
| self.chroot_path) |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._VGS_DEV_LOOKUP, output=' /dev/loop0') |
| rc_mock.AddCmdResult(self._VGCHANGE_N) |
| rc_mock.AddCmdResult(self._LOSETUP_DETACH) |
| |
| cros_sdk_lib.CleanupChrootMount( |
| self.chroot_path, None, proc_mounts=proc_mounts) |
| |
| m.assert_called_with(self.chroot_path) |
| m2.assert_called_with('/dev/loop0') |
| |
| def testMountedCleanupByBuildroot(self): |
| m = self.PatchObject(osutils, 'UmountTree') |
| m2 = self.PatchObject(cros_sdk_lib, '_RescanDeviceLvmMetadata') |
| |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('/dev/mapper/cros_vg_name-chroot %s ext4 rw 0 0\n' % |
| self.chroot_path) |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._VGS_DEV_LOOKUP, output=' /dev/loop0') |
| rc_mock.AddCmdResult(self._VGCHANGE_N) |
| rc_mock.AddCmdResult(self._LOSETUP_DETACH) |
| |
| cros_sdk_lib.CleanupChrootMount( |
| None, self.tempdir, proc_mounts=proc_mounts) |
| |
| m.assert_called_with(self.chroot_path) |
| m2.assert_called_with('/dev/loop0') |
| |
| def testMountedCleanupWithDelete(self): |
| m = self.PatchObject(osutils, 'UmountTree') |
| m2 = self.PatchObject(osutils, 'SafeUnlink') |
| m3 = self.PatchObject(osutils, 'RmDir') |
| m4 = self.PatchObject(cros_sdk_lib, '_RescanDeviceLvmMetadata') |
| |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('/dev/mapper/cros_vg_name-chroot %s ext4 rw 0 0\n' % |
| self.chroot_path) |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._VGS_DEV_LOOKUP, output=' /dev/loop0') |
| rc_mock.AddCmdResult(self._VGCHANGE_N) |
| rc_mock.AddCmdResult(self._LOSETUP_DETACH) |
| |
| cros_sdk_lib.CleanupChrootMount( |
| self.chroot_path, None, delete=True, proc_mounts=proc_mounts) |
| |
| m.assert_called_with(self.chroot_path) |
| m2.assert_called_with(self.chroot_img) |
| m3.assert_called_with(self.chroot_path, ignore_missing=True, sudo=True) |
| m4.assert_called_with('/dev/loop0') |
| |
| def testUnmountedCleanup(self): |
| m = self.PatchObject(osutils, 'UmountTree') |
| m2 = self.PatchObject(cros_sdk_lib, 'FindVolumeGroupForDevice') |
| m2.return_value = 'cros_chroot_001' |
| m3 = self.PatchObject(cros_sdk_lib, '_RescanDeviceLvmMetadata') |
| |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('sysfs /sys sysfs rw 0 0\n') |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._LOSETUP_FIND, output='/dev/loop1') |
| rc_mock.AddCmdResult(self._VGS_VG_LOOKUP, |
| returncode=self._LVM_SUCCESS_CODE) |
| rc_mock.AddCmdResult(self._VGCHANGE_N) |
| rc_mock.AddCmdResult(self._LOSETUP_DETACH) |
| |
| cros_sdk_lib.CleanupChrootMount( |
| self.chroot_path, None, proc_mounts=proc_mounts) |
| |
| m.assert_called_with(self.chroot_path) |
| m2.assert_called_with(self.chroot_path, '/dev/loop1') |
| m3.assert_called_with('/dev/loop1') |
| |
| def testDevOnlyCleanup(self): |
| m = self.PatchObject(osutils, 'UmountTree') |
| m2 = self.PatchObject(cros_sdk_lib, 'FindVolumeGroupForDevice') |
| m2.return_value = 'cros_chroot_001' |
| m3 = self.PatchObject(cros_sdk_lib, '_RescanDeviceLvmMetadata') |
| |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('sysfs /sys sysfs rw 0 0\n') |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._LOSETUP_FIND, output='/dev/loop1') |
| rc_mock.AddCmdResult(self._VGS_VG_LOOKUP, |
| returncode=self._LVM_FAILURE_CODE) |
| rc_mock.AddCmdResult(self._LOSETUP_DETACH) |
| |
| cros_sdk_lib.CleanupChrootMount( |
| self.chroot_path, None, proc_mounts=proc_mounts) |
| |
| m.assert_called_with(self.chroot_path) |
| m2.assert_called_with(self.chroot_path, '/dev/loop1') |
| m3.assert_called_with('/dev/loop1') |
| |
| def testNothingCleanup(self): |
| m = self.PatchObject(osutils, 'UmountTree') |
| m2 = self.PatchObject(cros_sdk_lib, 'FindVolumeGroupForDevice') |
| m2.return_value = 'cros_chroot_001' |
| |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('sysfs /sys sysfs rw 0 0\n') |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._LOSETUP_FIND, returncode=1) |
| rc_mock.AddCmdResult(self._VGS_VG_LOOKUP, |
| returncode=self._LVM_FAILURE_CODE) |
| |
| cros_sdk_lib.CleanupChrootMount( |
| self.chroot_path, None, proc_mounts=proc_mounts) |
| |
| m.assert_called_with(self.chroot_path) |
| m2.assert_called_with(self.chroot_path, None) |
| |
| def testNothingCleanupWithDelete(self): |
| m = self.PatchObject(osutils, 'UmountTree') |
| m2 = self.PatchObject(cros_sdk_lib, 'FindVolumeGroupForDevice') |
| m2.return_value = 'cros_chroot_001' |
| m3 = self.PatchObject(osutils, 'SafeUnlink') |
| m4 = self.PatchObject(osutils, 'RmDir') |
| |
| proc_mounts = os.path.join(self.tempdir, 'proc_mounts') |
| with open(proc_mounts, 'w') as f: |
| f.write('sysfs /sys sysfs rw 0 0\n') |
| |
| with cros_test_lib.RunCommandMock() as rc_mock: |
| rc_mock.AddCmdResult(self._LOSETUP_FIND, returncode=1) |
| rc_mock.AddCmdResult(self._VGS_VG_LOOKUP, |
| returncode=self._LVM_FAILURE_CODE) |
| |
| cros_sdk_lib.CleanupChrootMount( |
| self.chroot_path, None, delete=True, proc_mounts=proc_mounts) |
| |
| m.assert_called_with(self.chroot_path) |
| m2.assert_called_with(self.chroot_path, None) |
| m3.assert_called_with(self.chroot_img) |
| m4.assert_called_with(self.chroot_path, ignore_missing=True, sudo=True) |
| |
| |
| class ChrootUpdaterTest(cros_test_lib.MockTestCase, VersionHookTestCase): |
| """ChrootUpdater tests.""" |
| |
| def setUp(self): |
| # Avoid sudo password prompt for config writing. |
| self.PatchObject(os, 'getuid', return_value=0) |
| self.PatchObject(os, 'geteuid', return_value=0) |
| |
| self.chroot = cros_sdk_lib.ChrootUpdater(version_file=self.version_file, |
| hooks_dir=self.hooks_dir) |
| |
| def testVersion(self): |
| """Test the version property logic.""" |
| # Testing default value. |
| self.assertEqual(0, self.chroot.GetVersion()) |
| |
| # Test setting the version. |
| self.chroot.SetVersion(5) |
| self.assertEqual(5, self.chroot.GetVersion()) |
| self.assertEqual('5', osutils.ReadFile(self.version_file)) |
| |
| # The current behavior is that outside processes writing to the file |
| # does not affect our view after we've already read it. This shouldn't |
| # generally be a problem since run_chroot_version_hooks should be the only |
| # process writing to it. |
| osutils.WriteFile(self.version_file, '10') |
| self.assertEqual(5, self.chroot.GetVersion()) |
| |
| def testInvalidVersion(self): |
| """Test invalid version file contents.""" |
| osutils.WriteFile(self.version_file, 'invalid') |
| with self.assertRaises(cros_sdk_lib.InvalidChrootVersionError): |
| self.chroot.GetVersion() |
| |
| def testMissingFileVersion(self): |
| """Test missing version file.""" |
| osutils.SafeUnlink(self.version_file) |
| with self.assertRaises(cros_sdk_lib.UninitializedChrootError): |
| self.chroot.GetVersion() |
| |
| def testLatestVersion(self): |
| """Test the latest_version property/_LatestScriptsVersion method.""" |
| self.assertEqual(self.latest_version, self.chroot.latest_version) |
| |
| def testGetChrootUpdates(self): |
| """Test GetChrootUpdates.""" |
| # Test the deprecated error conditions. |
| for version in self.deprecated_versions: |
| self.chroot.SetVersion(version) |
| with self.assertRaises(cros_sdk_lib.ChrootDeprecatedError): |
| self.chroot.GetChrootUpdates() |
| |
| def testMultipleUpdateFiles(self): |
| """Test handling of multiple files existing for a single version.""" |
| # When the version would be run. |
| osutils.WriteFile(os.path.join(self.hooks_dir, '10_duplicate'), '') |
| |
| self.chroot.SetVersion(9) |
| with self.assertRaises(cros_sdk_lib.VersionHasMultipleHooksError): |
| self.chroot.GetChrootUpdates() |
| |
| # When the version would not be run. |
| self.chroot.SetVersion(11) |
| with self.assertRaises(cros_sdk_lib.VersionHasMultipleHooksError): |
| self.chroot.GetChrootUpdates() |
| |
| def testApplyUpdates(self): |
| """Test ApplyUpdates.""" |
| self.PatchObject(cros_build_lib, 'run', |
| return_value=cros_build_lib.CommandResult(returncode=0)) |
| for version in self.success_versions: |
| self.chroot.SetVersion(version) |
| self.chroot.ApplyUpdates() |
| self.assertEqual(self.latest_version, self.chroot.GetVersion()) |
| |
| def testApplyInvalidUpdates(self): |
| """Test the invalid version conditions for ApplyUpdates.""" |
| for version in self.invalid_versions: |
| self.chroot.SetVersion(version) |
| with self.assertRaises(cros_sdk_lib.InvalidChrootVersionError): |
| self.chroot.ApplyUpdates() |
| |
| def testIsInitialized(self): |
| """Test IsInitialized conditions.""" |
| self.chroot.SetVersion(0) |
| self.assertFalse(self.chroot.IsInitialized()) |
| |
| self.chroot.SetVersion(1) |
| self.assertTrue(self.chroot.IsInitialized()) |
| |
| # Test handling each of the errors thrown by GetVersion. |
| self.PatchObject(self.chroot, 'GetVersion', |
| side_effect=cros_sdk_lib.InvalidChrootVersionError()) |
| self.assertFalse(self.chroot.IsInitialized()) |
| |
| self.PatchObject(self.chroot, 'GetVersion', |
| side_effect=IOError()) |
| self.assertFalse(self.chroot.IsInitialized()) |
| |
| self.PatchObject(self.chroot, 'GetVersion', |
| side_effect=cros_sdk_lib.UninitializedChrootError()) |
| self.assertFalse(self.chroot.IsInitialized()) |