blob: 736708fa4be76040238abb112a63dd4a5abc024f [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import ConfigParser
import logging
import pexpect
import Queue
import threading
import time
import dli
# Format Appears as: [Date] [Time] - [Msg Level] - [Message]
LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
CONFIG_FILE = 'rpm_config.ini'
CONFIG = ConfigParser.ConfigParser()
CONFIG.read(CONFIG_FILE)
class RPMController(object):
"""
This abstract class implements RPM request queueing and
processes queued requests.
The actual interaction with the RPM device will be implemented
by the RPM specific subclasses.
It assumes that you know the RPM hostname and that the DUT is on
the specified RPM.
Implementation details:
This is an abstract class, subclasses must implement the methods
listed here. You must not instantiate this class but should
instantiate one of those leaf subclasses.
@var hostname: hostname for this rpm device.
@var is_running_lock: lock used to control access to _running.
@var request_queue: queue used to store requested outlet state changes.
@var queue_lock: lock used to control access to request_queue.
@var _running: boolean value to represent if this controller is currently
looping over queued requests.
"""
def __init__(self, rpm_hostname):
"""
RPMController Constructor.
To be called by subclasses.
@param rpm_hostname: hostname of rpm device to be controlled.
"""
dns_zone = CONFIG.get('CROS', 'dns_zone')
self.hostname = '.'.join([rpm_hostname, dns_zone])
self.request_queue = Queue.Queue()
self._running = False
self.is_running_lock = threading.Lock()
def _start_processing_requests(self):
"""
Check if there is a thread processing requests.
If not start one.
"""
with self.is_running_lock:
if not self._running:
self._running = True
self._running_thread = threading.Thread(target=self._run)
self._running_thread.start()
def _stop_processing_requests(self):
"""
Called if the request request_queue is empty.
Set running status to false.
"""
with self.is_running_lock:
logging.debug('Request queue is empty. RPM Controller for %s'
' is terminating.', self.hostname)
self._running = False
if not self.request_queue.empty():
# This can occur if an item was pushed into the queue after we
# exited the while-check and before the _stop_processing_requests
# call was made. Therefore we need to start processing again.
self._start_processing_requests()
def _run(self):
"""
Processes all queued up requests for this RPM Controller.
Callers should first request_queue up atleast one request and if this
RPM Controller is not running then call run.
Caller can either simply call run but then they will be blocked or
can instantiate a new thread to process all queued up requests.
For example:
threading.Thread(target=rpm_controller.run).start()
Requests are in the format of:
[dut_hostname, new_state, condition_var, result]
Run will set the result with the correct value.
"""
while not self.request_queue.empty():
request = self.request_queue.get()
result = self.set_power_state(request['dut'], request['new_state'])
if not result:
logging.error('Request to change %s to state %s failed.',
request['dut'], request['new_state'])
# Put result inside the result Queue to allow the caller to resume.
request['result_queue'].put(result)
self._stop_processing_requests()
def queue_request(self, dut_hostname, new_state):
"""
Queues up a requested state change for a DUT's outlet.
Requests are in the format of:
[dut_hostname, new_state, condition_var, result]
Run will set the result with the correct value.
@param dut_hostname: hostname of DUT whose outlet we want to change.
@param new_state: ON/OFF/CYCLE - state or action we want to perform on
the outlet.
"""
request = {}
request['dut'] = dut_hostname
request['new_state'] = new_state
# Reserve a spot for the result to be stored.
request['result_queue'] = Queue.Queue()
# Place in request_queue
self.request_queue.put(request)
self._start_processing_requests()
# Block until the request is processed.
result = request['result_queue'].get(block=True)
return result
def set_power_state(self, dut_hostname, new_state):
"""
Set the state of the dut's outlet on this RPM.
To be implemented by the subclasses.
@param dut_hostname: hostname of DUT whose outlet we want to change.
@param new_state: ON/OFF/CYCLE - state or action we want to perform on
the outlet.
@return: True if the attempt to change power state was successful,
False otherwise.
"""
raise NotImplementedError('Abstract class. Subclasses should implement '
'set_power_state().')
def type(self):
"""
Get the type of RPM device we are interacting with.
To be implemented by the subclasses.
@return: string representation of RPM device type.
"""
raise NotImplementedError('Abstract class. Subclasses should implement '
'type().')
class SentryRPMController(RPMController):
"""
This class implements power control for Sentry Switched CDU
http://www.servertech.com/products/switched-pdus/
Example usage:
rpm = SentrySwitchedCDU('chromeos-rack1-rpm1')
rpm.queue_request('chromeos-rack1-host1', 'ON')
@var _username: username used to access device.
@var _password: password used to access device.
@var _ssh_mock: mocked ssh interface used for testing.
"""
DEVICE_PROMPT = 'Switched CDU:'
SSH_LOGIN_CMD = 'ssh -l %s -o StrictHostKeyChecking=no ' \
'-o UserKnownHostsFile=/dev/null %s'
PASSWORD_PROMPT = 'Password:'
SET_STATE_CMD = '%s %s'
SUCCESS_MSG = 'Command successful'
LOGOUT_CMD = 'logout'
def __init__(self, hostname, ssh_mock=None):
super(SentryRPMController, self).__init__(hostname)
self._username = CONFIG.get('SENTRY', 'username')
self._password = CONFIG.get('SENTRY', 'password')
self._ssh_mock = ssh_mock
def set_power_state(self, dut_hostname, new_state):
logging.debug("Setting outlet for DUT: %s to state: %s",
dut_hostname, new_state)
result = True
cmd = SentryRPMController.SSH_LOGIN_CMD % (self._username,
self.hostname)
if not self._ssh_mock: # For testing purposes.
ssh = pexpect.spawn(cmd)
else:
ssh = self._ssh_mock
ssh.expect(SentryRPMController.PASSWORD_PROMPT, timeout=60)
ssh.sendline(self._password)
ssh.expect(SentryRPMController.DEVICE_PROMPT, timeout=60)
ssh.sendline(SentryRPMController.SET_STATE_CMD % (new_state,
dut_hostname))
try:
ssh.expect(SentryRPMController.SUCCESS_MSG, timeout=60)
except pexpect.TIMEOUT:
logging.error('Request to change outlet for DUT: %s to new '
'state %s timed out.', dut_hostname, new_state)
result = False
finally:
ssh.sendline(SentryRPMController.LOGOUT_CMD)
return result
def type(self):
return 'Sentry'
class WebPoweredRPMController(RPMController):
"""
This class implements RPMController for the Web Powered units
produced by Digital Loggers Inc.
@var _rpm: dli.powerswitch instance used to interact with RPM.
"""
CYCLE_SLEEP_TIME = 5
def __init__(self, hostname, powerswitch=None):
username = CONFIG.get('WEBPOWERED', 'username')
password = CONFIG.get('WEBPOWERED', 'password')
super(WebPoweredRPMController, self).__init__(hostname)
if not powerswitch:
self._rpm = dli.powerswitch(hostname=self.hostname, userid=username,
password=password)
else:
# Should only be used in unit_testing
self._rpm = powerswitch
def _get_outlet_value_and_state(self, dut_hostname):
"""
Look up the outlet and state for a given hostname on the RPM.
@param dut_hostname: hostname of DUT whose outlet we want to lookup.
@return [outlet, state]: the outlet number as well as its current state.
"""
status_list = self._rpm.statuslist()
for outlet, hostname, state in status_list:
if hostname == dut_hostname:
return outlet, state
return None
def set_power_state(self, dut_hostname, new_state):
outlet_and_state = self._get_outlet_value_and_state(dut_hostname)
if not outlet_and_state:
logging.error('DUT %s is not on rpm %s',
dut_hostname, self.hostname)
return False
outlet, state = outlet_and_state
expected_state = new_state
if new_state == 'CYCLE':
logging.debug('Beginning Power Cycle for DUT: %s',
dut_hostname)
self._rpm.off(outlet)
logging.debug('Outlet for DUT: %s set to OFF', dut_hostname)
# Pause for 5 seconds before restoring power.
time.sleep(WebPoweredRPMController.CYCLE_SLEEP_TIME)
self._rpm.on(outlet)
logging.debug('Outlet for DUT: %s set to ON', dut_hostname)
expected_state = 'ON'
if new_state == 'OFF':
self._rpm.off(outlet)
logging.debug('Outlet for DUT: %s set to OFF', dut_hostname)
if new_state == 'ON':
self._rpm.on(outlet)
logging.debug('Outlet for DUT: %s set to ON', dut_hostname)
# Lookup the final state of the outlet
return self._is_plug_state(dut_hostname, expected_state)
def _is_plug_state(self, dut_hostname, expected_state):
outlet, state = self._get_outlet_value_and_state(dut_hostname)
if expected_state not in state:
logging.error('Outlet for DUT: %s did not change to new state'
' %s', dut_hostname, expected_state)
return False
return True
def type(self):
return 'Webpowered'
def test_in_order_requests():
"""Simple integration testing."""
rpm = WebPoweredRPMController('chromeos-rack8e-rpm1')
rpm.queue_request('chromeos-rack8e-hostbs1', 'OFF')
rpm.queue_request('chromeos-rack8e-hostbs2', 'OFF')
rpm.queue_request('chromeos-rack8e-hostbs3', 'CYCLE')
def test_parrallel_webrequests():
"""Simple integration testing."""
rpm = WebPoweredRPMController('chromeos-rack8e-rpm1')
threading.Thread(target=rpm.queue_request,
args=('chromeos-rack8e-hostbs1', 'ON')).start()
threading.Thread(target=rpm.queue_request,
args=('chromeos-rack8e-hostbs2', 'ON')).start()
def test_parrallel_sshrequests():
"""Simple integration testing."""
rpm = SentryRPMController('chromeos-rack1-rpm1')
threading.Thread(target=rpm.queue_request,
args=('chromeos-rack1-hostbs1', 'OFF')).start()
threading.Thread(target=rpm.queue_request,
args=('chromeos-rack1-hostbs2', 'ON')).start()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format=LOGGING_FORMAT)
test_in_order_requests()
test_parrallel_webrequests()
test_parrallel_sshrequests()