blob: 1448e08f0287b9fed444c05b0f961ef0681b4cba [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper class for power autotests requiring telemetry devices."""
import logging
import time
import numpy
CUSTOM_START = 'PowerTelemetryLogger custom start.'
CUSTOM_END = 'PowerTelemetryLogger custom end.'
INTERPOLATION_RESOLUTION = 6
class TelemetryUtilsError(Exception):
"""Error class for issues using these utilities."""
def interpolate_missing_data(data, max_nan_ratio=None, max_sample_gap=None,
max_sample_time_gap=None, timeline=None):
"""Interpolate missing power readings in data.
@param data: array of values
@min_nan_ratio: optional, float, max acceptable ratio of NaN to real values
@max_sample_gap: optional, int, max acceptable number of NaN in a row
@max_sample_time_gap: optional, float, max measurement gap in seconds
Note: supplying max_nan_time_gap requires timeline
@timeline: array of same size as |data| with timeline info for each sample
@returns: list, array |data| with missing values interpolated.
@raises: TelemetryUtilsError if
- the ratio of NaN is higher than |max_nan_ratio| (if supplied)
- no NaN gap is larger than |max_sample_gap| (if supplied)
- no NaN gap takes more time in |timeline| than
|max_sample_time_gap| (if supplied)
- all values in |data| are NaN.
"""
if max_sample_time_gap is not None and timeline is None:
# These are mutually required.
raise TelemetryUtilsError('Supplying max_sample_time_gap requires a '
'timeline.')
data = numpy.array(data)
nan_data = numpy.isnan(data)
if max_nan_ratio:
# Validate the ratio if a ratio is supplied.
nan_ratio = float(sum(nan_data)) / len(data)
if nan_ratio > max_nan_ratio:
# There are too many errors in this source.
# Throw an error so the user has a chance to adjust their power
# collection setup.
raise TelemetryUtilsError('NaN ratio of %.02f '
' - Max is %.02f.' % (nan_ratio,
max_nan_ratio))
if max_sample_gap is not None or max_sample_time_gap is not None:
# Flag to keep track whether the loop is in a measurement gap (NaN).
consecutive_nan_start = None
# Add a dummy at the end to make sure the iteration covers all real
# examples.
for i, isnan in enumerate(numpy.append(nan_data, False)):
if isnan and consecutive_nan_start is None:
consecutive_nan_start = i
if not isnan and consecutive_nan_start is not None:
consecutive_nans = i - consecutive_nan_start
if max_sample_gap and consecutive_nans >= max_sample_gap:
# Reject if there are too many consecutive failures.
raise TelemetryUtilsError('Too many consecutive NaN samples'
': %d.' % consecutive_nans)
if max_sample_time_gap:
# Checks whether the first valid timestamp before the
# gap exists and whether the first valid timestamp after the
# gap exists.
if consecutive_nan_start == 0 or i == len(data):
# We cannot determine the gap timeline properly here
# as the gap either starts or ends with the time.
# Ignore for now.
continue
sample_time_gap = (timeline[i] -
timeline[consecutive_nan_start-1])
if sample_time_gap > max_sample_time_gap:
raise TelemetryUtilsError('Excessively long sample gap '
'of %.02fs. Longest '
'permissible gap is %.02fs.'
% (sample_time_gap,
max_sample_time_gap))
# Reset the flag for the next gap.
consecutive_nan_start = None
# At this point the data passed all validations required.
sample_idx = numpy.arange(len(data))[[~nan_data]]
sample_vals = data[[~nan_data]]
if not len(sample_idx):
raise TelemetryUtilsError('Data has no valid readings. Cannot '
'interpolate.')
output = numpy.interp(range(len(data)), sample_idx, sample_vals)
return [round(x, INTERPOLATION_RESOLUTION) for x in output]
def log_event_ts(message=None, timestamp=None, offset=0):
"""Log the event and timestamp for parsing later.
@param message: description of the event.
@param timestamp: timestamp to for the event, if not provided, default to
current time. Local seconds since epoch.
@param offset: offset in seconds from the provided timestamp, or offset from
current time if timestamp is not provided. Can be positive or
negative.
"""
if not message:
return
if timestamp:
ts = timestamp + offset
else:
ts = time.time() + offset
logging.debug("%s %s", message, ts)
def start_measurement(timestamp=None, offset=0):
"""Mark the start of power telemetry measurement.
Optional. Use only once in the client side test that is wrapped in the
power measurement wrapper tests to help pinpoint exactly where power
telemetry data should start. PowerTelemetryLogger will trim off excess data
before this point. If not used, power telemetry data will start right before
the client side test.
@param timestamp: timestamp for the start of measurement, if not provided,
default to current time. Local seconds since epoch.
@param offset: offset in seconds from the provided timestamp, or offset from
current time if timestamp is not provided. Can be positive or
negative.
"""
log_event_ts(CUSTOM_START, timestamp, offset)
def end_measurement(timestamp=None, offset=0):
"""Mark the end of power telemetry measurement.
Optional. Use only once in the client side test that is wrapped in the
power measurement wrapper tests to help pinpoint exactly where power
telemetry data should end. PowerTelemetryLogger will trim off excess data
after this point. If not used, power telemetry data will end right after the
client side test.
@param timestamp: timestamp for the end of measurement, if not provided,
default to current time. Local seconds since epoch.
@param offset: offset in seconds from the provided timestamp, or offset from
current time if timestamp is not provided. Can be positive or
negative.
"""
log_event_ts(CUSTOM_END, timestamp, offset)