blob: 005e66e1130b117ac53775ce971b12fc63d7eafe [file] [log] [blame]
#!/usr/bin/python2
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import cellular_system_error
import cellular_logging
import os
import select
import socket
import traceback
class PrologixScpiDriver:
"""Wrapper for a Prologix TCP<->GPIB bridge.
http://prologix.biz/gpib-ethernet-controller.html
http://prologix.biz/index.php?dispatch=attachments.getfile&attachment_id=1
Communication is over a plain TCP stream on port 1234. Commands to
the bridge are in-band, prefixed with ++.
Notable instance variables include:
self.auto: When 1, the bridge automatically addresses the target
in listen mode. When 0, we must issue a ++read after every
query. As of Aug '11, something between us and the Agilent 8960
is wrong such that running in auto=0 mode leaves us hanging if
we issue '*RST;*OPC?'
"""
all_open_connections = {}
def __init__(self, hostname, port=1234, gpib_address=14,
read_timeout_seconds=30, connect_timeout_seconds=5):
"""Constructs a wrapper for the Prologix TCP<->GPIB bridge :
Arguments:
hostname: hostname of prologix device
port: port number
gpib_address: initial GPIB device to connect to
read_timeout_seconds: the read time out for the socket to the
prologix box
connect_timeout_seconds: the read time out for the socket to the
prologix box
"""
logger_name = 'prologix'
s = 'IP:%s GPIB:%s: ' % (hostname, gpib_address)
formatter_string = '%(asctime)s %(filename)s %(lineno)d ' + s + \
'- %(message)s'
self.scpi_logger = cellular_logging.SetupCellularLogging(
logger_name, formatter_string)
self.connection_key = "%s:%s" % (hostname, port)
self.connection_data = {self.connection_key: traceback.format_stack()}
if self.connection_key in self.all_open_connections.keys():
raise cellular_system_error.BadState(
'IP network connection to '
'prologix is already in use. : %s ' % self.all_open_connections)
self.all_open_connections[self.connection_key] = self.connection_data
self.socket = connect_to_port(hostname, port, connect_timeout_seconds)
self.read_timeout_seconds = read_timeout_seconds
self.socket.setblocking(0)
self.SetAuto(1)
self._AddCarrigeReturnsToResponses()
self.SetGpibAddress(gpib_address)
self.scpi_logger.debug('set read_timeout_seconds: %s ' %
self.read_timeout_seconds)
def __del__(self):
self.Close()
def _AddCarrigeReturnsToResponses(self):
"""
Have the prologix box add a line feed to each response.
Some instruments may need this.
"""
pass
self.Send('++eot_enable 1')
self.Send('++eot_char 10')
def SetAuto(self, auto):
"""Controls Prologix read-after-write (aka 'auto') mode."""
# Must be an int so we can send it as an arg to ++auto.
self.auto = int(auto)
self.Send('++auto %d' % self.auto)
def Close(self):
"""Closes the socket."""
try:
self.scpi_logger.error('Closing prologix devices at : %s ' %
self.connection_key)
self.all_open_connections.pop(self.connection_key)
except KeyError:
self.scpi_logger.error('Closed %s more then once' %
self.connection_key)
try:
self.socket.close()
except AttributeError: # Maybe we close before we finish building.
pass
def SetGpibAddress(self, gpib_address):
max_tries = 10
while max_tries > 0:
max_tries -= 1
self.Send('++addr %s' % gpib_address)
read_back_value = self._DirectQuery('++addr')
try:
if int(read_back_value) == int(gpib_address):
break
except ValueError:
# If we read a string, don't raise, just try again.
pass
self.scpi_logger.error('Set gpib addr to: %s, read back: %s' %
(gpib_address, read_back_value))
self.scpi_logger.error('Setting the GPIB address failed. ' +
'Trying again...')
def Send(self, command):
self.scpi_logger.info('] %s', command)
try:
self.socket.send(command + '\n')
except Exception as e:
self.scpi_logger.error('sending SCPI command %s failed. ' %
command)
self.scpi_logger.exception(e)
raise SystemError('Sending SCPI command failed. '
'Did the instrument stopped talking?')
def Reset(self):
"""Sends a standard SCPI reset and waits for it to complete."""
# There is some misinteraction between the devices such that if we
# send *RST and *OPC? and then manually query with ++read,
# occasionally that ++read doesn't come back. We currently depend
# on self.Query to turn on Prologix auto mode to avoid this
self.Send('*RST')
self.Query('*OPC?')
def Read(self):
"""Read a response from the bridge."""
try:
ready = select.select([self.socket], [], [],
self.read_timeout_seconds)
except Exception as e:
self.scpi_logger.exception(e)
s = 'Read from the instrument failed. Timeout:%s' % \
self.read_timeout_seconds
self.scpi_logger.error(s)
raise SystemError(s)
if ready[0]:
response = self.socket.recv(4096)
response = response.rstrip()
self.scpi_logger.info('[ %s', response)
return response
else:
self.Close()
s = 'Connection to the prologix adapter worked.' \
'But there was not data to read from the instrument.' \
'Does that command return a result?' \
'Bad GPIB port number, or timeout too short?'
raise cellular_system_error.InstrumentTimeout(s)
def Query(self, command):
"""Send a GPIB command and return the response."""
#self.SetAuto(1) #maybe useful?
s = list(self.scpi_logger.findCaller())
s[0] = os.path.basename(s[0])
s = list(self.scpi_logger.findCaller())
s[0] = os.path.basename(s[0])
self.scpi_logger.debug('caller :' + str(s) + command)
self.Send(command)
if not self.auto:
self.Send('++read eoi')
output = self.Read()
#self.SetAuto(0) #maybe useful?
return output
def _DirectQuery(self, command):
"""Sends a query to the prologix (do not send ++read).
Returns: response of the query.
"""
self.Send(command)
return self.Read()
def connect_to_port(hostname, port, connect_timeout_seconds):
# Right out of the python documentation,
# http://docs.python.org/library/socket.html
for res in socket.getaddrinfo(
hostname, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, _, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error as msg:
raise cellular_system_error.SocketTimeout(
'Failed to make a new socket object. ' + str(msg))
try:
s.settimeout(connect_timeout_seconds)
s.connect(sa)
except socket.error as msg:
try:
s.close()
except Exception:
pass # Try to close it, but it may not have been created.
temp_string_var = ' Could be bad IP address. Tried: %s : %s' % \
(hostname, port)
raise cellular_system_error.SocketTimeout(str(msg) +
temp_string_var)
return s