|  | #!/usr/bin/env python | 
|  | # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 
|  | # Use of this source code is governed by a BSD-style license that can be | 
|  | # found in the LICENSE file. | 
|  |  | 
|  | """Dump tpm transactions from previous run of dump_i2c. | 
|  |  | 
|  | The output of dump_i2c can be used directly by piping it to dump_tpm's stdin. | 
|  | The dump_tpm input format is line oriented where each line should conform to: | 
|  |  | 
|  | <time> <Read|Write> <address in 0xff format> NAK | 
|  |  | 
|  | or: | 
|  |  | 
|  | <time> <Read|Write> <address in 0xff format> DATA [data]+ | 
|  | """ | 
|  |  | 
|  | import fileinput | 
|  | import optparse | 
|  | import os | 
|  | import re | 
|  | import sys | 
|  |  | 
class TPMParseError(Exception):
    """TPM parsing error exception.

    This exception is raised when the parser fails to understand the input
    provided.  It records the file name and line number for later formatting
    of an error message that includes a message that is specific to the parse
    error.

    Attributes:
        message: The parse-error specific text passed to the constructor.
        file_name: Name of the file fileinput was reading when the exception
            was created, or '<doctest>' when fileinput is not active.
        line_number: Line number within that file, or 0 when fileinput is
            not active.
    """
    def __init__(self, message):
        """Initialize a new TPMParseError exception.

        Args:
            message: Text describing what could not be parsed.
        """
        # Initialize the base class so that self.args carries the message;
        # without this repr() and copy/pickle reconstruction lose it.
        Exception.__init__(self, message)
        self.message = message

        try:
            self.file_name = fileinput.filename()
            self.line_number = fileinput.filelineno()
        except RuntimeError:
            # fileinput raises RuntimeError when no input is active, which is
            # the case when the parser is driven directly (e.g. by doctests).
            self.file_name = '<doctest>'
            self.line_number = 0

    def __str__(self):
        """Format the exception for user consumption.

        Returns:
            A string of the form '<file name>:<line number>: <message>'.
        """
        return '%s:%d: %s' % (self.file_name, self.line_number, self.message)
|  |  | 
|  |  | 
class Transition:
    """One row of the state transition table.

    The constructor arguments are stored unchanged as attributes of the same
    name: current_state, event, next_state and action.
    """
    def __init__(self, current_state, event, next_state, action):
        # The (state, event) pair that selects this row.
        self.current_state, self.event = current_state, event
        # What to do when the row is selected: the state to move to and the
        # action associated with the move.
        self.next_state, self.action = next_state, action
|  |  | 
|  |  | 
class TPM:
    """State machine that reads I2C transactions and prints TPM transactions.

    TPM interprets single byte writes as precursors to reads since the first
    byte in any write is used to set the register address for subsequent bytes
    in the write or future reads.
    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x00')
    >>> tpm.process('0.2 Read 0x20 DATA 0x81')
    Read reg TPM_ACCESS_0 returns 0x81

    A write with multiple bytes is interpreted as a write to the register
    addressed by the first byte in the sequence.
    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x09 0x12 0x34')
    Write reg TPM_DID_VID_0.3 with 0x12 0x34

    While a read is pending, TPM interprets any write other than a single
    byte write that sets the same TPM register address as an error.
    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x08')
    >>> tpm.process('0.2 Write 0x20 DATA 0x09 0x12')
    Traceback (most recent call last):
    ...
    TPMParseError: <doctest>:0: Unexpected action "0.2 Write 0x20 DATA 0x09 0x12"

    But if the write is a redundant setting of the TPM register address it is OK
    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x08')
    >>> tpm.process('0.2 Write 0x20 DATA 0x08')
    """
    # States of the machine.
    SYNC = 0
    IDLE = 1
    READ = 2

    # Events derived from each parsed input line.
    ACTION_WRITE_MULTI = 0
    ACTION_WRITE_SINGLE = 1
    ACTION_READ = 2

    # Register names, indexed by the register address byte.  Built once here
    # rather than on every lookup.
    _REGISTER_NAMES = ('TPM_ACCESS_0',
                       'TPM_STS_0',
                       'TPM_STS_0.BC_0',
                       'TPM_STS_0.BC_1',
                       'TPM_STS_0.BC_2',
                       'TPM_DATA_FIFO_0',
                       'TPM_DID_VID_0.0',
                       'TPM_DID_VID_0.1',
                       'TPM_DID_VID_0.2',
                       'TPM_DID_VID_0.3')

    def RegisterName(self, index):
        """Return the name of the register at the given index.

        Args:
            index: Integer register address.

        Returns:
            The register name string.

        Raises:
            TPMParseError: If index does not name a known register.
        """
        # The lower bound matters: a negative index would otherwise silently
        # select a name counted from the end of the table.
        if 0 <= index < len(self._REGISTER_NAMES):
            return self._REGISTER_NAMES[index]

        raise TPMParseError('Unknown register index %d' % index)

    def PrintWrite(self, data):
        """Print TPM write transaction.

        Args:
            data: List of hex strings; the first is the register address and
                the rest are the values written.
        """
        self.register = int(data[0], 16)

        # Single-argument print(...) behaves the same as a print statement on
        # Python 2 and is also valid on Python 3.
        print('Write reg %s with %s' % (self.RegisterName(self.register),
                                        ' '.join(data[1:])))

    def RecordAddress(self, data):
        """Record TPM register address.

        Args:
            data: List of hex strings whose first element is the address.
        """
        self.register = int(data[0], 16)

    def CheckDuplicateAddress(self, data):
        """Check that a write while waiting for a read is actually a duplicate.

        Args:
            data: List of hex strings whose first element is the address.

        Raises:
            TPMParseError: If the address differs from the recorded one.
        """
        if self.register != int(data[0], 16):
            raise TPMParseError(
                'Expected "Read" action, got "Write" to new address')

    def PrintRead(self, data):
        """Print TPM read transaction.

        Args:
            data: List of hex strings returned by the read.
        """
        print('Read reg %s returns %s' % (self.RegisterName(self.register),
                                          ' '.join(data)))

    # This state transition table records the valid I2C bus actions for
    # communicating with the TPM.  Any state/action pair not defined in this
    # table is assumed to be invalid and will result in a TPMParseError being
    # raised.
    #
    # The entries in this table correspond to the current state, the event
    # parsed, the state to transition to and the function to execute on that
    # transition.  The function is passed a TPM instance and an array of the
    # data values parsed from the event.
    state_table = [
        # The initial section of the state transition table describes the
        # synchronization process.  For the TPM this just involves waiting for
        # the first write operation.
        Transition(SYNC, ACTION_WRITE_MULTI,  IDLE, PrintWrite),
        Transition(SYNC, ACTION_WRITE_SINGLE, READ, RecordAddress),
        Transition(SYNC, ACTION_READ,         SYNC, None),

        # After synchronization is complete the rest of the table describes
        # the expected transitions.
        Transition(IDLE, ACTION_WRITE_MULTI,  IDLE, PrintWrite),
        Transition(IDLE, ACTION_WRITE_SINGLE, READ, RecordAddress),
        Transition(READ, ACTION_WRITE_SINGLE, READ, CheckDuplicateAddress),
        Transition(READ, ACTION_READ,         IDLE, PrintRead)]

    def __init__(self):
        """Initialize a new TPM instance.

        >>> tpm = TPM()
        >>> tpm.state == tpm.SYNC
        True
        """
        self.state = self.SYNC
        self.register = 0x00

    def process(self, line):
        """Update TPM state machine for one line generated by dump_i2c.

        Lines whose fourth field is NAK are ignored.

        Args:
            line: One whitespace separated line of the form
                '<time> <Read|Write> <address> NAK' or
                '<time> <Read|Write> <address> DATA [data]+'.

        Raises:
            TPMParseError: If the action is unknown, a Write carries no data,
                or the action is not valid in the current state.
            ValueError: If the time field is not a number.
            IndexError: If the line has fewer than four fields.

        These examples show how process affects the internal state of TPM.
        >>> tpm = TPM()

        >>> tpm.process('0.1 Write 0x20 DATA 0x00')
        >>> tpm.register == 0x00
        True
        >>> tpm.state == tpm.READ
        True

        >>> tpm.process('0.2 Read 0x20 DATA 0x81')
        Read reg TPM_ACCESS_0 returns 0x81
        >>> tpm.state == tpm.IDLE
        True

        >>> tpm.process('0.10000000 Write 0x20 DATA 0x09 0x12 0x34')
        Write reg TPM_DID_VID_0.3 with 0x12 0x34
        >>> tpm.register == 0x09
        True
        >>> tpm.state == tpm.IDLE
        True
        """
        fields = line.split()

        # The timestamp itself is not used, but it must still parse as a
        # number so that malformed lines are rejected.
        float(fields[0])
        action = fields[1]

        # fields[2] is the I2C address, which is not used.  Indexing fields[3]
        # also enforces that at least four fields are present.
        if fields[3] == 'NAK':
            return

        data = fields[4:]

        # Lines read via fileinput end in a newline; keep it out of messages.
        text = line.strip()

        if action == 'Read':
            action_index = self.ACTION_READ
        elif action == 'Write':
            if not data:
                # Report the real problem instead of claiming that "Write"
                # is an unknown action.
                raise TPMParseError('Write without data "%s"' % text)

            if len(data) == 1:
                action_index = self.ACTION_WRITE_SINGLE
            else:
                action_index = self.ACTION_WRITE_MULTI
        else:
            raise TPMParseError('Unknown action "%s"' % action)

        # Search the transition table for a matching state/action pair.
        for transition in self.state_table:
            if (transition.current_state == self.state and
                    transition.event == action_index):
                if transition.action:
                    # Table entries are plain functions, so the instance is
                    # passed explicitly.
                    transition.action(self, data)

                self.state = transition.next_state
                break
        else:
            # The loop finished without a match: the action is not valid in
            # the current state.
            raise TPMParseError('Unexpected action "%s"' % text)
|  |  | 
|  |  | 
def main():
    """Feed every line from fileinput to a TPM state machine.

    Processing stops at the first line that raises TPMParseError, ValueError
    or IndexError; the error is printed and no further lines are read.
    """
    decoder = TPM()

    for raw_line in fileinput.input():
        try:
            decoder.process(raw_line)
        except (TPMParseError, ValueError, IndexError) as failure:
            # Single-argument print(...) is equivalent to the print statement.
            print(failure)
            break
|  |  | 
|  |  | 
def Test():
    """Run any built-in tests.

    Runs doctest.testmod() and checks that no doctest failed.

    Raises:
        AssertionError: If one or more doctests failed.
    """
    import doctest

    failures = doctest.testmod().failed

    # Raise explicitly instead of using an assert statement: asserts are
    # removed when Python runs with -O, which would hide doctest failures.
    if failures != 0:
        raise AssertionError('%d doctest(s) failed' % failures)
|  |  | 
|  |  | 
if __name__ == '__main__':
    # Passing --test as the first argument runs the testing code instead of
    # processing input.
    run_tests = (sys.argv[1:2] == ['--test'])

    if not run_tests:
        main()
    else:
        Test()