blob: 77fcfcbf1884082a18e74396e30fd7f0abaf505b [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""XML RPC server for display testing."""
import argparse
import code
import logging
import multiprocessing
import os
import re
import time
import xmlrpclib
import telemetry
import common # pylint: disable=W0611
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib.cros import chrome, xmlrpc_server
from autotest_lib.client.cros import constants, cros_ui, sys_power
EXT_PATH = os.path.join(os.path.dirname(__file__), 'display_test_extension')
TimeoutException = telemetry.core.util.TimeoutException
class DisplayTestingXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate):
"""XML RPC delegate for display testing."""
def __init__(self, chrome):
self._chrome = chrome
self._browser = chrome.browser
def get_display_info(self):
"""Gets the display info from Chrome.system.display API.
@return array of dict for display info.
"""
extension = self._chrome.get_extension(EXT_PATH)
if not extension:
raise RuntimeError('Graphics test extension not found')
extension.ExecuteJavaScript('window.__display_info = null;')
extension.ExecuteJavaScript(
"chrome.system.display.getInfo(function(info) {"
"window.__display_info = info;})")
utils.wait_for_value(lambda: (
extension.EvaluateJavaScript("window.__display_info") != None),
expected_value=True)
return extension.EvaluateJavaScript("window.__display_info")
def _wait_for_display_options_to_appear(self, tab, display_index,
timeout=16):
"""Waits for option.DisplayOptions to appear.
The function waits until options.DisplayOptions appears or is timed out
after the specified time.
@param tab: the tab where the display options dialog is shown.
@param display_index: index of the display; 0 is the internal one
for chromebooks.
@param timeout: time wait for display options appear.
@raise RuntimeError when display_index is out of range
@raise TimeoutException when the operation is timed out.
"""
tab.WaitForJavaScriptExpression(
"typeof options !== 'undefined' &&"
"typeof options.DisplayOptions !== 'undefined' &&"
"typeof options.DisplayOptions.instance_ !== 'undefined' &&"
"typeof options.DisplayOptions.instance_"
" .displays_ !== 'undefined'", timeout)
if not tab.EvaluateJavaScript(
"options.DisplayOptions.instance_.displays_.length > %d"
% (display_index)):
raise RuntimeError('Display index out of range: '
+ str(tab.EvaluateJavaScript(
"options.DisplayOptions.instance_.displays_.length")))
tab.WaitForJavaScriptExpression(
"typeof options.DisplayOptions.instance_"
" .displays_[%(index)d] !== 'undefined' &&"
"typeof options.DisplayOptions.instance_"
" .displays_[%(index)d].id !== 'undefined' &&"
"typeof options.DisplayOptions.instance_"
" .displays_[%(index)d].resolutions !== 'undefined'"
% {'index': display_index}, timeout)
def get_available_resolutions(self, display_index):
"""Gets the resolution list from chrome://settings-frame/display via
telemetry.
@param display_index: index of the display to get resolutions from; 0 is
the internal one for chromebooks.
@return: A dict of available resolutions list.
@raise TimeoutException when the operation is timed out.
"""
tab = self._browser.tabs.New()
try:
tab.Navigate('chrome://settings-frame/display')
tab.Activate()
self._wait_for_display_options_to_appear(tab, display_index)
return tab.EvaluateJavaScript(
"options.DisplayOptions.instance_"
" .displays_[%(index)d].resolutions"
% {'index': display_index})
finally:
tab.Close()
def set_resolution(self, display_index, width, height, timeout=3):
"""Sets the resolution of the specified display.
@param display_index: index of the display to set resolution for;
0 is the internal one for chromebooks.
@param width: width of the resolution
@param height: height of the resolution
@param timeout: maximal time in seconds waiting for the new resolution
to settle in.
@raise TimeoutException when the operation is timed out.
"""
tab = self._browser.tabs.New()
try:
tab.Navigate('chrome://settings-frame/display')
tab.Activate()
self._wait_for_display_options_to_appear(tab, display_index)
tab.ExecuteJavaScript(
# Previous version before CR:417113012 (targeted for M38)
#"chrome.send('setResolution',["
#" options.DisplayOptions.instance_"
#" .displays_[%(index)d].id, %(width)d, %(height)d]);"
"for (resolution_index in options.DisplayOptions"
" .instance_.displays_[%(index)d].resolutions) {"
" var resolution = options.DisplayOptions"
" .instance_.displays_[%(index)d].resolutions["
" resolution_index];"
" if (resolution.originalWidth == %(width)d &&"
" resolution.originalHeight == %(height)d) {"
" chrome.send('setDisplayMode', ["
" options.DisplayOptions.instance_"
" .displays_[%(index)d].id, resolution]);"
" break;"
" }"
"}"
% {'index': display_index, 'width': width, 'height': height}
)
# TODO(tingyuan):
# Support for multiple external monitors (i.e. for chromebox)
end_time = time.time() + timeout
while time.time() < end_time:
r = self.get_resolution(self.get_ext_connector_name())
if (width, height) == (r[0], r[1]):
return True
time.sleep(0.1)
raise TimeoutException("Failed to change resolution to %r (%r"
" detected)" % ((width, height), r))
finally:
tab.Close()
def get_resolution(self, output):
"""Gets the resolution of the specified output.
@param output: The output name as a string.
@return The resolution of output as a tuple (width, height,
fb_offset_x, fb_offset_y) of ints.
"""
regexp = re.compile(
r'^([-A-Za-z0-9]+)\s+connected\s+(\d+)x(\d+)\+(\d+)\+(\d+)',
re.M)
match = regexp.findall(utils.call_xrandr())
for m in match:
if m[0] == output:
return (int(m[1]), int(m[2]), int(m[3]), int(m[4]))
return (0, 0, 0, 0)
def take_tab_screenshot(self, url_pattern, output_suffix):
"""Takes a screenshot of the tab specified by the given url pattern.
The captured screenshot is saved to:
/tmp/screenshot_<output_suffix>_<last_part_of_url>.png
@param url_pattern: A string of url pattern used to search for tabs.
@param output_suffix: A suffix appended to the file name of captured
PNG image.
"""
if not url_pattern:
# If no URL pattern is provided, defaults to capture all the tabs
# that show PNG images.
url_pattern = '.png'
tabs = self._browser.tabs
screenshots = []
for i in xrange(0, len(tabs)):
if url_pattern in tabs[i].url:
screenshots.append((tabs[i].url, tabs[i].Screenshot(timeout=5)))
output_file = ('/tmp/screenshot_%s_%%s.png' % output_suffix)
for url, screenshot in screenshots:
image_filename = os.path.splitext(url.rsplit('/', 1)[-1])[0]
screenshot.WriteFile(output_file % image_filename)
return True
def toggle_mirrored(self):
"""Toggles mirrored.
Emulates L_Ctrl + Maximize in X server to toggle mirrored.
"""
self.press_key('ctrl+F4')
return True
def press_key(self, key_str):
"""Presses the given key(s).
@param key_str: A string of the key(s), like 'ctrl+F4', 'Up'.
"""
command = 'xdotool key %s' % key_str
cros_ui.xsystem(command)
return True
def set_mirrored(self, is_mirrored):
"""Sets mirrored mode.
@param is_mirrored: True or False to indicate mirrored state.
"""
def _is_mirrored_enabled():
return bool(self.get_display_info()[0]['mirroringSourceId'])
retries = 3
while _is_mirrored_enabled() != is_mirrored and retries > 0:
self.toggle_mirrored()
time.sleep(3)
retries -= 1
return _is_mirrored_enabled() == is_mirrored
def suspend_resume(self, suspend_time=10):
"""Suspends the DUT for a given time in second.
@param suspend_time: Suspend time in second.
"""
sys_power.do_suspend(suspend_time)
return True
def suspend_resume_bg(self, suspend_time=10):
"""Suspends the DUT for a given time in second in the background.
@param suspend_time: Suspend time in second.
"""
process = multiprocessing.Process(target=self.suspend_resume,
args=(suspend_time,))
process.start()
return True
def get_ext_connector_name(self):
"""Gets the name of the external output connector.
@return The external output connector name as a string.
"""
xrandr_output = utils.get_xrandr_output_state()
for output in xrandr_output.iterkeys():
if (output.startswith('HDMI') or
output.startswith('DP') or
output.startswith('DVI')):
return output
return False
def wait_output_connected(self, output):
"""Wait for output to connect.
@param output: The output name as a string.
@return: True if output is connected; False otherwise.
"""
def _is_connected(output):
xrandr_output = utils.get_xrandr_output_state()
if output not in xrandr_output:
return False
return xrandr_output[output]
return utils.wait_for_value(lambda: _is_connected(output),
expected_value=True)
def load_url(self, url):
"""Loads the given url in a new tab.
@param url: The url to load as a string.
"""
tab = self._browser.tabs.New()
tab.Navigate(url)
tab.Activate()
return True
def close_tab(self, index=-1):
"""Closes the tab of the given index.
@param index: The tab index to close. Defaults to the last tab.
"""
self._browser.tabs[index].Close()
return True
def reconnect_output(self, output):
"""Reconnects output.
@param output: The output name as a string.
"""
utils.set_xrandr_output(output, False)
utils.set_xrandr_output(output, True)
return True
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--debug', action='store_true', required=False,
help=('create a debug console with a ServerProxy "s" '
'connecting to the XML RPC sever at localhost'))
args = parser.parse_args()
if args.debug:
s = xmlrpclib.ServerProxy('http://localhost:%d' %
constants.DISPLAY_TESTING_XMLRPC_SERVER_PORT)
code.interact(local=locals())
else:
logging.basicConfig(level=logging.DEBUG)
logging.debug('display_xmlrpc_server main...')
os.environ['DISPLAY'] = ':0.0'
os.environ['XAUTHORITY'] = '/home/chronos/.Xauthority'
extra_browser_args = ['--enable-gpu-benchmarking']
with chrome.Chrome(extension_paths=[EXT_PATH],
extra_browser_args=extra_browser_args) as cr:
server = xmlrpc_server.XmlRpcServer(
'localhost', constants.DISPLAY_TESTING_XMLRPC_SERVER_PORT)
server.register_delegate(DisplayTestingXmlRpcDelegate(cr))
server.run()