blob: edf6429e24b2c661f9074851a6c5f2853c0bc4bf [file] [log] [blame]
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
SCPI-over-TCP controller.
import inspect
import logging
import re
import signal
import socket
import struct
import time
from contextlib import contextmanager
class Error(Exception):
A SCPI error.
error_id: The numeric SCPI error code, if any.
error_msg: The SCPI error message, if any.
def __init__(self, msg, error_id=None, error_msg=None):
super(Error, self).__init__(msg)
self.error_id = error_id
self.error_msg = error_msg
class TimeoutError(Error):
def _TruncateForLogging(msg):
if len(msg) > MAX_LOG_LENGTH:
msg = msg[0:MAX_LOG_LENGTH] + '<truncated>'
return msg
def Timeout(secs):
def handler(signum, frame):
raise TimeoutError('Timeout')
if secs:
if signal.alarm(secs):
raise Error('Alarm was already set')
signal.signal(signal.SIGALRM, handler)
if secs:
signal.signal(signal.SIGALRM, lambda signum, frame: None)
class LANSCPI(object):
'''A SCPI-over-TCP controller.'''
def __init__(self, host, port=5025, timeout=6, retries=10):
Connects to a device using SCPI-over-TCP.
host: Host to connect to.
port: Port to connect to.
timeout: Timeout in seconds. (Uses the ALRM signal.)
retries: maximum attemptis to connect to the host.
self.timeout = timeout
self.logger = logging.getLogger('SCPI') = host
self.port = port
for times in range(1, retries + 1):
try:'Connecting to %s:%d [try %d/%d]...' % (
host, port, times, retries))
except Exception as e:
time.sleep(1)"Unable to connect to %s:%d: %s" % (
host, port, e))
raise Error('Failed to connect %s:%d after %d tries' % (
host, port, retries))
def _Connect(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
with Timeout(self.timeout):
self.logger.debug('] Connecting to %s:%d...' % (, self.port))
self.socket.connect((, self.port))
self.rfile = self.socket.makefile('rb', -1) # Default buffering
self.wfile = self.socket.makefile('wb', 0) # No buffering
self.logger.debug('Connected') = self.Query('*IDN?')
def Close(self):
def Reopen(self):
Closes and reopens the connection.
def Send(self, commands, wait=True):
Sends a command or series of commands.
commands: The commands to send. May be list, or a string if
just a single command.
wait: If True, issues an *OPC? command after the final
command to block until all commands have completed.
if type(commands) == str:
self.Send([commands], wait)
for command in commands:
if command[-1] == '?':
raise Error('Called Send with query %r' % command)
errors = []
error_id = None
error_msg = None
for i in range(len(commands)):
ret = self._ReadLine()
if ret != '+0,"No error"':
'Issuing command %r: %r' % (commands[i], ret))
if not error_id:
# We don't have an error ID for the exception yet;
# try to parse the SCPI error.
match = re.match(r'^([-+]?\d+),"(.+)"$', ret)
if match:
error_id = int(
error_msg =
if errors:
raise Error('; '.join(errors), error_id, error_msg)
if wait:
ret = self._ReadLine()
if int(ret) != 1:
raise Error('Expected 1 after *OPC? but got %r' % ret)
def Query(self, command, format=None):
Issues a query, returning the result.
command: The command to issue.
format: If present, a function that will be applied to the query
response to parse it. The formatter may be int(), float(), a
function from the "Formatters" section at the bottom of this
file, or any other function that accepts a single string
if '?' not in command:
raise Error('Called Query with non-query %r' % command)
line1 = self._ReadLine()
line2 = self._ReadLine()
# On success, line1 is the queried value and line2 is the status
# register. On failure, line1 is the status register and line2
# is the error string. We do this to make sure that we can
# detect an unknown header rather than just waiting forever.
if ',' in line2:
raise Error('Error issuing command %r: %r' % (command, line2))
# Success! Get SYST:ERR, which should be +0
line3 = self._ReadLine()
if line3 != '+0,"No error"':
raise Error('Error issuing command %r: %r' % (command, line3))
if format:
line1 = format(line1)
return line1
def Quote(self, string):
Quotes a string.
# TODO(jsalz): Use the real IEEE 488.2 string format.
return '"%s"' % string
def _ReadLine(self):
Reads a single line, timing out in self.timeout seconds.
with Timeout(self.timeout):
if not self.timeout:
self.logger.debug('[ (waiting)')
ch =
if ch == '#':
# Binary format, which is:
# 1. A pound sign
# 2. A base-10 representation of the number of characters in the
# base-10 representation of the payload length
# 3. The payload length, in base-10
# 4. The payload
# 5. A newline character
# E.g., "#17FOO BAR\n" (where 7 is the length of "FOO BAR" and
# 1 is the length of "7").
# Note that if any of this goes haywire, the connection will be
# basically unusable since there is no way to know where we
# are in the binary data.
length_length = int(
length = int(
ret =
ch =
if ch != '\n':
raise Error('Expected newline at end of binary data')
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug('[binary %r' % _TruncateForLogging(ret))
return ret
elif ch == '\n':
# Empty line
return ''
ret = ch + self.rfile.readline().rstrip('\n')
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug('[ %s' % _TruncateForLogging(ret))
return ret
def _WriteLine(self, command):
Writes a single line.
if '\n' in command:
raise Error('Newline in command: %r' % command)
self.logger.debug('] %s' % command)
print >>self.wfile, command
# Formatters.
FLOATS = lambda s: [float(f) for f in s.split(",")]
if len(bin) % 4:
raise Error('Binary float data contains %d bytes '
'(not a multiple of 4)' % len(bin))
return struct.unpack('>' + 'f' * (len(bin)/4), bin)
def BINARY_FLOATS_WITH_LENGTH(expected_length):
def formatter(bin):
ret = BINARY_FLOATS(bin)
if len(ret) == 1 and math.isnan(ret[0]):
raise Error('Unable to retrieve array')
if len(ret) != expected_length:
raise Error('Expected %d elements but got %d' % (
expected_length, len(ret)))
return ret
return formatter