| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| ''' |
| USB to I2C controller. |
| ''' |
| |
| import glob |
| import logging |
| import os |
| import re |
| import serial |
| import time |
| |
| from autotest_lib.client.cros import tty |
| |
# R/W flag placed in bit 0 (the least significant bit) of the 8-bit I2C
# address byte.
READ_BIT = 0x01
WRITE_BIT = 0x00
| |
| |
def create_i2c_controller(chipset_config):
    '''Factory method for I2CController.

    The configuration is a colon-separated string whose first field names
    the chipset and whose second field names the USB-UART driver used to
    locate the tty device. Only the 'SC18IM700' chipset is handled.

    @param chipset_config: Chipset configuration, e.g. 'SC18IM700:<driver>'.
    @return An I2CController if succeeded.
    @throws AssertionError if a valid instance cannot be created: unknown
            chipset, missing driver field, or no tty matching the driver.
    '''
    fields = chipset_config.split(':')
    # A bare 'SC18IM700' (no driver field) used to escape as IndexError;
    # it now falls through to the documented AssertionError instead.
    if fields[0] == 'SC18IM700' and len(fields) > 1:
        usb_uart_driver = fields[1]
        # Try to find a tty terminal that driver name matches usb_uart_driver.
        tty_path = tty.find_tty_by_driver(usb_uart_driver)
        if tty_path:
            return _I2CControllerSC18IM700(tty_path)

    # Raised explicitly instead of `assert False`: assert statements are
    # removed under `python -O`, which would make this function silently
    # return None. Exception type and message are unchanged for callers.
    raise AssertionError("Unsupported configuration: %s" % chipset_config)
| |
| |
class I2CController(object):
    '''
    Abstract interface of an I2C controller.

    Concrete controllers override read_bus_status(), send() and read();
    send_and_check_status() is built on top of those primitives.
    '''

    # Bus status codes returned by read_bus_status().
    I2C_OK, I2C_NACK_ON_ADDRESS, I2C_NACK_ON_DATA, I2C_TIME_OUT = 1, 2, 3, 4

    def read_bus_status(self):
        '''Returns the I2C bus status.

        @return One of the I2C_* status constants of this class.
        '''
        raise NotImplementedError()

    def send(self, slave_addr, int_array):
        '''Sends data to I2C slave device.

        The outcome of the transfer is not reported here; call
        read_bus_status() afterwards to find out whether it succeeded.

        @param slave_addr: The address of slave in 7bits format.
        @param int_array: The data to send in integer array.
        '''
        raise NotImplementedError()

    def read(self, slave_addr, bytes_to_read):
        '''Reads data from I2C slave device.

        @param slave_addr: The address of slave in 7bits format.
        @param bytes_to_read: The number of bytes to read from device.
        @return An array of data.
        '''
        raise NotImplementedError()

    def send_and_check_status(self, slave_addr, int_array):
        '''Sends data to I2C slave device, then queries the bus status.

        @param slave_addr: The address of slave in 7bits format.
        @param int_array: The data to send in integer array.

        @return An integer indicates I2C bus status.
        '''
        self.send(slave_addr, int_array)
        bus_status = self.read_bus_status()
        return bus_status
| |
| |
class _I2CControllerSC18IM700(I2CController):
    '''
    Implementation of I2C Controller for NXP SC18IM700.

    The chip is driven through a serial port. Every command built by this
    class starts with a command character ('S', 'R', 'W', 'I' or 'O') and
    is terminated by 'P'.
    '''
    # Seconds to wait between writing a command and collecting its reply.
    SEC_WAIT_I2C = 0.1

    # Constants from official datasheet.
    # http://www.nxp.com/documents/data_sheet/SC18IM700.pdf
    # Maps the raw status byte read from the chip to an I2CController code.
    I2C_STATUS = {0b11110000: I2CController.I2C_OK,
                  0b11110001: I2CController.I2C_NACK_ON_ADDRESS,
                  0b11110011: I2CController.I2C_NACK_ON_DATA,
                  0b11111000: I2CController.I2C_TIME_OUT}

    def __init__(self, device_path):
        '''Connects to NXP via serial port.

        @param device_path: The device path of serial port.
        '''
        self.logger = logging.getLogger('SC18IM700')
        self.logger.info('Setup serial device... [%s]', device_path)
        self.device_path = device_path
        self.serial = serial.Serial(port=self.device_path,
                                    baudrate=9600,
                                    bytesize=serial.EIGHTBITS,
                                    parity=serial.PARITY_NONE,
                                    stopbits=serial.STOPBITS_ONE,
                                    xonxoff=False,
                                    rtscts=True,
                                    interCharTimeout=1)
        self.logger.info('pySerial [%s] configuration : %s',
                         serial.VERSION, repr(self.serial))
        # Clean the buffer.
        # NOTE(review): pySerial documents flush() as waiting until pending
        # output has been written; it does not discard stale input. Confirm
        # whether flushInput() was intended here.
        self.serial.flush()

    def _write(self, data):
        '''Converts data to bytearray and writes to the serial port.

        @param data: Iterable of integers in range 0-255.
        '''
        self.serial.write(bytearray(data))
        self.serial.flush()

    def _read(self):
        '''Reads data from serial port(Non-Blocking).

        Only the bytes already waiting in the input buffer are fetched, and
        each of them is logged in hex and binary form.

        @return The raw data as returned by pySerial (may be empty).
        '''
        ret = self.serial.read(self.serial.inWaiting())
        self.logger.info('Hex and binary dump of datas - ')
        # bytearray() yields integers on both Python 2 (str) and Python 3
        # (bytes); calling ord() on an iterated bytes item fails on Python 3.
        for byte in bytearray(ret):
            self.logger.info(' %x - %s', byte, bin(byte))
        return ret

    def _query(self, cmd):
        '''Writes cmd, waits SEC_WAIT_I2C seconds and returns the reply.'''
        self._write(cmd)
        time.sleep(self.SEC_WAIT_I2C)
        return self._read()

    @staticmethod
    def _convert_to_8bits_addr(slave_addr_7bits, lsb):
        '''Converts slave_addr from 7 bits to 8 bits with given LSB.

        @param slave_addr_7bits: The address of slave in 7bits format.
        @param lsb: Value for bit 0 of the result (0 or 1).
        @return The address shifted left by one bit, OR-ed with lsb.
        '''
        assert (slave_addr_7bits >> 7) == 0, "Address must not exceed 7 bits."
        assert (lsb & ~0x01) == 0, "lsb must not exceed one bit."
        return (slave_addr_7bits << 1) | lsb

    def read_bus_status(self):
        '''Reads register 0x0A and translates it through I2C_STATUS.

        @return One of the I2CController.I2C_* status constants.
        @throws IOError if the reply is not exactly one known status byte.
        '''
        reply = bytearray(self._query([ord('R'), 0x0A, ord('P')]))
        if len(reply) == 1 and reply[0] in self.I2C_STATUS:
            return self.I2C_STATUS[reply[0]]
        raise IOError("I2C_STATUS_READ_FAILED")

    def send(self, slave_addr, int_array):
        '''Sends data to I2C slave device.

        @param slave_addr: The address of slave in 7bits format.
        @param int_array: The data to send, any sequence of integers.
        '''
        # Copy into a list so tuples/bytearrays can be concatenated too.
        payload = list(int_array)
        cmd = ([ord('S'),
                self._convert_to_8bits_addr(slave_addr, WRITE_BIT),
                len(payload)] +
               payload + [ord('P')])
        self._write(cmd)

    def read(self, slave_addr, bytes_to_read):
        '''Reads data from I2C slave device.

        @param slave_addr: The address of slave in 7bits format.
        @param bytes_to_read: The number of bytes to read from device.
        @return The raw data available after SEC_WAIT_I2C seconds.
        '''
        cmd = [ord('S'),
               self._convert_to_8bits_addr(slave_addr, READ_BIT),
               bytes_to_read,
               ord('P')]
        return self._query(cmd)

    def write_gpio(self, data):
        '''Sends the 'O' command carrying one data byte.

        @param data: Integer in range 0-255.
        '''
        self._write([ord('O'), data, ord('P')])

    def read_gpio(self):
        '''Sends the 'I' command and returns the raw reply.'''
        return self._query([ord('I'), ord('P')])

    def write_register(self, regs, datas):
        '''Sends one 'W' command with interleaved register/value pairs.

        @param regs: Sequence of register numbers.
        @param datas: Sequence of values, same length as regs.
        '''
        assert len(regs) == len(datas)
        cmd = [ord('W')]
        for reg, value in zip(regs, datas):
            cmd.extend((reg, value))
        cmd.append(ord('P'))
        self._write(cmd)

    def read_register(self, regs):
        '''Sends one 'R' command for the given registers.

        @param regs: Sequence of register numbers.
        @return The raw data available after SEC_WAIT_I2C seconds.
        '''
        # list() lets callers pass any sequence, not only a list.
        return self._query([ord('R')] + list(regs) + [ord('P')])