blob: a0b438407e4b0c7c0ce683140fa643e5b01a9b84 [file] [log] [blame]
# Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Interface to control RF Switch.
Helper module to control RF Switch. Common commands to control the relays
are made available as methods that can be called.
Please refer go/rf-switch for more info on the switch.
"""
import contextlib
import logging
from autotest_lib.server.cros.network.rf_switch import scpi
class RfSwitchException(Exception):
"""Exception for RfSwitch Errors."""
class RfSwitch(scpi.Scpi):
"""RF Switch Controller."""
_ALL_AP_RELAYS = 'k1_1:k1_24' # Each AP uses 6 relays
_ALL_CLIENT_RELAYS = 'k1_25:k1_48' # Each client uses 6 relays
_MIN_ENCLOSURE = 1 # Starting enclosure number
_MAX_ENCLOSURE = 4 # Last enclosure number
_ALL_ENCLOSURE = 0 # All the enclosures
_RELAYS_PER_ENCLOSURE = 6 # Number of relays in an enclosure
# Each AP enclosure uses 6 relays and a R relay to set attenuation
_AP_ATTENUATOR_RELAYS = {
1: ['k1_49', 'k1_50', 'k1_51', 'k1_52', 'k1_53', 'k1_54', 'R1_9'],
2: ['k1_55', 'k1_56', 'k1_57', 'k1_58', 'k1_59', 'k1_60', 'R1_10'],
3: ['k1_61', 'k1_62', 'k1_63', 'k1_64', 'k1_65', 'k1_66', 'R1_11'],
4: ['k1_67', 'k1_68', 'k1_69', 'k1_70', 'k1_71', 'k1_72', 'R1_12'],
}
# Shorter version
_AP_ATTENUATOR_RELAYS_SHORT = {
1: 'k1_49:k1_54,R1_9',
2: 'k1_55:k1_60,R1_10',
3: 'k1_61:k1_66,R1_11',
4: 'k1_67:k1_72,R1_12',
}
_CMD_CLOSE_RELAYS = 'ROUT:CLOS'
_CMD_CHECK_RELAYS_CLOSED = 'ROUT:CLOS?'
_CMD_OPEN_RELAYS = 'ROUT:OPEN'
_CMD_OPEN_ALL_RELAYS = 'ROUT:OPEN:ALL'
_CMD_SET_VERIFY = 'ROUT:CHAN:VER'
_CMD_GET_VERIFY = 'ROUT:CHAN:VER?'
_CMD_SET_VERIFY_INVERTED = 'ROUT:CHAN:VER:POL'
_CMD_GET_VERIFY_INVERTED = 'ROUT:CHAN:VER:POL?'
_CMD_GET_VERIFY_STATE = 'ROUT:CHAN:VER:POS:STAT?'
_CMD_CHECK_BUSY = 'ROUT:MOD:BUSY?'
_CMD_WAIT = 'ROUT:MOD:WAIT'
def __init__(self, host, port=scpi.Scpi.SCPI_PORT):
"""Controller for RF Switch.
@param host: Hostname or IP address of RF Switch
@param port: Int SCPI port number (default 5025)
"""
scpi.Scpi.__init__(self, host, port)
def send_cmd_check_error(self, cmd):
"""Send command to switch and check for any error.
@param cmd: string cmd to send to switch
"""
self.write(cmd)
code, error = self.error_query()
if code:
raise RfSwitchException('Error on command: "%s" - code: %s,'
' message: %s', cmd, code, error)
self.wait()
def close_relays(self, relays):
"""Close relays.
@param relays: relays to close (, to separate and : for range)
"""
self.send_cmd_check_error('%s (@%s)\n'
% (self._CMD_CLOSE_RELAYS, relays))
def relays_closed(self, relays):
"""Get open/closed status of relays.
@param relays: relays to check (, to separate and : for range)
@returns tuple of relay status, status is true if a relay is closed.
"""
status = self.query('%s (@%s)\n' % (self._CMD_CHECK_RELAYS_CLOSED,
relays)).strip().split(',')
return tuple(bool(int(x)) for x in status)
def open_relays(self, relays):
"""Open relays.
@param relays: string relays to close (, to separate and : for range)
"""
self.send_cmd_check_error('%s (@%s)\n' % (
self._CMD_OPEN_RELAYS, relays))
def open_all_relays(self):
"""Open all relays.
This will open all relays including the attenuator, which will set it
to Max. Please remember to set your attenuation to right level after
this call.
"""
self.send_cmd_check_error('%s\n' % self._CMD_OPEN_ALL_RELAYS)
def set_verify(self, relays, on):
"""Configure Close? to return indicator state instead of driven state.
@param relays: relays to verify (, to separate and : for range)
@param on: string state to verify (on(True)/off(False))
"""
self.send_cmd_check_error('%s %s,(@%s)\n' % (
self._CMD_SET_VERIFY, int(bool(on)), relays))
def get_verify(self, relays):
"""Get the verify mode of relays.
@param relays: relays to verify (, to separate and : for range)
@returns tuple of verify mode for relays
"""
status = self.query('%s (@%s)\n' % (
self._CMD_GET_VERIFY, relays)).strip().split(',')
return tuple(bool(int(x)) for x in status)
def set_verify_inverted(self, relays, inverted):
"""Set the polarity confidence of relay to be inverted.
@param relays: relays to set (, to separate and : for range)
@param inverted: Boolean True if INV, False for NORM
"""
self.send_cmd_check_error('%s %s,(@%s)\n' % (
self._CMD_SET_VERIFY_INVERTED,
'INV' if inverted else 'NORM', relays))
def get_verify_inverted(self, relays):
"""Get the confidence polarity. 1 is inverted, 0 is value as-is.
@param relays: relays to get (, to separate and : for range)
@returns tuple of status where 1 in inverted and 0 for value as-is
"""
status = self.query('%s (@%s)\n' % (self._CMD_GET_VERIFY_INVERTED,
relays)).strip().split(',')
return tuple(bool(int(x)) for x in status)
def get_verify_state(self, relays):
"""If verify set get driven state, else confidence state.
@param relays: relays to get (, to separate and : for range)
@returns tuple of verify status for given relays
"""
status = self.query('%s (@%s)\n' % (self._CMD_GET_VERIFY_STATE,
relays)).strip().split(',')
return tuple(bool(int(x)) for x in status)
@property
def busy(self):
"""Check relays are still settling.
@returns Boolean True if relays are settling, False if not.
"""
return bool(int(self.query('%s\n' % self._CMD_CHECK_BUSY).strip()))
def wait(self):
"""Wait for relays to debounce."""
self.write('%s\n' % self._CMD_WAIT)
def get_attenuation(self, ap_enclosure):
"""Get Attenuation for an AP enclosure.
Attenuation is set by turning on/off the relays. Each relay specifies
a bit in the binary value of attenuation. Find the relay status and
build the binary value, then find the decimal value from it.
@param ap_enclosure: Int 1/2/3/4 (AP enclosure index)
@returns attenuation value in int
@raises ValueError: on bad ap_enclosure value
"""
if (ap_enclosure < self._MIN_ENCLOSURE or
ap_enclosure > self._MAX_ENCLOSURE):
raise ValueError('Invalid AP enclosure number: %s.', ap_enclosure)
else:
a_status = self.relays_closed(
self._AP_ATTENUATOR_RELAYS_SHORT[ap_enclosure])
status = a_status[::-1] # reverse for Endian transform
logging.debug('attenuator status: %s', status)
# build the binary value
binary = ''.join(['0' if x else '1' for x in status])
return int(binary, 2)
def get_all_attenuation(self):
"""Get attenuation value for all AP enclosures.
@returns tuple of attenuation value for each enclosure
"""
attenuations = []
for x in xrange(self._MIN_ENCLOSURE, self._MAX_ENCLOSURE + 1):
attenuations.append(self.get_attenuation(x))
return tuple(attenuations)
def set_attenuation(self, ap_enclosure, attenuation):
"""Set attenuation for an AP enclosure.
@param ap_enclosure: Int 0,1,2,3,4 AP enclosure number. 0 for all
@param attenuation: Int Attenuation value to set
@raises ValueError: on bad ap_enclosure value
"""
if ap_enclosure == self._ALL_ENCLOSURE:
# set attenuation on all
for x in xrange(self._MIN_ENCLOSURE, self._MAX_ENCLOSURE + 1):
self.set_attenuation(x, attenuation)
elif (ap_enclosure < self._MIN_ENCLOSURE or
ap_enclosure > self._MAX_ENCLOSURE):
raise ValueError('Bad AP enclosure value: %s' % ap_enclosure)
else:
# convert attenuation decimal value to binary
bin_array = [int(x) for x in list('{0:07b}'.format(attenuation))]
# For endian
reverse_bits = self._AP_ATTENUATOR_RELAYS[ap_enclosure][:: -1]
# determine which bits should be set
relays_to_close = [
reverse_bits[i] for i, j in enumerate(bin_array) if not j
]
# open all relay for attenuator & then close the ones we need
relays = ','.join(self._AP_ATTENUATOR_RELAYS[ap_enclosure])
self.open_relays(relays)
logging.debug('Attenuator relays opened: %s', relays)
relays = ','.join(relays_to_close)
self.close_relays(relays)
logging.debug('Attenuator relays closed: %s', relays)
def get_ap_connections(self):
"""Get a list of AP to client connections.
@returns tuple of dict of connections ({'AP': 1, 'Client': 1}, ...)
"""
# Get the closed status for relays connected to AP enclosures.
ap_connections = self.relays_closed(self._ALL_AP_RELAYS)
# Find out the connections
connections = []
for index, relay_num in enumerate(ap_connections):
# if the relay is closed, there is a connection
if relay_num:
connection = {
'AP': (index / self._RELAYS_PER_ENCLOSURE) + 1,
'Client': (index % self._RELAYS_PER_ENCLOSURE) + 1
}
connections.append(connection)
return tuple(connections)
def get_client_connections(self):
"""Get a list of AP to client connections.
@returns tuple of dict of connections ({'AP': 1, 'Client': 1}, ...)
"""
# Get the closed status for relays connected to client enclosures.
c_connections = self.relays_closed(self._ALL_CLIENT_RELAYS)
# Find out the connections
connections = []
for index, relay_num in enumerate(c_connections):
# if the relay is closed, there is a connection
if relay_num:
connection = {
'AP': (index % self._RELAYS_PER_ENCLOSURE) + 1,
'Client': (index / self._RELAYS_PER_ENCLOSURE) + 1
}
connections.append(connection)
return tuple(connections)
def connect_ap_client(self, ap, client):
"""Connect AP and a Client enclosure.
@param ap: Int 1/2/3/4 AP enclosure index
@param client: Int 1/2/3/4 Client enclosure index
@raises ValueError: when ap / client value is not 1/2/3/4
"""
if (ap < self._MIN_ENCLOSURE or ap > self._MAX_ENCLOSURE):
raise ValueError(
'AP enclosure should be 1/2/3/4. Given: %s' % ap)
if (client < self._MIN_ENCLOSURE or
client > self._MAX_ENCLOSURE):
raise ValueError('Client enclosure should be 1/2/3/4. Given: %s' %
client)
# Relay index start from 0 so subtract 1 for index
relays = 'k1_%s,k1_%s' % (
((int(ap) - 1) * self._RELAYS_PER_ENCLOSURE) + int(client),
(((int(client) - 1) + self._MAX_ENCLOSURE) *
self._RELAYS_PER_ENCLOSURE) + int(ap))
logging.debug('Relays to close: %s', relays)
self.close_relays(relays)
@contextlib.contextmanager
def RfSwitchSession(host, port=scpi.Scpi.SCPI_PORT):
"""Start a RF Switch session and close it when done.
@param host: Hostname or IP address of RF Switch
@param port: Int SCPI port number (default 5025)
@yields: an instance of InteractiveSsh
"""
session = RfSwitch(host, port)
try:
yield session
finally:
session.close()