blob: c1300817b75ff15259aff020c3aabca8b552b129 [file] [log] [blame]
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import enum
import json
import os
import logging
import time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import chrome
from autotest_lib.client.common_lib.cros import power_load_util
from autotest_lib.client.cros.input_playback import keyboard
from autotest_lib.client.cros.power import power_dashboard
from autotest_lib.client.cros.power import power_status
from autotest_lib.client.cros.power import power_test
class power_MeetClient(power_test.power_Test):
"""class for power_MeetClient test.
This test should be call from power_MeetCall server test only.
"""
version = 1
video_url = 'http://meet.google.com'
doc_url = 'http://doc.new'
extra_browser_args = ['--use-fake-ui-for-media-stream']
def initialize(self, seconds_period=20., pdash_note='',
force_discharge=False):
"""initialize method."""
super(power_MeetClient, self).initialize(
seconds_period=seconds_period,
pdash_note=pdash_note,
force_discharge=force_discharge)
self._username = power_load_util.get_meet_username()
self._password = power_load_util.get_meet_password()
def run_once(self, meet_code, duration=180, layout='Tiled'):
"""run_once method.
@param meet_code: Meet code generated in power_MeetCall.
@param duration: duration in seconds.
@param layout: string of meet layout to use.
"""
with keyboard.Keyboard() as keys,\
chrome.Chrome(init_network_controller=True,
gaia_login=True,
username=self._username,
password=self._password,
extra_browser_args=self.extra_browser_args,
autotest_ext=True) as cr:
# Move existing window to left half and open video page
tab = cr.browser.tabs[0]
tab.Activate()
# Run in full-screen.
fullscreen = tab.EvaluateJavaScript('document.webkitIsFullScreen')
if not fullscreen:
keys.press_key('f4')
url = self.video_url + '/' + meet_code
logging.info('Navigating left window to %s', url)
tab.Navigate(url)
# Workaround when camera isn't init for some unknown reason.
time.sleep(10)
tab.EvaluateJavaScript('location.reload()')
tab.WaitForDocumentReadyStateToBeComplete()
logging.info(meet_code)
self.keyvals['meet_code'] = meet_code
def wait_until(cond, error_msg):
"""Helper for javascript polling wait."""
for _ in range(60):
time.sleep(1)
if tab.EvaluateJavaScript(cond):
return
raise error.TestFail(error_msg)
wait_until('window.hasOwnProperty("hrTelemetryApi")',
'Meet API does not existed.')
wait_until('hrTelemetryApi.isInMeeting()',
'Can not join meeting.')
wait_until('hrTelemetryApi.getParticipantCount() > 1',
'Meeting has no other participant.')
# Make sure camera and mic are on.
tab.EvaluateJavaScript('hrTelemetryApi.setCameraMuted(false)')
tab.EvaluateJavaScript('hrTelemetryApi.setMicMuted(false)')
if layout == 'Tiled':
tab.EvaluateJavaScript('hrTelemetryApi.setTiledLayout()')
elif layout == 'Auto':
tab.EvaluateJavaScript('hrTelemetryApi.setAutoLayout()')
elif layout == 'Sidebar':
tab.EvaluateJavaScript('hrTelemetryApi.setSidebarLayout()')
elif layout == 'Spotlight':
tab.EvaluateJavaScript('hrTelemetryApi.setSpotlightLayout()')
else:
raise error.TestError('Unknown layout %s' % layout)
self.keyvals['layout'] = layout
self.start_measurements()
time.sleep(duration)
end_time = self._start_time + duration
# Collect stat
meet_data = tab.EvaluateJavaScript(
'realtime.media.getMediaInfoDataPoints()')
power_dashboard.get_dashboard_factory().registerDataType(
MeetStatLogger, MeetStatDashboard)
self._meas_logs.append(
MeetStatLogger(self._start_time, end_time, meet_data))
class MeetStatLogger(power_status.MeasurementLogger):
"""Class for logging meet data point to power dashboard.
Format of meet_data http://google3/logs/proto/buzz/callstats.proto
"""
def __init__(self, start_ts, end_ts, meet_data):
# Do not call parent constructor to avoid making a new thread.
self.times = [start_ts]
# Meet epoch timestamp uses millisec unit.
self.meet_data = [data_point for data_point in meet_data
if start_ts * 1000 <= data_point['timestamp'] <= end_ts * 1000]
def calc(self, mtype=None):
return {}
def save_results(self, resultsdir, fname_prefix=None):
# Save raw dict from meet to file. Ignore fname_prefix.
with open(os.path.join(resultsdir, 'meet_powerlog.json'), 'w') as f:
json.dump(self.meet_data , f, indent=4, separators=(',', ': '),
ensure_ascii=False)
class MeetStatDashboard(power_dashboard.MeasurementLoggerDashboard):
"""Dashboard class for MeetStatLogger class."""
# Direction and type numbers map to constants in the proto
class Direction(enum.IntEnum):
"""Possible directions for media entries of a data point."""
SENDER = 0
RECEIVER = 1
class MediaType(enum.IntEnum):
"""Possible media types for media entries of a data point."""
VIDEO = 2
# Important metrics to collect.
MEET_KEYS = [
'encodeUsagePercent',
'fps',
'height',
'width',
]
def _get_ssrc_dict(self, meet_data):
""" Extract http://what/ssrc for all video stream and map to string.
The format of the string would be sender_# / receiver_# where # denotes
index for the video counting from 0.
Returns:
dict from ssrc to video stream string.
"""
ret = {}
count = [0, 0]
# We only care about video streams.
for media in meet_data[-1]['media']:
if media['mediatype'] != self.MediaType.VIDEO:
continue
if (media['direction'] != self.Direction.SENDER and
media['direction'] != self.Direction.RECEIVER):
continue
name = [media['directionStr'], str(count[media['direction']])]
if media['direction'] == self.Direction.SENDER:
name.append(media['sendercodecname'])
else:
name.append(media['receiverCodecName'])
count[media['direction']] += 1
ret[media['ssrc']] = '_'.join(name)
return ret
def _get_meet_unit(self, key):
"""Return unit from name of the key."""
if key.endswith('fps'):
return 'fps'
if key.endswith('Percent'):
return 'percent'
if key.endswith('width') or key.endswith('height') :
return 'point'
raise error.TestError('Unexpected key: %s' % key)
def _get_meet_type(self, key):
"""Return type from name of the key."""
if key.endswith('fps'):
return 'meet_fps'
if key.endswith('Percent'):
return 'meet_encoder_load'
if key.endswith('width'):
return 'meet_width'
if key.endswith('height'):
return 'meet_height'
raise error.TestError('Unexpected key: %s' % key)
def _convert(self):
"""Convert meet raw dict to data to power dict."""
meet_data = self._logger.meet_data
ssrc_dict = self._get_ssrc_dict(meet_data)
# Dict from timestamp to dict of meet_key to value
parse_dict = collections.defaultdict(
lambda: collections.defaultdict(int))
key_set = set()
testname='power_MeetCall'
for data_point in meet_data:
timestamp = data_point['timestamp']
for media in data_point['media']:
ssrc = media.get('ssrc', 0)
if ssrc not in ssrc_dict:
continue
name = ssrc_dict[media['ssrc']]
for meet_key in self.MEET_KEYS:
if meet_key not in media:
continue
key = '%s_%s' % (name, meet_key)
key_set.add(key)
parse_dict[timestamp][key] = media[meet_key]
timestamps = sorted(parse_dict.keys())
sample_count = len(timestamps)
powerlog_data = collections.defaultdict(list)
for ts in sorted(parse_dict.keys()):
for key in key_set:
powerlog_data[key].append(parse_dict[ts][key])
powerlog_dict = {
'sample_count': sample_count,
'sample_duration': 1,
'average': {k: 1.0 * sum(v) / sample_count
for k, v in powerlog_data.iteritems()},
'data': powerlog_data,
'unit': {k: self._get_meet_unit(k) for k in key_set},
'type': {k: self._get_meet_type(k) for k in key_set},
'checkpoint': [[testname]] * sample_count,
}
return powerlog_dict