| # Copyright 2019 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Wrapper test to run verification on a labstation.""" |
| |
| import json |
| import logging |
| import os |
| import re |
| import time |
| |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.server import test |
| from autotest_lib.server import utils as server_utils |
| from autotest_lib.server.hosts import servo_host as _servo_host |
| from autotest_lib.server.hosts import servo_constants |
| from autotest_lib.server.hosts import factory |
| |
| class servo_LabstationVerification(test.test): |
| """Wrapper test to run verifications on a labstation image. |
| |
| This test verifies basic servod behavior on the host supplied to it e.g. |
| that servod can start etc, before inferring the DUT attached to the servo |
| device, and running more comprehensive servod tests by using a full |
| cros_host and servo_host setup. |
| """ |
| version = 1 |
| |
| UL_BIT_MASK = 0x2 |
| |
| # Regex to match ipv4 byte. |
| IPV4_RE_BLOCK = r'(25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])' |
| |
| # Full regex to match an ipv4 with optional subnet mask. |
| RE_IPV4 = re.compile(r'^(%(block)s\.){3}(%(block)s)(/\d+)?$' % |
| {'block':IPV4_RE_BLOCK}) |
| |
| # Timeout in seconds to wait after cold_reset before attempting to ping |
| # again. This includes a potential fw screen (30s), and some buffer |
| # for the network. |
| RESET_TIMEOUT_S = 60 |
| |
| def get_servo_mac(self, servo_proxy): |
| """Given a servo's serial retrieve ethernet port mac address. |
| |
| @param servo_proxy: proxy to talk to servod |
| |
| @returns: mac address of the ethernet port as a string |
| @raises: error.TestError: if mac address cannot be inferred |
| """ |
| # TODO(coconutruben): once mac address retrieval through v4 is |
| # implemented remove these lines of code, and replace with |
| # servo_v4_eth_mac. |
| try: |
| serial = servo_proxy.get('support.serialname') |
| if serial == 'unknown': |
| serial = servo_proxy.get('serialname') |
| except error.TestFail as e: |
| if 'No control named' in e: |
| serial = servo_proxy.get('serialname') |
| else: |
| raise e |
| ctrl_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), |
| 'serial_to_mac_map.json') |
| with open(ctrl_path, 'r') as f: |
| serial_mac_map = json.load(f) |
| if not serial in serial_mac_map: |
| raise error.TestError('Unable to retrieve mac address for ' |
| 'serial %s' % serial) |
| return str(serial_mac_map[serial]) |
| |
| def _flip_UL_bit(self, byte): |
| """Helper to flip the Universal/Local bit in a given byte. |
| |
| For some IPv6's extended unique identifier (EUI) 64 calculation |
| part of the logic is to flip the U/L bit on the first byte. |
| |
| Note: it is the callers responsibility to ensure that |byte| is |
| only one byte. This function will just flip the 7th bit of whatever |
| is supplied and return that. |
| |
| @param byte: the byte to flip |
| |
| @returns: |byte| with it's U/L bit flipped. |
| """ |
| return byte ^ self.UL_BIT_MASK |
| |
| def _from_mac_to_ipv6_eui_64(self, mac): |
| """Convert a MAC address (IEEE EUI48) to a IEEE EUI64 node component. |
| |
| This follows guidelines to convert a mac address to an IPv6 node |
| component by |
| - splitting the mac into two parts |
| - inserting 0xfffe in between the two parts |
| - flipping the U/L bit on the first byte |
| |
| @param mac: string containing the mac address |
| |
| @returns: string containing the IEEE EUI64 node component to |mac| |
| """ |
| mac_bytes = [b.lower() for b in mac.split(':')] |
| # First, flip the 7th bit again. This converts the string coming from |
| # the mac (as it's a hex) into an int, flips it, before casting it back |
| # to a hex as is expected for the mac address. |
| mac_bytes[0] = hex(self._flip_UL_bit(int(mac_bytes[0],16)))[2:] |
| mac_bytes = (mac_bytes[:3] + ['ff', 'fe'] + mac_bytes[-3:]) |
| ipv6_components = [] |
| while mac_bytes: |
| # IPv6 has two bytes between : |
| ipv6_components.append('%s%s' % (mac_bytes.pop(0), |
| mac_bytes.pop(0))) |
| # Lastly, remove the leading 0s to have a well formatted concise IPv6. |
| return ':'.join([c.lstrip('0') for c in ipv6_components]) |
| |
| def _mac_to_ipv6_addr(self, mac, ipv6_network_component): |
| """Helper to generate an IPv6 address given network component and mac. |
| |
| @param mac: the mac address of the target network interface |
| @param ipv6_network_component: prefix + subnet id portion of IPv6 [:64] |
| |
| @returns: an IPv6 address that could be used to target the network |
| interface at |mac| if it's on the same network as the network |
| component indicates |
| """ |
| # Do not add an extra/miss a ':' when glueing both parts together. |
| glue = '' if ipv6_network_component[-1] == ':' else ':' |
| return '%s%s%s' % (ipv6_network_component, glue, |
| self._from_mac_to_ipv6_eui_64(mac)) |
| |
| def _from_ipv6_to_mac_address(self, ipv6): |
| """Given an IPv6 address retrieve the mac address. |
| |
| Assuming the address at |ipv6| followed the conversion standard layed |
| out at _from_mac_to_ipv6_eui_64() above, this helper does the inverse. |
| |
| @param ipv6: full IPv6 address to extract the mac address from |
| |
| @returns: mac address extracted from node component as a string |
| """ |
| # The node component i.e. the one holding the mac info is the 64 bits. |
| components = ipv6.split(':')[-4:] |
| # This is reversing the EUI 64 logic. |
| mac_bytes = [] |
| for component in components: |
| # Expand the components fully again. |
| full_component = component.rjust(4,'0') |
| # Mac addresses use one byte components as opposed to the two byte |
| # ones for IPv6 - split them up. |
| mac_bytes.extend([full_component[:2], full_component[2:]]) |
| # First, flip the 7th bit again. |
| mac_bytes[0] = self._flip_UL_bit(mac_bytes[0]) |
| # Second, remove the 0xFFFE bytes inserted in the middle again. |
| mac_bytes = mac_bytes[:3] + mac_bytes[-3:] |
| return ':'.join([c.lower() for c in mac_bytes]) |
| |
| def _build_ssh_cmd(self, hostname, cmd): |
| """Build the ssh command to run |cmd| via bash on |hostname|. |
| |
| @param hostname: hostname/ip where to run the cmd on |
| @param cmd: cmd on hostname to run |
| |
| @returns: ssh command to run |
| """ |
| ssh_cmd = [r'ssh', '-q', '-o', 'StrictHostKeyChecking=no', |
| r'-o', 'UserKnownHostsFile=/dev/null', |
| r'root@%s' % hostname, |
| r'"%s"' % cmd] |
| return ' '.join(ssh_cmd) |
| |
| def _ip_info_from_host(self, host, ip, info, host_name): |
| """Retrieve some |info| related to |ip| from host on |ip|. |
| |
| @param host: object that implements 'run', where the command |
| will be executed form |
| @param ip: ip address to run on and to filter for |
| @param info: one of 'ipv4' or 'dev' |
| @param host_name: executing host's name, for error message |
| |
| @returns: ipv4 associated on the same nic as |ip| if |info|== 'ipv4' |
| nic dev name associated with |ip| if |info|== 'dev' |
| |
| @raises error.TestError: if output of 'ip --brief addr' is unexpected |
| @raises error.TestError: info not in ['ipv4', 'dev'] |
| """ |
| if info not in ['ipv4', 'dev']: |
| raise error.TestFail('Cannot retrieve info %r', info) |
| ip_stub = r"ip --brief addr | grep %s" % ip |
| cmd = self._build_ssh_cmd(ip, ip_stub) |
| logging.info('command to find %s on %s: %s', info, host_name, cmd) |
| # The expected output here is of the form: |
| # [net device] [UP/DOWN] [ipv4]/[subnet mask] [ipv6]/[subnet mask]+ |
| try: |
| output = host.run(cmd).stdout.strip() |
| except (error.AutoservRunError, error.CmdError) as e: |
| logging.error(str(e)) |
| raise error.TestFail('Failed to retrieve %s on %s' % (info, ip)) |
| logging.debug('ip raw output: %s', output) |
| components = output.split() |
| if info == 'ipv4': |
| # To be safe, get all IPs, and subsequently report the first ipv4 |
| # found. |
| raw_ips = components[2:] |
| for raw_ip in raw_ips: |
| if re.match(self.RE_IPV4, raw_ip): |
| ret = raw_ip.split('/')[0] |
| logging.info('ipv4 found: %s', ret) |
| break |
| else: |
| raise error.TestFail('No ipv4 address found in ip command: %s' % |
| ', '.join(raw_ips)) |
| if info == 'dev': |
| ret = components[0] |
| logging.info('dev found: %s', ret) |
| return ret |
| |
| def get_dut_on_servo_ip(self, servo_host_proxy): |
| """Retrieve the IPv4 IP of the DUT attached to a servo. |
| |
| Note: this will reboot the DUT if it fails initially to get the IP |
| Note: for this to work, servo host and dut have to be on the same subnet |
| |
| @param servo_host_proxy: proxy to talk to the servo host |
| |
| @returns: IPv4 address of DUT attached to servo on |servo_host_proxy| |
| |
| @raises error.TestError: if the ip cannot be inferred |
| """ |
| # Note: throughout this method, sh refers to servo host, dh to DUT host. |
| # Figure out servo hosts IPv6 address that's based on its mac address. |
| servo_proxy = servo_host_proxy._servo |
| sh_ip = server_utils.get_ip_address(servo_host_proxy.hostname) |
| sh_nic_dev = self._ip_info_from_host(servo_host_proxy, sh_ip, 'dev', |
| 'servo host') |
| addr_cmd ='cat /sys/class/net/%s/address' % sh_nic_dev |
| sh_dev_addr = servo_host_proxy.run(addr_cmd).stdout.strip() |
| logging.debug('Inferred Labstation MAC to be: %s', sh_dev_addr) |
| sh_dev_ipv6_stub = self._from_mac_to_ipv6_eui_64(sh_dev_addr) |
| # This will get us the IPv6 address that uses the mac address as node id |
| cmd = (r'ifconfig %s | grep -oE "([0-9a-f]{0,4}:){4}%s"' % |
| (sh_nic_dev, sh_dev_ipv6_stub)) |
| servo_host_ipv6 = servo_host_proxy.run(cmd).stdout.strip() |
| logging.debug('Inferred Labstation IPv6 to be: %s', servo_host_ipv6) |
| # Figure out DUTs expected IPv6 address |
| # The network component should be shared between the DUT and the servo |
| # host as long as they're on the same subnet. |
| network_component = ':'.join(servo_host_ipv6.split(':')[:4]) |
| dut_ipv6 = self._mac_to_ipv6_addr(self.get_servo_mac(servo_proxy), |
| network_component) |
| logging.info('Inferred DUT IPv6 to be: %s', dut_ipv6) |
| # Dynamically generate the correct shell-script to retrieve the ipv4. |
| try: |
| server_utils.run('ping -6 -c 1 -w 35 %s' % dut_ipv6) |
| except error.CmdError: |
| # If the DUT cannot be pinged, then try to reset it and try to |
| # ping again. |
| logging.info('Failed to ping DUT on ipv6: %s. Cold resetting', |
| dut_ipv6) |
| servo_proxy._power_state.reset() |
| time.sleep(self.RESET_TIMEOUT_S) |
| dut_ipv4 = None |
| try: |
| # Pass |server_utils| here as it implements the same interface |
| # as a host to run things locally i.e. on the autoserv runner. |
| dut_ipv4 = self._ip_info_from_host(server_utils, dut_ipv6, 'ipv4', |
| 'autoserv') |
| return dut_ipv4 |
| except error.TestFail: |
| logging.info('Failed to retrieve the DUT ipv4 directly. ' |
| 'Going to attempt to tunnel request through ' |
| 'labstation and forgive the error for now.') |
| # Lastly, attempt to run the command from the labstation instead |
| # to guard against networking issues. |
| dut_ipv4 = self._ip_info_from_host(servo_host_proxy, dut_ipv6, 'ipv4', |
| 'autoserv') |
| return dut_ipv4 |
| |
| def _set_dut_stable_version(self, dut_host, stable_version=None): |
| """Helper method to set stable_version in DUT host. |
| |
| @param dut_host: CrosHost object representing the DUT. |
| """ |
| if not stable_version: |
| stable_version = self.cros_version |
| logging.info('Setting stable_version to %s for DUT %s.', |
| stable_version, dut_host.hostname) |
| host_info = dut_host.host_info_store.get() |
| host_info.stable_versions['cros'] = self.cros_version |
| dut_host.host_info_store.commit(host_info) |
| |
| def _get_dut_info_from_config(self): |
| """Get DUT info from json config file. |
| |
| @returns a list of dicts that each dict represents a dut. |
| """ |
| ctrl_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), |
| 'labstation_to_dut_map.json') |
| with open(ctrl_path, 'r') as f: |
| data = json.load(f, object_hook=self._byteify) |
| # create a default dut dict in case the servohost is not in config |
| # map, this is normally happened in local testing. |
| default_dut = { |
| 'hostname': None, |
| 'servo_port': '9999', |
| 'servo_serial': None |
| } |
| return data.get(self.labstation_host.hostname, [default_dut]) |
| |
| def _byteify(self, data, ignore_dicts=False): |
| """Helper method to convert unicode to string. |
| """ |
| if isinstance(data, unicode): |
| return data.encode('utf-8') |
| if isinstance(data, list): |
| return [self._byteify(item, ignore_dicts=True) for item in data] |
| if isinstance(data, dict) and not ignore_dicts: |
| return { |
| self._byteify(key, ignore_dicts=True): |
| self._byteify(value, ignore_dicts=True) |
| for key, value in data.iteritems() |
| } |
| return data |
| |
| def _setup_servod(self): |
| """Setup all servod instances under servohost for later testing. |
| """ |
| for dut in self.dut_list: |
| # Use board: nami as default for local testing. |
| board = dut.get('board', 'nami') |
| port = dut.get('servo_port') |
| serial = dut.get('servo_serial') |
| servo_args = { |
| servo_constants.SERVO_HOST_ATTR: |
| self.labstation_host.hostname, |
| servo_constants.SERVO_PORT_ATTR: port, |
| servo_constants.SERVO_SERIAL_ATTR: serial, |
| servo_constants.SERVO_BOARD_ATTR: board, |
| servo_constants.SERVO_SETUP_ATTR: |
| servo_constants.SERVO_SETUP_VALUE_DUAL_V4, |
| 'is_in_lab': False, |
| } |
| |
| logging.info('Setting up servod for port %s', port) |
| servo_host, servo_state = _servo_host.create_servo_host( |
| None, servo_args) |
| try: |
| validate_cmd = 'servodutil show -p %s' % port |
| servo_host.run_grep(validate_cmd, |
| stdout_err_regexp='No servod scratch entry found.') |
| except error.AutoservRunError: |
| raise error.TestFail('Servod of port %s did not come up on' |
| ' labstation.' % port) |
| |
| self.servo_hosts.append(servo_host) |
| |
| def setup_hosts(self): |
| """Prepare all cros and servo hosts that need to run.""" |
| # Servod came up successfully at this point - build a ServoHost and |
| # CrosHost for later testing to verfiy servo functionality. |
| |
| for dut_info, servo_host in zip(self.dut_list, self.servo_hosts): |
| dut_hostname = dut_info.get('hostname') |
| if not dut_hostname: |
| # TODO(coconutruben@): remove this statement once the inferring |
| # is the default. |
| logging.info('hostname not specified for DUT, through ' |
| 'static config or command-line. Will attempt ' |
| 'to infer through hardware address.') |
| dut_hostname = self.get_dut_on_servo_ip(servo_host) |
| logging.info('Running the DUT side on DUT %r', dut_hostname) |
| dut_host = factory.create_host(dut_hostname) |
| dut_host.set_servo_host(servo_host) |
| |
| # Copy labstation's stable_version to dut_host for later test |
| # consume. |
| # TODO(xianuowang@): remove this logic once we figured out how to |
| # propagate DUT's stable_version to the test. |
| stable_version_from_config = dut_info.get('stable_version') |
| self._set_dut_stable_version(dut_host, stable_version_from_config) |
| # Store |dut_host| in |machine_dict| so that parallel running can |
| # find the host. |
| self.machine_dict[dut_host.hostname] = dut_host |
| |
| def initialize(self, host, config=None, local=False): |
| """Setup servod on |host| to run subsequent tests. |
| |
| @param host: LabstationHost object representing the servohost. |
| @param config: the args argument from test_that in a dict. |
| @param local: whether a test image is already on the usb stick. |
| """ |
| # Cache whether this is a local run or not. |
| self.local = local |
| # This list hosts the servo_hosts, in the same order as the |dut_list| |
| # below. |
| self.servo_hosts = [] |
| # This dict houses a mapping of |dut| hostnames to initialized cros_host |
| # objects for the tests to run. |
| self.machine_dict = {} |
| # Save the host. |
| self.labstation_host = host |
| # Make sure recovery is quick in case of failure. |
| self.job.fast = True |
| # Get list of duts under the servohost. |
| self.dut_list = self._get_dut_info_from_config() |
| # Setup servod for all duts. |
| self._setup_servod() |
| # We need a cros build number for testing download image to usb and |
| # use servo to reimage DUT purpose. So copying labstation's |
| # stable_version here since we don't really care about which build |
| # to install on the DUT. |
| self.cros_version = ( |
| self.labstation_host.host_info_store.get().cros_stable_version) |
| |
| if config: |
| if 'dut_ip' in config: |
| # Retrieve DUT ip from args if caller specified it. |
| # |dut_ip| is special in that it can be used for (quick) setup |
| # testing if the setup is not in the configuration file. |
| # This has two implications: |
| # - the user can only test one dut/servo pair |
| # - the config has to be empty. |
| # TODO(coconutruben): remove this logic for a more holistic |
| # command-line overwrite solution. |
| if len(self.dut_list) == 1 and not self.dut_list[0]['hostname']: |
| self.dut_list[0]['hostname'] = config['dut_ip'] |
| logging.info('Setting the hostname of the only dut to %s.', |
| self.dut_list[0]['hostname']) |
| else: |
| logging.info('dut_ip %s will be ignored. The target ' |
| 'labstation is to be part of static config.') |
| if 'cros_version' in config: |
| # We allow user to override a cros image build. |
| self.cros_version = config['cros_version'] |
| # Lastly, setup the hosts so that testing can occur in parallel. |
| self.setup_hosts() |
| |
| def _run_on_machine(self, machine): |
| """Thin wrapper to run 'servo_Verification' on all machines. |
| |
| @param machine: hostname of the dut to run 'servo_Verification' against. |
| |
| @raises error.TestFail: 'servo_Verification' fails |
| @raises error.TestFail: |machine| unknown (not in |self.machine_dict|) |
| """ |
| dut_host = self.machine_dict.get(machine, None) |
| if dut_host is None: |
| raise error.TestFail('dut machine %r not known to suite. Known ' |
| 'machines: %r', machine, |
| ', '.join(self.machine_dict.keys())) |
| logging.info('About to run on machine %s', machine) |
| if not self.job.run_test('servo_Verification', host=dut_host, |
| local=self.local): |
| raise error.TestFail('At least one test failed.') |
| |
| def run_once(self): |
| """Run through all hosts in |self.machine_dict|.""" |
| self.job.parallel_simple(self._run_on_machine, |
| list(self.machine_dict.keys())) |
| # TODO(coconutruben): at this point, you can print a report what kind of |
| # servod setups failed and which succeeded. Build that out so that |
| # debugging failures is cleaner given multiple setups. |
| |
| def cleanup(self): |
| """Clean up by calling close for dut host, which will also take care |
| of servo cleanup. |
| """ |
| for _, dut in self.machine_dict.items(): |
| dut.close() |