# -*- coding: utf-8 -*-
#
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


# DESCRIPTION :
#
# This is a factory test to test external SD and USB ports.


import cairo
import gtk
import os
import pango
import pyudev
import pyudev.glib

from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import factory_setup_modules
from cros.factory.test import factory
from cros.factory.test import task
from cros.factory.test import ui


# States of the insert / test / remove sequence tracked in self._state.
# The two LOCKTEST states are only entered when the lock test is enabled.
_STATE_WAIT_INSERT = 1
_STATE_WAIT_REMOVE = 2
_STATE_LOCKTEST_WAIT_INSERT = 3
_STATE_LOCKTEST_WAIT_REMOVE = 4

# udev action names compared against incoming device events, and the device
# node prefix used to recognise a device as SD.
_UDEV_ACTION_INSERT = 'add'
_UDEV_ACTION_REMOVE = 'remove'
_UDEV_MMCBLK_PATH = '/dev/mmcblk'

# A USB device is treated as a card reader when any of the attributes named
# in _USB_CARD_ATTRS (on the device or one of its parents) contains one of
# the words in _USB_CARD_DESCS (compared in lower case).
_USB_CARD_ATTRS = [
    'vendor',
    'model',
    'product',
    'configuration',
    'manufacturer',
]
_USB_CARD_DESCS = [
    'card',
    'reader',
]

# Operator prompt and error message formatters. Each takes the media type
# (or device node) to mention and returns the text to display or record.
# The constant-style names are kept because the test class refers to them.


def _INSERT_FMT_STR(t):
    """Return the bilingual prompt asking the operator to insert media t."""
    lines = [
        'insert %s drive...' % t,
        'WARNING: DATA ON INSERTED MEDIA WILL BE LOST!\n',
        '插入%s存储...' % t,
        '注意: 插入装置上的资料将会被清除!',
    ]
    return '\n'.join(lines)


def _REMOVE_FMT_STR(t):
    """Return the bilingual prompt asking the operator to remove media t."""
    return 'remove %s drive...\n提取%s存储...' % (t, t)


def _TESTING_FMT_STR(t):
    """Return the bilingual 'test in progress' message for t."""
    return 'testing %s...\n%s 检查当中...' % (t, t)


def _LOCKTEST_INSERT_FMT_STR(t):
    """Return the prompt to toggle the lock switch and re-insert media t."""
    english = 'toggle lock switch and insert %s drive again...' % t
    chinese = '切换写保护开关并再次插入%s存储...' % t
    return '\n'.join([english, chinese])


def _LOCKTEST_REMOVE_FMT_STR(t):
    """Return the prompt to remove media t and toggle the lock switch."""
    english = 'remove %s drive and toggle lock switch...' % t
    chinese = '提取%s存储并关闭写保护开关...' % t
    return '\n'.join([english, chinese])


def _ERR_TOO_EARLY_REMOVE_FMT_STR(t):
    """Return the bilingual error text for a device removed too early."""
    template = ('Device removed too early (%s).\n'
                '太早移除外部储存装置 (%s).\n')
    return template % (t, t)


def _ERR_FIO_TEST_FAILED_FMT_STR(target_dev):
    """Return the error text for a failed read/write test on target_dev."""
    return 'IO error while running test on %s.\n' % target_dev


def _ERR_LOCKTEST_FAILED_FMT_STR(target_dev):
    """Return the error text for a failed lock test on target_dev."""
    return 'Locktest failed on %s.\n' % target_dev


class factory_ExternalStorage(test.test):
    """Factory test that exercises an external SD or USB storage port.

    The operator is prompted on screen to insert a storage device. The
    device is then run through the 'hardware_StorageFio' test and the
    operator is asked to remove it again. When the lock test is enabled, a
    second insert/remove cycle follows in which the FIO run is expected
    NOT to pass. Progress is driven by udev 'add' / 'remove' events handled
    in udev_event_cb(); failures are accumulated in self._error and raised
    as error.TestFail at the end of run_once().
    """
    version = 1
    preserve_srcdir = True

    def expose_event(self, widget, event):
        """Paint the current pictogram (self._image) at the widget origin.

        Always returns True (in GTK this marks the event as handled).
        """
        context = widget.window.cairo_create()
        context.set_source_surface(self._image, 0, 0)
        context.paint()
        return True

    def _update_display(self, text, image):
        """Show text in the prompt label and redraw the pictogram as image."""
        self._prompt.set_text(text)
        self._image = image
        self._pictogram.queue_draw()

    def get_attrs(self, device, key_set):
        """Collect attribute values named in key_set along the device chain.

        Args:
            device: udev device to start from; None yields ''.
            key_set: set of attribute names to look for.

        Returns:
            A string made of one '/'-prefixed segment per device in the
            parent chain that has at least one non-blank matching
            attribute. Segments are ordered from the topmost ancestor down
            to device itself. '' when nothing matches.
        """
        segments = []
        while device is not None:
            matching = set(device.attributes.keys()) & key_set
            text = ' '.join(
                [device.attributes[key] for key in matching]).strip()
            if text:
                segments.append('/' + text)
            device = device.parent
        # Collected leaf-first; callers expect the root-most ancestor first.
        segments.reverse()
        return ''.join(segments)

    def get_vidpid(self, device):
        """Return 'idVendor:idProduct' for the USB device backing device.

        Walks from device up through its parents and uses the first one
        whose device_type is 'usb_device' and which exposes both
        attributes. Returns None when no such device is found.
        """
        while device is not None:
            if device.device_type == 'usb_device':
                attrs = device.attributes
                if set(['idProduct', 'idVendor']) <= set(attrs.keys()):
                    vidpid = attrs['idVendor'] + ':' + attrs['idProduct']
                    return vidpid.strip()
            device = device.parent
        return None

    def is_usb_cardreader(self, device):
        """Return True if the device's descriptors look like a card reader.

        The check is a heuristic: any of the words in _USB_CARD_DESCS
        appearing (case-insensitively) in the _USB_CARD_ATTRS attributes of
        the device or its parents.
        """
        attr_str = self.get_attrs(device, set(_USB_CARD_ATTRS)).lower()
        return any(desc in attr_str for desc in _USB_CARD_DESCS)

    def is_sd(self, device):
        """Return True for an mmcblk device node or a USB card reader."""
        node = device.device_node
        # A device without a node cannot be an mmcblk device; fall through
        # to the card-reader heuristic instead of failing on None.
        if node and node.startswith(_UDEV_MMCBLK_PATH):
            return True
        return self.is_usb_cardreader(device)

    def get_device_type(self, device):
        """Classify device as 'SD' or 'USB' (anything that is not SD)."""
        if self.is_sd(device):
            return 'SD'
        return 'USB'

    def test_read_write(self, subtest_tag):
        """Show the 'testing' screen and schedule the read/write test."""
        self._update_display(_TESTING_FMT_STR(self._target_device),
                             self.testing_image)
        task.schedule(self._invoke_test_read_write, subtest_tag)

    def _invoke_test_read_write(self, subtest_tag):
        """Run hardware_StorageFio on the target device and ask for removal.

        Bandwidth constraints are added only for the speeds that were
        given to run_once(). Any result other than True is recorded in
        self._error. Afterwards the test waits for the device to be
        removed.
        """
        requirements = {'read_write_verify': ['read_bw', 'write_bw']}
        constraints = []
        if self._min_read_speed is not None:
            constraints.append(
                'read_bw_read_write_verify >= %d * 1024 * 1024' %
                self._min_read_speed)
        if self._min_write_speed is not None:
            constraints.append(
                'write_bw_read_write_verify >= %d * 1024 * 1024' %
                self._min_write_speed)
        self.job.drop_caches_between_iterations = True
        result = self.job.run_test('hardware_StorageFio',
                                   dev=self._target_device,
                                   tag=subtest_tag + "rwv",
                                   requirements=requirements,
                                   constraints=constraints)
        if result is not True:
            self._error += _ERR_FIO_TEST_FAILED_FMT_STR(self._target_device)
        self._state = _STATE_WAIT_REMOVE
        self._update_display(_REMOVE_FMT_STR(self._media),
                             self.removal_image)

    def test_lock(self, subtest_tag):
        """Show the 'testing' screen and schedule the lock test."""
        self._update_display(_TESTING_FMT_STR(self._target_device),
                             self.testing_image)
        task.schedule(self._invoke_test_lock, subtest_tag)

    def _invoke_test_lock(self, subtest_tag):
        """Run hardware_StorageFio with the lock engaged and ask for removal.

        The logic is inverted compared to the read/write test: a run that
        returns True is recorded as a lock-test failure (presumably because
        writes must not succeed on locked media).
        """
        requirements = {'read_write_verify': []}
        result = self.job.run_test('hardware_StorageFio',
                                   dev=self._target_device,
                                   tag=subtest_tag + "lt",
                                   requirements=requirements)
        if result is True:
            self._error += _ERR_LOCKTEST_FAILED_FMT_STR(self._target_device)
        self._state = _STATE_LOCKTEST_WAIT_REMOVE
        self._update_display(_LOCKTEST_REMOVE_FMT_STR(self._media),
                             self.locktest_removal_image)

    def _handle_insert(self, subtest_tag, device):
        """React to a udev 'add' event according to the current state."""
        if self._state == _STATE_WAIT_INSERT:
            if self._vidpid is None:
                # No VID:PID filter: accept by media type only.
                if self._media != self.get_device_type(device):
                    return
            else:
                device_vidpid = self.get_vidpid(device)
                if device_vidpid not in self._vidpid:
                    return
                factory.log('VID:PID == %s' % device_vidpid)
            factory.log('%s device inserted : %s' %
                        (self._media, device.device_node))
            self._target_device = device.device_node
            self.test_read_write(subtest_tag)
        elif self._state == _STATE_LOCKTEST_WAIT_INSERT:
            # Only the device that went through the read/write test counts.
            if self._target_device == device.device_node:
                self.test_lock(subtest_tag)

    def _handle_remove(self, device):
        """React to a udev 'remove' event according to the current state."""
        if self._target_device != device.device_node:
            return
        factory.log('Device removed : %s' % device.device_node)

        if self._state == _STATE_WAIT_REMOVE and self._has_locktest:
            # Read/write phase done; move on to the lock-test phase.
            self._state = _STATE_LOCKTEST_WAIT_INSERT
            self._update_display(_LOCKTEST_INSERT_FMT_STR(self._media),
                                 self.locktest_insertion_image)
            return

        if self._state not in (_STATE_WAIT_REMOVE,
                               _STATE_LOCKTEST_WAIT_REMOVE):
            # Removal was not expected yet: record it as a failure.
            self._error += _ERR_TOO_EARLY_REMOVE_FMT_STR(
                    self._target_device)
            factory.log('Device %s removed too early' %
                        self._target_device)
        gtk.main_quit()

    def udev_event_cb(self, subtest_tag, action, device):
        """Dispatch a udev block-device event; always returns True.

        Args:
            subtest_tag: tag prefix passed on to the FIO sub-tests.
            action: udev action string; only 'add' and 'remove' are used.
            device: the udev device the event refers to.
        """
        if action == _UDEV_ACTION_INSERT:
            self._handle_insert(subtest_tag, device)
        elif action == _UDEV_ACTION_REMOVE:
            self._handle_remove(device)
        return True

    def _load_images(self, media, perform_locktest):
        """Load the pictograms for media and return their (width, height).

        Files are looked up relative to the current directory as
        '<media>_insert.png', '<media>_remove.png', '<media>_testing.png'
        and, when perform_locktest is set, '<media>_locktest_insert.png'
        and '<media>_locktest_remove.png'.

        Raises:
            error.TestError: the insert/remove/testing images do not all
                share one size.
        """
        self.insertion_image = cairo.ImageSurface.create_from_png(
            '%s_insert.png' % media)
        self.removal_image = cairo.ImageSurface.create_from_png(
            '%s_remove.png' % media)
        self.testing_image = cairo.ImageSurface.create_from_png(
            '%s_testing.png' % media)

        if perform_locktest:
            self.locktest_insertion_image = cairo.ImageSurface.create_from_png(
                '%s_locktest_insert.png' % media)
            self.locktest_removal_image = cairo.ImageSurface.create_from_png(
                '%s_locktest_remove.png' % media)

        sizes = set([(image.get_width(), image.get_height())
                     for image in (self.insertion_image,
                                   self.removal_image,
                                   self.testing_image)])
        # Explicit check rather than assert, so it also runs under "-O".
        if len(sizes) != 1:
            raise error.TestError(
                'Images for %s must all have the same size, got: %s' %
                (media, sorted(sizes)))
        return sizes.pop()

    def _build_test_widget(self, image_size):
        """Build the black test widget: pictogram above the prompt label."""
        drawing_area = gtk.DrawingArea()
        drawing_area.set_size_request(*image_size)
        drawing_area.connect('expose_event', self.expose_event)
        self._pictogram = drawing_area

        hbox = gtk.HBox()
        hbox.pack_start(drawing_area, expand=True, fill=False)

        vbox = gtk.VBox()
        vbox.pack_start(hbox, False, False)
        vbox.pack_start(self._prompt, False, False)

        test_widget = gtk.EventBox()
        test_widget.modify_bg(gtk.STATE_NORMAL, gtk.gdk.color_parse('black'))
        test_widget.add(vbox)
        return test_widget

    def run_once(self,
                 subtest_tag=None,
                 media=None,
                 vidpid=None,
                 min_read_speed=None,
                 min_write_speed=None,
                 perform_locktest=False):
        """Run the interactive external storage test.

        Args:
            subtest_tag: tag prefix for the FIO sub-tests; defaults to
                media.lower() when media is given.
            media: media type. Used in prompts and image file names, and
                compared with get_device_type() ('SD' or 'USB') when no
                vidpid filter is given.
            vidpid: a single 'idVendor:idProduct' string or a collection
                (list, tuple, set) of them. When given, only matching
                devices are accepted and the media type is not checked.
            min_read_speed: optional lower bound for read bandwidth,
                multiplied by 1024 * 1024 in the FIO constraint
                (presumably MB/s -- confirm against hardware_StorageFio).
            min_write_speed: same as min_read_speed, for writes.
            perform_locktest: when true, a second insert/remove cycle runs
                the lock test.

        Raises:
            error.TestError: the pictograms have inconsistent sizes.
            error.TestFail: any sub-test failed or the device was removed
                too early.
        """
        factory.log('%s run_once' % self.__class__)

        # If not provided, set subtest tag based on the media type.
        if subtest_tag is None and media:
            subtest_tag = media.lower()

        self._error = ''
        self._target_device = None

        os.chdir(self.srcdir)

        self._media = media
        factory.log('media = %s' % media)

        if vidpid is None:
            self._vidpid = None
        elif isinstance(vidpid, (list, tuple, set, frozenset)):
            # Any collection of ids is accepted, not just an exact list.
            self._vidpid = list(vidpid)
        else:
            # A single id: wrap it so membership tests work uniformly.
            self._vidpid = [vidpid]

        self._min_read_speed = min_read_speed
        self._min_write_speed = min_write_speed
        self._has_locktest = perform_locktest

        image_size = self._load_images(media, perform_locktest)

        factory.log('subtest_tag = %s' % subtest_tag)

        label = gtk.Label('')
        label.modify_font(pango.FontDescription('courier new condensed 20'))
        label.set_alignment(0.5, 0.5)
        label.modify_fg(gtk.STATE_NORMAL, gtk.gdk.color_parse('light green'))
        self._prompt = label

        # Initial state: waiting for the operator to insert the media.
        self._prompt.set_text(_INSERT_FMT_STR(self._media))
        self._state = _STATE_WAIT_INSERT
        self._image = self.insertion_image

        # Watch whole-disk block devices. monitor and observer stay bound
        # to locals so they remain referenced while the UI loop runs.
        context = pyudev.Context()
        monitor = pyudev.Monitor.from_netlink(context)
        monitor.filter_by(subsystem='block', device_type='disk')
        observer = pyudev.glib.GUDevMonitorObserver(monitor)
        observer.connect(
            'device-event',
            lambda _observer, action, device:
                self.udev_event_cb(subtest_tag, action, device))
        monitor.start()

        test_widget = self._build_test_widget(image_size)

        ui.run_test_widget(self.job, test_widget)

        if self._error:
            raise error.TestFail(self._error)

        factory.log('%s run_once finished' % self.__class__)
