blob: 9cdef5720b01c6d9f55254b25f2b5b185427e50d [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing a test suite that is run to test auto updates."""
from __future__ import print_function
import os
import re
import signal
import sys
import time
import unittest
import urllib
from functools import partial
from chromite.lib import cros_logging as logging
from chromite.lib import dev_server_wrapper
from chromite.lib import signals
from crostestutils.au_test_harness import cros_test_proxy
from crostestutils.au_test_harness import gce_au_worker
from crostestutils.au_test_harness import real_au_worker
from crostestutils.au_test_harness import update_exception
from crostestutils.au_test_harness import vm_au_worker
class AUTest(unittest.TestCase):
"""Test harness that uses an au_worker to perform and validate updates.
Defines a test suite that is run using an au_worker. An au_worker can
be created to perform and validates updates on both virtual and real devices.
See documentation for au_worker for more information.
"""
def __init__(self, *_args, **_kwargs):
super(AUTest, self).__init__(*_args, **_kwargs)
# Original signal handlers.
self._old_sigint = None
self._old_sigterm = None
# Verify that the signals modules is actually usable, and won't segfault
# upon invocation of getsignal. See signals.SignalModuleUsable for the
# details and upstream python bug.
self._use_signals = signals.SignalModuleUsable()
@classmethod
def ProcessOptions(cls, options):
"""Processes options for the test suite and sets up the worker class.
Args:
options: options class to be parsed from main class.
"""
cls.base_image_path = options.base_image
cls.payload_signing_key = options.payload_signing_key
cls.target_image_path = options.target_image
cls.test_results_root = options.test_results_root
if options.type == 'vm':
cls.worker_class = vm_au_worker.VMAUWorker
elif options.type == 'gce':
cls.worker_class = gce_au_worker.GCEAUWorker
else:
cls.worker_class = real_au_worker.RealAUWorker
# Cache away options to instantiate workers later.
cls.options = options
def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg):
"""Attempt a payload update, expect it to fail with expected log."""
try:
self.worker.UpdateUsingPayload(payload)
except update_exception.UpdateException as err:
# Will raise ValueError if expected is not found.
if re.search(re.escape(expected_msg), err.output, re.MULTILINE):
return
logging.warning("Didn't find %r in:\n%s", expected_msg, err.output)
self.fail('We managed to update when failure was expected')
def AttemptUpdateWithFilter(self, update_filter, proxy_port=8081):
"""Update through a proxy, with a specified filter, and expect success."""
target_image_path = self.worker.PrepareBase(self.target_image_path)
# We assume that devserver starts at the default port (8080), and start
# our proxy at a different one. We then tell our update tools to
# have the client connect to our proxy_port instead of the 8080.
proxy = cros_test_proxy.CrosTestProxy(
port_in=proxy_port,
address_out='127.0.0.1',
port_out=dev_server_wrapper.DEFAULT_PORT,
data_filter=update_filter
)
proxy.serve_forever_in_thread()
try:
self.worker.PerformUpdate(target_image_path, target_image_path,
proxy_port=proxy_port)
finally:
proxy.shutdown()
# --- UNITTEST SPECIFIC METHODS ---
def setUp(self):
"""Overrides unittest.TestCase.setUp and called before every test.
Sets instance specific variables and initializes worker.
"""
super(AUTest, self).setUp()
# Install custom signal handlers that call worker.CleanUp on the receipt of
# SIGINT and SIGTERM. This is particularly useful in cases where the worker
# allocates resources in prepare stage, but is taking too long at test stage
# and the caller decides to terminate it (by sending a SIGTERM). The default
# action of SIGTERM is terminate, which leaves expensive resources leaked
# and/or local environment tainted.
if self._use_signals:
self._InstallHandlers()
self.worker = self.worker_class(self.options, AUTest.test_results_root)
self.download_folder = os.path.join(os.path.realpath(os.path.curdir),
'latest_download')
def tearDown(self):
"""Overrides unittest.TestCase.tearDown and called after every test."""
self.worker.CleanUp()
# Restore signal handlers.
if self._use_signals:
self._RestoreHandlers()
def testUpdateKeepStateful(self):
"""Tests if we can update normally.
This test checks that we can update by updating the stateful partition
rather than wiping it.
"""
self.worker.Initialize(self.options.ssh_port or 9222)
# Just make sure some tests pass on original image. Some old images
# don't pass many tests.
base_image_path = self.worker.PrepareBase(self.base_image_path)
# Update to
self.worker.PerformUpdate(self.target_image_path, base_image_path)
self.assertTrue(self.worker.VerifyImage(self))
# Update from
self.worker.PerformUpdate(self.target_image_path, self.target_image_path)
self.assertTrue(self.worker.VerifyImage(self))
def testUpdateWipeStateful(self):
"""Tests if we can update after cleaning the stateful partition.
This test checks that we can update successfully after wiping the
stateful partition.
"""
self.worker.Initialize(self.options.ssh_port or 9223)
# Just make sure some tests pass on original image. Some old images
# don't pass many tests.
base_image_path = self.worker.PrepareBase(self.base_image_path)
# Update to
self.worker.PerformUpdate(self.target_image_path, base_image_path,
'clean')
self.assertTrue(self.worker.VerifyImage(self))
# Update from
self.worker.PerformUpdate(self.target_image_path, self.target_image_path,
'clean')
self.assertTrue(self.worker.VerifyImage(self))
def testInterruptedUpdate(self):
"""Tests what happens if we interrupt payload delivery 3 times."""
class InterruptionFilter(cros_test_proxy.Filter):
"""This filter causes the proxy to interrupt the download 3 times.
It does this by closing the first three connections after they transfer
2M total in the outbound direction.
"""
def __init__(self):
"""Defines variable shared across all connections."""
self.close_count = 0
self.data_size = 0
def setup(self):
"""Called once at the start of each connection."""
self.data_size = 0
# Overriden method. The first three connections transferring more than 2M
# outbound will be closed.
def OutBound(self, data):
if self.close_count < 3:
if self.data_size > (2 * 1024 * 1024):
self.close_count += 1
return None
self.data_size += len(data)
return data
self.worker.Initialize(self.options.ssh_port or 9224)
self.AttemptUpdateWithFilter(InterruptionFilter(), proxy_port=8082)
def testSimpleSignedUpdate(self):
"""Test that updates to itself with a signed payload."""
self.worker.Initialize(self.options.ssh_port or 9226)
signed_target_image_path = self.worker.PrepareBase(self.target_image_path,
signed_base=True)
if self.payload_signing_key:
self.worker.PerformUpdate(
self.target_image_path, signed_target_image_path,
payload_signing_key=self.payload_signing_key)
else:
logging.info('No key found to use for signed testing.')
def SimpleTestUpdateAndVerify(self):
"""Test that updates to itself.
We explicitly don't use test prefix so that isn't run by default. Can be
run using test_prefix option.
"""
self.worker.Initialize(self.options.ssh_port or 9227)
target_image_path = self.worker.PrepareBase(self.target_image_path)
self.worker.PerformUpdate(target_image_path, target_image_path)
self.assertTrue(self.worker.VerifyImage(self))
def SimpleTestVerify(self):
"""Test that only verifies the target image.
We explicitly don't use test prefix so that isn't run by default. Can be
run using test_prefix option.
"""
self.worker.Initialize(self.options.ssh_port or 9228)
self.worker.PrepareBase(self.target_image_path)
self.assertTrue(self.worker.VerifyImage(self))
# --- DISABLED TESTS ---
def NoTestDelayedUpdate(self):
"""Tests what happens if some data is delayed during update delivery."""
class DelayedFilter(cros_test_proxy.Filter):
"""Causes intermittent delays in data transmission.
It does this by inserting 3 20 second delays when transmitting
data after 2M has been sent.
"""
def __init__(self):
"""Defines variable shared across all connections."""
self.data_size = 0
self.delay_count = 0
def setup(self):
"""Called once at the start of each connection."""
self.data_size = 0
self.delay_count = 0
# The first three packets after we reach 2M transferred
# are delayed by 20 seconds.
def OutBound(self, data):
if self.delay_count < 3:
if self.data_size > (2 * 1024 * 1024):
self.delay_count += 1
time.sleep(20)
self.data_size += len(data)
return data
self.worker.Initialize(self.options.ssh_port or 9225)
self.AttemptUpdateWithFilter(DelayedFilter(), proxy_port=8083)
def NotestPlatformToolchainOptions(self):
"""Tests the hardened toolchain options."""
self.worker.Initialize(self.options.ssh_port or 9229)
self.worker.PrepareBase(self.base_image_path)
self.assertTrue(self.worker.VerifyImage('platform_ToolchainOptions'))
# TODO(sosa): Get test to work with verbose.
def NotestPartialUpdate(self):
"""Tests what happens if we attempt to update with a truncated payload."""
self.worker.Initialize(self.options.ssh_port or 9230)
# Preload with the version we are trying to test.
self.worker.PrepareBase(self.target_image_path)
# Image can be updated at:
# ~chrome-eng/chromeos/localmirror/autest-images
url = ('http://gsdview.appspot.com/chromeos-localmirror/'
'autest-images/truncated_image.gz')
payload = os.path.join(self.download_folder, 'truncated_image.gz')
# Read from the URL and write to the local file
urllib.urlretrieve(url, payload)
expected_msg = 'download_hash_data == update_check_response_hash failed'
self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg)
# TODO(sosa): Get test to work with verbose.
def NotestCorruptedUpdate(self):
"""Tests what happens if we attempt to update with a corrupted payload."""
self.worker.Initialize(self.options.ssh_port or 9231)
# Preload with the version we are trying to test.
self.worker.PrepareBase(self.target_image_path)
# Image can be updated at:
# ~chrome-eng/chromeos/localmirror/autest-images
url = ('http://gsdview.appspot.com/chromeos-localmirror/'
'autest-images/corrupted_image.gz')
payload = os.path.join(self.download_folder, 'corrupted.gz')
# Read from the URL and write to the local file
urllib.urlretrieve(url, payload)
# This update is expected to fail...
expected_msg = 'zlib inflate() error:-3'
self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg)
# --- PRIVATE HELPER FUNCTIONS ---
def _InstallHandlers(self):
"""Installs signal handlers for SIGINT and SIGTERM."""
self._old_sigint = signal.getsignal(signal.SIGINT)
self._old_sigterm = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGINT, partial(self._SigintAndSigtermHandler,
self._old_sigint))
signal.signal(signal.SIGTERM, partial(self._SigintAndSigtermHandler,
self._old_sigterm))
def _RestoreHandlers(self):
"""Restores signal handlers for SIGINT and SIGTERM."""
signal.signal(signal.SIGINT, self._old_sigint)
signal.signal(signal.SIGTERM, self._old_sigterm)
def _SigintAndSigtermHandler(self, original_handler, signum, frame):
"""Common signal handler for SIGINT and SIGTERM.
It tries to clean up allocated resources, and relays the signal to the
original handler.
Args:
original_handler: The original signal handler.
signum: The number of the signal to handle.
frame: Current stack frame. See signal.signal for details on |signum| and
|frame|.
"""
logging.warning('Received signal %d' % signum)
if signum:
# If we've been invoked because of a signal, ignore delivery of that
# signal from this point forward. The invoking context of this method
# restores signal delivery to what it was prior; we suppress future
# delivery till then since this code handles SIGINT/SIGTERM fully
# including delivering the signal to the original handler on the way out.
#
# Mask both SIGINT and SIGTERM so that the cleanup won't be interrupted.
# They will be turned back on once cleanup finishes.
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
self.worker.CleanUp()
if not signals.RelaySignal(original_handler, signum, frame):
logging.warning('Failed to relay signal %d to original handler.' % signum)
sys.exit('Received signal %d.' % signum)