blob: 3aea49c31e043fcb214d319254653c7af96f1e7f [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Storage device utilities to be used in storage device based tests
"""
import logging, re, time
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import base_utils
from autotest_lib.client.cros import liststorage
class StorageException(error.TestError):
"""Indicates that a storage/volume operation failed.
It is fatal to the test unless caught.
"""
pass
class StorageScanner(object):
"""Scan device for storage points.
It also performs basic operations on found storage devices as mount/umount,
creating file with randomized content or checksum file content
Each storage device is defined by a dictionary containing the following
keys:
device: the device path (e.g. /dev/sdb1)
bus: the bus name (e.g. usb, ata, etc)
model: the kind of device (e.g. Multi-Card, USB_DISK_2.0, SanDisk)
size: the size of the volume/partition ib bytes (int)
fs_uuid: the UUID for the filesystem (str)
fstype: filesystem type
is_mounted: wether the FS is mounted (0=False,1=True)
mountpoint: where the FS is mounted (if mounted=1) or a suggestion where to
mount it (if mounted=0)
Also |filter()| and |scan()| will use the same dictionary keys associated
with regular expression in order to filter a result set.
Multiple keys act in an AND-fashion way. The absence of a key in the filter
make the filter matching all the values for said key in the storage
dictionary.
Example: {'device':'/dev/sd[ab]1', 'is_mounted':'0'} will match all the
found devices which block device file is either /dev/sda1 or /dev/sdb1, AND
are not mounted, excluding all other devices from the matched result.
"""
storages = None
def __init__(self):
self.__mounted = {}
def filter(self, storage_filter={}):
"""Filters a stored scanned result returning a list of matching devices
The passed dictionary represent the filter and its values are regular
expressions (str). If an element of self.storage matches the regex
defined in all the keys for a filter, the item will be part of the
returning value.
Calling this method does not change self.storages, thus can be called
several times against the same result set
@param storage_filter: a dictionary representing the filter
@return a list of dictionaries representing the found devices after the
application of the filter. The list can be empty if no device
has been found.
"""
ret = []
for storage in self.storages:
matches = True
for key in storage_filter:
if not re.match(storage_filter[key], storage[key]):
matches = False
break
if matches:
ret.append(storage.copy())
return ret
def scan(self, storage_filter={}):
"""Scan the current storage devices
If no parameter is given, it will return all the storage devices found.
Otherwise it will internally call self.filter() with the passed
filter.
The result (being it filtered or not) will be saved in self.storages.
Such list can be (re)-filtered using self.filter().
@param storage_filter: a dict representing the filter, default is
matching anything
@return a list of found dictionaries representing the found devices.
The list can be empty if no device has been found.
"""
self.storages = liststorage.get_all()
if storage_filter:
self.storages = self.filter(storage_filter)
return self.storages
def mount_volume(self, index=None, storage_dict=None, args=""):
"""Mount the passed volume, by index or storage dictionary
Either index or storage_dict must be set, but not both at the same time.
@param index: (int) the index in self.storages for the storage
device/volume to be mounted
@param storage_dict: (dict) the storage dictionary representing the
storage device, the dictionary should be obtained from
self.storage or using self.scan() or self.filter()
@param args: (str) args to be passed to the mount command, if needed.
e.g., "-o foo,bar -t ext3"
"""
if index is None and storage_dict is None:
storage_dict = self.storages[0]
elif isinstance(index, int):
storage_dict = self.storages[index]
elif not isinstance(storage_dict, dict):
raise TypeError("either index (int) or storage_dict (dict) must be "
"not None")
if storage_dict['is_mounted']:
return
msg = 'Mounting %(device)s in %(mountpoint)s' % storage_dict
logging.info(msg)
# for mkdir we ignore the return status, it might already exist
cmd = "mkdir %(mountpoint)s" % storage_dict
utils.system_output(cmd, ignore_status=True)
cmd = "mount %s" % args
cmd += " %(device)s %(mountpoint)s" % storage_dict
utils.system_output(cmd)
storage_dict['is_mounted'] = True
self.__mounted[storage_dict['mountpoint']] = storage_dict
def umount_volume(self, index=None, storage_dict=None, args=""):
"""Un-mount the passed volume, by index or storage dictionary
Either index or storage_dict must be set, but not both at the same time.
@param index: (int) the index in self.storages for the storage
device/volume to be mounted
@param storage_dict: (dict) the storage dictionary representing the
storage device, the dictionary should be obtained from
self.storage or using self.scan() or self.filter()
@param args: (str) args to be passed to the umount command, if needed.
e.g., "-f -t" for force+lazy umount
"""
if index is None and storage_dict is None:
storage_dict = self.storages[0]
elif isinstance(index, int):
storage_dict = self.storages[index]
elif not isinstance(storage_dict, dict):
raise TypeError("either index (int) or "
"storage_dict (dict) must be not None")
if not storage_dict['is_mounted']:
return
msg = 'Unmounting %(device)s from %(mountpoint)s' % storage_dict
logging.info(msg)
cmd = "umount %s" % args
cmd += " %(device)s" % storage_dict
utils.system_output(cmd)
# we don't care if it fails, it might be busy for a /proc/mounts issue
# See BUG=chormium-os:32105
cmd = "rmdir %(mountpoint)s" % storage_dict
utils.system_output(cmd, ignore_status=True)
storage_dict['is_mounted'] = False
# if we previously mounted it, remove it from our internal list
if storage_dict['mountpoint'] in self.__mounted:
del self.__mounted[storage_dict['mountpoint']]
def unmount_all(self):
"""Unmount all volumes mounted by self.mount_volume()
"""
# we need to copy it since we are iterating over a dict which will
# change size
for volume in self.__mounted.copy():
self.umount_volume(storage_dict=self.__mounted[volume])
class StorageTester(test.test):
"""This is a class all tests about Storage can use.
It has methods to
- create random files
- compute a file's md5 checksum
- look/wait for a specific device (specified using StorageScanner
dictionary format)
"""
scanner = None
def initialize(self):
self.scanner = StorageScanner()
def wait_for_devices(self, storage_filter, time_to_sleep=1, cycles=10,
mount_volume=True):
"""Cycles |cycles| times waiting |time_to_sleep| seconds each cycle,
looking for a device matching |storage_filter|
@param storage_filter: a dictionary holding a set of storage device's
keys which are used as filter, to look for devices
@see StorageDevice class documentation
@param time_to_sleep: time (int) to wait after each |cycles|
@param cycles: number of tentatives. Use -1 for infinite.
@raises StorageException if no device can be found
@return (storage_dict, waited_time) tuple. storage_dict is the found
device list and waited_time is the time spent waiting for the
device to be found.
"""
logging.debug("scanning for %s for %d times, waiting each time "
"%d secs", storage_filter, cycles, time_to_sleep)
if mount_volume:
logging.debug("and mounting each matched volume")
else:
logging.debug("but not mounting each matched volume")
if cycles == -1:
logging.info("waiting until device is inserted, "
"no timeout has been set")
cycle = 0
while cycles == -1 or cycle < cycles:
ret = self.scanner.scan(storage_filter)
if ret:
logging.debug("found %s (mount_volume=%d)", ret, mount_volume)
if mount_volume:
for storage in ret:
self.scanner.mount_volume(storage_dict=storage)
return (ret, cycle*time_to_sleep)
else:
msg = ("storage %s not found, wait and rescan (cycle %d)" %
(storage_filter, cycle))
logging.debug(msg)
# wait a bit and rescan storage list
time.sleep(time_to_sleep)
cycle += 1
# Device still not found
msg = ('Could not find anything matching "%s" after %d seconds' %
(storage_filter, time_to_sleep*cycles))
raise StorageException(msg)
def wait_for_device(self, storage_filter, time_to_sleep=1, cycles=10,
mount_volume=True):
"""Cycles |cycles| times waiting |time_to_sleep| seconds each cycle,
looking for a device matching |storage_filter|
This method needs to match one and only one device.
@raises StorageException if no device can be found or more than one is
found
@param storage_filter: a dictionary holding a set of storage device's
keys which are used as filter, to look for devices
The filter has to be match a single device, a multiple matching
filter will lead to StorageException to e risen. Use
self.wait_for_devices() if more than one device is allowed to
be found.
@see StorageDevice class documentation
@param time_to_sleep: time (int) to wait after each |cycles|
@param cycles: number of tentatives. Use -1 for infinite.
@return (storage_dict, waited_time) tuple. storage_dict is the found
device list and waited_time is the time spent waiting for the
device to be found.
"""
storages, waited_time = self.wait_for_devices(storage_filter,
time_to_sleep=time_to_sleep,
cycles=cycles,
mount_volume=mount_volume)
if len(storages) > 1:
msg = ('filter matched more than one storage volume, use '
'%s.wait_for_devices() if you need more than one match' %
self.__class__)
raise StorageException(msg)
# return the first element if only this one has been matched
return (storages[0], waited_time)
# Some helpers not present in base_utils.py to abstract normal file operations
def create_file(path, size):
"""Create a file using /dev/urandom
@param path: the path of the file
@param size: the file size in bytes
"""
blocksize = 8192
cmd = ('dd if=/dev/urandom of="%s" bs=%d count=%s' %
(path, blocksize, (size//blocksize)))
utils.system_output(cmd)
def checksum_file(path):
"""Compute the MD5 Checksum for a file
@param path: the path of the file
@return a string with the checksum
"""
cmd = 'md5sum "%s"' % path
checksum_with_path = utils.system_output(cmd)
checksum = checksum_with_path.split()[0]
return checksum
def args_to_storage_dict(args):
"""Map args into storage dictionaries
This function is to be used (likely) in control files to obtain a storage
dictionary from command line arguments.
@param args: a list of arguments as passed to control file
@return a tuple (storage_dict, rest_of_args) where storage_dict is a
dictionary for storage filtering and rest_of_args is a dictionary
of keys which do not match storage dict keys.
"""
args_dict = base_utils.args_to_dict(args)
storage_dict = {}
logging.debug("passed arg to xform to storage dict: %s", args)
# A list of all allowed keys and their type
key_list = ('device', 'bus', 'model', 'size', 'fs_uuid', 'fstype',
'is_mounted', 'mountpoint')
def set_if_exists(src, dst, key):
"""If |src| has |key| copies its value to |dst|
@return True if |key| exists in |src|, False otherwise
"""
if key in src:
dst[key] = src[key]
return True
else:
return False
for key in key_list:
if set_if_exists(args_dict, storage_dict, key):
del args_dict[key]
# return the storage dict and the leftovers of the args to be evaluated
# later
return storage_dict, args_dict