blob: 43f10a9053d5a640a5b86da4131199fc8bc5e934 [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Dump tpm transactions from previous run of dump_i2c.
The output of dump_i2c can be used directly by piping it to dump_tpm's stdin.
The dump_tpm input format is line oriented where each line should conform to:
<time> <Read|Write> <address in 0xff format> NAK
or:
<time> <Read|Write> <address in 0xff format> DATA [data]+
"""
import fileinput
import optparse
import os
import re
import sys
class TPMParseError(Exception):
    """TPM parsing error exception.

    This exception is raised when the parser fails to understand the input
    provided.  It records the file name and line number for later formatting
    of an error message that includes a message that is specific to the parse
    error.

    Attributes:
        message: The parse-specific description passed to the constructor.
        file_name: Name of the file fileinput was reading when the error was
            created, or '<doctest>' when fileinput has no active input.
        line_number: Line number within that file, or 0 when fileinput has
            no active input.
    """

    def __init__(self, message):
        """Initialize a new TPMParseError exception.

        Args:
            message: Description of what could not be parsed.
        """
        # Hand the message to Exception so that args, repr() and pickling
        # carry it; __str__ below still controls the user-visible format.
        super(TPMParseError, self).__init__(message)
        self.message = message

        try:
            self.file_name = fileinput.filename()
            self.line_number = fileinput.filelineno()
        except RuntimeError:
            # fileinput raises RuntimeError when no input is active, which is
            # the case when the parser is driven directly (e.g. by doctests).
            self.file_name = '<doctest>'
            self.line_number = 0

    def __str__(self):
        """Format the exception for user consumption."""
        return '%s:%d: %s' % (self.file_name, self.line_number, self.message)
class Transition:
    """One row of a state transition table.

    Attributes:
        current_state: State in which this row applies.
        event: Event that selects this row while in current_state.
        next_state: State to move to once the row has been taken.
        action: Callable run when the row is taken, or None for no action.
    """

    def __init__(self, current_state, event, next_state, action):
        """Store the four fields of the row exactly as given."""
        self.current_state, self.event = current_state, event
        self.next_state, self.action = next_state, action
class TPM:
    """State machine that reads I2C transactions and prints TPM transactions.

    TPM interprets single byte writes as precursors to reads since the first
    byte in any write is used to set the register address for subsequent bytes
    in the write or future reads.

    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x00')
    >>> tpm.process('0.2 Read 0x20 DATA 0x81')
    Read reg TPM_ACCESS_0 returns 0x81

    A write with multiple bytes is interpreted as a write to the register
    addressed by the first byte in the sequence.

    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x09 0x12 0x34')
    Write reg TPM_DID_VID_0.3 with 0x12 0x34

    While a read is pending, TPM treats any write other than a single byte
    write that sets the same TPM register address as an error.

    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x08')
    >>> tpm.process('0.2 Write 0x20 DATA 0x09 0x12')
    Traceback (most recent call last):
        ...
    TPMParseError: <doctest>:0: Unexpected action "0.2 Write 0x20 DATA 0x09 0x12"

    But if the write is a redundant setting of the TPM register address it is
    OK.

    >>> tpm = TPM()
    >>> tpm.process('0.1 Write 0x20 DATA 0x08')
    >>> tpm.process('0.2 Write 0x20 DATA 0x08')
    """

    # Parser states.  SYNC is the initial state, READ means a single byte
    # write has set the register address and a read is expected next, and
    # IDLE is the state returned to after a completed read or multi byte
    # write.
    SYNC = 0
    IDLE = 1
    READ = 2

    # Events derived from one input line: a write carrying more than one data
    # byte, a write carrying exactly one data byte, and a read.
    ACTION_WRITE_MULTI = 0
    ACTION_WRITE_SINGLE = 1
    ACTION_READ = 2

    # Register names indexed by TPM register address.  Kept at class level so
    # the table is built once instead of on every lookup.
    _REGISTER_NAMES = ('TPM_ACCESS_0',
                       'TPM_STS_0',
                       'TPM_STS_0.BC_0',
                       'TPM_STS_0.BC_1',
                       'TPM_STS_0.BC_2',
                       'TPM_DATA_FIFO_0',
                       'TPM_DID_VID_0.0',
                       'TPM_DID_VID_0.1',
                       'TPM_DID_VID_0.2',
                       'TPM_DID_VID_0.3')

    def RegisterName(self, index):
        """Return the symbolic name of the register at address index.

        Raises:
            TPMParseError: If index does not name a known register.  Negative
                values are rejected as well; they would otherwise wrap around
                and silently select a register from the end of the table.
        """
        if 0 <= index < len(self._REGISTER_NAMES):
            return self._REGISTER_NAMES[index]

        raise TPMParseError('Unknown register index %d' % index)

    def PrintWrite(self, data):
        """Print TPM write transaction.

        Args:
            data: Data bytes of the write as hexadecimal strings.  The first
                entry is the register address, the rest is the payload.
        """
        self.register = int(data[0], 16)

        # A single parenthesized argument prints identically whether print is
        # the Python 2 statement or the Python 3 function.
        print('Write reg %s with %s' % (self.RegisterName(self.register),
                                        ' '.join(data[1:])))

    def RecordAddress(self, data):
        """Record TPM register address.

        Args:
            data: Data bytes of the write; only the first entry is used.
        """
        self.register = int(data[0], 16)

    def CheckDuplicateAddress(self, data):
        """Check that a write while waiting for a read is actually a duplicate.

        Raises:
            TPMParseError: If the write addresses a register other than the
                one already recorded.
        """
        if self.register != int(data[0], 16):
            raise TPMParseError(
                'Expected "Read" action, got "Write" to new address')

    def PrintRead(self, data):
        """Print TPM read transaction.

        Args:
            data: Data bytes returned by the read as hexadecimal strings.
        """
        print('Read reg %s returns %s' % (self.RegisterName(self.register),
                                          ' '.join(data)))

    # This state transition table records the valid I2C bus actions for
    # communicating with the TPM.  Any state/action pair not defined in this
    # table is assumed to be invalid and will result in a TPMParseError being
    # raised.
    #
    # The entries in this table correspond to the current state, the event
    # parsed, the state to transition to and the function to execute on that
    # transition.  The function is passed a TPM instance and an array of the
    # data values parsed from the event.
    state_table = [
        # The initial section of the state transition table describes the
        # synchronization process.  For the TPM this just involves waiting for
        # the first write operation.
        Transition(SYNC, ACTION_WRITE_MULTI, IDLE, PrintWrite),
        Transition(SYNC, ACTION_WRITE_SINGLE, READ, RecordAddress),
        Transition(SYNC, ACTION_READ, SYNC, None),

        # After synchronization is complete the rest of the table describes
        # the expected transitions.
        Transition(IDLE, ACTION_WRITE_MULTI, IDLE, PrintWrite),
        Transition(IDLE, ACTION_WRITE_SINGLE, READ, RecordAddress),
        Transition(READ, ACTION_WRITE_SINGLE, READ, CheckDuplicateAddress),
        Transition(READ, ACTION_READ, IDLE, PrintRead)]

    def __init__(self):
        """Initialize a new TPM instance.

        >>> tpm = TPM()
        >>> tpm.state == tpm.SYNC
        True
        """
        self.state = self.SYNC
        self.register = 0x00

    def process(self, line):
        """Update TPM state machine for one line generated by dump_i2c.

        Lines flagged NAK are ignored and leave the state machine untouched.

        Args:
            line: One line of dump_i2c output.

        Raises:
            TPMParseError: If the action is not a read or a write carrying
                data, or if it is not valid in the current state.
            ValueError: If the time stamp or a register address is not a
                number.
            IndexError: If the line has fewer than four fields.

        These examples show how process affects the internal state of TPM.

        >>> tpm = TPM()
        >>> tpm.process('0.1 Write 0x20 DATA 0x00')
        >>> tpm.register == 0x00
        True
        >>> tpm.state == tpm.READ
        True
        >>> tpm.process('0.2 Read 0x20 DATA 0x81')
        Read reg TPM_ACCESS_0 returns 0x81
        >>> tpm.state == tpm.IDLE
        True
        >>> tpm.process('0.10000000 Write 0x20 DATA 0x09 0x12 0x34')
        Write reg TPM_DID_VID_0.3 with 0x12 0x34
        >>> tpm.register == 0x09
        True
        >>> tpm.state == tpm.IDLE
        True
        """
        values = line.split()

        # The time stamp is not used, but it must still parse as a number so
        # that malformed lines are rejected with ValueError.
        float(values[0])

        action = values[1]
        nak = (values[3] == 'NAK')
        data = values[4:]

        if nak:
            return

        if action == 'Read':
            action_index = self.ACTION_READ
        elif action == 'Write' and len(data) == 1:
            action_index = self.ACTION_WRITE_SINGLE
        elif action == 'Write' and len(data) > 1:
            action_index = self.ACTION_WRITE_MULTI
        else:
            raise TPMParseError('Unknown action "%s"' % action)

        # Search the transition table for a matching state/action pair.
        for transition in self.state_table:
            if (transition.current_state == self.state and
                    transition.event == action_index):
                if transition.action:
                    transition.action(self, data)

                self.state = transition.next_state
                break
        else:
            # The loop finished without break: no table entry allows this
            # event in the current state.
            raise TPMParseError('Unexpected action "%s"' % line)
def main():
    """Print the TPM transactions found in dump_i2c output.

    Input lines come from fileinput.input(), i.e. the files named on the
    command line or stdin when none are given.  Processing stops at the first
    line that cannot be parsed; the error is printed and the function returns.
    """
    tpm = TPM()
    lines = fileinput.input()

    try:
        for line in lines:
            try:
                tpm.process(line)
            except (TPMParseError, ValueError, IndexError) as error:
                # A single parenthesized argument prints identically whether
                # print is the Python 2 statement or the Python 3 function.
                print(error)
                return
    finally:
        # Release the module-level fileinput state even on the early return
        # so that a later fileinput.input() call is not refused.
        fileinput.close()
def Test():
    """Run any built-in tests.

    Runs the doctests of the __main__ module.

    Raises:
        AssertionError: If at least one doctest failed.
    """
    import doctest

    results = doctest.testmod()

    # Raise explicitly rather than via an assert statement: asserts are
    # removed under "python -O", which would let failing tests pass silently.
    if results.failed != 0:
        raise AssertionError('%d of %d doctests failed' %
                             (results.failed, results.attempted))
if __name__ == '__main__':
    # A leading --test argument selects the built-in tests; any other command
    # line is handled by main().
    run_tests = len(sys.argv) > 1 and sys.argv[1] == '--test'
    entry_point = Test if run_tests else main
    entry_point()