| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import os |
| |
| from autotest_lib.client.bin import utils as bin_utils |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.common_lib import error, utils |
| |
| class platform_RootPartitionsNotMounted(test.test): |
| version = 1 |
| |
| _CGPT_PATH = '/usr/bin/cgpt' |
| _ROOTDEV_PATH = '/usr/bin/rootdev' |
| _UPDATE_ENGINE_PATH = '/usr/sbin/update_engine' |
| |
| def get_root_partitions(self, device): |
| """Gets a list of root partitions of a device. |
| |
| Gets a list of root partitions of a device by calling |
| `cgpt find -t rootfs <device>`. |
| |
| Args: |
| device: The device, specified by its device file, to examine. |
| |
| Returns: |
| A list of root partitions, specified by their device file, |
| (e.g. /dev/sda1) of the given device. |
| """ |
| cgpt_command = '%s find -t rootfs %s' % (self._CGPT_PATH, device) |
| return utils.run(cgpt_command).stdout.strip('\n').split('\n') |
| |
| def get_mounted_devices(self, mounts_file): |
| """Gets a set of mounted devices from a given mounts file. |
| |
| Gets a set of device files that are currently mounted. This method |
| parses a given mounts file (e.g. /proc/<pid>/mounts) and extracts the |
| entries with a source path under /dev/. |
| |
| Returns: |
| A set of device file names (e.g. /dev/sda1) |
| """ |
| mounted_devices = set() |
| try: |
| entries = open(mounts_file).readlines() |
| except: |
| entries = [] |
| for entry in entries: |
| node = entry.split(' ')[0] |
| if node.startswith('/dev/'): |
| mounted_devices.add(node) |
| return mounted_devices |
| |
| def get_process_executable(self, pid): |
| """Gets the executable path of a given process ID. |
| |
| Args: |
| pid: Target process ID. |
| |
| Returns: |
| The executable path of the given process ID or None on error. |
| """ |
| try: |
| return os.readlink('/proc/%s/exe' % pid) |
| except: |
| return "" |
| |
| def get_process_list(self, excluded_executables=[]): |
| """Gets a list of process IDs of active processes. |
| |
| Gets a list of process IDs of active processes by looking into /proc |
| and filters out those processes with a executable path that is |
| excluded. |
| |
| Args: |
| excluded_executables: A list of executable paths to exclude. |
| |
| Returns: |
| A list of process IDs of active processes. |
| """ |
| processes = [] |
| for path in os.listdir('/proc'): |
| if not path.isdigit(): continue |
| process_exe = self.get_process_executable(path) |
| if process_exe and process_exe not in excluded_executables: |
| processes.append(path) |
| return processes |
| |
| def run_once(self): |
| if os.geteuid() != 0: |
| raise error.TestNAError('This test needs to be run under root') |
| |
| for path in [self._CGPT_PATH, self._ROOTDEV_PATH]: |
| if not os.path.isfile(path): |
| raise error.TestNAError('%s not found' % path) |
| |
| root_device = bin_utils.get_root_device() |
| if not root_device: |
| raise error.TestNAError('Could not find the root device') |
| logging.debug('Root device: %s' % root_device) |
| |
| root_partitions = self.get_root_partitions(root_device) |
| if not root_partitions: |
| raise error.TestNAError('Could not find any root partition') |
| logging.debug('Root partitions: %s' % ', '.join(root_partitions)) |
| |
| processes = self.get_process_list([self._UPDATE_ENGINE_PATH]) |
| if not processes: |
| raise error.TestNAError('Could not find any process') |
| logging.debug('Active processes: %s' % ', '.join(processes)) |
| |
| for process in processes: |
| process_exe = self.get_process_executable(process) |
| mounts_file = '/proc/%s/mounts' % process |
| mounted_devices = self.get_mounted_devices(mounts_file) |
| for partition in root_partitions: |
| if partition in mounted_devices: |
| raise error.TestFail( |
| 'Root partition "%s" is mounted by process %s (%s)' |
| % (partition, process, process_exe)) |