blob: 8bbc3692b6d99c5e5fb2b9b5346064c2afa96501 [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import copy
import mock
import prologix_scpi_driver
import scpi
import unittest
import cellular_logging
import cellular_system_error
# Module-level logger shared by every test in this file; created by
# cellular_logging.SetupCellularLogging with the name 'scpi_test'.
log = cellular_logging.SetupCellularLogging('scpi_test')
# TODO(byronk): The hard-coded instrument list below is a hack for now; it
# should be looked up in labconfig_data instead. crbug.com/225108
# TODO(byronk): Replace SystemError with a specific exception.
# crbug.com/225127
# Instruments exercised by the tests in this file. For each entry:
#   name_part - substring expected in the instrument's '*IDN?' response
#   gpib_addr - value passed to the driver as gpib_address
#   ip        - value passed to the driver as hostname
scpi_instruments = [
    # Agilent 8960 call box
    dict(name_part='8960', gpib_addr='14', ip='172.22.50.118'),
    # PXT is called 6621
    dict(name_part='6621', gpib_addr='14', ip='172.22.50.244'),
]
class BasicPrologixTest(unittest.TestCase):
    """
    Basic connection tests for the Prologix SCPI driver.

    The tests open connections to the instruments listed in
    scpi_instruments. Every connection opened through _open_prologix is
    closed in a finally clause so a failing check does not leave it open.
    """

    def test_bad_ip_address(self):
        """
        Point the driver at the wrong IP address and expect a SocketTimeout.
        """
        # Copy so the shared module-level entry is not modified.
        instr = copy.copy(scpi_instruments[0])
        instr['ip'] = '192.168.0.0'
        log.debug(instr)
        with self.assertRaises(cellular_system_error.SocketTimeout):
            self._get_idns_and_verify(instruments=[instr], opc=True)

    def test_ConnectToPortSuccess(self):
        """ Make a socket connection """
        s = scpi_instruments[0]
        prologix_scpi_driver.connect_to_port(s['ip'], 1234, 5)

    def test_ConnectToPortBadIP(self):
        """ A socket connection to a bad IP should raise SocketTimeout """
        with self.assertRaises(cellular_system_error.SocketTimeout):
            prologix_scpi_driver.connect_to_port('192.168.255.111', 1234, 1)

    def test_BadGpibAddress(self):
        """
        How does the code behave if we can't connect.

        Uses a GPIB address different from the configured one and expects
        an InstrumentTimeout.
        """
        # Copy so the shared module-level entry is not modified.
        instr = copy.copy(scpi_instruments[0])
        instr['gpib_addr'] = 9
        with self.assertRaises(cellular_system_error.InstrumentTimeout):
            self._get_idns_and_verify(instruments=[instr], opc=True)

    @mock.patch.object(prologix_scpi_driver.PrologixScpiDriver, '_DirectQuery')
    def test_NonClearReadBufferBeforeInit(self, patched_driver):
        """
        Sometimes the Prologix box will have junk in its read buffer.
        There is code to read the junk out until setting the ++addr works.
        Test that here.

        patched_driver is the mock that replaces
        PrologixScpiDriver._DirectQuery for the duration of this test.
        """
        s = scpi_instruments[0]
        # Two junk replies, then the expected GPIB address.
        patched_driver.side_effect = ['junk1', 'junk2', s['gpib_addr']]
        # Constructing the driver is the test: it must not raise.
        prologix_scpi_driver.PrologixScpiDriver(
            hostname=s['ip'],
            port=1234,
            gpib_address=s['gpib_addr'],
            read_timeout_seconds=2)

    def test_Reset(self):
        """
        Call Reset on every instrument.
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=20)
            try:
                scpi_connection.Reset()
            finally:
                scpi_connection.Close()

    def test_SimpleVerify(self):
        """
        call SimpleVerify.
        """
        # TODO(byronk): make sure this test only runs on the 8960. This
        # command doesn't work on other boxes
        for instr in scpi_instruments[:1]:
            self.assertEqual(instr['name_part'], '8960')
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=2)
            try:
                # Check to see if the power state is off.
                # setting most instrument to off should be ok.
                scpi_connection.SimpleVerify('call:ms:pow:targ', '+0')
            finally:
                scpi_connection.Close()

    def test_FetchErrors(self):
        """
        call FetchErrors
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=2)
            try:
                scpi_connection._WaitAndFetchErrors()
            finally:
                scpi_connection.Close()

    def test_BadScpiCommand(self):
        """
        Send a bad command. We should fail gracefully.

        The bad command is '*IDN' (compare the '*IDN?' query used by
        _get_idns_and_verify). Query() must raise InstrumentTimeout; the
        test fails if no exception is raised.
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=1)
            try:
                with self.assertRaises(
                        cellular_system_error.InstrumentTimeout):
                    scpi_connection.Query('*IDN')
            finally:
                scpi_connection.Close()

    def test_ErrorCheckerContextAndStanzaSendingOpcFalse(self):
        """
        Send a stanza, which uses the context manager (opc_on_stanza off).
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=False,
                                                  read_timeout_seconds=5)
            try:
                scpi_connection.SendStanza(['*WAI'])
            finally:
                scpi_connection.Close()

    def test_ErrorCheckerContextAndStanzaSendingOpcTrue(self):
        """
        Send a stanza, which uses the context manager (opc_on_stanza on).
        """
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=5)
            try:
                scpi_connection.SendStanza(['*WAI'])
            finally:
                scpi_connection.Close()

    def test_GetIdnOpcTrue(self):
        """
        Test with opc True. OPC = operation complete. Asking this
        question *OPC? after commands blocks until the command finishes.
        This prevents us from sending commands faster than the
        instrument can handle. True is usually the right setting.
        """
        self._get_idns_and_verify(instruments=scpi_instruments, opc=True)

    def test_GetIdnOpcFalse(self):
        """
        Now with OPC off.
        """
        self._get_idns_and_verify(instruments=scpi_instruments, opc=False)

    def _open_prologix(self, instr, opc_on_stanza, read_timeout_seconds=2):
        """
        Build the prologix object.

        instr is a dict with 'ip', 'name_part' and 'gpib_addr' keys.
        opc_on_stanza is stored on the returned connection, and
        read_timeout_seconds is passed through to the driver.

        Returns the scpi.Scpi connection, which is also kept on
        self.scpi_connection.
        """
        ip_addr = instr['ip']
        name_part = instr['name_part']
        gpib_addr = instr['gpib_addr']
        log.debug("trying %s at %s" % (name_part, ip_addr))
        driver = prologix_scpi_driver.PrologixScpiDriver(
            hostname=ip_addr,
            port=1234,
            gpib_address=gpib_addr,
            read_timeout_seconds=read_timeout_seconds)
        self.scpi_connection = scpi.Scpi(driver)
        log.debug("setting opc to %s " % opc_on_stanza)
        self.scpi_connection.opc_on_stanza = opc_on_stanza
        return self.scpi_connection

    def _get_idns_and_verify(self, instruments, opc=False):
        """
        Get the idn string from all the instruments, and check that it
        contains the desired substring. This is a quick sanity check only.

        instruments is an iterable of dicts as accepted by _open_prologix,
        and opc is forwarded as opc_on_stanza.
        """
        for instr in instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=opc)
            try:
                response = scpi_connection.Query('*IDN?')
                log.debug("looking for %s in response string: %s " %
                          (instr['name_part'], response))
                self.assertIn(instr['name_part'], response)
            finally:
                scpi_connection.Close()
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()