blob: f6e3420820c2a16253dcebfc4716ae4399dcd2ba [file] [log] [blame] [edit]
#!/usr/bin/env python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Dump I2C transactions from raw CSV record produced by Saleae Logic.
The input file format for I2C transaction data is a comma separated value
(CSV) file with a single header row. The columns of the file that are
used by dump_i2c are in the following format (data is optional for some
events.
<time>, <event>, [data]
Additional columns after the first three and the header row are ignored.
"""
import fileinput
import optparse
import os
import re
import sys
class I2CParseError(Exception):
"""I2C parsing error exception.
This exception is raised when the parser failes to understand the input
provided. It records the file name and line number for later formatting
of an error message that includes a message that is specific to the parse
error.
"""
def __init__(self, message):
"""Initialize a new I2CParseError exception."""
self.message = message
try:
self.file_name = fileinput.filename()
self.line_number = fileinput.filelineno()
except RuntimeError:
self.file_name = '<doctest>'
self.line_number = 0
def __str__(self):
"""Format the exception for user consumption."""
return '%s:%d: %s' % (self.file_name, self.line_number, self.message)
class Transition:
"""Structure describing entries in the state transition table."""
def __init__(self, current_state, event, next_state, action):
self.current_state = current_state
self.event = event
self.next_state = next_state
self.action = action
class I2C:
"""State machine class that accumulates and prints full I2C bus transactions.
Once a complete bus transaction is encountered it is printed. The output of
this class can be further processed by device specific scripts to further
understand the transaction.
This example shows the basic functionality of I2C.
>>> i2c = I2C(0x20, 1)
>>> i2c.process('0.1,Start Bit,')
>>> i2c.process('0.2,Write Address + ACK, 0x20')
>>> i2c.process('0.3,Data + ACK, 0x00')
>>> i2c.process('0.4,Stop Bit,')
0.10000000 Write 0x20 DATA 0x00
Here we see I2C filtering out a transaction based on the device address.
>>> i2c = I2C(0x20, 1)
>>> i2c.process('0.1,Start Bit,')
>>> i2c.process('0.2,Write Address + ACK, 0xff')
>>> i2c.process('0.3,Data + ACK, 0x00')
>>> i2c.process('0.4,Stop Bit,')
Here is an example of an invalid I2C transaction sequence, there can not be
two start bits in a row.
>>> i2c = I2C(0x20, 1)
>>> i2c.process('0.1,Start Bit,')
>>> i2c.process('0.1,Start Bit,')
Traceback (most recent call last):
...
I2CParseError: <doctest>:0: Unexpected event "Start Bit"
This is an example of I2C syncing to the beginning of the first full
transaction presented to it.
>>> i2c = I2C(0x20, 1)
>>> i2c.process('0.1,Stop Bit,')
>>> i2c.process('0.1,Start Bit,')
>>> i2c.state == i2c.STARTED
True
And a completely bogus value results in a ValueError when trying to convert
the time string to a float.
>>> i2c = I2C(0x20, 1)
>>> i2c.process('this,is,not,valid')
Traceback (most recent call last):
...
ValueError: invalid literal for float(): this
Or a truncated line will throw an IndexError
>>> i2c = I2C(0x20, 1)
>>> i2c.process('0.1')
Traceback (most recent call last):
...
IndexError: list index out of range
"""
SYNC = 0
IDLE = 1
STARTED = 2
READING = 3
WRITING = 4
NAK = 5
def StartBit(self, time, data):
"""Record start time of transaction."""
self.message += '%.8f ' % time
def WriteAddressNAK(self, time, data):
"""Record NAK'ed address transaction for writing."""
self.address = int(data, 16)
self.message += 'Write %s NAK' % data
def WriteAddressACK(self, time, data):
"""Record ACK'ed address transaction for writing."""
self.address = int(data, 16)
self.message += 'Write %s DATA ' % data
def ReadAddressNAK(self, time, data):
"""Record NAK'ed address transaction for reading."""
self.address = int(data, 16)
self.message += 'Read %s NAK' % data
def ReadAddressACK(self, time, data):
"""Record ACK'ed address transaction for reading."""
self.address = int(data, 16)
self.message += 'Read %s DATA ' % data
def AddData(self, time, data):
"""Record read or written data."""
self.message += '%s ' % data
def ClearMessage(self, time, data):
"""Clear accumulated transaction."""
self.message = ''
def PrintMessage(self, time, data):
"""Print and clear accumulated transaction."""
if self.address == self.match_address:
print self.message
self.message = ''
# This state transition table records the valid I2C bus transitions that we
# expect to see. Any state/action pair not defined in this table is assumed
# to be invalid and will result in an I2CParseError being raised.
#
# The entries in this table correspond to the current state, the event
# parsed, the state to transition to and the function to execute on that
# transition. The function is passed a CSV instance, the time of the event
# and a possibly empty data field.
state_table = [
# The initial section of the state transition table describes the
# synchronization process. For the I2C bus this means waiting for
# the first start or repeated start bit. We can also transition to
# the IDLE state when we see a stop bit because the next bit has to be
# a start bit. If it's not we'll raise a I2CParseError exception.
Transition(SYNC, 'Start Bit', STARTED, StartBit),
Transition(SYNC, 'Repeated Start Bit', STARTED, StartBit),
Transition(SYNC, 'Write Address + NAK', SYNC, None),
Transition(SYNC, 'Write Address + ACK', SYNC, None),
Transition(SYNC, 'Read Address + NAK', SYNC, None),
Transition(SYNC, 'Read Address + ACK', SYNC, None),
Transition(SYNC, 'Data + NAK', SYNC, None),
Transition(SYNC, 'Data + ACK', SYNC, None),
Transition(SYNC, 'Stop Bit', IDLE, None),
# After syncronization is complete the rest of the table describes the
# expected transitions.
Transition(IDLE, 'Start Bit', STARTED, StartBit),
Transition(STARTED, 'Stop Bit', IDLE, ClearMessage),
Transition(STARTED, 'Write Address + NAK', NAK, WriteAddressNAK),
Transition(STARTED, 'Write Address + ACK', WRITING, WriteAddressACK),
Transition(STARTED, 'Read Address + NAK', NAK, ReadAddressNAK),
Transition(STARTED, 'Read Address + ACK', READING, ReadAddressACK),
Transition(WRITING, 'Data + NAK', NAK, AddData),
Transition(WRITING, 'Data + ACK', WRITING, AddData),
Transition(READING, 'Data + NAK', NAK, AddData),
Transition(READING, 'Data + ACK', READING, AddData),
Transition(WRITING, 'Stop Bit', IDLE, PrintMessage),
Transition(WRITING, 'Repeated Start Bit', STARTED, PrintMessage),
Transition(NAK, 'Stop Bit', IDLE, PrintMessage)]
def __init__(self, match_address, timeout):
"""Initialize a new I2C instance.
The I2C instance will print all transactions with a particular I2C device
specified by it's address up until the timeout.
Args:
match_address: I2C device address to filter for
timeout: Maximum time to start recording new transactions
>>> i2c = I2C(0x20, 1)
>>> i2c.match_address == 0x20
True
>>> i2c.timeout == 1
True
>>> i2c.state == i2c.SYNC
True
"""
self.state = self.SYNC
self.address = 0x00
self.message = ''
self.match_address = match_address
self.timeout = timeout
def process(self, line):
"""Update I2C state machine from one line of the CSV file.
The CSV file is assumed to have the format generated by the Saleae Logic
desktop I2C recording tool.
These examples show how process effects the internal state of I2C.
>>> i2c = I2C(0x20, 1)
>>> i2c.process('0.1,Start Bit,')
>>> i2c.state == i2c.STARTED
True
>>> i2c.message == '0.10000000 '
True
>>> i2c.process('0.1,Stop Bit,')
>>> i2c.state == i2c.IDLE
True
>>> i2c.message == ''
True
"""
values = line.split(',')
time = float(values[0])
detail = ' '.join(values[1].split())
if len(values) > 2:
data = ' '.join(values[2].split())
else:
data = ''
# Once the timeout value has been reached in the input trace we ignore all
# future events once we've returned to the IDLE state. We return to the
# IDLE state at the next "Stop Bit" and stay there.
if time > self.timeout and self.state == self.IDLE:
return
# Search the transition table for a matching state/action pair.
for transition in self.state_table:
if (transition.current_state == self.state and
transition.event == detail):
if transition.action:
transition.action(self, time, data)
self.state = transition.next_state
break
else:
raise I2CParseError('Unexpected event "%s"' % detail)
def main():
parser = optparse.OptionParser(usage = 'usage: %prog [filename] [options]\n')
parser.add_option('-a', '--address', default=0x20,
type='int',
help='I2C device address to process',
action='store',
dest='address')
parser.add_option('-t', '--timeout', default=100,
type='float',
help='All transactions before timeout are shown',
action='store',
dest='timeout')
options, arguments = parser.parse_args()
input = fileinput.input(arguments)
i2c = I2C(options.address, options.timeout)
for line in input:
# The first line of the file is the header row.
if not fileinput.isfirstline():
try:
i2c.process(line)
except (I2CParseError, ValueError, IndexError) as error:
print error
return
def Test():
"""Run any built-in tests."""
import doctest
assert doctest.testmod().failed == 0
if __name__ == '__main__':
# If first argument is --test, run testing code.
if sys.argv[1:2] == ['--test']:
Test()
else:
main()