blob: e2dbe7d93b15ce43b06114caa5638e31dc598384 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2023 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Main file for pacina utility"""
import argparse
import contextlib
import json
import logging
import pathlib
import signal
import sys
import time
import typing as t
import urllib
import cs_config
import cs_types
import cs_util
import pandas as pd # pylint: disable=import-error
import plotly.express # pylint: disable=import-error
from pyftdi.ftdi import Ftdi # pylint: disable=import-error
from pyftdi.i2c import I2cController # pylint: disable=import-error
# Module-level logger; its level is set by logging.basicConfig() in main().
logger = logging.getLogger(__name__)
# I2C bus clock passed to I2cController.configure() in managed_ftdi().
#
# The default frequency is 100 kHz. PAC supports up to
# - I2C Fast Mode Plus (1 MHz)
# - I2C High-Speed mode (3.4 MHz)
# Due to observed I2C clock stretching on the PAC side, values higher than
# 1 MHz most likely won't yield positive results.
I2C_CLOCK_FREQUENCY: t.Final[int] = 1 * 1000 * 1000 # Hz (i.e. 1 MHz)
# FTDI latency timer applied to every opened controller in managed_ftdi().
#
# Please read the '3 Buffers and the Latency Timer' section of the following
# doc before attempting to change this value:
# https://ftdichip.com/wp-content/uploads/2020/08/AN232B-04_DataLatencyFlow.pdf
#
# Because a USB frame is 1 ms long, this cannot be lower than 2 ms.
FTDI_LATENCY_TIMER: t.Final[int] = 2 # ms
# CSS rules for the HTML tables written by generate_reports(). The list is
# handed to pandas' Styler.set_table_styles(); each entry pairs a CSS selector
# with a list of (property, value) tuples.
styles = [
# Highlight the row under the mouse cursor.
{"selector": "tr:hover", "props": [("background-color", "#ffff99")]},
# Empty selector: rules for the table element itself.
{
"selector": "",
"props": [
("border-collapse", "collapse"),
("width", "100%"),
("font-family", "sans-serif"),
],
},
# Header cells: white text on a blue background.
{
"selector": "th",
"props": [
("font-size", "100%"),
# NOTE(review): "face" is not a standard CSS property (it is an
# attribute of the legacy <font> tag); presumably "font-family" was
# intended. Confirm before changing, since it alters the rendering.
("face", "Open Sans"),
("text-align", "left"),
("background-color", "#4a86e8"),
("color", "white"),
],
},
# Place any table caption below the table.
{"selector": "caption", "props": [("caption-side", "bottom")]},
]
class SignalHandler:
    """Latch that remembers whether SIGINT has been delivered.

    Creating an instance installs a SIGINT handler for the process. The
    instance is falsy until that handler has run and truthy afterwards, so
    a measurement loop can poll it with a plain ``if handler:`` and stop
    cleanly on CTRL-C.
    """

    def __init__(self) -> None:
        # Start un-latched, then route SIGINT to this instance.
        self.terminate_signal = False
        signal.signal(signal.SIGINT, self._signal_handler)

    def __bool__(self) -> bool:
        """Report whether the termination flag has been latched."""
        return self.terminate_signal

    def _signal_handler(self, signum, _frame) -> None:
        """Log the received signal number and latch the termination flag.

        The stack frame supplied by the signal machinery is not used.
        """
        logger.warning("Signal handler called with signal %d", signum)
        self.terminate_signal = True
def generate_reports(
    start_time: float,
    log_path: pathlib.Path,
    log_prefix: str,
    dut_info: t.Optional[dict],
    data: list,
    avg_power: dict,
    config_rails: t.List[dict],
    power_state: str,
) -> None:
    """Write the CSV, HTML and JSON reports for a continuous measurement.

    Files created under ``log_path`` (each name prefixed by ``log_prefix``):

    * ``timeLog.csv``    - every sample in ``data``.
    * ``summary.csv``    - per-rail averages derived from ``avg_power``.
    * ``summary.html``   - summary table plus the plotly figures.
    * ``test-info.json`` - run metadata; only written when ``dut_info`` is
      given.

    The per-rail summary (and, when rail mappings are available, a tier-1
    breakdown) is also printed to stdout.

    Args:
        start_time: Measurement start as seconds since the epoch.
        log_path: Existing directory that receives the report files.
        log_prefix: String prepended to every report file name.
        dut_info: Optional DUT description that is merged into the test
            info; its "configs" and "ftdi_urls" entries are dropped.
        data: Sample rows matching the ``header_record`` columns below.
        avg_power: Mapping of rail name to a dict of per-rail totals. It
            must provide "acc_power" and "count"; the positional column
            rename below presumably expects the fields in the order raw
            accumulator, count, acc_power, sense resistor - verify against
            the producer of this dict.
        config_rails: Rail mapping rows. They must provide "Rail" and
            "Parent" columns for the sunburst plot and tier-1 summary.
        power_state: Label stored as the measurement phase.
    """
    header_record = [
        "time_abs",
        "time_relative",
        "rail",
        "accum",
        "count",
        "power",
    ]
    time_log_path = log_path / (log_prefix + "timeLog.csv")
    summary_path = log_path / (log_prefix + "summary.csv")
    summary_html_path = log_path / (log_prefix + "summary.html")

    # Raw time series.
    df = pd.DataFrame(data, columns=header_record)
    df.to_csv(time_log_path)

    # Per-rail averages, highest consumer first.
    df_avg = pd.DataFrame.from_dict(avg_power, orient="index").reset_index()
    df_avg["power"] = df_avg["acc_power"] / df_avg["count"]
    df_avg.columns = [
        "Rail",
        "Total Accumulated (Raw)",
        "Count",
        "Accumulated Power",
        "Sense Resistor",
        "Average Power (W)",
    ]
    pd.options.display.float_format = "{:,.3f}".format
    df_avg = df_avg.sort_values(by=["Average Power (W)"], ascending=False)
    print(df_avg)
    df_avg.to_csv(summary_path)

    # Default Plots
    power_plots = []
    time_plot = plotly.express.line(
        df,
        x="time_relative",
        y="power",
        color="rail",
        labels={"power": "Power (w)", "time_relative": "Time (seconds)"},
    )
    time_plot.update_layout(
        title="Time Series",
        xaxis_title="Time (Seconds)",
        yaxis_title="Power (W)",
    )
    power_plots.append(time_plot)

    box_plot = plotly.express.box(df, y="power", x="rail")
    box_plot.update_layout(
        title="Measurement Statistics",
        xaxis_title="Rail",
        yaxis_title="Power (W)",
    )
    power_plots.append(box_plot)

    # The styled summary is captured before the rail-map merge below, so the
    # HTML table shows the plain per-rail averages.
    # Styler.hide(axis="index") replaces the removed Styler.hide_index().
    sdf_avg = df_avg.style.hide(axis="index").set_table_styles(styles)

    if config_rails:
        rail_map = pd.DataFrame(config_rails)
        df_avg = pd.merge(df_avg, rail_map, on="Rail")
        df_avg["Average Power (W)"] = df_avg["Average Power (W)"].abs()
        # Text before the first "_" with leading/trailing "P" characters
        # removed, e.g. "PP3300_X" -> "3300".
        df_avg["voltage (mV)"] = df_avg.Rail.apply(
            lambda x: x.split("_")[0].strip("PP")
        )
        star_plot = plotly.express.sunburst(
            df_avg,
            names="Rail",
            parents="Parent",
            values="Average Power (W)",
            title="Power Sunburst",
            color="voltage (mV)",
        )
        power_plots.append(star_plot)

        root_rail = "PPVAR_SYS"
        root_pwr = None
        if root_rail in df_avg.Rail.values:
            root_pwr = df_avg[df_avg.Rail == root_rail][
                "Average Power (W)"
            ].unique()[0]
        else:
            print(root_rail + " measurement not found.")

        t1_columns = ["Rail", "voltage (mV)", "Average Power (W)"]
        if root_rail in df_avg.Parent.values:
            # Tier-1 rails are the direct children of the root rail.
            t1_rails = df_avg[df_avg["Parent"] == root_rail]
            t1_pwr = t1_rails["Average Power (W)"].sum()
            print("Tier1 Summary")
            print(f"{'T1 Rail Total:':<20}{t1_pwr:>20.3f}")
            # Compare against None so that a measured 0.0 W root is still
            # reported.
            if root_pwr is not None:
                root_label = f"T1 Root {root_rail}"
                print(f"{root_label:<20}{root_pwr:>20.3f}")
                print(f"{'Root - T1 Total:':<20}{(root_pwr - t1_pwr):>20.3f}")
            print(t1_rails[t1_columns])

    with summary_html_path.open("w", encoding="utf-8") as f:
        f.write("<h1>" + summary_html_path.name + "</h1>")
        if dut_info:
            test_info_path = log_path / (log_prefix + "test-info.json")
            stop_time = time.strftime("%Y-%m-%d %H:%M:%S")
            # NOTE(review): start and stop times use different formats;
            # left unchanged because consumers of the JSON are not visible
            # from here.
            test_info: dict = {
                "id": log_prefix,
                "measurement_phase": power_state,
                "measurement_start_time": time.strftime(
                    "%Y%m%d %H%M%S", time.localtime(start_time)
                ),
                "measurement_stop_time": stop_time,
            }
            test_info = test_info | dut_info
            # List-valued entries are dropped before the single-row table
            # is built; tolerate them being absent.
            test_info.pop("configs", None)
            test_info.pop("ftdi_urls", None)
            df_test_info = (
                pd.DataFrame(test_info, index=[0]).transpose().reset_index()
            )
            df_test_info.columns = ["Items", "Test Config"]
            s = df_test_info.style.hide(axis="index").set_table_styles(styles)
            # Styler.to_html() replaces the removed Styler.render().
            f.write(s.to_html())
            test_info["upload_data"] = True
            with open(test_info_path, "w", encoding="utf-8") as fjson:
                json.dump(test_info, fjson, indent=2, separators=(",", ": "))
        f.write(sdf_avg.to_html())
        for pplts in power_plots:
            f.write(
                pplts.to_html(
                    full_html=False,
                    include_plotlyjs="cdn",
                    default_height="50%",
                )
            )

    # as_uri() percent-encodes the absolute path and prepends "file://".
    print(f"Report: {summary_html_path.absolute().as_uri()}")
def parse_args(argv):
    """Parse the command line of the utility.

    Args:
        argv: Argument strings without the program name, or None to let
            argparse read them from sys.argv.

    Returns:
        The argparse.Namespace holding the parsed options. "loglevel" is
        logging.WARNING by default, logging.DEBUG with -d and logging.INFO
        with -v.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-s",
        "--single",
        # Must be a plain string: a trailing comma here would turn the help
        # into a tuple, which argparse cannot format.
        help=(
            "Use to take a single voltage, current "
            "power measurement of all rails"
        ),
        action="store_true",
    )
    parser.add_argument(
        "-t",
        "--time",
        default=10.0,
        help="Time to capture in seconds",
        type=float,
    )
    parser.add_argument(
        "--configs",
        nargs="*",
        default=[],
        help=(
            "Current sensor configuration files. "
            "Supports both servod and pacman formats. "
            "Number of config files needs to match "
            "number of FTDI URLs."
        ),
    )
    parser.add_argument(
        "--power_state",
        default="undefined",
        choices=[
            "undefined",
            "z5",
            "z2",
            "z1",
            "s5",
            "s4",
            "s3",
            "s0ix",
            "plt-1h",
            "plt-10h",
        ],
        help="Power State Information",
    )
    parser.add_argument(
        "-O",
        "--output",
        type=pathlib.Path,
        default="./results",
        help="Path for log files",
    )
    parser.add_argument(
        "-p",
        "--polarity",
        type=cs_types.Polarity,
        default=cs_types.Polarity.UNIPOLAR,
        choices=cs_types.Polarity,
        help="Measurements can either be unipolar or bipolar",
    )
    parser.add_argument(
        "--ftdi-urls",
        nargs="*",
        default=["ftdi:///"],
        help="FTDI URLs. Number of URLs needs to match number of config files",
    )
    parser.add_argument(
        "--dut-info",
        type=argparse.FileType("r"),
        help="JSON file containing DUT related information",
    )
    parser.add_argument(
        "--dut",
        default="",
        help="Target DUT. Only used when --dut-info is used.",
    )
    parser.add_argument(
        "--sample-time",
        default=1,
        type=float,
        help="Target sample time in seconds",
    )
    # -d and -v share one destination; whichever appears last wins.
    parser.add_argument(
        "-d",
        "--debug",
        help="Print debug messages",
        action="store_const",
        dest="loglevel",
        const=logging.DEBUG,
        default=logging.WARNING,
    )
    parser.add_argument(
        "-v",
        "--verbose",
        help="Print verbose messages",
        action="store_const",
        dest="loglevel",
        const=logging.INFO,
    )
    return parser.parse_args(argv)
@contextlib.contextmanager
def managed_ftdi(ftdi_url: str) -> t.Iterator[I2cController]:
    """Yield an I2C controller for ``ftdi_url`` and close it afterwards.

    The controller is configured with I2C_CLOCK_FREQUENCY and its FTDI
    latency timer is set to FTDI_LATENCY_TIMER before it is handed to the
    caller.

    Args:
        ftdi_url: FTDI device URL passed to I2cController.configure().

    Yields:
        The configured I2cController.
    """
    ftdi_inst = I2cController()
    ftdi_inst.configure(ftdi_url, frequency=I2C_CLOCK_FREQUENCY)
    try:
        # Inside the try so that a failure here still closes the controller
        # that configure() has already set up.
        ftdi_inst.ftdi.set_latency_timer(FTDI_LATENCY_TIMER)
        yield ftdi_inst
    finally:
        ftdi_inst.close()
def log_power_continuous(
    log_duration: float,
    sample_time: float,
    configs: t.List[cs_config.BusConfig],
) -> None:
    """Sample every bus repeatedly until the duration elapses or on SIGINT.

    Each sampling round calls ``log_continuous()`` once on every entry of
    ``configs``. Rounds are spaced at least ``sample_time`` seconds apart
    while the deadline has not passed. After the deadline the loop keeps
    sampling back-to-back until at least two rounds have been taken, then
    stops.

    Args:
        log_duration: Requested capture length in seconds.
        sample_time: Minimum spacing between sampling rounds in seconds.
        configs: Bus configurations to sample.
    """
    logger.info("Measuring Power for %ss.", log_duration)
    terminate_signal = SignalHandler()
    stime = time.time()
    timeout = stime + log_duration
    # Current sensors get reset by the first read (setting t=0)
    tprev_sample = 0.0
    sample_index = 0
    while True:
        # Checked on every pass, including the waiting passes below, so
        # CTRL-C is honoured immediately instead of only when the next
        # sample becomes due.
        if terminate_signal:
            break
        tcur_sample = time.time()
        if tcur_sample > timeout:
            if sample_index > 1:
                break
        elif tprev_sample == 0:
            # Very first round: sample right away.
            tprev_sample = tcur_sample
        elif tcur_sample - tprev_sample < sample_time:
            # Not due yet; poll the clock again.
            # NOTE(review): this is a busy-wait, presumably kept for timing
            # accuracy - confirm before adding a sleep here.
            continue
        else:
            tprev_sample = tcur_sample
        print(
            f"Logging: {tcur_sample - stime:.2f} / {log_duration:.2f} s...",
            end="\r",
        )
        # Depending on the system, it takes ~150ms to read 4 ch.
        # This results in second FTDI bus to have more samples than the
        # sampling time.
        for config in configs:
            config.log_continuous()
        sample_index += 1
def log_power_single(
    configs: t.List[cs_config.BusConfig],
    log_path: pathlib.Path,
    log_prefix: str,
) -> None:
    """Take one reading per bus, print it and save it as CSV.

    ``log_single()`` is called on every bus first. The ``data`` rows of all
    buses are then combined into one table, which is printed and written
    to ``<log_prefix>singleLog.csv`` inside ``log_path``. GPIO values
    reported by the buses are printed as a second table when there are any.

    Args:
        configs: Bus configurations to read.
        log_path: Directory that receives the CSV file.
        log_prefix: String prepended to the CSV file name.
    """
    rail_rows: list = []
    gpio_rows: list = []
    logger.info(
        "Taking a single voltage, current power measurement of all rails"
    )
    # Trigger the measurement on every bus before collecting any results.
    for bus in configs:
        bus.log_single()

    csv_path = log_path / f"{log_prefix}singleLog.csv"
    for bus in configs:
        rail_rows += bus.data
        if bus.gpio_vals:
            gpio_rows += bus.gpio_vals

    measurements = pd.DataFrame(
        rail_rows,
        columns=["Rail", "Voltage (V)", "Current (A)", "Power (W)"],
    )
    pd.options.display.float_format = "{:,.3f}".format
    print(measurements)
    measurements.to_csv(csv_path)

    if gpio_rows:
        gpio_table = pd.DataFrame(gpio_rows, columns=["GPIO", "State"])
        print("\nGPIO States")
        print(gpio_table)
    logger.info("Output written to %s", csv_path)
def main(argv: t.Optional[t.List[str]] = None) -> None:
    """Run a power measurement as described by the command line.

    Steps: parse arguments, register the PacDebugger USB IDs, resolve the
    DUT description (if any), create the output directory, open one I2C
    controller per FTDI URL, and then either take a single reading or log
    continuously and generate the reports.

    Configuration problems are reported through the logger and make the
    function return early.

    Args:
        argv: Argument strings without the program name, or None to let
            argparse read them from sys.argv.
    """
    args = parse_args(argv)
    logging.basicConfig(level=args.loglevel)
    # Adding PacDebugger V1 VID PID
    Ftdi.add_custom_vendor(0x18D1, "Google")
    Ftdi.add_custom_product(0x18D1, 0x5211, "PacDebuggerV1")
    if not Ftdi.list_devices():
        logger.error("No FTDIs found. Aborting!")
        return

    start_time = time.time()
    timestamp = time.strftime("%Y%m%d_%H%M%S", time.localtime(start_time))
    dut_info = None
    log_prefix = ""
    if args.dut_info and not args.dut:
        logger.error("Specify target DUT.")
        return
    if args.dut_info and args.dut:
        # argparse opened the file; close it as soon as it has been parsed.
        with args.dut_info as dut_file:
            all_duts = json.load(dut_file)
        if args.dut not in all_duts:
            logger.error("%s not found in %s", args.dut, args.dut_info.name)
            return
        dut_info = all_duts[args.dut]
        # The DUT description overrides the URLs/configs from the command
        # line.
        args.ftdi_urls = dut_info["ftdi_urls"]
        args.configs = dut_info["configs"]
        log_prefix = "_".join(
            [
                dut_info["model"],
                dut_info["build_phase"],
                dut_info["sku"],
                dut_info["dut_id"],
                dut_info["os_version"],
                args.power_state,
                timestamp,
            ]
        )
        log_prefix = log_prefix + "_"

    if not args.configs:
        logger.error("Current Sensor Configuration(s) Required")
        return
    if len(args.ftdi_urls) != len(args.configs):
        logger.error(
            "Number of config files needs to match number of FTDI URLs"
        )
        return
    # Presumably a pre-flight check that every URL resolves to a device; the
    # returned handle is discarded.
    # NOTE(review): confirm the handle does not need to be released.
    for ftdi_url in args.ftdi_urls:
        Ftdi.get_device(ftdi_url)

    # Prepare output directory.
    if dut_info and log_prefix:
        log_path = args.output / dut_info["program"]
    else:
        log_path = args.output / timestamp
    log_path.mkdir(parents=True, exist_ok=True)

    # based on the config files, generate cs instances
    configs: t.List[cs_config.BusConfig] = []
    config_rails: t.List[dict] = []
    # The ExitStack closes every opened controller on exit, including the
    # early return of the single-shot mode.
    with contextlib.ExitStack() as stack:
        for ftdi_url, input_config in zip(args.ftdi_urls, args.configs):
            logger.info(
                "Measuring power from %s using %s", ftdi_url, input_config
            )
            ftdi = stack.enter_context(managed_ftdi(ftdi_url))
            config = cs_config.BusConfig(
                input_config,
                ftdi,
                cs_util.supported_pns,
                polarity=args.polarity,
                sample_time=args.sample_time,
            )
            configs.append(config)
            config_rails.extend(config.rails)

        if args.single:
            log_power_single(configs, log_path, log_prefix)
            return

        log_power_continuous(args.time, args.sample_time, configs)
        logger.info("Completed Logging")

        avg_power: dict = {}
        data = []
        for config in configs:
            data.extend(config.get_acc_pwr())
            # Later buses win if two of them report the same rail name.
            avg_power = {**avg_power, **config.get_avg_pwr()}
        generate_reports(
            start_time,
            log_path,
            log_prefix,
            dut_info,
            data,
            avg_power,
            config_rails,
            args.power_state,
        )
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    # Pass the user-supplied arguments only (the program name is dropped).
    main(sys.argv[1:])