| # Copyright 2012 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unittests for the osutils.py module (imagine that!).""" |
| |
| import collections |
| import ctypes |
| import errno |
| import filecmp |
| import glob |
| import grp |
| import itertools |
| import os |
| from pathlib import Path |
| import pwd |
| import re |
| import stat |
| import sys |
| import tempfile |
| import time |
| from unittest import mock |
| |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_test_lib |
| from chromite.lib import osutils |
| from chromite.lib import partial_mock |
| |
| |
| class TestOsutils(cros_test_lib.TempDirTestCase): |
| """General unittests for the osutils module.""" |
| |
| def testIsSubPath(self): |
| self.assertTrue(osutils.IsSubPath("/a", "/")) |
| self.assertTrue(osutils.IsSubPath("/", "/")) |
| |
| self.assertTrue(osutils.IsSubPath("/a", "/a")) |
| self.assertTrue(osutils.IsSubPath("/a", "/a/")) |
| |
| self.assertTrue(osutils.IsSubPath("/a/b", "/a")) |
| self.assertTrue(osutils.IsSubPath("/a/b", "/a/")) |
| |
| self.assertTrue(osutils.IsSubPath("/a/b/e", "/a/b/c/../../b")) |
| |
| self.assertFalse(osutils.IsSubPath("/ab", "/a/b")) |
| self.assertFalse(osutils.IsSubPath("/a/bcde", "/a/b")) |
| |
| def testAllocateNewFile(self): |
| """Verify we can allocate a file of a certain length.""" |
| filename = self.tempdir / "foo" |
| size = 1234 |
| osutils.AllocateFile(filename, size) |
| |
| self.assertExists(filename) |
| self.assertEqual(size, os.path.getsize(filename)) |
| |
| def testAllocateExistingFile(self): |
| """Verify we can allocate an existing file of a certain length.""" |
| file = self.tempdir / "foo" |
| file.write_text("abcd", encoding="utf-8") |
| |
| size = 1234 |
| osutils.AllocateFile(file, size) |
| self.assertEqual(size, os.path.getsize(file)) |
| |
| # Content should be reset. |
| self.assertEqual(osutils.ReadFile(file, "rb", size=4), b"\0\0\0\0") |
| |
| def testReadWriteFile(self): |
| """Verify we can write data to a file, and then read it back.""" |
| filename = os.path.join(self.tempdir, "foo") |
| data = "alsdkfjasldkfjaskdlfjasdf" |
| self.assertIsNone(osutils.WriteFile(filename, data)) |
| self.assertEqual(osutils.ReadFile(filename), data) |
| |
| def testReadWritePath(self): |
| """Verify we can write data to a Path, and then read it back.""" |
| filename = self.tempdir / "foo" |
| data = "alsdkfjasldkfjaskdlfjasdf" |
| self.assertIsNone(osutils.WriteFile(filename, data)) |
| self.assertEqual(osutils.ReadFile(filename), data) |
| |
| def testReadBinary(self): |
| """Verify we can read data as binary.""" |
| filename = os.path.join(self.tempdir, "foo") |
| data = b"alsdkfjasldkfjaskdlfjasdf" |
| self.assertIsNone(osutils.WriteFile(filename, data, mode="wb")) |
| self.assertEqual(osutils.ReadFile(filename, mode="rb"), data) |
| |
| def testReadSize(self): |
| """Verify we can read partial data.""" |
| filename = self.tempdir / "foo" |
| data = b"alsdkfjasldkfjaskdlfjasdf" |
| filename.write_bytes(data) |
| self.assertEqual(osutils.ReadFile(filename, mode="rb", size=3), b"als") |
| self.assertEqual(osutils.ReadFile(filename, mode="r", size=3), "als") |
| |
| def testReadSeek(self): |
| """Verify we can read data from the middle.""" |
| filename = self.tempdir / "foo" |
| data = b"alsdkfjasldkfjaskdlfjasdf" |
| sdata = data.decode("utf-8") |
| filename.write_bytes(data) |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="rb", seek=3), data[3:] |
| ) |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="r", seek=3), sdata[3:] |
| ) |
| |
| def testReadSeekSize(self): |
| """Verify we can read partial data from the middle.""" |
| filename = self.tempdir / "foo" |
| data = b"alsdkfjasldkfjaskdlfjasdf" |
| filename.write_bytes(data) |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="rb", seek=3, size=3), b"dkf" |
| ) |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="r", seek=3, size=3), "dkf" |
| ) |
| |
| def testWriteFileStringIter(self): |
| """Verify that we can write an iterable of strings.""" |
| filename = os.path.join(self.tempdir, "foo") |
| data = ["a", "cd", "ef"] |
| self.assertIsNone(osutils.WriteFile(filename, data)) |
| self.assertEqual(osutils.ReadFile(filename), "".join(data)) |
| |
| def testWriteFileBytesIter(self): |
| """Verify that we can write an iterable of bytes.""" |
| filename = os.path.join(self.tempdir, "foo") |
| data = [b"ab", b"cd", b"ef"] |
| self.assertIsNone(osutils.WriteFile(filename, data, mode="wb")) |
| self.assertEqual(osutils.ReadFile(filename, mode="rb"), b"".join(data)) |
| |
| def testReadBytes(self): |
| """Verify we can read data as binary via ReadBytes.""" |
| filename = os.path.join(self.tempdir, "foo") |
| data = b"0123456789" |
| self.assertIsNone(osutils.WriteFile(filename, data, mode="wb")) |
| self.assertEqual(osutils.ReadBytes(filename, 7 - 3, 3), b"3456") |
| |
| def testReadText(self): |
| """Verify we can read data as text via ReadText.""" |
| filename = os.path.join(self.tempdir, "foo") |
| data = b"0123456789" |
| self.assertIsNone(osutils.WriteFile(filename, data, mode="wb")) |
| self.assertEqual(osutils.ReadText(filename, 7 - 3, 3), "3456") |
| |
| def testReadSudo(self): |
| """Verify we can read data as root (in a world-readable dir).""" |
| # First read a non-root file. |
| filename = self.tempdir / "foo" |
| data = b"alsdkfjasldkfjaskdlfjasdf" |
| sdata = data.decode("utf-8") |
| filename.write_bytes(data) |
| self.assertEqual(osutils.ReadFile(filename, mode="r", sudo=True), sdata) |
| self.assertEqual(osutils.ReadFile(filename, mode="rb", sudo=True), data) |
| |
| with osutils.TempDir(sudo_rm=True) as tempdir: |
| # Create a file only root can read. |
| filename = os.path.join(tempdir, "file") |
| osutils.WriteFile(filename, data, mode="wb", chmod=0o600, sudo=True) |
| |
| # Read a root file w/out sudo should fail. |
| self.assertRaises(IOError, osutils.ReadFile, filename) |
| # Read a root file w/sudo as text. |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="r", sudo=True), sdata |
| ) |
| # Read a root file w/sudo as bytes. |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="rb", sudo=True), data |
| ) |
| # Read a root file w/partial reads. |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="r", sudo=True, size=3), |
| sdata[:3], |
| ) |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="rb", sudo=True, size=3), |
| data[:3], |
| ) |
| # Read a root file w/seeks & partial reads. |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="r", sudo=True, seek=3, size=3), |
| sdata[3:6], |
| ) |
| self.assertEqual( |
| osutils.ReadFile( |
| filename, mode="rb", sudo=True, seek=3, size=3 |
| ), |
| data[3:6], |
| ) |
| |
| def testReadSudoSubdir(self): |
| """Verify we can read data as root in a subdir.""" |
| data = b"alsdkfjasldkfjaskdlfjasdf" |
| sdata = data.decode("utf-8") |
| |
| with osutils.TempDir(sudo_rm=True) as tempdir: |
| # Create a file only root can read. |
| tempdir = Path(tempdir) |
| subdir = tempdir / "subdir" |
| self.assertTrue(osutils.SafeMakedirs(subdir, mode=0o700, sudo=True)) |
| filename = subdir / "file" |
| osutils.WriteFile(filename, data, mode="wb", chmod=0o600, sudo=True) |
| |
| # Read a root file w/out sudo should fail. |
| self.assertRaises(IOError, osutils.ReadFile, filename) |
| # Read a root file w/sudo as text. |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="r", sudo=True), sdata |
| ) |
| # Read a root file w/sudo as bytes. |
| self.assertEqual( |
| osutils.ReadFile(filename, mode="rb", sudo=True), data |
| ) |
| |
| def testSudoWrite(self): |
| """Verify that we can write a file as sudo.""" |
| with osutils.TempDir(sudo_rm=True) as tempdir: |
| root_owned_dir = Path(tempdir) / "foo" |
| self.assertTrue(osutils.SafeMakedirs(root_owned_dir, sudo=True)) |
| |
| for atomic, path_to_test in itertools.product( |
| (True, False), |
| (os.path.join(root_owned_dir, "bar"), root_owned_dir / "bar"), |
| ): |
| self.assertRaises( |
| IOError, osutils.WriteFile, path_to_test, "data" |
| ) |
| |
| osutils.WriteFile( |
| path_to_test, "test", atomic=atomic, sudo=True |
| ) |
| self.assertEqual("test", osutils.ReadFile(path_to_test)) |
| self.assertEqual(0, os.stat(path_to_test).st_uid) |
| osutils.SafeUnlink(path_to_test, sudo=True) |
| |
| def testSudoWriteAppendNew(self): |
| """Verify that we can write a new file as sudo when appending.""" |
| with osutils.TempDir(sudo_rm=True) as tempdir: |
| path = os.path.join(tempdir, "foo") |
| osutils.WriteFile(path, "two", mode="a", sudo=True) |
| self.assertEqual("two", osutils.ReadFile(path)) |
| |
| def testSudoWriteAppendExisting(self): |
| """Verify that we can write a file as sudo when appending.""" |
| with osutils.TempDir(sudo_rm=True) as tempdir: |
| path = os.path.join(tempdir, "foo") |
| osutils.WriteFile(path, "one", sudo=True) |
| self.assertRaises(IOError, osutils.WriteFile, path, "data") |
| osutils.WriteFile(path, "two", mode="a", sudo=True) |
| self.assertEqual("onetwo", osutils.ReadFile(path)) |
| |
| def testSudoReadNoTrunc(self): |
| """Verify that we can write a new file as sudo when r+.""" |
| |
| def testit(path, sudo): |
| osutils.WriteFile(path, "two") |
| self.assertEqual("two", osutils.ReadFile(path)) |
| osutils.WriteFile(path, "X", mode="r+", sudo=sudo) |
| self.assertEqual("Xwo", osutils.ReadFile(path)) |
| |
| # Make sure that r+ works as we think it does. |
| path = os.path.join(self.tempdir, "foo") |
| testit(path, False) |
| |
| # Now run it again with sudo. |
| with osutils.TempDir(sudo_rm=True) as tempdir: |
| path = os.path.join(tempdir, "foo") |
| testit(path, True) |
| |
| def testReadFileNonExistent(self): |
| """Verify what happens if you ReadFile a file that isn't there.""" |
| filename = os.path.join(self.tempdir, "bogus") |
| with self.assertRaises(IOError): |
| osutils.ReadFile(filename) |
| |
| def testWriteChmod(self): |
| """Verify writing files with perms works.""" |
| |
| def getmode(path): |
| return os.stat(path).st_mode & 0o7777 |
| |
| def assertMode(path, mode): |
| self.assertEqual(getmode(path), mode) |
| |
| for path in (os.path.join(self.tempdir, "file"), self.tempdir / "file"): |
| osutils.WriteFile(path, "asdf") |
| assertMode(path, 0o644) |
| |
| osutils.WriteFile(path, "asdf", chmod=0o666) |
| assertMode(path, 0o666) |
| |
| osutils.WriteFile(path, "asdf", atomic=True, chmod=0o664) |
| assertMode(path, 0o664) |
| |
| osutils.WriteFile(path, "asdf", sudo=True, chmod=0o755) |
| assertMode(path, 0o755) |
| |
| osutils.WriteFile(path, "asdf", sudo=True, atomic=True, chmod=0o775) |
| assertMode(path, 0o775) |
| |
| osutils.SafeUnlink(path, sudo=True) |
| |
| def testSafeSymlink(self): |
| """Test that we can create symlinks.""" |
| with osutils.TempDir(sudo_rm=True) as tempdir: |
| file_a = os.path.join(tempdir, "a") |
| osutils.WriteFile(file_a, "a") |
| |
| file_b = os.path.join(tempdir, "b") |
| osutils.WriteFile(file_b, "b") |
| |
| user_dir = os.path.join(tempdir, "bar") |
| user_link = os.path.join(user_dir, "link") |
| osutils.SafeMakedirs(user_dir) |
| |
| root_dir = os.path.join(tempdir, "foo") |
| root_link = os.path.join(root_dir, "link") |
| osutils.SafeMakedirs(root_dir, sudo=True) |
| |
| # We can create and override links owned by a non-root user. |
| osutils.SafeSymlink(file_a, user_link) |
| self.assertEqual("a", osutils.ReadFile(user_link)) |
| |
| osutils.SafeSymlink(file_b, user_link) |
| self.assertEqual("b", osutils.ReadFile(user_link)) |
| |
| # Handle Path objects. |
| osutils.SafeSymlink(Path(file_a), Path(user_link)) |
| self.assertEqual("a", osutils.ReadFile(user_link)) |
| |
| osutils.SafeSymlink(Path(file_b), Path(user_link)) |
| self.assertEqual("b", osutils.ReadFile(user_link)) |
| |
| # We can create and override links owned by root. |
| osutils.SafeSymlink(file_a, root_link, sudo=True) |
| self.assertEqual("a", osutils.ReadFile(root_link)) |
| |
| osutils.SafeSymlink(file_b, root_link, sudo=True) |
| self.assertEqual("b", osutils.ReadFile(root_link)) |
| |
| # Handle Path objects. |
| osutils.SafeSymlink(Path(file_a), Path(root_link), sudo=True) |
| self.assertEqual("a", osutils.ReadFile(root_link)) |
| |
| osutils.SafeSymlink(Path(file_b), Path(root_link), sudo=True) |
| self.assertEqual("b", osutils.ReadFile(root_link)) |
| |
| def testSafeUnlink(self): |
| """Test unlinking files work (existing or not).""" |
| |
| def f(sudo=False, as_path=False): |
| with osutils.TempDir(sudo_rm=sudo) as dirname: |
| path = os.path.join(dirname, "foon") |
| if as_path: |
| path = Path(path) |
| osutils.Touch(path, makedirs=True) |
| self.assertExists(path) |
| if sudo: |
| osutils.Chown( |
| dirname, user="root", group="root", recursive=False |
| ) |
| self.assertRaises(EnvironmentError, os.unlink, path) |
| self.assertTrue(osutils.SafeUnlink(path, sudo=sudo)) |
| self.assertNotExists(path) |
| self.assertFalse(osutils.SafeUnlink(path)) |
| self.assertNotExists(path) |
| |
| f(False) |
| f(True) |
| f(False, True) |
| f(True, True) |
| |
| def testSafeUnlinkSudoInaccessible(self): |
| """Test unlinking files work in a dir only root can read.""" |
| with osutils.TempDir(sudo_rm=True) as dirname: |
| path = os.path.join(dirname, "exists") |
| osutils.Touch(path, mode=0o000) |
| os.chmod(dirname, 0o000) |
| self.assertRaises(EnvironmentError, os.unlink, path) |
| self.assertTrue(osutils.SafeUnlink(path, sudo=True)) |
| self.assertFalse(osutils.SafeUnlink(path, sudo=True)) |
| |
| os.chmod(dirname, 0o700) |
| self.assertNotExists(path) |
| |
| def testSafeMakedirs(self): |
| """Test creating directory trees work (existing or not).""" |
| path = os.path.join(self.tempdir, "a", "b", "c", "d", "e") |
| self.assertTrue(osutils.SafeMakedirs(path)) |
| self.assertExists(path) |
| self.assertFalse(osutils.SafeMakedirs(path)) |
| self.assertExists(path) |
| |
| def testSafeMakedirsWithPathObject(self): |
| """Test creating directory trees work (existing or not) on |Path|s.""" |
| path = self.tempdir / "a" / "b" / "c" / "d" / "e" |
| self.assertTrue(osutils.SafeMakedirs(path)) |
| self.assertExists(path) |
| self.assertFalse(osutils.SafeMakedirs(path)) |
| self.assertExists(path) |
| |
| def testSafeMakedirsMode(self): |
| """Test that mode is honored.""" |
| path = os.path.join(self.tempdir, "a", "b", "c", "d", "e") |
| self.assertTrue(osutils.SafeMakedirs(path, mode=0o775)) |
| self.assertEqual(0o775, stat.S_IMODE(os.stat(path).st_mode)) |
| self.assertFalse(osutils.SafeMakedirs(path, mode=0o777)) |
| self.assertEqual(0o777, stat.S_IMODE(os.stat(path).st_mode)) |
| cros_build_lib.sudo_run(["chown", "root:root", path], print_cmd=False) |
| # Tries, but fails to change the mode. |
| self.assertFalse(osutils.SafeMakedirs(path, 0o755)) |
| self.assertEqual(0o777, stat.S_IMODE(os.stat(path).st_mode)) |
| |
| def testSafeMakedirs_error(self): |
| """Check error paths.""" |
| with self.assertRaises(OSError): |
| osutils.SafeMakedirs("/foo/bar/cow/moo/wee") |
| ret = cros_build_lib.run( |
| ["ls", "-Rla", "/foo"], check=False, capture_output=True |
| ) |
| print( |
| "ls output of /foo:\n{{{%s}}}" % (ret.stdout,), file=sys.stderr |
| ) |
| self.assertRaises(OSError, osutils.SafeMakedirs, "") |
| |
| def testSafeMakedirsSudo(self): |
| """Test creating directory trees work as root (existing or not).""" |
| self.ExpectRootOwnedFiles() |
| path = os.path.join(self.tempdir, "a", "b", "c", "d", "e") |
| self.assertTrue(osutils.SafeMakedirs(path, sudo=True)) |
| self.assertExists(path) |
| self.assertFalse(osutils.SafeMakedirs(path, sudo=True)) |
| self.assertExists(path) |
| self.assertEqual(os.stat(path).st_uid, 0) |
| |
| def testSafeMakedirsNoSudoRootOwnedDirs(self): |
| """Test that we can recover some root owned directories.""" |
| self.ExpectRootOwnedFiles() |
| root_owned_prefix = os.path.join(self.tempdir, "root_owned_prefix") |
| root_owned_dir = os.path.join(root_owned_prefix, "root_owned_dir") |
| non_root_dir = os.path.join(root_owned_prefix, "non_root_dir") |
| self.assertTrue(osutils.SafeMakedirs(root_owned_dir, sudo=True)) |
| self.assertExists(root_owned_prefix) |
| self.assertEqual(os.stat(root_owned_prefix).st_uid, 0) |
| self.assertExists(root_owned_dir) |
| self.assertEqual(os.stat(root_owned_dir).st_uid, 0) |
| |
| # Test that we can reclaim a root-owned dir. |
| # Note, return value is False because the directory already exists. |
| self.assertFalse(osutils.SafeMakedirsNonRoot(root_owned_dir)) |
| self.assertNotEqual(os.stat(root_owned_dir).st_uid, 0) |
| |
| # Test that we can create a non-root directory in a root-path. |
| self.assertTrue(osutils.SafeMakedirsNonRoot(non_root_dir)) |
| self.assertNotEqual(os.stat(non_root_dir).st_uid, 0) |
| |
| def testRmDir(self): |
| """Test that removing dirs work.""" |
| main_path = os.path.join(self.tempdir, "a", "b", "c", "d", "e") |
| paths_to_test = [main_path, Path(main_path)] |
| |
| for path in paths_to_test: |
| self.assertRaises(EnvironmentError, osutils.RmDir, path) |
| osutils.SafeMakedirs(path) |
| osutils.RmDir(path) |
| osutils.RmDir(path, ignore_missing=True) |
| self.assertRaises(EnvironmentError, osutils.RmDir, path) |
| |
| osutils.SafeMakedirs(path) |
| osutils.RmDir(path) |
| self.assertNotExists(path) |
| |
| def testRmDirSudo(self): |
| """Test that removing dirs via sudo works.""" |
| subpath = os.path.join(self.tempdir, "a") |
| main_path = os.path.join(subpath, "b", "c", "d", "e") |
| paths_to_test = [main_path, Path(main_path)] |
| |
| for path in paths_to_test: |
| self.assertTrue(osutils.SafeMakedirs(path, sudo=True)) |
| self.assertRaises(OSError, osutils.RmDir, path) |
| osutils.RmDir(subpath, sudo=True) |
| self.assertRaises( |
| cros_build_lib.RunCommandError, |
| osutils.RmDir, |
| subpath, |
| sudo=True, |
| ) |
| |
| def testTouchFile(self): |
| """Test that we can touch files.""" |
| path = os.path.join(self.tempdir, "touchit") |
| self.assertNotExists(path) |
| osutils.Touch(path) |
| self.assertExists(path) |
| self.assertEqual(os.path.getsize(path), 0) |
| |
| def testTouchReadOnlyFile(self): |
| """Test that we can touch read-only files that we own.""" |
| path = self.tempdir / "touchit" |
| nowish = time.time() - 60 |
| path.touch(mode=0o600) |
| # Set the times to very old. |
| os.utime(path, (1, 1)) |
| assert os.path.getmtime(path) < 10 |
| # This should still update the times even though it's read-only. |
| osutils.Touch(path) |
| assert os.path.getmtime(path) >= nowish |
| |
| def testTouchFileSubDir(self): |
| """Test that we can touch files in non-existent subdirs.""" |
| path = os.path.join(self.tempdir, "a", "b", "c", "touchit") |
| self.assertNotExists(os.path.dirname(path)) |
| osutils.Touch(path, makedirs=True) |
| self.assertExists(path) |
| self.assertEqual(os.path.getsize(path), 0) |
| |
| def testChmod(self): |
| """Test Chmod.""" |
| |
| def getmode(path): |
| return os.stat(path).st_mode & 0o7777 |
| |
| path = os.path.join(self.tempdir, "chmodtests") |
| osutils.Touch(path) |
| |
| osutils.Chmod(path, 0o700) |
| self.assertEqual(getmode(path), 0o700) |
| |
| osutils.Chmod(Path(path), 0o644) |
| self.assertEqual(getmode(path), 0o644) |
| |
| osutils.Chown(path, user="root", group="root") |
| osutils.Chmod(path, 0o660, sudo=True) |
| self.assertEqual(getmode(path), 0o660) |
| osutils.Chmod(Path(path), 0o661, sudo=True) |
| self.assertEqual(getmode(path), 0o661) |
| |
| self.assertRaises(OSError, osutils.Chmod, path, 0o600) |
| |
| def testChown(self): |
| """Test chown.""" |
| |
| # Helpers to get the user and group name of the given path's owner. |
| def User(path): |
| return pwd.getpwuid(os.stat(path).st_uid).pw_name |
| |
| def Group(path): |
| return grp.getgrgid(os.stat(path).st_gid).gr_name |
| |
| filename = os.path.join(self.tempdir, "chowntests") |
| osutils.Touch(filename) |
| |
| user = User(filename) |
| group = Group(filename) |
| |
| new_user = new_group = "root" |
| |
| # Change only the user. |
| osutils.Chown(filename, user=new_user) |
| self.assertEqual(new_user, User(filename)) |
| self.assertEqual(new_group, Group(filename)) |
| |
| # Change both user and group. |
| osutils.Chown(filename, user=user, group=group) |
| self.assertEqual(user, User(filename)) |
| self.assertEqual(group, Group(filename)) |
| |
| # With Path object. |
| osutils.Chown(Path(filename), user=new_user, group=new_group) |
| self.assertEqual(new_user, User(filename)) |
| self.assertEqual(new_group, Group(filename)) |
| osutils.Chown(Path(filename), user=user, group=group) |
| self.assertEqual(user, User(filename)) |
| self.assertEqual(group, Group(filename)) |
| |
| # User and group ids as the arguments. |
| osutils.Chown(filename, user=0, group=0) |
| self.assertEqual(new_user, User(filename)) |
| self.assertEqual(new_group, Group(filename)) |
| |
| # Recursive. |
| dirname = os.path.join(self.tempdir, "chowntestsdir") |
| osutils.SafeMakedirs(dirname) |
| filename = os.path.join(dirname, "chowntestsfile") |
| osutils.Touch(filename) |
| # Chown without recursive. |
| osutils.Chown(dirname, user=new_user, group=new_group) |
| self.assertEqual(new_user, User(dirname)) |
| self.assertEqual(new_group, Group(dirname)) |
| self.assertEqual(user, User(filename)) |
| self.assertEqual(group, Group(filename)) |
| # Chown with recursive. |
| osutils.Chown(dirname, user=new_user, group=new_group, recursive=True) |
| self.assertEqual(new_user, User(filename)) |
| self.assertEqual(new_group, Group(filename)) |
| osutils.Chown(dirname, user=user, group=group, recursive=True) |
| self.assertEqual(user, User(dirname)) |
| self.assertEqual(group, Group(dirname)) |
| self.assertEqual(user, User(filename)) |
| self.assertEqual(group, Group(filename)) |
| |
| |
| class TestEmptyDir(cros_test_lib.TempDirTestCase): |
| """Test osutils.EmptyDir.""" |
| |
| def setUp(self): |
| self.subdir = os.path.join(self.tempdir, "a") |
| self.nestedfile = os.path.join(self.subdir, "b", "c", "d", "e") |
| self.topfile = os.path.join(self.tempdir, "file") |
| |
| def testEmptyDir(self): |
| """Empty an empty directory.""" |
| osutils.EmptyDir(self.tempdir) |
| osutils.EmptyDir(self.tempdir, ignore_missing=True, sudo=True) |
| |
| def testNonExistentDir(self): |
| """Non-existent directory.""" |
| # Ignore_missing=False |
| with self.assertRaises(osutils.EmptyDirNonExistentException): |
| osutils.EmptyDir(self.subdir) |
| |
| # Ignore missing=True |
| osutils.EmptyDir(self.subdir, ignore_missing=True) |
| |
| def testEmptyWithContentsMinFlags(self): |
| """Test ability to empty actual directory contents.""" |
| osutils.Touch(self.nestedfile, makedirs=True) |
| osutils.Touch(self.topfile, makedirs=True) |
| |
| osutils.EmptyDir(self.tempdir) |
| |
| self.assertExists(self.tempdir) |
| self.assertNotExists(self.subdir) |
| self.assertNotExists(self.topfile) |
| |
| def testEmptyWithContentsMaxFlags(self): |
| """Test ability to empty actual directory contents.""" |
| osutils.Touch(self.nestedfile, makedirs=True) |
| osutils.Touch(self.topfile, makedirs=True) |
| |
| osutils.EmptyDir(self.tempdir, ignore_missing=True, sudo=True) |
| |
| self.assertExists(self.tempdir) |
| self.assertNotExists(self.subdir) |
| self.assertNotExists(self.topfile) |
| |
| def testEmptyWithRootOwnedContents(self): |
| """Test handling of root owned sub directories.""" |
| # Root owned contents. |
| osutils.SafeMakedirs(self.nestedfile, sudo=True) |
| |
| # Fails without sudo=True |
| with self.assertRaises(OSError): |
| osutils.EmptyDir(self.tempdir) |
| self.assertExists(self.nestedfile) |
| |
| # Works with sudo=True |
| osutils.EmptyDir(self.tempdir, sudo=True) |
| self.assertExists(self.tempdir) |
| self.assertNotExists(self.subdir) |
| |
| def testExclude(self): |
| """Test ability to empty actual directory contents. |
| |
| Also ensure that the excludes argument can really be just an iterable. |
| """ |
| files = { |
| "keep": True, |
| "keepdir/foo": True, |
| "keepdir/bar": True, |
| "remove": False, |
| "removedir/foo": False, |
| "removedir/bar": False, |
| } |
| |
| excludes = ["keep", "keepdir", "bogus"] |
| |
| # Perform exclusion of non-existent files. |
| osutils.EmptyDir(self.tempdir, exclude=iter(excludes)) |
| |
| # Create files. |
| for f in files.keys(): |
| osutils.Touch(os.path.join(self.tempdir, f), makedirs=True) |
| |
| # Empty with excludes. |
| osutils.EmptyDir(self.tempdir, exclude=iter(excludes)) |
| |
| # Verify that the results are what we expect. |
| for f, expected in files.items(): |
| f = os.path.join(self.tempdir, f) |
| self.assertEqual(os.path.exists(f), expected, "Unexpected: %s" % f) |
| self.assertExists(os.path.join(self.tempdir, "keepdir")) |
| self.assertNotExists(os.path.join(self.tempdir, "removedir")) |
| |
| |
| class TestProcess(cros_test_lib.RunCommandTestCase): |
| """Tests for osutils.IsChildProcess.""" |
| |
| def testIsChildProcess(self): |
| """Test IsChildProcess with no name.""" |
| mock_pstree_output = "a(1)-+-b(2)\n\t|-c(3)\n\t|-foo(4)-bar(5)" |
| self.rc.AddCmdResult(partial_mock.Ignore(), stdout=mock_pstree_output) |
| self.assertTrue(osutils.IsChildProcess(4)) |
| self.assertTrue(osutils.IsChildProcess(4, name="foo")) |
| self.assertFalse(osutils.IsChildProcess(5, name="foo")) |
| |
| |
| class TempDirTests(cros_test_lib.TestCase): |
| """Unittests of osutils.TempDir. |
| |
| Unlike other test classes in this file, TempDirTestCase isn't used as a base |
| class, because that is the functionality under test. |
| """ |
| |
| PREFIX = "chromite.test.osutils.TempDirTests" |
| |
| class HelperException(Exception): |
| """Exception for tests to raise to test exception handling.""" |
| |
| class HelperExceptionInner(Exception): |
| """Exception for tests to raise to test exception handling.""" |
| |
| def testBasicSuccessEmpty(self): |
| """Test we create and cleanup an empty tempdir.""" |
| with osutils.TempDir(prefix=self.PREFIX) as td: |
| tempdir = td |
| # Show the temp directory exists and is empty. |
| self.assertTrue(os.path.isdir(tempdir)) |
| self.assertEqual(os.listdir(tempdir), []) |
| |
| # Show the temp directory no longer exists. |
| self.assertNotExists(tempdir) |
| |
| def testBasicSuccessNotEmpty(self): |
| """Test we cleanup tempdir with stuff in it.""" |
| with osutils.TempDir(prefix=self.PREFIX) as td: |
| tempdir = td |
| # Show the temp directory exists and is empty. |
| self.assertTrue(os.path.isdir(tempdir)) |
| self.assertEqual(os.listdir(tempdir), []) |
| |
| # Create an empty file. |
| osutils.Touch(os.path.join(tempdir, "foo.txt")) |
| |
| # Create nested sub directories. |
| subdir = os.path.join(tempdir, "foo", "bar", "taco") |
| os.makedirs(subdir) |
| osutils.Touch(os.path.join(subdir, "sauce.txt")) |
| |
| # Show the temp directory no longer exists. |
| self.assertNotExists(tempdir) |
| |
| def testErrorCleanup(self): |
| """Test we cleanup, even if an exception is raised.""" |
| try: |
| with osutils.TempDir(prefix=self.PREFIX) as td: |
| tempdir = td |
| raise TempDirTests.HelperException() |
| except TempDirTests.HelperException: |
| pass |
| |
| # Show the temp directory no longer exists. |
| self.assertNotExists(tempdir) |
| |
| def testCleanupExceptionContextException(self): |
| """Test an exception during cleanup if the context DID raise.""" |
| was_raised = False |
| tempdir_obj = osutils.TempDir(prefix=self.PREFIX) |
| |
| with mock.patch.object( |
| osutils, |
| "_TempDirTearDown", |
| side_effect=TempDirTests.HelperException, |
| ): |
| try: |
| with tempdir_obj as td: |
| tempdir = td |
| raise TempDirTests.HelperExceptionInner() |
| except TempDirTests.HelperExceptionInner: |
| was_raised = True |
| |
| # Show that the exception exited the context. |
| self.assertTrue(was_raised) |
| |
| # Verify the tempdir object no longer contains a reference to the |
| # tempdir. |
| self.assertIsNone(tempdir_obj.tempdir) |
| |
| # Cleanup the dir leaked by our mock exception. |
| os.rmdir(tempdir) |
| |
| def testCleanupExceptionNoContextException(self): |
| """Test an exception during cleanup if the context did NOT raise.""" |
| was_raised = False |
| tempdir_obj = osutils.TempDir(prefix=self.PREFIX) |
| |
| with mock.patch.object( |
| osutils, |
| "_TempDirTearDown", |
| side_effect=TempDirTests.HelperException, |
| ): |
| try: |
| with tempdir_obj as td: |
| tempdir = td |
| except TempDirTests.HelperException: |
| was_raised = True |
| |
| # Show that the exception exited the context. |
| self.assertTrue(was_raised) |
| |
| # Verify the tempdir object no longer contains a reference to the |
| # tempdir. |
| self.assertIsNone(tempdir_obj.tempdir) |
| |
| # Cleanup the dir leaked by our mock exception. |
| os.rmdir(tempdir) |
| |
| def testSkipCleanup(self): |
| """Test that we leave behind tempdirs when requested.""" |
| tempdir_obj = osutils.TempDir(prefix=self.PREFIX, delete=False) |
| tempdir = tempdir_obj.tempdir |
| tempdir_obj.Cleanup() |
| # Ensure we cleaned up ... |
| self.assertIsNone(tempdir_obj.tempdir) |
| # ... but leaked the directory. |
| self.assertExists(tempdir) |
| # Now really cleanup the directory leaked by the test. |
| os.rmdir(tempdir) |
| |
| def testSkipCleanupGlobal(self): |
| """Test that we reset global tempdir as expected even with skip.""" |
| with osutils.TempDir(prefix=self.PREFIX, set_global=True) as tempdir: |
| tempdir_before = tempfile.gettempdir() |
| tempdir_obj = osutils.TempDir( |
| prefix=self.PREFIX, set_global=True, delete=False |
| ) |
| tempdir_inside = tempfile.gettempdir() |
| tempdir_obj.Cleanup() |
| tempdir_after = tempfile.gettempdir() |
| |
| # We shouldn't leak the outer directory. |
| self.assertNotExists(tempdir) |
| self.assertEqual(tempdir_before, tempdir_after) |
| # This is a strict substring check. |
| self.assertLess(tempdir_before, tempdir_inside) |
| |
| |
| class MountTests(cros_test_lib.TestCase): |
| """Unittests for osutils mounting and umounting helpers.""" |
| |
| def testMountTmpfsDir(self): |
| """Verify mounting a tmpfs works""" |
| cleaned = False |
| with osutils.TempDir(prefix="chromite.test.osutils") as tempdir: |
| st_before = os.stat(tempdir) |
| try: |
| # Mount the dir and verify it worked. |
| osutils.MountTmpfsDir(tempdir) |
| st_after = os.stat(tempdir) |
| self.assertNotEqual(st_before.st_dev, st_after.st_dev) |
| |
| # Unmount the dir and verify it worked. |
| osutils.UmountDir(tempdir) |
| cleaned = True |
| |
| # Finally make sure it's cleaned up. |
| self.assertNotExists(tempdir) |
| finally: |
| if not cleaned: |
| cros_build_lib.sudo_run( |
| ["umount", "-lf", tempdir], check=False |
| ) |
| |
| def testUnmountTree(self): |
| with osutils.TempDir(prefix="chromite.test.osutils") as tempdir: |
| # Mount the dir and verify it worked. |
| st_before = os.stat(tempdir) |
| osutils.MountTmpfsDir(tempdir) |
| st_after = os.stat(tempdir) |
| self.assertNotEqual(st_before.st_dev, st_after.st_dev) |
| |
| # Mount an inner dir the same way. |
| tempdir2 = os.path.join(tempdir, "inner") |
| osutils.SafeMakedirsNonRoot(tempdir2) |
| st_before2 = os.stat(tempdir2) |
| osutils.MountTmpfsDir(tempdir2) |
| st_after2 = os.stat(tempdir2) |
| self.assertNotEqual(st_before2.st_dev, st_after2.st_dev) |
| |
| # Unmount the whole tree and verify it worked. |
| osutils.UmountTree(tempdir) |
| st_umount = os.stat(tempdir) |
| self.assertNotExists(tempdir2) |
| self.assertEqual(st_before.st_dev, st_umount.st_dev) |
| |
| |
| class IteratePathsTest(cros_test_lib.TestCase): |
| """Test iterating through all segments of a path.""" |
| |
| def testType(self): |
| """Check that return value is an iterator.""" |
| self.assertIsInstance( |
| osutils.IteratePaths("/"), collections.abc.Iterator |
| ) |
| |
| def testRoot(self): |
| """Test iterating from root directory.""" |
| inp = "/" |
| exp = [Path("/")] |
| self.assertEqual(list(osutils.IteratePaths(inp)), exp) |
| |
| def testOneDir(self): |
| """Test iterating from a directory in a root directory.""" |
| inp = "/abc" |
| exp = [Path("/"), Path("/abc")] |
| self.assertEqual(list(osutils.IteratePaths(inp)), exp) |
| |
| def testTwoDirs(self): |
| """Test iterating two dirs down.""" |
| inp = "/abc/def" |
| exp = [Path("/"), Path("/abc"), Path("/abc/def")] |
| self.assertEqual(list(osutils.IteratePaths(inp)), exp) |
| |
| def testNormalize(self): |
| """Test argument being normalized.""" |
| cases = [ |
| ("//", [Path("/")]), |
| ("///", [Path("/")]), |
| ("/abc/", [Path("/"), Path("/abc")]), |
| ("/abc//def", [Path("/"), Path("/abc"), Path("/abc/def")]), |
| ] |
| for inp, exp in cases: |
| self.assertEqual(list(osutils.IteratePaths(inp)), exp) |
| |
| |
| class IteratePathParentsTest(cros_test_lib.TestCase): |
| """Test parent directory iteration functionality.""" |
| |
| def _RunForPath(self, path, expected): |
| result_components = [] |
| for p in osutils.IteratePathParents(path): |
| result_components.append(os.path.basename(p)) |
| |
| result_components.reverse() |
| if expected is not None: |
| self.assertEqual(expected, result_components) |
| |
| def testIt(self): |
| """Run the test vectors.""" |
| vectors = { |
| "/": [""], |
| "/path/to/nowhere": ["", "path", "to", "nowhere"], |
| "/path/./to": ["", "path", "to"], |
| "//path/to": ["", "path", "to"], |
| "path/to": None, |
| "": None, |
| } |
| for p, e in vectors.items(): |
| self._RunForPath(p, e) |
| |
| |
| class FindInPathParentsTest(cros_test_lib.TempDirTestCase): |
| """Test FindInPathParents functionality.""" |
| |
| D = cros_test_lib.Directory |
| |
| DIR_STRUCT = [D("a", [D(".repo", []), D("b", [D("c", [])])])] |
| |
| START_PATH = os.path.join("a", "b", "c") |
| |
| def setUp(self): |
| cros_test_lib.CreateOnDiskHierarchy(self.tempdir, self.DIR_STRUCT) |
| |
| def testFoundStr(self): |
| """Target (str) is found.""" |
| found = osutils.FindInPathParents( |
| ".repo", os.path.join(self.tempdir, self.START_PATH) |
| ) |
| self.assertEqual(found, os.path.join(self.tempdir, "a", ".repo")) |
| |
| def testFoundPath(self): |
| """Target (Path) is found.""" |
| found = osutils.FindInPathParents( |
| ".repo", self.tempdir / self.START_PATH |
| ) |
| self.assertEqual(found, self.tempdir / "a" / ".repo") |
| |
| def testNotFoundStr(self): |
| """Target is not found.""" |
| found = osutils.FindInPathParents( |
| "does.not/exist", os.path.join(self.tempdir, self.START_PATH) |
| ) |
| self.assertIsNone(found) |
| |
| def testNotFoundPath(self): |
| """Target is not found.""" |
| found = osutils.FindInPathParents( |
| "does.not/exist", self.tempdir / self.START_PATH |
| ) |
| self.assertIsNone(found) |
| |
| |
| class SourceEnvironmentTest(cros_test_lib.TempDirTestCase): |
| """Test osutil's environmental variable related methods.""" |
| |
| ENV_ALLOWLIST = { |
| "ENV1": "monkeys like bananas", |
| "ENV3": "merci", |
| "ENV6": "", |
| } |
| |
| ENV_OTHER = { |
| "ENV2": "bananas are yellow", |
| "ENV4": "de rien", |
| } |
| |
| ENV = """ |
| declare -x ENV1="monkeys like bananas" |
| declare -x ENV2="bananas are yellow" |
| declare -x ENV3="merci" |
| declare -x ENV4="de rien" |
| declare -x ENV6='' |
| declare -x ENVA=('a b c' 'd' 'e 1234 %') |
| """ |
| |
| ENV_MULTILINE = """ |
| declare -x ENVM="gentil |
| mechant" |
| """ |
| |
| def setUp(self): |
| self.env_file = os.path.join(self.tempdir, "environment") |
| self.env_file_multiline = os.path.join(self.tempdir, "multiline") |
| osutils.WriteFile(self.env_file, self.ENV) |
| osutils.WriteFile(self.env_file_multiline, self.ENV_MULTILINE) |
| |
| def testAllowList(self): |
| env_dict = osutils.SourceEnvironment( |
| self.env_file, ("ENV1", "ENV3", "ENV5", "ENV6") |
| ) |
| self.assertEqual(env_dict, self.ENV_ALLOWLIST) |
| |
| def testArrays(self): |
| env_dict = osutils.SourceEnvironment(self.env_file, ("ENVA",)) |
| self.assertEqual(env_dict, {"ENVA": "a b c,d,e 1234 %"}) |
| |
| env_dict = osutils.SourceEnvironment(self.env_file, ("ENVA",), ifs=" ") |
| self.assertEqual(env_dict, {"ENVA": "a b c d e 1234 %"}) |
| |
| env_dict = osutils.SourceEnvironment( |
| self.env_file_multiline, ("ENVM",), multiline=True |
| ) |
| self.assertEqual(env_dict, {"ENVM": "gentil\nmechant"}) |
| |
| |
| class DeviceInfoTests(cros_test_lib.RunCommandTestCase): |
| """Tests methods retrieving information about devices.""" |
| |
| FULL_OUTPUT = """ |
| NAME="sda" RM="0" TYPE="disk" SIZE="128G" HOTPLUG="0" |
| NAME="sda1" RM="1" TYPE="part" SIZE="100G" HOTPLUG="1" |
| NAME="sda2" RM="1" TYPE="part" SIZE="28G" HOTPLUG="1" |
| NAME="sdc" RM="1" TYPE="disk" SIZE="7.4G" HOTPLUG="1" |
| NAME="sdc1" RM="1" TYPE="part" SIZE="1G" HOTPLUG="1" |
| NAME="sdc2" RM="1" TYPE="part" SIZE="6.4G" HOTPLUG="1" |
| """ |
| |
| PARTIAL_OUTPUT = """ |
| NAME="sdc" RM="1" TYPE="disk" SIZE="7.4G" HOTPLUG="0" |
| NAME="sdc1" RM="1" TYPE="part" SIZE="1G" HOTPLUG="0" |
| NAME="sdc2" RM="1" TYPE="part" SIZE="6.4G" HOTPLUG="0" |
| """ |
| |
| def testListBlockDevices(self): |
| """Tests that we can list all block devices correctly.""" |
| self.rc.AddCmdResult(partial_mock.Ignore(), stdout=self.FULL_OUTPUT) |
| devices = osutils.ListBlockDevices() |
| self.assertEqual(devices[0].NAME, "sda") |
| self.assertEqual(devices[0].RM, "0") |
| self.assertEqual(devices[0].TYPE, "disk") |
| self.assertEqual(devices[0].SIZE, "128G") |
| self.assertEqual(devices[0].HOTPLUG, "0") |
| self.assertEqual(devices[3].NAME, "sdc") |
| self.assertEqual(devices[3].RM, "1") |
| self.assertEqual(devices[3].TYPE, "disk") |
| self.assertEqual(devices[3].SIZE, "7.4G") |
| self.assertEqual(devices[3].HOTPLUG, "1") |
| |
| def testGetDeviceSize(self): |
| """Tests that we can get the size of a device.""" |
| self.rc.AddCmdResult(partial_mock.Ignore(), stdout=self.PARTIAL_OUTPUT) |
| self.assertEqual(osutils.GetDeviceSize("/dev/sdc"), "7.4G") |
| |
| |
| class ChdirTests(cros_test_lib.MockTempDirTestCase): |
| """Tests for ChdirContext.""" |
| |
| def testChdir(self): |
| current_dir = Path.cwd() |
| self.assertNotEqual(self.tempdir, current_dir) |
| with osutils.ChdirContext(self.tempdir): |
| self.assertEqual(self.tempdir, Path.cwd()) |
| self.assertEqual(current_dir, Path.cwd()) |
| |
| |
| class MountOverlayTest(cros_test_lib.MockTempDirTestCase): |
| """Tests MountOverlayContext.""" |
| |
| def setUp(self): |
| self.upperdir = os.path.join(self.tempdir, "first_level", "upperdir") |
| self.lowerdir = os.path.join(self.tempdir, "lowerdir") |
| self.mergeddir = os.path.join(self.tempdir, "mergeddir") |
| |
| for path in [self.upperdir, self.lowerdir, self.mergeddir]: |
| osutils.Touch(path, makedirs=True) |
| |
| def testMountWriteUnmountRead(self): |
| mount_call = self.PatchObject(osutils, "MountDir") |
| umount_call = self.PatchObject(osutils, "UmountDir") |
| for cleanup in (True, False): |
| with osutils.MountOverlayContext( |
| self.lowerdir, self.upperdir, self.mergeddir, cleanup=cleanup |
| ): |
| mount_call.assert_any_call( |
| "overlay", |
| self.mergeddir, |
| fs_type="overlay", |
| makedirs=False, |
| mount_opts=( |
| "lowerdir=%s" % self.lowerdir, |
| "upperdir=%s" % self.upperdir, |
| mock.ANY, |
| ), |
| quiet=mock.ANY, |
| ) |
| umount_call.assert_any_call(self.mergeddir, cleanup=cleanup) |
| |
| def testMountFailFallback(self): |
| """Verify mount failure with overlay fs_type falls back to overlayfs.""" |
| |
| def _FailOverlay(*_args, **kwargs): |
| if kwargs["fs_type"] == "overlay": |
| raise cros_build_lib.RunCommandError( |
| "Phony failure", |
| cros_build_lib.CompletedProcess(["MounDir"], returncode=32), |
| ) |
| |
| mount_call = self.PatchObject(osutils, "MountDir") |
| mount_call.side_effect = _FailOverlay |
| umount_call = self.PatchObject(osutils, "UmountDir") |
| for cleanup in (True, False): |
| with osutils.MountOverlayContext( |
| self.lowerdir, self.upperdir, self.mergeddir, cleanup=cleanup |
| ): |
| mount_call.assert_any_call( |
| "overlay", |
| self.mergeddir, |
| fs_type="overlay", |
| makedirs=False, |
| mount_opts=( |
| "lowerdir=%s" % self.lowerdir, |
| "upperdir=%s" % self.upperdir, |
| mock.ANY, |
| ), |
| quiet=mock.ANY, |
| ) |
| mount_call.assert_any_call( |
| "overlayfs", |
| self.mergeddir, |
| fs_type="overlayfs", |
| makedirs=False, |
| mount_opts=( |
| "lowerdir=%s" % self.lowerdir, |
| "upperdir=%s" % self.upperdir, |
| ), |
| quiet=mock.ANY, |
| ) |
| umount_call.assert_any_call(self.mergeddir, cleanup=cleanup) |
| |
| def testNoValidWorkdirFallback(self): |
| """Test that we fallback to overlayfs when no valid workdir is found.""" |
| |
| def _FailFileSystemCheck(_path1, _path2): |
| return False |
| |
| check_filesystem = self.PatchObject(osutils, "_SameFileSystem") |
| check_filesystem.side_effect = _FailFileSystemCheck |
| mount_call = self.PatchObject(osutils, "MountDir") |
| umount_call = self.PatchObject(osutils, "UmountDir") |
| |
| for cleanup in (True, False): |
| with osutils.MountOverlayContext( |
| self.lowerdir, self.upperdir, self.mergeddir, cleanup=cleanup |
| ): |
| mount_call.assert_any_call( |
| "overlayfs", |
| self.mergeddir, |
| fs_type="overlayfs", |
| makedirs=False, |
| mount_opts=( |
| "lowerdir=%s" % self.lowerdir, |
| "upperdir=%s" % self.upperdir, |
| ), |
| quiet=mock.ANY, |
| ) |
| umount_call.assert_any_call(self.mergeddir, cleanup=cleanup) |
| |
| |
| class IterateMountPointsTests(cros_test_lib.TempDirTestCase): |
| """Test for IterateMountPoints function.""" |
| |
| def setUp(self): |
| self.proc_mount = os.path.join(self.tempdir, "mounts") |
| osutils.WriteFile( |
| self.proc_mount, |
| r"""/dev/loop0 /mnt/dir_8 ext4 rw,relatime,data=ordered 0 0 |
| /dev/loop2 /mnt/dir_1 ext4 rw,relatime,data=ordered 0 0 |
| /dev/loop1 /mnt/dir_12 vfat rw 0 0 |
| /dev/loop4 /mnt/dir_3 ext4 ro,relatime 0 0 |
| weird\040system /mnt/weirdo unknown ro 0 0 |
| tmpfs /mnt/spaced\040dir tmpfs ro 0 0 |
| tmpfs /mnt/\134 tmpfs ro 0 0 |
| """, |
| ) |
| |
| def testOkay(self): |
| r = list(osutils.IterateMountPoints(self.proc_mount)) |
| self.assertEqual(len(r), 7) |
| self.assertEqual(r[0].source, "/dev/loop0") |
| self.assertEqual(r[1].destination, "/mnt/dir_1") |
| self.assertEqual(r[2].filesystem, "vfat") |
| self.assertEqual(r[3].options, "ro,relatime") |
| |
| def testEscape(self): |
| r = list(osutils.IterateMountPoints(self.proc_mount)) |
| self.assertEqual(r[4].source, "weird system") |
| self.assertEqual(r[5].destination, "/mnt/spaced dir") |
| self.assertEqual(r[6].destination, "/mnt/\\") |
| |
| |
| class ResolveSymlinkInRootTest(cros_test_lib.TempDirTestCase): |
| """Tests for ResolveSymlinkInRoot.""" |
| |
| def setUp(self): |
| # Create symlinks in tempdir so they are cleaned up automatically. |
| os.chdir(self.tempdir) |
| |
| def testRelativeLink(self): |
| os.symlink("target", "link") |
| self.assertEqual( |
| osutils.ResolveSymlinkInRoot("link", "/root"), "target" |
| ) |
| |
| def testRelativeLinkPath(self): |
| """Verify Path objects work.""" |
| os.symlink("target", "link") |
| self.assertEqual( |
| osutils.ResolveSymlinkInRoot(Path("link"), Path("/root")), "target" |
| ) |
| |
| def testAbsoluteLink(self): |
| os.symlink("/target", "link") |
| self.assertEqual( |
| osutils.ResolveSymlinkInRoot("link", "/root"), "/root/target" |
| ) |
| |
| def testRecursion(self): |
| os.symlink("target", "link1") |
| os.symlink("link1", "link2") |
| self.assertEqual( |
| osutils.ResolveSymlinkInRoot("link2", "/root"), "target" |
| ) |
| |
| def testRecursionWithAbsoluteLink(self): |
| os.symlink("target", "link1") |
| os.symlink("/link1", "link2") |
| self.assertEqual(osutils.ResolveSymlinkInRoot("link2", "."), "./target") |
| |
| |
| class ResolveSymlinkTest(cros_test_lib.TempDirTestCase): |
| """Tests for ResolveSymlink.""" |
| |
| def setUp(self): |
| self.file_path = self.tempdir / "file" |
| self.dir_path = self.tempdir / "directory" |
| osutils.Touch(self.file_path) |
| osutils.SafeMakedirs(self.dir_path) |
| |
| os.chdir(self.tempdir) |
| osutils.SafeSymlink(self.file_path, "abs_file_symlink") |
| osutils.SafeSymlink("./file", "rel_file_symlink") |
| osutils.SafeSymlink(self.dir_path, "abs_dir_symlink") |
| osutils.SafeSymlink("./directory", "rel_dir_symlink") |
| |
| self.abs_file_symlink = self.tempdir / "abs_file_symlink" |
| self.rel_file_symlink = self.tempdir / "rel_file_symlink" |
| self.abs_dir_symlink = self.tempdir / "abs_dir_symlink" |
| self.rel_dir_symlink = self.tempdir / "rel_dir_symlink" |
| |
| def testAbsoluteResolution(self): |
| """Test absolute path resolutions using Path objects.""" |
| self.assertEqual( |
| self.file_path, osutils.ResolveSymlink(self.abs_file_symlink) |
| ) |
| self.assertEqual( |
| self.dir_path, osutils.ResolveSymlink(self.abs_dir_symlink) |
| ) |
| |
| def testAbsoluteResolutionStr(self): |
| """Test absolute path resolutions using strings.""" |
| self.assertEqual( |
| str(self.file_path), |
| osutils.ResolveSymlink(str(self.abs_file_symlink)), |
| ) |
| self.assertEqual( |
| str(self.dir_path), |
| osutils.ResolveSymlink(str(self.abs_dir_symlink)), |
| ) |
| |
| def testRelativeResolution(self): |
| """Test relative path resolutions using Path objects.""" |
| self.assertEqual( |
| self.file_path, osutils.ResolveSymlink(self.rel_file_symlink) |
| ) |
| self.assertEqual( |
| self.dir_path, osutils.ResolveSymlink(self.rel_dir_symlink) |
| ) |
| |
| def testRelativeResolutionStr(self): |
| """Test relative path resolutions using strings.""" |
| self.assertEqual( |
| str(self.file_path), |
| osutils.ResolveSymlink(str(self.rel_file_symlink)), |
| ) |
| self.assertEqual( |
| str(self.dir_path), |
| osutils.ResolveSymlink(str(self.rel_dir_symlink)), |
| ) |
| |
| |
| class IsInsideVmTest(cros_test_lib.MockTempDirTestCase): |
| """Test osutils.IsInsideVmTest function.""" |
| |
| def setUp(self): |
| self.model_file = os.path.join(self.tempdir, "sda", "device", "model") |
| osutils.SafeMakedirs(os.path.dirname(self.model_file)) |
| self.mock_glob = self.PatchObject( |
| glob, "glob", return_value=[self.model_file] |
| ) |
| |
| def testIsInsideVm(self): |
| osutils.WriteFile(self.model_file, "VBOX") |
| self.assertTrue(osutils.IsInsideVm()) |
| self.assertEqual( |
| self.mock_glob.call_args[0][0], "/sys/block/*/device/model" |
| ) |
| |
| osutils.WriteFile(self.model_file, "VMware") |
| self.assertTrue(osutils.IsInsideVm()) |
| |
| def testIsNotInsideVm(self): |
| osutils.WriteFile(self.model_file, "ST1000DM000-1CH1") |
| self.assertFalse(osutils.IsInsideVm()) |
| |
| |
| class MoveDirContentsTestCase(cros_test_lib.MockTempDirTestCase): |
| """Test MoveDirContents.""" |
| |
| def setUp(self): |
| self.from_dir = self.tempdir / "from" |
| self.to_dir = self.tempdir / "to" |
| osutils.SafeMakedirs(self.from_dir) |
| osutils.SafeMakedirs(self.to_dir) |
| |
| def _crossdevice_rename(self, src, dst): |
| raise OSError(errno.EXDEV, "fake cross-device rename failure") |
| |
| def testMoveEmptyDir(self): |
| """Move empty from directory.""" |
| osutils.MoveDirContents(self.from_dir, self.to_dir) |
| self.assertExists(self.from_dir) |
| osutils.MoveDirContents( |
| self.from_dir, self.to_dir, remove_from_dir=True |
| ) |
| self.assertNotExists(self.from_dir) |
| self.assertListEqual(os.listdir(self.to_dir), []) |
| |
| def testMoveFiles(self): |
| """Move files from source to destination.""" |
| osutils.WriteFile(self.from_dir / "a.txt", "aaa") |
| osutils.WriteFile(self.from_dir / "b.txt", "bbb") |
| osutils.WriteFile(self.from_dir / ".hidden", "hiden") |
| |
| osutils.MoveDirContents(self.from_dir, self.to_dir) |
| self.assertFileContents(self.to_dir / "a.txt", "aaa") |
| self.assertFileContents(self.to_dir / "b.txt", "bbb") |
| self.assertFileContents(self.to_dir / ".hidden", "hiden") |
| self.assertExists(self.from_dir) |
| self.assertNotExists(self.from_dir / "a.txt") |
| self.assertNotExists(self.from_dir / "b.txt") |
| self.assertNotExists(self.from_dir / ".hidden") |
| |
| def testMoveFilesAndDelete(self): |
| """Move files from source to destination and delete source.""" |
| osutils.WriteFile(self.from_dir / "a.txt", "aaa") |
| osutils.WriteFile(self.from_dir / "b.txt", "bbb") |
| osutils.MoveDirContents( |
| self.from_dir, self.to_dir, remove_from_dir=True |
| ) |
| self.assertNotExists(self.from_dir) |
| |
| def testNonEmptyDestination(self): |
| """Move files from source to destination, which has contents.""" |
| osutils.WriteFile(self.to_dir / "a.txt", "aaa") |
| osutils.WriteFile(self.to_dir / "b.txt", "bbb") |
| with self.assertRaises(osutils.BadPathsException): |
| osutils.MoveDirContents(self.from_dir, self.to_dir) |
| osutils.MoveDirContents(self.from_dir, self.to_dir, allow_nonempty=True) |
| |
| def testMoveDir(self): |
| """Move files and directory from source to destination.""" |
| osutils.WriteFile(self.from_dir / "a.txt", "aaa") |
| osutils.SafeMakedirs(self.from_dir / "b") |
| osutils.WriteFile(self.from_dir / "b" / "b.txt", "bbb") |
| |
| osutils.MoveDirContents(self.from_dir, self.to_dir) |
| self.assertFileContents(self.to_dir / "a.txt", "aaa") |
| self.assertFileContents(self.to_dir / "b" / "b.txt", "bbb") |
| self.assertExists(self.from_dir) |
| self.assertNotExists(self.from_dir / "b" / "b.txt") |
| |
| def testSymlink(self): |
| """Move symlink from source to destination.""" |
| osutils.WriteFile(self.tempdir / "a.txt", "aaa") |
| (self.from_dir / "sym.txt").symlink_to(self.tempdir / "a.txt") |
| osutils.MoveDirContents(self.from_dir, self.to_dir) |
| self.assertTrue((self.to_dir / "sym.txt").is_symlink()) |
| self.assertEqual( |
| os.readlink(self.to_dir / "sym.txt"), str(self.tempdir / "a.txt") |
| ) |
| |
| def testSymlinkTargetDoesntExist(self): |
| """Move symlink from source to destination when target doesn't exist.""" |
| (self.from_dir / "sym.txt").symlink_to(self.tempdir / "a.txt") |
| self.assertNotExists(self.tempdir / "a.txt") |
| |
| osutils.MoveDirContents(self.from_dir, self.to_dir) |
| |
| self.assertTrue((self.to_dir / "sym.txt").is_symlink()) |
| self.assertNotExists(self.tempdir / "a.txt") |
| self.assertEqual( |
| os.readlink(self.to_dir / "sym.txt"), str(self.tempdir / "a.txt") |
| ) |
| |
| def testSymlinkTargetDoesntExistCrossDevice(self): |
| """Move symlink from source to destination, cross-device. |
| |
| Cover a few cases when we can't do easy os.rename(), such as when |
| moving across filesystem boundaries. |
| """ |
| # Mock os.rename() to fail, so shutil will fall back to copy |
| # operations. |
| _ = self.PatchObject(os, "rename", side_effect=self._crossdevice_rename) |
| |
| (self.from_dir / "sym.txt").symlink_to(self.tempdir / "a.txt") |
| (self.to_dir / "sym.txt").symlink_to(self.tempdir / "b.txt") |
| self.assertNotExists(self.tempdir / "a.txt") |
| self.assertNotExists(self.tempdir / "b.txt") |
| |
| osutils.MoveDirContents(self.from_dir, self.to_dir, allow_nonempty=True) |
| |
| self.assertTrue((self.to_dir / "sym.txt").is_symlink()) |
| self.assertNotExists(self.tempdir / "a.txt") |
| self.assertEqual( |
| os.readlink(self.to_dir / "sym.txt"), |
| str(self.tempdir / "a.txt"), |
| ) |
| |
| def testMoveDirCrossDevice(self): |
| """Move dir across filesystem boundaries.""" |
| # Mock os.rename() to fail, so shutil will fall back to copy |
| # operations. |
| _ = self.PatchObject(os, "rename", side_effect=self._crossdevice_rename) |
| |
| osutils.SafeMakedirs(self.from_dir / "b") |
| osutils.WriteFile(self.from_dir / "b" / "a.txt", "aaa") |
| osutils.WriteFile(self.from_dir / "b" / "b.txt", "bbb") |
| |
| osutils.MoveDirContents(self.from_dir, self.to_dir) |
| self.assertFileContents(self.to_dir / "b" / "a.txt", "aaa") |
| self.assertFileContents(self.to_dir / "b" / "b.txt", "bbb") |
| self.assertExists(self.from_dir) |
| self.assertNotExists(self.from_dir / "b" / "a.txt") |
| self.assertNotExists(self.from_dir / "b" / "b.txt") |
| |
| def testOverWriteFiles(self): |
| """Move files with same name from source to destination.""" |
| # test dotfiles in top and multiple level directories. |
| D = cros_test_lib.Directory |
| src_layout = ( |
| D("a", ["a.txt"]), |
| D("b", ["b.txt"]), |
| "top.txt", |
| ".hidden", |
| ) |
| dest_layout = ( |
| D("a", ["a.txt", ".hidden", "b.txt"]), |
| "top.txt", |
| ".hidden", |
| # test a directory in source directory having a file with same name |
| # in destination. |
| "b", |
| ) |
| cros_test_lib.CreateOnDiskHierarchy(self.from_dir, src_layout) |
| cros_test_lib.CreateOnDiskHierarchy(self.to_dir, dest_layout) |
| osutils.WriteFile(self.from_dir / "top.txt", "aaa") |
| osutils.WriteFile(self.from_dir / ".hidden", "hidden") |
| osutils.WriteFile(self.from_dir / "a" / "a.txt", "aaa") |
| |
| osutils.MoveDirContents(self.from_dir, self.to_dir, allow_nonempty=True) |
| self.assertFileContents(self.to_dir / "top.txt", "aaa") |
| self.assertFileContents(self.to_dir / ".hidden", "hidden") |
| self.assertFileContents(self.to_dir / "a" / "a.txt", "aaa") |
| self.assertExists(self.to_dir / "a" / ".hidden") |
| self.assertExists(self.to_dir / "a" / "b.txt") |
| self.assertTrue((self.to_dir / "b").is_dir()) |
| self.assertExists(self.to_dir / "b" / "b.txt") |
| |
| def testOverlaidDirs(self): |
| """Move files with overlapping directories.""" |
| D = cros_test_lib.Directory |
| src_layout = ( |
| D("a", ["foo.txt"]), |
| D("b", []), |
| ) |
| dest_layout = (D("a", ["bar.txt"]),) |
| cros_test_lib.CreateOnDiskHierarchy(self.from_dir, src_layout) |
| cros_test_lib.CreateOnDiskHierarchy(self.to_dir, dest_layout) |
| |
| osutils.MoveDirContents(self.from_dir, self.to_dir, allow_nonempty=True) |
| |
| self.assertNotExists(self.from_dir / "a") |
| self.assertNotExists(self.from_dir / "b") |
| self.assertExists(self.to_dir / "a" / "foo.txt") |
| self.assertExists(self.to_dir / "a" / "bar.txt") |
| self.assertExists(self.to_dir / "b") |
| |
| def testSameDirectory(self): |
| """Test source and destination directory are the same.""" |
| osutils.MoveDirContents(self.from_dir, self.from_dir) |
| self.assertExists(self.from_dir) |
| osutils.MoveDirContents( |
| self.from_dir, self.from_dir, remove_from_dir=True |
| ) |
| self.assertExists(self.from_dir) |
| |
| def testMissingDirectory(self): |
| """Test source and destination directory missing case.""" |
| osutils.RmDir(self.from_dir) |
| with self.assertRaises(osutils.BadPathsException): |
| osutils.MoveDirContents(self.from_dir, self.to_dir) |
| osutils.RmDir(self.to_dir) |
| osutils.SafeMakedirs(self.from_dir) |
| with self.assertRaises(osutils.BadPathsException): |
| osutils.MoveDirContents(self.from_dir, self.to_dir) |
| |
| |
| class CopyDirContentsTestCase(cros_test_lib.TempDirTestCase): |
| """Test CopyDirContents.""" |
| |
| def testCopyEmptyDir(self): |
| """Copy "empty" contents from a dir.""" |
| in_dir = os.path.join(self.tempdir, "input") |
| out_dir = os.path.join(self.tempdir, "output") |
| osutils.SafeMakedirsNonRoot(in_dir) |
| osutils.SafeMakedirsNonRoot(out_dir) |
| osutils.CopyDirContents(in_dir, out_dir) |
| |
| def testCopyFiles(self): |
| """Copy from a dir that contains files.""" |
| in_dir = os.path.join(self.tempdir, "input") |
| out_dir = os.path.join(self.tempdir, "output") |
| osutils.SafeMakedirsNonRoot(in_dir) |
| osutils.WriteFile(os.path.join(in_dir, "a.txt"), "aaa") |
| osutils.WriteFile(os.path.join(in_dir, "b.txt"), "bbb") |
| osutils.SafeMakedirsNonRoot(out_dir) |
| osutils.CopyDirContents(in_dir, out_dir) |
| self.assertEqual( |
| osutils.ReadFile(os.path.join(out_dir, "a.txt")).strip(), "aaa" |
| ) |
| self.assertEqual( |
| osutils.ReadFile(os.path.join(out_dir, "b.txt")).strip(), "bbb" |
| ) |
| |
| def testCopyTree(self): |
| """Copy from a dir that contains files.""" |
| in_dir = os.path.join(self.tempdir, "input") |
| out_dir = os.path.join(self.tempdir, "output") |
| osutils.SafeMakedirsNonRoot(in_dir) |
| osutils.SafeMakedirsNonRoot(os.path.join(in_dir, "a")) |
| osutils.WriteFile(os.path.join(in_dir, "a", "b.txt"), "bbb") |
| osutils.SafeMakedirsNonRoot(out_dir) |
| osutils.CopyDirContents(in_dir, out_dir) |
| self.assertEqual( |
| osutils.ReadFile(os.path.join(out_dir, "a", "b.txt")).strip(), "bbb" |
| ) |
| |
| def testSourceDirDoesNotExistRaises(self): |
| """Coping from a non-existent source dir raises.""" |
| in_dir = os.path.join(self.tempdir, "input") |
| out_dir = os.path.join(self.tempdir, "output") |
| osutils.SafeMakedirsNonRoot(out_dir) |
| with self.assertRaises(osutils.BadPathsException): |
| osutils.CopyDirContents(in_dir, out_dir) |
| |
| def testDestinationDirDoesNotExistRaises(self): |
| """Coping to a non-existent destination dir raises.""" |
| in_dir = os.path.join(self.tempdir, "input") |
| out_dir = os.path.join(self.tempdir, "output") |
| osutils.SafeMakedirsNonRoot(in_dir) |
| with self.assertRaises(osutils.BadPathsException): |
| osutils.CopyDirContents(in_dir, out_dir) |
| |
| def testDestinationDirNonEmptyRaises(self): |
| """Coping to a non-empty destination dir raises.""" |
| in_dir = os.path.join(self.tempdir, "input") |
| out_dir = os.path.join(self.tempdir, "output") |
| osutils.SafeMakedirsNonRoot(in_dir) |
| osutils.SafeMakedirsNonRoot(out_dir) |
| osutils.SafeMakedirsNonRoot(os.path.join(out_dir, "blah")) |
| with self.assertRaises(osutils.BadPathsException): |
| osutils.CopyDirContents(in_dir, out_dir) |
| |
| def testDestinationDirNonEmptyAllowNonEmptySet(self): |
| """Copying to a non-empty destination with allow_nonempty succeeds.""" |
| in_dir = os.path.join(self.tempdir, "input") |
| out_dir = os.path.join(self.tempdir, "output") |
| osutils.SafeMakedirsNonRoot(in_dir) |
| osutils.SafeMakedirsNonRoot(out_dir) |
| osutils.SafeMakedirsNonRoot(os.path.join(out_dir, "blah")) |
| osutils.CopyDirContents(in_dir, out_dir, allow_nonempty=True) |
| |
| def testCopyingSymlinks(self): |
| in_dir = os.path.join(self.tempdir, "input") |
| in_dir_link = os.path.join(in_dir, "link") |
| in_dir_symlinks_dir = os.path.join(in_dir, "holding_symlink") |
| in_dir_symlinks_dir_link = os.path.join(in_dir_symlinks_dir, "link") |
| |
| out_dir = os.path.join(self.tempdir, "output") |
| out_dir_link = os.path.join(out_dir, "link") |
| out_dir_symlinks_dir = os.path.join(out_dir, "holding_symlink") |
| out_dir_symlinks_dir_link = os.path.join(out_dir_symlinks_dir, "link") |
| |
| # Create directories and symlinks appropriately. |
| osutils.SafeMakedirsNonRoot(in_dir) |
| osutils.SafeMakedirsNonRoot(in_dir_symlinks_dir) |
| os.symlink(self.tempdir, in_dir_link) |
| os.symlink(self.tempdir, in_dir_symlinks_dir_link) |
| |
| osutils.SafeMakedirsNonRoot(out_dir) |
| |
| # Actual execution. |
| osutils.CopyDirContents(in_dir, out_dir, symlinks=True) |
| |
| # Assertions. |
| self.assertTrue(os.path.islink(out_dir_link)) |
| self.assertTrue(os.path.islink(out_dir_symlinks_dir_link)) |
| |
| def testNotCopyingSymlinks(self): |
| # Create temporary to symlink against. |
| tmp_file = os.path.join(self.tempdir, "a.txt") |
| osutils.WriteFile(tmp_file, "aaa") |
| tmp_subdir = os.path.join(self.tempdir, "tmp") |
| osutils.SafeMakedirsNonRoot(tmp_subdir) |
| tmp_subdir_file = os.path.join(tmp_subdir, "b.txt") |
| osutils.WriteFile(tmp_subdir_file, "bbb") |
| |
| in_dir = os.path.join(self.tempdir, "input") |
| in_dir_link = os.path.join(in_dir, "link") |
| in_dir_symlinks_dir = os.path.join(in_dir, "holding_symlink") |
| in_dir_symlinks_dir_link = os.path.join(in_dir_symlinks_dir, "link") |
| |
| out_dir = os.path.join(self.tempdir, "output") |
| out_dir_file = os.path.join(out_dir, "link") |
| out_dir_symlinks_dir = os.path.join(out_dir, "holding_symlink") |
| out_dir_symlinks_dir_subdir = os.path.join(out_dir_symlinks_dir, "link") |
| |
| # Create directories and symlinks appropriately. |
| osutils.SafeMakedirsNonRoot(in_dir) |
| osutils.SafeMakedirsNonRoot(in_dir_symlinks_dir) |
| os.symlink(tmp_file, in_dir_link) |
| os.symlink(tmp_subdir, in_dir_symlinks_dir_link) |
| |
| osutils.SafeMakedirsNonRoot(out_dir) |
| |
| # Actual execution. |
| osutils.CopyDirContents(in_dir, out_dir, symlinks=False) |
| |
| # Assertions. |
| self.assertFalse(os.path.islink(out_dir_file)) |
| self.assertFalse(os.path.islink(out_dir_symlinks_dir_subdir)) |
| self.assertTrue(filecmp.cmp(out_dir_file, tmp_file)) |
| self.assertTrue( |
| filecmp.cmp( |
| os.path.join(out_dir_symlinks_dir_subdir, "b.txt"), |
| tmp_subdir_file, |
| ) |
| ) |
| |
| def testCopyingSymlinksAndFilesWithPathArgs(self): |
| """Copying given |Path| arguments works properly for symlinks+files.""" |
| in_dir = self.tempdir / "input" |
| osutils.SafeMakedirs(in_dir) |
| |
| tmp_file = in_dir / "a.txt" |
| tmp_file.write_text("aaa", encoding="utf-8") |
| tmp_file_link = tmp_file.with_suffix(".link") |
| tmp_file_link.symlink_to(tmp_file) |
| |
| out_dir = self.tempdir / "output" |
| osutils.SafeMakedirs(out_dir) |
| osutils.CopyDirContents(in_dir, out_dir, symlinks=True) |
| |
| out_tmp_file = out_dir / tmp_file.name |
| self.assertEqual(out_tmp_file.read_text(encoding="utf-8"), "aaa") |
| out_tmp_file_link = out_dir / tmp_file_link.name |
| self.assertEqual(Path(os.readlink(out_tmp_file_link)), tmp_file) |
| |
| def testCopyingSubDirWithPathArgs(self): |
| """Copying given |Path| arguments works properly for subdirectories.""" |
| in_dir = self.tempdir / "input" |
| osutils.SafeMakedirs(in_dir) |
| |
| tmp_file = in_dir / "subdir" / "a.txt" |
| osutils.SafeMakedirs(tmp_file.parent) |
| |
| tmp_file.write_text("aaa", encoding="utf-8") |
| |
| out_dir = self.tempdir / "output" |
| osutils.SafeMakedirs(out_dir) |
| osutils.CopyDirContents(in_dir, out_dir, symlinks=True) |
| |
| out_tmp_file = out_dir / "subdir" / tmp_file.name |
| self.assertEqual(out_tmp_file.read_text(encoding="utf-8"), "aaa") |
| |
| |
| class WhichTests(cros_test_lib.TempDirTestCase): |
| """Test Which.""" |
| |
| def setUp(self): |
| self.prog_path = os.path.join(self.tempdir, "prog") |
| osutils.Touch(self.prog_path, mode=0o755) |
| self.text_path = os.path.join(self.tempdir, "text") |
| osutils.Touch(self.text_path, mode=0o644) |
| |
| # A random path for us to validate. |
| os.environ["PATH"] = "/:%s" % (self.tempdir,) |
| |
| def testPath(self): |
| """Check $PATH/path handling.""" |
| self.assertEqual(self.prog_path, osutils.Which("prog")) |
| |
| os.environ["PATH"] = "" |
| self.assertEqual(None, osutils.Which("prog")) |
| |
| self.assertEqual( |
| self.prog_path, osutils.Which("prog", path=self.tempdir) |
| ) |
| |
| def testMode(self): |
| """Check mode handling.""" |
| self.assertEqual(self.prog_path, osutils.Which("prog")) |
| self.assertEqual(self.prog_path, osutils.Which("prog", mode=os.X_OK)) |
| self.assertEqual(self.prog_path, osutils.Which("prog", mode=os.R_OK)) |
| self.assertEqual(None, osutils.Which("text")) |
| self.assertEqual(None, osutils.Which("text", mode=os.X_OK)) |
| self.assertEqual(self.text_path, osutils.Which("text", mode=os.F_OK)) |
| |
| def testRoot(self): |
| """Check root handling.""" |
| self.assertEqual(None, osutils.Which("prog", root="/.........")) |
| self.assertEqual( |
| self.prog_path, osutils.Which("prog", path="/", root=self.tempdir) |
| ) |
| |
| |
| class UmaskTests(cros_test_lib.TestCase): |
| """Test Umask.""" |
| |
| @staticmethod |
| def getUmask(): |
| """Return the current umask setting.""" |
| # Testing this is messy because there is no syscall to look this up |
| # without side-effects. os.umask sets & queries at once. |
| m = re.search( |
| r"^Umask:\s+([0-7]+)", |
| osutils.ReadFile("/proc/self/status"), |
| flags=re.M, |
| ) |
| assert m is not None |
| return int(m.group(1), 8) |
| |
| def testBasic(self): |
| """Verify umask is saved & restored.""" |
| os.umask(0o222) |
| with osutils.UmaskContext(0o123) as old: |
| assert self.getUmask() == 0o123 |
| assert self.getUmask() == old |
| assert old == 0o222 |
| |
| |
| class TestSyncStorage(cros_test_lib.TestCase): |
| """Test sync_storage helper.""" |
| |
| def testNoArgs(self): |
| """Verify default behavior.""" |
| assert osutils.sync_storage() |
| |
| def testSudo(self): |
| """Verify sudo behavior.""" |
| assert osutils.sync_storage(sudo=True) |
| assert osutils.sync_storage(Path.cwd(), sudo=True) |
| |
| def testPath(self): |
| """Verify with path.""" |
| assert osutils.sync_storage(Path.cwd()) |
| |
| def testMissingPath(self): |
| """Verify with path that doesn't work.""" |
| assert not osutils.sync_storage("alskdjfalskdjflasjdflasjdf") |
| |
| def testPathData(self): |
| """Verify syncing path data.""" |
| assert osutils.sync_storage(".", data_only=True) |
| |
| def testPathDataNoPath(self): |
| """Verify syncing data w/out path.""" |
| with self.assertRaises(ValueError): |
| osutils.sync_storage(data_only=True) |
| |
| def testPathFilesystem(self): |
| """Verify syncing path filesystem.""" |
| assert osutils.sync_storage(".", filesystem=True) |
| |
| |
| class TestMockCmdSyncStorage(cros_test_lib.RunCommandTestCase): |
| """Test sync_storage helper with a mock run command.""" |
| |
| def testSync(self): |
| """Verify basic `sync` call.""" |
| assert osutils.sync_storage() |
| assert osutils.sync_storage(sudo=True) |
| self.assertEqual(self.rc.call_count, 0) |
| |
| def testSyncData(self): |
| """Verify basic `sync` call.""" |
| assert osutils.sync_storage(".", data_only=True, sudo=True) |
| self.rc.assertCommandContains(["sync", "--data", "."]) |
| |
| def testSyncFilesystem(self): |
| """Verify basic `sync` call.""" |
| assert osutils.sync_storage(".", filesystem=True, sudo=True) |
| self.rc.assertCommandContains(["sync", "--file-system", "."]) |
| |
| def testSyncFile(self): |
| """Verify basic `sync` call.""" |
| assert osutils.sync_storage(".", sudo=True) |
| self.rc.assertCommandContains(["sync", "."]) |
| |
| |
| class TestMockSyncStorage(cros_test_lib.TestCase): |
| """Test sync_storage helper with a mock C library.""" |
| |
| def testSync(self): |
| """Verify we call libc.sync().""" |
| m = mock.MagicMock() |
| with mock.patch.object(ctypes, "CDLL", return_value=m): |
| osutils.sync_storage() |
| m.sync.assert_called_once() |
| |
| def testFDataSync(self): |
| """Verify we call libc.fdatasync().""" |
| m = mock.MagicMock() |
| with mock.patch.object(ctypes, "CDLL", return_value=m): |
| osutils.sync_storage(".", data_only=True) |
| m.fdatasync.assert_called_once() |
| |
| def testSyncfs(self): |
| """Verify we call libc.syncfs().""" |
| m = mock.MagicMock() |
| with mock.patch.object(ctypes, "CDLL", return_value=m): |
| osutils.sync_storage(".", filesystem=True) |
| m.syncfs.assert_called_once() |
| |
| def testFsync(self): |
| """Verify we call libc.fsync().""" |
| m = mock.MagicMock() |
| with mock.patch.object(ctypes, "CDLL", return_value=m): |
| osutils.sync_storage(".") |
| m.fsync.assert_called_once() |