# blob: e0cf3f5c0a0b21d585f6a3c174e655afdae99f75 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import gtk
import logging
import os
import pprint
import StringIO
from autotest_lib.client.bin import test
from autotest_lib.client.cros import factory
from autotest_lib.client.cros.factory import ui as ful
from autotest_lib.client.cros.factory.media_util import MediaMonitor
from autotest_lib.client.cros.factory.media_util import MountedMedia
from autotest_lib.client.cros.rf import agilent_scpi
from autotest_lib.client.cros.rf.config import PluggableConfig
from gtk import gdk
_MESSAGE_USB = (
'Please insert the usb stick to load parameters.\n'
'請插入usb以讀取測試參數\n')
_MESSAGE_PREPARE_PANEL = (
'Please connect the next AB panel to ENA.\n'
'Then press ENTER to scan the barcode.\n'
'請連接下一塊AB Panel\n'
'備妥後按ENTER掃描序號\n')
_MESSAGE_PREPARE_MAIN_ANTENNA = (
'Make sure the main WWAN antennta is connected to Port 1\n'
'Make sure the main WLAN antennta is connected to Port 2\n'
'Then press ENTER to proceed, TAB to skip.\n'
'確定 主WWAN天線 連到Port 1\n'
'確定 主WLAN天線 連到Port 2\n'
'備妥後按ENTER繼續, 或按TAB跳過\n')
_MESSAGE_PREPARE_AUX_ANTENNA = (
'Make sure the aux WWAN antennta is connected to Port 1\n'
'Make sure the aux WLAN antennta is connected to Port 2\n'
'Then press ENTER to proceed, TAB to skip.\n'
'確定 副WWAN天線 連到Port 1\n'
'確定 副WLAN天線 連到Port 2\n'
'備妥後按ENTER繼續, 或按TAB跳過\n')
_MESSAGE_RESULT_TAB = (
'Results are listed below.\n'
'Please disconnect the panel and press ENTER to write log.\n'
'測試結果顯示如下\n'
'請將AB Panel移除, 並按ENTER寫入測試結果\n')
_TEST_SN_NUMBER = 'TEST-SN-NUMBER'
_LABEL_SIZE = (300, 30)
def make_status_row(row_name,
                    display_dict,
                    init_prompt=None,
                    init_status=None):
    """Builds a gtk.HBox showing "<prompt> : <status>" for one result row.

    The row's texts live in display_dict[row_name] under the keys 'prompt'
    and 'status'. An init value that is given is stored there; an init
    value left as None is read back from there instead (KeyError if the
    entry does not hold it yet). Both labels re-read display_dict on every
    expose event, so later changes to the dict show up on redraw.
    """
    row_state = display_dict.setdefault(row_name, {})

    # Prompt first, then status: same order as the values are consumed.
    if init_prompt is not None:
        row_state['prompt'] = init_prompt
    else:
        init_prompt = row_state['prompt']

    if init_status is not None:
        row_state['status'] = init_status
    else:
        init_status = row_state['status']

    def _refresh_prompt(label, _event):
        # Look the row up again so the newest prompt is displayed.
        label.set_text(display_dict[row_name]['prompt'])

    def _refresh_status(label, _event):
        # Status text and its colour are both taken from the current state.
        current = display_dict[row_name]['status']
        label.set_text(current)
        label.modify_fg(gtk.STATE_NORMAL, ful.LABEL_COLORS[current])

    left_aligned = (0, 0.5)
    prompt_label = ful.make_label(
        init_prompt, size=_LABEL_SIZE, alignment=left_aligned)
    delimiter_label = ful.make_label(':', alignment=left_aligned)
    status_label = ful.make_label(
        init_status, size=_LABEL_SIZE, alignment=left_aligned,
        fg=ful.LABEL_COLORS[init_status])

    row_box = gtk.HBox()
    # pack_end fills from the right edge, hence status goes in first.
    for child in (status_label, delimiter_label, prompt_label):
        row_box.pack_end(child, False, False)

    status_label.connect('expose_event', _refresh_status)
    prompt_label.connect('expose_event', _refresh_prompt)
    return row_box
def make_prepare_widget(message, on_key_enter, on_key_tab=None):
    """Builds a gtk.VBox that shows message and carries a key dispatcher.

    The dispatcher is stored on the widget as key_callback. It returns
    on_key_enter() for the Return key, on_key_tab() for the Tab key when a
    Tab handler was supplied, and None for everything else.
    """
    def _dispatch_key(_source, event):
        pressed = event.keyval
        if pressed == gtk.keysyms.Return:
            return on_key_enter()
        if pressed == gtk.keysyms.Tab and on_key_tab is not None:
            return on_key_tab()
        return None

    container = gtk.VBox()
    container.add(ful.make_label(message))
    container.key_callback = _dispatch_key
    return container
class factory_Antenna(test.test):
    """Interactive factory test that checks antenna panels with an ENA.

    The test is a small state machine driven by USB and keyboard events:
    wait for a USB stick holding 'antenna.params', prompt for the next
    panel, read its serial number, measure the main and then the aux
    antennas, show the results, and write the log to the USB stick before
    starting over with the next panel.
    """
    version = 3

    # The state goes from _STATE_INITIAL to _STATE_RESULT_TAB then jumps back
    # to _STATE_PREPARE_PANEL for another testing cycle.
    _STATE_INITIAL = -1
    _STATE_WAIT_USB = 0
    _STATE_PREPARE_PANEL = 1
    _STATE_ENTERING_SN = 2
    _STATE_PREPARE_MAIN_ANTENNA = 3
    _STATE_PREPARE_AUX_ANTENNA = 4
    _STATE_RESULT_TAB = 5

    # Status in the final result tab. The two lists are parallel; 'result'
    # must stay last because generate_final_result() aggregates all rows
    # before it.
    _STATUS_NAMES = ['sn', 'cell_main', 'cell_aux',
                     'wifi_main', 'wifi_aux', 'result']
    _STATUS_LABELS = ['Serial Number',
                      'Cellular Antenna(MAIN)',
                      'Cellular Antenna(AUX)',
                      'WiFi Antenna(MAIN)',
                      'WiFi Antenna(AUX)',
                      'Test Result']

    def advance_state(self):
        """Moves to the next state and displays the widget bound to it.

        The result tab wraps around to the prepare-panel state so that the
        USB stick does not have to be inserted again for every panel.
        """
        if self._state == self._STATE_RESULT_TAB:
            self._state = self._STATE_PREPARE_PANEL
        else:
            self._state = self._state + 1
        self.switch_widget(self._state_widget[self._state])

    def on_usb_insert(self, dev_path):
        """Loads 'antenna.params' from a newly inserted USB stick.

        Insertions are ignored unless the test is waiting for the stick.
        """
        if self._state != self._STATE_WAIT_USB:
            return
        self.dev_path = dev_path
        # Only the config read needs the stick mounted; the UI work below
        # touches widgets and in-memory state only.
        with MountedMedia(dev_path, 1) as config_dir:
            config_path = os.path.join(config_dir, 'antenna.params')
            self.config = self.base_config.Read(config_path)
        self.reset_data_for_next_test()
        self.advance_state()
        factory.log("Config loaded.")

    def on_usb_remove(self, dev_path):
        """Raises if the USB stick is removed after parameters were loaded."""
        if self._state != self._STATE_WAIT_USB:
            raise Exception("USB removal is not allowed during test")

    def register_callbacks(self, window):
        """Routes key presses on window to the widget currently displayed."""
        def key_press_callback(widget, event):
            # Only widgets that define key_callback take keyboard input.
            if hasattr(self, 'last_widget'):
                if hasattr(self.last_widget, 'key_callback'):
                    return self.last_widget.key_callback(widget, event)
            return False

        window.connect('key-press-event', key_press_callback)
        window.add_events(gtk.gdk.KEY_PRESS_MASK)

    def switch_widget(self, widget_to_display):
        """Replaces the widget shown inside test_widget.

        Nothing happens when widget_to_display is already on screen.
        """
        if hasattr(self, 'last_widget'):
            if widget_to_display is self.last_widget:
                return
            self.last_widget.hide()
            self.test_widget.remove(self.last_widget)
        self.last_widget = widget_to_display
        self.test_widget.add(widget_to_display)
        self.test_widget.show_all()

    def on_sn_keypress(self, entry, key):
        """Fills the entry with the sample serial number when TAB is pressed.

        Returns:
            True if the key was TAB, False otherwise.
        """
        if key.keyval == gtk.keysyms.Tab:
            entry.set_text(_TEST_SN_NUMBER)
            return True
        return False

    def on_sn_complete(self, serial_number):
        """Stores the serial number, records its check and moves on."""
        self.serial_number = serial_number
        # TODO(itspeter): display the SN info in the result tab.
        self._update_status('sn', self.check_sn_format(serial_number))
        self.advance_state()

    @staticmethod
    def check_sn_format(sn):
        """Returns True when sn equals the sample serial number."""
        # TODO(itspeter): Check SN according to the spec in factory.
        return sn == _TEST_SN_NUMBER

    def write_to_usb(self, filename, content):
        """Writes content to filename on the USB stick at self.dev_path.

        Returns:
            True once the file has been written; failures propagate as
            exceptions.
        """
        with MountedMedia(self.dev_path, 1) as mount_dir:
            with open(os.path.join(mount_dir, filename), 'w') as f:
                f.write(content)
        return True

    def test_main_antennas(self, skip_flag):
        """Measures the main antennas unless skipped, then advances."""
        if not skip_flag:
            freqs = set()
            self._add_required_freqs('cell', freqs)
            self._add_required_freqs('wifi', freqs)
            ret = self._get_traces(freqs, ['S11', 'S22'],
                                   purpose='test_main_antennas')
            self._test_main_cell_antennas(ret)
            self._test_main_wifi_antennas(ret)
        self.advance_state()

    def test_aux_antennas(self, skip_flag):
        """Measures the aux antennas unless skipped, then shows the results.

        The final result is generated even when the measurement is skipped.
        """
        if not skip_flag:
            freqs = set()
            self._add_required_freqs('cell', freqs)
            self._add_required_freqs('wifi', freqs)
            ret = self._get_traces(freqs, ['S11', 'S22'],
                                   purpose='test_aux_antennas')
            self._test_aux_cell_antennas(ret)
            self._test_aux_wifi_antennas(ret)
        self.generate_final_result()
        self.advance_state()

    def _update_status(self, row_name, result):
        """Updates status in display_dict.

        Args:
            row_name: One of _STATUS_NAMES.
            result: True (passed), False (failed) or None (untested).
        """
        result_map = {
            True: ful.PASSED,
            False: ful.FAILED,
            None: ful.UNTESTED
        }
        assert result in result_map, "Unknown result"
        self.display_dict[row_name]['status'] = result_map[result]

    def _test_main_cell_antennas(self, traces):
        """Checks S11 against column 1 of the 'cell' spec."""
        self._update_status(
            'cell_main',
            self._compare_traces(traces, 'cell', 1, 'cell_main', 'S11'))

    def _test_main_wifi_antennas(self, traces):
        """Checks S22 against column 1 of the 'wifi' spec."""
        self._update_status(
            'wifi_main',
            self._compare_traces(traces, 'wifi', 1, 'wifi_main', 'S22'))

    def _test_aux_cell_antennas(self, traces):
        """Checks S11 against column 3 of the 'cell' spec."""
        self._update_status(
            'cell_aux',
            self._compare_traces(traces, 'cell', 3, 'cell_aux', 'S11'))

    def _test_aux_wifi_antennas(self, traces):
        """Checks S22 against column 3 of the 'wifi' spec."""
        self._update_status(
            'wifi_aux',
            self._compare_traces(traces, 'wifi', 3, 'wifi_aux', 'S22'))

    def generate_final_result(self):
        """Derives the overall result and logs the status summary.

        The overall result passes only if every other row is PASSED, so a
        skipped (untested) row makes it fail.
        """
        self._result = all(
            ful.PASSED == self.display_dict[var]['status']
            for var in self._STATUS_NAMES[:-1])
        self._update_status('result', self._result)
        self.log_to_file.write("Result in summary:\n%s\n" %
                               pprint.pformat(self.display_dict))

    def save_log(self):
        """Appends the raw traces to the log and writes it to the USB stick.

        The file is named '<serial number>.txt'.

        Returns:
            The return value of write_to_usb().
        """
        # TODO(itspeter): Dump more details upon RF teams' request.
        self.log_to_file.write("\n\nRaw traces:\n%s\n" %
                               pprint.pformat(self._raw_traces))
        return self.write_to_usb(
            self.serial_number + ".txt", self.log_to_file.getvalue())

    def _check_measurement(self, standard_value, extracted_value):
        """Compares whether the measurement meets the spec.

        A standard_value of None means there is no limit at that frequency;
        otherwise the measured value must be strictly below the limit.
        """
        return (standard_value is None) or (extracted_value < standard_value)

    def _add_required_freqs(self, antenna_type, freqs_set):
        """Reads the required frequencies and add it to freqs_set.

        Format of antenna.params:
            {'vswr_max': {antenna_type: [(freq, main, coupling, aux), ...
        For example: {'vswr_max': {'cell': [(746, -6, -10, -6),

        Usage example:
            self._add_required_freqs('cell', freqs)
        """
        for config_tuple in self.config['vswr_max'][antenna_type]:
            freqs_set.add(config_tuple[0])

    def _get_traces(self, freqs_set, parameters, purpose="unspecified"):
        """This function is a wrapper for GetTraces in order to log details.

        Args:
            freqs_set: Frequencies that the sweep has to cover.
            parameters: Parameters to request from the ENA, e.g. ['S11'].
            purpose: Key under which the traces are kept in _raw_traces.

        Returns:
            Whatever self.ena.GetTraces() returned.
        """
        # Generate the sweep tuples: one two-point segment per pair of
        # adjacent frequencies, scaled by 1e6 (presumably MHz to Hz).
        # NOTE(review): a single frequency yields no segment at all.
        freqs = sorted(freqs_set)
        segments = [(freq_min * 1e6, freq_max * 1e6, 2)
                    for freq_min, freq_max in zip(freqs, freqs[1:])]
        self.ena.SetSweepSegments(segments)
        ret = self.ena.GetTraces(parameters)
        self._raw_traces[purpose] = ret
        return ret

    def _compare_traces(self, traces, antenna_type, column,
                        log_title, ena_parameter):
        """Compares whether returned traces and the spec are aligned.

        Args:
            traces: Object returned by _get_traces().
            antenna_type: Key into config['vswr_max'], e.g. 'cell'.
            column: Index of the limit inside each spec tuple.
            log_title: Title used for this measurement in the log.
            ena_parameter: ENA parameter to read, e.g. 'S11'.

        Returns:
            True if every frequency of the spec meets its limit.

        Usage example:
            self._compare_traces(traces, 'cell', 1, 'cell_main', 'S11')
        """
        self.log_to_file.write(
            "Start measurement [%s], with profile[%s,col %s], from ENA-%s\n" %
            (log_title, antenna_type, column, ena_parameter))

        # Generate the sweep tuples.
        spec_rows = self.config['vswr_max'][antenna_type]
        freqs = [atuple[0] * 1e6 for atuple in spec_rows]
        standards = [atuple[column] for atuple in spec_rows]
        freqs_responses = [traces.GetFreqResponse(freq, ena_parameter)
                           for freq in freqs]
        results = [self._check_measurement(std_val, ext_val)
                   for std_val, ext_val in zip(standards, freqs_responses)]

        # list() keeps the header insert valid even where zip is lazy.
        logs = list(zip(freqs, standards, freqs_responses, results))
        logs.insert(0, ("Frequency",
                        "Column-%s" % column,
                        "ENA-%s" % ena_parameter,
                        "Result"))
        self.log_to_file.write("%s results:\n%s\n" %
                               (log_title, pprint.pformat(logs)))
        return all(results)

    def reset_data_for_next_test(self):
        """Resets internal data for the next testing cycle.

        prepare_panel_widget is the first widget of a new testing cycle. This
        function resets all the testing data. Caller should call switch_widget
        to change current UI to the first widget.
        """
        self.log_to_file = StringIO.StringIO()
        self._raw_traces = {}
        self.sn_input_widget.get_entry().set_text('')
        for var in self._STATUS_NAMES:
            self._update_status(var, None)

    def on_result_enter(self):
        """Saves the log and starts the next cycle; always returns False."""
        # The UI will stop in this screen unless log is saved.
        if self.save_log():
            self.reset_data_for_next_test()
            self.advance_state()
        return False

    def switch_to_sn_input_widget(self):
        """Advances from the prepare-panel prompt to the SN entry."""
        self.advance_state()
        return True

    def make_result_widget(self, on_key_enter):
        """Builds the result tab: the message plus one row per status.

        The widget's key_callback returns on_key_enter() for the Return key
        and False for any other key.
        """
        widget = gtk.VBox()
        widget.add(ful.make_label(_MESSAGE_RESULT_TAB))
        for name, label in zip(self._STATUS_NAMES, self._STATUS_LABELS):
            widget.add(make_status_row(name, self.display_dict,
                                       label, ful.UNTESTED))

        def key_press_callback(widget, event):
            # Hand the handler's verdict back, like the prepare widgets do.
            if event.keyval == gtk.keysyms.Return:
                return on_key_enter()
            return False

        widget.key_callback = key_press_callback
        return widget

    def run_once(self, ena_host):
        """Connects to the ENA, builds the UI and runs the test widget.

        Args:
            ena_host: Host of the ENA, handed to agilent_scpi.ENASCPI.
        """
        factory.log('%s run_once' % self.__class__)
        factory.log('parameters: (ena_host: %s)' % ena_host)

        # Setup the ENA host.
        self.ena = agilent_scpi.ENASCPI(ena_host)

        # Initialize variables.
        self.display_dict = {}
        self.base_config = PluggableConfig({})
        self.last_handler = None

        # Set up the UI widgets.
        self.usb_prompt_widget = gtk.VBox()
        self.usb_prompt_widget.add(ful.make_label(_MESSAGE_USB))
        self.prepare_panel_widget = make_prepare_widget(
            _MESSAGE_PREPARE_PANEL,
            self.switch_to_sn_input_widget)
        self.prepare_main_antenna_widget = make_prepare_widget(
            _MESSAGE_PREPARE_MAIN_ANTENNA,
            lambda: self.test_main_antennas(skip_flag=False),
            lambda: self.test_main_antennas(skip_flag=True))
        self.prepare_aux_antenna_widget = make_prepare_widget(
            _MESSAGE_PREPARE_AUX_ANTENNA,
            lambda: self.test_aux_antennas(skip_flag=False),
            lambda: self.test_aux_antennas(skip_flag=True))
        self.result_widget = self.make_result_widget(self.on_result_enter)
        self.sn_input_widget = ful.make_input_window(
            prompt='Enter Serial Number (TAB to use testing sample SN):',
            on_validate=None,
            on_keypress=self.on_sn_keypress,
            on_complete=self.on_sn_complete)
        # Make sure the entry in widget will have focus.
        self.sn_input_widget.connect(
            "show",
            lambda *x: self.sn_input_widget.get_entry().grab_focus())

        # Setup the relation of states and widgets.
        self._state_widget = {
            self._STATE_INITIAL: None,
            self._STATE_WAIT_USB: self.usb_prompt_widget,
            self._STATE_PREPARE_PANEL: self.prepare_panel_widget,
            self._STATE_ENTERING_SN: self.sn_input_widget,
            self._STATE_PREPARE_MAIN_ANTENNA: self.prepare_main_antenna_widget,
            self._STATE_PREPARE_AUX_ANTENNA: self.prepare_aux_antenna_widget,
            self._STATE_RESULT_TAB: self.result_widget
        }

        # Setup the initial display. This is done before the monitor is
        # started because on_usb_insert reads self._state and switches
        # widgets inside self.test_widget.
        self.test_widget = gtk.VBox()
        self._state = self._STATE_INITIAL
        self.advance_state()

        # Setup the usb monitor.
        monitor = MediaMonitor()
        monitor.start(on_insert=self.on_usb_insert,
                      on_remove=self.on_usb_remove)

        ful.run_test_widget(
            self.job,
            self.test_widget,
            window_registration_callback=self.register_callbacks)