blob: df44b086e247c97cdf72176117086edf667b37e0 [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wrapper for an RF switch built on an Elexol EtherIO24.
The EtherIO is documented at
http://www.elexol.com/IO_Modules/Ether_IO_24_Dip_R.php
This file is both a python module and a command line utility to speak
to the module
"""
import cellular_logging
import collections
import socket
import struct
import sys
log = cellular_logging.SetupCellularLogging('ether_io_rf_switch')
class Error(Exception):
pass
class EtherIo24(object):
"""Encapsulates an EtherIO24 UDP-GPIO bridge."""
def __init__(self, hostname, port=2424):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind(('', 0))
self.destination = (hostname, port)
self.socket.settimeout(3) # In seconds
def SendPayload(self, payload):
self.socket.sendto(payload, self.destination)
def SendOperation(self, opcode, list_bytes):
"""Sends the specified opcode with [list_bytes] as an argument."""
payload = opcode + struct.pack(('=%dB' % len(list_bytes)), *list_bytes)
self.SendPayload(payload)
return payload
def SendCommandVerify(self, write_opcode, list_bytes, read_opcode=None):
"""Sends opcode and bytes,
then reads to make sure command was executed."""
if read_opcode is None:
read_opcode = write_opcode.lower()
for _ in xrange(3):
write_sent = self.SendOperation(write_opcode, list_bytes)
self.SendOperation(read_opcode, list_bytes)
try:
response = self.AwaitResponse()
if response == write_sent:
return
else:
log.warning('Unexpected reply: sent %s, got %s',
write_sent.encode('hex_codec'),
response.encode('hex_codec'))
except socket.timeout:
log.warning('Timed out awaiting reply for %s', write_opcode)
continue
raise Error('Failed to execute %s' % write_sent.encode('hex_codec'))
def AwaitResponse(self):
(response, address) = self.socket.recvfrom(65536)
if (socket.gethostbyname(address[0]) !=
socket.gethostbyname(self.destination[0])):
log.warning('Unexpected reply source: %s (expected %s)',
address, self.destination)
return response
class RfSwitch(object):
"""An RF switch hooked to an Elexol EtherIO24."""
def __init__(self, ip):
self.io = EtherIo24(ip)
# Must run on pythons without 0bxxx notation. These are 1110,
# 1101, 1011, 0111
decode = [0xe, 0xd, 0xb, 0x7]
self.port_mapping = []
for upper in xrange(3):
for lower in xrange(4):
self.port_mapping.append(decode[upper] << 4 | decode[lower])
def SelectPort(self, n):
"""Connects port n to the RF generator."""
# Set all pins to output
# !A0: all pins output
self.io.SendCommandVerify('!A', [0])
self.io.SendCommandVerify('A', [self.port_mapping[n]])
def Query(self):
"""Returns (binary port status, selected port, port direction)."""
self.io.SendOperation('!a', [])
raw_direction = self.io.AwaitResponse()
direction = ord(raw_direction[2])
self.io.SendOperation('a', [])
status = ord(self.io.AwaitResponse()[1])
try:
port = self.port_mapping.index(status)
except ValueError:
port = None
return status, port, direction
def CommandLineUtility(arguments):
"""Command line utility to control a switch."""
def Select(switch, remaining_args):
switch.SelectPort(int(remaining_args.popleft()))
def Query(switch, unused_remaining_args):
(raw_status, port, direction) = switch.Query()
if direction != 0x00:
print 'Warning: Direction register is %x, should be 0x00' % \
direction
if port is None:
port_str = 'Invalid'
else:
port_str = str(port)
print 'Port %s (0x%x)' % (port_str, raw_status)
def Usage():
print 'usage: %s hostname {query|select portnumber}' % sys.argv[0]
exit(1)
try:
hostname = arguments.popleft()
operation = arguments.popleft()
switch = RfSwitch(hostname)
if operation == 'query':
Query(switch, arguments)
elif operation == 'select':
Select(switch, arguments)
else:
Usage()
except IndexError:
Usage()
if __name__ == '__main__':
CommandLineUtility(collections.deque(sys.argv[1:]))