| # -*- coding: utf-8 -*- |
| # |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| |
| # DESCRIPTION : |
| # |
| # This is a factory test to test removable storage devices. |
| # We implement the following tests: |
| # * Read/Write test |
| # * Lock (write protection) test |
| |
| import cairo |
| import gtk |
| import os |
| import pango |
| import pyudev |
| import pyudev.glib |
| import random |
| import time |
| |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros import factory_setup_modules |
| from cros.factory.test import factory |
| from cros.factory.test import task |
| from cros.factory.test import ui |
| |
| |
| _STATE_RW_TEST_WAIT_INSERT = 1 |
| _STATE_RW_TEST_WAIT_REMOVE = 2 |
| _STATE_LOCKTEST_WAIT_INSERT = 3 |
| _STATE_LOCKTEST_WAIT_REMOVE = 4 |
| |
| # udev constants |
| _UDEV_ACTION_INSERT = 'add' |
| _UDEV_ACTION_REMOVE = 'remove' |
| _UDEV_MMCBLK_PATH = '/dev/mmcblk' |
| # USB card reader attributes and common text string in descriptors |
| _USB_CARD_ATTRS = ['vendor', 'model', 'product', 'configuration', |
| 'manufacturer'] |
| _USB_CARD_DESCS = ['card', 'reader'] |
| |
| # The GPT ( http://en.wikipedia.org/wiki/GUID_Partition_Table ) |
| # occupies the first 34 and the last 33 512-byte blocks. |
| # |
| # We don't want to upset kernel by changing the partition table. |
| # Skip the first 34 and the last 33 512-byte blocks when doing |
| # read/write tests. |
| _SECTOR_SIZE = 512 |
| _SKIP_HEAD_BLOCK = 34 |
| _SKIP_TAIL_BLOCK = 33 |
| |
| # Read/Write test modes |
| _RW_TEST_MODE_RANDOM = 1 |
| _RW_TEST_MODE_SEQUENTIAL = 2 |
| |
| _RW_TEST_INSERT_FMT_STR = lambda t: ( |
| '\n'.join(['insert %s drive for read/write test...' % t, |
| 'WARNING: DATA ON INSERTED MEDIA WILL BE LOST!\n', |
| '插入%s存储以进行读写测试...' % t, |
| '注意: 插入装置上的资料将会被清除!', |
| ])) |
| _REMOVE_FMT_STR = lambda t: 'remove %s drive...\n提取%s存储...' % (t, t) |
| _TESTING_FMT_STR = lambda t:'testing %s...\n%s 检查当中...' % (t, t) |
| _LOCKTEST_INSERT_FMT_STR = lambda t: ( |
| '\n'.join(['toggle lock switch and insert %s drive again...' % t, |
| '切换写保护开关并再次插入%s存储...' % t])) |
| _LOCKTEST_REMOVE_FMT_STR = lambda t: ( |
| '\n'.join(['remove %s drive and toggle lock switch...' % t, |
| '提取%s存储并关闭写保护开关...' % t])) |
| _ERR_TOO_EARLY_REMOVE_FMT_STR = \ |
| lambda t: \ |
| 'Device removed too early (%s).\n' \ |
| '太早移除外部储存装置 (%s).\n' % (t, t) |
| _ERR_TEST_FAILED_FMT_STR = lambda test_name, target_dev: ( |
| 'IO error while running %s test on %s.\n' % |
| (test_name, target_dev)) |
| _ERR_LOCKTEST_FAILED_FMT_STR = \ |
| lambda target_dev: 'Locktest failed on %s.\n' % target_dev |
| _ERR_DEVICE_READ_ONLY_STR = \ |
| lambda target_dev: '%s is read-only.\n' % target_dev |
| |
| |
| class factory_RemovableStorage(test.test): |
| version = 1 |
| preserve_srcdir = True |
| |
| def expose_event(self, widget, event): |
| context = widget.window.cairo_create() |
| context.set_source_surface(self._image, 0, 0) |
| context.paint() |
| return True |
| |
| def get_attrs(self, device, key_set): |
| if device is None: |
| return '' |
| attrs = [device.attributes[key] for key in |
| set(device.attributes.keys()) & key_set] |
| attr_str = ' '.join(attrs).strip() |
| if len(attr_str): |
| attr_str = '/' + attr_str |
| return self.get_attrs(device.parent, key_set) + attr_str |
| |
| def get_vidpid(self, device): |
| if device is None: |
| return None |
| if device.device_type == 'usb_device': |
| attrs = device.attributes |
| if set(['idProduct', 'idVendor']) <= set(attrs.keys()): |
| vidpid = attrs['idVendor'] + ':' + attrs['idProduct'] |
| return vidpid.strip() |
| return self.get_vidpid(device.parent) |
| |
| def is_usb_cardreader(self, device): |
| attr_str = self.get_attrs(device, set(_USB_CARD_ATTRS)).lower() |
| for desc in _USB_CARD_DESCS: |
| if desc in attr_str: |
| return True |
| return False |
| |
| |
| def is_sd(self, device): |
| if device.device_node.find(_UDEV_MMCBLK_PATH) == 0: |
| return True |
| return self.is_usb_cardreader(device) |
| |
| def get_device_type(self, device): |
| if self.is_sd(device): |
| return 'SD' |
| return 'USB' |
| |
| def get_device_size(self, dev_path): |
| try: |
| dev_size = utils.system_output('blockdev --getsize64 %s' % |
| dev_path) |
| except: |
| raise error.TestError('Unable to determine dev size of %s' % |
| dev_path) |
| |
| dev_size = int(dev_size) |
| if dev_size == 0: |
| raise error.TestError('Unable to determine dev size of %s' % |
| dev_path) |
| |
| gb = dev_size / 1000.0 / 1000.0 / 1000.0 |
| factory.log('dev size of %s : %d bytes (%.3f GB)' % |
| (dev_path, dev_size, gb)) |
| |
| return dev_size |
| |
| def get_device_ro(self, dev_path): |
| try: |
| ro = utils.system_output('blockdev --getro %s' % dev_path) |
| except: |
| raise error.TestError('Unable to get RO status of %s' % dev_path) |
| |
| factory.log('%s RO : %s' % (dev_path, ro)) |
| |
| return ro == '1' |
| |
| def test_read_write(self): |
| self._prompt.set_text(_TESTING_FMT_STR(self._target_device)) |
| self._image = self.testing_image |
| self._pictogram.queue_draw() |
| task.schedule(self._invoke_test_read_write) |
| |
| def _invoke_test_read_write(self): |
| dev_path = self._target_device |
| dev_size = self.get_device_size(dev_path) |
| dev_fd = None |
| ok = True |
| total_time_read = 0.0 |
| total_time_write = 0.0 |
| |
| mode = [_RW_TEST_MODE_RANDOM] |
| if self._has_sequential_test is True: |
| mode.append(_RW_TEST_MODE_SEQUENTIAL) |
| |
| for m in mode: |
| if m == _RW_TEST_MODE_RANDOM: |
| # Read/Write one block each time |
| bytes_to_operate = self._block_size |
| loop = self._random_block_count |
| factory.log('Performing r/w test on %d %d-byte random blocks' % |
| (loop, self._block_size)) |
| elif m == _RW_TEST_MODE_SEQUENTIAL: |
| # Converts block counts into bytes |
| bytes_to_operate = (self._sequential_block_count * |
| self._block_size) |
| loop = 1 |
| factory.log('Performing sequential r/w test of %d bytes' % |
| bytes_to_operate) |
| |
| try: |
| dev_fd = os.open(dev_path, os.O_RDWR) |
| except Exception as e: |
| ok = False |
| factory.log('Unable to open %s : %s' % (dev_path, e)) |
| |
| if dev_fd is not None: |
| blocks = dev_size / _SECTOR_SIZE |
| # Determine the range in which the random block is selected |
| random_head = _SKIP_HEAD_BLOCK |
| random_tail = (blocks - |
| _SKIP_TAIL_BLOCK - |
| int(bytes_to_operate / _SECTOR_SIZE)) |
| |
| if dev_size > 0x7FFFFFFF: |
| # The following try...except section is for system that does |
| # not have large file support enabled for Python. This is |
| # typically observed on 32-bit machines. In some 32-bit |
| # machines, doing seek() with an offset larger than 0x7FFFFFFF |
| # (which is the largest possible value of singned int) will |
| # cause OverflowError, due to failed conversion from long int |
| # to int. |
| try: |
| # Test whether large file support is enabled or not. |
| os.lseek(dev_fd, 0x7FFFFFFF + 1, os.SEEK_SET) |
| except OverflowError: |
| # The system does not have large file support, so we |
| # restrict the range in which we perform the random r/w |
| # test. |
| random_tail = min( |
| random_tail, |
| int(0x7FFFFFFF / _SECTOR_SIZE) - |
| int(bytes_to_operate / _SECTOR_SIZE)) |
| factory.log('No large file support') |
| |
| if random_tail < random_head: |
| raise Exception('Block size too large for r/w test') |
| |
| random.seed() |
| for x in xrange(loop): |
| # Select one random block as starting point. |
| random_block = random.randint(random_head, random_tail) |
| offset = random_block * _SECTOR_SIZE |
| |
| try: |
| os.lseek(dev_fd, offset, os.SEEK_SET) |
| read_start = time.time() |
| in_block = os.read(dev_fd, bytes_to_operate) |
| read_finish = time.time() |
| except Exception as e: |
| factory.log('Failed to read block %s' % e) |
| ok = False |
| break |
| |
| if m == _RW_TEST_MODE_RANDOM: |
| # Modify the first byte and write the whole block back. |
| out_block = chr(ord(in_block[0]) ^ 0xff) + in_block[1:] |
| elif m == _RW_TEST_MODE_SEQUENTIAL: |
| out_block = chr(0x00) * bytes_to_operate |
| try: |
| os.lseek(dev_fd, offset, os.SEEK_SET) |
| write_start = time.time() |
| os.write(dev_fd, out_block) |
| os.fsync(dev_fd) |
| write_finish = time.time() |
| except Exception as e: |
| factory.log('Failed to write block %s' % e) |
| ok = False |
| break |
| |
| # Check if the block was actually written, and restore the |
| # original content of the block. |
| os.lseek(dev_fd, offset, os.SEEK_SET) |
| b = os.read(dev_fd, bytes_to_operate) |
| if b != out_block: |
| factory.log('Failed to write block') |
| ok = False |
| break |
| os.lseek(dev_fd, offset, os.SEEK_SET) |
| os.write(dev_fd, in_block) |
| os.fsync(dev_fd) |
| |
| total_time_read += read_finish - read_start |
| total_time_write += write_finish - write_start |
| |
| # Make sure we close() the device file so later tests won't |
| # fail. |
| os.close(dev_fd) |
| |
| if ok is False: |
| if self.get_device_ro(dev_path) is True: |
| factory.log('Is write protection on?') |
| self._error += _ERR_DEVICE_READ_ONLY_STR(dev_path) |
| test_name = '' |
| if m == _RW_TEST_MODE_RANDOM: |
| test_name = 'random r/w' |
| elif m == _RW_TEST_MODE_SEQUENTIAL: |
| test_name = 'sequential r/w' |
| self._error += _ERR_TEST_FAILED_FMT_STR(test_name, |
| self._target_device) |
| else: |
| if m == _RW_TEST_MODE_RANDOM: |
| factory.log('random_read_speed: %.3f MB/s' % |
| ((self._block_size * loop) / total_time_read / |
| 1000 / 1000)) |
| factory.log('random_write_speed: %.3f MB/s' % |
| ((self._block_size * loop) / total_time_write / |
| 1000 / 1000)) |
| elif m == _RW_TEST_MODE_SEQUENTIAL: |
| factory.log('sequential_read_speed: %.3f MB/s' % |
| (bytes_to_operate / total_time_read / |
| 1000 / 1000)) |
| factory.log('sequential_write_speed: %.3f MB/s' % |
| (bytes_to_operate / total_time_write / |
| 1000 / 1000)) |
| |
| self._prompt.set_text(_REMOVE_FMT_STR(self._media)) |
| self._state = _STATE_RW_TEST_WAIT_REMOVE |
| self._image = self.removal_image |
| self._pictogram.queue_draw() |
| |
| def test_lock(self): |
| self._prompt.set_text(_TESTING_FMT_STR(self._target_device)) |
| self._image = self.testing_image |
| self._pictogram.queue_draw() |
| task.schedule(self._invoke_test_lock) |
| |
| def _invoke_test_lock(self): |
| ro = self.get_device_ro(self._target_device) |
| |
| if ro is False: |
| self._error += _ERR_LOCKTEST_FAILED_FMT_STR(self._target_device) |
| self._prompt.set_text(_LOCKTEST_REMOVE_FMT_STR(self._media)) |
| self._state = _STATE_LOCKTEST_WAIT_REMOVE |
| self._image = self.locktest_removal_image |
| self._pictogram.queue_draw() |
| |
| def udev_event_cb(self, action, device): |
| if action == _UDEV_ACTION_INSERT: |
| if self._state == _STATE_RW_TEST_WAIT_INSERT: |
| if self._vidpid is None: |
| if self._media != self.get_device_type(device): |
| return True |
| else: |
| device_vidpid = self.get_vidpid(device) |
| if device_vidpid not in self._vidpid: |
| return True |
| factory.log('VID:PID == %s' % self._vidpid) |
| factory.log('%s device inserted : %s' % |
| (self._media, device.device_node)) |
| self._target_device = device.device_node |
| self.test_read_write() |
| elif self._state == _STATE_LOCKTEST_WAIT_INSERT: |
| factory.log('%s device inserted : %s' % |
| (self._media, device.device_node)) |
| if self._target_device == device.device_node: |
| self.test_lock() |
| elif action == _UDEV_ACTION_REMOVE: |
| if self._target_device == device.device_node: |
| factory.log('Device removed : %s' % device.device_node) |
| if self._state == _STATE_RW_TEST_WAIT_REMOVE: |
| if self._has_locktest: |
| self._prompt.set_text( |
| _LOCKTEST_INSERT_FMT_STR(self._media)) |
| self._state = _STATE_LOCKTEST_WAIT_INSERT |
| self._image = self.locktest_insertion_image |
| self._pictogram.queue_draw() |
| else: |
| gtk.main_quit() |
| elif self._state == _STATE_LOCKTEST_WAIT_REMOVE: |
| gtk.main_quit() |
| else: |
| self._error += _ERR_TOO_EARLY_REMOVE_RND_TEST_FMT_STR( |
| self._target_device) |
| factory.log('Device %s removed too early' % |
| self._target_device) |
| gtk.main_quit() |
| return True |
| |
| def run_once(self, |
| media=None, |
| vidpid=None, |
| block_size=1024, # in bytes |
| random_block_count=3, # Number of blocks for random test |
| perform_sequential_test=False, |
| sequential_block_count=1024, # Number of blocks for seq. test |
| perform_locktest=False): |
| |
| factory.log('%s run_once' % self.__class__) |
| |
| self._error = '' |
| self._target_device = None |
| |
| os.chdir(self.srcdir) |
| |
| self._media = media |
| if vidpid is None: |
| self._vidpid = None |
| elif type(vidpid) != type(list()): |
| # Convert vidpid to a list. |
| self._vidpid = [vidpid] |
| else: |
| self._vidpid = vidpid |
| |
| self._block_size = block_size |
| self._random_block_count = random_block_count |
| self._has_sequential_test = perform_sequential_test |
| self._sequential_block_count = sequential_block_count |
| |
| factory.log('media = %s' % media) |
| |
| self.insertion_image = cairo.ImageSurface.create_from_png( |
| '%s_insert.png' % self._media) |
| self.removal_image = cairo.ImageSurface.create_from_png( |
| '%s_remove.png' % self._media) |
| self.testing_image = cairo.ImageSurface.create_from_png( |
| '%s_testing.png' % self._media) |
| |
| self._has_locktest = perform_locktest |
| if perform_locktest: |
| self.locktest_insertion_image = cairo.ImageSurface.create_from_png( |
| '%s_locktest_insert.png' % self._media) |
| self.locktest_removal_image = cairo.ImageSurface.create_from_png( |
| '%s_locktest_remove.png' % self._media) |
| |
| image_size_set = set([(i.get_width(), i.get_height()) for |
| i in [self.insertion_image, |
| self.removal_image, |
| self.testing_image]]) |
| assert len(image_size_set) == 1 |
| image_size = image_size_set.pop() |
| |
| label = gtk.Label('') |
| label.modify_font(pango.FontDescription('courier new condensed 20')) |
| label.set_alignment(0.5, 0.5) |
| label.modify_fg(gtk.STATE_NORMAL, gtk.gdk.color_parse('light green')) |
| self._prompt = label |
| |
| self._prompt.set_text(_RW_TEST_INSERT_FMT_STR(self._media)) |
| self._state = _STATE_RW_TEST_WAIT_INSERT |
| self._image = self.insertion_image |
| context = pyudev.Context() |
| monitor = pyudev.Monitor.from_netlink(context) |
| monitor.filter_by(subsystem='block', device_type='disk') |
| observer = pyudev.glib.GUDevMonitorObserver(monitor) |
| observer.connect('device-event', |
| lambda observer, action, device: \ |
| self.udev_event_cb(action, device)) |
| monitor.start() |
| |
| drawing_area = gtk.DrawingArea() |
| drawing_area.set_size_request(*image_size) |
| drawing_area.connect('expose_event', self.expose_event) |
| self._pictogram = drawing_area |
| hbox = gtk.HBox() |
| hbox.pack_start(drawing_area, expand=True, fill=False) |
| |
| vbox = gtk.VBox() |
| vbox.pack_start(hbox, False, False) |
| vbox.pack_start(label, False, False) |
| |
| test_widget = gtk.EventBox() |
| test_widget.modify_bg(gtk.STATE_NORMAL, gtk.gdk.color_parse('black')) |
| test_widget.add(vbox) |
| |
| ui.run_test_widget(self.job, test_widget) |
| |
| if self._error: |
| raise error.TestFail(self._error) |
| |
| factory.log('%s run_once finished' % self.__class__) |