blob: f7c0479a45d4a3f5fedf0bae59b256cf559854d6 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A module containing the rootfs handler class."""
import os
import re
# Name handed to os_if.state_dir_file() to build the path of the file that
# receives the copy of the kernel partition.
TMP_FILE_NAME = 'kernel_dump'

# Partition identifiers, keyed by section letter, that are joined onto the
# root device to address the kernel and the rootfs of that section.
_KERNEL_MAP = dict(A='2', B='4')
_ROOTFS_MAP = dict(A='3', B='5')

# Name of the dm device created for verification, and the path where it is
# expected to show up.
_DM_DEVICE = 'verifyroot'
_DM_DEV_PATH = os.path.join('/dev/mapper', _DM_DEVICE)
class RootfsHandler(object):
    """An object to provide ChromeOS root FS related actions.

    It provides functions to verify the integrity of the root FS.

    init() must be called before verify_rootfs(): the constructor only stores
    os_if, while root_dev and kernel_dump_file stay None until init() fills
    them in.

    @type os_if: autotest_lib.client.cros.faft.utils.os_interface.OSInterface
    """

    # Finds the dm="..." argument in the vbutil_kernel output. Group 1 is the
    # device-mapper table; group 2 is the second field of that table, which
    # verify_rootfs() multiplies by 512 to get the expected size in bytes.
    _DM_REGEXP = re.compile(
            r'dm="(?:1 )?vroot none ro(?: 1)?,(0 (\d+) .+)"')

    # Placeholder in the table that gets replaced by the rootfs partition path.
    _ROOTFS_PLACEHOLDER = 'PARTUUID=%U/PARTNROFF=1'

    def __init__(self, os_if):
        """Stores the OS interface; real setup is deferred to init().

        @param os_if: Object used for all shell and file system access.
        """
        self.os_if = os_if
        self.root_dev = None
        self.kernel_dump_file = None
        self.initialized = False

    def verify_rootfs(self, section):
        """Verifies the integrity of the root FS.

        Copies the section's kernel partition to a file, pulls the dm table
        out of the vbutil_kernel output for that file, points the table at the
        section's rootfs partition, creates a read-only dm device from it and
        compares the size obtained for that device with the size stated in the
        table.

        @param section: The rootfs to verify. May be A or B (any case).

        @return: True if the size obtained through the dm device equals the
                 size from the table. False if no dm table is found, the table
                 lacks the rootfs placeholder, the sizes differ, or getting
                 the size raises an Exception.

        @raise KeyError: If section is neither A nor B.
        @raise AssertionError: If the dm device still exists after removal or
                               is missing after creation.
        """
        part = section.upper()
        kernel_path = self.os_if.join_part(self.root_dev, _KERNEL_MAP[part])
        rootfs_path = self.os_if.join_part(self.root_dev, _ROOTFS_MAP[part])

        # vbutil_kernel won't operate on a device, only a file.
        self.os_if.run_shell_command(
                'dd if=%s of=%s' % (kernel_path, self.kernel_dump_file))
        vbutil_kernel = self.os_if.run_shell_command_get_output(
                'vbutil_kernel --verify %s --verbose' % self.kernel_dump_file)

        match = self._DM_REGEXP.search('\n'.join(vbutil_kernel))
        if not match:
            return False

        table = match.group(1)
        partition_size = int(match.group(2)) * 512

        if self._ROOTFS_PLACEHOLDER not in table:
            return False
        table = table.replace(self._ROOTFS_PLACEHOLDER, rootfs_path)
        # Cause I/O error on invalid bytes
        table += ' error_behavior=eio'

        # Start from a clean state in case an earlier run left the device.
        self._remove_mapper()
        assert not self.os_if.path_exists(_DM_DEV_PATH)
        self.os_if.run_shell_command(
                "dmsetup create -r %s --table '%s'" % (_DM_DEVICE, table))
        assert self.os_if.path_exists(_DM_DEV_PATH)
        try:
            count = self.os_if.get_file_size(_DM_DEV_PATH)
            return count == partition_size
        except Exception:
            # A failure while getting the size counts as a failed
            # verification. KeyboardInterrupt and SystemExit are deliberately
            # not caught, so an abort is not reported as a bad rootfs.
            return False
        finally:
            self._remove_mapper()

    def _remove_mapper(self):
        """Removes the dm device mapper used by this class.

        Does nothing when the device path does not exist.
        """
        if self.os_if.path_exists(_DM_DEV_PATH):
            self.os_if.run_shell_command_get_output(
                    'dmsetup remove %s' % _DM_DEVICE)

    def init(self):
        """Initialize the rootfs handler object.

        Looks up the root device and the path of the kernel dump file through
        os_if, then marks the handler as initialized.
        """
        self.root_dev = self.os_if.get_root_dev()
        self.kernel_dump_file = self.os_if.state_dir_file(TMP_FILE_NAME)
        self.initialized = True