# blob: 9c959963eaa63397ebf5d27ab60ac8058b6eefb2 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import multiprocessing
import Queue
import struct
import time
import common
from autotest_lib.client.bin import utils
from autotest_lib.client.cros.cellular.mbim_compliance \
import mbim_channel_endpoint
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
class MBIMChannel(object):
    """
    Provide synchronous access to the modem with MBIM command level interaction.

    This object should simplify your interaction over the MBIM channel as
    follows:
    - Use |bidirectional_transaction| to send MBIM packets that are part of a
      transaction. This function will block until the transaction completes and
      return the MBIM packets received in response.
    - |bidirectional_transaction| will filter out packets that do not correspond
      to your transaction. This way, you don't have to worry about unsolicited
      notifications and/or stale packets when interacting with the modem.
    - All filtered out packets can be grabbed using the
      |get_outstanding_packets| function. Use this function to receive error
      notifications, status notifications, etc.
    - Use |unidirectional_transaction| to send MBIM packets for which you don't
      expect a response.
    - Use |flush| to clean out all pipes before starting a new transaction.

    Note that "MBIM packets" here really means MBIM fragments. This object does
    not (de)fragment packets for you. Out of necessity, it does check that
    received fragments are contiguous and in-order.

    So, this object houses the minimum information necessary about the MBIM
    fragments to provide you a comfortable synchronous packet level channel.

    """

    # Seconds to wait for the endpoint subprocess to exit before terminating.
    ENDPOINT_JOIN_TIMEOUT_S = 5
    # Seconds to wait for each individual fragment on the response queue.
    FRAGMENT_TIMEOUT_S = 3
    # TODO(pprabhu) Consider allowing each transaction to specify its own
    # timeout.
    TRANSACTION_TIMEOUT_S = 5
    # |flush| sleeps for at most this many transaction timeouts.
    FLUSH_MAX_TRANSACTIONS = 5

    # Three little-endian 32-bit words. The first is the message type and the
    # third is the transaction id; the second is not used by this class.
    MESSAGE_HEADER_FORMAT = '<LLL'
    # Two little-endian 32-bit words: total fragments, current fragment.
    FRAGMENT_HEADER_FORMAT = '<LL'
    # Message types that carry a fragment header after the message header.
    MBIM_FRAGMENTED_MESSAGES = [
            0x00000003,  # MBIM_COMMAND_MSG
            0x80000003,  # MBIM_COMMAND_DONE
            0x80000007]  # MBIM_INDICATE_STATUS

    def __init__(self,
                 device,
                 interface_number,
                 interrupt_endpoint_address,
                 in_buffer_size,
                 process_class=None):
        """
        @param device: Device handle returned by PyUSB for the modem to test.
        @param interface_number: |bInterfaceNumber| of the MBIM interface.
        @param interrupt_endpoint_address: |bEndpointAddress| for the usb
                INTERRUPT IN endpoint for notifications.
        @param in_buffer_size: The (fixed) buffer size to use for in control
                transfers.
        @param process_class: The class to instantiate to create a subprocess.
                This is used by tests only, to easily mock out the process
                creation.

        """
        # Set first so that |close| (and hence |__del__|) is safe even if
        # anything below raises before the subprocess is created.
        self._endpoint_process = None
        self._stop_request_event = multiprocessing.Event()
        self._request_queue = multiprocessing.Queue()
        self._response_queue = multiprocessing.Queue()
        # Packets received that did not belong to an explicit transaction.
        self._outstanding_packets = []
        # Carries the matched packet out of the polling closure in
        # |_get_response_fragments|.
        self._last_response = []
        # First fragment of the next packet, read while collecting the
        # previous (incomplete) packet.
        self._stashed_first_fragment = None

        if process_class is None:
            process_class = multiprocessing.Process
        self._endpoint_process = process_class(
                target=mbim_channel_endpoint.MBIMChannelEndpoint,
                args=(device,
                      interface_number,
                      interrupt_endpoint_address,
                      in_buffer_size,
                      self._request_queue,
                      self._response_queue,
                      self._stop_request_event))
        self._endpoint_process.start()


    def __del__(self):
        """
        The destructor.

        Note that it is not guaranteed that |__del__| is called for objects that
        exist when the interpreter exits. It is recommended to call |close|
        explicitly.

        """
        self.close()


    def close(self):
        """
        Cleanly close the MBIMChannel.

        MBIMChannel forks a subprocess to communicate with the USB device. It is
        recommended that |close| be called explicitly.

        Calling |close| more than once, or on an object whose construction did
        not get as far as creating the subprocess, is a no-op.

        """
        # |getattr| guards against a partially constructed object: |__del__|
        # runs even when |__init__| raised before the attribute was set.
        endpoint_process = getattr(self, '_endpoint_process', None)
        if not endpoint_process:
            return

        if endpoint_process.is_alive():
            # Ask politely first; only terminate if the subprocess does not
            # exit within the join timeout.
            self._stop_request_event.set()
            endpoint_process.join(self.ENDPOINT_JOIN_TIMEOUT_S)
            if endpoint_process.is_alive():
                endpoint_process.terminate()
        self._endpoint_process = None


    def bidirectional_transaction(self, *args):
        """
        Execute a synchronous bidirectional transaction.

        @param *args: Fragments of a single MBIM transaction. An MBIM
                transaction may consist of multiple fragments - each fragment is
                the payload for a USB control message. It should be an
                |array.array| object.  It is your responsibility (and choice) to
                keep the fragments in-order, and to send all the fragments.
                For more details, see "Fragmentation of messages" in the MBIM
                spec.
        @returns: A list of fragments in the same order as received that
                correspond to the given transaction. If we receive less
                fragments than claimed, we will return what we get. If we
                receive non-contiguous / out-of-order fragments, we'll complain.
        @raises: MBIMComplianceChannelError if received fragments are
                out-of-order or non-contiguous, if no fragments are given, or
                if no response arrives within |TRANSACTION_TIMEOUT_S|.

        """
        self._verify_endpoint_open()
        if not args:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'No data given to |bidirectional_transaction|.')

        # The transaction id of the first fragment identifies the response.
        transaction_id, _, _ = self._fragment_metadata(args[0])
        for fragment in args:
            self._request_queue.put_nowait(fragment)
        return self._get_response_fragments(transaction_id)


    def unidirectional_transaction(self, *args):
        """
        Execute a synchronous unidirectional transaction. No return value.

        @param *args: Fragments of a single MBIM transaction. An MBIM
                transaction may consist of multiple fragments - each fragment is
                the payload for a USB control message. It should be an
                |array.array| object.  It is your responsibility (and choice) to
                keep the fragments in-order, and to send all the fragments.
                For more details, see "Fragmentation of messages" in the MBIM
                spec.
        @raises: MBIMComplianceChannelError if no fragments are given.

        """
        self._verify_endpoint_open()
        if not args:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'No data given to |unidirectional_transaction|.')

        for fragment in args:
            self._request_queue.put_nowait(fragment)


    def flush(self):
        """
        Clean out all queues.

        This waits till all outgoing packets have been sent, and then waits some
        more to give the channel time to settle down.

        @raises: MBIMComplianceChannelError if things don't settle down fast
                enough.

        """
        self._verify_endpoint_open()
        num_remaining_fragments = self._request_queue.qsize()
        try:
            timeout = self.FRAGMENT_TIMEOUT_S * num_remaining_fragments
            utils.poll_for_condition(self._request_queue.empty,
                                     timeout=timeout)
        except utils.TimeoutError:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Could not flush request queue.')

        # Now wait for the response queue to settle down.
        # In the worst case, each request fragment that was remaining at the
        # time flush was called belonged to a different transaction, and each of
        # these transactions would serially timeout in |TRANSACTION_TIMEOUT_S|.
        # To avoid sleeping for long times, we cap this value arbitrarily to
        # |FLUSH_MAX_TRANSACTIONS| transactions.
        num_remaining_transactions = min(self.FLUSH_MAX_TRANSACTIONS,
                                         num_remaining_fragments)
        time.sleep(num_remaining_transactions * self.TRANSACTION_TIMEOUT_S)
        for packet in self.get_outstanding_packets():
            logging.debug('flush: discarding packet: %s', packet)


    def get_outstanding_packets(self):
        """
        Get all received packets that were not part of an explicit transaction.

        @returns: A list of packets. Each packet is a list of fragments, so you
                perhaps want to do something like:
                for packet in channel.get_outstanding_packets():
                    for fragment in packet:
                        # handle fragment.

        """
        self._verify_endpoint_open()
        # Try to get more packets from the response queue.
        # This can block forever if the modem keeps spewing trash at us.
        while True:
            packet = self._get_packet_fragments()
            if not packet:
                break
            self._outstanding_packets.append(packet)

        # Hand over the accumulated list and start a fresh one.
        packets = self._outstanding_packets
        self._outstanding_packets = []
        return packets


    def _get_response_fragments(self, transaction_id):
        """
        Get response for the given |transaction_id|.

        Packets received for any other transaction id are stored and can be
        retrieved later through |get_outstanding_packets|.

        @param transaction_id: Transaction id the response must carry.
        @returns: A list of fragments.
        @raises: MBIMComplianceChannelError if response is not received.

        """
        def _poll_response():
            packet = self._get_packet_fragments()
            if not packet:
                return False
            response_id, _, _ = self._fragment_metadata(packet[0])
            if response_id == transaction_id:
                self._last_response = packet
                return True
            self._outstanding_packets.append(packet)
            return False

        try:
            utils.poll_for_condition(
                    _poll_response,
                    timeout=self.TRANSACTION_TIMEOUT_S)
        except utils.TimeoutError:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Did not receive timely reply to transaction %d' %
                    transaction_id)
        return self._last_response


    def _get_packet_fragments(self):
        """
        Get all fragments of the next packet from the modem.

        This function is responsible for putting together fragments of one
        packet, and checking that fragments are contiguous and in-order.

        @returns: A list of fragments of one packet. The list is empty if
                nothing arrived within |FRAGMENT_TIMEOUT_S|, and may be
                shorter than the advertised total if the remaining fragments
                did not arrive in time or the next packet started early.
        @raises: MBIMComplianceChannelError if the first fragment is not
                numbered 0, or later fragments are reordered or disagree on
                the total fragment count.

        """
        fragments = []
        if self._stashed_first_fragment is not None:
            # A previous call already pulled this packet's first fragment off
            # the queue while collecting an incomplete packet.
            first_fragment = self._stashed_first_fragment
            self._stashed_first_fragment = None
        else:
            try:
                first_fragment = self._response_queue.get(
                        True, self.FRAGMENT_TIMEOUT_S)
            except Queue.Empty:
                # *Don't fail* Just return nothing.
                return fragments

        transaction_id, total_fragments, current_fragment = (
                self._fragment_metadata(first_fragment))
        if current_fragment != 0:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'First fragment reports fragment number %d' %
                    current_fragment)
        fragments.append(first_fragment)

        last_fragment = 0
        while last_fragment < total_fragments - 1:
            try:
                fragment = self._response_queue.get(True,
                                                    self.FRAGMENT_TIMEOUT_S)
            except Queue.Empty:
                # *Don't fail* Just return the fragments we got so far.
                break

            fragment_id, fragment_total, fragment_current = (
                    self._fragment_metadata(fragment))
            if fragment_id != transaction_id:
                # *Don't fail* Treat a different transaction id as indicating
                # that the next packet has already arrived.
                # |last_fragment| is an index, so the number of fragments
                # actually received is the length of the list.
                logging.warning('Recieved only %d out of %d fragments for '
                                'transaction %d.',
                                len(fragments),
                                total_fragments,
                                transaction_id)
                self._stashed_first_fragment = fragment
                break

            if fragment_total != total_fragments:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Fragment number %d reports incorrect total (%d/%d)' %
                        (last_fragment + 1, fragment_total, total_fragments))
            if fragment_current != last_fragment + 1:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Received reordered fragments. Expected %d, got %d' %
                        (last_fragment + 1, fragment_current))

            last_fragment += 1
            fragments.append(fragment)

        return fragments


    def _fragment_metadata(self, fragment):
        """
        This function houses all the MBIM packet knowledge.

        @param fragment: A buffer holding one MBIM fragment.
        @returns: Tuple (transaction_id, total_fragments, current_fragment).
                Message types not listed in |MBIM_FRAGMENTED_MESSAGES| are
                reported as a single fragment numbered 0.
        @raises: MBIMComplianceChannelError if |fragment| is too short to hold
                the headers its message type requires.

        """
        message_header_size = struct.calcsize(self.MESSAGE_HEADER_FORMAT)
        # All packets have a message header.
        if len(fragment) < message_header_size:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Corrupted fragment |%s| does not have an MBIM header.' %
                    fragment)

        message_type, _, transaction_id = struct.unpack_from(
                self.MESSAGE_HEADER_FORMAT,
                fragment)

        if message_type in self.MBIM_FRAGMENTED_MESSAGES:
            fragment_header_size = struct.calcsize(
                    self.FRAGMENT_HEADER_FORMAT)
            if len(fragment) < message_header_size + fragment_header_size:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Corrupted fragment |%s| does not have a fragment '
                        'header. ' %
                        fragment)

            # Read the fragment header in place, right after the message
            # header, instead of slicing out a copy of the remainder.
            total_fragments, current_fragment = struct.unpack_from(
                    self.FRAGMENT_HEADER_FORMAT,
                    fragment,
                    message_header_size)
        else:
            # For other types, there is only one 'fragment'.
            total_fragments = 1
            current_fragment = 0

        return transaction_id, total_fragments, current_fragment


    def _verify_endpoint_open(self):
        """
        Check that the endpoint subprocess is still available.

        @raises: MBIMComplianceChannelError if the channel has been closed or
                the endpoint subprocess is no longer alive.

        """
        endpoint_process = getattr(self, '_endpoint_process', None)
        if endpoint_process is None:
            # |close| resets the handle to None; report that explicitly
            # rather than failing with an AttributeError on |is_alive|.
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'MBIMChannel is closed.')
        elif not endpoint_process.is_alive():
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'MBIMChannelEndpoint died unexpectedly. '
                    'The actual exception can be found in log entries from the '
                    'subprocess.')