# blob: 31aa4598670a24e0bed2785e2eeeaa0829674bc7
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper class to manage bluetooth logs"""
from datetime import datetime
import logging
import os
import re
import time
class LogManager(object):
    """The LogManager class helps to collect logs without a listening thread

    The size of the log file is remembered when recording starts; when
    recording stops, only the bytes appended since then are read back and
    split into lines.
    """

    class LoggingException(Exception):
        """A stub exception class for LogManager class."""
        pass

    def __init__(self, log_path):
        """Initialize log manager object

        @param log_path: string path to log file to manage

        @raises: LogManager.LoggingException on non-existent log file
        """
        if not os.path.isfile(log_path):
            msg = 'Requested log file {} does not exist'.format(log_path)
            raise LogManager.LoggingException(msg)

        self.log_path = log_path

        # A negative size marks "StartRecording has not been called yet";
        # StopRecording checks for it.
        self.initial_log_size = -1

        # Lines gathered by the most recent StopRecording call.
        self.log_contents = []

    def _GetSize(self):
        """Get the size of the log, in bytes"""
        return os.path.getsize(self.log_path)

    def StartRecording(self):
        """Mark initial log size for later comparison

        Any contents collected by a previous recording are discarded.
        """
        self.initial_log_size = self._GetSize()
        self.log_contents = []

    def StopRecording(self):
        """Gather the logs since StartRecording was called

        On success self.log_contents holds the text appended to the log since
        StartRecording, split on newlines. If that text ends with a newline,
        the last element is an empty string.

        @raises: LogManager.LoggingException if:
                - Log file disappeared since StartRecording was called
                - Log file is smaller than when logging began
                - StartRecording was never called
        """
        # Check for existence before asking for the size: os.path.getsize
        # raises OSError on a missing file, which would bypass the documented
        # LoggingException.
        if not os.path.isfile(self.log_path):
            msg = 'File {} disappeared unexpectedly'.format(self.log_path)
            raise LogManager.LoggingException(msg)

        now_size = self._GetSize()

        if now_size < self.initial_log_size:
            msg = 'Log became smaller unexpectedly'
            raise LogManager.LoggingException(msg)

        if self.initial_log_size < 0:
            msg = 'Recording stopped before it started'
            raise LogManager.LoggingException(msg)

        # The sizes above are byte counts, so seek and read in binary mode.
        # In text mode seek() only accepts offsets obtained from tell(), and
        # read() counts characters rather than bytes.
        with open(self.log_path, 'rb') as mf:
            # Skip to the point where we started recording
            mf.seek(self.initial_log_size)
            raw = mf.read(now_size - self.initial_log_size)

        # Undecodable bytes are replaced instead of aborting the collection.
        text = raw.decode('utf-8', 'replace')

        # Mirror the newline translation a text-mode read would have applied.
        text = text.replace('\r\n', '\n').replace('\r', '\n')

        self.log_contents = text.split('\n')

    def LogContains(self, search_str):
        """Checks each line of the collected log for a pattern

        @param search_str: pattern to be located within log contents. It is
                           passed to re.search, so regular expression
                           metacharacters are interpreted. This arg is
                           expected to not span between lines in the logs

        @returns: True if search_str was located in the collected log contents,
                  False otherwise
        """
        return any(re.search(search_str, line) for line in self.log_contents)

    def FilterOut(self, rm_reg_exp):
        """Remove lines with specified pattern from the log file

        The log file is rewritten in place, keeping only the lines that do
        not match, and the number of removed lines and bytes is logged.

        @param rm_reg_exp: regular expression of the lines to be removed
        """
        initial_size = self._GetSize()
        rm_pattern = re.compile(rm_reg_exp)

        with open(self.log_path, 'r+') as mf:
            lines = mf.readlines()
            kept_lines = [line for line in lines
                          if not rm_pattern.search(line)]

            # Overwrite from the start, then cut off the leftover tail of the
            # old, longer content.
            mf.seek(0)
            mf.writelines(kept_lines)
            mf.truncate()

        rm_line_cnt = len(lines) - len(kept_lines)
        rm_byte = initial_size - self._GetSize()
        logging.info('Removed number of line: %d, Reduced log size: %d byte',
                     rm_line_cnt, rm_byte)
class InterleaveLogger(LogManager):
    """LogManager class that focus on interleave scan

    Records the system log and extracts the kernel's interleave scan state
    changes and cancel events from the recorded lines.
    """

    SYSLOG_PATH = '/var/log/messages'

    # Example bluetooth kernel log:
    # "2020-11-23T07:52:31.395941Z DEBUG kernel: [ 6469.811135] Bluetooth: "
    # "cancel_interleave_scan() hci0: cancelling interleave scan"
    #
    # Raw strings keep the regex escapes (\[, \() intact without relying on
    # Python passing unknown string escapes through. The first group captures
    # the leading time string.
    KERNEL_LOG_PATTERN = (r'([^ ]+) DEBUG kernel: \[.*\] Bluetooth: '
                          r'{FUNCTION}\(\) hci0: {LOG_STR}')
    STATE_PATTERN = KERNEL_LOG_PATTERN.format(
            FUNCTION='hci_req_add_le_interleaved_scan',
            LOG_STR='next state: (.+)')
    CANCEL_PATTERN = KERNEL_LOG_PATTERN.format(
            FUNCTION='cancel_interleave_scan',
            LOG_STR='cancelling interleave scan')

    # Number of leading characters of a time string that hold the date, time
    # and microseconds; anything after that is dropped before parsing.
    SYSTIME_LENGTH = len('2020-12-18T00:11:22.345678')

    def __init__(self):
        """ Initialize object

        @raises: LogManager.LoggingException if SYSLOG_PATH does not exist
        """
        self.reset()
        self.state_pattern = re.compile(self.STATE_PATTERN)
        self.cancel_pattern = re.compile(self.CANCEL_PATTERN)
        super(InterleaveLogger, self).__init__(self.SYSLOG_PATH)

    def reset(self):
        """ Clear data between each log collection attempt
        """
        self.records = []
        self.cancel_events = []

    def StartRecording(self):
        """ Reset the previous data and start recording.
        """
        self.reset()
        super(InterleaveLogger, self).StartRecording()

    def _sys_time_to_timestamp(self, time_str):
        """ Convert the time string of a log line to a timestamp

        @param time_str: time string captured from the start of a log line

        @returns: timestamp in seconds as a float, or None if time_str could
                  not be parsed
        """
        # This is to remove the suffix of time string, in some cases the
        # time string ends with an extra 'Z', in other cases, the string
        # ends with time zone (ex. '+08:00')
        time_str = time_str[:self.SYSTIME_LENGTH]

        try:
            dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError as e:
            logging.error(e)
            return None

        # timetuple() carries no sub-second part, so add the microseconds
        # back on top of the whole-second value from mktime.
        return time.mktime(dt.timetuple()) + dt.microsecond * (10**-6)

    def StopRecording(self):
        """ Stop recording and parse logs

        The following data will be set after this call

        - self.records: a list of dictionaries where each item is a record of
                        interleave |state| and the |time| the state starts.
                        |state| could be {'no filter', 'allowlist'}
                        |time| is system time in sec

        - self.cancel_events: a list of |time| when a interleave cancel
                              event log was found
                              |time| is system time in sec

        An entry whose time string cannot be parsed is stored with a |time|
        of 0 and makes this call return False.

        @returns: True if StopRecording success, False otherwise
        """
        try:
            super(InterleaveLogger, self).StopRecording()
        except Exception as e:
            # Best effort: any failure to collect the log is reported through
            # the return value instead of propagating.
            logging.error(e)
            return False

        # Tracked here, not inside the time-parsing helper: an assignment
        # inside a nested function would only bind a new local there and the
        # failure would never reach the return value.
        success = True

        for line in self.log_contents:
            line = line.strip().replace('\\r\\n', '')
            state_match = self.state_pattern.search(line)
            cancel_match = self.cancel_pattern.search(line)

            if cancel_match:
                time_sec = self._sys_time_to_timestamp(cancel_match.group(1))
                if time_sec is None:
                    success = False
                    time_sec = 0
                self.cancel_events.append(time_sec)

            if state_match:
                time_str, state = state_match.groups()
                time_sec = self._sys_time_to_timestamp(time_str)
                if time_sec is None:
                    success = False
                    time_sec = 0
                self.records.append({'time': time_sec, 'state': state})

        return success