blob: 3ecd19f5867c10974a3aaefff17b6d3fdcf74dea [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import re
import time
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
def has_ectool():
"""Determine if ectool shell command is present.
Returns:
boolean true if avail, false otherwise.
"""
cmd = 'which ectool'
return (utils.system(cmd, ignore_status=True) == 0)
class EC_Common(object):
"""Class for EC common.
This incredibly brief base class is intended to encapsulate common elements
across various CrOS MCUs (ec proper, USB-PD, Sensor Hub). At the moment
that includes only the use of ectool.
"""
def __init__(self, target='cros_ec'):
"""Constructor.
@param target: target name of ec to communicate with.
"""
if not has_ectool():
ec_info = utils.system_output("mosys ec info",
ignore_status=True)
logging.warning("Ectool absent on this platform ( %s )",
ec_info)
raise error.TestNAError("Platform doesn't support ectool")
self._target = target
def ec_command(self, cmd, ignore_status=False):
"""Executes ec command and returns results.
@param cmd: string of command to execute.
@param ignore_status: (OPTIONAL) ignore exit status of system call
@returns: string of results from ec command.
"""
full_cmd = 'ectool --name=%s %s' % (self._target, cmd)
result = utils.system_output(full_cmd, ignore_status=ignore_status)
logging.debug('Command: %s', full_cmd)
logging.debug('Result: %s', result)
return result
class EC(EC_Common):
"""Class for CrOS embedded controller (EC)."""
HELLO_RE = "EC says hello"
GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)"
SET_FANSPEED_RE = "Fan target RPM set."
TEMP_SENSOR_RE = "Reading temperature...([0-9]*)"
TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on"
# For battery, check we can see a non-zero capacity value.
BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh"
LIGHTBAR_RE = "^ 05\s+3f\s+3f$"
def hello(self):
"""Test EC hello command.
@returns True if success False otherwise.
"""
response = self.ec_command('hello')
return (re.search(self.HELLO_RE, response) is not None)
def auto_fan_ctrl(self):
"""Turns auto fan ctrl on.
@returns True if success False otherwise.
"""
response = self.ec_command('autofanctrl')
logging.info('Turned on auto fan control.')
return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None)
def get_fanspeed(self):
"""Gets fanspeed.
@raises error.TestError if regexp fails to match.
@returns integer of fan speed RPM.
"""
response = self.ec_command('pwmgetfanrpm')
match = re.search(self.GET_FANSPEED_RE, response)
if not match:
raise error.TestError('Unable to read fan speed')
rpm = int(match.group(1))
logging.info('Fan speed: %d', rpm)
return rpm
def set_fanspeed(self, rpm):
"""Sets fan speed.
@param rpm: integer of fan speed RPM to set
@returns True if success False otherwise.
"""
response = self.ec_command('pwmsetfanrpm %d' % rpm)
logging.info('Set fan speed: %d', rpm)
return (re.search(self.SET_FANSPEED_RE, response) is not None)
def get_temperature(self, idx):
"""Gets temperature from idx sensor.
@param idx: integer of temp sensor to read.
@raises error.TestError if fails to read sensor.
@returns integer of temperature reading in degrees Kelvin.
"""
response = self.ec_command('temps %d' % idx)
match = re.search(self.TEMP_SENSOR_RE, response)
if not match:
raise error.TestError('Unable to read temperature sensor %d' % idx)
return int(match.group(1))
def get_battery(self):
"""Get battery presence (design capacity found).
@returns True if success False otherwise.
"""
response = self.ec_command('battery')
return (re.search(self.BATTERY_RE, response) is not None)
def get_lightbar(self):
"""Test lightbar.
@returns True if success False otherwise.
"""
self.ec_command('lightbar on')
self.ec_command('lightbar init')
self.ec_command('lightbar 4 255 255 255')
response = self.ec_command('lightbar')
self.ec_command('lightbar off')
return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None)
class EC_USBPD_Port(EC_Common):
"""Class for CrOS embedded controller for USB-PD Port.
Public Methods:
is_dfp: Determine if data role is Downstream Facing Port (DFP).
is_amode_supported: Check if alternate mode is supported by port.
is_amode_entered: Check if alternate mode is entered.
set_amode: Set an alternate mode.
Private attributes:
_port: integer of USB type-C port id.
_port_info: holds usbpd protocol info.
_amodes: holds alternate mode info.
Private methods:
_invalidate_port_data: Remove port data to force re-eval.
_get_port_info: Get USB-PD port info.
_get_amodes: parse and return port's svid info.
"""
def __init__(self, port):
"""Constructor.
@param port: integer of USB type-C port id.
"""
self._port = port
# TODO(crosbug.com/p/38133) target= only works for samus
super(EC_USBPD_Port, self).__init__(target='cros_pd')
# Interrogate port at instantiation. Use invalidate to force re-eval.
self._port_info = self._get_port_info()
self._amodes = self._get_amodes()
def _invalidate_port_data(self):
"""Remove port data to force re-eval."""
self._port_info = None
self._amodes = None
def _get_port_info(self):
"""Get USB-PD port info.
ectool command usbpd provides the following information about the port:
- Enabled/Disabled
- Power & Data Role
- Polarity
- Protocol State
At time of authoring it looks like:
Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY
@raises error.TestError if ...
port info not parseable.
@returns dictionary for <port> with keyval pairs:
enabled: True | False | None
power_role: sink | source | None
data_role: UFP | DFP | None
is_reversed: True | False | None
state: various strings | None
"""
PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \
'Polarity:CC(\d+)\s+State:(\w+)'
match = re.search(PORT_INFO_RE,
self.ec_command("usbpd %s" % (self._port)))
if not match or int(match.group(1)) != self._port:
error.TestError('Unable to determine port %d info' % self._port)
pinfo = dict(enabled=None, power_role=None, data_role=None,
is_reversed=None, state=None)
pinfo['enabled'] = match.group(2) == 'enabled'
pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source'
pinfo['data_role'] = match.group(4)
pinfo['is_reversed'] = True if match.group(5) == '2' else False
pinfo['state'] = match.group(6)
logging.debug('port_info = %s', pinfo)
return pinfo
def _get_amodes(self):
"""Parse alternate modes from pdgetmode.
Looks like ...
*SVID:0xff01 *0x00000485 0x00000000 ...
SVID:0x18d1 0x00000001 0x00000000 ...
@returns dictionary of format:
<svid>: {active: True|False, configs: <config_list>, opos:<opos>}
where:
<svid> : USB-IF Standard or vendor id as
hex string (i.e. 0xff01)
<config_list> : list of uint32_t configs
<opos> : integer of active object position.
Note, this is the config list index + 1
"""
SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
svids = dict()
cmd = 'pdgetmode %d' % self._port
for line in self.ec_command(cmd, ignore_status=True).split('\n'):
if line.strip() == '':
continue
logging.debug('pdgetmode line: %s', line)
match = re.search(SVID_RE, line)
if not match:
logging.warning("Unable to parse SVID line %s", line)
continue
active = match.group(1) == '*'
svid = match.group(2)
configs_str = match.group(3)
configs = list()
opos = None
for i,config in enumerate(configs_str.split(), 1):
if config.startswith('*'):
opos = i
config = config[1:]
config = int(config, 16)
# ignore unpopulated configs
if config == 0:
continue
configs.append(config)
svids[svid] = dict(active=active, configs=configs, opos=opos)
logging.debug("Port %d svids = %s", self._port, svids)
return svids
def is_dfp(self):
"""Determine if data role is Downstream Facing Port (DFP).
@returns True if DFP False otherwise.
"""
if self._port_info is None:
self._port_info = self._get_port_info()
return self._port_info['data_role'] == 'DFP'
def is_amode_supported(self, svid):
"""Check if alternate mode is supported by port partner.
@param svid: alternate mode SVID hexstring (i.e. 0xff01)
"""
if self._amodes is None:
self._amodes = self._get_amodes()
if svid in self._amodes.keys():
return True
return False
def is_amode_entered(self, svid, opos):
"""Check if alternate mode is entered.
@param svid: alternate mode SVID hexstring (i.e. 0xff01).
@param opos: object position of config to act on.
@returns True if entered False otherwise
"""
if self._amodes is None:
self._amodes = self._get_amodes()
if not self.is_amode_supported(svid):
return False
if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos:
return True
return False
def set_amode(self, svid, opos, enter, delay_secs=2):
"""Set alternate mode.
@param svid: alternate mode SVID hexstring (i.e. 0xff01).
@param opos: object position of config to act on.
@param enter: Boolean of whether to enter mode.
@raises error.TestError if ...
mode not supported.
opos is > number of configs.
@returns True if successful False otherwise
"""
if self._amodes is None:
self._amodes = self._get_amodes()
if svid not in self._amodes.keys():
raise error.TestError("SVID %s not supported", svid)
if opos > len(self._amodes[svid]['configs']):
raise error.TestError("opos > available configs")
cmd = "pdsetmode %d %s %d %d" % (self._port, svid, opos,
1 if enter else 0)
self.ec_command(cmd, ignore_status=True)
self._invalidate_port_data()
# allow some time for mode entry/exit
time.sleep(delay_secs)
return self.is_amode_entered(svid, opos) == enter
class EC_USBPD(EC_Common):
"""Class for CrOS embedded controller for USB-PD.
Public attributes:
ports: list EC_USBPD_Port instances
Public Methods:
get_num_ports: get number of USB-PD ports device has.
Private attributes:
_num_ports: integer number of USB-PD ports device has.
"""
def __init__(self, num_ports=None):
"""Constructor.
@param num_ports: total number of USB-PD ports on device. This is an
override. If left 'None' will try to determine.
"""
self._num_ports = num_ports
self.ports = list()
# TODO(crosbug.com/p/38133) target= only works for samus
super(EC_USBPD, self).__init__(target='cros_pd')
if (self.get_num_ports() == 0):
raise error.TestNAError("Device has no USB-PD ports")
for i in xrange(self._num_ports):
self.ports.append(EC_USBPD_Port(i))
def get_num_ports(self):
"""Determine the number of ports for device.
Uses ectool's usbpdpower command which in turn makes host command call
to EC_CMD_USB_PD_PORTS to determine the number of ports.
TODO(tbroch) May want to consider adding separate ectool command to
surface the number of ports directly instead of via usbpdpower
@returns number of ports.
"""
if (self._num_ports is not None):
return self._num_ports
self._num_ports = len(self.ec_command("usbpdpower").split(b'\n'))
return self._num_ports