| # Copyright 2019 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unit tests for VM.""" |
| |
| import fcntl |
| import logging |
| import multiprocessing |
| import os |
| import socket |
| import stat |
| import sys |
| from unittest import mock |
| |
| from chromite.cli.cros import cros_chrome_sdk |
| from chromite.lib import constants |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_test_lib |
| from chromite.lib import device |
| from chromite.lib import osutils |
| from chromite.lib import partial_mock |
| from chromite.lib import remote_access |
| from chromite.lib import vm |
| |
| |
| pytestmark = cros_test_lib.pytestmark_inside_only |
| |
| |
| # pylint: disable=protected-access |
| class VMTester(cros_test_lib.RunCommandTempDirTestCase): |
| """Test vm.VM.""" |
| |
| def setUp(self): |
| """Common set up method for all tests.""" |
| # Pick a port that is valid, but we can't bind normally, and is unlikely |
| # to be used in general. |
| opts = vm.VM.GetParser().parse_args(["--ssh-port=1"]) |
| opts.enable_kvm = True |
| with mock.patch.object(multiprocessing, "cpu_count", return_value=8): |
| self._vm = vm.VM(opts) |
| self._vm.use_sudo = False |
| self._vm.board = "amd64-generic" |
| self._vm.cache_dir = self.tempdir |
| self._vm.image_path = self.TempFilePath(constants.TEST_IMAGE_BIN) |
| osutils.Touch(self._vm.image_path) |
| |
| self.nested_kvm_file = self.TempFilePath("kvm_intel_nested") |
| osutils.WriteFile(self.nested_kvm_file, "N") |
| # Make the glob match the path we just created for nested_kvm_file. |
| self._vm.NESTED_KVM_GLOB = os.path.join( |
| os.path.dirname(self.nested_kvm_file), "kvm_*_nested" |
| ) |
| |
| # Satisfy QEMU version check. |
| version_str = ( |
| "QEMU emulator version 2.6.0, Copyright (c) " |
| "2003-2008 Fabrice Bellard" |
| ) |
| self.rc.AddCmdResult(partial_mock.In("--version"), stdout=version_str) |
| |
| self.ssh_port = self._vm.ssh_port |
| |
| def TempFilePath(self, file_path): |
| return os.path.join(self.tempdir, file_path) |
| |
| def TempVMPath(self, kvm_file): |
| return self.TempFilePath( |
| os.path.join("cros_vm_%d" % self.ssh_port, kvm_file) |
| ) |
| |
| def FindPathInArgs(self, args, path): |
| """Checks the called commands to see if the path is present. |
| |
| Args: |
| args: List of called commands. |
| path: Path to check if present in the called commands. |
| |
| Returns: |
| Whether the path is found in the called commands. |
| """ |
| for call in args: |
| # A typical call looks like: |
| # call(['.../chroot/usr/bin/qemu-system-x86_64', '--version'], |
| # capture_output=True) |
| if any(path in a for a in call[0][0]): |
| return True |
| return False |
| |
| def testStart(self): |
| self._vm.Start() |
| self.assertCommandContains([self._vm.qemu_path]) |
| self.assertCommandContains( |
| [ |
| "-m", |
| "8G", |
| "-smp", |
| "8", |
| "-vga", |
| "virtio", |
| "-daemonize", |
| "-device", |
| "usb-tablet", |
| ] |
| ) |
| self.assertCommandContains( |
| [ |
| "-pidfile", |
| self.TempVMPath("kvm.pid"), |
| "-chardev", |
| "pipe,id=control_pipe,path=%s" % self.TempVMPath("kvm.monitor"), |
| "-serial", |
| "file:%s" % self.TempVMPath("kvm.monitor.serial"), |
| "-mon", |
| "chardev=control_pipe", |
| ] |
| ) |
| self.assertCommandContains( |
| [ |
| "-cpu", |
| "Haswell-noTSX,-invpcid,-tsc-deadline,check", |
| ] |
| ) |
| self.assertCommandContains( |
| [ |
| "-device", |
| "virtio-net,netdev=eth0", |
| "-netdev", |
| "user,id=eth0,net=10.0.2.0/27,hostfwd=tcp:127.0.0.1:" |
| f"{self.ssh_port}-:22", |
| ] |
| ) |
| self.assertCommandContains( |
| [ |
| "-device", |
| "virtio-scsi-pci,id=scsi", |
| "-device", |
| "scsi-hd,drive=hd", |
| "-drive", |
| "if=none,id=hd,file=%s,cache=unsafe,format=raw" |
| % self.TempFilePath(constants.TEST_IMAGE_BIN), |
| ] |
| ) |
| self.assertCommandContains(["-enable-kvm"]) |
| |
| @mock.patch("chromite.lib.device.Device.WaitForBoot") |
| def testStartRetriesSuccess(self, mock_wait): |
| """Start() returns normally if WaitForBoot fails transiently once.""" |
| mock_wait.side_effect = ( |
| device.DeviceError("error"), |
| True, |
| ) |
| self._vm.Start() |
| |
| @mock.patch("chromite.lib.device.Device.WaitForBoot") |
| def testStartRetriesFailure(self, mock_wait): |
| """Start() raises a DeviceError if WaitForBoot fails all attempts.""" |
| mock_wait.side_effect = ( |
| device.DeviceError("error"), |
| device.DeviceError("error"), |
| ) |
| self.assertRaises(device.DeviceError, self._vm.Start) |
| |
| def testStartWithVMX(self): |
| """Verify vmx is enabled if the host supports nested virtualization.""" |
| osutils.WriteFile(self.nested_kvm_file, "1") |
| self._vm.Start() |
| self.assertCommandContains( |
| [ |
| "-cpu", |
| "Haswell-noTSX,-invpcid,-tsc-deadline,check,vmx=on,svm=on", |
| ] |
| ) |
| |
| def testStop(self): |
| pid = "12345" |
| self.assertEqual(self._vm.pidfile, self.TempVMPath("kvm.pid")) |
| osutils.WriteFile(self._vm.pidfile, pid) |
| self._vm.Stop() |
| self.assertCommandContains(["kill", "-9", pid]) |
| |
| def testBuiltVMImagePath(self): |
| """Verify locally built VM image path is picked up by vm.VM.""" |
| self._vm.image_path = None |
| expected_vm_image_path = os.path.join( |
| constants.SOURCE_ROOT, |
| "src", |
| "build", |
| "images", |
| self._vm.board, |
| "latest", |
| constants.TEST_IMAGE_BIN, |
| ) |
| osutils.Touch(expected_vm_image_path, makedirs=True) |
| self._vm.Start() |
| self.assertTrue( |
| self.FindPathInArgs(self.rc.call_args_list, expected_vm_image_path) |
| ) |
| |
| def testSDKVMImagePath(self): |
| """Verify vm.VM picks up the downloaded VM in the SDK.""" |
| self._vm.image_path = None |
| vm_image_dir = cros_test_lib.FakeSDKCache( |
| self._vm.cache_dir |
| ).CreateCacheReference(self._vm.board, constants.TEST_IMAGE_TAR) |
| vm_image_path = os.path.join(vm_image_dir, constants.TEST_IMAGE_BIN) |
| osutils.Touch(vm_image_path, makedirs=True) |
| self._vm.Start() |
| expected_vm_image_path = os.path.join( |
| self._vm.cache_dir, |
| "chrome-sdk/symlinks/%s+12225.0.0+%s/%s" |
| % ( |
| self._vm.board, |
| constants.TEST_IMAGE_TAR, |
| constants.TEST_IMAGE_BIN, |
| ), |
| ) |
| self.assertTrue( |
| self.FindPathInArgs(self.rc.call_args_list, expected_vm_image_path) |
| ) |
| |
| def testVMImageNotFound(self): |
| """Verify VMError is raised when a fake board image cannot be found.""" |
| self._vm.image_path = None |
| self._vm.board = "fake_board_name" |
| self.assertRaises(vm.VMError, self._vm.Start) |
| |
| def testVMImageDoesNotExist(self): |
| """Verify that VMError is raised when image path is not real.""" |
| self._vm.image_path = "/fake/path/to/the/vm/image" |
| self.assertRaises(vm.VMError, self._vm.Start) |
| |
| def testAppendBinFile(self): |
| """Verify bin file appended when image-path points to a directory.""" |
| self._vm.image_path = self.tempdir |
| self._vm.Start() |
| self.assertEqual( |
| self._vm.image_path, self.TempFilePath(constants.TEST_IMAGE_BIN) |
| ) |
| |
| def testChrootQemuPath(self): |
| """Verify that QEMU in the chroot is picked up by vm.VM.""" |
| if cros_build_lib.IsInsideChroot(): |
| self._vm._SetQemuPath() |
| self.assertTrue( |
| "usr/bin/qemu-system-x86_64" in self._vm.qemu_path |
| or "usr/libexec/qemu/bin/qemu-system-x86_64" |
| in self._vm.qemu_path |
| ) |
| |
| def testSDKQemuPath(self): |
| """Verify vm.VM picks up the downloaded QEMU in the SDK.""" |
| self._vm.qemu_path = None |
| qemu_dir_path = cros_chrome_sdk.SDKFetcher.QEMU_BIN_PATH |
| # Creates a fake SDK cache with the QEMU binary. |
| qemu_path = cros_test_lib.FakeSDKCache( |
| self._vm.cache_dir |
| ).CreateCacheReference(self._vm.board, qemu_dir_path) |
| qemu_path = os.path.join(qemu_path, "usr", "bin", "qemu-system-x86_64") |
| osutils.Touch(qemu_path, makedirs=True) |
| self._vm._SetQemuPath() |
| self.assertEqual(self._vm.qemu_path, qemu_path) |
| |
| @mock.patch("chromite.lib.vm.VM._CheckQemuMinVersion") |
| def testSystemQemuPath(self, check_min_version_mock): |
| """Verify that QEMU in the system is picked up by vm.VM.""" |
| # Skip the SDK Cache. |
| os.environ[cros_chrome_sdk.SDKFetcher.SDK_VERSION_ENV] = "None" |
| |
| # Skip the Chroot. |
| self._vm.chroot_path = "fake/path" |
| |
| # Checks the QEMU path in the system. |
| qemu_path = self.TempFilePath("qemu-system-x86_64") |
| osutils.Touch(qemu_path, mode=stat.S_IRWXU, makedirs=True) |
| qemu_dir = os.path.dirname(qemu_path) |
| |
| os.environ["PATH"] = qemu_dir |
| self._vm._SetQemuPath() |
| |
| # SetQemuPath() calls _CheckQemuMinVersion(). |
| # Checks that mock version has been called. |
| check_min_version_mock.assert_called() |
| |
| self.assertEqual(self._vm.qemu_path, qemu_path) |
| |
| def testInvalidQemuBiosPath(self): |
| """Verify that VMError is raised for nonexistent qemu bios path.""" |
| self._vm.qemu_bios_path = "/invalid/qemu/bios/path/" |
| self.assertRaises(vm.VMError, self._vm.Start) |
| |
| def testCreateQcow2Image(self): |
| """Tests that a qcow2 image is created with --copy-on-write.""" |
| self._vm.copy_on_write = True |
| initial_img_path = self._vm.image_path |
| self._vm.Start() |
| |
| # The command that creates the Qcow2 image. |
| self.assertCommandContains( |
| [ |
| self._vm.qemu_img_path, |
| "create", |
| "-f", |
| "qcow2", |
| "-o", |
| "backing_file=%s,backing_fmt=raw" % initial_img_path, |
| os.path.join(self._vm.vm_dir, "qcow2.img"), |
| ] |
| ) |
| # The command that launches a VM with the new Qcow2 image. |
| self.assertCommandContains( |
| [ |
| "-drive", |
| "if=none,id=hd,file=%s,cache=unsafe,format=qcow2" |
| % os.path.join(self._vm.vm_dir, "qcow2.img"), |
| ] |
| ) |
| |
| @mock.patch("os.path.isfile", return_value=False) |
| @mock.patch("chromite.lib.osutils.Which", return_value=None) |
| def testQemuNotFound(self, which_mock, is_file_mock): |
| """Verify that VMError is raised when qemu path cannot be set.""" |
| self.assertRaises(vm.VMError, self._vm._SetQemuPath) |
| which_mock.assert_called() |
| is_file_mock.assert_called() |
| |
| def testQemuImageNotFound(self): |
| """Veryify that VMError is raised for nonexistent qemu image path.""" |
| self._vm.copy_on_write = True |
| self._vm.qemu_img_path = "/invalid/qemu/img/path/" |
| self.assertRaises(vm.VMError, self._vm._SetQemuPath) |
| |
| def testRmVMDir(self): |
| """Verify that the vm directory is removed after calling RmVMDir.""" |
| self.assertExists(self._vm.vm_dir) |
| self._vm.use_sudo = False |
| self._vm.Stop() |
| self.assertNotExists(self._vm.vm_dir) |
| |
| @mock.patch("chromite.lib.osutils.SafeMakedirs", return_value=False) |
| def testCreateVMDirError(self, make_dir_mock): |
| """Verify an error is raised when vm_dir is not a valid directory.""" |
| self._vm.vm_dir = "/not/a/valid/dir" |
| self.assertRaises(AssertionError, self._vm._CreateVMDir) |
| make_dir_mock.assert_called() |
| |
| @mock.patch("chromite.lib.osutils.SafeMakedirs", return_value=False) |
| def testCreateVMDirLinkError(self, make_dir_mock): |
| """Verify that an error is raised when vm_dir is a symbolic link.""" |
| # Create the symlink. |
| symlink = self.TempFilePath("symlink") |
| os.symlink(self.TempFilePath("fakepath"), symlink) |
| self._vm.vm_dir = symlink |
| |
| self.assertRaises(AssertionError, self._vm._CreateVMDir) |
| make_dir_mock.assert_called() |
| |
| @mock.patch("chromite.lib.osutils.SafeMakedirs", return_value=False) |
| @mock.patch("os.getuid") |
| def testCreateVMDirStatError(self, getuid_mock, make_dir_mock): |
| """Verify an error is raised when user does not own the vm dir.""" |
| self.assertRaises(AssertionError, self._vm._CreateVMDir) |
| getuid_mock.assert_called() |
| make_dir_mock.assert_called() |
| |
| def testQemuVersionError(self): |
| """Verify VMError is raised without an expected QEMU version number.""" |
| version_str = "Fake Version String" |
| self.rc.AddCmdResult(partial_mock.In("--version"), stdout=version_str) |
| self.assertRaises(vm.VMError, self._vm._SetQemuPath) |
| |
| def testQemuVersion(self): |
| """Verify that the correct QEMU version is identified.""" |
| version_str = ( |
| "QEMU emulator version 2.8.0, Copyright (c) " |
| "2003-2008 Fabrice Bellard" |
| ) |
| self.rc.AddCmdResult(partial_mock.In("--version"), stdout=version_str) |
| self._vm._SetQemuPath() |
| self.assertEqual("2.8.0", self._vm.QemuVersion()) |
| self.assertCommandContains([self._vm.qemu_path, "--version"]) |
| |
| def testCheckQemuError(self): |
| """Verify that VMError is raised when the QEMU version is too old.""" |
| version_str = ( |
| "QEMU emulator version 2.5.0, Copyright (c) " |
| "2003-2008 Fabrice Bellard" |
| ) |
| self.rc.AddCmdResult(partial_mock.In("--version"), stdout=version_str) |
| self.assertRaises(vm.VMError, self._vm._SetQemuPath) |
| |
| def testRunError(self): |
| """Verify that VMError is raised when no action is specified.""" |
| self._vm.start = False |
| self._vm.stop = False |
| self._vm.cmd = None |
| self.assertRaises(vm.VMError, self._vm.Run) |
| |
| def testIsRunningError(self): |
| """Verify that VMError is raised when VM is not running.""" |
| self._vm.cmd = ["fake_command", "--test_cmd"] |
| self.assertRaises(vm.VMError, self._vm.Run) |
| |
| @mock.patch("chromite.lib.vm.VM.IsRunning", return_value=True) |
| def testRunRemoteCmd(self, is_running_mock): |
| """Tests that the VM runs with a specific command.""" |
| self._vm.cmd = ["fake_command", "--test_cmd"] |
| self._vm.Run() |
| self.assertCommandContains( |
| [ |
| "ssh", |
| "-p", |
| str(self.ssh_port), |
| "root@localhost", |
| "--", |
| "fake_command", |
| "--test_cmd", |
| ] |
| ) |
| is_running_mock.assert_called() |
| |
| def testGetVMPidDir(self): |
| """Verify that isRunning is False with a nonexistent directory.""" |
| self._vm.vm_dir = "fake/directory" |
| self.assertFalse(self._vm.IsRunning()) |
| |
| def testGetVMPidFile(self): |
| """Verify that isRunning is False with a nonexistent pid file.""" |
| self._vm.pidfile = "fake/pid/file" |
| self.assertFalse(self._vm.IsRunning()) |
| |
| def testPidString(self): |
| """Verify that isRunning is False if the pid is not an integer.""" |
| osutils.WriteFile(self._vm.pidfile, "fake_pid") |
| self.assertFalse(self._vm.IsRunning()) |
| |
| def testGetVMPid(self): |
| """Verify that a proper pid number kills the VM process.""" |
| # Using this process's pid to fake the VM's pid. |
| pid = str(os.getpid()) |
| osutils.WriteFile(self._vm.pidfile, pid) |
| self.assertTrue(self._vm.IsRunning()) |
| self._vm.Stop() |
| self.assertCommandContains(["kill", "-9", pid]) |
| |
| def testBiosPath(self): |
| """Verify QEMU bios path.""" |
| self._vm.qemu_bios_path = self.TempFilePath("qemu/bios/path") |
| osutils.SafeMakedirs(self._vm.qemu_bios_path) |
| self._vm.Start() |
| self.assertCommandContains(["-L", self._vm.qemu_bios_path]) |
| |
| def testQemuHost(self): |
| """Verify QEMU host forwarding.""" |
| self._vm.ssh_port = 1028 |
| self._vm.qemu_hostfwd = ["tcp:127.0.0.1:1024-:22"] |
| self._vm.Start() |
| self.assertCommandContains( |
| [ |
| "-netdev", |
| "user,id=eth0,net=10.0.2.0/27," |
| "hostfwd=tcp:127.0.0.1:1028-:22,hostfwd=tcp:127.0.0.1:1024-:22", |
| ] |
| ) |
| |
| def testQemuArgs(self): |
| """Verify QEMU arguments.""" |
| self._vm.qemu_args = ["-portrait", "-full-screen", "-no-reboot"] |
| self._vm.Start() |
| self.assertCommandContains(["-portrait", "-full-screen", "-no-reboot"]) |
| |
| def testNoDisplay(self): |
| """Check the command call's arguments when there is no display.""" |
| self._vm.display = False |
| self._vm.Start() |
| self.assertCommandContains(["-display", "none"]) |
| |
| def testWaitForSSHPort(self): |
| """Verify VM correctly waits on the SSH port if it is busy.""" |
| # Assigning an unused port to the VM SSH Port. |
| self._vm.ssh_port = remote_access.GetUnusedPort() |
| sock = socket.socket() |
| sock.bind((remote_access.LOCALHOST_IP, self._vm.ssh_port)) |
| |
| # Look for retry messages in output. |
| with cros_test_lib.LoggingCapturer(log_level=logging.INFO) as logger: |
| with self.assertRaises(vm.VMError) as ctx: |
| self._vm._WaitForSSHPort(sleep=0) |
| e = ctx.exception |
| in_use_message = "SSH port %d in use" % self._vm.ssh_port |
| self.assertEqual(in_use_message, str(e)) |
| self.assertTrue(logger.LogsMatch((in_use_message + "...\n") * 11)) |
| |
| # Verify the VM works correctly when the port is not in use. |
| # There should be no retries after the port is released. |
| # Another process could grab the unused port between closing it |
| # and calling _WaitForSSHPort but this is extremely unlikely. |
| sock.close() |
| with cros_test_lib.LoggingCapturer(log_level=logging.INFO) as logger: |
| self._vm._WaitForSSHPort() |
| self.assertEqual(logger.messages, "") |
| |
| @mock.patch( |
| "chromite.lib.remote_access.RemoteDevice.GetRunningPids", |
| return_value=[], |
| ) |
| def testWaitForProcsError(self, pid_mocker): |
| """Verify an error is raised when no chrome processes are running.""" |
| # Look for retry messages in output. |
| with cros_test_lib.LoggingCapturer(log_level=logging.INFO) as logger: |
| with self.assertRaises(vm.VMError) as ctx: |
| self._vm._WaitForProcs(sleep=0) |
| e = ctx.exception |
| |
| pid_message = "chrome pids: []\n" |
| self.assertTrue(logger.LogsContain(pid_message * 6)) |
| self.assertIn( |
| "_WaitForProcs failed: timed out while waiting for 8 chrome " |
| "processes to start.", |
| str(e), |
| ) |
| pid_mocker.assert_called() |
| |
| @mock.patch( |
| "chromite.lib.remote_access.RemoteDevice.GetRunningPids", |
| return_value=[756, 905, 1065, 1092, 1096, 1171, 1180, 1181], |
| ) |
| def testWaitForProcs(self, pid_mocker): |
| """Verify VM waits for chrome processes to launch.""" |
| # Check the log output for expected chrome pids. |
| with cros_test_lib.LoggingCapturer(log_level=logging.INFO) as logger: |
| self._vm._WaitForProcs(sleep=0) |
| self.assertEqual( |
| logger.messages, |
| "chrome pids: " "[756, 905, 1065, 1092, 1096, 1171, 1180, 1181]\n", |
| ) |
| pid_mocker.assert_called() |
| |
| @mock.patch("chromite.lib.vm.VM._WaitForProcs") |
| @mock.patch("chromite.lib.device.Device.WaitForBoot") |
| @mock.patch("chromite.lib.vm.VM.Start") |
| def testWaitForBoot(self, start_mock, boot_mock, procs_mock): |
| """Verify we wait for the VM to boot up under different conditions.""" |
| # Testing with an existing VM directory and hardware emulation. |
| self._vm.vm_dir = self.TempFilePath("vm_dir") |
| osutils.SafeMakedirs(self._vm.vm_dir) |
| self._vm.enable_kvm = True |
| self._vm.WaitForBoot() |
| start_mock.assert_not_called() |
| boot_mock.assert_called() |
| procs_mock.assert_not_called() |
| |
| start_mock.reset_mock() |
| boot_mock.reset_mock() |
| procs_mock.reset_mock() |
| |
| # Testing with a non-existent VM directory and software emulation. |
| self._vm._RmVMDir() |
| self._vm.enable_kvm = False |
| self._vm.WaitForBoot() |
| start_mock.assert_called() |
| boot_mock.assert_called() |
| procs_mock.assert_called() |
| |
| @mock.patch("fcntl.fcntl") |
| def testSaveVMImageOnShutdownBasic(self, fcntl_mock): |
| # mock.mock_open only seems to properly mock out read, not readline, so |
| # do it ourselves. |
| def readline_impl(): |
| readline_impl.count += 1 |
| if readline_impl.count == 1: |
| return "some_output\n" |
| return "thisisafakecommand\n" |
| |
| readline_impl.count = 0 |
| |
| fcntl_mock.return_value = 0 |
| |
| builtin = "__builtin__" if sys.version_info[0] == 2 else "builtins" |
| m = mock.mock_open() |
| filehandle = m() |
| filehandle.readline.side_effect = readline_impl |
| with mock.patch("%s.open" % builtin, m, create=True): |
| self._vm.SaveVMImageOnShutdown("/some/dir/") |
| |
| self.assertTrue(self._vm.copy_image_on_shutdown) |
| self.assertEqual(self._vm.image_copy_dir, "/some/dir/") |
| |
| write_calls = [ |
| mock.call("savevm chromite_lib_vm_snapshot\n"), |
| mock.call("thisisafakecommand\n"), |
| ] |
| filehandle.write.assert_has_calls(write_calls) |
| self.assertEqual(filehandle.write.call_count, 2) |
| |
| fcntl_calls = [ |
| mock.call(mock.ANY, fcntl.F_GETFL), |
| mock.call(mock.ANY, fcntl.F_SETFL, os.O_NONBLOCK), |
| ] |
| fcntl_mock.assert_has_calls(fcntl_calls) |
| self.assertEqual(fcntl_mock.call_count, 2) |
| |
| @mock.patch("fcntl.fcntl") |
| @mock.patch("time.time") |
| @mock.patch("time.sleep") |
| def testSaveVMImageOnShutdownTimeout( |
| self, sleep_mock, time_mock, fcntl_mock |
| ): |
| def time_impl(): |
| time_impl.count += 1 |
| if time_impl.count <= 2: |
| return 0 |
| if time_impl.count == 3: |
| return 30 |
| if time_impl.count == 4: |
| return 31 |
| return 100 |
| |
| time_impl.count = 0 |
| time_mock.side_effect = time_impl |
| |
| fcntl_mock.return_value = 0 |
| |
| self._vm.copy_on_write = True |
| builtin = "__builtin__" if sys.version_info[0] == 2 else "builtins" |
| m = mock.mock_open() |
| filehandle = m() |
| filehandle.readline.side_effect = IOError( |
| "Resource temporarily unavailable" |
| ) |
| with mock.patch("%s.open" % builtin, m, create=True): |
| self._vm.SaveVMImageOnShutdown("/some/dir/") |
| |
| self.assertEqual(sleep_mock.call_count, 1) |
| |
| self.assertTrue(self._vm.copy_image_on_shutdown) |
| self.assertEqual(self._vm.image_copy_dir, "/some/dir/") |
| |
| write_calls = [ |
| mock.call("savevm chromite_lib_vm_snapshot\n"), |
| mock.call("thisisafakecommand\n"), |
| ] |
| filehandle.write.assert_has_calls(write_calls) |
| self.assertEqual(filehandle.write.call_count, 2) |
| |
| fcntl_calls = [ |
| mock.call(mock.ANY, fcntl.F_GETFL), |
| mock.call(mock.ANY, fcntl.F_SETFL, os.O_NONBLOCK), |
| ] |
| fcntl_mock.assert_has_calls(fcntl_calls) |
| self.assertEqual(fcntl_mock.call_count, 2) |