| # -*- coding: utf-8 -*- |
| # |
| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| # DESCRIPTION : |
| # This factory test runs at the start of a test sequence to verify the DUT has |
| # been setup correctly. |
| # |
| # The start provides several settings (set via darg): |
| # 'require_external_power': Prompts and waits for external power to be applied. |
| # 'require_shop_floor': Prompts and waits for serial number as input. The |
| # server is default to the host running mini-omaha, unless you specify an |
| # URL by 'shop_floor_server_url' darg. |
| # 'press_to_continue': Prompts and waits for a key press (SPACE) to continue. |
| |
| import glob |
| import logging |
| import os |
| import socket |
| import sys |
| |
| import gobject |
| import gtk |
| import pango |
| |
| from autotest_lib.client.bin import test, utils |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros import factory_setup_modules |
| from cros.factory.test import factory |
| from cros.factory.test import shopfloor |
| from cros.factory.test import task |
| from cros.factory.test import ui |
| from cros.factory.test.event import Event, EventClient |
| from cros.factory.event_log import EventLog |
| |
| |
| # Messages for tasks |
| _MSG_TASK_POWER = ( |
| 'Plug in external power to continue.\n' |
| '请插上外接电源以继续。') |
| _MSG_TASK_SERIAL = ( |
| 'Enter valid serial number:\n' |
| '请输入有效的序号:') |
| _MSG_TASK_SPACE = ( |
| 'Hit SPACE to start testing...\n' |
| '按 "空白键" 开始测试...') |
| _MSG_TASK_SHOP_FLOOR = ( |
| 'Preparing to connect to shop floor server.\n' |
| 'Please plug in network cable and hit SPACE\n' |
| 'to start testing...\n' |
| '准备连线 shop floor 伺服器。 \n' |
| '请插上网路线后,按下"空白键"开始测试...') |
| |
| _MSG_NO_SHOP_FLOOR_SERVER_URL = ( |
| 'No shop floor server URL. Auto-testing stopped.\n\n' |
| 'Please install the factory test image using the mini-Omaha server\n' |
| 'rather than booting from a USB drive.\n\n' |
| 'For debugging or development, use the listed hot-keys to start\n' |
| 'individual tests.\n\n' |
| '未指定 Shop Floor 服务器位址,停止自动测试。\n\n' |
| '请使用完整的 mini-Omaha 服务器安装测试程式,\n' |
| '不要直接从 USB 碟开机执行。\n\n' |
| '若想除错或执行部份测试,请直接按下对应热键。') |
| |
| _LABEL_FONT = pango.FontDescription('courier new condensed 24') |
| |
| |
| class PressSpaceTask(task.FactoryTask): |
| def __init__(self, pop_up_message): |
| self.pop_up_message = pop_up_message |
| |
| def start(self): |
| self.add_widget( |
| ui.make_label(self.pop_up_message, font=ui.LABEL_LARGE_FONT)) |
| self.connect_window('key-press-event', self.window_key_press) |
| |
| def window_key_press(self, window, event): |
| if event.keyval == gtk.keysyms.space: |
| self.stop() |
| else: |
| factory.log('PressSpaceTask: non-space hit: %d' % event.keyval) |
| return True |
| |
| class ExternalPowerTask(task.FactoryTask): |
| |
| AC_CONNECTED = 1 |
| AC_DISCONNECTED = 2 |
| AC_CHECK_PERIOD = 500 |
| |
| def start(self): |
| self.active = True |
| widget = ui.make_label(_MSG_TASK_POWER, font=ui.LABEL_LARGE_FONT) |
| self.add_widget(widget) |
| self.add_timeout(self.AC_CHECK_PERIOD, self.check_event, widget) |
| |
| def stop(self): |
| self.active = False |
| |
| def check_event(self, label): |
| if not self.active: |
| return True |
| state = self.get_external_power_state() |
| if state == self.AC_CONNECTED: |
| self.stop() |
| return True |
| |
| def get_external_power_state(self): |
| for type_file in glob.glob('/sys/class/power_supply/*/type'): |
| type_value = utils.read_one_line(type_file).strip() |
| if type_value == 'Mains': |
| status_file = os.path.join(os.path.dirname(type_file), 'online') |
| try: |
| status = int(utils.read_one_line(status_file).strip()) |
| except ValueError as details: |
| raise ValueError('Invalid external power state in %s: %s' % |
| (status_file, details)) |
| if status == 0: |
| return self.AC_DISCONNECTED |
| elif status == 1: |
| return self.AC_CONNECTED |
| else: |
| raise ValueError('Invalid external power state "%s" in %s' % |
| (status, status_file)) |
| raise IOError('Unable to determine external power state.') |
| |
| |
| class ShopFloorTask(task.FactoryTask): |
| def __init__(self, server_url, serial_number): |
| self.server_url = server_url or shopfloor.detect_default_server_url() |
| self.serial_number = serial_number |
| |
| def start(self): |
| # Many developers will try to run factory test image directly without |
| # mini-omaha server, so we should either alert and fail, or ask for |
| # server address. |
| if not self.server_url: |
| self.add_widget(ui.make_label(_MSG_NO_SHOP_FLOOR_SERVER_URL, |
| fg=ui.RED)) |
| return |
| |
| shopfloor.set_server_url(self.server_url) |
| |
| # If no partner-specific serial number, pop-up a make_input window. |
| if self.serial_number is None: |
| self.add_widget(ui.make_input_window( |
| prompt=_MSG_TASK_SERIAL, |
| on_validate=self.validate_serial_number, |
| on_complete=self.complete_serial_task)) |
| else: |
| # Use partner-specific serial number. |
| if self.validate_serial_number(self.serial_number): |
| self.complete_serial_task(self.serial_number) |
| |
| def validate_serial_number(self, serial): |
| # This is a callback function for widgets created by make_input_window. |
| # When the input is not valid (or temporary network failure), either |
| # return False or raise a ValueError with message to be displayed in |
| # bottom status line of input window. |
| try: |
| # All exceptions |
| shopfloor.check_serial_number(serial.strip()) |
| return True |
| except shopfloor.ServerFault as e: |
| raise ui.InputError("Server error:\n%s" % e) |
| except ValueError as e: |
| logging.exception("ValueError:") |
| raise ui.InputError(e.message) |
| except socket.gaierror as e: |
| raise ui.InputError("Network failure (address error).") |
| except socket.error as e: |
| raise ui.InputError("Network failure:\n%s" % e[1]) |
| except: |
| logging.exception("UnknownException:") |
| raise ui.InputError(sys.exc_info()[1]) |
| return False |
| |
| def complete_serial_task(self, serial): |
| serial = serial.strip() |
| EventLog.ForAutoTest().Log('mlb_serial_number', |
| serial_number=serial) |
| factory.log('Serial number: %s' % serial) |
| shopfloor.set_serial_number(serial) |
| |
| EventClient().post_event(Event(Event.Type.UPDATE_SYSTEM_INFO)) |
| self.stop() |
| return True |
| |
| |
| class factory_Start(test.test): |
| version = 2 |
| |
| def run_once(self, |
| press_to_continue=True, |
| require_external_power=False, |
| require_shop_floor=None, |
| shop_floor_server_url=None, |
| serial_number=None, |
| press_to_start_shop_floor=False): |
| factory.log('%s run_once' % self.__class__) |
| |
| self._task_list = [] |
| |
| # Reset shop floor data only if require_shop_floor is explicitly |
| # defined, for test lists using factory_Start multiple times between |
| # groups (ex, to prompt for space or check power adapter). |
| |
| if require_shop_floor is not None: |
| shopfloor.reset() |
| shopfloor.set_enabled(require_shop_floor) |
| if press_to_start_shop_floor: |
| self._task_list.append(PressSpaceTask(_MSG_TASK_SHOP_FLOOR)) |
| |
| if require_shop_floor: |
| self._task_list.append( |
| ShopFloorTask(shop_floor_server_url, serial_number)) |
| if require_external_power: |
| self._task_list.append(ExternalPowerTask()) |
| if press_to_continue: |
| self._task_list.append(PressSpaceTask(_MSG_TASK_SPACE)) |
| |
| if self._task_list: |
| task.run_factory_tasks(self.job, self._task_list) |
| |
| factory.log('%s run_once finished' % repr(self.__class__)) |