blob: 6e420d2c53fd0f77bf0095a2f39507eaefe44828 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# DESCRIPTION :
#
# This is a factory test to test the external display (hdmi/vga/other)
# UI based heavily on factory_Display/factory_Audio
import gobject
import gtk
import logging
import os
import pango
import sys
import StringIO
from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.client.cros import factory_setup_modules
from cros.factory.test import factory
from cros.factory.test import ui as ful
_LABEL_STATUS_SIZE = (140, 30)
_LABEL_START_STR_AUDIO = 'Connect headset.\n'
_LABEL_START_STR = 'Connect external display\n\nhit SPACE to start test.'
_LABEL_RESPONSE_STR = ful.USER_PASS_FAIL_SELECT_STR
_VERBOSE = False
_SUBTEST_LIST = [
('Wait For External Display ',
{'msg' : 'Plug in External Display\n' + \
_LABEL_RESPONSE_STR,
'cond': 'connected',
}),
('External Display Video',
{'msg' : 'Do you see video on External Display?\n' + \
_LABEL_RESPONSE_STR,
'cfg_disp' : True,
}),
('\t1920x1200',
{'msg' : 'Do you see video on External Display? \n' + \
'Hit esc to update display list then confirm resolution\n\n' + \
_LABEL_RESPONSE_STR,
'res' : '1920x1200',
'cfg_disp' : True,
}),
('\t640x480',
{'msg' : 'Do you see video on External Display? \n' + \
'Hit esc to update display list and confirm resolution\n\n' + \
_LABEL_RESPONSE_STR,
'res' : '640x480',
'cfg_disp' : True,
}),
('\t1920x1080',
{'msg' : 'Do you see video on External Display? \n' + \
'Hit esc to update display list and confirm resolution\n\n' + \
_LABEL_RESPONSE_STR,
'res' : '1920x1080',
'cfg_disp' : True,
}),
('Turn off external display',
{'msg' : 'Has External display gone black?\n' + \
_LABEL_RESPONSE_STR,
'disp_off' : True,
}),
]
_CLEANUP = ('Disconnect Display',
{'msg':'Disconnect external display\n' + \
'Or press TAB to fail\n',
'cond' : 'disconnected',
})
class factory_ExtDisplayRes(test.test):
version = 1
def close_bgjob(self, name):
job = self._job
if job:
utils.nuke_subprocess(job.sp)
utils.join_bg_jobs([job], timeout=1)
result = job.result
if _VERBOSE and (result.stdout or result.stderr):
raise error.CmdError(
name, result,
'stdout: %s\nstderr: %s' % (result.stdout, result.stderr))
self._job = None
def goto_next_subtest(self):
if not self._subtest_queue:
gtk.main_quit()
return
self._current_subtest = self._subtest_queue.pop()
name, cfg = self._current_subtest
self.update_status(name, ful.ACTIVE)
def start_display(self, main, ext):
""" Pushes image to external display. """
cmd = ((
'while [ $(xrandr -d :0 | grep "^%s connected" | wc -l) ' +
' == "0" ]; do sleep 0.5; done; ' +
'xrandr -d :0 --output %s --auto --crtc 0 ' +
'--output %s --auto --crtc 1') %
(ext, main, ext))
self._job = utils.BgJob(cmd, stderr_level=logging.DEBUG)
def stop_display(self, ext):
""" Disconnects external display. """
cmd = "xrandr -d :0 --output %s --off" % (ext)
self._job = utils.BgJob(cmd, stderr_level=logging.DEBUG)
def set_resolution(self, ext, res):
""" Sets resolution of external display """
cmd = "xrandr -d :0 --output %s --mode %s" % (ext, res)
self._job = utils.BgJob(cmd, stderr_level=logging.DEBUG)
def list_connected_displays(self):
""" Lists characteristics of connected displays. """
pipein, pipeout = os.pipe()
cmd = 'xrandr -d :0 | grep " connected"'
self._job = utils.BgJob(cmd, stdin=pipeout)
self._job.output_prepare(StringIO.StringIO(), StringIO.StringIO())
self._job.process_output(stdout=True, final_read=True)
para = self._job.sp.stdout.readlines()
vbox = self._test_widget
verse = ''
for line in para:
l1, l2 = line.split('(')
l2, l3 = l2.split(')')
verse += '\n' + l1 + '\n\t' + l2 + '\n\t' + l3 + '\n'
if self._display_list is not None:
self._test_widget.remove(self._display_list)
self._display_list = ful.make_label(verse, fg=ful.RED)
vbox.pack_start(self._display_list, False, False)
vbox.show_all()
def get_wait_condition(self, cond):
""" Returns wait condition based on input parameters. """
subtest_name, subtest_cfg = self._current_subtest
xrandr_str = 'xrandr -d :0'
grep_str = 'grep "^%s %s"' % (self._ext_display, cond)
wc_str = 'wc -l'
return '[ $('+xrandr_str + '|' + grep_str + '|' + wc_str + ') == "1" ]'
def start_subtest(self):
subtest_name, subtest_cfg = self._current_subtest
if 'cfg' in subtest_cfg:
for cfg in subtest_cfg['cfg']:
try:
utils.system(cfg)
except error.CmdError:
raise error.TestNAError('Setup failed\nCmd: %s' % cfg)
factory.log("cmd: " + cfg)
cond = subtest_cfg.get('cond')
if cond is not None:
subtest_cfg.update({'wait_cond' : self.get_wait_condition(cond)})
self._timer = gobject.timeout_add(500, self.timer_callback)
if 'cfg_disp' in subtest_cfg:
if (self._main_display is not None and
self._ext_display is not None):
res = subtest_cfg.get('res')
if res is not None:
self.set_resolution(self._ext_display, res)
else:
self.start_display(self._main_display, self._ext_display)
elif 'cmd' in subtest_cfg:
cmd = "%s %s" % (subtest_cfg['cmd'], self._sample)
factory.log("cmd: " + cmd)
self._job = utils.BgJob(cmd, stderr_level=logging.DEBUG)
elif 'disp_off' in subtest_cfg:
if (self._ext_display is not None):
self.stop_display(self._ext_display)
else:
self._job = None
def finish_subtest(self):
subtest_name, subtest_cfg = self._current_subtest
if 'postcfg' in subtest_cfg:
for cfg in subtest_cfg['postcfg']:
try:
utils.system(cfg)
except error.CmdError:
raise error.TestNAError('Setup failed\nCmd: %s' % cfg)
factory.log("cmd: " + cfg)
self.close_bgjob(subtest_cfg)
if self._timer is not None:
gobject.source_remove(self._timer)
self._timer = None
def start_next_subtest(self, current_test_status):
""" Main routine for iterating over the sub test list. """
subtest_name, subtest_cfg = self._current_subtest
self.update_status(subtest_name, current_test_status)
self.finish_subtest()
self.goto_next_subtest()
# evaluating a new subtest now
if subtest_name is not self._current_subtest[0]:
subtest_name, subtest_cfg = self._current_subtest
self.start_subtest()
self._prompt_label.set_text(subtest_cfg['msg'])
self._test_widget.queue_draw()
def key_press_callback(self, widget, event):
subtest_name, subtest_cfg = self._current_subtest
if event.keyval == gtk.keysyms.space and not self._started:
self.start_subtest()
self._prompt_label.set_text(subtest_cfg['msg'])
self._started = True
self._test_widget.queue_draw()
return True
def key_release_callback(self, widget, event):
if not self._started:
return True
subtest_name, subtest_cfg = self._current_subtest
if event.keyval == gtk.keysyms.Tab and \
self._status_map[subtest_name] is ful.ACTIVE:
self.start_next_subtest(ful.FAILED)
elif event.keyval == gtk.keysyms.Return and \
self._status_map[subtest_name] is ful.ACTIVE and \
'cond' not in subtest_cfg:
self.list_connected_displays()
self.start_next_subtest(ful.PASSED)
elif event.keyval == ord('Q'):
gtk.main_quit()
self.list_connected_displays()
return True
def timer_callback(self):
subtest_name, subtest_cfg = self._current_subtest
cond = subtest_cfg.get('wait_cond')
factory.log('waiting for cond: '+cond)
exit_code = utils.system(command=cond, ignore_status=True)
if exit_code == 0:
if subtest_cfg.get('cond') is 'disconnect':
self.update_status(subtest_name, ful.PASSED)
self.finish_subtest()
self.goto_next_subtest()
self._test_widget.queue_draw()
return False
else:
self.start_next_subtest(ful.PASSED)
self.list_connected_displays()
return True
def update_status(self, name, status):
self._status_map[name] = status
self._label_status[name].set_text(status)
self._label_status[name].modify_fg(gtk.STATE_NORMAL,
ful.LABEL_COLORS[status])
self._label_status[name].queue_draw()
def make_subtest_label_box(self, name):
eb = gtk.EventBox()
eb.modify_bg(gtk.STATE_NORMAL, ful.BLACK)
label_status = ful.make_label(ful.UNTESTED, size=_LABEL_STATUS_SIZE,
alignment=(0, 0.5),
fg=ful.LABEL_COLORS[ful.UNTESTED])
self._label_status[name] = label_status
label_en = ful.make_label(name, alignment=(1,0.5))
label_sep = ful.make_label(' : ', alignment=(0.5, 0.5))
hbox = gtk.HBox()
hbox.pack_end(label_status, False, False)
hbox.pack_end(label_sep, False, False)
hbox.pack_end(label_en, False, False)
eb.add(hbox)
return eb
def register_callbacks(self, window):
window.connect('key-press-event', self.key_press_callback)
window.add_events(gtk.gdk.KEY_PRESS_MASK)
window.connect('key-release-event', self.key_release_callback)
window.add_events(gtk.gdk.KEY_RELEASE_MASK)
def run_once(self,
has_audio=False,
audio_sample_path=None,
main_display=None,
ext_display=None):
factory.log('%s run_once' % self.__class__)
os.chdir(self.autodir)
self._main_display = main_display
self._ext_display = ext_display
self._display_list = None
self._started = False
self._timer = None
_SUBTEST_LIST.append(_CLEANUP)
self._subtest_queue = [x for x in reversed(_SUBTEST_LIST)]
self._status_map = dict((n, ful.UNTESTED) for n, c in _SUBTEST_LIST)
self._label_status = dict()
label_start = _LABEL_START_STR
prompt_label = ful.make_label(label_start, fg=ful.WHITE)
self._prompt_label = prompt_label
vbox = gtk.VBox()
vbox.pack_start(prompt_label, False, False)
for name, cfg in _SUBTEST_LIST:
label_box = self.make_subtest_label_box(name)
vbox.pack_start(label_box, False, False)
self._test_widget = vbox
self.goto_next_subtest()
ful.run_test_widget(self.job, vbox,
window_registration_callback=self.register_callbacks)
failed_set = set(name for name, status in self._status_map.items()
if status is not ful.PASSED)
if failed_set:
raise error.TestFail('some subtests failed (%s)' %
', '.join(failed_set))
factory.log('%s run_once finished' % self.__class__)