blob: 54ec9bc91bd50b6694eb28f76448118ac58c11a7 [file] [log] [blame]
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import binascii
import importlib
import pac19xx
import pandas
from pyftdi.i2c import I2cController, I2cNackError
import struct
import time
def read_pac(device, reg, num_bytes):
""" Reads num_bytes from PAC I2C register using pyftdi driver.
Args:
device: (string) Serial port device name - ftdi:///?.
reg: (int) Register to read.
num_bytes: (int) Number of bytes to read.
Returns:
(string) Hex decoded as a string
"""
pacval = device.read_from(reg, num_bytes)
pacval = binascii.hexlify(pacval).decode()
return pacval
def read_pac_int(device, reg, num_bytes):
""" Calls read_pac and returns the value as an int.
Args:
device: (string) Serial port device name - ftdi:///?.
reg: (int) Register to read.
num_bytes: (int) Number of bytes to read.
Returns:
(int) The value of read_pac converted to an int
"""
pacval = read_pac(device,reg,num_bytes)
return int(pacval, 16)
def read_voltage(device, ch_num=1):
""" Returns PAC voltage of given channel number.
Args:
device: i2c port object.
ch_num: (int) PAC Channel number.
Returns:
act_volt: (float) Voltage.
"""
# Read Voltage.
act_volt = read_pac_int(device, pac19xx.VBUS1_AVG + int(ch_num), 2)
# Convert to voltage.
act_volt = act_volt * (pac19xx.FSV / pac19xx.V_POLAR)
return float(act_volt)
def read_power(device, sense_resistance, ch_num=1):
""" Returns PAC power of given channel number.
Args:
device: i2c port object.
sense_resistance: (float) sense resistance in Ohms.
ch_num: (int) PAC Channel number.
Returns:
power: (float) Power in W.
"""
# Read power
power = read_pac_int(device, pac19xx.VPOWER1 + int(ch_num), 4)
power_fsr = (pac19xx.FSR / float(sense_resistance)) * pac19xx.FSV
p_prop = power / pac19xx.P_POLAR
power = power_fsr * p_prop * 0.25
# *0.25 to shift right 2 places VPOWERn[29:0].
return power
def read_current(device, sense_resistance, ch_num=1):
"""Returns PAC current of given channel number.
Args:
device: i2c port object.
sense_resistance: (float) sense resistance in Ohms.
ch_num: (int) PAC Channel number.
Returns:
current: (float) Current.
"""
current = read_pac_int(device, pac19xx.VSENSE1_AVG + int(ch_num), 2)
fsc = pac19xx.FSR / float(sense_resistance)
current = (fsc / pac19xx.V_POLAR) * current
return float(current)
def read_gpio(device):
""" Returns GPIO status of PAC GPIO
Args:
device: i2c port object.
Returns:
gpioSetting: (bool) gpio.
"""
# Read GPIO
gpio = read_pac_int(device, pac19xx.SMBUS_SET, 1)
return bool(gpio & (1 << 7))
def reset_accumulator(device):
"""Command to reset PAC accumulators.
Args:
device: i2c port object.
"""
device.write_to(pac19xx.REFRESH, 0)
time.sleep(0.001)
def dump_accumulator(device, ch_num):
"""Command to acquire the voltage accumulator and counts for a PAC.
Args:
device: i2c port object.
ch_num: channel to dump.
Returns:
reg: (long) voltage accumulator register.
count: (int) number of accumulations.
"""
lut = {'0': pac19xx.VACC1, '1': pac19xx.VACC2,
'2': pac19xx.VACC3, '3': pac19xx.VACC4}
reg = device.read_from(lut[str(ch_num)], 7)
count = device.read_from(pac19xx.ACC_COUNT, 4)
count = int.from_bytes(count, byteorder='big', signed=False)
reg = int.from_bytes(reg, byteorder='big', signed=False)
return (reg, count)
def pac_info(ftdi_url):
"""Returns PAC debugging info
Args:
ftdi_url: (string) ftdi url.
"""
# Init the bus
i2c = I2cController()
i2c.configure(ftdi_url)
device = i2c.get_port(0x10)
print(f'Device: \t{device}')
print(f'Args.debug: \t{args.debug}')
tmp = read_pac(device, pac19xx.MANUFACTURER_ID, 1)
print(f'PAC Mfg ID: \t0x{tmp}')
tmp = read_pac(device, pac19xx.REVISION_ID, 1)
print(f'PAC revision: \t0x{tmp}')
tmp = read_pac(device, pac19xx.SMBUS_SET, 1)
print(f'SMBus Set: \t0x{tmp}')
tmp = read_pac(device, pac19xx.NEG_PWR_FSR, 1)
print(f'NEG_PWR_FSR: \t0x{tmp}')
tmp = read_pac(device, pac19xx.CTRL, 2)
print(f'CTRL: \t\t0x{tmp}\n')
def load_config(config_file):
"""Loads the same config file used by servod into a pandas dataframe.
Args:
config File: PAC Address and sense resistor file.
Returns:
config: (Pandas Dataframe) config.
"""
guybrush_r0_pacs = importlib.import_module(config_file.strip('.py'))
config = pandas.DataFrame(guybrush_r0_pacs.INAS)
config.columns = [
'drv', 'addr', 'rail', 'nom', 'rsense', 'mux', 'is_calib'
]
config['addr_pac'] = config.addr.apply(lambda x: x.split(':')[0])
config['ch_num'] = config.addr.apply(lambda x: x.split(':')[1])
return config
def load_gpio_config(gpio_config):
"""Loads a PAC address to GPIO rail name mapping csv file.
Args:
config File: (string) PAC Address and GPIO rail mapping.
Returns:
config: (Pandas Dataframe) config.
"""
gpio_config = pandas.read_csv(gpio_config, skiprows=5, skipinitialspace=True)
return gpio_config
def record(config_file,
ftdi_url='ftdi:///',
rail='all',
record_length=360.0,
voltage=True,
current=True,
power=True):
"""High level function to reset, log, then dump PAC power accumulations.
Args:
config_file: (string) Location of PAC Address/sense resitor .py file.
ftdi_url: (string) ftdi_url.
rail: (string, list of strings) name of rail to log. Must match.
config rail name. will record all by default.
record_length: (float) time in seconds to log.
voltage: (boolean) log voltage.
current: (boolean) log current.
power : (boolean) log power.
Returns:
time_log: (Pandas DataFrame) Time series log.
acummulatorLog: (Pandas DataFrame) Accumulator totals.
"""
# Init the bus.
i2c = I2cController()
i2c.configure(ftdi_url)
# Load the config.
config = load_config(config_file)
# Filter the config to rows we care about.
if 'all' not in rail:
config = config[config['rail'].isin(rail)]
# Clear all accumulators
skip_pacs = []
for pac_address, group in config.groupby('addr_pac'):
try:
device = i2c.get_port(int(pac_address, 16))
reset_accumulator(device)
time.sleep(0.001)
except I2cNackError:
# This happens on the DB PAC in Z states.
print('Unable to reset PAC %s. Ignoring value' % pac_address)
skip_pacs.append(pac_address)
log = []
start_time = time.time()
timeout = start_time + float(record_length)
while True:
if time.time() > timeout:
break
print('Logging: %.2f / %.2f s...' %
(time.time() - start_time, float(record_length)),
end='\r')
# Group measurements by pac for speed.
for pac_address, group in config.groupby('addr_pac'):
if pac_address in skip_pacs:
continue
# Parse from the dataframe.
device = i2c.get_port(int(pac_address, 16))
# Setup any configuration changes here prior to refresh.
device.write_to(pac19xx.REFRESH_V, 0)
# Wait 1ms after REFRESH for registers to stablize.
time.sleep(.001)
# Log every rail on this pac we need to.
for i, row in group.iterrows():
try:
ch_num = row.ch_num
sense_r = float(row.rsense)
# Grab a log timestamp
tmp = {}
tmp['systime'] = time.time()
tmp['relativeTime'] = tmp['systime'] - start_time
tmp['rail'] = row['rail']
if voltage:
tmp['voltage'] = read_voltage(device, ch_num)
if current:
tmp['current'] = read_current(device, sense_r, ch_num)
if power:
tmp['power'] = read_power(device, sense_r, ch_num)
log.append(tmp)
except I2cNackError:
print('NACK detected, continuing measurements')
time.sleep(.001)
continue
time_log = pandas.DataFrame(log)
time_log['power'] = time_log['power']
pandas.options.display.float_format = '{:,.3f}'.format
stats = time_log.groupby('rail').power.describe()
accumulators = []
# Dump the accumulator.
for i, config_row in config.iterrows():
if config_row.addr_pac in skip_pacs:
continue
accumulator = {}
device = i2c.get_port(int(config_row['addr_pac'], 16))
time.sleep(.001)
accumulator['Rail'] = config_row['rail']
(accum, count) = dump_accumulator(device, config_row.ch_num)
accumulator['tAccum'] = time.time() - start_time
accumulator['count'] = count
depth = {'unipolar': 2 ** 30, 'polar': 2 ** 29}
# Equation 3-8 Energy Calculation.
accumulator['accumReg'] = accum
accumulator['rSense'] = config_row.rsense
pwrFSR = 3.2 / config_row.rsense
accumulator['Average Power (w)'] = accum / (depth['unipolar'] *
count) * pwrFSR
accumulators.append(accumulator)
accumulatorLog = pandas.DataFrame(accumulators)
print('Accumulator Power Measurements by Rail (W)')
print(accumulatorLog.sort_values(by='Average Power (w)', ascending=False))
return (time_log, accumulatorLog)
def query_all(config_file, gpio_config, ftdi_url='ftdi:///'):
"""Preform a one time query of GPIOs, powers, currents, voltages.
Args:
config_file: (string) Location of PAC Address/sense resistor .py file.
gpio_config: (string) Location of PAC Address Gpio rail mapping.
ftdi_url: (string) URL of ftdi device.
Returns:
log: Pandas DataFrame with log voltage log.
"""
# Init the bus.
i2c = I2cController()
i2c.configure(ftdi_url)
# Load the config.
config = load_config(config_file)
# Load GPIO Config.
gpio_config = load_gpio_config(gpio_config)
# Ping all the PACs.
skip_pacs = []
for pac_address, group in config.groupby('addr_pac'):
try:
device = i2c.get_port(int(pac_address, 16))
reset_accumulator(device)
time.sleep(0.001)
except I2cNackError:
# This happens on the DB PAC in Z states
print('Unable to reset PAC %s. Ignoring value' % pac_address)
skip_pacs.append(pac_address)
# Measure the GPIOs of all of the pacs
gpio_log = []
for pac_address, row in gpio_config.iterrows():
tmp = {'Rail': row.Rail}
if row.addr_pac in skip_pacs:
continue
# Parse from the dataframe.
device = i2c.get_port(int(row.addr_pac, 16))
if read_gpio(device):
tmp['GPIO'] = 'High'
else:
tmp['GPIO'] = 'Low'
gpio_log.append(tmp)
gpio_log = pandas.DataFrame(gpio_log)
print(gpio_log)
# This just logs the instantaneous measurements once.
log = []
for i, config_row in config.iterrows():
if config_row.addr_pac in skip_pacs:
continue
tmp = {}
tmp['ic_name'] = config_row.drv
tmp['ic_addr'] = config_row.addr.split(':')[0]
tmp['ch_num'] = config_row.addr.split(':')[1]
tmp['Rail'] = config_row.rail
tmp['bus_volt'] = config_row.nom
tmp['sense_r'] = config_row.rsense
i2c = I2cController()
i2c.configure(ftdi_url)
device = i2c.get_port(int(tmp['ic_addr'], 16))
# Setup any configuration changes here prior to refresh.
device.write_to(pac19xx.REFRESH_V, 0)
# Wait 1ms after REFRESH for registers to stablize.
time.sleep(0.001)
# Read voltage.
tmp['act_volt'] = read_voltage(device, tmp['ch_num'])
# Read power.
tmp['power'] = read_power(device, tmp['sense_r'], tmp['ch_num'])
# Read current.
tmp['current'] = read_current(device, tmp['sense_r'], tmp['ch_num'])
log.append(tmp)
log = pandas.DataFrame(log)
pandas.options.display.float_format = '{:,.3f}'.format
print(log)
return log