# blob: 069a28eee9150d295145636ac3864837238b3e9f [file] [log] [blame]
# Lint as: python2, python3
# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import path_utils
from autotest_lib.server import site_linux_rpi
class RPiHackRFRunner(site_linux_rpi.LinuxRPi):
    """Linux RaspberryPi with HackRF capabilities for WiFiTest class."""

    # Directory on the HackRF Runner into which RF data files are installed.
    RF_DATA_FILES_FOLDER = '/etc/hackrf_files'
    # Firmware version that verify_hackrf_commands() treats as up to date.
    LATEST_FIRMWARE_VERSION = '2021.03.1'

    # TODO (b/239568628): HackRF supports sample rates in the 8MHz-20MHz range.
    # Currently set to 8MHz because at this sample rate we can reliably transmit
    # RF data for 250 seconds. Raising the sample rate would decrease the amount
    # of time we could reliably broadcast noise. The default sample rate may get
    # updated in the future once RF data can be transmitted indefinitely and
    # properly terminated in tests.
    DEFAULT_SAMPLE_RATE = 8000000

    def __init__(self, host):
        """Build a RPiHackRFRunner.

        @param host: Host object representing the remote machine.

        """
        super(RPiHackRFRunner, self).__init__(
                host,
                role=site_linux_rpi.RPiRole.HACKRF_RUNNER)
        # Populated by verify_hackrf_commands().
        self._command_hackrf_info = None
        self._hackrf_serial_number = None
        self._hackrf_firmware_version = None
        self.__setup()

    def __setup(self):
        """Setup this system.

        Can be used to complete initialization of a RaspberryPi object that is
        being used as a HackRF Runner, or to re-establish a good state after a
        reboot.

        """
        self.verify_hackrf_commands()

    def close(self):
        """Close global resources held by this system."""
        # Make sure no hackrf_transfer process outlives this object.
        self.stop_broadcasting_file()
        super(RPiHackRFRunner, self).close()

    # HackRF-specific functions

    def install_rf_data_file(self, rf_data_file):
        """Install a radio frequency data file on the HackRF Runner.

        The file is copied into RF_DATA_FILES_FOLDER, which is created first if
        it does not exist yet.

        @param rf_data_file: string name of the radio frequency data file to be
                installed on the HackRF Runner including the absolute path to it
                on the DUT.

        @raises error.AutoservRunError: re-raised after logging if the file
                transfer fails.

        """
        # Make sure the directory exists.
        self.host.run('mkdir -p %s' % self.RF_DATA_FILES_FOLDER)
        try:
            self.host.send_file('%s' % rf_data_file,
                                self.RF_DATA_FILES_FOLDER)
        except error.AutoservRunError:
            logging.error('Failed to install RF data file on the HackRF Runner')
            raise

    def delete_rf_data_file(self, rf_data_file=None):
        """Remove the specified file or all radio frequency data files from the
        HackRF Runner.

        @param rf_data_file: string name of the radio frequency data file to be
                deleted including the absolute path to it on the HackRF Runner
                or None. When it is None (or empty), every file in
                RF_DATA_FILES_FOLDER is removed.

        """
        if rf_data_file:
            self.host.run('rm %s' % rf_data_file)
            logging.info('Deleted the RF data file %s', rf_data_file)
        else:
            # The folder must be interpolated into the command with '%'; passing
            # it as a second argument would run the literal 'rm %s/*'.
            self.host.run('rm %s/*' % self.RF_DATA_FILES_FOLDER)
            logging.info('Deleted all the RF data files in the directory %s',
                         self.RF_DATA_FILES_FOLDER)

    def stop_broadcasting_file(self):
        """Interrupt the broadcasting of RF data on the HackRF."""
        self._kill_process_instance(process='hackrf_transfer')

    def verify_hackrf_commands(self):
        """Verify that the HackRF runner can run commands on the HackRF.

        Checks whether the HackRF Runner has the necessary packages and installs
        them otherwise. Extracts the serial number and firmware version of the
        first HackRF if there are several connected HackRFs. Requests for the
        firmware to be updated if it is out of date. Tests whether the HackRF
        can transmit radio frequency data.

        @raises error.TestNAError: if hackrf_info does not report any HackRF.
        @raises error.TestError: if the hackrf tools cannot be installed or the
                test transmission does not report a transfer rate.

        """
        # Ensure that the HackRF Runner has the necessary packages.
        try:
            self._command_hackrf_info = path_utils.must_be_installed(
                    'hackrf_info',
                    host=self.host)
        except error.TestError:
            logging.warning('Don\'t have hackrf commands installed. Proceeding '
                            'to install the necessary packages...')
            self.host.run('sudo apt-get update')
            self.host.run('sudo apt-get install -y hackrf')
            self._command_hackrf_info = path_utils.must_be_installed(
                    'hackrf_info',
                    host=self.host)

        # Stop broadcasting RF data before extracting the firmware version
        # because hackrf_info doesn't display firmware version unless the HackRF
        # is freed up.
        self.stop_broadcasting_file()

        hackrf_information = self.host.run(self._command_hackrf_info,
                                           ignore_status=True).stdout
        if 'No HackRF boards found' in hackrf_information:
            raise error.TestNAError('Could not identify any connected HackRFs.')

        # Extract the serial number and firmware version of first HackRF.
        # Splitting on the marker isolates the text between the first and the
        # second 'Found HackRF' so later boards cannot overwrite the values.
        hackrf_sections = hackrf_information.split('Found HackRF')
        if len(hackrf_sections) < 2:
            # The command ran with ignore_status=True, so its output may be
            # empty or unexpected; report that instead of failing on an index.
            raise error.TestNAError('Could not identify any connected HackRFs.')
        for line in hackrf_sections[1].split('\n'):
            key, _, value = line.partition(':')
            if key == 'Serial number':
                self._hackrf_serial_number = value.split(':')[0].strip()
            elif key == 'Firmware Version':
                # Only the first token of the first ':'-separated field is the
                # version; anything after it on the line is ignored.
                version_fields = value.split(':')[0].split()
                if version_fields:
                    self._hackrf_firmware_version = version_fields[0]
        logging.debug('HackRF Serial Number: %s', self._hackrf_serial_number)
        logging.debug('HackRF Firmware Version: %s',
                      self._hackrf_firmware_version)

        # Verify that HackRF firmware version is up-to-date.
        if self.LATEST_FIRMWARE_VERSION != self._hackrf_firmware_version:
            logging.error('Tests may fail because the firmware of your HackRF '
                          '(serial number %s) is not up-to-date: found %s, '
                          'expected %s. '
                          'See: https://hackrf.readthedocs.io/en/latest/updating_firmware.html#updating-the-spi-flash-firmware',
                          self._hackrf_serial_number,
                          self._hackrf_firmware_version,
                          self.LATEST_FIRMWARE_VERSION)
        else:
            logging.debug('HackRF firmware is up to date')

        # Test that you can transmit radio waves from the HackRF.
        # Using dev/zero eliminates dependency on install_rf_data_file working.
        tx_test = self.broadcast_file(rf_data_file='/dev/zero', duration=3,
                                      run_in_background=False)
        if 'MiB/second' not in tx_test:
            logging.error('Unable to transmit RF data')
            raise error.TestError('Unable to transmit RF data')

        # The duration is only used for the debug message below, so an
        # unexpected output layout must not abort the verification.
        try:
            duration = tx_test.split('\n')[-7].split(':')[1].strip()
        except IndexError:
            duration = 'an unknown duration'
        logging.debug('Successfully transmitted sample RF data during '
                      'HackRF validation for %s', duration)

    def broadcast_file(self, rf_data_file, duration, frequency=None, gain=None,
                       run_in_background=False):
        """Broadcast radio signals on the HackRF based on the radio frequency
        data file.

        @param rf_data_file: string name of the radio frequency data file to be
                broadcast on the HackRF including the absolute path to it on the
                HackRF Runner.
        @param duration: int desired duration for how long the rf_data_file
                should be played in seconds.
        @param frequency: int frequency at which to play the radio signals in Hz
                should be within the antenna frequency range. Omitted from the
                command when it is None or 0.
        @param gain: int the strength of the signal being transmitted in dB,
                should be in the 0-47dB range.
        @param run_in_background: bool True iff the rf_data_file should be
                broadcasted in the background.

        @return the result of host.run_background() (the PID of the
                hackrf_transfer process) if run_in_background is True,
                otherwise the stderr output of hackrf_transfer, which reports
                the frequency, sample rate and how much radio frequency data is
                broadcast per second.

        @raises error.TestError: if gain is not an integer in the 0-47 range.

        """
        if gain is not None:
            # Check the type first so that a non-numeric gain is reported as an
            # invalid argument rather than failing inside the comparison.
            if not isinstance(gain, int) or gain < 0 or gain > 47:
                logging.error('Please specify an integer gain in the 0-47dB '
                              'range')
                raise error.TestError('Please specify an integer gain in the '
                                      '0-47dB range')

        transmit_command = 'hackrf_transfer -R -t %s -d %s' % (
                rf_data_file, self._hackrf_serial_number)

        sample_rate = self.DEFAULT_SAMPLE_RATE
        # Tests were flaky when the number of samples was an exact multiple
        # of the sample_rate: the HackRF tries to transmit RF data in an
        # additional second at which point hardly any data is left. So we
        # are transferring 1% less data in the last second to prevent the
        # HackRF from forcing an additional second for RF data transfer
        # and the flaky error is avoided.
        # int() keeps the sample count integral even for a float duration.
        num_samples = int(sample_rate * duration) - int(sample_rate * 0.01)

        transmit_command += ' -n %s' % str(num_samples)
        transmit_command += ' -s %s' % str(sample_rate)
        if frequency:
            transmit_command += ' -f %s' % str(frequency)
        if gain is not None:
            transmit_command += ' -x %s' % str(gain)

        if run_in_background:
            return self.host.run_background(transmit_command)
        return self.host.run(transmit_command, ignore_status=True).stderr