blob: 169ab48127bf95b47b0505ba85b1dd8964d571a3 [file] [log] [blame]
# Copyright (c) 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os.path
import logging
import time
import math
import numbers
import numpy
from enum import IntEnum
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import path_utils
class IperfResult(object):
"""Logic for parsing and representing iperf results."""
@staticmethod
def from_iperf_output(results, config):
"""Parse the text output of iperf and return an IperfResult.
@param results string raw results from iperf.
@param config IperfConfig the config for the test.
@return IperfResult result.
"""
class IperfIndex(IntEnum):
"""Defines the indices of certain values in iperf output."""
LOG_ID_INDEX = 5
INTERVAL_INDEX = 6
DATA_TRANSFERED_INDEX = 7
PERCENT_LOSS_INDEX = 12
NUM_FIELDS_IN_SERVER_OUTPUT = 14
lines = results.splitlines()
total_throughput = 0
test_durations = []
percent_losses = []
for line in lines:
fields = line.split(',')
# Negative Log ID values are used for sum total output which we
# don't use.
if float(fields[IperfIndex.LOG_ID_INDEX]) < 0:
continue
# Filter out client side logs from UDP results. We only want server
# side results because they reflect the amount of data that was
# actually received by the server. Server output has 14 fields while
# client output has 9 fields, so we use this to differentiate.
# Ideally we'd use the '-x D' option to filter the client side data,
# but this option makes the iperf output unreliable.
if config.udp and len(fields) < NUM_FIELDS_IN_SERVER_OUTPUT:
continue
total_data_bytes = float(fields[IperfIndex.DATA_TRANSFERED_INDEX])
test_interval = fields[IperfIndex.INTERVAL_INDEX]
test_start_end = test_interval.split('-')
duration = float(test_start_end[1]) - float(test_start_end[0])
test_durations.append(duration)
total_throughput += IperfResult._calculate_throughput(
total_data_bytes, duration)
if (config.udp):
percent_losses.append(
float(fields[IperfIndex.PERCENT_LOSS_INDEX]))
# We should get one line of output for each port used in the test. In
# rare cases, the data from one of the ports is not included in the
# iperf output, so discard results in these cases.
expected_num_output_lines = config.num_ports
if config.bidirectional:
expected_num_output_lines *= 2
if len(test_durations) != expected_num_output_lines:
logging.info(
'iperf command output was missing some data, ignoring test run.'
)
return None
test_duration = math.fsum(test_durations) / len(test_durations)
if config.udp:
percent_loss = math.fsum(percent_losses) / len(percent_losses)
else:
percent_loss = None
return IperfResult(test_duration, total_throughput, percent_loss)
@staticmethod
def from_samples(samples):
"""Build an averaged IperfResult from |samples|.
Calculate an representative sample with averaged values
and standard deviation of throughput from samples.
@param samples list of IperfResult objects.
@return IperfResult object.
"""
if len(samples) == 0:
return None
duration_samples = [float(sample.duration) for sample in samples]
duration_mean = numpy.mean(duration_samples)
throughput_samples = [float(sample.throughput) for sample in samples]
throughput_mean = numpy.mean(throughput_samples)
throughput_dev = numpy.std(throughput_samples)
# For TCP connections, the packet loss is 0 by definition. In these
# cases, the percent_loss will be None for all samples, and UDP results
# should never have a percent_loss of None, so we can just check the
# first sample.
if samples[0].percent_loss == None:
percent_loss_mean = None
else:
percent_loss_samples = [
float(sample.percent_loss) for sample in samples
]
percent_loss_mean = numpy.mean(percent_loss_samples)
return IperfResult(duration_mean,
throughput_mean,
percent_loss_mean,
throughput_dev=throughput_dev)
def throughput_cv_less_than_maximum(self, max_cv):
"""Check that the throughput from this result is "accurate" enough.
We say that an IperfResult is "accurate" enough when the coefficient of
variance (standard deviation / mean) is below the passed in fraction.
@param fraction float maximum coefficient of variance for the
throughput sample.
@return True on above condition.
"""
if self.throughput is None or self.throughput_dev is None:
return True
if not self.throughput_dev and not self.throughput:
# 0/0 is undefined, but take this to be good for our purposes.
return True
if self.throughput_dev and not self.throughput:
# Deviation is non-zero, but the average is 0. Deviation
# as a fraction of the self.throughput is undefined but in theory
# a "very large number."
return False
if self.throughput_dev / self.throughput > max_cv:
return False
return True
@staticmethod
def _calculate_throughput(total_data, duration):
"""Calculate the throughput from the total bytes transeferred and the
duration of the test.
@param total_data int The number of bytes transferred during the test.
@param duration float The duration of the test in seconds.
@return float The throughput of the test in Mbps.
"""
if duration == 0:
return 0
total_bits = total_data * 8
bits_per_second = total_bits / duration
return bits_per_second / 1000000
def __init__(self, duration, throughput, percent_loss,
throughput_dev=None):
"""Construct an IperfResult.
@param duration float how long the test took in seconds.
@param throughput float test throughput in Mbps.
@param percent_loss float percentage of packets lost in UDP transfer.
@param throughput_dev standard deviation of throughputs.
"""
self.duration = duration
self.throughput = throughput
self.percent_loss = percent_loss
self.throughput_dev = throughput_dev
def get_keyval(self, prefix=''):
ret = {}
if prefix:
prefix = prefix + '_'
if self.throughput_dev is None:
margin = ''
else:
margin = '+-%0.2f' % self.throughput_dev
if self.throughput is not None:
ret[prefix + 'throughput'] = '%0.2f%s' % (self.throughput, margin)
return ret
def __repr__(self):
fields = []
fields += [
'%s=%0.2f' % item for item in vars(self).iteritems()
if item[1] is not None and isinstance(item[1], numbers.Number)
]
return '%s(%s)' % (self.__class__.__name__, ', '.join(fields))
class IperfConfig(object):
""" Defines the configuration for an iperf run. """
DEFAULT_TEST_TIME = 10
DEFAULT_MAX_BANDWIDTH = '10000M'
DEFAULT_NUM_PORTS = 4
IPERF_TEST_TYPE_TCP_TX = 'tcp_tx'
IPERF_TEST_TYPE_TCP_RX = 'tcp_rx'
IPERF_TEST_TYPE_TCP_BIDIRECTIONAL = 'tcp_bidirectional'
IPERF_TEST_TYPE_UDP_TX = 'udp_tx'
IPERF_TEST_TYPE_UDP_RX = 'udp_rx'
IPERF_TEST_TYPE_UDP_BIDIRECTIONAL = 'udp_bidirectional'
def __init__(self,
test_type,
max_bandwidth=DEFAULT_MAX_BANDWIDTH,
test_time=DEFAULT_TEST_TIME,
num_ports=DEFAULT_NUM_PORTS):
""" Construct an IperfConfig.
@param test_type string, PerfTestTypes test type.
@param max_bandwidth string maximum bandwidth to be used during the test
e.x. 100M (100 Mbps).
@param test_time int number of seconds to run the test for.
@param num_ports int number of ports use in the test.
"""
if test_type == IperfConfig.IPERF_TEST_TYPE_TCP_TX:
self.udp = False
self.bidirectional = False
elif test_type == IperfConfig.IPERF_TEST_TYPE_TCP_RX:
self.udp = False
self.bidirectional = False
elif test_type == IperfConfig.IPERF_TEST_TYPE_TCP_BIDIRECTIONAL:
self.udp = False
self.bidirectional = True
elif test_type == IperfConfig.IPERF_TEST_TYPE_UDP_TX:
self.udp = True
self.bidirectional = False
elif test_type == IperfConfig.IPERF_TEST_TYPE_UDP_RX:
self.udp = True
self.bidirectional = False
elif test_type == IperfConfig.IPERF_TEST_TYPE_UDP_BIDIRECTIONAL:
self.udp = True
self.bidirectional = True
else:
raise error.TestFail(
'Test type %s is not supported by iperf_runner.' %
test_type)
self.max_bandwidth = max_bandwidth
self.test_time = test_time
self.num_ports = num_ports
self.test_type = test_type
class IperfRunner(object):
"""Delegate to run iperf on a client/server pair."""
DEFAULT_TEST_TIME = 10
IPERF_SERVER_MAX_STARTUP_WAIT_TIME = 11
IPERF_CLIENT_TURNDOWN_WAIT_TIME = 1
IPERF_COMMAND_TIMEOUT_MARGIN = 20
def __init__(
self,
client_proxy,
server_proxy,
config,
client_interface=None,
server_interface=None,
):
"""Construct an IperfRunner. Use the IP addresses of the passed
interfaces if they are provided. Otherwise, attempt to use the WiFi
interface on the devices.
@param client LinuxSystem object.
@param server LinuxSystem object.
@param client_interface Interface object.
@param server_interface Interface object.
"""
self._client_proxy = client_proxy
self._server_proxy = server_proxy
self._server_host = server_proxy.host
self._client_host = client_proxy.host
if server_interface:
self._server_ip = server_interface.ipv4_address
# If a server interface was not explicitly provided, attempt to use
# the WiFi IP of the device.
else:
try:
self._server_ip = server_proxy.wifi_ip
except:
raise error.TestFail('Server device has no WiFi IP address, '\
'and no alternate interface was specified.')
if client_interface:
self._client_ip = client_interface.ipv4_address
# If a client interface was not explicitly provided, use the WiFi IP
# address of the WiFiClient device.
else:
try:
self._client_ip = client_proxy.wifi_ip
except:
raise error.TestFail('Client device has no WiFi IP address, '\
'and no alternate interface was specified.')
# Assume minijail0 is on ${PATH}, but raise exception if it's not
# available on both server and client.
self._minijail = 'minijail0'
path_utils.must_be_installed(self._minijail, host=self._server_host)
path_utils.must_be_installed(self._minijail, host=self._client_host)
# Bind mount a tmpfs over /tmp, since netserver hard-codes the /tmp
# path. netserver's log files aren't useful anyway.
self._minijail = ("%s -v -k 'tmpfs,/tmp,tmpfs,"
"MS_NODEV|MS_NOEXEC|MS_NOSUID,mode=755,size=10M'" %
self._minijail)
self._config = config
self._command_iperf_server = path_utils.must_be_installed(
'iperf', host=self._server_host)
self._command_iperf_client = path_utils.must_be_installed(
'iperf', host=self._client_host)
self._udp_flag = '-u' if config.udp else ''
self._bidirectional_flag = '-d' if config.bidirectional else ''
def __enter__(self):
self._restart_iperf_server()
return self
def __exit__(self, exc_type, exc_value, traceback):
self._client_proxy.firewall_cleanup()
self._server_proxy.firewall_cleanup()
self._kill_iperf_server()
self._kill_iperf_client()
def _kill_iperf_client(self):
"""Kills any existing iperf process on the client."""
self._client_host.run('pkill -9 %s' %
os.path.basename(self._command_iperf_client),
ignore_status=True)
def _kill_iperf_server(self):
"""Kills any existing iperf process on the serving host."""
self._server_host.run('pkill -9 %s' %
os.path.basename(self._command_iperf_server),
ignore_status=True)
def _restart_iperf_server(self):
"""Start an iperf server on the server device. Also opens up firewalls
on the test devices.
"""
logging.info('Starting iperf server...')
self._kill_iperf_server()
logging.debug('iperf server invocation: %s %s -s -B %s -D %s',
self._minijail, self._command_iperf_server,
self._server_ip, self._udp_flag)
devnull = open(os.devnull, "w")
self._server_host.run('%s %s -s -B %s -D %s' %
(self._minijail, self._command_iperf_server,
self._server_ip, self._udp_flag),
stderr_tee=devnull)
startup_time = time.time()
# Ensure the endpoints aren't firewalled.
protocol = 'udp' if self._config.udp else 'tcp'
self._client_proxy.firewall_open(protocol, self._server_ip)
self._server_proxy.firewall_open(protocol, self._client_ip)
# Run a client iperf test. The client will attempt to connect to the
# server for 10 seconds, but will exit early if it succeeds before
# that. This ensures that the server has had suffiecient time to come
# online before we begin the tests. We don't fail on timeout here
# because the logic for failed connections is contained in run().
iperf_test = '%s -c %s -B %s -t 1 %s' % (
self._command_iperf_client, self._server_ip, self._client_ip,
self._udp_flag)
result = self._client_host.run(
iperf_test,
ignore_status=True,
ignore_timeout=True,
timeout=self.IPERF_SERVER_MAX_STARTUP_WAIT_TIME)
if result.exit_status:
logging.debug(
'Failed to make a connection to the server in %s seconds.',
self.IPERF_SERVER_MAX_STARTUP_WAIT_TIME)
else:
logging.debug('Successfully made a connection to the server.')
# TODO(b:198343041) When iperf2 clients are run too quickly back to
# back, the server is unable to distinguish between them. Wait briefly
# to allow the server to reset.
time.sleep(self.IPERF_CLIENT_TURNDOWN_WAIT_TIME)
def run(self, ignore_failures=False, retry_count=3):
"""Run iperf and take a performance measurement.
@param ignore_failures bool True iff iperf runs that fail should be
ignored. If this happens, run will return a None value rather
than an IperfResult.
@param retry_count int number of times to retry the iperf command if
it fails due to an internal timeout within iperf.
@return IperfResult summarizing an iperf run.
"""
iperf_client = '%s -c %s -B %s -b %s -x C -y c -P 4 -t %s %s %s' % (
self._command_iperf_client, self._server_ip, self._client_ip,
self._config.max_bandwidth, self._config.test_time,
self._udp_flag, self._bidirectional_flag)
logging.info('Running iperf client for %d seconds.',
self._config.test_time)
logging.debug('iperf client invocation: %s', iperf_client)
timeout = self._config.test_time + self.IPERF_COMMAND_TIMEOUT_MARGIN
for _ in range(retry_count):
result = self._client_host.run(iperf_client,
ignore_status=True,
ignore_timeout=ignore_failures,
timeout=timeout)
if not result:
logging.info('Retrying iperf after empty result.')
continue
# Exit retry loop on success.
if not result.exit_status:
break
# We are in an unhandled error case.
logging.info('Retrying iperf after an unknown error.')
if ignore_failures and (result is None or result.exit_status):
return None
if result is None:
raise error.TestFail("No results; cmd: %s", iperf_client)
if result.exit_status:
raise error.CmdError(iperf_client, result,
"Command returned non-zero exit status")
# TODO(b:198343041) When iperf2 clients are run too quickly back to
# back, the server is unable to distinguish between them. Wait briefly
# to allow the server to reset.
time.sleep(self.IPERF_CLIENT_TURNDOWN_WAIT_TIME)
return IperfResult.from_iperf_output(result.stdout, self._config)