blob: 5497a78faf2293e2ec1b6e7532512e26d3ab93d1 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# DESCRIPTION :
#
# Basic stress (CPU, memory, ...) factory test.
import datetime
import gobject
import gtk
import logging
import re
import thread
import time
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import factory
from autotest_lib.client.cros.factory import ui
from autotest_lib.client.cros.factory.event_log import EventLog
_MESSAGE_PROMPT = 'Basic Stress Test'
_MESSAGE_STARTING = '... Starting Test ...'
_MESSAGE_TIMER = 'Started / 已執行時間: '
_MESSAGE_COUNTDOWN = 'ETA / 剩餘時間: '
_MESSAGE_LOADAVG = 'System Load: '
_MESSAGE_FAILED_LOADAVG = 'Failed to retrieve system load.'
_SYSTEM_MONITOR_LOG_FORMAT = 'SYSTEM MONITOR [%s] %s: %d'
class ECControl(object):
GET_FANSPEED_RE = 'Current fan RPM: ([0-9]*)'
TEMP_SENSOR_RE = 'Reading temperature...([0-9]*)'
def ec_command(self, cmd):
full_cmd = 'ectool %s' % cmd
result = utils.system_output(full_cmd)
logging.debug('Command: %s', full_cmd)
logging.debug('Result: %s', result)
return result
def get_fanspeed(self):
try:
response = self.ec_command('pwmgetfanrpm')
return int(re.findall(self.GET_FANSPEED_RE, response)[0])
except Exception:
logging.warn('Unable to read fan speed.')
return -1
def get_temperature(self, idx):
try:
response = self.ec_command('temps %d' % idx)
return int(re.findall(self.TEMP_SENSOR_RE, response)[0])
except Exception:
logging.warn('Unable to read temperature sensor %d.', idx)
return -1
class factory_StressTest(test.test):
version = 1
def thread_SAT(self, seconds):
try:
result = self.job.run_test('hardware_SAT',
drop_caches=True,
seconds=seconds)
if not result:
raise error.TestError('Failed running hardware_SAT')
finally:
self._complete = True
def timer_event(self, timer_label, countdown_label, load_label):
if self._complete:
with ui.gtk_lock:
gtk.main_quit()
return True
now = time.time()
if now <= self._start_time:
# Still in leading time.
load_label.set_text('%s %d' % (_MESSAGE_STARTING,
self._start_time - now))
return True
timer = now - self._start_time
countdown = self._end_time - now
timer_label.set_text('%d:%02d' % (timer / 60, timer % 60))
sign=' '
if countdown < 0:
countdown_label.modify_fg(gtk.STATE_NORMAL, ui.RED)
countdown = abs(countdown)
sign = '-'
countdown_label.set_text(
'%s%d:%02d' % (sign, countdown / 60, countdown % 60))
try:
with open('/proc/loadavg', 'r') as f:
load_label.set_text(_MESSAGE_LOADAVG + f.read())
except:
load_label.set_text(_MESSAGE_FAILED_LOADAVG)
load_label.modify_fg(gtk.STATE_NORMAL, ui.RED)
return True
def log_system_status(self, ectool, num_temp_sensor):
fan_speed = ectool.get_fanspeed()
temperatures = [ectool.get_temperature(i)
for i in xrange(num_temp_sensor)]
self._event_log.Log('sensor_measurement',
fan_speed=fan_speed,
temperatures=temperatures)
factory.log(_SYSTEM_MONITOR_LOG_FORMAT % (
datetime.datetime.now().isoformat(),
'Fan RPM', fan_speed))
for i in xrange(num_temp_sensor):
factory.log(_SYSTEM_MONITOR_LOG_FORMAT % (
datetime.datetime.now().isoformat(),
'Temp%d' % i,
temperatures[i]))
def monitor_event(self, ectool, num_temp_sensor):
self.log_system_status(ectool, num_temp_sensor)
return True
def run_once(self,
sat_seconds=60,
monitor_interval=None,
num_temp_sensor=1):
factory.log('%s run_once' % self.__class__)
self._complete = False
# Add 3 seconds leading time (for autotest / process to start).
self._start_time = time.time() + 3
self._end_time = self._start_time + sat_seconds + 1
gtk.gdk.threads_init()
vbox = gtk.VBox()
timer_widget, timer_label = ui.make_countdown_widget(
prompt=_MESSAGE_TIMER, value='', fg=ui.WHITE)
countdown_widget, countdown_label = ui.make_countdown_widget(
prompt=_MESSAGE_COUNTDOWN, value='')
load_label = ui.make_label(_MESSAGE_STARTING, fg=ui.BLUE)
gobject.timeout_add(1000, self.timer_event, timer_label,
countdown_label, load_label)
if monitor_interval:
ectool = ECControl()
self._event_log = EventLog.ForAutoTest()
self.log_system_status(ectool, num_temp_sensor)
gobject.timeout_add(1000 * monitor_interval,
self.monitor_event,
ectool,
num_temp_sensor)
vbox.add(ui.make_label(_MESSAGE_PROMPT, font=ui.LABEL_LARGE_FONT))
vbox.pack_start(timer_widget, padding=10)
vbox.pack_start(load_label, padding=20)
vbox.pack_start(countdown_widget, padding=10)
thread.start_new_thread(self.thread_SAT, (sat_seconds, ))
ui.run_test_widget(self.job, vbox)
factory.log('%s run_once finished' % self.__class__)