| # Copyright 2021 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Python module for Abstract Instrument Library.""" |
| |
| import logging |
| import requests |
| import socket |
| |
| |
class SocketInstrumentError(Exception):
    """Abstract Instrument Error Class, via Socket and SCPI.

    Wraps an error (a message string or another exception) and, optionally,
    the command that triggered it, into a single readable message.
    """

    def __init__(self, error, command=None):
        """Init method for Socket Instrument Error.

        Args:
            error: Exception error. May be a message string or an exception
                object.
            command: Additional information on command,
                Type, Str.
        """
        super(SocketInstrumentError, self).__init__(error)
        self._error_code = error
        self._error_message = self._error_code
        if command is not None:
            self._error_message = 'Command {} returned the error: {}.'.format(
                repr(command), repr(self._error_message))

    def __str__(self):
        """Return the error message as text.

        Returns:
            The stored message, Type, Str.
        """
        # __str__ must return a str; `error` may have been passed in as an
        # exception object rather than a string, so coerce it here.
        return str(self._error_message)
| |
| |
class SocketInstrument(object):
    """Abstract Instrument Class, via Socket and SCPI.

    The TCP connection is opened lazily by the first send or receive.
    Commands are terminated with a newline and encoded as UTF-8. Socket
    failures are reported as SocketInstrumentError.
    """

    def __init__(self, ip_addr, ip_port):
        """Init method for Socket Instrument.

        No connection is made here; see _connect_socket.

        Args:
            ip_addr: IP Address.
                Type, str.
            ip_port: TCPIP Port.
                Type, str.
        """
        self._logger = logging.getLogger(__name__)
        self._socket_timeout = 120
        self._socket_buffer_size = 1024

        self._ip_addr = ip_addr
        self._ip_port = ip_port

        # Terminator appended to every command and stripped from responses.
        self._escseq = '\n'
        self._codefmt = 'utf-8'

        self._socket = None

    def _connect_socket(self):
        """Init and Connect to socket.

        Raises:
            SocketInstrumentError: if the connection times out or fails.
        """
        try:
            self._socket = socket.create_connection(
                (self._ip_addr, self._ip_port),
                timeout=self._socket_timeout)

        except socket.timeout:
            errmsg = 'Socket timeout while connecting to instrument.'
            raise SocketInstrumentError(errmsg)

        except socket.error:
            errmsg = 'Socket error while connecting to instrument.'
            raise SocketInstrumentError(errmsg)

        # Record the successful connection (the message used to be built but
        # never emitted).
        self._logger.info(
            'Opened Socket connection to %r:%r with handle %r.',
            self._ip_addr, self._ip_port, self._socket)

    def _send(self, cmd):
        """Send command via Socket.

        Connects first if no socket is open.

        Args:
            cmd: Command to send,
                Type, Str.

        Raises:
            SocketInstrumentError: if connecting or sending fails.
        """
        if not self._socket:
            self._connect_socket()

        cmd_es = cmd + self._escseq

        try:
            self._socket.sendall(cmd_es.encode(self._codefmt))

        except socket.timeout:
            errmsg = ('Socket timeout while sending command {} '
                      'to instrument.').format(repr(cmd))
            raise SocketInstrumentError(errmsg)

        except socket.error:
            errmsg = ('Socket error while sending command {} '
                      'to instrument.').format(repr(cmd))
            raise SocketInstrumentError(errmsg)

        except Exception as err:
            # Argument order matches the placeholders: error, then command.
            errmsg = ('Error {} while sending command {} '
                      'to instrument.').format(repr(err), repr(cmd))
            raise SocketInstrumentError(errmsg)

    def _recv(self):
        """Receive response via Socket.

        Connects first if no socket is open. Reads buffer-sized chunks until
        a chunk shorter than the buffer arrives, which is treated as the end
        of the response.

        Returns:
            resp: Response from Instrument via Socket, with trailing
                terminator characters removed,
                Type, Str.

        Raises:
            SocketInstrumentError: if connecting, receiving or decoding
                fails.
        """
        if not self._socket:
            self._connect_socket()

        chunks = []

        try:
            while True:
                chunk = self._socket.recv(self._socket_buffer_size)
                chunks.append(chunk)
                # Compare byte counts: the buffer size is in bytes, not in
                # decoded characters.
                if len(chunk) < self._socket_buffer_size:
                    break

            # Decode once over the whole payload so a multi-byte character
            # split across two chunks is still decoded correctly.
            resp = b''.join(chunks).decode(self._codefmt)

        except socket.timeout:
            errmsg = 'Socket timeout while receiving response from instrument.'
            raise SocketInstrumentError(errmsg)

        except socket.error:
            errmsg = 'Socket error while receiving response from instrument.'
            raise SocketInstrumentError(errmsg)

        except Exception as err:
            errmsg = ('Error {} while receiving response '
                      'from instrument').format(repr(err))
            raise SocketInstrumentError(errmsg)

        return resp.rstrip(self._escseq)

    def _close_socket(self):
        """Close Socket Instrument.

        Does nothing when no socket is open. The socket is always closed and
        the handle cleared, even if the shutdown itself fails.

        Raises:
            SocketInstrumentError: if shutting down or closing fails.
        """
        if not self._socket:
            return

        # Drop the handle first so a failed shutdown cannot leave a stale
        # socket behind for the next _send/_recv.
        sock = self._socket
        self._socket = None

        try:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            finally:
                sock.close()

        except Exception as err:
            errmsg = 'Error {} while closing instrument.'.format(repr(err))
            raise SocketInstrumentError(errmsg)

    def _query(self, cmd):
        """query instrument via Socket.

        Appends ';*OPC?' to the command so the instrument replies once the
        operation has completed, then reads that reply.

        Args:
            cmd: Command to send,
                Type, Str.

        Returns:
            resp: Response from Instrument via Socket,
                Type, Str.

        Raises:
            SocketInstrumentError: if sending or receiving fails.
        """
        self._send(cmd + ';*OPC?')
        return self._recv()
| |
| |
class RequestInstrument(object):
    """Abstract Instrument Class, via Request.

    Each query is one HTTP GET against the instrument; the line-ending
    characters are removed from the reply text.
    """

    def __init__(self, ip_addr):
        """Init method for request instrument.

        Args:
            ip_addr: IP Address.
                Type, Str.
        """
        self._ip_addr = ip_addr
        self._request_protocol = 'http'
        self._request_timeout = 120
        # Characters removed from every reply.
        self._escseq = '\r\n'

    def _query(self, cmd):
        """query instrument via request.

        Args:
            cmd: Command to send; used as the path of the request URL,
                Type, Str.

        Returns:
            resp: Response from Instrument via request, with every
                escape-sequence character removed,
                Type, Str.
        """
        url = '{}://{}/{}'.format(
            self._request_protocol, self._ip_addr, cmd)
        reply = requests.get(url, timeout=self._request_timeout)
        text = reply.text

        # Map each unwanted character to None so translate() deletes every
        # occurrence of it, wherever it appears in the reply.
        unwanted = {ord(char_del): None for char_del in self._escseq}
        return text.translate(unwanted)