blob: 21cd5594b57720418446406eb16cb19f931df1f5 [file] [log] [blame]
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import time
import logging
from autotest_lib.server.cros.cellular import cellular_simulator as cc
from autotest_lib.server.cros.cellular.callbox_utils import cmw500
from autotest_lib.server.cros.cellular.simulation_utils import LteSimulation
CMW_TM_MAPPING = {
LteSimulation.TransmissionMode.TM1: cmw500.TransmissionModes.TM1,
LteSimulation.TransmissionMode.TM2: cmw500.TransmissionModes.TM2,
LteSimulation.TransmissionMode.TM3: cmw500.TransmissionModes.TM3,
LteSimulation.TransmissionMode.TM4: cmw500.TransmissionModes.TM4,
LteSimulation.TransmissionMode.TM7: cmw500.TransmissionModes.TM7,
LteSimulation.TransmissionMode.TM8: cmw500.TransmissionModes.TM8,
LteSimulation.TransmissionMode.TM9: cmw500.TransmissionModes.TM9
}
CMW_SCH_MAPPING = {
LteSimulation.SchedulingMode.STATIC:
cmw500.SchedulingMode.USERDEFINEDCH
}
CMW_MIMO_MAPPING = {
LteSimulation.MimoMode.MIMO_1x1: cmw500.MimoModes.MIMO1x1,
LteSimulation.MimoMode.MIMO_2x2: cmw500.MimoModes.MIMO2x2,
LteSimulation.MimoMode.MIMO_4x4: cmw500.MimoModes.MIMO4x4
}
CMW_MODULATION_MAPPING = {
LteSimulation.ModulationType.QPSK: cmw500.ModulationType.QPSK,
LteSimulation.ModulationType.Q16: cmw500.ModulationType.Q16,
LteSimulation.ModulationType.Q64: cmw500.ModulationType.Q64,
LteSimulation.ModulationType.Q256: cmw500.ModulationType.Q256
}
# get mcs vs tbsi map with 256-qam disabled(downlink)
get_mcs_tbsi_map_dl = {
cmw500.ModulationType.QPSK: {
0: 0,
1: 1,
2: 2,
3: 3,
4: 4,
5: 5,
6: 6,
7: 7,
8: 8,
9: 9
},
cmw500.ModulationType.Q16: {
10: 9,
11: 10,
12: 11,
13: 12,
14: 13,
15: 14,
16: 15
},
cmw500.ModulationType.Q64: {
17: 15,
18: 16,
19: 17,
20: 18,
21: 19,
22: 20,
23: 21,
24: 22,
25: 23,
26: 24,
27: 25,
28: 26
}
}
# get mcs vs tbsi map with 256-qam enabled(downlink)
get_mcs_tbsi_map_for_256qam_dl = {
cmw500.ModulationType.QPSK: {
0: 0,
1: 2,
2: 4,
3: 6,
4: 8,
},
cmw500.ModulationType.Q16: {
5: 10,
6: 11,
7: 12,
8: 13,
9: 14,
10: 15
},
cmw500.ModulationType.Q64: {
11: 16,
12: 17,
13: 18,
14: 19,
15: 20,
16: 21,
17: 22,
18: 23,
19: 24
},
cmw500.ModulationType.Q256: {
20: 25,
21: 27,
22: 28,
23: 29,
24: 30,
25: 31,
26: 32,
27: 33
}
}
# get mcs vs tbsi map (uplink)
get_mcs_tbsi_map_ul = {
cmw500.ModulationType.QPSK: {
0: 0,
1: 1,
2: 2,
3: 3,
4: 4,
5: 5,
6: 6,
7: 7,
8: 8,
9: 9
},
cmw500.ModulationType.Q16: {
10: 10,
11: 10,
12: 11,
13: 12,
14: 13,
15: 14,
16: 15,
17: 16,
18: 17,
19: 18,
20: 19,
21: 19,
22: 20,
23: 21,
24: 22,
25: 23,
26: 24,
27: 25,
28: 26
}
}
class CMW500CellularSimulator(cc.AbstractCellularSimulator):
""" A cellular simulator for telephony simulations based on the CMW 500
controller. """
# Indicates if it is able to use 256 QAM as the downlink modulation for LTE
LTE_SUPPORTS_DL_256QAM = True
# Indicates if it is able to use 64 QAM as the uplink modulation for LTE
LTE_SUPPORTS_UL_64QAM = True
# Indicates if 4x4 MIMO is supported for LTE
LTE_SUPPORTS_4X4_MIMO = True
# The maximum number of carriers that this simulator can support for LTE
LTE_MAX_CARRIERS = 1
def __init__(self, ip_address, port):
""" Initializes the cellular simulator.
Args:
ip_address: the ip address of the CMW500
port: the port number for the CMW500 controller
"""
try:
self.cmw = cmw500.Cmw500(ip_address, port)
except cmw500.CmwError:
raise cc.CellularSimulatorError('Could not connect to CMW500.')
self.bts = None
self.log = logging.getLogger(__name__)
self.dl_modulation = None
self.ul_modulation = None
def destroy(self):
""" Sends finalization commands to the cellular equipment and closes
the connection. """
self.cmw.disconnect()
def setup_lte_scenario(self):
""" Configures the equipment for an LTE simulation. """
self.cmw.connection_type = cmw500.ConnectionType.DAU
self.bts = [self.cmw.get_base_station()]
self.cmw.switch_lte_signalling(cmw500.LteState.LTE_ON)
def setup_lte_ca_scenario(self):
""" Configures the equipment for an LTE with CA simulation. """
raise NotImplementedError()
def set_lte_rrc_state_change_timer(self, enabled, time=10):
""" Configures the LTE RRC state change timer.
Args:
enabled: a boolean indicating if the timer should be on or off.
time: time in seconds for the timer to expire
"""
if enabled:
self.cmw.rrc_connection = cmw500.RrcState.RRC_OFF
self.cmw.rrc_connection_timer = time
else:
self.cmw.rrc_connection = cmw500.RrcState.RRC_ON
def set_band(self, bts_index, band):
""" Sets the band for the indicated base station.
Args:
bts_index: the base station number
band: the new band
"""
bts = self.bts[bts_index]
bts.duplex_mode = self.get_duplex_mode(band)
band = 'OB' + band
bts.band = band
self.log.debug('Band set to {}'.format(band))
def get_duplex_mode(self, band):
""" Determines if the band uses FDD or TDD duplex mode
Args:
band: a band number
Returns:
an variable of class DuplexMode indicating if band is FDD or TDD
"""
if 33 <= int(band) <= 46:
return cmw500.DuplexMode.TDD
else:
return cmw500.DuplexMode.FDD
def set_input_power(self, bts_index, input_power):
""" Sets the input power for the indicated base station.
Args:
bts_index: the base station number
input_power: the new input power
"""
bts = self.bts[bts_index]
if input_power > 23:
self.log.warning('Open loop supports-50dBm to 23 dBm. '
'Setting it to max power 23 dBm')
input_power = 23
bts.uplink_power_control = input_power
bts.tpc_power_control = cmw500.TpcPowerControl.CLOSED_LOOP
bts.tpc_closed_loop_target_power = input_power
def set_output_power(self, bts_index, output_power):
""" Sets the output power for the indicated base station.
Args:
bts_index: the base station number
output_power: the new output power
"""
bts = self.bts[bts_index]
bts.downlink_power_level = output_power
def set_tdd_config(self, bts_index, tdd_config):
""" Sets the tdd configuration number for the indicated base station.
Args:
bts_index: the base station number
tdd_config: the new tdd configuration number
"""
self.bts[bts_index].uldl_configuration = tdd_config
def set_ssf_config(self, bts_index, ssf_config):
""" Sets the Special Sub-Frame config number for the indicated
base station.
Args:
bts_index: the base station number
ssf_config: the new ssf config number
"""
if not 0 <= ssf_config <= 9:
raise ValueError('The Special Sub-Frame configuration has to be a '
'number between 0 and 9.')
self.bts[bts_index].tdd_special_subframe = ssf_config
def set_bandwidth(self, bts_index, bandwidth):
""" Sets the bandwidth for the indicated base station.
Args:
bts_index: the base station number
bandwidth: the new bandwidth
"""
bts = self.bts[bts_index]
if bandwidth == 20:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_20MHz
elif bandwidth == 15:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_15MHz
elif bandwidth == 10:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_10MHz
elif bandwidth == 5:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_5MHz
elif bandwidth == 3:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_3MHz
elif bandwidth == 1.4:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_1MHz
else:
msg = 'Bandwidth {} MHz is not valid for LTE'.format(bandwidth)
raise ValueError(msg)
def set_downlink_channel_number(self, bts_index, channel_number):
""" Sets the downlink channel number for the indicated base station.
Args:
bts_index: the base station number
channel_number: the new channel number
"""
bts = self.bts[bts_index]
bts.dl_channel = channel_number
self.log.debug('Downlink Channel set to {}'.format(bts.dl_channel))
def set_mimo_mode(self, bts_index, mimo_mode):
""" Sets the mimo mode for the indicated base station.
Args:
bts_index: the base station number
mimo_mode: the new mimo mode
"""
bts = self.bts[bts_index]
mimo_mode = CMW_MIMO_MAPPING[mimo_mode]
if mimo_mode == cmw500.MimoModes.MIMO1x1:
self.cmw.configure_mimo_settings(cmw500.MimoScenario.SCEN1x1)
bts.dl_antenna = cmw500.MimoModes.MIMO1x1
elif mimo_mode == cmw500.MimoModes.MIMO2x2:
self.cmw.configure_mimo_settings(cmw500.MimoScenario.SCEN2x2)
bts.dl_antenna = cmw500.MimoModes.MIMO2x2
elif mimo_mode == cmw500.MimoModes.MIMO4x4:
self.cmw.configure_mimo_settings(cmw500.MimoScenario.SCEN4x4)
bts.dl_antenna = cmw500.MimoModes.MIMO4x4
else:
raise RuntimeError('The requested MIMO mode is not supported.')
def set_transmission_mode(self, bts_index, tmode):
""" Sets the transmission mode for the indicated base station.
Args:
bts_index: the base station number
tmode: the new transmission mode
"""
bts = self.bts[bts_index]
tmode = CMW_TM_MAPPING[tmode]
if (tmode in [
cmw500.TransmissionModes.TM1, cmw500.TransmissionModes.TM7
] and bts.dl_antenna == cmw500.MimoModes.MIMO1x1.value):
bts.transmode = tmode
elif (tmode.value in cmw500.TransmissionModes.__members__
and bts.dl_antenna == cmw500.MimoModes.MIMO2x2.value):
bts.transmode = tmode
elif (tmode in [
cmw500.TransmissionModes.TM2, cmw500.TransmissionModes.TM3,
cmw500.TransmissionModes.TM4, cmw500.TransmissionModes.TM9
] and bts.dl_antenna == cmw500.MimoModes.MIMO4x4.value):
bts.transmode = tmode
else:
raise ValueError('Transmission modes should support the current '
'mimo mode')
def set_scheduling_mode(self,
bts_index,
scheduling,
mcs_dl=None,
mcs_ul=None,
nrb_dl=None,
nrb_ul=None):
""" Sets the scheduling mode for the indicated base station.
Args:
bts_index: the base station number.
scheduling: the new scheduling mode.
mcs_dl: Downlink MCS.
mcs_ul: Uplink MCS.
nrb_dl: Number of RBs for downlink.
nrb_ul: Number of RBs for uplink.
"""
bts = self.bts[bts_index]
bts.reduced_pdcch = cmw500.ReducedPdcch.ON
scheduling = CMW_SCH_MAPPING[scheduling]
bts.scheduling_mode = scheduling
if not (self.ul_modulation and self.dl_modulation):
raise ValueError('Modulation should be set prior to scheduling '
'call')
if scheduling == cmw500.SchedulingMode.RMC:
if not nrb_ul and nrb_dl:
raise ValueError('nrb_ul and nrb dl should not be none')
bts.rb_configuration_ul = (nrb_ul, self.ul_modulation, 'KEEP')
self.log.info('ul rb configurations set to {}'.format(
bts.rb_configuration_ul))
time.sleep(1)
self.log.debug('Setting rb configurations for down link')
bts.rb_configuration_dl = (nrb_dl, self.dl_modulation, 'KEEP')
self.log.info('dl rb configurations set to {}'.format(
bts.rb_configuration_ul))
elif scheduling == cmw500.SchedulingMode.USERDEFINEDCH:
if not all([nrb_ul, nrb_dl, mcs_dl, mcs_ul]):
raise ValueError('All parameters are mandatory.')
tbs = get_mcs_tbsi_map_ul[self.ul_modulation][mcs_ul]
bts.rb_configuration_ul = (nrb_ul, 0, self.ul_modulation, tbs)
self.log.info('ul rb configurations set to {}'.format(
bts.rb_configuration_ul))
time.sleep(1)
if self.dl_modulation == cmw500.ModulationType.Q256:
tbs = get_mcs_tbsi_map_for_256qam_dl[
self.dl_modulation][mcs_dl]
else:
tbs = get_mcs_tbsi_map_dl[self.dl_modulation][mcs_dl]
bts.rb_configuration_dl = (nrb_dl, 0, self.dl_modulation, tbs)
self.log.info('dl rb configurations set to {}'.format(
bts.rb_configuration_dl))
def set_dl_modulation(self, bts_index, modulation):
""" Sets the DL modulation for the indicated base station.
This function does not actually configure the test equipment with this
setting, but stores the value to be used later on when setting the
scheduling type. This is because the CMW500 API only allows to set
this parameters together.
Args:
bts_index: the base station number
modulation: the new DL modulation
"""
# Convert dl modulation type to CMW modulation type.
self.dl_modulation = CMW_MODULATION_MAPPING[modulation]
self.log.warning('Modulation config stored but not applied until '
'set_scheduling_mode called.')
def set_ul_modulation(self, bts_index, modulation):
""" Sets the UL modulation for the indicated base station.
This function does not actually configure the test equipment with this
setting, but stores the value to be used later on when setting the
scheduling type. This is because the CMW500 API only allows to set
this parameters together.
Args:
bts_index: the base station number
modulation: the new UL modulation
"""
# Convert ul modulation type to CMW modulation type.
self.ul_modulation = CMW_MODULATION_MAPPING[modulation]
self.log.warning('Modulation config stored but not applied until '
'set_scheduling_mode called.')
def set_tbs_pattern_on(self, bts_index, tbs_pattern_on):
""" Enables or disables TBS pattern in the indicated base station.
Args:
bts_index: the base station number
tbs_pattern_on: the new TBS pattern setting
"""
# TODO (b/143918664): CMW500 doesn't have an equivalent setting.
pass
def set_cfi(self, bts_index, cfi):
""" Sets the Channel Format Indicator for the indicated base station.
Args:
bts_index: the base station number
cfi: the new CFI setting
"""
# TODO (b/143497738): implement.
self.log.error('Setting CFI is not yet implemented in the CMW500 '
'controller.')
def set_paging_cycle(self, bts_index, cycle_duration):
""" Sets the paging cycle duration for the indicated base station.
Args:
bts_index: the base station number
cycle_duration: the new paging cycle duration in milliseconds
"""
# TODO (b/146068532): implement.
self.log.error('Setting the paging cycle duration is not yet '
'implemented in the CMW500 controller.')
def set_phich_resource(self, bts_index, phich):
""" Sets the PHICH Resource setting for the indicated base station.
Args:
bts_index: the base station number
phich: the new PHICH resource setting
"""
self.log.error('Configuring the PHICH resource setting is not yet '
'implemented in the CMW500 controller.')
def lte_attach_secondary_carriers(self, ue_capability_enquiry):
""" Activates the secondary carriers for CA. Requires the DUT to be
attached to the primary carrier first.
Args:
ue_capability_enquiry: UE capability enquiry message to be sent to
the UE before starting carrier aggregation.
"""
raise NotImplementedError()
def wait_until_attached(self, timeout=120):
""" Waits until the DUT is attached to the primary carrier.
Args:
timeout: after this amount of time the method will raise a
CellularSimulatorError exception. Default is 120 seconds.
"""
try:
self.cmw.wait_for_attached_state(timeout=timeout)
except cmw500.CmwError:
raise cc.CellularSimulatorError('The phone was not in '
'attached state before '
'the timeout period ended.')
def wait_until_communication_state(self, timeout=120):
""" Waits until the DUT is in Communication state.
Args:
timeout: after this amount of time the method will raise a
CellularSimulatorError exception. Default is 120 seconds.
"""
try:
self.cmw.wait_for_rrc_state(cmw500.LTE_CONN_RESP, timeout=timeout)
except cmw500.CmwError:
raise cc.CellularSimulatorError('The phone was not in '
'Communication state before '
'the timeout period ended.')
def wait_until_idle_state(self, timeout=120):
""" Waits until the DUT is in Idle state.
Args:
timeout: after this amount of time the method will raise a
CellularSimulatorError exception. Default is 120 seconds.
"""
try:
self.cmw.wait_for_rrc_state(cmw500.LTE_IDLE_RESP, timeout=timeout)
except cmw500.CmwError:
raise cc.CellularSimulatorError('The phone was not in '
'Idle state before '
'the timeout period ended.')
def detach(self):
""" Turns off all the base stations so the DUT loose connection."""
self.cmw.detach()
def stop(self):
""" Stops current simulation. After calling this method, the simulator
will need to be set up again. """
raise NotImplementedError()
def start_data_traffic(self):
""" Starts transmitting data from the instrument to the DUT. """
raise NotImplementedError()
def stop_data_traffic(self):
""" Stops transmitting data from the instrument to the DUT. """
raise NotImplementedError()