| # -*- coding: utf-8 -*- |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| # Import guard for OpenCV. |
| try: |
| import cv |
| import cv2 |
| except ImportError: |
| pass |
| |
| import base64 |
| import numpy as np |
| import os |
| import pprint |
| import pyudev |
| import re |
| import select |
| import serial |
| import StringIO |
| import subprocess |
| import threading |
| import time |
| |
| import autotest_lib.client.cros.camera.perf_tester as camperf |
| import autotest_lib.client.cros.camera.renderer as renderer |
| |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros import factory_setup_modules |
| from cros.factory.test import factory |
| from cros.factory.test import leds |
| from cros.factory.test import test_ui |
| from cros.factory.test.media_util import MountedMedia |
| from autotest_lib.client.cros.rf.config import PluggableConfig |
| from autotest_lib.client.cros import tty |
| from cros.factory.test.test_ui import UI |
| |
| |
| # Test type constants: |
| _TEST_TYPE_AB = 'AB' |
| _TEST_TYPE_FULL = 'Full' |
| |
| # Content type constants: |
| _CONTENT_IMG = 'image' |
| _CONTENT_TXT = 'text' |
| |
| |
| class ALS(): |
| '''Class to interface the ambient light sensor over iio.''' |
| |
| # Default device paths. |
| _VAL_DEV_PATH = '/sys/bus/iio/devices/iio:device0/illuminance0_input' |
| _SCALE_DEV_PATH = '/sys/bus/iio/devices/iio:device0/illuminance0_calibscale' |
| |
| # Default min delay seconds. |
| _DEFAULT_MIN_DELAY = 0.178 |
| |
| def __init__(self, val_path=_VAL_DEV_PATH, |
| scale_path=_SCALE_DEV_PATH): |
| self.detected = True |
| if (not os.path.isfile(val_path) or |
| not os.path.isfile(scale_path)): |
| self.detected = False |
| return |
| self.val_path = val_path |
| self.scale_path = scale_path |
| |
| def _read_core(self): |
| fd = open(self.val_path) |
| val = int(fd.readline().rstrip()) |
| fd.close() |
| return val |
| |
| def _read(self, delay=None, samples=1): |
| '''Read the light sensor value. |
| |
| Args: |
| delay: Delay between samples in seconds. 0 means as fast as |
| possible. |
| samples: Total samples to read. |
| |
| Returns: |
| The light sensor values in a list. |
| ''' |
| if samples < 1: |
| samples = 1 |
| if delay is None: |
| delay = self._DEFAULT_MIN_DELAY |
| |
| buf = [] |
| # The first value might be contaminated by previous settings. |
| # We need to skip it for better accuracy. |
| self._read_core() |
| for dummy in range(samples): |
| time.sleep(delay) |
| val = self._read_core() |
| buf.append(val) |
| |
| return buf |
| |
| def read_mean(self, delay=None, samples=1): |
| if not self.detected: |
| return None |
| |
| buf = self._read(delay, samples) |
| return int(round(float(sum(buf)) / len(buf))) |
| |
| def set_scale_factor(self, scale): |
| if not self.detected: |
| return None |
| |
| fd = open(self.scale_path, 'w') |
| fd.write(str(int(round(scale)))) |
| fd.close() |
| return |
| |
| def get_scale_factor(self): |
| if not self.detected: |
| return None |
| |
| fd = open(self.scale_path) |
| s = int(fd.readline().rstrip()) |
| fd.close() |
| return s |
| |
| |
| class FixtureException(Exception): |
| pass |
| |
| |
| class Fixture(): |
| '''Class for communication with the test fixture.''' |
| |
| def __init__(self, params): |
| # Setup the serial port communication. |
| tty_path = tty.find_tty_by_driver(params['driver']) |
| self.fixture = serial.Serial(port=tty_path, |
| **params['serial_params']) |
| self.fixture.flush() |
| |
| # Load parameters. |
| self.serial_delay = params['serial_delay'] |
| self.light_delay = params['light_delay'] |
| self.light_seq = params['light_seq'] |
| self.fixture_echo = params['echo'] |
| self.light_off = params['off'] |
| |
| def send(self, msg): |
| '''Send control messages to the fixture.''' |
| for c in msg: |
| self.fixture.write(str(c)) |
| self.fixture.flush() |
| # The fixture needs some time to process each incoming character. |
| time.sleep(self.serial_delay) |
| |
| def read(self): |
| return self.fixture.read(self.fixture.inWaiting()) |
| |
| def assert_success(self): |
| '''Check if the returned value from the fixture is OK.''' |
| ret = self.read() |
| if not re.search(self.fixture_echo, ret): |
| raise FixtureException('The communication with fixture was broken') |
| |
| def set_light(self, idx): |
| self.send(self.light_seq[idx]) |
| |
| def turn_off_light(self): |
| self.send(self.light_off) |
| |
| def wait_for_light_switch(self): |
| time.sleep(self.light_delay) |
| |
| |
| class ConnectionMonitor(): |
| """A wrapper to monitor hardware plug/unplug events.""" |
| def __init__(self): |
| self._monitoring = False |
| |
| def start(self, subsystem, device_type=None, on_insert=None, |
| on_remove=None): |
| if self._monitoring: |
| raise Exception("Multiple start() call is not allowed") |
| self.on_insert = on_insert |
| self.on_remove = on_remove |
| |
| # Setup the media monitor, |
| context = pyudev.Context() |
| self.monitor = pyudev.Monitor.from_netlink(context) |
| self.monitor.filter_by(subsystem, device_type) |
| self.monitor.start() |
| self._monitoring = True |
| self._watch_thread = threading.Thread(target=self.watch) |
| self._watch_end = threading.Event() |
| self._watch_thread.start() |
| |
| def watch(self): |
| fd = self.monitor.fileno() |
| while not self._watch_end.isSet(): |
| ret, _, _ = select.select([fd],[],[]) |
| if fd in ret: |
| action, dev = self.monitor.receive_device() |
| if action == 'add' and self.on_insert: |
| self.on_insert(dev.device_node) |
| elif action == 'remove' and self.on_remove: |
| self.on_remove(dev.device_node) |
| |
| def stop(self): |
| self._monitoring = False |
| self._watch_end.set() |
| |
| |
| class factory_CameraPerformanceAls(test.test): |
| version = 2 |
| preserve_srcdir = True |
| |
| # OpenCV will automatically search for a working camera device if we use |
| # the index -1. |
| _DEVICE_INDEX = -1 |
| _TEST_CHART_FILE = 'test_chart.png' |
| _TEST_SAMPLE_FILE = 'sample.png' |
| |
| _PACKET_SIZE = 65000 |
| |
| # Status in the final result tab. |
| _STATUS_NAMES = ['cam_stat', 'cam_vc', 'cam_ls', 'cam_mtf', |
| 'als_stat', 'result'] |
| _STATUS_LABELS = ['Camera Functionality', |
| 'Camera Visual Correctness', |
| 'Camera Lens Shading', |
| 'Camera Image Sharpness', |
| 'ALS Functionality', |
| 'Test Result'] |
| _CAM_TESTS = ['cam_stat', 'cam_vc', 'cam_ls', 'cam_mtf'] |
| _ALS_TESTS = ['als_stat'] |
| |
| # LED patterns. |
| _LED_RUNNING_TEST = ((leds.LED_NUM|leds.LED_CAP, 0.05), (0, 0.05)) |
| |
| # CSS style classes defined in the corresponding HTML file. |
| _STYLE_INFO = "color_idle" |
| _STYLE_PASS = "color_good" |
| _STYLE_FAIL = "color_bad" |
| |
| def t_pass(self, msg): |
| return test_ui.MakeLabel(msg, css_class=self._STYLE_PASS) |
| |
| def t_fail(self, msg): |
| return test_ui.MakeLabel(msg, css_class=self._STYLE_FAIL) |
| |
| def update_status(self, mid=None, msg=None): |
| message = '' |
| if msg: |
| message = msg |
| elif mid: |
| message = test_ui.MakeLabel(self.config['message'][mid + '_en'], |
| self.config['message'][mid + '_zh'], |
| self.config['msg_style'][mid]) |
| self.ui.CallJSFunction("UpdateTestStatus", message) |
| |
| def update_pbar(self, pid=None, value=None, add=True): |
| precent = 0 |
| if value: |
| percent = value |
| elif pid: |
| all_time = self.config['chk_point'][self.type] |
| if add: |
| self.progress += self.config['chk_point'][pid] |
| else: |
| self.progress = self.config['chk_point'][pid] |
| percent = int(round((float(self.progress) / all_time) * 100)) |
| self.ui.CallJSFunction("UpdatePrograssBar", '%d%%' % percent) |
| |
| def register_events(self, events): |
| for event in events: |
| assert hasattr(self, event) |
| self.ui.AddEventHandler(event, getattr(self, event)) |
| |
| def prepare_test(self): |
| self.ref_data = camperf.PrepareTest(self._TEST_CHART_FILE) |
| |
| def on_usb_insert(self, dev_path): |
| if not self.config_loaded: |
| # Initialize common test reference data. |
| self.prepare_test() |
| # Load config files and reset test results. |
| self.dev_path = dev_path |
| with MountedMedia(dev_path, 1) as config_dir: |
| config_path = os.path.join(config_dir, 'camera.params') |
| self.config = self.base_config.Read(config_path) |
| self.reset_data() |
| self.config_loaded = True |
| factory.console.info("Config loaded.") |
| self.ui.CallJSFunction("OnUSBInit", self.config['sn_format']) |
| else: |
| self.dev_path = dev_path |
| self.ui.CallJSFunction("OnUSBInsertion") |
| |
| def on_usb_remove(self, dev_path): |
| if self.config_loaded: |
| factory.console.info("USB removal is not allowed during test!") |
| self.ui.CallJSFunction("OnUSBRemoval") |
| |
| def setup_fixture(self): |
| '''Initialize the communication with the fixture.''' |
| try: |
| self.fixture = Fixture(self.config['fixture']) |
| |
| # Go with the default(first) lighting intensity. |
| self.light_state = 0 |
| self.fixture.set_light(self.light_state) |
| if not self.unit_test: |
| self.fixture.assert_success() |
| except Exception as e: |
| self.fixture = None |
| self.log('Failed to initialize the test fixture.\n') |
| return False |
| self.log('Test fixture successfully initialized.\n') |
| return True |
| |
| def sync_fixture(self, event): |
| self.ui.CallJSFunction("OnDetectFixtureConnection") |
| cnt = 0 |
| while not self.setup_fixture(): |
| cnt += 1 |
| if cnt >= self.config['fixture']['n_retry']: |
| self.ui.CallJSFunction("OnRemoveFixtureConnection") |
| return |
| time.sleep(self.config['fixture']['retry_delay']) |
| self.ui.CallJSFunction("OnAddFixtureConnection") |
| |
| def on_u2s_insert(self, dev_path): |
| if self.config_loaded: |
| self.sync_fixture(None) |
| |
| def on_u2s_remove(self, dev_path): |
| if self.config_loaded: |
| self.ui.CallJSFunction("OnRemoveFixtureConnection") |
| |
| def update_result(self, row_name, result): |
| result_map = { |
| True: 'PASSED', |
| False: 'FAILED', |
| None: 'UNTESTED' |
| } |
| self.result_dict[row_name] = result_map[result] |
| |
| def reset_data(self): |
| self.target = None |
| self.target_colorful = None |
| self.analyzed = None |
| if self.type == _TEST_TYPE_FULL: |
| self.log = factory.console.info |
| else: |
| self.log_to_file = StringIO.StringIO() |
| self.log = lambda *x: (factory.console.info(*x), |
| self.log_to_file.write(*x)) |
| |
| for var in self.status_names: |
| self.update_result(var, None) |
| self.progress = 0 |
| self.ui.CallJSFunction("ResetUiData", "") |
| |
| def send_img_to_ui(self, data): |
| self.ui.CallJSFunction("ClearBuffer", "") |
| # Send the data in 64K packets due to the socket packet size limit. |
| data_len = len(data) |
| p = 0 |
| while p < data_len: |
| if p + self._PACKET_SIZE > data_len: |
| self.ui.CallJSFunction("AddBuffer", data[p:data_len-1]) |
| p = data_len |
| else: |
| self.ui.CallJSFunction("AddBuffer", |
| data[p:p+self._PACKET_SIZE]) |
| p += self._PACKET_SIZE |
| |
| def update_preview(self, img, container_id, scale=0.5): |
| # Encode the image in the JPEG format. |
| preview = cv2.resize(img, None, fx=scale, fy=scale, |
| interpolation=cv2.INTER_AREA) |
| cv2.imwrite('temp.jpg', preview) |
| with open('temp.jpg', 'r') as fd: |
| img_data = base64.b64encode(fd.read()) + "=" |
| |
| # Update the preview screen with javascript. |
| self.send_img_to_ui(img_data) |
| self.ui.CallJSFunction("UpdateImage", container_id) |
| return |
| |
| def compile_result(self, test_list, use_untest=True): |
| ret = self.result_dict |
| if all('PASSED' == ret[x] for x in test_list): |
| return True |
| if use_untest and any('UNTESTED' == ret[x] for x in test_list): |
| return None |
| return False |
| |
| def generate_final_result(self): |
| self.update_status(mid='end_test') |
| self.cam_pass = self.compile_result(self._CAM_TESTS) |
| self.als_pass = self.compile_result(self._ALS_TESTS) |
| result = self.compile_result(self.status_names[:-1], use_untest=False) |
| self.update_result('result', result) |
| self.log("Result in summary:\n%s\n" % |
| pprint.pformat(self.result_dict)) |
| self.update_pbar(pid='end_test') |
| |
| def write_to_usb(self, filename, content, content_type=_CONTENT_TXT): |
| try: |
| with MountedMedia(self.dev_path, 1) as mount_dir: |
| if content_type == _CONTENT_TXT: |
| with open(os.path.join(mount_dir, filename), 'a') as f: |
| f.write(content) |
| elif content_type == _CONTENT_IMG: |
| cv2.imwrite(os.path.join(mount_dir, filename), content) |
| except: |
| self.log("Error when writing data to USB!\n") |
| return False |
| return True |
| |
| def save_log_to_usb(self): |
| # Save an image for further analysis in case of the camera |
| # performance fail. |
| self.update_status(mid='save_to_usb') |
| if (not self.cam_pass) and (self.target is not None): |
| if not self.write_to_usb(self.serial_number + ".bmp", |
| self.target, _CONTENT_IMG): |
| return False |
| if self.analyzed is not None: |
| if not self.write_to_usb(self.serial_number + ".result.jpg", |
| self.analyzed, _CONTENT_IMG): |
| return False |
| return self.write_to_usb( |
| self.serial_number + ".txt", self.log_to_file.getvalue()) |
| |
| def finalize_test(self): |
| self.generate_final_result() |
| if self.type == _TEST_TYPE_AB: |
| # We block the test flow until we successfully dumped the result. |
| while not self.save_log_to_usb(): |
| time.sleep(0.5) |
| self.update_pbar(pid='save_to_usb') |
| |
| # Display final result. |
| def get_str(ret, prefix, use_untest=True): |
| if ret: |
| return self.t_pass(prefix + 'PASS') |
| if use_untest and (ret is None): |
| return self.t_fail(prefix + 'UNFINISHED') |
| return self.t_fail(prefix + 'FAIL') |
| cam_result = get_str(self.cam_pass, 'Camera: ', False) |
| als_result = get_str(self.als_pass, 'ALS: ') |
| self.update_status(msg=cam_result + '<br>' + als_result) |
| self.update_pbar(value=100) |
| |
| def exit_test(self, event): |
| factory.log('%s run_once finished' % self.__class__) |
| if self.result_dict['result'] == 'PASSED': |
| self.ui.Pass() |
| else: |
| self.ui.Fail('Camera/ALS test failed.') |
| |
| def run_test(self, event=None): |
| self.reset_data() |
| self.update_status(mid='start_test') |
| if self.type == _TEST_TYPE_AB: |
| self.serial_number = event.data.get('sn', '') |
| if not self.setup_fixture(): |
| self.update_status(mid='fixture_fail') |
| self.ui.CallJSFunction("OnRemoveFixtureConnection") |
| return |
| self.update_pbar(pid='start_test') |
| |
| if self.type == _TEST_TYPE_FULL: |
| with leds.Blinker(self._LED_RUNNING_TEST): |
| self.test_camera_performance() |
| self.update_pbar(pid='cam_finish', add=False) |
| self.test_als_calibration() |
| else: |
| self.test_camera_performance() |
| self.update_pbar(pid='cam_finish', add=False) |
| self.test_als_calibration() |
| self.update_pbar(pid='als_finish' + self.type, add=False) |
| |
| self.finalize_test() |
| |
| def capture_low_noise_image(self, cam, n_samples): |
| '''Capture a sequence of images and average them to reduce noise.''' |
| if n_samples < 1: |
| n_samples = 1 |
| success, img = cam.read() |
| if not success: |
| return None |
| img = img.astype(np.float64) |
| for t in range(n_samples - 1): |
| success, temp_img = cam.read() |
| if not success: |
| return None |
| img += temp_img.astype(np.float64) |
| img /= n_samples |
| return img.round().astype(np.uint8) |
| |
| def test_camera_functionality(self): |
| # Initialize the camera with OpenCV. |
| self.update_status(mid='init_cam') |
| cam = cv2.VideoCapture(self._DEVICE_INDEX) |
| if not cam.isOpened(): |
| cam.release() |
| self.update_result('cam_stat', False) |
| self.log('Failed to initialize the camera. ' |
| 'Could be bad module, bad connection or ' |
| 'bad USB initialization.\n') |
| return False |
| self.update_pbar(pid='init_cam') |
| |
| # Set resolution. |
| self.update_status(mid='set_cam_res') |
| conf = self.config['cam_stat'] |
| cam.set(cv.CV_CAP_PROP_FRAME_WIDTH, conf['img_width']) |
| cam.set(cv.CV_CAP_PROP_FRAME_HEIGHT, conf['img_height']) |
| if (conf['img_width'] != cam.get(cv.CV_CAP_PROP_FRAME_WIDTH) or |
| conf['img_height'] != cam.get(cv.CV_CAP_PROP_FRAME_HEIGHT)): |
| cam.release() |
| self.update_result('cam_stat', False) |
| self.log("Can't set the image size. " |
| "Possibly caused by bad USB initialization.\n") |
| return False |
| self.update_pbar(pid='set_cam_res') |
| |
| # Try reading an image from the camera. |
| self.update_status(mid='try_read_cam') |
| success, _ = cam.read() |
| if not success: |
| cam.release() |
| self.update_result('cam_stat', False) |
| self.log("Failed to capture an image with the camera.\n") |
| return False |
| self.update_pbar(pid='try_read_cam') |
| |
| # Let the camera's auto-exposure algorithm adjust to the fixture |
| # lighting condition. |
| self.update_status(mid='wait_cam_awb') |
| start = time.time() |
| while time.time() - start < conf['buf_time']: |
| _, _ = cam.read() |
| self.update_pbar(pid='wait_cam_awb') |
| |
| # Read the image that we will use. |
| self.update_status(mid='record_img') |
| n_samples = conf['n_samples'] |
| self.target_colorful = self.capture_low_noise_image(cam, n_samples) |
| if self.target_colorful is None: |
| cam.release() |
| self.update_result('cam_stat', False) |
| self.log("Error reading images from the camera!\n") |
| return False |
| if self.unit_test: |
| self.target_colorful = cv2.imread(self._TEST_SAMPLE_FILE) |
| |
| self.target = cv2.cvtColor(self.target_colorful, cv.CV_BGR2GRAY) |
| self.update_result('cam_stat', True) |
| self.log('Successfully captured a sample image.\n') |
| self.update_preview(self.target_colorful, "camera_image", |
| scale=self.config['preview']['scale']) |
| cam.release() |
| self.update_pbar(pid='record_img') |
| return True |
| |
| def test_camera_performance(self): |
| if not self.test_camera_functionality(): |
| return |
| |
| # Check the captured test pattern image validity. |
| self.update_status(mid='check_vc') |
| success, tar_data = camperf.CheckVisualCorrectness( |
| self.target, self.ref_data, **self.config['cam_vc']) |
| self.analyzed = self.target_colorful.copy() |
| renderer.DrawVC(self.analyzed, success, tar_data) |
| self.update_preview(self.analyzed, "analyzed_image", |
| scale=self.config['preview']['scale']) |
| |
| self.update_result('cam_vc', success) |
| if hasattr(tar_data, 'shift'): |
| self.log('Image shift percentage: %f\n' % tar_data.shift) |
| self.log('Image tilt: %f degrees\n' % tar_data.tilt) |
| if not success: |
| if hasattr(tar_data, 'sample_corners'): |
| self.log('Found corners count: %d\n' % |
| tar_data.sample_corners.shape[0]) |
| if hasattr(tar_data, 'edges'): |
| self.log('Found square edges count: %d\n' % |
| tar_data.edges.shape[0]) |
| self.log('Visual correctness: %s\n' % tar_data.msg) |
| return |
| self.update_pbar(pid='check_vc') |
| |
| # Check if the lens shading is present. |
| self.update_status(mid='check_ls') |
| success, tar_ls = camperf.CheckLensShading( |
| self.target, **self.config['cam_ls']) |
| |
| self.update_result('cam_ls', success) |
| if tar_ls.check_low_freq: |
| self.log('Low-frequency response value: %f\n' % |
| tar_ls.response) |
| if not success: |
| self.log('Lens shading: %s\n' % tar_ls.msg) |
| return |
| self.update_pbar(pid='check_ls') |
| |
| # Check the image sharpness. |
| self.update_status(mid='check_mtf') |
| success, tar_mtf = camperf.CheckSharpness( |
| self.target, tar_data.edges, **self.config['cam_mtf']) |
| renderer.DrawMTF(self.analyzed, tar_data.edges, tar_mtf.perm, |
| tar_mtf.mtfs, |
| self.config['cam_mtf']['mtf_crop_ratio'], |
| self.config['preview']['mtf_color_map_range']) |
| self.update_preview(self.analyzed, "analyzed_image", |
| scale=self.config['preview']['scale']) |
| |
| self.update_result('cam_mtf', success) |
| self.log('MTF value: %f\n' % tar_mtf.mtf) |
| if hasattr(tar_mtf, 'min_mtf'): |
| self.log('Lowest MTF value: %f\n' % tar_mtf.min_mtf) |
| if not success: |
| self.log('Sharpness: %s\n' % tar_mtf.msg) |
| self.update_pbar(pid='check_mtf') |
| return |
| |
| def test_als_write_vpd(self, calib_result): |
| self.update_status(mid='dump_to_vpd') |
| conf = self.config['als'] |
| if not calib_result: |
| self.update_result('als_stat', False) |
| self.log('ALS calibration data is incorrect.\n') |
| return False |
| if subprocess.call(conf['save_vpd'] % calib_result, shell=True): |
| self.update_result('als_stat', False) |
| self.log('Writing VPD data failed!\n') |
| return False |
| self.log('Successfully calibrated ALS scales.\n') |
| self.update_pbar(pid='dump_to_vpd') |
| return True |
| |
| def test_als_switch_to_next_light(self): |
| self.update_status(mid='adjust_light') |
| conf = self.config['als'] |
| self.light_state += 1 |
| self.fixture.set_light(self.light_state) |
| self.update_pbar(pid='adjust_light') |
| if not self.unit_test: |
| self.fixture.assert_success() |
| if self.light_state >= len(conf['luxs']): |
| return False |
| self.update_status(mid='wait_fixture') |
| self.fixture.wait_for_light_switch() |
| self.update_pbar(pid='wait_fixture') |
| return True |
| |
| def test_als_calibration(self): |
| # Initialize the ALS. |
| self.update_status(mid='init_als') |
| conf = self.config['als'] |
| self.als = ALS(val_path=conf['val_path'], |
| scale_path=conf['scale_path']) |
| if not self.als.detected: |
| self.update_result('als_stat', False) |
| self.log('Failed to initialize the ALS.\n') |
| return |
| self.als.set_scale_factor(conf['calibscale']) |
| self.update_pbar(pid='init_als') |
| |
| # Go through all different lighting settings |
| # and record ALS values. |
| calib_result = 0 |
| try: |
| vals = [] |
| while True: |
| # Get ALS values. |
| self.update_status(mid='read_als%d' % self.light_state) |
| scale = self.als.get_scale_factor() |
| val = self.als.read_mean(samples=conf['n_samples'], |
| delay=conf['read_delay']) |
| vals.append(val) |
| self.log('Lighting preset lux value: %d\n' % |
| conf['luxs'][self.light_state]) |
| self.log('ALS value: %d\n' % val) |
| self.log('ALS calibration scale: %d\n' % scale) |
| # Check if it is a false read. |
| if not val: |
| self.update_result('als_stat', False) |
| self.log('The ALS value is stuck at zero.\n') |
| return |
| # Compute calibration data if it is the calibration target. |
| if conf['luxs'][self.light_state] == conf['calib_lux']: |
| calib_result = int(round(float(conf['calib_target']) / |
| val * scale)) |
| self.log('ALS calibration data will be %d\n' % |
| calib_result) |
| self.update_pbar(pid='read_als%d' % self.light_state) |
| |
| # Go to the next lighting preset. |
| if not self.test_als_switch_to_next_light(): |
| break |
| |
| # Check value ordering. |
| for i, li in enumerate(conf['luxs']): |
| for j in range(i): |
| if ((li > conf['luxs'][j] and vals[j] >= vals[i]) or |
| (li < conf['luxs'][j] and vals[j] <= vals[i])): |
| self.update_result('als_stat', False) |
| self.log('The ordering of ALS values is wrong.\n') |
| return |
| except (FixtureException, serial.serialutil.SerialException) as e: |
| self.fixture = None |
| self.update_result('als_stat', None) |
| self.log("The test fixture was disconnected!\n") |
| self.ui.CallJSFunction("OnRemoveFixtureConnection") |
| return |
| except: |
| self.update_result('als_stat', False) |
| self.log('Failed to read values from ALS or unknown error.\n') |
| return |
| self.log('Successfully recorded ALS values.\n') |
| |
| # Save ALS values to vpd for FATP test. |
| if self.type == _TEST_TYPE_FULL: |
| if not self.test_als_write_vpd(calib_result): |
| return |
| self.update_result('als_stat', True) |
| return |
| |
| def run_once(self, test_type=_TEST_TYPE_FULL, unit_test=False): |
| '''The entry point of the test. |
| |
| Args: |
| test_type: Run the full machine test or the AB panel test. The AB |
| panel will be run on a host that is used to test |
| connected AB panels (possibly many), while the full |
| machine test would test only the machine that runs it |
| and then exit. |
| unit_test: Run the unit-test mode. The unit-test mode is used to |
| test the test integrity when the test fixture is not |
| available. It should be run on a machine that has a |
| working camera and a working ALS. Please place the |
| camera parameter file under the src directory on an USB |
| stick for use and connect the machine with an |
| USB-to-RS232 converter cable with the designated chipset |
| in the parameter file. The test will replace the |
| captured image with the sample test image and run the |
| camera performance test on it. |
| ''' |
| factory.log('%s run_once' % self.__class__) |
| |
| # Initialize variables and environment. |
| assert test_type in [_TEST_TYPE_FULL, _TEST_TYPE_AB] |
| assert unit_test in [True, False] |
| self.type = test_type |
| self.unit_test = unit_test |
| self.config_loaded = False |
| self.status_names = self._STATUS_NAMES |
| self.status_labels = self._STATUS_LABELS |
| self.result_dict = {} |
| self.base_config = PluggableConfig({}) |
| os.chdir(self.srcdir) |
| |
| # Setup the usb disk and usb-to-serial adapter monitor. |
| usb_monitor = ConnectionMonitor() |
| usb_monitor.start(subsystem='block', device_type='disk', |
| on_insert=self.on_usb_insert, |
| on_remove=self.on_usb_remove) |
| u2s_monitor = ConnectionMonitor() |
| u2s_monitor.start(subsystem='usb-serial', |
| on_insert=self.on_u2s_insert, |
| on_remove=self.on_u2s_remove) |
| |
| # Startup the UI. |
| self.ui = UI() |
| self.register_events(['sync_fixture', 'exit_test', 'run_test']) |
| self.ui.CallJSFunction("InitLayout", self.type == _TEST_TYPE_FULL) |
| self.ui.Run() |