| #!/usr/bin/env python |
| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Dump I2C transactions from raw CSV record produced by Saleae Logic. |
| |
| The input file format for I2C transaction data is a comma separated value |
| (CSV) file with a single header row. The columns of the file that are |
| used by dump_i2c are in the following format (data is optional for some |
| events): |
| |
| <time>, <event>, [data] |
| |
| Additional columns after the first three and the header row are ignored. |
| """ |
| |
| import fileinput |
| import optparse |
| import os |
| import re |
| import sys |
| |
class I2CParseError(Exception):
    """Error raised when the parser fails to understand its input.

    Each instance remembers the file name and line number that fileinput
    was positioned at when the error was created, so that the message
    shown to the user can point at the offending line. When fileinput has
    no active input, the placeholders '<doctest>' and 0 are recorded
    instead.

    Attributes:
      message: Description specific to this parse error.
      file_name: Name of the file being read when the error occurred.
      line_number: Line number within that file.
    """

    def __init__(self, message):
        """Initialize a new I2CParseError exception.

        Args:
          message: Description specific to this parse error.
        """
        self.message = message

        # fileinput raises RuntimeError when there is no active input.
        try:
            location = (fileinput.filename(), fileinput.filelineno())
        except RuntimeError:
            location = ('<doctest>', 0)

        self.file_name, self.line_number = location

    def __str__(self):
        """Return the error as '<file name>:<line number>: <message>'."""
        fields = (self.file_name, self.line_number, self.message)
        return '%s:%d: %s' % fields
| |
| |
class Transition:
    """One entry of the state transition table.

    Attributes:
      current_state: State the machine must be in for this entry to apply.
      event: Event text that triggers this entry.
      next_state: State the machine moves to after the entry is taken.
      action: Callable to run when the entry is taken, or None.
    """

    def __init__(self, current_state, event, next_state, action):
        """Store the four fields of a transition table entry unchanged."""
        self.current_state, self.event = current_state, event
        self.next_state, self.action = next_state, action
| |
| |
class I2C:
    """State machine that accumulates and prints full I2C bus transactions.

    Once a complete bus transaction is encountered it is printed, provided
    the address seen in the transaction equals match_address. The output of
    this class can be further processed by device specific scripts to
    further understand the transaction.

    This example shows the basic functionality of I2C. The printed message
    ends with a trailing space, so whitespace is normalized when comparing.
    >>> i2c = I2C(0x20, 1)
    >>> i2c.process('0.1,Start Bit,')
    >>> i2c.process('0.2,Write Address + ACK, 0x20')
    >>> i2c.process('0.3,Data + ACK, 0x00')
    >>> i2c.process('0.4,Stop Bit,')  # doctest: +NORMALIZE_WHITESPACE
    0.10000000 Write 0x20 DATA 0x00

    Here we see I2C filtering out a transaction based on the device address.
    >>> i2c = I2C(0x20, 1)
    >>> i2c.process('0.1,Start Bit,')
    >>> i2c.process('0.2,Write Address + ACK, 0xff')
    >>> i2c.process('0.3,Data + ACK, 0x00')
    >>> i2c.process('0.4,Stop Bit,')

    Here is an example of an invalid I2C transaction sequence, there can not
    be two start bits in a row.
    >>> i2c = I2C(0x20, 1)
    >>> i2c.process('0.1,Start Bit,')
    >>> i2c.process('0.1,Start Bit,')
    Traceback (most recent call last):
      ...
    I2CParseError: <doctest>:0: Unexpected event "Start Bit"

    This is an example of I2C syncing to the beginning of the first full
    transaction presented to it.
    >>> i2c = I2C(0x20, 1)
    >>> i2c.process('0.1,Stop Bit,')
    >>> i2c.process('0.1,Start Bit,')
    >>> i2c.state == i2c.STARTED
    True

    And a completely bogus value results in a ValueError when trying to
    convert the time string to a float. The exact wording of that error
    differs between Python versions, so only the exception type is checked.
    >>> i2c = I2C(0x20, 1)
    >>> i2c.process('this,is,not,valid')  # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
      ...
    ValueError: could not convert string to float: this

    Or a truncated line will throw an IndexError
    >>> i2c = I2C(0x20, 1)
    >>> i2c.process('0.1')
    Traceback (most recent call last):
      ...
    IndexError: list index out of range
    """

    # States of the parser.
    SYNC = 0      # Initial state; waiting for a point to synchronize on.
    IDLE = 1      # A stop bit has been seen; waiting for a start bit.
    STARTED = 2   # A start bit has been seen; waiting for an address.
    READING = 3   # A read address was ACK'ed; accumulating data.
    WRITING = 4   # A write address was ACK'ed; accumulating data.
    NAK = 5       # An address or data event was NAK'ed.

    # Every action below takes the same (time, data) arguments so that the
    # state machine can call any of them uniformly; each one uses only the
    # arguments it needs.

    def StartBit(self, time, data):
        """Record start time of transaction."""
        self.message += '%.8f ' % time

    def WriteAddressNAK(self, time, data):
        """Record NAK'ed address transaction for writing."""
        self.address = int(data, 16)
        self.message += 'Write %s NAK' % data

    def WriteAddressACK(self, time, data):
        """Record ACK'ed address transaction for writing."""
        self.address = int(data, 16)
        self.message += 'Write %s DATA ' % data

    def ReadAddressNAK(self, time, data):
        """Record NAK'ed address transaction for reading."""
        self.address = int(data, 16)
        self.message += 'Read %s NAK' % data

    def ReadAddressACK(self, time, data):
        """Record ACK'ed address transaction for reading."""
        self.address = int(data, 16)
        self.message += 'Read %s DATA ' % data

    def AddData(self, time, data):
        """Record read or written data."""
        self.message += '%s ' % data

    def ClearMessage(self, time, data):
        """Clear accumulated transaction."""
        self.message = ''

    def PrintMessage(self, time, data):
        """Print (if the address matches) and clear accumulated transaction."""
        if self.address == self.match_address:
            # Parenthesized single-argument form prints identically on
            # Python 2 (print statement) and Python 3 (print function).
            print(self.message)

        self.message = ''

    # This state transition table records the valid I2C bus transitions that
    # we expect to see. Any state/event pair not defined in this table is
    # assumed to be invalid and will result in an I2CParseError being raised.
    #
    # The entries in this table correspond to the current state, the event
    # parsed, the state to transition to and the function to execute on that
    # transition. The function is passed the I2C instance, the time of the
    # event and a possibly empty data field.
    state_table = [
        # The initial section of the state transition table describes the
        # synchronization process. For the I2C bus this means waiting for
        # the first start or repeated start bit. We can also transition to
        # the IDLE state when we see a stop bit because the next bit has to
        # be a start bit. If it's not we'll raise an I2CParseError exception.
        Transition(SYNC, 'Start Bit', STARTED, StartBit),
        Transition(SYNC, 'Repeated Start Bit', STARTED, StartBit),
        Transition(SYNC, 'Write Address + NAK', SYNC, None),
        Transition(SYNC, 'Write Address + ACK', SYNC, None),
        Transition(SYNC, 'Read Address + NAK', SYNC, None),
        Transition(SYNC, 'Read Address + ACK', SYNC, None),
        Transition(SYNC, 'Data + NAK', SYNC, None),
        Transition(SYNC, 'Data + ACK', SYNC, None),
        Transition(SYNC, 'Stop Bit', IDLE, None),

        # After synchronization is complete the rest of the table describes
        # the expected transitions.
        Transition(IDLE, 'Start Bit', STARTED, StartBit),
        Transition(STARTED, 'Stop Bit', IDLE, ClearMessage),
        Transition(STARTED, 'Write Address + NAK', NAK, WriteAddressNAK),
        Transition(STARTED, 'Write Address + ACK', WRITING, WriteAddressACK),
        Transition(STARTED, 'Read Address + NAK', NAK, ReadAddressNAK),
        Transition(STARTED, 'Read Address + ACK', READING, ReadAddressACK),
        Transition(WRITING, 'Data + NAK', NAK, AddData),
        Transition(WRITING, 'Data + ACK', WRITING, AddData),
        Transition(READING, 'Data + NAK', NAK, AddData),
        Transition(READING, 'Data + ACK', READING, AddData),
        Transition(WRITING, 'Stop Bit', IDLE, PrintMessage),
        # NOTE(review): a repeated start flushes the message accumulated so
        # far but does not run StartBit, so the transaction that follows is
        # printed without a leading timestamp -- confirm this is intended.
        Transition(WRITING, 'Repeated Start Bit', STARTED, PrintMessage),
        Transition(NAK, 'Stop Bit', IDLE, PrintMessage)]

    def __init__(self, match_address, timeout):
        """Initialize a new I2C instance.

        The I2C instance will print all transactions with a particular I2C
        device specified by its address up until the timeout.

        Args:
          match_address: I2C device address to filter for
          timeout: Maximum time to start recording new transactions

        >>> i2c = I2C(0x20, 1)
        >>> i2c.match_address == 0x20
        True
        >>> i2c.timeout == 1
        True
        >>> i2c.state == i2c.SYNC
        True
        """
        self.state = self.SYNC
        self.address = 0x00
        self.message = ''
        self.match_address = match_address
        self.timeout = timeout

    def process(self, line):
        """Update I2C state machine from one line of the CSV file.

        The CSV file is assumed to have the format generated by the Saleae
        Logic desktop I2C recording tool.

        Args:
          line: One '<time>, <event>, [data]' record; columns after the third
              are ignored.

        Raises:
          I2CParseError: The event is not valid in the current state.
          ValueError: The time column is not a number, or an address event
              carries data that is not a hexadecimal number.
          IndexError: The line has no event column.

        These examples show how process affects the internal state of I2C.
        >>> i2c = I2C(0x20, 1)

        >>> i2c.process('0.1,Start Bit,')
        >>> i2c.state == i2c.STARTED
        True
        >>> i2c.message == '0.10000000 '
        True

        >>> i2c.process('0.1,Stop Bit,')
        >>> i2c.state == i2c.IDLE
        True
        >>> i2c.message == ''
        True
        """
        values = line.split(',')

        time = float(values[0])
        # Collapse runs of whitespace (including the trailing newline) so the
        # event text compares equal to the names used in state_table.
        detail = ' '.join(values[1].split())
        data = ' '.join(values[2].split()) if len(values) > 2 else ''

        # Once the timeout value has been reached in the input trace we
        # ignore all future events once we've returned to the IDLE state. We
        # return to the IDLE state at the next "Stop Bit" and stay there.
        if time > self.timeout and self.state == self.IDLE:
            return

        # Search the transition table for a matching state/event pair.
        for transition in self.state_table:
            if (transition.current_state == self.state and
                    transition.event == detail):
                if transition.action:
                    # Table entries are plain functions captured in the class
                    # body, so the instance is passed explicitly.
                    transition.action(self, time, data)

                self.state = transition.next_state
                break
        else:
            raise I2CParseError('Unexpected event "%s"' % detail)
| |
| |
def main():
    """Dump the I2C transactions found in the input named on the command line.

    The positional command line arguments are handed to fileinput as the
    list of input files (fileinput falls back to standard input when the
    list is empty). The first line of each file is treated as the header
    row and skipped; every other line is fed to an I2C state machine
    configured from the --address and --timeout options.

    Processing stops at the first line that cannot be parsed; the error is
    printed and the function returns.
    """
    parser = optparse.OptionParser(
        usage='usage: %prog [filename] [options]\n')

    parser.add_option('-a', '--address', default=0x20,
                      type='int',
                      help='I2C device address to process',
                      action='store',
                      dest='address')

    parser.add_option('-t', '--timeout', default=100,
                      type='float',
                      help='All transactions before timeout are shown',
                      action='store',
                      dest='timeout')

    options, arguments = parser.parse_args()

    i2c = I2C(options.address, options.timeout)
    lines = fileinput.input(arguments)

    try:
        for line in lines:
            # The first line of the file is the header row.
            if fileinput.isfirstline():
                continue

            try:
                i2c.process(line)
            except (I2CParseError, ValueError, IndexError) as error:
                # Parenthesized single-argument form prints identically on
                # Python 2 and Python 3.
                print(error)
                return
    finally:
        # Release the module-level fileinput state (and the open file) even
        # when bailing out early on a parse error.
        fileinput.close()
| |
def Test():
    """Run any built-in tests.

    Runs the doctests of the __main__ module via doctest.testmod().

    Raises:
      AssertionError: One or more doctests failed.
    """
    import doctest

    # Run the tests outside of an assert statement: under "python -O" assert
    # statements are removed entirely, which would skip the tests altogether.
    results = doctest.testmod()
    if results.failed:
        raise AssertionError('%d of %d doctests failed' %
                             (results.failed, results.attempted))
| |
| |
if __name__ == '__main__':
    # A first argument of exactly --test selects the built-in tests;
    # anything else (including no arguments at all) runs the dumper.
    if len(sys.argv) > 1 and sys.argv[1] == '--test':
        Test()
    else:
        main()