blob: ac44b6bdb6e8bb2112ff853b21758478f8d32c0c [file] [log] [blame]
#!/usr/bin/env python2
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils as client_utils
from autotest_lib.server.cros.storage import storage_validate as storage
from autotest_lib.server.cros.servo.keyboard import servo_keyboard_flasher
from autotest_lib.server.cros.repair import mac_address_helper
from autotest_lib.site_utils.admin_audit import base
from autotest_lib.site_utils.admin_audit import constants
from autotest_lib.site_utils.admin_audit import rpm_validator
from autotest_lib.site_utils.admin_audit import servo_updater
try:
from autotest_lib.utils.frozen_chromite.lib import metrics
except ImportError:
metrics = client_utils.metrics_mock
# Common status values used when reporting statistics.
# NOTE(review): none of these are referenced in the visible part of this
# module; presumably they are consumed by importers -- confirm before
# renaming or removing them.
STATUS_FAIL = 'fail'
STATUS_SUCCESS = 'success'
STATUS_SKIPPED = 'skipped'
class VerifyDutStorage(base._BaseDUTVerifier):
    """Audit the health of the storage device inside the DUT.

    The storage type is detected first; usage and EOL (end-of-life)
    metrics are then read to classify the state of the storage.

    Supported storage types: MMS, NVME, SSD.

    Possible states:
      UNKNOWN - the DUT is not accessible, the storage type cannot be
          determined, or there is no information to evaluate metrics.
      NORMAL - the storage is in good shape and the device will work
          stably. (supported for all types)
      ACCEPTABLE - the storage has used almost all of its resources;
          the device still works stably but it is better to be ready
          for a replacement. (supported by MMS, NVME)
      NEED_REPLACEMENT - the storage is broken or worn past its life
          limit; the device may be unstable and cause flakiness in
          tests. (supported by all types)
    """

    def __init__(self, dut_host):
        """Create the verifier.

        @param dut_host: host object forwarded to the base verifier.
        """
        super(VerifyDutStorage, self).__init__(dut_host)
        # Result of the most recent _verify() run; None until one
        # completes successfully.
        self._state = None

    def _verify(self, set_label=True, run_badblocks=None):
        """Detect the storage state and optionally record it on the host.

        @param set_label: when true, a recognized state is stored in the
                          host info under DUT_STORAGE_STATE_PREFIX.
        @param run_badblocks: passed through to the storage validator's
                              get_state().

        @raises base.AuditError: if any step of the detection fails.
        """
        if not self.host_is_up():
            logging.info('Host is down; Skipping the verification')
            return
        try:
            dut = self.get_host()
            validator = storage.StorageStateValidator(dut)
            detected_type = validator.get_type()
            logging.debug('Detected storage type: %s', detected_type)
            raw_state = validator.get_state(run_badblocks=run_badblocks)
            logging.debug('Detected storage state: %s', raw_state)
            hw_state = self.convert_state(raw_state)
            if set_label and hw_state:
                self._set_host_info_state(
                        constants.DUT_STORAGE_STATE_PREFIX, hw_state)
            # The replacement flag is raised regardless of set_label.
            if hw_state == constants.HW_STATE_NEED_REPLACEMENT:
                self.get_host().set_device_needs_replacement(
                        resultdir=self.get_result_dir())
            self._state = hw_state
        except Exception as e:
            message = 'Exception during getting state of storage %s' % str(e)
            raise base.AuditError(message)

    def convert_state(self, state):
        """Translate a storage-validator state into an audit HW state.

        @param state: state value reported by the storage validator.

        @returns: the matching constants.HW_STATE_* value, or None when
                  the validator state is not one of the known values.
        """
        translations = (
                (storage.STORAGE_STATE_NORMAL,
                 constants.HW_STATE_NORMAL),
                (storage.STORAGE_STATE_WARNING,
                 constants.HW_STATE_ACCEPTABLE),
                (storage.STORAGE_STATE_CRITICAL,
                 constants.HW_STATE_NEED_REPLACEMENT),
        )
        for validator_state, hw_state in translations:
            if state == validator_state:
                return hw_state
        return None

    def get_state(self):
        """Return the state stored by the last successful _verify()."""
        return self._state
class VerifyServoUsb(base._BaseServoVerifier):
    """Verify the state of the USB drive attached to the servo.

    The drive is checked for bad sectors with badblocks.

    Possible states:
      UNKNOWN - no access to the device or servo, or the required
          software is not available on the servo.
      NOT_DETECTED - the USB drive could not be probed from the servo
          host.
      NORMAL - the device is available for testing and no bad sectors
          were found on it; the device will work stably.
      NEED_REPLACEMENT - the device is available for testing and some
          bad sectors were found on it. The device can work but may
          cause flakiness in tests or in the repair process.

    Known badblocks errors:
      No such device or address while trying to determine device size
    """

    def _verify(self):
        """Check the servo USB drive for bad blocks and record its state.

        The check runs from the DUT when it is reachable and can see the
        drive, otherwise from the servo host. Afterwards a stable image
        is re-installed on the drive because the check overwrites it.
        """
        if not self.servo_is_up():
            logging.info('Servo not initialized; Skipping the verification')
            return
        try:
            usb = self.get_host()._probe_and_validate_usb_dev()
            logging.debug('USB path: %s', usb)
        except Exception as e:
            usb = ''
            logging.debug('(Not critical) %s', e)
        if not usb:
            self._set_state(constants.HW_STATE_NOT_DETECTED)
            return
        # The check below is a destructive write-mode badblocks run.
        # Prefer the USB path as seen by the DUT when the DUT is sshable.
        logging.info('Starting verification of USB drive...')
        dut_usb = None
        if self.host_is_up():
            dut_usb = self._usb_path_on_dut()
        state = None
        try:
            if dut_usb:
                logging.info('Try run check on DUT side.')
                state = self._run_check_on_host(self._dut_host, dut_usb)
            else:
                logging.info('Try run check on ServoHost side.')
                servo = self.get_host().get_servo()
                servo_usb = servo.probe_host_usb_dev()
                state = self._run_check_on_host(self.get_host(), servo_usb)
        except Exception as e:
            if 'Timeout encountered:' in str(e):
                # A timeout gives no verdict about the drive, so the
                # state stays unset and only a metric is recorded.
                logging.info('Timeout during running action')
                metrics.Counter(
                        'chromeos/autotest/audit/servo/usb/timeout'
                ).increment(fields={'host': self._dut_host.hostname})
            else:
                # badblocks generates errors when the device is not
                # reachable or it cannot read the system information
                # needed to execute the process.
                state = constants.HW_STATE_NEED_REPLACEMENT
                logging.debug(str(e))
        self._set_state(state)
        logging.info('Finished verification of USB drive.')
        self._install_stable_image()

    def _usb_path_on_dut(self):
        """Return path to the USB drive detected on the DUT side.

        The servo USB key is switched to the DUT, then every /dev/sd[a-z]
        node is inspected until one is reported as USB and answers fdisk.

        @returns: device path (e.g. /dev/sda), or None when no
                  responsive USB drive is visible from the DUT.
        """
        servo = self.get_host().get_servo()
        servo.switch_usbkey('dut')
        # `ls` exits non-zero when the glob matches nothing. Ignore the
        # status so a DUT without any /dev/sd* node yields None (and the
        # caller falls back to the servo host) instead of raising and
        # aborting the whole verification.
        result = self._dut_host.run('ls /dev/sd[a-z]', ignore_status=True)
        for path in result.stdout.splitlines():
            cmd = ('. /usr/share/misc/chromeos-common.sh; get_device_type %s' %
                   path)
            check_run = self._dut_host.run(cmd, timeout=30, ignore_status=True)
            if check_run.stdout.strip() != 'USB':
                continue
            if self._quick_check_if_device_responsive(self._dut_host, path):
                logging.info('USB drive detected on DUT side as %s', path)
                return path
        return None

    def _quick_check_if_device_responsive(self, host, usb_path):
        """Verify that the device answers `fdisk -l` on the given host.

        @param host: host on which fdisk is executed.
        @param usb_path: path to the USB drive (e.g. /dev/sda).

        @returns: True if fdisk exits with status 0; False if it exits
                  with another status or the run raises AutoservRunError
                  (timeout included).
        """
        validate_cmd = 'fdisk -l %s' % usb_path
        try:
            resp = host.run(validate_cmd, ignore_status=True, timeout=30)
            if resp.exit_status == 0:
                return True
            logging.error('USB %s is not detected by fdisk!', usb_path)
        except error.AutoservRunError as e:
            if 'Timeout encountered' in str(e):
                logging.warning('Timeout encountered during fdisk run.')
            else:
                logging.error('(Not critical) fdisk check fail for %s; %s',
                              usb_path, str(e))
        return False

    def _run_check_on_host(self, host, usb):
        """Run badblocks on the provided host.

        badblocks runs in write mode (-w), which overwrites the content
        of the drive, with a random test pattern (-t random), a 4096
        byte block size (-b 4096) and aborts after 5 bad blocks (-e 5).

        @param host: Host where USB drive mounted
        @param usb: Path to USB drive. (e.g. /dev/sda)

        @returns: constants.HW_STATE_NEED_REPLACEMENT if badblocks
                  printed anything to stdout, otherwise
                  constants.HW_STATE_NORMAL.
        """
        command = 'badblocks -w -e 5 -b 4096 -t random %s' % usb
        logging.info('Running command: %s', command)
        # The response is the list of bad block on USB.
        # Extended time for 2 hour to run USB verification.
        # TODO (otabek@) (b:153661014#comment2) bring F3 to run
        # check faster if badblocks cannot finish in 2 hours.
        result = host.run(command, timeout=7200).stdout.strip()
        logging.info("Check result: '%s'", result)
        if result:
            # Any output means bad blocks were found; empty means good.
            return constants.HW_STATE_NEED_REPLACEMENT
        return constants.HW_STATE_NORMAL

    def _install_stable_image(self):
        """Install stable image to the USB drive (best effort)."""
        # install fresh image to the USB because badblocks formats it
        # https://crbug.com/1091406
        try:
            logging.debug('Started to install test image to USB-drive')
            _, image_path = self._dut_host.stage_image_for_servo()
            self.get_host().get_servo().image_to_servo_usb(image_path,
                                                           power_off_dut=False)
            logging.debug('Finished installing test image to USB-drive')
        except Exception as e:
            # Installing the image is not the main goal of the audit, so
            # failures are only logged. Catch Exception rather than using
            # a bare except so KeyboardInterrupt/SystemExit still
            # propagate, and keep the cause for debugging.
            logging.info('Fail to install test image to USB-drive')
            logging.debug('(Not critical) %s', e)

    def _set_state(self, state):
        """Store the USB state in the host info; no-op for a falsy state.

        @param state: HW state to record, or None to leave it untouched.
        """
        if state:
            self._set_host_info_state(constants.SERVO_USB_STATE_PREFIX, state)
class VerifyServoFw(base._BaseServoVerifier):
    """Force an update of the servo firmware when it is not up-to-date.

    It is a rare case that the labstation did not update the servo
    firmware when servod started. This verifier is meant to ensure that
    servo_v4 and servo_micro are up-to-date.
    """

    def _verify(self):
        """Trigger a forced firmware update if the servo host is up."""
        if self.servo_host_is_up():
            servo_host = self.get_host()
            servo_updater.update_servo_firmware(servo_host,
                                                force_update=True)
            return
        logging.info('Servo host is down; Skipping the verification')
class VerifyRPMConfig(base._BaseDUTVerifier):
    """Check the RPM config of the setup.

    The check is run against the RPM config settings of the host.
    """

    def _verify(self):
        """Run the RPM validator against the host when it is reachable."""
        if self.host_is_up():
            rpm_validator.verify_unsafe(self.get_host())
        else:
            logging.info('Host is down; Skipping the verification')
class FlashServoKeyboardMapVerifier(base._BaseDUTVerifier):
    """Flash the keyboard map on servo."""

    def _verify(self):
        """Flash the keyboard map if the flasher supports the host.

        @raises base.AuditError: if the host is down or the servo is not
                                 initialized.
        """
        # Checked in order; the first unmet precondition aborts the run.
        preconditions = (
                (self.host_is_up, 'Host is down'),
                (self.servo_is_up, 'Servo not initialized'),
        )
        for is_ready, failure_reason in preconditions:
            if not is_ready():
                raise base.AuditError(failure_reason)
        dut = self.get_host()
        keyboard_flasher = servo_keyboard_flasher.ServoKeyboardMapFlasher()
        if not keyboard_flasher.is_image_supported(dut):
            return
        keyboard_flasher.update(dut)
class VerifyDUTMacAddress(base._BaseDUTVerifier):
    """Verify and update the cached NIC mac address on servo.

    Servo_v4 is plugged into the DUT and provides the NIC for it. The
    mac address is cached on the servod side for better debugging.
    """

    def _verify(self):
        """Refresh the cached mac address through MacAddressHelper.

        @raises base.AuditError: if the host is down or the servo is not
                                 up.
        """
        # Checked in order; the first unmet precondition aborts the run.
        preconditions = (
                (self.host_is_up, 'Host is down.'),
                (self.servo_is_up, 'Servo host is down.'),
        )
        for is_ready, failure_reason in preconditions:
            if not is_ready():
                raise base.AuditError(failure_reason)
        mac_address_helper.MacAddressHelper().update_if_needed(
                self.get_host())