blob: 80269d48fe2080459d4561eea94420f63752217e [file] [log] [blame] [edit]
# -*- coding: utf-8 -*-
#
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# DESCRIPTION :
# This factory test runs at the start of a test sequence to verify the DUT has
# been setup correctly.
#
# The start provides several settings (set via darg):
# 'require_external_power': Prompts and waits for external power to be applied.
# 'require_shop_floor': Prompts and waits for serial number as input. The
# server is default to the host running mini-omaha, unless you specify an
# URL by 'shop_floor_server_url' darg.
# 'press_to_continue': Prompts and waits for a key press (SPACE) to continue.
import glob
import logging
import os
import socket
import sys
import gobject
import gtk
import pango
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import factory_setup_modules
from cros.factory.test import factory
from cros.factory.test import shopfloor
from cros.factory.test import task
from cros.factory.test import ui
from cros.factory.test.event import Event, EventClient
from cros.factory.event_log import EventLog
# Messages for tasks
_MSG_TASK_POWER = (
'Plug in external power to continue.\n'
'请插上外接电源以继续。')
_MSG_TASK_SERIAL = (
'Enter valid serial number:\n'
'请输入有效的序号:')
_MSG_TASK_SPACE = (
'Hit SPACE to start testing...\n'
'按 "空白键" 开始测试...')
_MSG_TASK_SHOP_FLOOR = (
'Preparing to connect to shop floor server.\n'
'Please plug in network cable and hit SPACE\n'
'to start testing...\n'
'准备连线 shop floor 伺服器。 \n'
'请插上网路线后,按下"空白键"开始测试...')
_MSG_NO_SHOP_FLOOR_SERVER_URL = (
'No shop floor server URL. Auto-testing stopped.\n\n'
'Please install the factory test image using the mini-Omaha server\n'
'rather than booting from a USB drive.\n\n'
'For debugging or development, use the listed hot-keys to start\n'
'individual tests.\n\n'
'未指定 Shop Floor 服务器位址,停止自动测试。\n\n'
'请使用完整的 mini-Omaha 服务器安装测试程式,\n'
'不要直接从 USB 碟开机执行。\n\n'
'若想除错或执行部份测试,请直接按下对应热键。')
_LABEL_FONT = pango.FontDescription('courier new condensed 24')
class PressSpaceTask(task.FactoryTask):
def __init__(self, pop_up_message):
self.pop_up_message = pop_up_message
def start(self):
self.add_widget(
ui.make_label(self.pop_up_message, font=ui.LABEL_LARGE_FONT))
self.connect_window('key-press-event', self.window_key_press)
def window_key_press(self, window, event):
if event.keyval == gtk.keysyms.space:
self.stop()
else:
factory.log('PressSpaceTask: non-space hit: %d' % event.keyval)
return True
class ExternalPowerTask(task.FactoryTask):
AC_CONNECTED = 1
AC_DISCONNECTED = 2
AC_CHECK_PERIOD = 500
def start(self):
self.active = True
widget = ui.make_label(_MSG_TASK_POWER, font=ui.LABEL_LARGE_FONT)
self.add_widget(widget)
self.add_timeout(self.AC_CHECK_PERIOD, self.check_event, widget)
def stop(self):
self.active = False
def check_event(self, label):
if not self.active:
return True
state = self.get_external_power_state()
if state == self.AC_CONNECTED:
self.stop()
return True
def get_external_power_state(self):
for type_file in glob.glob('/sys/class/power_supply/*/type'):
type_value = utils.read_one_line(type_file).strip()
if type_value == 'Mains':
status_file = os.path.join(os.path.dirname(type_file), 'online')
try:
status = int(utils.read_one_line(status_file).strip())
except ValueError as details:
raise ValueError('Invalid external power state in %s: %s' %
(status_file, details))
if status == 0:
return self.AC_DISCONNECTED
elif status == 1:
return self.AC_CONNECTED
else:
raise ValueError('Invalid external power state "%s" in %s' %
(status, status_file))
raise IOError('Unable to determine external power state.')
class ShopFloorTask(task.FactoryTask):
def __init__(self, server_url, serial_number):
self.server_url = server_url or shopfloor.detect_default_server_url()
self.serial_number = serial_number
def start(self):
# Many developers will try to run factory test image directly without
# mini-omaha server, so we should either alert and fail, or ask for
# server address.
if not self.server_url:
self.add_widget(ui.make_label(_MSG_NO_SHOP_FLOOR_SERVER_URL,
fg=ui.RED))
return
shopfloor.set_server_url(self.server_url)
# If no partner-specific serial number, pop-up a make_input window.
if self.serial_number is None:
self.add_widget(ui.make_input_window(
prompt=_MSG_TASK_SERIAL,
on_validate=self.validate_serial_number,
on_complete=self.complete_serial_task))
else:
# Use partner-specific serial number.
if self.validate_serial_number(self.serial_number):
self.complete_serial_task(self.serial_number)
def validate_serial_number(self, serial):
# This is a callback function for widgets created by make_input_window.
# When the input is not valid (or temporary network failure), either
# return False or raise a ValueError with message to be displayed in
# bottom status line of input window.
try:
# All exceptions
shopfloor.check_serial_number(serial.strip())
return True
except shopfloor.ServerFault as e:
raise ui.InputError("Server error:\n%s" % e)
except ValueError as e:
logging.exception("ValueError:")
raise ui.InputError(e.message)
except socket.gaierror as e:
raise ui.InputError("Network failure (address error).")
except socket.error as e:
raise ui.InputError("Network failure:\n%s" % e[1])
except:
logging.exception("UnknownException:")
raise ui.InputError(sys.exc_info()[1])
return False
def complete_serial_task(self, serial):
serial = serial.strip()
EventLog.ForAutoTest().Log('mlb_serial_number',
serial_number=serial)
factory.log('Serial number: %s' % serial)
shopfloor.set_serial_number(serial)
EventClient().post_event(Event(Event.Type.UPDATE_SYSTEM_INFO))
self.stop()
return True
class factory_Start(test.test):
version = 2
def run_once(self,
press_to_continue=True,
require_external_power=False,
require_shop_floor=None,
shop_floor_server_url=None,
serial_number=None,
press_to_start_shop_floor=False):
factory.log('%s run_once' % self.__class__)
self._task_list = []
# Reset shop floor data only if require_shop_floor is explicitly
# defined, for test lists using factory_Start multiple times between
# groups (ex, to prompt for space or check power adapter).
if require_shop_floor is not None:
shopfloor.reset()
shopfloor.set_enabled(require_shop_floor)
if press_to_start_shop_floor:
self._task_list.append(PressSpaceTask(_MSG_TASK_SHOP_FLOOR))
if require_shop_floor:
self._task_list.append(
ShopFloorTask(shop_floor_server_url, serial_number))
if require_external_power:
self._task_list.append(ExternalPowerTask())
if press_to_continue:
self._task_list.append(PressSpaceTask(_MSG_TASK_SPACE))
if self._task_list:
task.run_factory_tasks(self.job, self._task_list)
factory.log('%s run_once finished' % repr(self.__class__))