blob: e3f71cf86a049100fae6d701caf839ad415658f2 [file] [log] [blame]
# Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Interface for SCPI Protocol.
Helper module to communicate with devices that uses SCPI protocol.
https://en.wikipedia.org/wiki/Standard_Commands_for_Programmable_Instruments
This will be used by RF Switch that was designed to connect WiFi AP and
WiFi Clients RF enclosures for interoperability testing.
"""
from __future__ import print_function
import logging
import six
import socket
import sys
class ScpiException(Exception):
"""Exception for SCPI Errors."""
def __init__(self, msg=None, cause=None):
messages = []
if msg:
messages.append(msg)
if cause:
messages.append('Wrapping exception: %s: %s' % (
type(cause).__name__, str(cause)))
super(ScpiException, self).__init__(', '.join(messages))
class Scpi(object):
"""Controller for devices using SCPI protocol."""
SCPI_PORT = 5025
DEFAULT_READ_LEN = 4096
CMD_IDENTITY = '*IDN?'
CMD_RESET = '*RST'
CMD_STATUS = '*STB?'
CMD_ERROR_CHECK = 'SYST:ERR?'
def __init__(self, host, port=SCPI_PORT):
"""
Controller for devices using SCPI protocol.
@param host: hostname or IP address of device using SCPI protocol
@param port: Int SCPI port number (default 5025)
@raises SCPIException: on error connecting to device
"""
self.host = host
self.port = port
# Open a socket connection for communication with chassis.
try:
self.socket = socket.socket()
self.socket.connect((host, port))
except (socket.error, socket.timeout) as e:
logging.error('Error connecting to SCPI device.')
six.reraise(ScpiException(cause=e), None, sys.exc_info()[2])
def close(self):
"""Close the connection."""
if hasattr(self, 'socket'):
self.socket.close()
del self.socket
def write(self, data):
"""Send data to socket.
@param data: Data to send
@returns number of bytes sent
"""
return self.socket.send(data)
def read(self, buffer_size=DEFAULT_READ_LEN):
"""Safely read the query response.
@param buffer_size: Int max data to read at once (default 4096)
@returns String data read from the socket
"""
return str(self.socket.recv(buffer_size))
def query(self, data, buffer_size=DEFAULT_READ_LEN):
"""Send the query and get response.
@param data: data (Query) to send
@param buffer_size: Int max data to read at once (default 4096)
@returns String data read from the socket
"""
self.write(data)
return self.read(buffer_size)
def info(self):
"""Get Chassis Info.
@returns dictionary information of Chassis
"""
# Returns a comma separated text as below converted to dict.
# 'VTI Instruments Corporation,EX7200-S-11539,138454,3.13.8\n'
return dict(
zip(('Manufacturer', 'Model', 'Serial', 'Version'),
self.query('%s\n' % self.CMD_IDENTITY)
.strip().split(',', 3)))
def reset(self):
"""Reset the chassis.
@returns number of bytes sent
"""
return self.write('%s\n' % self.CMD_RESET)
def status(self):
"""Get status of relays.
@returns Int status of relays
"""
return int(self.query('%s\n' % self.CMD_STATUS))
def error_query(self):
"""Check for any error.
@returns tuple of error code and error message
"""
code, msg = self.query('%s\n' % self.CMD_ERROR_CHECK).split(', ')
return int(code), msg.strip().strip('"')