blob: bff76656740395ccc239969f54a9edff83752ddf [file] [log] [blame] [edit]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import tempfile
from PIL import Image
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.cros.chameleon import chameleon_port_finder
from autotest_lib.client.cros.chameleon import chameleon_stream_server
from autotest_lib.client.cros.chameleon import edid
from autotest_lib.server import test
from autotest_lib.server.cros.multimedia import remote_facade_factory
class video_PlaybackQuality(test.test):
"""Server side video playback quality measurement.
This test measures the video playback quality by chameleon.
It will output 2 performance data. Number of Corrupted Frames and Number of
Dropped Frames.
"""
version = 1
# treat 0~0x30 as 0
COLOR_MARGIN_0 = 0x30
# treat (0xFF-0x60)~0xFF as 0xFF.
COLOR_MARGIN_255 = 0xFF - 0x60
# If we can't find the expected frame after TIMEOUT_FRAMES, raise exception.
TIMEOUT_FRAMES = 120
# RGB for black. Used for preamble and postamble.
RGB_BLACK = [0, 0, 0]
# Expected color bar rgb. The color order in the array is the same order in
# the video frames.
EXPECTED_RGB = [('Blue', [0, 0, 255]), ('Green', [0, 255, 0]),
('Cyan', [0, 255, 255]), ('Red', [255, 0, 0]),
('Magenta', [255, 0, 255]), ('Yellow', [255, 255, 0]),
('White', [255, 255, 255])]
def _save_frame_to_file(self, resolution, frame, filename):
"""Save video frame to file under results directory.
This function will append .png filename extension.
@param resolution: A tuple (width, height) of resolution.
@param frame: The video frame data.
@param filename: File name.
"""
image = Image.fromstring('RGB', resolution, frame)
image.save('%s/%s.png' % (self.resultsdir, filename))
def _check_rgb_value(self, value, expected_value):
"""Check value of the RGB.
This function will check if the value is in the range of expected value
and its margin.
@param value: The value for checking.
@param expected_value: Expected value. It's ether 0 or 0xFF.
@returns: True if the value is in range. False otherwise.
"""
if expected_value <= value <= self.COLOR_MARGIN_0:
return True
if expected_value >= value >= self.COLOR_MARGIN_255:
return True
return False
def _check_rgb(self, frame, expected_rgb):
"""Check the RGB raw data of all pixels in a video frame.
Because checking all pixels may take more than one video frame time. If
we want to analyze the video frame on the fly, we need to skip pixels
for saving the checking time.
The parameter of how many pixels to skip is self._skip_check_pixels.
@param frame: Array of all pixels of video frame.
@param expected_rgb: Expected values for RGB.
@returns: number of error pixels.
"""
error_number = 0
for i in xrange(0, len(frame), 3 * (self._skip_check_pixels + 1)):
if not self._check_rgb_value(ord(frame[i]), expected_rgb[0]):
error_number += 1
continue
if not self._check_rgb_value(ord(frame[i + 1]), expected_rgb[1]):
error_number += 1
continue
if not self._check_rgb_value(ord(frame[i + 2]), expected_rgb[2]):
error_number += 1
return error_number
def _find_and_skip_preamble(self, description):
"""Find and skip the preamble video frames.
@param description: Description of the log and file name.
"""
# find preamble which is the first black frame.
number_of_frames = 0
while True:
video_frame = self._stream_server.receive_realtime_video_frame()
(frame_number, width, height, _, frame) = video_frame
if self._check_rgb(frame, self.RGB_BLACK) == 0:
logging.info('Find preamble at frame %d', frame_number)
break
if number_of_frames > self.TIMEOUT_FRAMES:
raise error.TestFail('%s found no preamble' % description)
number_of_frames += 1
self._save_frame_to_file((width, height), frame,
'%s_pre_%d' % (description, frame_number))
# skip preamble.
# After finding preamble, find the first frame that is not black.
number_of_frames = 0
while True:
video_frame = self._stream_server.receive_realtime_video_frame()
(frame_number, _, _, _, frame) = video_frame
if self._check_rgb(frame, self.RGB_BLACK) != 0:
logging.info('End preamble at frame %d', frame_number)
self._save_frame_to_file((width, height), frame,
'%s_end_preamble' % description)
break
if number_of_frames > self.TIMEOUT_FRAMES:
raise error.TestFail('%s found no color bar' % description)
number_of_frames += 1
def _store_wrong_frames(self, frame_number, resolution, frames):
"""Store wrong frames for debugging.
@param frame_number: latest frame number.
@param resolution: A tuple (width, height) of resolution.
@param frames: Array of video frames. The latest video frame is in the
front.
"""
for index, frame in enumerate(frames):
if not frame:
continue
element = ((frame_number - index), resolution, frame)
self._wrong_frames.append(element)
def _check_color_bars(self, description):
"""Check color bars for video playback quality.
This function will read video frame from stream server and check if the
color is right by self._check_rgb until read postamble.
If only some pixels are wrong, the frame will be counted to corrupted
frame. If all pixels are wrong, the frame will be counted to wrong
frame.
@param description: Description of log and file name.
@return A tuple (corrupted_frame_count, wrong_frame_count) for quality
data.
"""
# store the recent 2 video frames for debugging.
# Put the latest frame in the front.
frame_history = [None, None]
# Check index for color bars.
check_index = 0
corrupted_frame_count = 0
wrong_frame_count = 0
while True:
# Because the first color bar is skipped in _find_and_skip_preamble,
# we start from the 2nd color.
check_index = (check_index + 1) % len(self.EXPECTED_RGB)
video_frame = self._stream_server.receive_realtime_video_frame()
(frame_number, width, height, _, frame) = video_frame
# drop old video frame and store new one
frame_history.pop(-1)
frame_history.insert(0, frame)
color_name = self.EXPECTED_RGB[check_index][0]
expected_rgb = self.EXPECTED_RGB[check_index][1]
error_number = self._check_rgb(frame, expected_rgb)
# The video frame is correct, go to next video frame.
if not error_number:
continue
# Total pixels need to be adjusted by the _skip_check_pixels.
total_pixels = width * height / (self._skip_check_pixels + 1)
log_string = ('[%s] Number of error pixels %d on frame %d, '
'expected color %s, RGB %r' %
(description, error_number, frame_number, color_name,
expected_rgb))
self._store_wrong_frames(frame_number, (width, height),
frame_history)
# clean history after they are stored.
frame_history = [None, None]
# Some pixels are wrong.
if error_number != total_pixels:
corrupted_frame_count += 1
logging.warn('[Corrupted]%s', log_string)
continue
# All pixels are wrong.
# Check if we get postamble where all pixels are black.
if self._check_rgb(frame, self.RGB_BLACK) == 0:
logging.info('Find postamble at frame %d', frame_number)
break
wrong_frame_count += 1
logging.info('[Wrong]%s', log_string)
# Adjust the check index due to frame drop.
# The screen should keep the old frame or go to next video frame
# due to frame drop.
# Check if color is the same as the previous frame.
# If it is not the same as previous frame, we assign the color of
# next frame without checking.
previous_index = ((check_index + len(self.EXPECTED_RGB) - 1)
% len(self.EXPECTED_RGB))
if not self._check_rgb(frame, self.EXPECTED_RGB[previous_index][1]):
check_index = previous_index
else:
check_index = (check_index + 1) % len(self.EXPECTED_RGB)
return (corrupted_frame_count, wrong_frame_count)
def _dump_wrong_frames(self, description):
"""Dump wrong frames to files.
@param description: Description of the file name.
"""
for frame_number, resolution, frame in self._wrong_frames:
self._save_frame_to_file(resolution, frame,
'%s_%d' % (description, frame_number))
self._wrong_frames = []
def _prepare_playback(self):
"""Prepare playback video."""
# Workaround for white bar on rightmost and bottommost on samus when we
# set fullscreen from fullscreen.
self._display_facade.set_fullscreen(False)
self._video_facade.prepare_playback(self._video_tempfile.name)
def _get_playback_quality(self, description, capture_dimension):
"""Get the playback quality.
This function will playback the video and analysis each video frames.
It will output performance data too.
@param description: Description of the log, file name and performance
data.
@param capture_dimension: A tuple (width, height) of the captured video
frame.
"""
logging.info('Start to get %s playback quality', description)
self._prepare_playback()
self._chameleon_port.start_capturing_video(capture_dimension)
self._stream_server.reset_video_session()
self._stream_server.dump_realtime_video_frame(
False, chameleon_stream_server.RealtimeMode.BestEffort)
self._video_facade.start_playback()
self._find_and_skip_preamble(description)
(corrupted_frame_count, wrong_frame_count) = (
self._check_color_bars(description))
self._stream_server.stop_dump_realtime_video_frame()
self._chameleon_port.stop_capturing_video()
self._video_facade.pause_playback()
self._dump_wrong_frames(description)
dropped_frame_count = self._video_facade.dropped_frame_count()
graph_name = '%s_%s' % (self._video_description, description)
self.output_perf_value(description='Corrupted frames',
value=corrupted_frame_count, units='frame',
higher_is_better=False, graph=graph_name)
self.output_perf_value(description='Wrong frames',
value=wrong_frame_count, units='frame',
higher_is_better=False, graph=graph_name)
self.output_perf_value(description='Dropped frames',
value=dropped_frame_count, units='frame',
higher_is_better=False, graph=graph_name)
def run_once(self, host, video_url, video_description, test_regions,
skip_check_pixels=5):
"""Runs video playback quality measurement.
@param host: A host object representing the DUT.
@param video_url: The ULR of the test video.
@param video_description: a string describes the video to play which
will be part of entry name in dashboard.
@param test_regions: An array of tuples (description, capture_dimension)
for the testing region of video. capture_dimension is a tuple
(width, height).
@param skip_check_pixels: We will check one pixel and skip number of
pixels. 0 means no skip. 1 means check 1 pixel and skip 1 pixel.
Because we may take more than 1 video frame time for checking
all pixels. Skip some pixles for saving time.
"""
# Store wrong video frames for dumping and debugging.
self._video_url = video_url
self._video_description = video_description
self._wrong_frames = []
self._skip_check_pixels = skip_check_pixels
factory = remote_facade_factory.RemoteFacadeFactory(
host, results_dir=self.resultsdir, no_chrome=True)
chameleon_board = host.chameleon
browser_facade = factory.create_browser_facade()
display_facade = factory.create_display_facade()
self._display_facade = display_facade
self._video_facade = factory.create_video_facade()
self._stream_server = chameleon_stream_server.ChameleonStreamServer(
chameleon_board.host.hostname)
chameleon_board.setup_and_reset(self.outputdir)
self._stream_server.connect()
# Download the video to self._video_tempfile.name
_, ext = os.path.splitext(video_url)
self._video_tempfile = tempfile.NamedTemporaryFile(suffix=ext)
# The default permission is 0o600.
os.chmod(self._video_tempfile.name, 0o644)
file_utils.download_file(video_url, self._video_tempfile.name)
browser_facade.start_default_chrome()
display_facade.set_mirrored(False)
edid_path = os.path.join(self.bindir, 'test_data', 'edids',
'EDIDv2_1920x1080')
finder = chameleon_port_finder.ChameleonVideoInputFinder(
chameleon_board, display_facade)
for chameleon_port in finder.iterate_all_ports():
self._chameleon_port = chameleon_port
connector_type = chameleon_port.get_connector_type()
logging.info('See the display on Chameleon: port %d (%s)',
chameleon_port.get_connector_id(),
connector_type)
with chameleon_port.use_edid(
edid.Edid.from_file(edid_path, skip_verify=True)):
resolution = utils.wait_for_value_changed(
display_facade.get_external_resolution,
old_value=None)
if resolution is None:
raise error.TestFail('No external display detected on DUT')
display_facade.move_to_display(
display_facade.get_first_external_display_id())
for description, capture_dimension in test_regions:
self._get_playback_quality('%s_%s' % (connector_type,
description),
capture_dimension)