| # -*- coding: utf-8; tab-width: 4; python-indent: 4 -*- |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| # Import guard for OpenCV. |
| try: |
| import cv |
| import cv2 |
| except ImportError: |
| pass |
| |
| import base64 |
| import logging |
| import numpy as np |
| import os |
| import pprint |
| import pyudev |
| import re |
| import select |
| import serial |
| import signal |
| import socket |
| import StringIO |
| import subprocess |
| import sys |
| import tempfile |
| import threading |
| import time |
| import xmlrpclib |
| |
| import autotest_lib.client.cros.camera.perf_tester as camperf |
| import autotest_lib.client.cros.camera.renderer as renderer |
| |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros import factory_setup_modules #pylint:disable=W0611 |
| from cros.factory.event_log import Log, GetDeviceId, TimedUuid |
| from cros.factory.test import factory |
| from cros.factory.test import leds |
| from cros.factory.test import shopfloor |
| from cros.factory.test import test_ui |
| from cros.factory.test import utils |
| from cros.factory.test.media_util import MountedMedia |
| from cros.factory.utils import net_utils, file_utils |
| from cros.factory.utils.process_utils import SpawnOutput |
| from autotest_lib.client.cros.rf.config import PluggableConfig |
| from autotest_lib.client.cros import tty |
| from cros.factory.test.test_ui import UI |
| |
| |
| # Test type constants: |
| _TEST_TYPE_MODULE = 'Module' |
| _TEST_TYPE_AB = 'AB' |
| _TEST_TYPE_FULL = 'Full' |
| |
| # Content type constants: |
| _CONTENT_IMG = 'image' |
| _CONTENT_TXT = 'text' |
| |
| _INSERT_ETHERNET_DONGLE_TIMEOUT_SECS = 30 # Timeout for inserting dongle. |
| _IP_SETUP_TIMEOUT_SECS = 10 # Timeout for setting IP address. |
| _SHOPFLOOR_TIMEOUT_SECS = 10 # Timeout for shopfloor connection. |
| _SHOPFLOOR_RETRY_INTERVAL_SECS = 10 # Seconds to wait between retries. |
| |
| |
| class ShopfloorBridgeException(Exception): |
| pass |
| |
| |
| class ShopfloorBridge(object): |
| '''Warps shopfloor bridge connection.''' |
| # Uses very simple protocol in shopfloor bridge. |
| # <4-char ID><space><8-char data length>\r\n<variable length of data> |
| _PROT_PORT = 8000 # Default port for shopfloor bridge. |
| # Hello! |
| _PROT_COMMAND_HELLO = 'HELO' |
| # download test parameters |
| _PROT_COMMAND_PARA = 'PARA' |
| # uploads test results |
| _PROT_COMMAND_TEST = 'TEST' |
| _PROT_COMMAND_SIZE = 4 |
| _PROT_STATUS_OK = 'OKAY' |
| _PROT_STATUS_FAIL = 'FAIL' |
| _PROT_DATALEN_SIZE = 8 |
| _PROT_PREAMBLE_SIZE = _PROT_COMMAND_SIZE + 1 + _PROT_DATALEN_SIZE + 2 |
| # Seconds to wait for request / connection. (The value should be large |
| # enough to avoid unnecessary retrying.) |
| _TIMEOUT_SECS = 20 |
| # Seconds to wait between retries. |
| _RETRY_INTERVAL_SECS = 3 |
| |
| def __init__(self, bridge_ip, setup_network_callback): |
| '''ShopfloorBridge is usually used in module-level testing. |
| |
| Reconfigure the IP with setup_network_callback since the test may run |
| for a very long time. Otherwise, if IP setting is lost, |
| self._reconnect() may get stuck in infinite loop. |
| |
| Args: |
| bridge_ip: IP address of the shopfloor bridge server. |
| setup_network_callback: Callback to setup network IP. |
| ''' |
| assert(bridge_ip) |
| self.bridge_ip = bridge_ip |
| self.setup_network_callback = setup_network_callback |
| self.socket = None |
| self.rfile = None |
| self.wfile = None |
| |
| def __del__(self): |
| self._close() |
| |
| def _close(self): |
| """Closes the connection to shopfloor bridge.""" |
| if self.rfile: |
| self.rfile.close() |
| self.rfile = None |
| if self.wfile: |
| self.wfile.close() |
| self.wfile = None |
| if self.socket: |
| self.socket.close() |
| self.socket = None |
| |
| def _reconnect(self): |
| """Reconnects to shopfloor bridge.""" |
| self._close(); |
| self._connect() |
| |
| def _connect(self, |
| timeout_secs=_TIMEOUT_SECS, |
| retry_interval_secs=_RETRY_INTERVAL_SECS): |
| """Connects to shopfloor bridge. |
| |
| Args: |
| timeout_secs: Seconds to wait for connection. |
| retry_interval_secs: Seconds to wait between retries. |
| """ |
| if self.socket: # already connected |
| return |
| |
| factory.console.info('Connecting to shopfloor bridge...') |
| while True: |
| try: |
| self.socket = socket.socket(socket.AF_INET, |
| socket.SOCK_STREAM) |
| self.socket.settimeout(timeout_secs) |
| self.socket.connect((self.bridge_ip, self._PROT_PORT)) |
| self.socket.settimeout(None) |
| break |
| except: # pylint: disable=W0702 |
| exception_string = utils.FormatExceptionOnly() |
| # Log the exception string only since it may happen repeatedly. |
| factory.console.error('Unable to connect to bridge: %s', |
| exception_string) |
| if self.setup_network_callback: |
| self.setup_network_callback() |
| |
| time.sleep(retry_interval_secs) |
| |
| self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) |
| self.rfile = self.socket.makefile('rb', -1) |
| self.wfile = self.socket.makefile('wb', 0) # No buffering |
| |
| def _send_command(self, command, data, |
| timeout_secs=_TIMEOUT_SECS, |
| retry_interval_secs=_RETRY_INTERVAL_SECS): |
| """Sends command to shopfloor bridge. |
| |
| Args: |
| command: Command to bridge. |
| data: Data to bridge. |
| timeout_secs: Seconds to wait for response. |
| retry_interval_secs: Seconds to wait between retries. |
| |
| Returns: |
| A pair of (response okay, received data). |
| """ |
| received_data = '' |
| self._connect() |
| |
| data_len = len(data) |
| assert len(command) == 4 |
| assert len('%d' % data_len) <= 8 |
| preamble = '%s %08d\r\n' % (command, data_len) |
| assert len(preamble) == self._PROT_PREAMBLE_SIZE |
| |
| response_ok = None |
| while response_ok is None: |
| try: |
| with utils.Timeout(timeout_secs): |
| self.wfile.write(preamble) |
| if data: |
| self.wfile.write(data) |
| self.wfile.flush() |
| ack_preamble = self.rfile.read(self._PROT_PREAMBLE_SIZE) |
| if len(ack_preamble) != self._PROT_PREAMBLE_SIZE: |
| raise ShopfloorBridgeException( |
| "Shopfloor Error: can't read %d bytes (preamble)." % |
| self._PROT_PREAMBLE_SIZE) |
| if ack_preamble[-2] != '\r' or ack_preamble[-1] != '\n': |
| raise ShopfloorBridgeException( |
| 'Shopfloor Error: cannot find \\r or \\n in reply.') |
| ack_status = ack_preamble[0:self._PROT_COMMAND_SIZE] # 0:4 |
| ack_datalen_str = ack_preamble[ |
| self._PROT_COMMAND_SIZE + 1: |
| self._PROT_COMMAND_SIZE + 1 + |
| self._PROT_DATALEN_SIZE] # 5:13 |
| factory.console.info( |
| 'Received "%s" "%s" from shopfloor bridge', ack_status, |
| ack_datalen_str) |
| ack_datalen = int(ack_datalen_str) |
| if ack_datalen > 0: |
| received_data = self.rfile.read(ack_datalen) |
| if len(received_data) != ack_datalen: |
| raise ShopfloorBridgeException( |
| "Shopfloor Error: can't read %d bytes (data)." % |
| ack_datalen) |
| |
| if ack_status != self._PROT_STATUS_OK: |
| # On fail, received_data contains error message. End the |
| # loop here because shopfloor may not recover from the error |
| # anytime soon. We should make the test fails instead of |
| # retrying to talk to shopfloor again. |
| factory.console.error( |
| 'Shopfloor Error: bridge returns %s: %s', |
| ack_status, received_data) |
| response_ok = False |
| else: |
| response_ok = True |
| except: # pylint: disable=W0702 |
| exception_string = utils.FormatExceptionOnly() |
| # Log the exception string only since it may happen repeatedly. |
| factory.console.error('Unable to get response from command %s ' |
| 'from shopfloor bridge: %s', |
| command, exception_string) |
| |
| time.sleep(retry_interval_secs) |
| # Assume it's network problem. Simply reconnects the bridge! |
| self._reconnect() |
| |
| return (response_ok, received_data) |
| |
| def download_param(self, param_filename): |
| """Downloads parameter. |
| |
| Returns: |
| Returns downloaded parameters. |
| """ |
| |
| response_ok, param_data = self._send_command(self._PROT_COMMAND_PARA, |
| param_filename) |
| # No way to handle the error case shopfloor returns FAIL in downloading |
| # parameters. |
| assert response_ok, 'Cannot download parameter from shopfloor' |
| return param_data |
| |
| def upload_result(self, results): |
| """Uploads test results. |
| |
| Returns: |
| Success or not. |
| """ |
| response_ok, dummy = self._send_command(self._PROT_COMMAND_TEST, |
| results) |
| return response_ok |
| |
| |
| class _ShopfloorWrapper(object): |
| '''Wraps shopfloor connectivity.''' |
| |
| def __init__(self, ip_addr, bridge_mode, bridge_ip, |
| shopfloor_directory, shopfloor_param_file): |
| ''' |
| Args: |
| ip_addr: IP address of DUT (None for DHCP). |
| bridge_mode: Use bridge mode or native shopfloor. |
| bridge_ip: IP of the bridge server. |
| shopfloor_directory: Relative path to shopfloor parameters folder. |
| shopfloor_param_file: Filename of parameter file on shopfloor. |
| ''' |
| self.ip_addr = ip_addr |
| if bridge_mode: |
| self.bridge = ShopfloorBridge( |
| bridge_ip=bridge_ip, |
| setup_network_callback=lambda: self._prepare_network(False)) |
| else: |
| self.bridge = None |
| self.shopfloor_directory = shopfloor_directory |
| self.shopfloor_param_file = shopfloor_param_file |
| |
| def _prepare_network(self, force_new_ip): |
| '''Blocks forever until network is prepared. |
| |
| Args: |
| force_new_ip: always set new IP regardless of existing IP. |
| ''' |
| def _obtain_IP(): |
| if self.ip_addr is None: |
| net_utils.SendDhcpRequest() |
| else: |
| net_utils.SetEthernetIp(self.ip_addr, force=force_new_ip) |
| return True if net_utils.GetEthernetIp() else False |
| |
| factory.console.info('Detecting Ethernet device...') |
| try: |
| net_utils.PollForCondition(condition=( |
| lambda: True if net_utils.FindUsableEthDevice() else False), |
| timeout=_INSERT_ETHERNET_DONGLE_TIMEOUT_SECS, |
| condition_name='Detect Ethernet device') |
| |
| # Only setup the IP if required so. |
| current_ip = net_utils.GetEthernetIp( |
| net_utils.FindUsableEthDevice()) |
| if not current_ip or force_new_ip: |
| factory.console.info('Setting up IP address...') |
| net_utils.PollForCondition( |
| condition=_obtain_IP, |
| timeout=_IP_SETUP_TIMEOUT_SECS, |
| condition_name='Setup IP address') |
| except: # pylint: disable=W0702 |
| exception_string = utils.FormatExceptionOnly() |
| factory.console.error('Unable to setup network: %s', |
| exception_string) |
| |
| factory.console.info('Network prepared. IP: %r', |
| net_utils.GetEthernetIp()) |
| |
| def _get_shopfloor_connection(self, |
| timeout_secs=_SHOPFLOOR_TIMEOUT_SECS, |
| retry_interval_secs=_SHOPFLOOR_RETRY_INTERVAL_SECS): |
| """Returns a shopfloor client object. |
| |
| Try forever until a connection of shopfloor is established. |
| |
| Args: |
| timeout_secs: Timeout for shopfloor connection. |
| retry_interval_secs: Seconds to wait between retries. |
| """ |
| factory.console.info('Connecting to shopfloor...') |
| while True: |
| try: |
| shopfloor_client = shopfloor.get_instance( |
| detect=True, timeout=timeout_secs) |
| break |
| except: # pylint: disable=W0702 |
| exception_string = utils.FormatExceptionOnly() |
| # Log the exception string only since it may happen repeatedly. |
| factory.console.error('Unable to sync with shopfloor: %s', |
| exception_string) |
| time.sleep(retry_interval_secs) |
| return shopfloor_client |
| |
| def download_param(self, force_new_ip): |
| '''Read parameters from shopfloor server. |
| |
| Args: |
| force_new_ip: Always set new IP regardless of existing IP. |
| ''' |
| self._prepare_network(force_new_ip) |
| param_path = os.path.join(self.shopfloor_directory, |
| self.shopfloor_param_file) |
| factory.console.info('Read %s from shopfloor', param_path) |
| if self.bridge: |
| return self.bridge.download_param(param_path) |
| else: |
| sf = self._get_shopfloor_connection() |
| return sf.GetParameter(param_path).data |
| |
| def upload_log(self, force_new_ip, aux_log_list): |
| '''Uploads logs to shopfloor server. |
| |
| Args: |
| force_new_ip: Always set new IP regardless of existing IP. |
| aux_log_list: List of tuples of (log_name, log_data). |
| |
| Returns: |
| Returns False if there is critical issue from shopfloor. |
| ''' |
| self._prepare_network(force_new_ip) |
| if not self.bridge: |
| sf = self._get_shopfloor_connection() |
| |
| for log_name, log_data in aux_log_list: |
| factory.console.info('Uploading %s', log_name) |
| start_time = time.time() |
| try: |
| if self.bridge: |
| # Shopfloor bridge behaves differently than our built-in |
| # shopfloor server. |
| # |
| # 1. Failing to upload to built-in shopfloor is either a bug |
| # or wrong network setup. It should never happen. Raise |
| # exception. |
| # |
| # 2. Failing to upload to shopfloor bridge (usually connects |
| # to a proprietary shopfloor server of module house) may be |
| # caused by factory flow issue. For example, DUT is tested |
| # at a wrong stage. This is a critical error and we should |
| # mark the test 'failed' for retesting DUT later. |
| assert re.search(r'\.txt$', log_name), ( |
| 'Only supports uploading of .txt files.') |
| if not self.bridge.upload_result(log_data): |
| return False |
| else: |
| sf.SaveAuxLog(log_name, xmlrpclib.Binary(log_data)) |
| except: # pylint: disable=W0702 |
| exception_string = utils.FormatExceptionOnly() |
| factory.console.error('Failed to upload %s: %s', |
| log_name, exception_string) |
| raise |
| factory.console.info('Successfully synced %s in %.03f s', |
| log_name, time.time() - start_time) |
| return True |
| |
| |
| class ALS(): |
| '''Class to interface the ambient light sensor over iio.''' |
| |
| # Default device paths. |
| _VAL_DEV_PATH = '/sys/bus/iio/devices/iio:device0/illuminance0_input' |
| _SCALE_DEV_PATH = '/sys/bus/iio/devices/iio:device0/illuminance0_calibscale' |
| |
| # Default min delay seconds. |
| _DEFAULT_MIN_DELAY = 0.178 |
| |
| def __init__(self, val_path=_VAL_DEV_PATH, |
| scale_path=_SCALE_DEV_PATH): |
| self.detected = True |
| if (not os.path.isfile(val_path) or |
| not os.path.isfile(scale_path)): |
| self.detected = False |
| return |
| self.val_path = val_path |
| self.scale_path = scale_path |
| |
| def _read_core(self): |
| fd = open(self.val_path) |
| val = int(fd.readline().rstrip()) |
| fd.close() |
| return val |
| |
| def _read(self, delay=None, samples=1): |
| '''Read the light sensor value. |
| |
| Args: |
| delay: Delay between samples in seconds. 0 means as fast as |
| possible. |
| samples: Total samples to read. |
| |
| Returns: |
| The light sensor values in a list. |
| ''' |
| if samples < 1: |
| samples = 1 |
| if delay is None: |
| delay = self._DEFAULT_MIN_DELAY |
| |
| buf = [] |
| # The first value might be contaminated by previous settings. |
| # We need to skip it for better accuracy. |
| self._read_core() |
| for dummy in range(samples): |
| time.sleep(delay) |
| val = self._read_core() |
| buf.append(val) |
| |
| return buf |
| |
| def read_mean(self, delay=None, samples=1): |
| if not self.detected: |
| return None |
| |
| buf = self._read(delay, samples) |
| return int(round(float(sum(buf)) / len(buf))) |
| |
| def set_scale_factor(self, scale): |
| if not self.detected: |
| return None |
| |
| fd = open(self.scale_path, 'w') |
| fd.write(str(int(round(scale)))) |
| fd.close() |
| return |
| |
| def get_scale_factor(self): |
| if not self.detected: |
| return None |
| |
| fd = open(self.scale_path) |
| s = int(fd.readline().rstrip()) |
| fd.close() |
| return s |
| |
| |
| class FixtureException(Exception): |
| pass |
| |
| |
| class Fixture(): |
| '''Class for communication with the test fixture.''' |
| |
| def __init__(self, params): |
| # Setup the serial port communication. |
| tty_path = tty.find_tty_by_driver(params['driver']) |
| self.fixture = serial.Serial(port=tty_path, |
| **params['serial_params']) |
| self.fixture.flush() |
| |
| # Load parameters. |
| self.serial_delay = params['serial_delay'] |
| self.light_delay = params['light_delay'] |
| self.light_seq = params['light_seq'] |
| self.fixture_echo = params['echo'] |
| self.light_off = params['off'] |
| |
| def send(self, msg): |
| '''Send control messages to the fixture.''' |
| for c in msg: |
| self.fixture.write(str(c)) |
| self.fixture.flush() |
| # The fixture needs some time to process each incoming character. |
| time.sleep(self.serial_delay) |
| |
| def read(self): |
| return self.fixture.read(self.fixture.inWaiting()) |
| |
| def assert_success(self): |
| '''Check if the returned value from the fixture is OK.''' |
| ret = self.read() |
| if not re.search(self.fixture_echo, ret): |
| raise FixtureException('The communication with fixture was broken') |
| |
| def set_light(self, idx): |
| self.send(self.light_seq[idx]) |
| |
| def turn_off_light(self): |
| self.send(self.light_off) |
| |
| def wait_for_light_switch(self): |
| time.sleep(self.light_delay) |
| |
| |
| class ConnectionMonitor(): |
| """A wrapper to monitor hardware plug/unplug events.""" |
| def __init__(self): |
| self._monitoring = False |
| |
| def start(self, subsystem, device_type=None, on_insert=None, |
| on_remove=None): |
| if self._monitoring: |
| raise Exception("Multiple start() call is not allowed") |
| self.on_insert = on_insert |
| self.on_remove = on_remove |
| |
| # Setup the media monitor, |
| context = pyudev.Context() |
| self.monitor = pyudev.Monitor.from_netlink(context) |
| self.monitor.filter_by(subsystem, device_type) |
| self.monitor.start() |
| self._monitoring = True |
| self._watch_thread = threading.Thread(target=self.watch) |
| self._watch_end = threading.Event() |
| self._watch_thread.start() |
| |
| def watch(self): |
| fd = self.monitor.fileno() |
| while not self._watch_end.isSet(): |
| ret, _, _ = select.select([fd],[],[]) |
| if fd in ret: |
| action, dev = self.monitor.receive_device() |
| if action == 'add' and self.on_insert: |
| self.on_insert(dev.device_node) |
| elif action == 'remove' and self.on_remove: |
| self.on_remove(dev.device_node) |
| |
| def stop(self): |
| self._monitoring = False |
| self._watch_end.set() |
| |
| |
| class factory_CameraPerformanceAls(test.test): |
| version = 3 |
| preserve_srcdir = True |
| |
| _BAD_SERIAL_NUMBER = 'BAD_SN' |
| _NO_SERIAL_NUMBER = 'NO_SN' |
| |
| _PACKET_SIZE = 65000 |
| |
| # Status in the final result tab. |
| _STATUS_NAMES = ['cam_stat', 'cam_vc', 'cam_ls', 'cam_mtf', |
| 'als_stat', 'result'] |
| _STATUS_LABELS = ['Camera Functionality', |
| 'Camera Visual Correctness', |
| 'Camera Lens Shading', |
| 'Camera Image Sharpness', |
| 'ALS Functionality', |
| 'Test Result'] |
| _CAM_TESTS = ['cam_stat', 'cam_vc', 'cam_ls', 'cam_mtf'] |
| _ALS_TESTS = ['als_stat'] |
| |
| # LED patterns. |
| _LED_RUNNING_TEST = ((leds.LED_NUM|leds.LED_CAP, 0.05), (0, 0.05)) |
| |
| # CSS style classes defined in the corresponding HTML file. |
| _STYLE_INFO = "color_idle" |
| _STYLE_PASS = "color_good" |
| _STYLE_FAIL = "color_bad" |
| |
| def t_pass(self, msg): |
| return test_ui.MakeLabel(msg, css_class=self._STYLE_PASS) |
| |
| def t_fail(self, msg): |
| return test_ui.MakeLabel(msg, css_class=self._STYLE_FAIL) |
| |
| def update_status(self, mid=None, msg=None): |
| message = '' |
| if msg: |
| message = msg |
| elif mid: |
| message = test_ui.MakeLabel(self.config['message'][mid + '_en'], |
| self.config['message'][mid + '_zh'], |
| self.config['msg_style'][mid]) |
| self.ui.CallJSFunction("UpdateTestStatus", message) |
| |
| def update_pbar(self, pid=None, value=None, add=True): |
| precent = 0 |
| if value: |
| percent = value |
| elif pid: |
| all_time = self.config['chk_point'][self.type] |
| if add: |
| self.progress += self.config['chk_point'][pid] |
| else: |
| self.progress = self.config['chk_point'][pid] |
| percent = int(round((float(self.progress) / all_time) * 100)) |
| self.ui.CallJSFunction("UpdatePrograssBar", '%d%%' % percent) |
| |
| def register_events(self, events): |
| for event in events: |
| assert hasattr(self, event) |
| self.ui.AddEventHandler(event, getattr(self, event)) |
| |
| def get_test_chart_file(self): |
| return 'test_chart_%s.png' % self.test_chart_version |
| |
| def get_test_sample_file(self): |
| return 'sample_%s.png' % self.test_chart_version |
| |
| def prepare_test(self): |
| self.ref_data = camperf.PrepareTest(self.get_test_chart_file()) |
| |
| def download_param_from_shopfloor(self, event): |
| """Download test parameters from shopfloor server. |
| |
| This is callback from Javascript in order to run download_param() in |
| main thread because utils.Timeout only supports running in main thread. |
| """ |
| assert self.shopfloor |
| param = self.shopfloor.download_param(force_new_ip=True) |
| |
| self.prepare_test() |
| self.config = eval(param) |
| self.reset_data() |
| self.config_loaded = True |
| factory.console.info("Config loaded from shopfloor.") |
| self.ui.CallJSFunction("OnShopfloorInit", self.config['sn_format']) |
| |
| def on_usb_insert(self, dev_path): |
| if not self.config_loaded: |
| # Initialize common test reference data. |
| self.prepare_test() |
| # Load config files and reset test results. |
| self.dev_path = dev_path |
| with MountedMedia(dev_path, 1) as config_dir: |
| config_path = os.path.join(config_dir, 'camera.params') |
| self.config = self.base_config.Read(config_path) |
| self.reset_data() |
| self.config_loaded = True |
| factory.console.info("Config loaded.") |
| self.ui.CallJSFunction("OnUSBInit", self.config['sn_format']) |
| else: |
| self.dev_path = dev_path |
| self.ui.CallJSFunction("OnUSBInsertion") |
| |
| def on_usb_remove(self, dev_path): |
| if self.config_loaded: |
| factory.console.info("USB removal is not allowed during test!") |
| self.ui.CallJSFunction("OnUSBRemoval") |
| |
| def setup_fixture(self): |
| '''Initialize the communication with the fixture.''' |
| try: |
| self.fixture = Fixture(self.config['fixture']) |
| |
| # Go with the default(first) lighting intensity. |
| self.light_state = 0 |
| self.fixture.set_light(self.light_state) |
| if not self.unit_test: |
| self.fixture.assert_success() |
| except Exception as e: |
| self.fixture = None |
| self.log('Failed to initialize the test fixture.\n') |
| return False |
| self.log('Test fixture successfully initialized.\n') |
| return True |
| |
| def sync_fixture(self, event): |
| self.ui.CallJSFunction("OnDetectFixtureConnection") |
| cnt = 0 |
| while not self.setup_fixture(): |
| cnt += 1 |
| if cnt >= self.config['fixture']['n_retry']: |
| self.ui.CallJSFunction("OnRemoveFixtureConnection") |
| return |
| time.sleep(self.config['fixture']['retry_delay']) |
| self.ui.CallJSFunction("OnAddFixtureConnection") |
| |
| def on_u2s_insert(self, dev_path): |
| if self.config_loaded: |
| self.sync_fixture(None) |
| |
| def on_u2s_remove(self, dev_path): |
| if self.config_loaded: |
| self.ui.CallJSFunction("OnRemoveFixtureConnection") |
| |
| def update_result(self, row_name, result): |
| result_map = { |
| True: 'PASSED', |
| False: 'FAILED', |
| None: 'UNTESTED' |
| } |
| self.result_dict[row_name] = result_map[result] |
| |
| def update_fail_cause(self, fail_cause): |
| self.fail_cause = fail_cause |
| |
| def reset_data(self): |
| self.target = None |
| self.target_colorful = None |
| self.analyzed = None |
| self.log_to_file = StringIO.StringIO() |
| self.log = lambda *x: (factory.console.info(*x), |
| self.log_to_file.write(*x)) |
| |
| for var in self.status_names: |
| self.update_result(var, None) |
| self.fail_cause = '' |
| self.progress = 0 |
| self.ui.CallJSFunction("ResetUiData", "") |
| |
| def send_img_to_ui(self, data): |
| self.ui.CallJSFunction("ClearBuffer", "") |
| # Send the data in 64K packets due to the socket packet size limit. |
| data_len = len(data) |
| p = 0 |
| while p < data_len: |
| if p + self._PACKET_SIZE > data_len: |
| self.ui.CallJSFunction("AddBuffer", data[p:data_len-1]) |
| p = data_len |
| else: |
| self.ui.CallJSFunction("AddBuffer", |
| data[p:p+self._PACKET_SIZE]) |
| p += self._PACKET_SIZE |
| |
| def update_preview(self, img, container_id, scale=0.5): |
| # Encode the image in the JPEG format. |
| preview = cv2.resize(img, None, fx=scale, fy=scale, |
| interpolation=cv2.INTER_AREA) |
| cv2.imwrite('temp.jpg', preview) |
| with open('temp.jpg', 'r') as fd: |
| img_data = base64.b64encode(fd.read()) + "=" |
| |
| # Update the preview screen with javascript. |
| self.send_img_to_ui(img_data) |
| self.ui.CallJSFunction("UpdateImage", container_id) |
| return |
| |
| def compile_result(self, test_list, use_untest=True): |
| ret = self.result_dict |
| if all('PASSED' == ret[x] for x in test_list): |
| return True |
| if use_untest and any('UNTESTED' == ret[x] for x in test_list): |
| return None |
| return False |
| |
| def generate_final_result(self): |
| self.update_status(mid='end_test') |
| self.cam_pass = self.compile_result(self._CAM_TESTS) |
| if self.use_als: |
| self.als_pass = self.compile_result(self._ALS_TESTS) |
| result = self.compile_result(self.status_names[:-1], |
| use_untest=False) |
| else: |
| result = self.compile_result(self._CAM_TESTS, use_untest=False) |
| self.update_result('result', result) |
| self.log("Result in summary:\n%s\n" % |
| pprint.pformat(self.result_dict)) |
| Log('cam_performance_test_result', **self.result_dict) |
| Log('cam_performance_fail_cause', cam_fail_cause=self.fail_cause) |
| self.update_pbar(pid='end_test') |
| |
| def write_to_usb(self, filename, content, content_type=_CONTENT_TXT): |
| try: |
| with MountedMedia(self.dev_path, 1) as mount_dir: |
| if content_type == _CONTENT_TXT: |
| # Appends the text log. |
| with open(os.path.join(mount_dir, filename), 'a') as f: |
| f.write(content) |
| elif content_type == _CONTENT_IMG: |
| cv2.imwrite(os.path.join(mount_dir, filename), content) |
| except: |
| self.log("Error when writing data to USB!\n") |
| return False |
| return True |
| |
| def save_log_to_usb(self): |
| # Save an image for further analysis |
| self.update_status(mid='save_to_usb') |
| if ((self.target is not None) and |
| ((self.cam_pass and self.log_good_image) or |
| (not self.cam_pass and self.log_bad_image))): |
| if not self.write_to_usb(self.serial_number + ".bmp", |
| self.target, _CONTENT_IMG): |
| return False |
| if self.analyzed is not None: |
| if not self.write_to_usb(self.serial_number + ".result.jpg", |
| self.analyzed, _CONTENT_IMG): |
| return False |
| return self.write_to_usb( |
| self.serial_number + ".txt", self.log_to_file.getvalue()) |
| |
| def save_log_to_shopfloor(self): |
| ''' Updates image and aux log to shopfloor server.''' |
| self.update_status(mid='save_to_shopfloor') |
| |
| aux_log_list = [] |
| log_prefix = '_'.join([re.sub(r'\W+', '_', x) for x in |
| [os.environ.get('CROS_FACTORY_TEST_PATH'), |
| re.sub(r':', '', GetDeviceId()), |
| self.serial_number]]) |
| |
| aux_log_list.append((log_prefix + '.txt', self.log_to_file.getvalue())) |
| |
| # Adds images to aux logs. |
| if ((self.target is not None) and |
| ((self.cam_pass and self.log_good_image) or |
| (not self.cam_pass and self.log_bad_image))): |
| # TODO (jchuang): newer version of OpenCV has better imencode() |
| # Python method. |
| def get_image_data(cv_image, file_ext): |
| temp_fn = os.path.join(tempfile.gettempdir(), |
| TimedUuid() + file_ext) |
| try: |
| cv2.imwrite(temp_fn, cv_image) |
| with open(temp_fn, 'rb') as f: |
| return f.read() |
| except: # pylint: disable=W0702 |
| factory.console.error('Failed to read image data') |
| finally: |
| file_utils.TryUnlink(temp_fn) |
| |
| aux_log_list.extend([(log_prefix + '.bmp', |
| get_image_data(self.target, '.bmp')), |
| (log_prefix + '.jpg', |
| get_image_data(self.analyzed, '.jpg'))]) |
| |
| # If shopfloor.upload_log() returns FALSE only on critical error and |
| # we need to indicate error for retesting later. |
| if not self.shopfloor.upload_log(False, aux_log_list): |
| self.cam_pass = False |
| self.update_fail_cause("Shopfloor") |
| |
| def finalize_test(self): |
| self.generate_final_result() |
| if self.shopfloor: |
| self.save_log_to_shopfloor() |
| self.update_pbar(pid='save_to_shopfloor') |
| elif self.type in [_TEST_TYPE_AB, _TEST_TYPE_MODULE]: |
| # We block the test flow until we successfully dumped the result. |
| while not self.save_log_to_usb(): |
| time.sleep(0.5) |
| self.update_pbar(pid='save_to_usb') |
| |
| # Display final result. |
| def get_str(ret, prefix, use_untest=True): |
| if ret: |
| return self.t_pass(prefix + 'PASS') |
| if use_untest and (ret is None): |
| return self.t_fail(prefix + 'UNFINISHED') |
| return self.t_fail(prefix + 'FAIL') |
| cam_result = get_str(self.cam_pass, 'Camera: ', False) |
| if self.use_als: |
| als_result = get_str(self.als_pass, 'ALS: ') |
| self.update_status(msg=cam_result + ' ' + self.fail_cause + |
| '<br>' + als_result) |
| else: |
| self.update_status(msg=cam_result + ' ' + self.fail_cause) |
| |
| # Reset serial number if passed. Otherwise, operator may forget to input |
| # serial number again. |
| if self.cam_pass and self.type in [_TEST_TYPE_AB, _TEST_TYPE_MODULE]: |
| self.ui.CallJSFunction("RestartSnInputBox") |
| |
| self.update_pbar(value=100) |
| |
| def exit_test(self, event): |
| factory.log('%s run_once finished' % self.__class__) |
| if self.result_dict['result'] == 'PASSED': |
| self.ui.Pass() |
| else: |
| self.ui.Fail('Camera/ALS test failed.') |
| |
| def run_test(self, event=None): |
| self.reset_data() |
| self.update_status(mid='start_test') |
| |
| if self.talk_to_fixture and not self.setup_fixture(): |
| self.update_status(mid='fixture_fail') |
| self.ui.CallJSFunction("OnRemoveFixtureConnection") |
| return |
| self.update_pbar(pid='start_test') |
| |
| self.log('Parameter version: %s\n' % self.config['version']) |
| |
| if self.auto_serial_number: |
| # If fails to get serial number, it will display failed in |
| # test_camera_functionality() later |
| ret, auto_sn, error_message = self.auto_get_serial_number() |
| if ret: |
| self.serial_number = auto_sn |
| self.log('Read serial number: %s\n' % auto_sn) |
| else: |
| self.serial_number = self._BAD_SERIAL_NUMBER |
| self.log('No serial number detected: %s\n' % error_message) |
| elif self.type in [_TEST_TYPE_AB, _TEST_TYPE_MODULE]: |
| self.serial_number = event.data.get('sn', '') |
| else: |
| self.serial_number = self._NO_SERIAL_NUMBER |
| |
| if self.type == _TEST_TYPE_FULL: |
| with leds.Blinker(self._LED_RUNNING_TEST): |
| self.test_camera_performance() |
| self.update_pbar(pid='cam_finish', add=False) |
| if self.use_als: |
| self.test_als_calibration() |
| self.update_pbar(pid='als_finish' + self.type, add=False) |
| else: |
| self.test_camera_performance() |
| self.update_pbar(pid='cam_finish', add=False) |
| if self.use_als: |
| self.test_als_calibration() |
| self.update_pbar(pid='als_finish' + self.type, add=False) |
| |
| self.finalize_test() |
| |
| def capture_low_noise_image(self, cam, n_samples): |
| '''Capture a sequence of images and average them to reduce noise.''' |
| if n_samples < 1: |
| n_samples = 1 |
| success, img = cam.read() |
| if not success: |
| return None |
| img = img.astype(np.float64) |
| for t in range(n_samples - 1): |
| success, temp_img = cam.read() |
| if not success: |
| return None |
| img += temp_img.astype(np.float64) |
| img /= n_samples |
| return img.round().astype(np.uint8) |
| |
| def read_usb_attribute(self, device_string, pattern): |
| '''Read and matches regexp pattern in 'lsusb -v' output. |
| |
| Args: |
| device_string: keyword to search for the camera device |
| pattern: regexp pattern with one matching group in MULTILINE mode |
| |
| Returns: |
| A tuple of (is successful, matched string, error message when failed). |
| ''' |
| lsusb_output = SpawnOutput(['lsusb', '-v'], log=True) |
| |
| # Split into several blocks of individual USB device |
| splitter = re.compile(r'^Bus\s+\d+\s+Device\s+\d+', |
| flags = re.MULTILINE) |
| blocks = splitter.split(lsusb_output) |
| matched_blocks = [b for b in blocks if device_string in b] |
| |
| err_message = None |
| ret = False |
| matched_string = None |
| if len(matched_blocks) == 0: |
| err_message = 'No matched camera device for "%s"' % device_string |
| elif len(matched_blocks) > 1: |
| err_message = ('Multiple matched devices found for "%s"' % |
| device_string) |
| else: |
| # Camera module should publish firmware version in 'bcdDevice' field |
| search_result = re.search(pattern, |
| matched_blocks[0], flags = re.MULTILINE) |
| if search_result: |
| matched_string = search_result.group(1) |
| ret = True |
| else: |
| err_message = ( |
| 'Regexp "%s" not matched in lsusb output for "%s"' % |
| (pattern, device_string)) |
| |
| return (ret, matched_string, err_message) |
| |
| def verify_firmware(self, device_string, firmware_string): |
| '''Checks the firmware version of camera module. |
| |
| Args: |
| device_string: keyword to search for the camera device |
| firmware_string: expected firmware string in bcdDevice field |
| |
| Returns: |
| A tuple of (is successful, error message when failed). |
| ''' |
| ret, matched_string, err_message = self.read_usb_attribute( |
| device_string, |
| r'^\s*bcdDevice\s+(\S+)') |
| |
| if ret: |
| if firmware_string != matched_string: |
| err_message = ('Wrong firmware version %s, expecting %s' % |
| (matched_string, firmware_string)) |
| ret = False |
| |
| return (ret, err_message) |
| |
| def auto_get_serial_number(self): |
| '''Auto read the firmware version of camera module. |
| |
| Returns: |
| A tuple of (is successful, matched string, error message when failed). |
| ''' |
| if not self.auto_serial_number: |
| return (False, None, 'auto_serial_number is not defined') |
| ret, matched_string, err_message = self.read_usb_attribute( |
| self.auto_serial_number[0], |
| self.auto_serial_number[1]) |
| return (ret, matched_string, err_message) |
| |
| def test_camera_functionality(self): |
| # Check serial number |
| if self.serial_number == self._BAD_SERIAL_NUMBER: |
| self.update_result('cam_stat', False) |
| self.update_fail_cause("NoCamera") |
| return False |
| |
| # Check firmware version |
| if self.config['firmware']['check_firmware']: |
| device_string = self.config['firmware']['device_string'] |
| firmware_string = self.config['firmware']['firmware_string'] |
| ret, error_message = self.verify_firmware(device_string, |
| firmware_string) |
| if not ret: |
| self.update_result('cam_stat', False) |
| self.update_fail_cause("NoCamera") |
| self.log(error_message + '\n') |
| return False |
| |
| # Initialize the camera with OpenCV. |
| self.update_status(mid='init_cam') |
| cam = cv2.VideoCapture(self.device_index) |
| if not cam.isOpened(): |
| cam.release() |
| self.update_result('cam_stat', False) |
| self.update_fail_cause("BadCamera") |
| self.log('Failed to initialize the camera. ' |
| 'Could be bad module, bad connection or ' |
| 'bad USB initialization.\n') |
| return False |
| self.update_pbar(pid='init_cam') |
| |
| # Set resolution. |
| self.update_status(mid='set_cam_res') |
| conf = self.config['cam_stat'] |
| cam.set(cv.CV_CAP_PROP_FRAME_WIDTH, conf['img_width']) |
| cam.set(cv.CV_CAP_PROP_FRAME_HEIGHT, conf['img_height']) |
| if (conf['img_width'] != cam.get(cv.CV_CAP_PROP_FRAME_WIDTH) or |
| conf['img_height'] != cam.get(cv.CV_CAP_PROP_FRAME_HEIGHT)): |
| cam.release() |
| self.update_result('cam_stat', False) |
| self.update_fail_cause("BadCamera") |
| self.log("Can't set the image size. " |
| "Possibly caused by bad USB initialization.\n") |
| return False |
| self.update_pbar(pid='set_cam_res') |
| |
| # Try reading an image from the camera. |
| self.update_status(mid='try_read_cam') |
| success, _ = cam.read() |
| if not success: |
| cam.release() |
| self.update_result('cam_stat', False) |
| self.update_fail_cause("BadCamera") |
| self.log("Failed to capture an image with the camera.\n") |
| return False |
| self.update_pbar(pid='try_read_cam') |
| |
| # Let the camera's auto-exposure algorithm adjust to the fixture |
| # lighting condition. |
| self.update_status(mid='wait_cam_awb') |
| start = time.time() |
| while time.time() - start < conf['buf_time']: |
| _, _ = cam.read() |
| self.update_pbar(pid='wait_cam_awb') |
| |
| # Read the image that we will use. |
| self.update_status(mid='record_img') |
| n_samples = conf['n_samples'] |
| self.target_colorful = self.capture_low_noise_image(cam, n_samples) |
| if self.target_colorful is None: |
| cam.release() |
| self.update_result('cam_stat', False) |
| self.update_fail_cause("BadCamera") |
| self.log("Error reading images from the camera!\n") |
| return False |
| if self.unit_test: |
| self.target_colorful = cv2.imread(self.get_test_sample_file()) |
| |
| self.target = cv2.cvtColor(self.target_colorful, cv.CV_BGR2GRAY) |
| self.update_result('cam_stat', True) |
| self.log('Successfully captured a sample image.\n') |
| self.update_preview(self.target_colorful, "camera_image", |
| scale=self.config['preview']['scale']) |
| cam.release() |
| self.update_pbar(pid='record_img') |
| return True |
| |
| def test_camera_performance(self): |
| if not self.test_camera_functionality(): |
| return |
| |
| # Export log to both cros.factory.event_log and text log |
| visual_data = {} |
| visual_data['camera_sn'] = self.serial_number |
| |
| def log_visual_data(value, event_key, log_text_fmt): |
| self.log((log_text_fmt % value) + '\n') |
| visual_data[event_key] = value |
| |
| def finish_log_visual_data(): |
| Log('cam_performance_visual_analysis', **visual_data) |
| |
| # Check the captured test pattern image validity. |
| self.update_status(mid='check_vc') |
| |
| success, tar_data = camperf.CheckVisualCorrectness( |
| self.target, self.ref_data, **self.config['cam_vc']) |
| self.analyzed = self.target_colorful.copy() |
| renderer.DrawVC(self.analyzed, success, tar_data) |
| self.update_preview(self.analyzed, "analyzed_image", |
| scale=self.config['preview']['scale']) |
| |
| self.update_result('cam_vc', success) |
| if hasattr(tar_data, 'shift'): |
| log_visual_data(float(tar_data.shift), 'image_shift', |
| 'Image shift percentage: %f') |
| log_visual_data(float(tar_data.v_shift[0]), 'image_shift_x', |
| 'Image shift X: %f') |
| log_visual_data(float(tar_data.v_shift[1]), 'image_shift_y', |
| 'Image shift Y: %f') |
| log_visual_data(float(tar_data.tilt), 'image_tilt', |
| 'Image tilt: %f degrees') |
| if not success: |
| if hasattr(tar_data, 'sample_corners'): |
| log_visual_data(int(tar_data.sample_corners.shape[0]), |
| 'corners', 'Found corners count: %d') |
| if hasattr(tar_data, 'edges'): |
| log_visual_data(int(tar_data.edges.shape[0]), 'edges', |
| 'Found square edges count: %d') |
| |
| if 'shift' in tar_data.msg: |
| self.update_fail_cause('Shift') |
| elif 'tilt' in tar_data.msg: |
| self.update_fail_cause('Tilt') |
| else: |
| self.update_fail_cause('WrongImage') |
| |
| self.log('Visual correctness: %s\n' % tar_data.msg) |
| finish_log_visual_data() |
| return |
| self.update_pbar(pid='check_vc') |
| |
| # Check if the lens shading is present. |
| self.update_status(mid='check_ls') |
| success, tar_ls = camperf.CheckLensShading( |
| self.target, **self.config['cam_ls']) |
| |
| self.update_result('cam_ls', success) |
| if tar_ls.check_low_freq: |
| log_visual_data(float(tar_ls.response), 'ls_low_freq', |
| 'Low-frequency response value: %f') |
| if tar_ls.lowest_ratio: |
| log_visual_data(float(tar_ls.lowest_ratio), 'ls_lowest_ratio', |
| 'Len shading ratio: %f') |
| if not success: |
| self.log('Lens shading: %s\n' % tar_ls.msg) |
| self.update_fail_cause('LenShading') |
| finish_log_visual_data() |
| return |
| self.update_pbar(pid='check_ls') |
| |
| # Check the image sharpness. |
| self.update_status(mid='check_mtf') |
| success, tar_mtf = camperf.CheckSharpness( |
| self.target, tar_data.edges, **self.config['cam_mtf']) |
| renderer.DrawMTF(self.analyzed, tar_data.edges, tar_mtf.perm, |
| tar_mtf.mtfs, |
| self.config['cam_mtf']['mtf_crop_ratio'], |
| self.config['preview']['mtf_color_map_range']) |
| self.update_preview(self.analyzed, "analyzed_image", |
| scale=self.config['preview']['scale']) |
| |
| self.update_result('cam_mtf', success) |
| log_visual_data(float(tar_mtf.mtf), 'median_MTF', 'MTF value: %f') |
| if hasattr(tar_mtf, 'min_mtf'): |
| log_visual_data(float(tar_mtf.min_mtf), 'lowest_MTF', |
| 'Lowest MTF value: %f') |
| if not success: |
| self.log('Sharpness: %s\n' % tar_mtf.msg) |
| self.update_fail_cause('MTF') |
| |
| finish_log_visual_data() |
| self.update_pbar(pid='check_mtf') |
| return |
| |
| def test_als_write_vpd(self, calib_result): |
| self.update_status(mid='dump_to_vpd') |
| conf = self.config['als'] |
| if not calib_result: |
| self.update_result('als_stat', False) |
| self.update_fail_cause("ALS") |
| self.log('ALS calibration data is incorrect.\n') |
| return False |
| if subprocess.call(conf['save_vpd'] % calib_result, shell=True): |
| self.update_result('als_stat', False) |
| self.update_fail_cause("ALS") |
| self.log('Writing VPD data failed!\n') |
| return False |
| self.log('Successfully calibrated ALS scales.\n') |
| self.update_pbar(pid='dump_to_vpd') |
| return True |
| |
| def test_als_switch_to_next_light(self): |
| self.update_status(mid='adjust_light') |
| conf = self.config['als'] |
| self.light_state += 1 |
| self.fixture.set_light(self.light_state) |
| self.update_pbar(pid='adjust_light') |
| if not self.unit_test: |
| self.fixture.assert_success() |
| if self.light_state >= len(conf['luxs']): |
| return False |
| self.update_status(mid='wait_fixture') |
| self.fixture.wait_for_light_switch() |
| self.update_pbar(pid='wait_fixture') |
| return True |
| |
| def test_als_calibration(self): |
| # Initialize the ALS. |
| self.update_status(mid='init_als') |
| conf = self.config['als'] |
| self.als = ALS(val_path=conf['val_path'], |
| scale_path=conf['scale_path']) |
| if not self.als.detected: |
| self.update_result('als_stat', False) |
| self.update_fail_cause("ALS") |
| self.log('Failed to initialize the ALS.\n') |
| return |
| self.als.set_scale_factor(conf['calibscale']) |
| self.update_pbar(pid='init_als') |
| |
| # Go through all different lighting settings |
| # and record ALS values. |
| calib_result = 0 |
| try: |
| vals = [] |
| while True: |
| # Get ALS values. |
| self.update_status(mid='read_als%d' % self.light_state) |
| scale = self.als.get_scale_factor() |
| val = self.als.read_mean(samples=conf['n_samples'], |
| delay=conf['read_delay']) |
| vals.append(val) |
| self.log('Lighting preset lux value: %d\n' % |
| conf['luxs'][self.light_state]) |
| self.log('ALS value: %d\n' % val) |
| self.log('ALS calibration scale: %d\n' % scale) |
| # Check if it is a false read. |
| if not val: |
| self.update_result('als_stat', False) |
| self.update_fail_cause("ALS") |
| self.log('The ALS value is stuck at zero.\n') |
| return |
| # Compute calibration data if it is the calibration target. |
| if conf['luxs'][self.light_state] == conf['calib_lux']: |
| calib_result = int(round(float(conf['calib_target']) / |
| val * scale)) |
| self.log('ALS calibration data will be %d\n' % |
| calib_result) |
| self.update_pbar(pid='read_als%d' % self.light_state) |
| |
| # Go to the next lighting preset. |
| if not self.test_als_switch_to_next_light(): |
| break |
| |
| # Check value ordering. |
| for i, li in enumerate(conf['luxs']): |
| for j in range(i): |
| if ((li > conf['luxs'][j] and vals[j] >= vals[i]) or |
| (li < conf['luxs'][j] and vals[j] <= vals[i])): |
| self.update_result('als_stat', False) |
| self.update_fail_cause("ALS") |
| self.log('The ordering of ALS values is wrong.\n') |
| return |
| except (FixtureException, serial.serialutil.SerialException) as e: |
| self.fixture = None |
| self.update_result('als_stat', None) |
| self.log("The test fixture was disconnected!\n") |
| self.ui.CallJSFunction("OnRemoveFixtureConnection") |
| return |
| except: |
| self.update_result('als_stat', False) |
| self.update_fail_cause("ALS") |
| self.log('Failed to read values from ALS or unknown error.\n') |
| return |
| self.log('Successfully recorded ALS values.\n') |
| |
| # Save ALS values to vpd for FATP test. |
| if self.type == _TEST_TYPE_FULL: |
| if not self.test_als_write_vpd(calib_result): |
| return |
| self.update_result('als_stat', True) |
| return |
| |
| def run_once(self, test_type = _TEST_TYPE_FULL, unit_test = False, |
| use_als = True, test_chart_version = 'A', |
| log_good_image = False, log_bad_image = True, |
| data_method = 'usb', ip_addr = None, bridge_ip_addr = None, |
| shopfloor_directory = None, shopfloor_param_file = None, |
| device_index = -1, ignore_enter_key = False, |
| auto_serial_number = None): |
| '''The entry point of the test. |
| |
| Args: |
| test_type: Run the full machine test or the AB panel test. The AB |
| panel will be run on a host that is used to test |
| connected AB panels (possibly many), while the full |
| machine test would test only the machine that runs it |
| and then exit. |
| unit_test: Run the unit-test mode. The unit-test mode is used to |
| test the test integrity when the test fixture is not |
| available. It should be run on a machine that has a |
| working camera and a working ALS. Please place the |
| camera parameter file under the src directory on an USB |
| stick for use and connect the machine with an |
| USB-to-RS232 converter cable with the designated chipset |
| in the parameter file. The test will replace the |
| captured image with the sample test image and run the |
| camera performance test on it. |
| use_als: Whether to use the ambient light sensor. |
| test_chart_version: Version of the test chart. |
| log_good_image: Log images that pass that test. |
| log_bad_image: Log images that fail that test. |
| data_method: How to read parameter and save results. |
| ('usb', 'shopfloor', 'bridge') |
| ip_addr: Network setting when data_method is shopfloor or bridge. |
| If ip_addr is None, use DHCP. |
| shopfloor_directory: Relative path to shopfloor parameters folder. |
| shopfloor_param_file: Filename of camera parameter file on shopfloor |
| server. |
| bridge_ip_addr: IP addr of shopfloor bridge. |
| device_index: video device index (-1 to auto pick device by OpenCV). |
| ignore_enter_key: disable enter key in serial number input. |
| (Some barcode reader automatically input enter |
| key, but we may prefer not to start the test |
| immediately after barcode is scanned.) |
| auto_serial_number: None or (module keyword, regexp pattern with one |
| matching group in MULTILINE mode) |
| It support all Module, AB, and Full test types. |
| Ex: ('VendorName', r'^\s*iSerial\s+\S+\s+(\S+)') |
| ''' |
| factory.log('%s run_once' % self.__class__) |
| |
| # Add signal handler to close opened camera interface when get killed |
| # TODO: this should be done in autotest framework instead |
| signal_handler = lambda signum, frame: sys.exit(1) |
| signal.signal(signal.SIGTERM, signal_handler) |
| signal.signal(signal.SIGINT, signal_handler) |
| |
| # Set logging level. Otherwise, update_preview() will log the whole |
| # image data to factory log under the default log level of autotest. |
| logging.getLogger().setLevel(logging.INFO) |
| |
| # Initialize variables and environment. |
| assert test_type in [_TEST_TYPE_FULL, _TEST_TYPE_AB, _TEST_TYPE_MODULE] |
| assert unit_test in [True, False] |
| assert use_als in [True, False] |
| assert test_chart_version in ['A', 'B'] |
| assert log_good_image in [True, False] |
| assert log_bad_image in [True, False] |
| assert data_method in ['usb', 'shopfloor', 'bridge'] |
| assert shopfloor_directory is None or type(shopfloor_directory) == str |
| assert shopfloor_param_file is None or type(shopfloor_param_file) == str |
| assert bridge_ip_addr is None or type(bridge_ip_addr) == str |
| assert ignore_enter_key in [True, False] |
| assert auto_serial_number is None or type(auto_serial_number) == tuple |
| self.type = test_type |
| self.unit_test = unit_test |
| self.use_als = use_als |
| self.test_chart_version = test_chart_version |
| self.log_good_image = log_good_image |
| self.log_bad_image = log_bad_image |
| if data_method in ('shopfloor', 'bridge'): |
| if data_method == 'bridge': |
| bridge_mode = True |
| else: |
| bridge_mode = False |
| self.shopfloor = _ShopfloorWrapper( |
| ip_addr=ip_addr, |
| bridge_mode=bridge_mode, |
| bridge_ip=bridge_ip_addr, |
| shopfloor_directory=shopfloor_directory, |
| shopfloor_param_file=shopfloor_param_file) |
| else: |
| self.shopfloor = None |
| self.device_index = device_index |
| self.ignore_enter_key = ignore_enter_key |
| self.auto_serial_number = auto_serial_number |
| |
| self.talk_to_fixture = use_als |
| self.config_loaded = False |
| self.status_names = self._STATUS_NAMES |
| self.status_labels = self._STATUS_LABELS |
| self.result_dict = {} |
| self.fail_cause = '' |
| self.base_config = PluggableConfig({}) |
| os.chdir(self.srcdir) |
| |
| if not self.shopfloor: |
| # Setup the usb disk and usb-to-serial adapter monitor. |
| usb_monitor = ConnectionMonitor() |
| usb_monitor.start(subsystem='block', device_type='disk', |
| on_insert=self.on_usb_insert, |
| on_remove=self.on_usb_remove) |
| |
| if self.talk_to_fixture: |
| u2s_monitor = ConnectionMonitor() |
| u2s_monitor.start(subsystem='usb-serial', |
| on_insert=self.on_u2s_insert, |
| on_remove=self.on_u2s_remove) |
| |
| if self.type == _TEST_TYPE_FULL or self.auto_serial_number: |
| input_serial_number = False |
| else: |
| input_serial_number = True |
| |
| # Startup the UI. |
| self.ui = UI() |
| self.register_events(['sync_fixture', 'exit_test', 'run_test', |
| 'download_param_from_shopfloor']) |
| self.ui.CallJSFunction("InitLayout", self.talk_to_fixture, |
| True if self.shopfloor else False, |
| input_serial_number, |
| self.ignore_enter_key) |
| try: |
| self.ui.Run() |
| except factory.FactoryTestFailure as e: |
| raise error.TestError(e.message) |