# blob: 65a854dafef5fe59a960ec12a32233fb167755db [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Import guard for OpenCV.
try:
import cv
import cv2
except ImportError:
pass
import base64
import numpy as np
import os
import pprint
import pyudev
import re
import select
import serial
import StringIO
import subprocess
import threading
import time
import autotest_lib.client.cros.camera.perf_tester as camperf
import autotest_lib.client.cros.camera.renderer as renderer
from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import factory_setup_modules
from cros.factory.test import factory
from cros.factory.test import leds
from cros.factory.test import test_ui
from cros.factory.test.media_util import MountedMedia
from autotest_lib.client.cros.rf.config import PluggableConfig
from autotest_lib.client.cros import tty
from cros.factory.test.test_ui import UI
# Test type constants (the values run_once() accepts for test_type):
_TEST_TYPE_AB = 'AB'
_TEST_TYPE_FULL = 'Full'
# Content type constants (select how write_to_usb() stores its payload):
_CONTENT_IMG = 'image'
_CONTENT_TXT = 'text'
class ALS(object):
    '''Class to interface the ambient light sensor over iio.

    The sensor counts as detected only when both the value file and the
    scale file exist. When it is not detected the public methods return
    None without touching the filesystem.
    '''
    # Default device paths.
    _VAL_DEV_PATH = '/sys/bus/iio/devices/iio:device0/illuminance0_input'
    _SCALE_DEV_PATH = '/sys/bus/iio/devices/iio:device0/illuminance0_calibscale'
    # Default min delay seconds.
    _DEFAULT_MIN_DELAY = 0.178

    def __init__(self, val_path=_VAL_DEV_PATH,
                 scale_path=_SCALE_DEV_PATH):
        '''Remember the device paths and probe whether both files exist.

        Args:
            val_path: File holding the current sensor reading.
            scale_path: File holding the calibration scale.
        '''
        # The paths are stored even when detection fails so the instance
        # always carries the same attributes.
        self.val_path = val_path
        self.scale_path = scale_path
        self.detected = (os.path.isfile(val_path) and
                         os.path.isfile(scale_path))

    @staticmethod
    def _read_int_file(path):
        '''Return the first line of path parsed as an integer.'''
        # The context manager closes the file even if int() raises.
        with open(path) as fd:
            return int(fd.readline().rstrip())

    def _read_core(self):
        '''Read one raw sensor value.'''
        return self._read_int_file(self.val_path)

    def _read(self, delay=None, samples=1):
        '''Read the light sensor value.

        Args:
            delay: Delay between samples in seconds. 0 means as fast as
                   possible. None selects _DEFAULT_MIN_DELAY.
            samples: Total samples to read; values below 1 are treated
                     as 1.

        Returns:
            The light sensor values in a list.
        '''
        if samples < 1:
            samples = 1
        if delay is None:
            delay = self._DEFAULT_MIN_DELAY
        # The first value might be contaminated by previous settings.
        # We need to skip it for better accuracy.
        self._read_core()
        buf = []
        for _ in range(samples):
            time.sleep(delay)
            buf.append(self._read_core())
        return buf

    def read_mean(self, delay=None, samples=1):
        '''Return the rounded integer mean of the samples.

        Returns None when the sensor was not detected.
        '''
        if not self.detected:
            return None
        buf = self._read(delay, samples)
        return int(round(float(sum(buf)) / len(buf)))

    def set_scale_factor(self, scale):
        '''Write scale, rounded to an integer, to the scale file.

        Does nothing when the sensor was not detected. Always returns
        None.
        '''
        if not self.detected:
            return None
        with open(self.scale_path, 'w') as fd:
            fd.write(str(int(round(scale))))
        return None

    def get_scale_factor(self):
        '''Return the integer stored in the scale file.

        Returns None when the sensor was not detected.
        '''
        if not self.detected:
            return None
        return self._read_int_file(self.scale_path)
class FixtureException(Exception):
    '''Raised when the test fixture does not answer as expected.'''
class Fixture:
    '''Class for communication with the test fixture.'''

    def __init__(self, params):
        '''Open the serial link and cache the fixture parameters.

        Args:
            params: Mapping with the keys 'driver', 'serial_params',
                    'serial_delay', 'light_delay', 'light_seq', 'echo'
                    and 'off'.
        '''
        # Setup the serial port communication.
        port_path = tty.find_tty_by_driver(params['driver'])
        self.fixture = serial.Serial(port=port_path,
                                     **params['serial_params'])
        self.fixture.flush()
        # Load parameters.
        self.serial_delay = params['serial_delay']
        self.light_delay = params['light_delay']
        self.light_seq = params['light_seq']
        self.fixture_echo = params['echo']
        self.light_off = params['off']

    def send(self, msg):
        '''Send control messages to the fixture, one character at a time.'''
        for char in msg:
            self.fixture.write(str(char))
            self.fixture.flush()
            # The fixture needs some time to process each incoming
            # character.
            time.sleep(self.serial_delay)

    def read(self):
        '''Return whatever the serial port currently has buffered.'''
        pending = self.fixture.inWaiting()
        return self.fixture.read(pending)

    def assert_success(self):
        '''Check if the returned value from the fixture is OK.

        Raises:
            FixtureException: the buffered reply does not match the
                              configured echo pattern.
        '''
        reply = self.read()
        if re.search(self.fixture_echo, reply):
            return
        raise FixtureException('The communication with fixture was broken')

    def set_light(self, idx):
        '''Send the command stored at light_seq[idx].'''
        command = self.light_seq[idx]
        self.send(command)

    def turn_off_light(self):
        '''Send the configured "off" command.'''
        self.send(self.light_off)

    def wait_for_light_switch(self):
        '''Sleep for the configured light delay.'''
        time.sleep(self.light_delay)
class ConnectionMonitor(object):
    """A wrapper to monitor hardware plug/unplug events.

    start() opens a udev netlink monitor and spawns a thread running
    watch(); stop() asks that thread to finish.
    """
    # Longest time (seconds) select() may block before the watcher
    # re-checks whether stop() was called.
    _POLL_INTERVAL = 0.5

    def __init__(self):
        self._monitoring = False
        # Created here so stop() is safe even before start().
        self._watch_end = threading.Event()
        self._watch_thread = None
        self.monitor = None
        self.on_insert = None
        self.on_remove = None

    def start(self, subsystem, device_type=None, on_insert=None,
              on_remove=None):
        """Begin watching a udev subsystem in a background thread.

        Args:
            subsystem: udev subsystem passed to the monitor filter.
            device_type: Optional device type passed to the filter.
            on_insert: Called with the device node on 'add' events.
            on_remove: Called with the device node on 'remove' events.

        Raises:
            Exception: start() was already called and not yet stopped.
        """
        if self._monitoring:
            raise Exception("Multiple start() call is not allowed")
        self.on_insert = on_insert
        self.on_remove = on_remove
        # Setup the media monitor,
        context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(context)
        self.monitor.filter_by(subsystem, device_type)
        self.monitor.start()
        self._monitoring = True
        # A fresh event per run: a watcher from an earlier start()/stop()
        # cycle keeps its own, already set, event.
        self._watch_end = threading.Event()
        self._watch_thread = threading.Thread(target=self.watch)
        self._watch_thread.start()

    def watch(self):
        """Dispatch udev events to the callbacks until stop() is called."""
        stop_requested = self._watch_end
        fd = self.monitor.fileno()
        while not stop_requested.is_set():
            # The timeout lets the loop notice stop() even when no
            # device event ever arrives.
            ready, _, _ = select.select([fd], [], [], self._POLL_INTERVAL)
            if fd not in ready:
                continue
            action, dev = self.monitor.receive_device()
            if action == 'add' and self.on_insert:
                self.on_insert(dev.device_node)
            elif action == 'remove' and self.on_remove:
                self.on_remove(dev.device_node)

    def stop(self):
        """Ask the watcher thread to finish; safe to call at any time."""
        self._monitoring = False
        self._watch_end.set()
class factory_CameraPerformanceAls(test.test):
    '''Factory test for camera performance and ambient light sensor (ALS).

    The class attributes below are shared configuration: file names, the
    result table layout, the LED pattern and CSS class names.
    '''
    version = 2
    preserve_srcdir = True
    # OpenCV will automatically search for a working camera device if we use
    # the index -1.
    _DEVICE_INDEX = -1
    # Reference chart handed to camperf.PrepareTest().
    _TEST_CHART_FILE = 'test_chart.png'
    # Image that replaces the captured frame in unit-test mode.
    _TEST_SAMPLE_FILE = 'sample.png'
    # Maximum number of characters sent to the UI per "AddBuffer" call.
    _PACKET_SIZE = 65000
    # Status in the final result tab.
    _STATUS_NAMES = ['cam_stat', 'cam_vc', 'cam_ls', 'cam_mtf',
                     'als_stat', 'result']
    # NOTE(review): labels appear to pair index-by-index with
    # _STATUS_NAMES - confirm against the UI code.
    _STATUS_LABELS = ['Camera Functionality',
                      'Camera Visual Correctness',
                      'Camera Lens Shading',
                      'Camera Image Sharpness',
                      'ALS Functionality',
                      'Test Result']
    # Result rows that are folded into the camera / ALS verdicts.
    _CAM_TESTS = ['cam_stat', 'cam_vc', 'cam_ls', 'cam_mtf']
    _ALS_TESTS = ['als_stat']
    # LED patterns.
    # Passed to leds.Blinker() while the full-machine test runs.
    _LED_RUNNING_TEST = ((leds.LED_NUM|leds.LED_CAP, 0.05), (0, 0.05))
    # CSS style classes defined in the corresponding HTML file.
    _STYLE_INFO = "color_idle"
    _STYLE_PASS = "color_good"
    _STYLE_FAIL = "color_bad"
def t_pass(self, msg):
    '''Return msg wrapped in a UI label using the "pass" CSS class.'''
    style = self._STYLE_PASS
    return test_ui.MakeLabel(msg, css_class=style)
def t_fail(self, msg):
    '''Return msg wrapped in a UI label using the "fail" CSS class.'''
    style = self._STYLE_FAIL
    return test_ui.MakeLabel(msg, css_class=style)
def update_status(self, mid=None, msg=None):
    '''Show a status message in the UI.

    Args:
        mid: Message id; the '<mid>_en' / '<mid>_zh' texts and the style
             for mid are looked up in the loaded config. Used only when
             msg is empty.
        msg: Ready-made message; takes precedence over mid.
    '''
    if msg:
        text = msg
    elif mid:
        messages = self.config['message']
        text = test_ui.MakeLabel(messages[mid + '_en'],
                                 messages[mid + '_zh'],
                                 self.config['msg_style'][mid])
    else:
        # Neither given: the status line is cleared.
        text = ''
    self.ui.CallJSFunction("UpdateTestStatus", text)
def update_pbar(self, pid=None, value=None, add=True):
    '''Update the progress bar in the UI.

    Args:
        pid: Check-point id. Its weight from config['chk_point'] is added
             to (add=True) or replaces (add=False) the running progress,
             which is then shown relative to the total for the current
             test type.
        value: Explicit percentage; takes precedence over pid when truthy.
        add: See pid.

    With neither a truthy value nor a pid the bar is set to 0%.
    '''
    # Default shown when neither argument selects a percentage.
    percent = 0
    if value:
        percent = value
    elif pid:
        all_time = self.config['chk_point'][self.type]
        if add:
            self.progress += self.config['chk_point'][pid]
        else:
            self.progress = self.config['chk_point'][pid]
        percent = int(round((float(self.progress) / all_time) * 100))
    # NOTE(review): the JS function name is kept exactly as spelled; it
    # presumably matches the name defined on the HTML/JS side.
    self.ui.CallJSFunction("UpdatePrograssBar", '%d%%' % percent)
def register_events(self, events):
    '''Bind each named UI event to the method of the same name.

    Args:
        events: Iterable of event names; every name must be an attribute
                of this object.
    '''
    for name in events:
        assert hasattr(self, name)
        handler = getattr(self, name)
        self.ui.AddEventHandler(name, handler)
def prepare_test(self):
    '''Build the reference data from the test chart file.'''
    chart_file = self._TEST_CHART_FILE
    self.ref_data = camperf.PrepareTest(chart_file)
def on_usb_insert(self, dev_path):
    '''Handle insertion of a USB disk.

    The first insertion loads 'camera.params' from the disk, resets the
    test data and initialises the UI. Later insertions only record the
    device path and notify the UI.

    Args:
        dev_path: Device node reported by the connection monitor.
    '''
    if self.config_loaded:
        self.dev_path = dev_path
        self.ui.CallJSFunction("OnUSBInsertion")
        return
    # Initialize common test reference data.
    self.prepare_test()
    # Load config files and reset test results.
    self.dev_path = dev_path
    with MountedMedia(dev_path, 1) as config_dir:
        params_file = os.path.join(config_dir, 'camera.params')
        self.config = self.base_config.Read(params_file)
        self.reset_data()
        self.config_loaded = True
        factory.console.info("Config loaded.")
        self.ui.CallJSFunction("OnUSBInit", self.config['sn_format'])
def on_usb_remove(self, dev_path):
    '''Warn when the USB disk is removed after the config was loaded.

    Args:
        dev_path: Device node reported by the connection monitor (unused).
    '''
    if not self.config_loaded:
        return
    factory.console.info("USB removal is not allowed during test!")
    self.ui.CallJSFunction("OnUSBRemoval")
def setup_fixture(self):
    '''Initialize the communication with the fixture.

    Opens the fixture, selects the first lighting preset and, outside
    unit-test mode, checks the fixture's reply.

    Returns:
        True on success. On any error self.fixture is reset to None, the
        failure is logged and False is returned.
    '''
    fixture = None
    try:
        fixture = Fixture(self.config['fixture'])
        self.fixture = fixture
        # Go with the default(first) lighting intensity.
        self.light_state = 0
        fixture.set_light(self.light_state)
        if not self.unit_test:
            fixture.assert_success()
    except Exception as e:
        if fixture is not None:
            # The port was opened before the failure; release it so the
            # next attempt does not find the device still held.
            try:
                fixture.fixture.close()
            except Exception:
                # Best effort only: the link is already unusable.
                pass
        self.fixture = None
        self.log('Failed to initialize the test fixture.\n')
        # Keep the cause visible instead of discarding it.
        self.log('Fixture error: %r\n' % (e,))
        return False
    self.log('Test fixture successfully initialized.\n')
    return True
def sync_fixture(self, event):
    '''Try to connect to the fixture, retrying as configured.

    The UI is told when detection starts and whether the fixture was
    finally connected or not.

    Args:
        event: UI event that triggered the call (unused).
    '''
    self.ui.CallJSFunction("OnDetectFixtureConnection")
    attempts = 0
    while True:
        if self.setup_fixture():
            break
        attempts += 1
        if attempts >= self.config['fixture']['n_retry']:
            self.ui.CallJSFunction("OnRemoveFixtureConnection")
            return
        time.sleep(self.config['fixture']['retry_delay'])
    self.ui.CallJSFunction("OnAddFixtureConnection")
def on_u2s_insert(self, dev_path):
    '''Re-sync with the fixture when a USB-serial adapter appears.

    Ignored until the config has been loaded.
    '''
    if not self.config_loaded:
        return
    self.sync_fixture(None)
def on_u2s_remove(self, dev_path):
    '''Tell the UI the fixture link is gone when the adapter is removed.

    Ignored until the config has been loaded.
    '''
    if not self.config_loaded:
        return
    self.ui.CallJSFunction("OnRemoveFixtureConnection")
def update_result(self, row_name, result):
    '''Store the textual status of one result row.

    Args:
        row_name: Key in self.result_dict.
        result: True, False or None, stored as 'PASSED', 'FAILED' or
                'UNTESTED' respectively.
    '''
    labels = dict([(True, 'PASSED'),
                   (False, 'FAILED'),
                   (None, 'UNTESTED')])
    self.result_dict[row_name] = labels[result]
def reset_data(self):
    '''Clear captured images, results and progress before a test run.

    Also installs self.log: in the full-machine test it is the factory
    console logger; otherwise it logs to the console and to a fresh
    in-memory buffer (self.log_to_file).
    '''
    self.target = self.target_colorful = self.analyzed = None
    if self.type == _TEST_TYPE_FULL:
        self.log = factory.console.info
    else:
        self.log_to_file = StringIO.StringIO()

        def log_to_console_and_buffer(*x):
            return (factory.console.info(*x),
                    self.log_to_file.write(*x))

        self.log = log_to_console_and_buffer
    for row in self.status_names:
        self.update_result(row, None)
    self.progress = 0
    self.ui.CallJSFunction("ResetUiData", "")
def send_img_to_ui(self, data):
    '''Push data to the UI buffer in chunks of at most _PACKET_SIZE.

    Args:
        data: Sliceable payload (a string in practice).
    '''
    self.ui.CallJSFunction("ClearBuffer", "")
    # Send the data in 64K packets due to the socket packet size limit.
    size = self._PACKET_SIZE
    total = len(data)
    for start in range(0, total, size):
        end = start + size
        if end > total:
            # NOTE(review): a short final packet is sent WITHOUT its last
            # character. update_preview() appends an extra "=" to its
            # payload, which this appears to strip again. Confirm before
            # changing either side.
            chunk = data[start:total - 1]
        else:
            chunk = data[start:end]
        self.ui.CallJSFunction("AddBuffer", chunk)
def update_preview(self, img, container_id, scale=0.5):
    '''Show a scaled-down JPEG preview of img in the UI.

    Args:
        img: Image array accepted by cv2.resize().
        container_id: Id handed to the "UpdateImage" JS function.
        scale: Resize factor applied to both axes.
    '''
    # Encode the image in the JPEG format.
    preview = cv2.resize(img, None, fx=scale, fy=scale,
                         interpolation=cv2.INTER_AREA)
    cv2.imwrite('temp.jpg', preview)
    # JPEG data is binary, so the file must not be read in text mode.
    with open('temp.jpg', 'rb') as fd:
        # send_img_to_ui() drops the last character of a short final
        # packet, hence the extra "=" appended here.
        img_data = base64.b64encode(fd.read()) + "="
    # Update the preview screen with javascript.
    self.send_img_to_ui(img_data)
    self.ui.CallJSFunction("UpdateImage", container_id)
def compile_result(self, test_list, use_untest=True):
    '''Fold several result rows into one verdict.

    Args:
        test_list: Row names to look up in self.result_dict.
        use_untest: When true, an 'UNTESTED' row yields None instead of
                    False.

    Returns:
        True if every row is 'PASSED'; None if use_untest is set and some
        row is 'UNTESTED'; False otherwise.
    '''
    results = self.result_dict
    for name in test_list:
        if results[name] != 'PASSED':
            break
    else:
        return True
    if use_untest:
        for name in test_list:
            if results[name] == 'UNTESTED':
                return None
    return False
def generate_final_result(self):
    '''Compute the camera, ALS and overall verdicts and log a summary.

    The overall verdict covers every status row except the last one
    (the 'result' row itself) and treats untested rows as failures.
    '''
    self.update_status(mid='end_test')
    self.cam_pass = self.compile_result(self._CAM_TESTS)
    self.als_pass = self.compile_result(self._ALS_TESTS)
    checked_rows = self.status_names[:-1]
    overall = self.compile_result(checked_rows, use_untest=False)
    self.update_result('result', overall)
    summary = pprint.pformat(self.result_dict)
    self.log("Result in summary:\n%s\n" % summary)
    self.update_pbar(pid='end_test')
def write_to_usb(self, filename, content, content_type=_CONTENT_TXT):
    '''Store content on the USB disk at self.dev_path.

    Args:
        filename: Name of the file, relative to the mount point.
        content: Text to append, or an image for cv2.imwrite().
        content_type: _CONTENT_TXT appends text; _CONTENT_IMG writes an
                      image. Any other value writes nothing.

    Returns:
        True if no error was raised, False otherwise (a message is
        logged).
    '''
    try:
        with MountedMedia(self.dev_path, 1) as mount_dir:
            target = os.path.join(mount_dir, filename)
            if content_type == _CONTENT_TXT:
                with open(target, 'a') as f:
                    f.write(content)
            elif content_type == _CONTENT_IMG:
                cv2.imwrite(target, content)
    except Exception:
        # Deliberately not a bare "except": callers retry in a loop, and
        # KeyboardInterrupt/SystemExit must be able to break out of it.
        self.log("Error when writing data to USB!\n")
        return False
    return True
def save_log_to_usb(self):
    '''Dump the log, and failure images if any, to the USB disk.

    When the camera verdict is not a pass and an image was captured, the
    captured image and, if present, the analysed image are saved first.

    Returns:
        False as soon as one write fails, otherwise the result of
        writing the text log.
    '''
    # Save an image for further analysis in case of the camera
    # performance fail.
    self.update_status(mid='save_to_usb')
    if not self.cam_pass and self.target is not None:
        saved = self.write_to_usb(self.serial_number + ".bmp",
                                  self.target, _CONTENT_IMG)
        if saved and self.analyzed is not None:
            saved = self.write_to_usb(self.serial_number + ".result.jpg",
                                      self.analyzed, _CONTENT_IMG)
        if not saved:
            return False
    log_text = self.log_to_file.getvalue()
    return self.write_to_usb(self.serial_number + ".txt", log_text)
def finalize_test(self):
    '''Compute the verdicts, save logs (AB test only) and show results.'''
    self.generate_final_result()
    if self.type == _TEST_TYPE_AB:
        # We block the test flow until we successfully dumped the result.
        while not self.save_log_to_usb():
            time.sleep(0.5)
        self.update_pbar(pid='save_to_usb')

    # Display final result.
    def get_str(ret, prefix, use_untest=True):
        if ret:
            return self.t_pass(prefix + 'PASS')
        unfinished = use_untest and (ret is None)
        suffix = 'UNFINISHED' if unfinished else 'FAIL'
        return self.t_fail(prefix + suffix)

    cam_result = get_str(self.cam_pass, 'Camera: ', False)
    als_result = get_str(self.als_pass, 'ALS: ')
    self.update_status(msg=cam_result + '<br>' + als_result)
    self.update_pbar(value=100)
def exit_test(self, event):
    '''End the test, passing only if the overall result is 'PASSED'.

    Args:
        event: UI event that triggered the call (unused).
    '''
    factory.log('%s run_once finished' % self.__class__)
    if self.result_dict['result'] != 'PASSED':
        self.ui.Fail('Camera/ALS test failed.')
    else:
        self.ui.Pass()
def run_test(self, event=None):
    '''Run the camera and ALS checks, then finalize the results.

    In the AB test the serial number is taken from the event and the
    fixture is set up first; the run is aborted if that fails. In the
    full-machine test the checks run while the LEDs blink.

    Args:
        event: UI event; its data dict is read only in the AB test.
    '''
    self.reset_data()
    self.update_status(mid='start_test')
    if self.type == _TEST_TYPE_AB:
        self.serial_number = event.data.get('sn', '')
        if not self.setup_fixture():
            self.update_status(mid='fixture_fail')
            self.ui.CallJSFunction("OnRemoveFixtureConnection")
            return
    self.update_pbar(pid='start_test')

    def run_all_checks():
        self.test_camera_performance()
        self.update_pbar(pid='cam_finish', add=False)
        self.test_als_calibration()

    if self.type == _TEST_TYPE_FULL:
        with leds.Blinker(self._LED_RUNNING_TEST):
            run_all_checks()
    else:
        run_all_checks()
    self.update_pbar(pid='als_finish' + self.type, add=False)
    self.finalize_test()
def capture_low_noise_image(self, cam, n_samples):
    '''Capture a sequence of images and average them to reduce noise.

    Args:
        cam: Object whose read() returns a (success, image) pair.
        n_samples: Number of frames to average; values below 1 are
                   treated as 1.

    Returns:
        The rounded mean frame as uint8, or None if any read failed.
    '''
    count = max(n_samples, 1)
    total = None
    for _ in range(count):
        ok, frame = cam.read()
        if not ok:
            return None
        # Accumulate in float64 so the sum cannot overflow.
        frame = frame.astype(np.float64)
        if total is None:
            total = frame
        else:
            total += frame
    total /= count
    return total.round().astype(np.uint8)
def test_camera_functionality(self):
    '''Open the camera, configure it and capture the sample image.

    On success self.target_colorful holds the captured frame (or the
    sample file in unit-test mode) and self.target its grayscale
    version. The 'cam_stat' result row records the outcome.

    Returns:
        True if a sample image was captured, False otherwise.
    '''
    def fail(message):
        # Common failure path: record the result and log the reason.
        self.update_result('cam_stat', False)
        self.log(message)
        return False

    # Initialize the camera with OpenCV.
    self.update_status(mid='init_cam')
    cam = cv2.VideoCapture(self._DEVICE_INDEX)
    # The finally clause releases the camera on every exit path,
    # including unexpected exceptions.
    try:
        if not cam.isOpened():
            return fail('Failed to initialize the camera. '
                        'Could be bad module, bad connection or '
                        'bad USB initialization.\n')
        self.update_pbar(pid='init_cam')

        # Set resolution.
        self.update_status(mid='set_cam_res')
        conf = self.config['cam_stat']
        cam.set(cv.CV_CAP_PROP_FRAME_WIDTH, conf['img_width'])
        cam.set(cv.CV_CAP_PROP_FRAME_HEIGHT, conf['img_height'])
        # Read the values back to verify they were accepted.
        if (conf['img_width'] != cam.get(cv.CV_CAP_PROP_FRAME_WIDTH) or
            conf['img_height'] != cam.get(cv.CV_CAP_PROP_FRAME_HEIGHT)):
            return fail("Can't set the image size. "
                        "Possibly caused by bad USB initialization.\n")
        self.update_pbar(pid='set_cam_res')

        # Try reading an image from the camera.
        self.update_status(mid='try_read_cam')
        success, _ = cam.read()
        if not success:
            return fail("Failed to capture an image with the camera.\n")
        self.update_pbar(pid='try_read_cam')

        # Let the camera's auto-exposure algorithm adjust to the fixture
        # lighting condition.
        self.update_status(mid='wait_cam_awb')
        start = time.time()
        while time.time() - start < conf['buf_time']:
            cam.read()
        self.update_pbar(pid='wait_cam_awb')

        # Read the image that we will use.
        self.update_status(mid='record_img')
        n_samples = conf['n_samples']
        self.target_colorful = self.capture_low_noise_image(cam, n_samples)
        if self.target_colorful is None:
            return fail("Error reading images from the camera!\n")
        if self.unit_test:
            # Unit-test mode analyses the bundled sample instead.
            self.target_colorful = cv2.imread(self._TEST_SAMPLE_FILE)
        self.target = cv2.cvtColor(self.target_colorful, cv.CV_BGR2GRAY)
        self.update_result('cam_stat', True)
        self.log('Successfully captured a sample image.\n')
        self.update_preview(self.target_colorful, "camera_image",
                            scale=self.config['preview']['scale'])
        self.update_pbar(pid='record_img')
        return True
    finally:
        cam.release()
def test_camera_performance(self):
    '''Run the visual-correctness, lens-shading and sharpness checks.

    Each check records its result row and logs its measurements. The
    sequence stops at the first failing check.
    '''
    if not self.test_camera_functionality():
        return

    # Check the captured test pattern image validity.
    self.update_status(mid='check_vc')
    vc_ok, vc_info = camperf.CheckVisualCorrectness(
        self.target, self.ref_data, **self.config['cam_vc'])
    self.analyzed = self.target_colorful.copy()
    renderer.DrawVC(self.analyzed, vc_ok, vc_info)
    self.update_preview(self.analyzed, "analyzed_image",
                        scale=self.config['preview']['scale'])
    self.update_result('cam_vc', vc_ok)
    if hasattr(vc_info, 'shift'):
        self.log('Image shift percentage: %f\n' % vc_info.shift)
        self.log('Image tilt: %f degrees\n' % vc_info.tilt)
    if not vc_ok:
        # Report whatever partial detection data is available.
        if hasattr(vc_info, 'sample_corners'):
            self.log('Found corners count: %d\n' %
                     vc_info.sample_corners.shape[0])
        if hasattr(vc_info, 'edges'):
            self.log('Found square edges count: %d\n' %
                     vc_info.edges.shape[0])
        self.log('Visual correctness: %s\n' % vc_info.msg)
        return
    self.update_pbar(pid='check_vc')

    # Check if the lens shading is present.
    self.update_status(mid='check_ls')
    ls_ok, ls_info = camperf.CheckLensShading(
        self.target, **self.config['cam_ls'])
    self.update_result('cam_ls', ls_ok)
    if ls_info.check_low_freq:
        self.log('Low-frequency response value: %f\n' %
                 ls_info.response)
    if not ls_ok:
        self.log('Lens shading: %s\n' % ls_info.msg)
        return
    self.update_pbar(pid='check_ls')

    # Check the image sharpness.
    self.update_status(mid='check_mtf')
    mtf_ok, mtf_info = camperf.CheckSharpness(
        self.target, vc_info.edges, **self.config['cam_mtf'])
    renderer.DrawMTF(self.analyzed, vc_info.edges, mtf_info.perm,
                     mtf_info.mtfs,
                     self.config['cam_mtf']['mtf_crop_ratio'],
                     self.config['preview']['mtf_color_map_range'])
    self.update_preview(self.analyzed, "analyzed_image",
                        scale=self.config['preview']['scale'])
    self.update_result('cam_mtf', mtf_ok)
    self.log('MTF value: %f\n' % mtf_info.mtf)
    if hasattr(mtf_info, 'min_mtf'):
        self.log('Lowest MTF value: %f\n' % mtf_info.min_mtf)
    if not mtf_ok:
        self.log('Sharpness: %s\n' % mtf_info.msg)
    self.update_pbar(pid='check_mtf')
def test_als_write_vpd(self, calib_result):
    '''Save the ALS calibration value with the configured VPD command.

    Args:
        calib_result: Calibration value substituted into the
                      config['als']['save_vpd'] command template; a
                      falsy value counts as an invalid calibration.

    Returns:
        True if the command exited with status 0, False otherwise (the
        'als_stat' row is then marked as failed).
    '''
    self.update_status(mid='dump_to_vpd')
    conf = self.config['als']
    failure = None
    if not calib_result:
        failure = 'ALS calibration data is incorrect.\n'
    elif subprocess.call(conf['save_vpd'] % calib_result, shell=True):
        # A non-zero exit status means the write did not succeed.
        failure = 'Writing VPD data failed!\n'
    if failure is not None:
        self.update_result('als_stat', False)
        self.log(failure)
        return False
    self.log('Successfully calibrated ALS scales.\n')
    self.update_pbar(pid='dump_to_vpd')
    return True
def test_als_switch_to_next_light(self):
    '''Advance the fixture to the next lighting preset.

    Returns:
        False once the new preset index is past the end of
        config['als']['luxs'] (no more presets to measure), True after
        the fixture was given time to switch.
    '''
    self.update_status(mid='adjust_light')
    lux_presets = self.config['als']
    self.light_state += 1
    self.fixture.set_light(self.light_state)
    self.update_pbar(pid='adjust_light')
    if not self.unit_test:
        self.fixture.assert_success()
    # The light command is sent before this check, so the command for
    # the index one past the last preset is also sent.
    if self.light_state >= len(lux_presets['luxs']):
        return False
    self.update_status(mid='wait_fixture')
    self.fixture.wait_for_light_switch()
    self.update_pbar(pid='wait_fixture')
    return True
def test_als_calibration(self):
    '''Record ALS readings under every lighting preset and calibrate.

    For each preset the mean sensor value is read and logged; the preset
    whose lux equals config['als']['calib_lux'] yields the calibration
    value. Afterwards the readings must be ordered like the preset lux
    values. In the full-machine test the calibration value is then
    written with test_als_write_vpd(). The 'als_stat' row records the
    outcome (None when the fixture was disconnected).
    '''
    # Initialize the ALS.
    self.update_status(mid='init_als')
    conf = self.config['als']
    self.als = ALS(val_path=conf['val_path'],
                   scale_path=conf['scale_path'])
    if not self.als.detected:
        self.update_result('als_stat', False)
        self.log('Failed to initialize the ALS.\n')
        return
    self.als.set_scale_factor(conf['calibscale'])
    self.update_pbar(pid='init_als')
    # Go through all different lighting settings
    # and record ALS values.
    calib_result = 0
    try:
        vals = []
        while True:
            # Get ALS values.
            self.update_status(mid='read_als%d' % self.light_state)
            scale = self.als.get_scale_factor()
            val = self.als.read_mean(samples=conf['n_samples'],
                                     delay=conf['read_delay'])
            vals.append(val)
            self.log('Lighting preset lux value: %d\n' %
                     conf['luxs'][self.light_state])
            self.log('ALS value: %d\n' % val)
            self.log('ALS calibration scale: %d\n' % scale)
            # Check if it is a false read.
            if not val:
                self.update_result('als_stat', False)
                self.log('The ALS value is stuck at zero.\n')
                return
            # Compute calibration data if it is the calibration target.
            if conf['luxs'][self.light_state] == conf['calib_lux']:
                calib_result = int(round(float(conf['calib_target']) /
                                         val * scale))
                self.log('ALS calibration data will be %d\n' %
                         calib_result)
            self.update_pbar(pid='read_als%d' % self.light_state)
            # Go to the next lighting preset.
            if not self.test_als_switch_to_next_light():
                break
        # Check value ordering.
        for i, li in enumerate(conf['luxs']):
            for j in range(i):
                if ((li > conf['luxs'][j] and vals[j] >= vals[i]) or
                    (li < conf['luxs'][j] and vals[j] <= vals[i])):
                    self.update_result('als_stat', False)
                    self.log('The ordering of ALS values is wrong.\n')
                    return
    except (FixtureException, serial.serialutil.SerialException):
        self.fixture = None
        self.update_result('als_stat', None)
        self.log("The test fixture was disconnected!\n")
        self.ui.CallJSFunction("OnRemoveFixtureConnection")
        return
    except Exception:
        # Not a bare "except": KeyboardInterrupt and SystemExit must
        # propagate instead of being recorded as a sensor failure.
        self.update_result('als_stat', False)
        self.log('Failed to read values from ALS or unknown error.\n')
        return
    self.log('Successfully recorded ALS values.\n')
    # Save ALS values to vpd for FATP test.
    if self.type == _TEST_TYPE_FULL:
        if not self.test_als_write_vpd(calib_result):
            return
    self.update_result('als_stat', True)
def run_once(self, test_type=_TEST_TYPE_FULL, unit_test=False):
    '''The entry point of the test.

    Args:
        test_type: Run the full machine test or the AB panel test. The AB
                   panel will be run on a host that is used to test
                   connected AB panels (possibly many), while the full
                   machine test would test only the machine that runs it
                   and then exit.
        unit_test: Run the unit-test mode. The unit-test mode is used to
                   test the test integrity when the test fixture is not
                   available. It should be run on a machine that has a
                   working camera and a working ALS. Please place the
                   camera parameter file under the src directory on a USB
                   stick for use and connect the machine with a
                   USB-to-RS232 converter cable with the designated
                   chipset in the parameter file. The test will replace
                   the captured image with the sample test image and run
                   the camera performance test on it.
    '''
    factory.log('%s run_once' % self.__class__)

    # Initialize variables and environment.
    assert test_type in (_TEST_TYPE_FULL, _TEST_TYPE_AB)
    assert unit_test in (True, False)
    self.type = test_type
    self.unit_test = unit_test
    self.config_loaded = False
    self.status_names = self._STATUS_NAMES
    self.status_labels = self._STATUS_LABELS
    self.result_dict = {}
    self.base_config = PluggableConfig({})
    os.chdir(self.srcdir)

    # Setup the usb disk and usb-to-serial adapter monitor.
    # NOTE(review): both monitors start before self.ui is assigned, yet
    # their callbacks use self.ui. Confirm that an event arriving in
    # that window cannot occur or is harmless.
    usb_monitor = ConnectionMonitor()
    usb_monitor.start(subsystem='block', device_type='disk',
                      on_insert=self.on_usb_insert,
                      on_remove=self.on_usb_remove)
    u2s_monitor = ConnectionMonitor()
    u2s_monitor.start(subsystem='usb-serial',
                      on_insert=self.on_u2s_insert,
                      on_remove=self.on_u2s_remove)

    # Startup the UI.
    self.ui = UI()
    handled_events = ['sync_fixture', 'exit_test', 'run_test']
    self.register_events(handled_events)
    is_full_test = self.type == _TEST_TYPE_FULL
    self.ui.CallJSFunction("InitLayout", is_full_test)
    self.ui.Run()