| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import re |
| import time |
| import xmlrpclib |
| |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.server.cros.faft.firmware_test import FirmwareTest |
| |
class firmware_ECThermal(FirmwareTest):
    """
    Servo based EC thermal engine test.

    On a DUT whose EC reports the 'thermal' capability, run_once() checks
    that:
        1. The CPU temperature read through servo agrees with ectool.
        2. With the fan turned off, stressing the DUT raises the CPU
           temperature.
        3. With automatic fan control, the fan speed under load is
           consistent with the thermal thresholds read from the EC.
    """
    version = 1

    # Delay (seconds) for waiting fan to start or stop
    FAN_DELAY = 5

    # Delay (seconds) for waiting device stressing to stabilize
    STRESS_DELAY = 30

    # Delay (seconds) for stressing device with fan off to check temperature
    # increase
    STRESS_DELAY_NO_FAN = 12

    # Margin for comparing servo based and ectool based CPU temperature
    TEMP_MISMATCH_MARGIN = 3

    # Minimum increase of CPU temperature when stressing DUT
    TEMP_STRESS_INCREASE = 3

    # Pseudo INT_MAX. Used as infinity when comparing temperature readings
    INT_MAX = 10000

    # Sensor type ID of ignored sensors
    SENSOR_TYPE_IGNORED = 255

    # PIDs of DUT stressing processes. The class-level default is an
    # immutable tuple so that instances can never share (and mutate) one
    # list; _stress_dut() binds a fresh per-instance list.
    _stress_pid = ()

    # True when initialize() managed to stop the temp_metrics job and
    # cleanup() therefore has to start it again. Defined at class level so
    # that cleanup() is safe even if initialize() failed before setting it.
    _has_temp_metrics = False

    def enable_auto_fan_control(self):
        """Enable EC automatic fan speed control"""
        # We use set_nocheck because servo reports current target
        # RPM instead 'auto', and therefore servo.set always fails.
        self.servo.set_nocheck('fan_target_rpm', 'auto')


    def max_fan(self):
        """Maximize fan speed"""
        # We use set_nocheck because servo reports current target
        # RPM instead 'max', and therefore servo.set always fails.
        self.servo.set_nocheck('fan_target_rpm', 'max')


    def turn_off_fan(self):
        """Turn off fan"""
        self.servo.set('fan_target_rpm', 'off')


    def _get_setting_for_type(self, type_id):
        """
        Retrieve thermal setting for a given type of sensor

        Thresholds are queried one ID at a time with 'ectool thermalget'
        until the command fails, which is taken as the end of the list.

        Args:
          type_id: The ID of sensor type.

        Returns:
          A list containing thresholds (in degree C) in the following order:
            Warning
            CPU off
            All power off
            Fan speed thresholds
          None is returned when no threshold could be read for this type.
        """
        # Compiled once, outside the polling loop. (\d+) rather than (\d*)
        # so that a line without a number is skipped instead of crashing
        # int() on an empty string.
        pattern = re.compile(r'Threshold \d* [a-z ]* \d* is (\d+) K.')
        setting = []
        current_id = 0
        while True:
            try:
                lines = self.faft_client.system.run_shell_command_get_output(
                        'ectool thermalget %d %d' % (type_id, current_id))
            except xmlrpclib.Fault:
                # The command failed: no more threshold IDs for this type.
                break
            for line in lines:
                matched = pattern.match(line)
                if matched is not None:
                    # Convert degree K to degree C
                    setting.append(int(matched.group(1)) - 273)
            current_id += 1

        return setting or None


    def get_fan_steps(self):
        """
        Retrieve fan step config from EC

        The configured RPM of every fan step is parsed from the EC console
        command 'thermalfan 0'. Each non-zero step is then written to servo
        and read back, so that self._fan_steps holds the values servo
        actually reports for 'fan_target_rpm'.

        Raises:
          error.TestError: Raised when no thermal setting has been read, so
                           the number of fan steps is unknown.
        """
        if not self._thermal_setting:
            raise error.TestError(
                    "No thermal setting available to derive fan steps")

        # The first three thresholds of a setting are warning / CPU off /
        # all power off; the remaining ones are fan speed thresholds.
        num_steps = len(self._thermal_setting[0]) - 3
        self._fan_steps = []
        expected_pat = ([r"Lowest speed: ([0-9-]+) RPM"] +
                        [r"\d+ K:\s+([0-9-]+) RPM"] * num_steps)
        match = self.ec.send_command_get_output("thermalfan 0", expected_pat)
        # Element 1 of each match is used as the captured RPM value
        # (presumably element 0 is the whole matched text -- confirm against
        # send_command_get_output).
        self._fan_steps = [int(m[1]) for m in match]

        # Get the actual value of each fan step
        for i, rpm in enumerate(self._fan_steps):
            if rpm == 0:
                continue
            self.servo.set_nocheck('fan_target_rpm', "%d" % rpm)
            self._fan_steps[i] = int(self.servo.get('fan_target_rpm'))

        logging.info("Actual fan steps: %s", self._fan_steps)


    def get_thermal_setting(self):
        """
        Retrieve thermal engine setting from EC

        Fills self._thermal_setting with one threshold list per sensor type
        (indexed by type ID) and self._num_temp_sensor with the number of
        temperature sensors that 'ectool temps' can read.
        """
        self._thermal_setting = []
        type_id = 0
        while True:
            setting = self._get_setting_for_type(type_id)
            if setting is None:
                # First type without any threshold ends the enumeration.
                break
            self._thermal_setting.append(setting)
            type_id += 1
        logging.info("Number of tempearture sensor types: %d", type_id)

        # Get the number of temperature sensors: probe increasing sensor IDs
        # until the command fails.
        self._num_temp_sensor = 0
        while True:
            try:
                self.faft_client.system.run_shell_command('ectool temps %d' %
                                                   self._num_temp_sensor)
            except xmlrpclib.Fault:
                break
            self._num_temp_sensor += 1
        logging.info("Number of temperature sensor: %d", self._num_temp_sensor)


    def initialize(self, host, cmdline_args):
        """
        Prepare the DUT for the thermal test.

        Stops the temp_metrics job when possible (remembering whether it has
        to be restarted in cleanup) and, if the EC has the 'thermal'
        capability, reads the thermal settings and fan steps.

        Args:
          host: Passed through to FirmwareTest.initialize.
          cmdline_args: Passed through to FirmwareTest.initialize.
        """
        super(firmware_ECThermal, self).initialize(host, cmdline_args)
        # NOTE(review): presumably masks EC console output channels; the
        # mask is set back to 0xffffffff in cleanup() -- confirm.
        self.ec.send_command("chan 0")
        try:
            self.faft_client.system.run_shell_command('stop temp_metrics')
        except xmlrpclib.Fault:
            self._has_temp_metrics = False
        else:
            logging.info('Stopped temp_metrics')
            self._has_temp_metrics = True
        if self.check_ec_capability(['thermal']):
            self.get_thermal_setting()
            self.get_fan_steps()
            self.enable_auto_fan_control()


    def cleanup(self):
        """
        Undo what initialize() changed on the DUT.

        The base class cleanup always runs, even when one of the restore
        steps below raises (for instance because initialize() failed half
        way through).
        """
        try:
            if self.check_ec_capability(['thermal']):
                self.enable_auto_fan_control()
            if self._has_temp_metrics:
                logging.info('Starting temp_metrics')
                self.faft_client.system.run_shell_command(
                        'start temp_metrics')
            self.ec.send_command("chan 0xffffffff")
        finally:
            super(firmware_ECThermal, self).cleanup()


    def _find_cpu_sensor_id(self):
        """
        This function finds the CPU temperature sensor using ectool.

        The CPU sensor is the one whose name is reported as 'PECI' by
        'ectool tempsinfo'.

        Returns:
          Integer ID of CPU temperature sensor.

        Raises:
          error.TestFail: Raised if we fail to find PECI temperature through
                          ectool.
        """
        name_pattern = re.compile(r'Sensor name: (.*)')
        for temp_id in range(self._num_temp_sensor):
            lines = self.faft_client.system.run_shell_command_get_output(
                    'ectool tempsinfo %d' % temp_id)
            for line in lines:
                matched = name_pattern.match(line)
                if matched is not None and matched.group(1) == 'PECI':
                    return temp_id
        raise error.TestFail('Cannot find CPU temperature sensor ID.')


    def _get_temp_reading(self, sensor_id):
        """
        Get temperature reading on a sensor through ectool

        Args:
          sensor_id: Temperature sensor ID.

        Returns:
          Temperature reading in degree C.

        Raises:
          xmlrpclib.Fault: Raised when we fail to read temperature.
          error.TestError: Raised if ectool doesn't behave as we expected.
        """
        assert sensor_id < self._num_temp_sensor
        # (\d+) so that output without a number ends in the documented
        # TestError below rather than a ValueError from int('').
        pattern = re.compile(r'Reading temperature...(\d+)')
        lines = self.faft_client.system.run_shell_command_get_output(
                'ectool temps %d' % sensor_id)
        for line in lines:
            matched = pattern.match(line)
            if matched is not None:
                # Convert degree K to degree C
                return int(matched.group(1)) - 273
        # Should never reach here
        raise error.TestError("Unexpected error occurred")


    def check_temp_report(self):
        """
        Checker of temperature reporting.

        This function reads CPU temperature from servo and ectool. If
        the two readings mismatch by more than TEMP_MISMATCH_MARGIN,
        test fails.

        Raises:
          error.TestFail: Raised when temperature reading mismatches by
                          more than TEMP_MISMATCH_MARGIN.
        """
        cpu_temp_id = self._find_cpu_sensor_id()
        logging.info("CPU temperature sensor ID is %d", cpu_temp_id)
        ectool_cpu_temp = self._get_temp_reading(cpu_temp_id)
        servo_cpu_temp = int(self.servo.get('cpu_temp'))
        logging.info("CPU temperature from servo: %d C", servo_cpu_temp)
        logging.info("CPU temperature from ectool: %d C", ectool_cpu_temp)
        if abs(ectool_cpu_temp - servo_cpu_temp) > self.TEMP_MISMATCH_MARGIN:
            raise error.TestFail(
                    'CPU temperature readings from servo and ectool differ')


    def _stress_dut(self, threads=4):
        """
        Stress DUT system.

        By reading from /dev/urandom and writing to /dev/null, we can stress
        DUT and cause CPU temperature to go up. We stress the system forever,
        until _stop_stressing is called to kill the stress threads. This
        function is non-blocking.

        Args:
          threads: Number of threads (processes) when stressing forever.

        Returns:
          A list of stress process IDs is returned.
        """
        logging.info("Stressing DUT with %d threads...", threads)
        self.faft_client.system.run_shell_command('pkill dd')
        stress_cmd = 'dd if=/dev/urandom of=/dev/null bs=1M &'
        # Grep for [d]d instead of dd to prevent getting the PID of grep
        # itself.
        pid_cmd = "ps -ef | grep '[d]d if=/dev/urandom' | awk '{print $2}'"
        self._stress_pid = []
        for _ in range(threads):
            self.faft_client.system.run_shell_command(stress_cmd)
        lines = self.faft_client.system.run_shell_command_get_output(
                pid_cmd)
        for line in lines:
            pid = line.strip()
            if not pid:
                # Ignore blank output lines instead of failing in int().
                continue
            logging.info("PID is %s", line)
            self._stress_pid.append(int(pid))
        return self._stress_pid


    def _stop_stressing(self):
        """
        Stop stressing DUT system

        Killing is best effort: a kill command that fails (e.g. the process
        is already gone) is logged and the remaining PIDs are still
        processed. This method is called from 'finally' clauses, so it must
        not replace the caller's own exception with a kill failure. The
        recorded PID list is emptied so a second call is a no-op.
        """
        stop_cmd = 'kill -9 %d'
        pids, self._stress_pid = self._stress_pid, []
        for pid in pids:
            try:
                self.faft_client.system.run_shell_command(stop_cmd % pid)
            except xmlrpclib.Fault:
                # Same failure signal this class relies on for its other
                # shell commands.
                logging.warning("Failed to kill stress process %d", pid)


    def check_fan_off(self):
        """
        Checker of fan turned off.

        The function first delays FAN_DELAY seconds to ensure fan stops.
        Then it reads fan speed and fails if fan speed is non-zero.
        Then it stresses the system a bit and checks if the temperature
        goes up by at least TEMP_STRESS_INCREASE.

        Raises:
          error.TestFail: Raised when the fan is still spinning, or when
                          temperature increases by less than
                          TEMP_STRESS_INCREASE.
        """
        time.sleep(self.FAN_DELAY)
        fan_speed = self.servo.get('fan_actual_rpm')
        if int(fan_speed) != 0:
            raise error.TestFail("Fan is not turned off.")
        logging.info("EC reports fan turned off.")
        cpu_temp_before = int(self.servo.get('cpu_temp'))
        logging.info("CPU temperature before stressing is %d C",
                     cpu_temp_before)
        # Whatever goes wrong while the DUT is loaded, never leave the
        # stress processes running with the fan turned off.
        try:
            self._stress_dut()
            time.sleep(self.STRESS_DELAY_NO_FAN)
            cpu_temp_after = int(self.servo.get('cpu_temp'))
        finally:
            self._stop_stressing()
        logging.info("CPU temperature after stressing is %d C",
                     cpu_temp_after)
        if cpu_temp_after - cpu_temp_before < self.TEMP_STRESS_INCREASE:
            raise error.TestFail(
                    "CPU temperature did not go up by more than %d degrees" %
                    self.TEMP_STRESS_INCREASE)


    def _get_temp_sensor_type(self, sensor_id):
        """
        Get type of a given temperature sensor

        Args:
          sensor_id: Temperature sensor ID.

        Returns:
          Type ID of the temperature sensor.

        Raises:
          error.TestError: Raised when ectool doesn't behave as we expected.
        """
        assert sensor_id < self._num_temp_sensor
        # (\d+) so that output without a number ends in the documented
        # TestError below rather than a ValueError from int('').
        pattern = re.compile(r'Sensor type: (\d+)')
        lines = self.faft_client.system.run_shell_command_get_output(
                'ectool tempsinfo %d' % sensor_id)
        for line in lines:
            matched = pattern.match(line)
            if matched is not None:
                return int(matched.group(1))
        # Should never reach here
        raise error.TestError("Unexpected error occurred")


    def _check_fan_speed_per_sensor(self, fan_speed, sensor_id):
        """
        Check if the given fan_speed is reasonable from the view of certain
        temperature sensor. There could be three types of outcome:
          1. Fan speed is higher than expected. This may be due to other
             sensor sensing higher temperature and setting fan to higher
             speed.
          2. Fan speed is as expected.
          3. Fan speed is lower than expected. In this case, EC is not
             working as expected and an error should be raised.

        The current fan step defines a temperature window for the sensor;
        the sensor reading is compared against that window. Ignored sensors
        and sensor types with fan stepping disabled report 0x00, which is
        the neutral value when results are OR-ed together.

        Args:
          fan_speed: The current fan speed in RPM.
          sensor_id: The ID of temperature sensor.

        Returns:
          0x00: Fan speed is higher than expected.
          0x01: Fan speed is as expected.
          0x10: Fan speed is lower than expected.

        Raises:
          error.TestError: Raised when getting unexpected fan speed.
        """
        sensor_type = self._get_temp_sensor_type(sensor_id)
        if sensor_type == self.SENSOR_TYPE_IGNORED:
            # This sensor should be ignored
            return 0x00

        thresholds = self._thermal_setting[sensor_type]
        if thresholds[-1] == -273:
            # The fan stepping for this type of sensor is disabled
            # (the last threshold was reported as 0 K).
            return 0x00

        try:
            idx = self._fan_steps.index(fan_speed)
        except ValueError:
            # list.index raises ValueError for a value that is not a known
            # fan step; nothing else is expected here.
            raise error.TestError("Unexpected fan speed: %d" % fan_speed)

        # Fan thresholds start at index 3 of the setting list. Step idx is
        # entered at thresholds[idx + 2] and left for the next step at
        # thresholds[idx + 3]. The lower bound is relaxed by 3 degrees
        # (presumably the EC's hysteresis when stepping down -- confirm).
        if idx == 0:
            lower_bound = -self.INT_MAX
            upper_bound = thresholds[3]
        elif idx == len(self._fan_steps) - 1:
            lower_bound = thresholds[idx + 2] - 3
            upper_bound = self.INT_MAX
        else:
            lower_bound = thresholds[idx + 2] - 3
            upper_bound = thresholds[idx + 3]

        temp_reading = self._get_temp_reading(sensor_id)
        logging.info("Sensor %d = %d C", sensor_id, temp_reading)
        logging.info("  Expecting %d - %d C", lower_bound, upper_bound)
        if temp_reading > upper_bound:
            # Sensor is hotter than the current fan step allows: the fan
            # should already be at a higher step, i.e. it is too slow.
            return 0x10
        if temp_reading < lower_bound:
            # Sensor is cooler than the current fan step requires: the fan
            # is faster than this sensor alone would ask for.
            return 0x00
        return 0x01


    def check_auto_fan(self):
        """
        Checker of thermal engine automatic fan speed control.

        Stress DUT system for a longer period to make temperature more stable
        and check if fan speed is controlled as expected. The per-sensor
        results are OR-ed together; the check passes as soon as at least one
        sensor reports the fan speed as expected.

        Raises:
          error.TestFail: Raised when fan speed is not as expected.
        """
        # Everything that happens while the DUT is under load sits inside
        # the try block so the stress processes are always stopped.
        try:
            self._stress_dut()
            time.sleep(self.STRESS_DELAY)
            fan_rpm = int(self.servo.get('fan_target_rpm'))
            logging.info('Fan speed is %d RPM', fan_rpm)
            result = 0x00
            for sensor_id in range(self._num_temp_sensor):
                result |= self._check_fan_speed_per_sensor(fan_rpm, sensor_id)
        finally:
            self._stop_stressing()
        # NOTE(review): a mixed result (0x11, one sensor as expected and
        # another one too hot for the current fan step) is accepted here,
        # as before -- confirm whether that should fail as well.
        if result == 0x00:
            raise error.TestFail("Fan speed higher than expected")
        if result == 0x10:
            raise error.TestFail("Fan speed lower than expected")


    def run_once(self):
        """
        Run the thermal checks in order: temperature report, fan off,
        automatic fan control.

        Raises:
          error.TestNAError: Raised when the EC has no 'thermal' capability.
        """
        if not self.check_ec_capability(['thermal']):
            raise error.TestNAError("Nothing needs to be tested on this device")
        logging.info("Checking host temperature report.")
        self.check_temp_report()

        self.turn_off_fan()
        logging.info("Verifying fan is turned off.")
        self.check_fan_off()

        self.enable_auto_fan_control()
        logging.info("Verifying automatic fan control functionality.")
        self.check_auto_fan()