| #!/usr/bin/env python |
| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Dump tpm transactions from previous run of dump_i2c. |
| |
| The output of dump_i2c can be used directly by piping it to dump_tpm's stdin. |
| The dump_tpm input format is line oriented where each line should conform to: |
| |
| <time> <Read|Write> <address in 0xff format> NAK |
| |
| or: |
| |
| <time> <Read|Write> <address in 0xff format> DATA [data]+ |
| """ |
| |
| import fileinput |
| import optparse |
| import os |
| import re |
| import sys |
| |
class TPMParseError(Exception):
    """TPM parsing error exception.

    This exception is raised when the parser fails to understand the input
    provided.  It records the file name and line number reported by the
    fileinput module at construction time so that the formatted error message
    can point at the offending input line.

    Attributes:
      message: Text describing the specific parse error.
      file_name: Name of the file being read by fileinput, or '<doctest>'
          when fileinput has no active input.
      line_number: Line number within file_name, or 0 when fileinput has no
          active input.
    """
    def __init__(self, message):
        """Initialize a new TPMParseError exception.

        Args:
          message: Text describing the specific parse error.
        """
        # Initialize the base class so that args, repr() and pickling carry
        # the message as they do for any other exception.
        Exception.__init__(self, message)
        self.message = message

        try:
            self.file_name = fileinput.filename()
            self.line_number = fileinput.filelineno()
        except RuntimeError:
            # fileinput raises RuntimeError when there is no active input,
            # which is the case when process() is driven from the doctests.
            self.file_name = '<doctest>'
            self.line_number = 0

    def __str__(self):
        """Format the exception for user consumption as file:line: message."""
        return '%s:%d: %s' % (self.file_name, self.line_number, self.message)
| |
| |
class Transition:
    """One row of the state transition table.

    Each row pairs a (current_state, event) combination with the state to
    move to and the callable, if any, to run when the transition is taken.
    """
    def __init__(self, current_state, event, next_state, action):
        """Store the four fields of a transition table entry unchanged."""
        (self.current_state,
         self.event,
         self.next_state,
         self.action) = (current_state, event, next_state, action)
| |
| |
class TPM:
    """State machine that reads I2C transactions and prints TPM transactions.

    TPM interprets single byte writes as precursors to reads since the first
    byte in any write is used to set the register address for subsequent bytes
    in the write or future reads.
    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x00')
    >>> tpm.process('0.2 Read 0x20 DATA 0x81')
    Read reg TPM_ACCESS_0 returns 0x81

    A write with multiple bytes is interpreted as a write to the register
    addressed by the first byte in the sequence.
    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x09 0x12 0x34')
    Write reg TPM_DID_VID_0.3 with 0x12 0x34

    While a read is pending, TPM treats any write other than a single byte
    write that sets the same TPM register address as an error.
    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x08')
    >>> tpm.process('0.2 Write 0x20 DATA 0x09 0x12')
    Traceback (most recent call last):
    ...
    TPMParseError: <doctest>:0: Unexpected action "0.2 Write 0x20 DATA 0x09 0x12"

    But if the write is a redundant setting of the TPM register address it is OK
    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x08')
    >>> tpm.process('0.2 Write 0x20 DATA 0x08')
    """
    # States of the parser.
    SYNC = 0
    IDLE = 1
    READ = 2

    # Events derived from one input line.
    ACTION_WRITE_MULTI = 0
    ACTION_WRITE_SINGLE = 1
    ACTION_READ = 2

    # Register names, indexed by the register address byte that starts a write.
    _REGISTER_NAMES = ('TPM_ACCESS_0',
                       'TPM_STS_0',
                       'TPM_STS_0.BC_0',
                       'TPM_STS_0.BC_1',
                       'TPM_STS_0.BC_2',
                       'TPM_DATA_FIFO_0',
                       'TPM_DID_VID_0.0',
                       'TPM_DID_VID_0.1',
                       'TPM_DID_VID_0.2',
                       'TPM_DID_VID_0.3')

    def RegisterName(self, index):
        """Return the name of the TPM register at the given index.

        Args:
          index: Integer register address.

        Returns:
          The register name as a string.

        Raises:
          TPMParseError: If index does not correspond to a known register.
        """
        # Reject negative values explicitly; otherwise Python's negative
        # indexing would silently map them onto registers from the end.
        if 0 <= index < len(self._REGISTER_NAMES):
            return self._REGISTER_NAMES[index]

        raise TPMParseError('Unknown register index %d' % index)

    def PrintWrite(self, data):
        """Print TPM write transaction.

        Args:
          data: List of hex strings; the first is the register address and
              the rest are the bytes written.
        """
        self.register = int(data[0], 16)

        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('Write reg %s with %s' % (self.RegisterName(self.register),
                                        ' '.join(data[1:])))

    def RecordAddress(self, data):
        """Record TPM register address.

        Args:
          data: List whose first element is the register address as a hex
              string.
        """
        self.register = int(data[0], 16)

    def CheckDuplicateAddress(self, data):
        """Check that a write while waiting for a read is actually a duplicate.

        Args:
          data: List whose first element is the register address as a hex
              string.

        Raises:
          TPMParseError: If the address differs from the one already recorded.
        """
        if self.register != int(data[0], 16):
            raise TPMParseError(
                'Expected "Read" action, got "Write" to new address')

    def PrintRead(self, data):
        """Print TPM read transaction.

        Args:
          data: List of hex strings read back from the previously recorded
              register address.
        """
        print('Read reg %s returns %s' % (self.RegisterName(self.register),
                                          ' '.join(data)))

    # This state transition table records the valid I2C bus actions for
    # communicating with the TPM.  Any state/action pair not defined in this
    # table is assumed to be invalid and will result in a TPMParseError being
    # raised.
    #
    # The entries in this table correspond to the current state, the event
    # parsed, the state to transition to and the function to execute on that
    # transition.  The function is passed a TPM instance and an array of the
    # data values parsed from the event.
    state_table = [
        # The initial section of the state transition table describes the
        # synchronization process.  For the TPM this just involves waiting for
        # the first write operation.
        Transition(SYNC, ACTION_WRITE_MULTI, IDLE, PrintWrite),
        Transition(SYNC, ACTION_WRITE_SINGLE, READ, RecordAddress),
        Transition(SYNC, ACTION_READ, SYNC, None),

        # After synchronization is complete the rest of the table describes
        # the expected transitions.
        Transition(IDLE, ACTION_WRITE_MULTI, IDLE, PrintWrite),
        Transition(IDLE, ACTION_WRITE_SINGLE, READ, RecordAddress),
        Transition(READ, ACTION_WRITE_SINGLE, READ, CheckDuplicateAddress),
        Transition(READ, ACTION_READ, IDLE, PrintRead)]

    def __init__(self):
        """Initialize a new TPM instance.

        >>> tpm = TPM()
        >>> tpm.state == tpm.SYNC
        True
        """
        self.state = self.SYNC
        self.register = 0x00

    def process(self, line):
        """Update TPM state machine for one line generated by dump_i2c.

        Lines whose fourth field is NAK are ignored.

        Args:
          line: One line of text in the form
              <time> <Read|Write> <address> <NAK|DATA> [data]*

        Raises:
          TPMParseError: If the action is unknown or is not valid in the
              current state.
          ValueError: If the time field is not a number or a data byte that
              is used as a register address is not valid hex.
          IndexError: If the line has fewer than four fields.

        These examples show how process affects the internal state of TPM.
        >>> tpm = TPM()

        >>> tpm.process('0.1 Write 0x20 DATA 0x00')
        >>> tpm.register == 0x00
        True
        >>> tpm.state == tpm.READ
        True

        >>> tpm.process('0.2 Read 0x20 DATA 0x81')
        Read reg TPM_ACCESS_0 returns 0x81
        >>> tpm.state == tpm.IDLE
        True

        >>> tpm.process('0.10000000 Write 0x20 DATA 0x09 0x12 0x34')
        Write reg TPM_DID_VID_0.3 with 0x12 0x34
        >>> tpm.register == 0x09
        True
        >>> tpm.state == tpm.IDLE
        True
        """
        values = line.split()

        # The timestamp is only validated; its value is not used.
        float(values[0])
        action = values[1]
        # values[2] is the I2C address, which is not used.  Indexing
        # values[3] still raises IndexError for lines that are too short.
        nak = (values[3] == 'NAK')
        data = values[4:]

        if nak:
            return

        if action == 'Read':
            action_index = self.ACTION_READ
        elif action == 'Write' and len(data) == 1:
            action_index = self.ACTION_WRITE_SINGLE
        elif action == 'Write' and len(data) > 1:
            action_index = self.ACTION_WRITE_MULTI
        else:
            raise TPMParseError('Unknown action "%s"' % action)

        # Search the transition table for a matching state/action pair.
        for transition in self.state_table:
            if (transition.current_state == self.state and
                transition.event == action_index):
                # Run the action before changing state so that a failing
                # action leaves the state machine where it was.
                if transition.action:
                    transition.action(self, data)

                self.state = transition.next_state
                break
        else:
            # Strip the line so a trailing newline from the input file does
            # not end up inside the quoted text of the error message.
            raise TPMParseError('Unexpected action "%s"' % line.strip())
| |
| |
def main():
    """Read dump_i2c output via fileinput and print the TPM transactions.

    Input comes from the files named on the command line, or from stdin when
    none are given (standard fileinput behavior).  Processing stops at the
    first line that fails to parse; the error is printed and the function
    returns.
    """
    tpm = TPM()

    try:
        for line in fileinput.input():
            try:
                tpm.process(line)
            except (TPMParseError, ValueError, IndexError) as error:
                # Single-argument print(...) behaves the same on Python 2
                # and 3.
                print(error)
                return
    finally:
        # Release the global fileinput state even when bailing out early.
        fileinput.close()
| |
| |
def Test():
    """Run any built-in tests.

    Raises:
      AssertionError: If any doctest in this module fails.
    """
    import doctest

    # Run the doctests outside of an assert statement: under "python -O"
    # assert statements are removed entirely, which would skip the tests.
    results = doctest.testmod()
    if results.failed != 0:
        raise AssertionError('%d of %d doctests failed' %
                             (results.failed, results.attempted))
| |
| |
if __name__ == '__main__':
    # A first argument of --test selects the built-in tests; any other
    # invocation dumps TPM transactions from the input.
    run_tests = len(sys.argv) > 1 and sys.argv[1] == '--test'
    if run_tests:
        Test()
    else:
        main()