| # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ |
| This module implements the classes for control message headers, control |
| messages and bidirectional process to pack and unpack request/response message |
| packets. |
| This module contains the following classes: |MBIMData|, |MBIMHeader|, |
| |MBIMFragmentHeader|, |MBIMMessageBase|, |MBIMMessage|, |MBIMOpenMessage|, |
| |MBIMCloseMessage|, |MBIMCommandMessage|, |MBIMErrorMessage|, |MBIMOpenDone|, |
| |MBIMCloseDone|, |MBIMCommandDone|. |
| |
| Reference: |
| [1] Universal Serial Bus Communications Class Subclass Specification for |
| Mobile Broadband Interface Model |
| http://www.usb.org/developers/docs/devclass_docs/ |
| MBIM10Errata1_073013.zip |
| """ |
| import array |
| import struct |
| import sys |
| |
| import common |
| from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors |
| from autotest_lib.client.cros.cellular.mbim_compliance import mbim_constants |
| |
| |
| # Message classes which are sent from the host to the device. |
| MSG_FROM_HOST_TO_DEVICE = ('MBIMOpenMessage', |
| 'MBIMCloseMessage', |
| 'MBIMCommandMessage', |
| 'MBIMHostErrorMessage') |
| |
| # Message classese which are sent from the device to the host. |
| MSG_FROM_DEVICE_TO_HOST = ('MBIMOpenDone', |
| 'MBIMCloseDone', |
| 'MBIMCommandDone', |
| 'MBIMFunctionErrorMessage', |
| 'MBIMIndicateStatusMessage') |
| |
| |
| class MBIMData(object): |
| """ |
| The base class for the data being used in control messages. |
| |
| Its derived classes should define |_FIELDS|, a set of tuples in |
| (<field format>, <field name>) form, and |_DEFAULTS|, a dictionary to |
| map field names to their default values. |_FIELDS| and |_DEFAULTS| are used |
| to pack/unpack to/from packets which are sequences of bytes. Some derived |
| class may define extra fields, such as |_COMMAND_INFORMATION| in |
| |MBIMCommandMessage| and |MBIMCommandone|. |
| There are three classes derived from |MBIMData|, |MBIMHeader|, |
| |MBIMFragmentHeader| and |MBIMMessageBase|. Both |MBIMHeader| and |
| |MBIMFragment| define fields which will be used in the headers for control |
| messages. |MBIMMessageBase| is the base case for all control messages. |
| """ |
| |
| @classmethod |
| def get_fields(cls, get_all=False): |
| """ |
| @returns The set of the fields defined in |_FIELDS| of a derived class. |
| """ |
| return cls._FIELDS |
| |
| |
| @classmethod |
| def unpack(cls, packet): |
| """ |
| Unpack the packet into a map with the formats specified in |fields|. The |
| map contains pairs <field_name>: <value>. |
| |
| @param packet: The byte array to be unpacked. |
| @returns The contents for the packet in a dictionary, where the pairs |
| are in <field_name>: <value> form. |
| |
| """ |
| field_formats, field_names = zip(*cls.get_fields(get_all=True)) |
| format_string = '<' + ''.join(field_formats) |
| length_of_fields = struct.calcsize(format_string) |
| |
| if len(packet) < length_of_fields: |
| mbim_errors.log_and_raise( |
| mbim_errors.MBIMComplianceControlMessageError, |
| 'The length of the packet should be at least %d for %s, ' |
| 'got %d.' % (length_of_fields, cls.__name__, len(packet))) |
| |
| contents = {} |
| for index, value in enumerate(struct.unpack(format_string, |
| packet[:length_of_fields])): |
| contents[field_names[index]] = value |
| return contents |
| |
| |
| class MBIMHeader(MBIMData): |
| """ The header class for MBIM control messages.""" |
| |
| _FIELDS = (('I', 'message_type'), |
| ('I', 'message_length'), |
| ('I', 'transaction_id')) |
| |
| |
| class MBIMFragmentHeader(MBIMData): |
| """ The fragment header class for MBIM control messages.""" |
| |
| _FIELDS = (('I', 'total_fragments'), |
| ('I', 'current_fragment')) |
| |
| |
| class MBIMMessageBase(MBIMData): |
| """ |
| This class is the base class for all MBIM control messages. |
| |
| This class provides functions including packet production and packets |
| parsing. The instantiation of its derived classes depends on |_FIELDS| and |
| |_DEFAULTS| defined in the derived classes, where the |MBIMCommandMessage| |
| has extra fields defined as |_COMMAND_INFORMATION|. |
| |_FIELDS| defines the essential fields including message type, message |
| length, transaction ID, and some message-specific fields. |
| |_COMMAND_INFORMATION| defines the information about device service ID, CID, |
| command type and length of the information buffer. |
| |_DEFAULTS| specifies the default values for some fields. Note that |
| message type is required for every derived class of |MBIMMessageBase|, and |
| for |MBIMCommandMessage|, total fragment, current fragment, and information |
| buffer length are required. |
| """ |
| _transaction_id = 0x00000000 |
| |
| |
| def __init__(self, information_buffer=None, **kwargs): |
| """ |
| @param kwargs: The keyword arguments for all the fields to be set in the |
| message body. |
| """ |
| keys = kwargs.keys() |
| defaults = self._DEFAULTS |
| self.all_field_formats, self.all_field_names = ( |
| zip(*self.get_fields(get_all=True))) |
| |
| unknown_keys = set(keys) - set(self.all_field_names) |
| if unknown_keys: |
| mbim_errors.log_and_raise( |
| mbim_errors.MBIMComplianceControlMessageError, |
| 'Unknown field(s) %s found in arguments for %s.' % ( |
| unknown_keys, self.__class__.__name__)) |
| |
| # Some fields will be computed in the process of |generate_packet|, so |
| # these fields are not required in |kwargs|. Packets creation will fail |
| # if there is no value in |kwargs| and |defaults| for any of the |
| # required fields. The optional/required fields depends on the type of |
| # the message created. |
| if self.__class__.__name__ in MSG_FROM_HOST_TO_DEVICE: |
| optional_fields = set(['transaction_id', 'message_length']) |
| else: |
| optional_fields = set() |
| required_fields = set(self.all_field_names) - optional_fields |
| |
| for name in required_fields: |
| # Set the field value to the value given in |kwargs| if the value |
| # is provided, default value otherwise. If default value is not |
| # provided as well, an error will be raised. |
| value = kwargs.get(name, defaults.get(name)) |
| if value is None: |
| mbim_errors.log_and_raise( |
| mbim_errors.MBIMComplianceControlMessageError, |
| 'Field %s is required to create a %s.' % ( |
| name, self.__class__.__name__)) |
| |
| setattr(self, name, value) |
| setattr(self, 'information_buffer', information_buffer) |
| |
| |
| def _get_transaction_id(self): |
| """ |
| Returns incrementing transaction ids on successive calls. |
| |
| @returns The tracsaction id for control message delivery. |
| |
| """ |
| if MBIMMessageBase._transaction_id > (sys.maxint - 2): |
| MBIMMessageBase._transaction_id = 0x00000000 |
| MBIMMessageBase._transaction_id += 1 |
| return MBIMMessageBase._transaction_id |
| |
| |
| def generate_packets(self): |
| """ |
| Generate a list of packets based on the given message type. Different |
| types of messages require different fields. For example, a MBIM_OPEN_MSG |
| will need message_type and max_control_transfer to contruct the message. |
| |
| @returns A list of packets to be sent, and each packet is in binary |
| array form. |
| """ |
| cls = self.__class__ |
| if cls.__name__ not in MSG_FROM_HOST_TO_DEVICE: |
| mbim_errors.log_and_raise(NotImplementedError) |
| |
| # TODO(mcchou): Handle the fragmentation for MBIM_COMMAND_MSG while |
| # information buffer is not NULL. |
| packets = [] |
| self.transaction_id = self._get_transaction_id() |
| format_string = '<' + ''.join(self.all_field_formats) |
| self.message_length = struct.calcsize(format_string) |
| if self.information_buffer: |
| self.message_length += len(self.information_buffer) |
| packet = self.pack(format_string, self.all_field_names) |
| if self.information_buffer: |
| packet.extend(self.information_buffer) |
| packets.append(packet) |
| return packets |
| |
| |
| def pack(self, format_string, field_names): |
| """ |
| Pack a list of fields based on their formats. |
| |
| @param format_string: The concatenated formats for the fields given in |
| |field_names|. |
| @param field_names: The name of the fields to be packed. |
| @returns The packet in binary array form. |
| |
| """ |
| field_values = [getattr(self, name) for name in field_names] |
| return array.array('B', struct.pack(format_string, *field_values)) |
| |
| |
| @classmethod |
| def get_fields(cls, get_all=False): |
| """ |
| Retrieve the fields based on the type of the control message. For |
| |MBIMOpenMessage|, |MBIMCloseMessage| and |MBIMErrorMessage|, there is |
| no |_COMMAND_INFORMATION|, so this method returns fields in |_FIELDS| |
| even if |get_all|=True. As for |MBIMCommandMessage|, this method returns |
| fields in both |_FIELDS| and |_COMMAND_INFORMATION| if |get_all|=True, |
| |_FIELDS| otherwise. |
| |
| @param get_all: The flag to determine whether |_COMMAND_INTFORMATION| |
| should be returned or not. |
| @returns The set of the fields in tuple(field format, field name) form. |
| |
| """ |
| all_fields = cls._FIELDS |
| if get_all and hasattr(cls, '_COMMAND_INFORMATION'): |
| all_fields += cls._COMMAND_INFORMATION |
| return all_fields |
| |
| |
| @classmethod |
| def parse_packets(cls, packets): |
| """ |
| Parse a sequence of packets into the corresponding response message. |
| |
| Each packet is a byte array, and the response message can be one of |
| the following type: |MBIMOpenDone|, |MBIMCloseDone| and |
| |MBIMCommandDone|. For |MBIMOpenDone| and |MBIMCloseDone|, the expected |
| number of packets is 1. As for |MBIMCommandDone|, the number of packets |
| should be at least 1, since the response message from the device may be |
| fragmented into several packetes. |
| |
| @param packets: The list of the response packets which are in byte |
| array form. |
| @returns The object of the response message. A response message can be |
| one of the following type: |MBIMOpenDone|, |MBIMCloseDone| and |
| |MBIMCommandDone|. |
| |
| """ |
| if cls.__name__ not in MSG_FROM_DEVICE_TO_HOST: |
| mbim_errors.log_and_raise(NotImplementedError) |
| |
| # Parse the first packet. |
| response_contents = cls.unpack(packets[0]) |
| response_message = cls(**response_contents) |
| field_formats, _ = zip(*cls.get_fields(get_all=True)) |
| length_of_all_fields = struct.calcsize('<' + ''.join(field_formats)) |
| |
| if cls in [MBIMCommandDone, MBIMIndicateStatusMessage]: |
| # Unpack the continuation packets of type |MBIM_COMMAND_DONE|. |
| info_buffer = array.array('B') |
| info_buffer.extend(packets[0][length_of_all_fields:]) |
| field_formats, field_names = zip(*cls.get_fields()) |
| format_string = '<' + ''.join(field_formats) |
| length_of_headers = struct.calcsize(format_string) |
| for packet in packets[1:]: |
| if len(packet) < length_of_headers: |
| mbim_errors.log_and_raise( |
| mbim_errors.MBIMComplianceControlMessageError, |
| 'The length of the continuation packet(s) for %s ' |
| 'should be at least %d.' % ( |
| cls.__name__, length_of_headers)) |
| |
| info_buffer.extend(packet[length_of_headers:]) |
| setattr(response_message, 'information_buffer', info_buffer) |
| |
| return response_message |
| |
| |
| class MBIMOpenMessage(MBIMMessageBase): |
| """ The class for MBIM_OPEN_MSG. """ |
| |
| _FIELDS = MBIMHeader.get_fields() + (('I', 'max_control_transfer'),) |
| _DEFAULTS = {'message_type': mbim_constants.MBIM_OPEN_MSG} |
| |
| |
| class MBIMCloseMessage(MBIMMessageBase): |
| """ The class for MBIM_CLOSE_MSG. """ |
| |
| _FIELDS = MBIMHeader.get_fields() |
| _DEFAULTS = {'message_type': mbim_constants.MBIM_CLOSE_MSG} |
| |
| |
| class MBIMCommandMessage(MBIMMessageBase): |
| """ The class for MBIM_COMMAND_MSG. """ |
| |
| _FIELDS = MBIMHeader.get_fields() + MBIMFragmentHeader.get_fields() |
| _COMMAND_INFORMATION = (('16s', 'device_service_id'), |
| ('I', 'cid'), |
| ('I', 'command_type'), |
| ('I', 'information_buffer_length')) |
| _DEFAULTS = {'message_type': mbim_constants.MBIM_COMMAND_MSG, |
| 'total_fragments': 0x00000001, |
| 'current_fragment': 0x00000000, |
| 'information_buffer_length': 0} |
| |
| |
| class MBIMHostErrorMessage(MBIMMessageBase): |
| """ The class for MBIM_ERROR_MSG. """ |
| |
| _FIELDS = MBIMHeader.get_fields() + (('I', 'error_status_code'),) |
| _DEFAULTS = {'message_type': mbim_constants.MBIM_HOST_ERROR_MSG} |
| |
| |
| class MBIMOpenDone(MBIMMessageBase): |
| """ The class for MBIM_OPEN_DONE. """ |
| |
| _FIELDS = MBIMHeader.get_fields() + (('I', 'status_codes'),) |
| _DEFAULTS = {'message_type': mbim_constants.MBIM_OPEN_DONE} |
| |
| |
| class MBIMCloseDone(MBIMMessageBase): |
| """ The class for MBIM_CLOSE_DONE. """ |
| |
| _FIELDS = MBIMHeader.get_fields() + (('I', 'status_codes'),) |
| _DEFAULTS = {'message_type': mbim_constants.MBIM_CLOSE_DONE} |
| |
| |
| class MBIMCommandDone(MBIMMessageBase): |
| """ The class for MBIM_COMMAND_DONE. """ |
| |
| _FIELDS = MBIMHeader.get_fields() + MBIMFragmentHeader.get_fields() |
| _COMMAND_INFORMATION = (('16s', 'device_service_id'), |
| ('I', 'cid'), |
| ('I', 'status_codes'), |
| ('I', 'information_buffer_length')) |
| _DEFAULTS = {'message_type': mbim_constants.MBIM_COMMAND_DONE} |
| |
| |
| class MBIMIndicateStatusMessage(MBIMMessageBase): |
| """ The class for MBIM_INDICATE_STATUS_MSG. """ |
| |
| _FIELDS = MBIMHeader.get_fields() + MBIMFragmentHeader.get_fields() |
| _COMMAND_INFORMATION = (('16s', 'device_service_id'), |
| ('I', 'cid'), |
| ('I', 'information_buffer_length')) |
| _DEFAULTS = {'message_type': mbim_constants.MBIM_INDICATE_STATUS_MSG} |
| |
| |
| class MBIMFunctionErrorMessage(MBIMMessageBase): |
| """ The class for MBIM_FUNCTION_ERROR_MSG. """ |
| |
| _FIELDS = MBIMHeader.get_fields() + (('I', 'error_status_code'),) |
| _DEFAULTS = {'message_type': mbim_constants.MBIM_FUNCTION_ERROR_MSG} |
| |
| |
| def parse_response_packets(packets): |
| """ |
| Parse the response packets based on the response message type. |
| |
| @param packets: The list of packets to be parsed. Each packet is in byte |
| array form. |
| @returns The object of the response message. A response message can be |
| one of the following type: |MBIMOpenDone|, |MBIMCloseDone| and |
| |MBIMCommandDone|. |
| |
| """ |
| # TODO(mcchou): Handle the fragmented MBIM_COMMAND_DONE response and come up |
| # with a generic parser. |
| if not packets: |
| mbim_errors.log_and_raise(mbim_errors.MBIMComplianceControlMessageError, |
| 'Expected at least 1 packet to parse, got 0.') |
| |
| # Parse the packet header to get the response message type. |
| header_contents = MBIMHeader.unpack(packets[0]) |
| # Parse |packets| based on the response message type. |
| PARSER_MAP = { |
| mbim_constants.MBIM_OPEN_DONE: MBIMOpenDone.parse_packets, |
| mbim_constants.MBIM_CLOSE_DONE: MBIMCloseDone.parse_packets, |
| mbim_constants.MBIM_COMMAND_DONE: MBIMCommandDone.parse_packets, |
| mbim_constants.MBIM_FUNCTION_ERROR_MSG: |
| MBIMFunctionErrorMessage.parse_packets, |
| mbim_constants.MBIM_INDICATE_STATUS_MSG: |
| MBIMIndicateStatusMessage.parse_packets} |
| message_type = header_contents['message_type'] |
| parser = PARSER_MAP.get(message_type) |
| if parser is None: |
| mbim_errors.log_and_raise(NotImplementedError) |
| |
| response_message = parser(packets) |
| return response_message |