blob: 96c9eaebdb6ad0ae74e61815c5c53d0891b2b695 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# DESCRIPTION :
#
# This is a factory test to test external SD and USB ports.
import cairo
import gtk
import os
import pango
import pyudev
import pyudev.glib
from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import factory_setup_modules
from cros.factory.test import factory
from cros.factory.test import task
from cros.factory.test import ui
_STATE_WAIT_INSERT = 1
_STATE_WAIT_REMOVE = 2
_STATE_LOCKTEST_WAIT_INSERT = 3
_STATE_LOCKTEST_WAIT_REMOVE = 4
# udev constants
_UDEV_ACTION_INSERT = 'add'
_UDEV_ACTION_REMOVE = 'remove'
_UDEV_MMCBLK_PATH = '/dev/mmcblk'
# USB card reader attributes and common text string in descriptors
_USB_CARD_ATTRS = ['vendor', 'model', 'product', 'configuration',
'manufacturer']
_USB_CARD_DESCS = ['card', 'reader']
_INSERT_FMT_STR = lambda t: (
'\n'.join(['insert %s drive...' % t,
'WARNING: DATA ON INSERTED MEDIA WILL BE LOST!\n',
'插入%s存储...' % t,
'注意: 插入装置上的资料将会被清除!',
]))
_REMOVE_FMT_STR = lambda t: 'remove %s drive...\n提取%s存储...' % (t, t)
_TESTING_FMT_STR = lambda t:'testing %s...\n%s 检查当中...' % (t, t)
_LOCKTEST_INSERT_FMT_STR = lambda t: (
'\n'.join(['toggle lock switch and insert %s drive again...' % t,
'切换写保护开关并再次插入%s存储...' % t]))
_LOCKTEST_REMOVE_FMT_STR = lambda t: (
'\n'.join(['remove %s drive and toggle lock switch...' % t,
'提取%s存储并关闭写保护开关...' % t]))
_ERR_TOO_EARLY_REMOVE_FMT_STR = \
lambda t: \
'Device removed too early (%s).\n' \
'太早移除外部储存装置 (%s).\n' % (t, t)
_ERR_FIO_TEST_FAILED_FMT_STR = \
lambda target_dev: 'IO error while running test on %s.\n' % target_dev
_ERR_LOCKTEST_FAILED_FMT_STR = \
lambda target_dev: 'Locktest failed on %s.\n' % target_dev
class factory_ExternalStorage(test.test):
version = 1
preserve_srcdir = True
def expose_event(self, widget, event):
context = widget.window.cairo_create()
context.set_source_surface(self._image, 0, 0)
context.paint()
return True
def get_attrs(self, device, key_set):
if device is None:
return ''
attrs = [device.attributes[key] for key in
set(device.attributes.keys()) & key_set]
attr_str = ' '.join(attrs).strip()
if len(attr_str):
attr_str = '/' + attr_str
return self.get_attrs(device.parent, key_set) + attr_str
def get_vidpid(self, device):
if device is None:
return None
if device.device_type == 'usb_device':
attrs = device.attributes
if set(['idProduct', 'idVendor']) <= set(attrs.keys()):
vidpid = attrs['idVendor'] + ':' + attrs['idProduct']
return vidpid.strip()
return self.get_vidpid(device.parent)
def is_usb_cardreader(self, device):
attr_str = self.get_attrs(device, set(_USB_CARD_ATTRS)).lower()
for desc in _USB_CARD_DESCS:
if desc in attr_str:
return True
return False
def is_sd(self, device):
if device.device_node.find(_UDEV_MMCBLK_PATH) == 0:
return True
return self.is_usb_cardreader(device)
def get_device_type(self, device):
if self.is_sd(device):
return 'SD'
return 'USB'
def test_read_write(self, subtest_tag):
self._prompt.set_text(_TESTING_FMT_STR(self._target_device))
self._image = self.testing_image
self._pictogram.queue_draw()
task.schedule(self._invoke_test_read_write, subtest_tag)
def _invoke_test_read_write(self, subtest_tag):
devpath = self._target_device
requirement = {'read_write_verify': ['read_bw', 'write_bw']}
constraint = list()
if self._min_read_speed is not None:
constraint.append(
'read_bw_read_write_verify >= %d * 1024 * 1024' %
self._min_read_speed)
if self._min_write_speed is not None:
constraint.append(
'write_bw_read_write_verify >= %d * 1024 * 1024' %
self._min_write_speed)
self.job.drop_caches_between_iterations = True
result = self.job.run_test('hardware_StorageFio',
dev=devpath,
tag=subtest_tag + "rwv",
requirements=requirement,
constraints=constraint)
if result is not True:
self._error += _ERR_FIO_TEST_FAILED_FMT_STR(self._target_device)
self._prompt.set_text(_REMOVE_FMT_STR(self._media))
self._state = _STATE_WAIT_REMOVE
self._image = self.removal_image
self._pictogram.queue_draw()
def test_lock(self, subtest_tag):
self._prompt.set_text(_TESTING_FMT_STR(self._target_device))
self._image = self.testing_image
self._pictogram.queue_draw()
task.schedule(self._invoke_test_lock, subtest_tag)
def _invoke_test_lock(self, subtest_tag):
devpath = self._target_device
requirement = {'read_write_verify': list()}
result = self.job.run_test('hardware_StorageFio',
dev=devpath,
tag=subtest_tag + "lt",
requirements = requirement)
if result is True:
self._error += _ERR_LOCKTEST_FAILED_FMT_STR(self._target_device)
self._prompt.set_text(_LOCKTEST_REMOVE_FMT_STR(self._media))
self._state = _STATE_LOCKTEST_WAIT_REMOVE
self._image = self.locktest_removal_image
self._pictogram.queue_draw()
def udev_event_cb(self, subtest_tag, action, device):
if action == _UDEV_ACTION_INSERT:
if self._state == _STATE_WAIT_INSERT:
if self._vidpid is None:
if self._media != self.get_device_type(device):
return True
else:
device_vidpid = self.get_vidpid(device)
if device_vidpid not in self._vidpid:
return True
factory.log('VID:PID == %s' % device_vidpid)
factory.log('%s device inserted : %s' %
(self._media, device.device_node))
self._target_device = device.device_node
self.test_read_write(subtest_tag)
if self._state == _STATE_LOCKTEST_WAIT_INSERT:
if self._target_device == device.device_node:
self.test_lock(subtest_tag)
elif action == _UDEV_ACTION_REMOVE:
if self._target_device == device.device_node:
factory.log('Device removed : %s' % device.device_node)
if self._state == _STATE_WAIT_REMOVE:
if self._has_locktest:
self._prompt.set_text(
_LOCKTEST_INSERT_FMT_STR(self._media))
self._state = _STATE_LOCKTEST_WAIT_INSERT
self._image = self.locktest_insertion_image
self._pictogram.queue_draw()
else:
gtk.main_quit()
elif self._state == _STATE_LOCKTEST_WAIT_REMOVE:
gtk.main_quit()
else:
self._error += _ERR_TOO_EARLY_REMOVE_FMT_STR(
self._target_device)
factory.log('Device %s removed too early' %
self._target_device)
gtk.main_quit()
return True
def run_once(self,
subtest_tag=None,
media=None,
vidpid=None,
min_read_speed=None,
min_write_speed=None,
perform_locktest=False):
factory.log('%s run_once' % self.__class__)
# If not provided, set subtest tag based on the media type.
if subtest_tag is None and media:
subtest_tag = media.lower()
self._error = ''
self._target_device = None
os.chdir(self.srcdir)
self._media = media
factory.log('media = %s' % media)
if vidpid is None:
self._vidpid = None
elif type(vidpid) != type(list()):
# Convert vidpid to a list.
self._vidpid = [vidpid]
else:
self._vidpid = vidpid
self._min_read_speed = min_read_speed
self._min_write_speed = min_write_speed
self.insertion_image = cairo.ImageSurface.create_from_png(
'%s_insert.png' % media)
self.removal_image = cairo.ImageSurface.create_from_png(
'%s_remove.png' % media)
self.testing_image = cairo.ImageSurface.create_from_png(
'%s_testing.png' % media)
self._has_locktest = perform_locktest
if perform_locktest:
self.locktest_insertion_image = cairo.ImageSurface.create_from_png(
'%s_locktest_insert.png' % media)
self.locktest_removal_image = cairo.ImageSurface.create_from_png(
'%s_locktest_remove.png' % media)
image_size_set = set([(i.get_width(), i.get_height()) for
i in [self.insertion_image,
self.removal_image,
self.testing_image]])
assert len(image_size_set) == 1
image_size = image_size_set.pop()
factory.log('subtest_tag = %s' % subtest_tag)
label = gtk.Label('')
label.modify_font(pango.FontDescription('courier new condensed 20'))
label.set_alignment(0.5, 0.5)
label.modify_fg(gtk.STATE_NORMAL, gtk.gdk.color_parse('light green'))
self._prompt = label
self._prompt.set_text(_INSERT_FMT_STR(self._media))
self._state = _STATE_WAIT_INSERT
self._image = self.insertion_image
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='block', device_type='disk')
observer = pyudev.glib.GUDevMonitorObserver(monitor)
observer.connect('device-event',
lambda observer, action, device: \
self.udev_event_cb(subtest_tag, action,
device))
monitor.start()
drawing_area = gtk.DrawingArea()
drawing_area.set_size_request(*image_size)
drawing_area.connect('expose_event', self.expose_event)
self._pictogram = drawing_area
hbox = gtk.HBox()
hbox.pack_start(drawing_area, expand=True, fill=False)
vbox = gtk.VBox()
vbox.pack_start(hbox, False, False)
vbox.pack_start(label, False, False)
test_widget = gtk.EventBox()
test_widget.modify_bg(gtk.STATE_NORMAL, gtk.gdk.color_parse('black'))
test_widget.add(vbox)
ui.run_test_widget(self.job, test_widget)
if self._error:
raise error.TestFail(self._error)
factory.log('%s run_once finished' % self.__class__)