blob: 734f272dbd09eaecc4156c9bcc5bdd20ba71bec5 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import datetime, logging, os, time
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import base_utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import sequence_utils
from autotest_lib.client.common_lib.cros import chrome
from autotest_lib.client.cros import constants, service_stopper
from autotest_lib.client.cros.image_comparison import image_comparison_factory
from autotest_lib.client.cros.video import media_test_factory
from autotest_lib.client.cros.video import sequence_generator
from autotest_lib.client.cros.multimedia import local_facade_factory
class video_GlitchDetection(test.test):
""""
Takes video frames/screenshots and compares them against known golden ones.
Main test steps:
1. Configure test, set up environment, create needed objects.
2. Download golden images from cloud storage.
4. Load the video based on received configuration (video_format, res)
5. Capture images/frames for the video
6. Verify that captured images/frames are the same as expected golden
ones.
The criteria to determine if two images are the same is read from the
configuration above.
"""
version = 2
def initialize(self):
"""Perform necessary initialization prior to test run.
Private Attributes:
_services: service_stopper.ServiceStopper object
"""
# Do not switch off screen for screenshot utility.
self._services = service_stopper.ServiceStopper(['powerd'])
self._services.stop_services()
def cleanup(self):
self._services.restore_services()
if not self.collect_only: # keep images if you are collecting golden
# images
file_utils.rm_dir_if_exists(self.test_dir)
def setup_image_capturing(self):
"""
Set up the environment for capturing images.
"""
self.player = self.factory.make_video_player()
file_utils.make_leaf_dir(self.golden_images_dir)
file_utils.rm_dir_if_exists(self.test_dir)
file_utils.make_leaf_dir(self.golden_images_dir)
file_utils.ensure_dir_exists(self.test_dir)
def setup_image_comparison(self):
"""
Create objects needed for image comparison.
"""
img_comp_conf_path = os.path.join(self.factory.autotest_cros_video_dir,
'image_comparison.conf')
img_comp_factory = image_comparison_factory.ImageComparisonFactory(
img_comp_conf_path)
bp_proj_specs = [img_comp_factory.bp_base_projname,
self.factory.device_under_test,
self.video_format,
self.video_def,
utils.get_chromeos_release_version().replace('.', '_')]
bp_proj_name = '.'.join(bp_proj_specs)
comparer = img_comp_factory.make_upload_on_fail_comparer(bp_proj_name)
self.bp_comparer = img_comp_factory.make_bp_comparer(bp_proj_name)
self.verifier = img_comp_factory.make_image_verifier(comparer)
def run_screenshot_image_comparison_test(self):
"""
Capture screenshots and compare against golden ones.
"""
capturer = self.factory.make_import_screenshot_capturer()
screenshot_collector = self.factory.make_video_screenshot_collector(
capturer=capturer, player=self.player)
timestamps = sequence_generator.generate_random_sequence(
self.factory.start_capture,
self.factory.stop_capture,
self.factory.samples_per_min)
test_images_paths = screenshot_collector.collect_multiple_screenshots(
timestamps)
namer = self.factory.make_screenshot_filenamer()
filenames = [namer.get_filename(t) for t in timestamps]
golden_images_paths = self.download_golden_images(filenames)
self.verifier.verify(golden_images_paths, test_images_paths)
def capture_frames(self):
"""
Capture frames using chameleon.
"""
self.video_capturer = self.factory.make_chameleon_video_capturer(
self.host.hostname, self.chameleon_host_args)
with self.video_capturer as c:
self.player.load_video()
self.display_facade.move_to_display(
self.display_facade.get_first_external_display_index())
self.display_facade.set_fullscreen(True)
self.player.play()
utils.poll_for_condition(lambda : self.player.currentTime() > 0.0,
timeout=5,
exception=error.TestError(
"Expected current time to be > 0"))
self.player.pause()
self.player.seek_to(datetime.timedelta(seconds=0))
logging.debug("Wait for fullscreen notifications to go away.")
time.sleep(5)
frame_count = (self.max_frame_count if self.max_frame_count else
self.factory.video_frame_count)
return c.capture_only(self.player, frame_count)
def get_unique_checksum_indices(self, checksums):
"""
Returns a list of checksum, first_occurance_index.
@param checksums: list of checksums
@return: list of checksum, first_occurance_index
"""
prev_checksum = None
checksum_index_list = []
for i, checksum in enumerate(checksums):
if checksum != prev_checksum:
checksum_index_list.append((checksum, i))
prev_checksum = checksum
return checksum_index_list
def collect_chameleon_golden_images(self):
"""
Collect golden images but don't run any comparisons. Useful for creating
new golden images.
"""
checksums = self.capture_frames()
checksum_index_list = self.get_unique_checksum_indices(checksums)
# check that we don't have unexpected repeated frames
self.ensure_expected_frame_repeat_count(
checksum_index_list, self.factory.max_repeat_frame_count)
indices = [entry[1] for entry in checksum_index_list]
self.video_capturer.write_images(indices)
golden_checksum_path = os.path.join(
self.test_dir, self.factory.golden_checksum_filename)
# write checksums to file
logging.debug("Write golden checksum file to %s", golden_checksum_path)
with open(golden_checksum_path, "w+") as f:
for checksum, ind in checksum_index_list:
f.write(' '.join([str(i) for i in checksum]))
f.write(' | %d\n' % ind)
def read_chameleon_golden_checksumfile(self, path):
"""
Reads the golden checksum file. Each line in file has the format
w x y z | count where w x y z is a chameleon frame checksum
@param path: complete path to the golden checksum file.
@return an OrderedDict of checksum -> count.
"""
checksum_index_list = []
with open(path) as f:
for line in f:
entry = line.split("|") # w x y z | count
# turn into a tuple because a list can not be used a dict key
checksum = [int(val) for val in entry[0].split()]
checksum_index_list.append((checksum, int(entry[1])))
return checksum_index_list
def ensure_expected_frame_repeat_count(self, checksum_indices,
max_repeat_count):
"""
Walks the list of checksum, first occurrence tuple to ensure that
repeated counts of frames are reasonable.
@param checksum_indices: list of tuples checksum, first index occurred.
@param max_repeat_count: int, max. frame frequency allowed
"""
for i in range(0, len(checksum_indices) - 1):
checksum = checksum_indices[i][0]
checksum_first_occurred = checksum_indices[i][1]
next_checksum_first_occurred = checksum_indices[i + 1][1]
repeat_count = (next_checksum_first_occurred -
checksum_first_occurred)
if repeat_count > max_repeat_count:
msg = ("Too many repeated frames for checksum %s! # of "
"repeated frames : %d. Max allowed is : %d"
%(checksum, repeat_count, max_repeat_count))
raise error.TestFail(msg)
def run_chameleon_image_comparison_test(self):
"""
Capture frames. Perform image comparison. Also check that the number
of frames is distributed as expected.
"""
def dump_list(l):
"""
Logs list line by line.
@param l, the list to log.
"""
for elem in l:
logging.debug(elem)
checksums = self.capture_frames()
logging.debug("*** RAW checksums ***")
dump_list(checksums)
test_checksum_ind_list = self.get_unique_checksum_indices(checksums)
logging.debug("*** FILTERED checksum ***")
dump_list(test_checksum_ind_list)
# We may get a frame that is repeated many times
self.ensure_expected_frame_repeat_count(
test_checksum_ind_list, self.factory.max_repeat_frame_count)
logging.debug("Download golden checksum file.")
remote_golden_checksum_path = os.path.join(
self.remote_golden_images_dir, self.factory.golden_checksum_filename)
golden_checksum_path = os.path.join(
self.golden_images_dir, self.factory.golden_checksum_filename)
file_utils.download_file(remote_golden_checksum_path,
golden_checksum_path)
golden_checksum_ind_list = self.read_chameleon_golden_checksumfile(
golden_checksum_path)
eps = self.factory.frame_count_deviation
golden_count = len(golden_checksum_ind_list)
test_count = len(test_checksum_ind_list)
# We may get too little test frames
if golden_count - test_count > eps:
logging.debug("Too few test images, save what we got.")
indices = [i for _, i in test_checksum_ind_list]
self.video_capturer.write_images(indices, self.resultsdir)
msg = ("Expecting about %d checksums, received %d. Allowed delta "
"is %d") % (golden_count, test_count, eps)
raise error.TestFail(msg)
# truncate if too many
# TODO: mussa, figure out why we get too many crbug/484057
# Started by adding extra log messages in video capturer
test_checksum_ind_list = test_checksum_ind_list[:golden_count]
"""
Find the length of a longest common subsequence (LCS) between golden
and test checksums.
Sometimes you have a missing frame or an extra frame in one list or the
other. Using LCS we skip over the missing frame and continue the
comparison on the rest of the list
"""
# Use a tuple because we will need to hash the checksums into a set
golden_checksums = [tuple(elem[0]) for elem in golden_checksum_ind_list]
test_checksums = [tuple(elem[0]) for elem in test_checksum_ind_list]
lcs_len = sequence_utils.lcs_length(golden_checksums, test_checksums)
eps = self.factory.nonmatching_frames_eps
missing_frames_count = len(golden_checksums) - lcs_len
unknown_frames_count = len(test_checksums) - lcs_len
msg = ("# of matching frames : %d. # of missing frames : %d. # of "
"unknown test frames : %d. Max allowed # of missing frames : "
"%d. # of golden frames : %d. # of test_checksums : %d"
%(lcs_len, missing_frames_count, unknown_frames_count, eps,
len(golden_checksums), len(test_checksums)))
logging.debug(msg)
# get the checksums that are in test run but not in the golden run
# they could be glitchy
unknown_checksums = set(test_checksums) - set(golden_checksums)
if missing_frames_count + unknown_frames_count > eps:
# save it for later review, we are not really verifying,
indices = [i for checksum, i in test_checksum_ind_list if tuple(
checksum) in unknown_checksums]
paths = self.video_capturer.write_images(indices)
comp_url = ''
for path in paths:
comp_result = self.bp_comparer.compare(path, path)
# need the parent link, should be just one for all comparisons
if not comp_url:
comp_url = os.path.dirname(comp_result.comparison_url)
raise error.TestFail("Too many non-matching frames! " + msg +
" Comparison urls : " + comp_url)
def download_golden_images(self, filepaths):
"""
Downloads golden images corresponding to the captured test images.
@param filepaths: list of golden image filepaths.
@return: list of paths to downloaded golden images.
"""
golden_images = []
if type(filepaths) is not list:
filepaths = [filepaths]
filenames = [os.path.basename(path) for path in filepaths]
for f in filenames:
localpath = os.path.join(self.factory.local_golden_images_dir, f)
remotepath = os.path.join(self.factory.golden_images_remote_dir, f)
file_utils.download_file(remotepath, localpath)
golden_images.append(localpath)
return golden_images
def run_once(self, channel, video_name, video_format='', video_def='',
collect_only=False, use_chameleon=False, host=None, args=None):
"""
Work around. See crbug/404773. Some boards have a scaling factor that
results in screenshots being larger than expected. (This factor was
intentionally changed.
To have the same canvas size we force the scale factor.
This only affects hd devices on our list: nyan and pi and has no
effect on sd devices.
For link specifically, we don't force that factor has that causes
the actual device resolution to change. We don't want that.
"""
# TODO: mussa: Remove code if scale factor get reverted to prev value.
self.host = host
self.video_format = video_format
self.video_def = video_def
self.use_chameleon = use_chameleon
self.collect_only = collect_only
self.max_frame_count = None
do_not_scale_boards = ['link']
this_board = utils.get_current_board()
scale_args = ['--force-device-scale-factor', '1']
browser_args = [] if this_board in do_not_scale_boards else scale_args
self.chameleon_host_args = ""
# when we are collecting images, we just have one control file, we will
# receive different command line parameters so process those
# args are delivered like this:
# ['CHAMELEON_HOST=100.107.2.250,video_format=mp4,video_def=480p']
video_args = {}
if args:
args = args[0].split(',')
self.chameleon_host_args = [arg for arg in args if arg.lower().
startswith('chameleon_host')]
video_args = base_utils.args_to_dict(args)
# if args are provided use those instead
if 'video_format' in video_args:
self.video_format = video_args['video_format']
if 'video_def' in video_args:
self.video_def = video_args['video_def']
if 'max_frame_count' in video_args:
self.max_frame_count = int(video_args['max_frame_count'])
ext_paths = []
if use_chameleon:
ext_paths = [constants.DISPLAY_TEST_EXTENSION]
wpr_server = (media_test_factory.MediaTestFactory
.make_webpagereplay_server(video_name))
browser_args += wpr_server.chrome_flags_for_wpr
with chrome.Chrome(extra_browser_args=browser_args,
extension_paths=ext_paths,
autotest_ext=True) as cr, wpr_server:
cr.browser.SetHTTPServerDirectories(self.bindir)
self.factory = media_test_factory.MediaTestFactory(
chrome=cr,
bin_dir=self.bindir,
channel=channel,
video_name=video_name,
video_format=self.video_format,
video_def=self.video_def)
self.display_facade = local_facade_factory.LocalFacadeFactory(
cr).create_display_facade()
self.test_dir = self.factory.test_working_dir
self.golden_images_dir = self.factory.local_golden_images_dir
self.remote_golden_images_dir = self.factory.golden_images_remote_dir
if not self.factory.is_board_supported:
logging.debug("Board not supported. End Test.")
return
self.setup_image_capturing()
if collect_only:
self.collect_chameleon_golden_images()
else:
self.setup_image_comparison()
if use_chameleon:
self.run_chameleon_image_comparison_test()
else:
self.run_screenshot_image_comparison_test()