blob: ea5a7187d5a89a6684f9a455c95d62a05a6b60c4 [file] [log] [blame]
# Lint as: python2, python3
# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import logging
import os
import re
import time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
from autotest_lib.client.cros import cryptohome
from autotest_lib.server.cros.minios import minios_util
from autotest_lib.server.cros.network import wifi_test_context_manager
from autotest_lib.server.cros.update_engine import update_engine_test
class MiniOsTest(update_engine_test.UpdateEngineTest):
"""
Base class that sets up helper objects/functions for NBR tests.
"""
_ETHERNET_LABEL = 'Ethernet'
_MINIOS_CLIENT_CMD = 'minios_client'
_MINIOS_KERNEL_FLAG = 'cros_minios'
# Period to wait for firmware screen in seconds.
# Value based on Brya, which is the slowest so far.
_FIRMWARE_SCREEN_TIMEOUT = 30
# Number of times to attempt booting into MiniOS.
_MINIOS_BOOT_MAX_ATTEMPTS = 3
# Timeout periods, given in seconds.
_MINIOS_SHUTDOWN_TIMEOUT = 30
# Number of seconds to wait for the host to boot into MiniOS. Should always
# be greater than `_FIRMWARE_SCREEN_TIMEOUT`.
_MINIOS_WAIT_UP_TIME_SECONDS = 120
# Version reported to OMAHA/NEBRASKA for recovery.
_RECOVERY_VERSION = '0.0.0.0'
# Files used by the tests.
_DEPENDENCY_DIRS = ['bin', 'lib', 'lib64', 'libexec']
_DEPENDENCY_INSTALL_DIR = '/usr/local'
_MINIOS_TEMP_STATEFUL_DIR = '/usr/local/tmp/stateful'
_STATEFUL_DEV_IMAGE_NAME = 'dev_image_new'
# Wildcards form of '_DEPENDENCY_DIRS' used for robust extraction from
# stateful archive given that the contents of the stateful archive varies
# based on DUT CPU architecture.
_DEPENDENCY_DIRS_PATTERN = ['bin', 'lib*']
# Additional log files to be extracted from MiniOS.
_MESSAGES_LOG = '/var/log/messages'
_MINIOS_LOG = '/var/log/minios.log'
_NET_LOG = '/var/log/net.log'
_UPSTART_LOG = '/var/log/upstart.log'
# MiniOS State values from platform/system_api/dbus/minios/minios.proto.
_MINIOS_STATE_ERROR = 'ERROR'
_MINIOS_STATE_IDLE = 'IDLE'
_MINIOS_STATE_NETWORK_SELECTION = 'NETWORK_SELECTION'
_MINIOS_STATE_NETWORK_CONNECTED = 'CONNECTED'
_MINIOS_STATE_NETWORK_CREDENTIALS = 'NETWORK_CREDENTIALS'
_MINIOS_STATE_RECOVERING = 'RECOVERING'
# Arrays used to build iptables commands.
_DROP_OUTGOING_PACKETS = ['OUTPUT', '-p', 'tcp', '-j', 'DROP']
# Filter DEVSERVER(8082) and GS-CACHE(8888) for when running in the lab.
# Also filter HTTP(80) and HTTPS(443) ports for when running at desk.
_PORTS_TO_FILTER = ['80', '443', '8082', '8888']
_GET_STATE_EXTRACTOR = re.compile(r'State is (\w+)')
def _after_iteration_hook(self, obj):
"""
Save MiniOS logs and reboot back to ChromeOS before running rest of
the after iter hooks. This step is required because the rest of after
iter hooks expect to run in a ChromeOS environment.
@param obj: the test object.
"""
if self._is_running_minios():
# Save log files before rebooting.
self._minios_cleanup()
self._host.reboot()
def initialize(self,
host,
wifi_configs=None,
running_at_desk=None,
skip_provisioning=None,
**kwargs):
"""
Sets default variables for the test.
@param host: The DUT we will be running on.
@param wifi_configs: List containing access point configuration dict and
wifi client configuration dict.
@param running_at_desk: indicates test is run locally from a
workstation.
@param skip_provisioning: indicates test is run locally and provisioning
of inactive partition should be skipped.
"""
super(MiniOsTest, self).initialize(host, **kwargs)
self._nebraska = None
self._use_public_bucket = minios_util.to_bool(running_at_desk)
self._skip_provisioning = minios_util.to_bool(skip_provisioning)
self._servo = host.servo
self._servo.initialize_dut()
self._minios_resultsdir = os.path.join(self.resultsdir, 'minios')
os.mkdir(self._minios_resultsdir)
self._wifi_configs = wifi_configs
self._wifi_context = None
self.register_after_iteration_hook(self._after_iteration_hook)
def warmup(self, host, **kwargs):
"""
Setup up minios autotests.
MiniOS tests can have a failure condition that leaves the current
active partition unbootable. To avoid this having an effect on other
tests in a test suite, we provision and run the current installed
version on the inactive partition to ensure the next test runs
with the correct version of ChromeOS.
Additionally, if wifi_configs is provided, we setup and validate the
wifi context.
@param host: The DUT we will be running on.
@param **kwargs: Dict of key, value settings from command line to passed
on to the wifi context manager.
See `cros.network.WiFiTestContextManager` for relevant arguments.
"""
if not self._skip_provisioning:
self.provision_dut(build_name=self._get_release_builder_path(),
public_bucket=self._use_public_bucket,
is_release_bucket=False)
if self._wifi_configs:
self._setup_wifi_context(host, kwargs)
super(MiniOsTest, self).warmup()
def cleanup(self):
"""Clean up minios autotests."""
# Our wifi context is configured to only run the client and the
# router. Since, we booted into MiniOS and lost the client
# (xmlrpc server) running on the DUT. We therefore cannot use the
# teardown method on the wifi_context. We therefore only close the
# router.
if self._wifi_context:
self._wifi_context.router.close()
# Restore the stateful partition.
self._restore_stateful(public_bucket=self._use_public_bucket)
super(MiniOsTest, self).cleanup()
@property
def wifi_context(self):
"""@return the WiFi context for this test."""
return self._wifi_context
def _minios_cleanup(self):
"""
Perform any cleanup operations before we exit MiniOS.
MiniOS runs purely from ramfs and thus all data is lost when we exit
MiniOS. We therefore need to perform any cleanup functions like grabbing
logs before MiniOS exits. This function should be called just before
rebooting out of MiniOS.
"""
def CopyLog(filename):
try:
self._host.get_file(filename, self._minios_resultsdir)
except error.AutoservRunError:
logging.exception("Failed to copy %s during clean up",
filename)
if self._host:
for filename in (
self._MESSAGES_LOG,
self._NET_LOG,
self._UPDATE_ENGINE_LOG_DIR,
self._UPSTART_LOG,
):
CopyLog(filename)
if self._host.is_file_exists(self._MINIOS_LOG):
CopyLog(self._MINIOS_LOG)
else:
logging.warning("Skipping copy of %s, file not present.",
self._MINIOS_LOG)
if self._nebraska:
nebraska_log_file = os.path.join("/tmp", self._NEBRASKA_LOG)
CopyLog(nebraska_log_file)
self._nebraska = None
def _is_running_minios(self):
"""Returns True if the DUT is booted into MiniOS."""
pattern = r'\b%s\b' % self._MINIOS_KERNEL_FLAG
return re.search(pattern, self._host.get_cmdline())
def _boot_minios(self):
"""Boot the DUT into MiniOS."""
# Turn off usbkey to avoid booting into usb-recovery image.
self._servo.switch_usbkey('off')
psc = self._servo.get_power_state_controller()
psc.power_off()
psc.power_on(psc.REC_ON)
self._host.test_wait_for_shutdown(self._MINIOS_SHUTDOWN_TIMEOUT)
logging.info('Waiting for firmware screen')
time.sleep(self._FIRMWARE_SCREEN_TIMEOUT)
# Attempt multiple times to boot into MiniOS. If all attempts fail then
# this is some kind of firmware issue. Since we failed to boot an OS use
# servo to reset the unit and then report test failure.
attempts = 0
minios_is_up = False
while not minios_is_up and attempts < self._MINIOS_BOOT_MAX_ATTEMPTS:
# Use Ctrl+R shortcut to boot 'MiniOS
logging.info('Try to boot MiniOS')
self._servo.ctrl_r()
minios_is_up = self._host.wait_up(
timeout=self._MINIOS_WAIT_UP_TIME_SECONDS,
host_is_down=True)
attempts += 1
if minios_is_up:
# If mainfw_type is recovery then we are in MiniOS.
mainfw_type = self._host.run_output('crossystem mainfw_type')
if mainfw_type != 'recovery':
raise error.TestError(
'Boot to MiniOS - invalid firmware: %s.' % mainfw_type)
# There are multiple types of recovery images, make sure we booted
# into minios.
if not self._is_running_minios():
raise error.TestError(
'Boot to MiniOS - recovery image is not minios.')
else:
# Try to not leave unit on recovery firmware screen.
self._host.power_cycle()
raise error.TestError('Boot to MiniOS failed.')
def _create_minios_hostlog(self):
"""Create the minios hostlog file.
To ensure the recovery was successful we need to compare the update
events against expected update events. This function creates the hostlog
for minios before the recovery reboots the DUT.
"""
# Check that update logs exist.
if len(self._get_update_engine_logs()) < 1:
err_msg = 'update_engine logs are missing. Cannot verify recovery.'
raise error.TestFail(err_msg)
# Download the logs instead of reading it over the network since it will
# disappear after MiniOS reboots the DUT.
logfile = os.path.join(self.resultsdir, 'minios_update_engine.log')
self._host.get_file(self._UPDATE_ENGINE_LOG, logfile)
logfile_content = None
with open(logfile) as f:
logfile_content = f.read()
minios_hostlog = os.path.join(self.resultsdir, 'hostlog_minios')
with open(minios_hostlog, 'w') as fp:
# There are four expected hostlog events during recovery.
extract_logs = self._extract_request_logs(logfile_content)
json.dump(extract_logs[-4:], fp)
return minios_hostlog
def _install_test_dependencies(self, public_bucket=False):
"""
Install test dependencies from a downloaded stateful archive.
@param public_bucket: True to download stateful from a public bucket.
"""
statefuldev_url = self._get_stateful_url(public_bucket)
logging.info('Installing dependencies from %s', statefuldev_url)
# Create destination directories.
minios_dev_image_dir = os.path.join(self._MINIOS_TEMP_STATEFUL_DIR,
self._STATEFUL_DEV_IMAGE_NAME)
install_dirs = [
os.path.join(self._DEPENDENCY_INSTALL_DIR, dir)
for dir in self._DEPENDENCY_DIRS
]
self._run(['mkdir', '-p', minios_dev_image_dir] + install_dirs)
# Symlink the install dirs into the staging destination.
for dir in install_dirs:
self._run(['ln', '-s', dir, minios_dev_image_dir])
# Generate the list of stateful archive members that we want to extract.
members = [
os.path.join(self._STATEFUL_DEV_IMAGE_NAME, dir)
for dir in self._DEPENDENCY_DIRS_PATTERN
]
try:
self._download_and_extract_stateful(statefuldev_url,
self._MINIOS_TEMP_STATEFUL_DIR,
members=members,
keep_symlinks=True,
wildcards=True)
except error.AutoservRunError as e:
err_str = 'Failed to install the test dependencies'
raise error.TestFail('%s: %s' % (err_str, str(e)))
self._setup_python_symlinks()
# Clean-up unused files to save memory.
self._run(['rm', '-rf', self._MINIOS_TEMP_STATEFUL_DIR])
def _setup_python_symlinks(self):
"""
Create symlinks in the root to point to all python paths in /usr/local
for stateful installed python to work. This is needed because Gentoo
creates wrappers with hardcoded paths to the root (e.g. python-exec).
"""
for path in self._DEPENDENCY_DIRS:
self._run([
'find',
os.path.join(self._DEPENDENCY_INSTALL_DIR, path),
'-maxdepth', '1', '\(', '-name', 'python*', '-o', '-name',
'portage', '\)', '-exec', 'ln', '-s', '{}',
os.path.join('/usr', path), '\;'
])
def _start_nebraska(self, payload_url=None):
"""
Initialize and start nebraska on the DUT.
@param payload_url: The payload to served by nebraska.
"""
if not self._nebraska:
self._nebraska = minios_util.NebraskaService(
self, self._host, payload_url)
self._nebraska.start()
def _verify_reboot(self, old_boot_id):
"""
Verify that the unit rebooted using the boot_id.
@param old_boot_id A boot id value obtained before the
reboot.
"""
self._host.test_wait_for_shutdown(self._MINIOS_SHUTDOWN_TIMEOUT)
self._host.test_wait_for_boot(old_boot_id)
self._should_restore_stateful = True
def _drop_download_traffic(self):
"""
Insert Iptables rules to drop outgoing HTTP(S) packets. This simulates
a dropped network connection.
"""
iptables_add_rule = ['iptables', '-I']
for port in self._PORTS_TO_FILTER:
self._run(iptables_add_rule + self._DROP_OUTGOING_PACKETS +
['--dport', port])
def _restore_download_traffic(self):
"""
Attempt to remove Iptables rules that drop outgoing HTTP(S) packets if
any. This simulates restoration of a dropped network connection.
"""
iptables_delete_rule = ['iptables', '-D']
for port in self._PORTS_TO_FILTER:
self._run(iptables_delete_rule + self._DROP_OUTGOING_PACKETS +
['--dport', port],
ignore_status=True)
def _next_screen(self):
"""
Advance MiniOS recovery to the next screen (next step).
"""
self._run([self._MINIOS_CLIENT_CMD, '--next_screen'])
def _prev_screen(self):
"""
Advance MiniOS recovery to the previous screen (previous step).
"""
self._run([self._MINIOS_CLIENT_CMD, '--prev_screen'])
def _set_network_credentials(self, network_name, network_password=None):
"""
Set the network and optional password that MiniOS should connect to.
@param network_name: The name of the network to connect to for recovery.
@param network_password: Optional password for the network.
"""
cmd = [
self._MINIOS_CLIENT_CMD, '--set_credentials',
'--network_name=%s' % network_name
]
if network_password:
cmd += ['--network_password=%s' % network_password]
logging.info('Setting network credentials for %s.', network_name)
self._run(cmd)
def _get_minios_state(self):
"""
Get the current state of MiniOS from the command
'minios_client --get_state'.
@return The string value of the current MiniOS state.
"""
result = self._run([self._MINIOS_CLIENT_CMD,
'--get_state']).stderr.rstrip()
state_match = self._GET_STATE_EXTRACTOR.search(result)
return state_match.group(1) if state_match else None
def _validate_minios_state(self, expected_state):
"""
Check if MiniOS is in the expected state.
@param expected_state: The expected state that we want to validate.
"""
state = self._get_minios_state()
if expected_state != state:
raise error.TestFail('%s not found. State is %s' %
(expected_state, state))
def _wait_for_minios_state(self, state_to_wait_for, timeout=3600):
"""
Wait for the MiniOS to reach a certain state.
@param state_to_wait_for: The MiniOS state to wait for.
@param timeout: How long to wait before giving in seconds.
"""
expiration = time.time() + timeout
while True:
state = self._get_minios_state()
if state_to_wait_for == state:
break
time.sleep(1)
if time.time() > expiration:
raise error.TestFail('MiniOS did not achieve state: %s before'
'timeout. Current state: %s.' %
(state_to_wait_for, state))
def _setup_wifi_context(self, host, cmdline_args):
"""
Setup Wifi context with a router but no packet capture or attenuator.
@param host: The DUT that will be using the wifi context.
@param cmdline_args: Dict of key, value settings from command line to
be passed on to the wifi context manager.
See `cros.network.WiFiTestContextManager` for relevant arguments.
"""
logging.debug('Running MiniOS wifi test with arguments: %r.',
cmdline_args)
self._wifi_context = wifi_test_context_manager.WiFiTestContextManager(
self.__class__.__name__,
host,
cmdline_args,
self.debugdir)
self._wifi_context.setup(include_pcap=False, include_attenuator=False)
msg = '======= MiniOS wifi setup complete. Starting test... ======='
self._wifi_context.client.shill_debug_log(msg)
def _configure_network_for_test(self):
"""
Configures the appropriate network for the test.
For tests that do not use Wifi, this is a no-op.
For tests that use Wifi, we setup the AP and verify that we can connect
to it.
@return a tuple containing the network name and network password if any.
"""
# Not a wifi test, thus no-op.
if not self._wifi_configs:
return (MiniOsTest._ETHERNET_LABEL, None)
for ap_config, client_conf in self._wifi_configs:
client_conf.ssid = self._configure_and_connect_to_ap(ap_config)
return (client_conf.ssid, client_conf.security_config.psk)
def _configure_and_connect_to_ap(self, ap_config):
"""
Configure the router as an AP with the given config and connect
the DUT to it.
@param ap_config HostapConfig object.
@return ssid of the configured AP.
"""
if not self._wifi_context:
raise error.TestError(
'Cannot configure access point - no wifi context provided.')
self._wifi_context.configure(ap_config)
ap_ssid = self._wifi_context.router.get_ssid()
assoc_params = xmlrpc_datatypes.AssociationParameters(
ssid=ap_ssid, security_config=ap_config.security_config)
self._wifi_context.assert_connect_wifi(assoc_params)
return ap_ssid
def _verify_device_ownership(self, owned):
"""Verify DUT ownership, fail if ownership does not match `owned`"""
login_status = self._host.run_output(cryptohome.CRYPTOHOME_CMD +
' --action=get_login_status')
match = re.search('%s: (true|false)' % cryptohome.OWNER_EXISTS,
login_status)
if not match:
raise error.TestFail('Invalid login status: "%s".' % login_status)
device_owned_status = match.group(1) == 'true'
if device_owned_status != owned:
raise error.TestFail(
'Unexpeced ownership, expected=%s, actual=%s.' %
(owned, device_owned_status))