blob: d29e8199036d5a527c1c5b47e5776d84c98ca0fb [file] [log] [blame]
# Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Interface for SCPI Protocol.
Helper module to communicate with devices that uses SCPI protocol.
https://en.wikipedia.org/wiki/Standard_Commands_for_Programmable_Instruments
This will be used by RF Switch that was designed to connect WiFi AP and
WiFi Clients RF enclosures for interoperability testing.
"""
import logging
import socket
import sys
class ScpiException(Exception):
"""Exception for SCPI Errors."""
def __init__(self, msg=None, cause=None):
messages = []
if msg:
messages.append(msg)
if cause:
messages.append('Wrapping exception: %s: %s' % (
type(cause).__name__, str(cause)))
super(ScpiException, self).__init__(', '.join(messages))
class Scpi(object):
"""Controller for devices using SCPI protocol."""
SCPI_PORT = 5025
DEFAULT_READ_LEN = 4096
CMD_IDENTITY = '*IDN?'
CMD_RESET = '*RST'
CMD_STATUS = '*STB?'
CMD_ERROR_CHECK = 'SYST:ERR?'
def __init__(self, host, port=SCPI_PORT):
"""
Controller for devices using SCPI protocol.
@param host: hostname or IP address of device using SCPI protocol
@param port: Int SCPI port number (default 5025)
@raises SCPIException: on error connecting to device
"""
self.host = host
self.port = port
# Open a socket connection for communication with chassis.
try:
self.socket = socket.socket()
self.socket.connect((host, port))
except (socket.error, socket.timeout) as e:
logging.error('Error connecting to SCPI device.')
raise ScpiException(cause=e), None, sys.exc_info()[2]
def close(self):
"""Close the connection."""
if hasattr(self, 'socket'):
self.socket.close()
del self.socket
def write(self, data):
"""Send data to socket.
@param data: Data to send
@returns number of bytes sent
"""
return self.socket.send(data)
def read(self, buffer_size=DEFAULT_READ_LEN):
"""Safely read the query response.
@param buffer_size: Int max data to read at once (default 4096)
@returns String data read from the socket
"""
return str(self.socket.recv(buffer_size))
def query(self, data, buffer_size=DEFAULT_READ_LEN):
"""Send the query and get response.
@param data: data (Query) to send
@param buffer_size: Int max data to read at once (default 4096)
@returns String data read from the socket
"""
self.write(data)
return self.read(buffer_size)
def info(self):
"""Get Chassis Info.
@returns dictionary information of Chassis
"""
# Returns a comma separated text as below converted to dict.
# 'VTI Instruments Corporation,EX7200-S-11539,138454,3.13.8\n'
return dict(
zip(('Manufacturer', 'Model', 'Serial', 'Version'),
self.query('%s\n' % self.CMD_IDENTITY)
.strip().split(',', 3)))
def reset(self):
"""Reset the chassis.
@returns number of bytes sent
"""
return self.write('%s\n' % self.CMD_RESET)
def status(self):
"""Get status of relays.
@returns Int status of relays
"""
return int(self.query('%s\n' % self.CMD_STATUS))
def error_query(self):
"""Check for any error.
@returns tuple of error code and error message
"""
code, msg = self.query('%s\n' % self.CMD_ERROR_CHECK).split(', ')
return int(code), msg.strip().strip('"')