blob: b3aeb4c9304b917ec9a02b348028098274a2be1c [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import numpy
import common
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
class MBIMDataChannel(object):
"""
Provides access to the data channel of a MBIM modem.
The object is used to send and receive MBIM frames to/from the modem.
The object uses the BULK-IN endpoint exposed in the data interface for any
reads from the modem to the host.
The object uses the BULK-OUT endpoint exposed in the data interface for any
writes from the host to the modem.
The channel does not deaggregate/aggregate packets into MBIM frames. The
caller is expected to validate/provide MBIM frames to the channel. The
channel is just used to send raw bytes to the device and read raw bytes from
the device.
"""
_READ_TIMEOUT_MS = 10000
_WRITE_TIMEOUT_MS = 10000
def __init__(self,
device,
data_interface_number,
bulk_in_endpoint_address,
bulk_out_endpoint_address,
max_in_buffer_size):
"""
@param device: Device handle returned by PyUSB for the modem to test.
@param bulk_in_endpoint_address: |bEndpointAddress| for the usb
BULK IN endpoint from the data interface.
@param bulk_out_endpoint_address: |bEndpointAddress| for the usb
BULK OUT endpoint from the data interface.
@param max_in_buffer_size: The (fixed) buffer size to used for in
data transfers.
"""
self._device = device
self._data_interface_number = data_interface_number
self._bulk_in_endpoint_address = bulk_in_endpoint_address
self._bulk_out_endpoint_address = bulk_out_endpoint_address
self._max_in_buffer_size = max_in_buffer_size
def send_ntb(self, ntb):
"""
Send the specified payload down to the device using the bulk-out USB
pipe.
@param ntb: Byte array of complete MBIM NTB to be sent to the device.
@raises MBIMComplianceDataTransferError if the complete |ntb| could not
be sent.
"""
ntb_length = len(ntb)
written = self._device.write(endpoint=self._bulk_out_endpoint_address,
data=ntb,
timeout=self._WRITE_TIMEOUT_MS,
interface=self._data_interface_number)
numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
linewidth=1000)
logging.debug('Data Channel: Sent %d bytes out of %d bytes requested. '
'Payload: %s',
written, ntb_length, numpy.array(ntb))
if written < ntb_length:
mbim_errors.log_and_raise(
mbim_errors.MBIMComplianceDataTransferError,
'Could not send the complete NTB (%d/%d bytes sent)' %
written, ntb_length)
def receive_ntb(self):
"""
Receive a payload from the device using the bulk-in USB pipe.
This API will return any data it receives from the device within
|_READ_TIMEOUT_S| seconds. If nothing is received within this duration,
it returns an empty byte array. The API returns only one MBIM NTB
received per invocation.
@returns Byte array of complete MBIM NTB received from the device. This
could be empty if nothing is received from the device.
"""
ntb = self._device.read(endpoint=self._bulk_in_endpoint_address,
size=self._max_in_buffer_size,
timeout=self._READ_TIMEOUT_MS,
interface=self._data_interface_number)
ntb_length = len(ntb)
numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
linewidth=1000)
logging.debug('Data Channel: Received %d bytes response. Payload: %s',
ntb_length, numpy.array(ntb))
return ntb