blob: b2cf080eb7c92f29d078f8b75dec12754ce8f5d0 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import os
import time
import urllib2
import selenium
from autotest_lib.client.bin import utils
from extension_pages import e2e_test_utils
from extension_pages import options
class TestUtils(object):
"""Contains all the helper functions for Chrome mirroring automation."""
short_wait_secs = 3
step_timeout_secs = 60
device_info = 'http://%s:8008/setup/eureka_info'
cpu_fields = ['user', 'nice', 'system', 'idle', 'iowait', 'irq',
'softirq', 'steal', 'guest', 'guest_nice']
cpu_idle_fields = ['idle', 'iowait']
def __init__(self):
"""Constructor"""
def _connect_extension(self, driver, extension_id):
"""Connects to extension.
@param driver: The webdriver instance.
@param extension_id: The id of Media Router extension.
"""
e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
driver, extension_id)
e2e_test_utils_page.go_to_page()
e2e_test_utils_page.execute_script(
'chrome.runtime.connect("%s")' % extension_id)
def start_mirroring_media_router(
self, driver, extension_id, receiver_ip, url, fullscreen,
udp_proxy_server=None, network_profile=None):
"""Starts a mirroring session using Media Router extension.
@param driver: The webdriver instance.
@param extension_id: The id of Media Router extension.
@param receiver_ip: The IP of the receiver to launch the activity.
@param url: The URL to navigate to.
@param fullscreen: Click the fullscreen button or not.
@param udp_proxy_server: Address of udp proxy server,
in http address format, http://<ip>:<port>.
@param network_profile: Network profile to use.
@return True if the function finishes.
"""
e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
driver, extension_id)
tab_handles = driver.window_handles
device_name = self._get_device_name(receiver_ip)
self._connect_extension(driver, extension_id)
e2e_test_utils_page.execute_script(
'chrome.extension.getBackgroundPage().e2eTestService.start()')
time.sleep(self.short_wait_secs * 3) # Wait for discovery
self._connect_extension(driver, extension_id)
if network_profile and udp_proxy_server:
e2e_test_utils_page.execute_script(
'chrome.extension.getBackgroundPage().e2eTestService.'
'mirrorTabViaCastStreaming("%s", "%s", "%s", "%s")' % (
device_name, url,
udp_proxy_server, network_profile))
else:
e2e_test_utils_page.execute_script(
'chrome.extension.getBackgroundPage().e2eTestService.'
'mirrorTabViaCastStreaming("%s", "%s")' % (
device_name, url))
time.sleep(self.short_wait_secs) # Wait for mirroring to start
all_handles = driver.window_handles
video_handle = [x for x in all_handles if x not in tab_handles].pop()
driver.switch_to_window(video_handle)
self.navigate_to_test_url(driver, url, fullscreen)
return True
def stop_mirroring_media_router(self, driver, extension_id):
"""Stops a mirroring session on a device using Media Router extension.
@param driver: The webdriver instance.
@param extension_id: The id of Media Router extension.
"""
e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
driver, extension_id)
e2e_test_utils_page.go_to_page()
self._connect_extension(driver, extension_id)
e2e_test_utils_page.execute_script(
('chrome.extension.getBackgroundPage()'
'.e2eTestService.stopMirroring()'))
# Wait for receiver to go back to home screen
time.sleep(self.short_wait_secs)
def enable_automatic_send_usage(self, driver):
"""Enables automatic send usage statistics and crash reports in Chrome.
@param driver: The webdriver instance of the browser.
"""
driver.get('chrome://settings-frame')
driver.find_element_by_id('advanced-settings-expander').click()
usage_check_box = driver.find_element_by_id('metrics-reporting-enabled')
utils.poll_for_condition(
usage_check_box.is_displayed, timeout=10, sleep_interval=1,
desc='Wait for automatic send usage checkbox.')
if not usage_check_box.is_selected():
usage_check_box.click()
def set_local_storage_mr_mirroring(
self, driver, extension_id, frame_rate=60):
"""Enables extension fine log in Chrome for debugging.
@param driver: The webdriver instance of the browser.
@param extension_id: The extension ID of Media Router extension.
@param frame_rate: The capture frame rate of Media Router extension.
"""
scripts = ('localStorage["debug.console"] = true;'
'localStorage["debug.logs"] = "fine";')
scripts += ('localStorage["mr.mirror.Settings.Overrides"] ='
' \'{"maxFrameRate":%s}\';') % str(frame_rate)
self._execute_script_reload_extension(
driver, extension_id, scripts)
def upload_mirroring_logs_media_router(self, driver, extension_id):
"""Uploads MR mirroring logs for the latest mirroring session.
@param driver: The webdriver instance of the browser.
@param extension_id: The extension ID of Media Router extension.
@return The report id in crash staging server or
empty if there is nothing.
"""
report_id = None
wait_time = 0
e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
driver, extension_id)
e2e_test_utils_page.go_to_page()
while not report_id and wait_time < 90:
report_id = e2e_test_utils_page.execute_script(
('return localStorage["e2eTestService'
'.castStreamingMirrorLogId"]'))
time.sleep(self.short_wait_secs)
wait_time += self.short_wait_secs
if report_id:
return report_id
else:
return ''
def get_chrome_version(self, driver):
"""Returns the Chrome version that is being used for running test.
@param driver: The chromedriver instance.
@return The Chrome version.
"""
get_chrome_version_js = 'return window.navigator.appVersion;'
app_version = driver.execute_script(get_chrome_version_js)
for item in app_version.split():
if 'Chrome/' in item:
return item.split('/')[1]
return None
def get_chrome_revision(self, driver):
"""Returns Chrome revision number that is being used for running test.
@param driver: The chromedriver instance.
@return The Chrome revision number.
"""
get_chrome_revision_js = ('return document.getElementById("version").'
'getElementsByTagName("span")[2].innerHTML;')
driver.get('chrome://version')
return driver.execute_script(get_chrome_revision_js)
def get_extension_id_from_flag(self, extra_flags):
"""Gets the extension ID based on the whitelisted extension id flag.
@param extra_flags: A string which contains all the extra chrome flags.
@return The ID of the extension. Return None if nothing is found.
"""
extra_flags_list = extra_flags.split()
for flag in extra_flags_list:
if 'whitelisted-extension-id=' in flag:
return flag.split('=')[1]
return None
def navigate_to_test_url(self, driver, url, fullscreen):
"""Navigates to a given URL. Click fullscreen button if needed.
@param driver: The chromedriver instance.
@param url: The URL of the site to navigate to.
@param fullscreen: True and the video will play in full screen mode.
Otherwise, set to False.
"""
driver.get(url)
driver.refresh()
if fullscreen:
self.request_full_screen(driver)
def request_full_screen(self, driver):
"""Requests full screen.
@param driver: The chromedriver instance.
"""
try:
driver.find_element_by_id('fsbutton').click()
except selenium.common.exceptions.NoSuchElementException as err_msg:
print 'Full screen button is not found. ' + str(err_msg)
def set_focus_tab(self, driver, tab_handle):
"""Sets the focus on a tab.
@param driver: The chromedriver instance.
@param tab_handle: The chrome driver handle of the tab.
"""
driver.switch_to_window(tab_handle)
driver.get_screenshot_as_base64()
def output_dict_to_file(self, dictionary, file_name,
path=None, sort_keys=False):
"""Outputs a dictionary into a file.
@param dictionary: The dictionary to be output as JSON.
@param file_name: The name of the file that is being output.
@param path: The path of the file. The default is None.
@param sort_keys: Sort dictionary by keys when output. False by default.
"""
if path is None:
path = os.path.abspath(os.path.dirname(__file__))
# if json file exists, read the existing one and append to it
json_file = os.path.join(path, file_name)
if os.path.isfile(json_file) and dictionary:
with open(json_file, 'r') as existing_json_data:
json_data = json.load(existing_json_data)
dictionary = dict(json_data.items() + dictionary.items())
output_json = json.dumps(dictionary, sort_keys=sort_keys)
with open(json_file, 'w') as file_handler:
file_handler.write(output_json)
def compute_cpu_utilization(self, cpu_dict):
"""Generates the upper/lower bound and the average CPU consumption.
@param cpu_dict: The dictionary that contains CPU usage every sec.
@returns A dict that contains upper/lower bound and average cpu usage.
"""
cpu_bound = {}
cpu_usage = sorted(cpu_dict.values())
cpu_bound['lower_bound'] = (
'%.2f' % cpu_usage[int(len(cpu_usage) * 10.0 / 100.0)])
cpu_bound['upper_bound'] = (
'%.2f' % cpu_usage[int(len(cpu_usage) * 90.0 / 100.0)])
cpu_bound['average'] = '%.2f' % (sum(cpu_usage) / float(len(cpu_usage)))
return cpu_bound
def cpu_usage_interval(self, duration, interval=1):
"""Gets the CPU usage over a period of time based on interval.
@param duration: The duration of getting the CPU usage.
@param interval: The interval to check the CPU usage. Default is 1 sec.
@return A dict that contains CPU usage over different time intervals.
"""
current_time = 0
cpu_usage = {}
while current_time < duration:
pre_times = self._get_system_times()
time.sleep(interval)
post_times = self._get_system_times()
cpu_usage[current_time] = self._get_avg_cpu_usage(
pre_times, post_times)
current_time += interval
return cpu_usage
def _execute_script_reload_extension(self, driver, extension_id, script):
"""Executes javascript in the extension and reload it afterwards.
@param driver: The chromedriver instance.
@param extension_id: The id of Media Router extension.
@param script: JavaScript to be executed.
"""
script += 'chrome.runtime.reload();'
current_handle = driver.current_window_handle
new_tab_handle = self._open_new_tab(driver)
e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
driver, extension_id)
e2e_test_utils_page.execute_script(script)
driver.switch_to_window(new_tab_handle)
def _get_avg_cpu_usage(self, pre_times, post_times):
"""Calculates the average CPU usage of two different periods of time.
@param pre_times: The CPU usage information of the start point.
@param post_times: The CPU usage information of the end point.
@return Average CPU usage over a time period.
"""
diff_times = {}
for field in self.cpu_fields:
diff_times[field] = post_times[field] - pre_times[field]
idle_time = sum(diff_times[field] for field in self.cpu_idle_fields)
total_time = sum(diff_times[field] for field in self.cpu_fields)
return float(total_time - idle_time) / total_time * 100.0
def _get_device_name(self, device_ip):
"""Gets all the Chromecast information through eureka_info page.
@param device_ip: string, the IP address the device.
@return Name of the device.
"""
response = urllib2.urlopen(self.device_info % device_ip).read()
return json.loads(response).get('name')
def _get_system_times(self):
"""Gets the CPU information from the system times.
@return An list with CPU usage of different processes.
"""
proc_stat = utils.read_file('/proc/stat')
for line in proc_stat.split('\n'):
if line.startswith('cpu '):
times = line[4:].strip().split(' ')
times = [int(jiffies) for jiffies in times]
return dict(zip(self.cpu_fields, times))
def _open_new_tab(self, driver):
"""Opens a new tab in Chrome.
@param driver: The webdriver instance of the browser.
@return Handle of the new tab.
"""
current_handles = driver.window_handles
driver.execute_script('window.open("chrome://newtab")')
utils.poll_for_condition(
lambda: len(driver.window_handles) > len(current_handles),
timeout=10, sleep_interval=1, desc='Wait for new tab to open.')
return list(set(driver.window_handles) - set(current_handles))[0]