blob: d901e5509820ac7d3e70aceb6fddf1d35df2e494 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging, os, re
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
class KernelTrace(object):
"""Allows access and control to Kernel tracing facilities.
Example code snippet:
trace = KernelTrace(events=['mali_dvfs:mali_dvfs_set_clock'])
results = trace.read(regexp=r'frequency=(\d+)')
Public methods:
on : Enables tracing
off : Disables tracing
is_tracing : Returns Boolean of tracing status.
event_on : Turns event on. Returns boolean of success
event_off : Turns event off. Returns boolean of success
flush : Flushes trace buffer
read : Reads trace buffer returns list of
- tuples if regexp provided
- else matching string
uptime_secs : Returns float of current uptime.
Private functions:
_onoff : Disable/enable tracing
_onoff_event : Disable/enable events
Private attributes:
_buffer : list to hold parsed results from trace buffer
_buffer_ptr : integer pointing to last byte read
TODO(tbroch): List of potential enhancements
- currently only supports trace events. Add other tracers.
"""
_TRACE_ROOT = '/sys/kernel/debug/tracing'
_TRACE_ON_PATH = os.path.join(_TRACE_ROOT, 'tracing_on')
def __init__(self, flush=True, events=None, on=True):
"""Constructor for KernelTrace class"""
self._buffer = []
self._buffer_ptr = 0
self._events = []
self._on = on
if flush:
self.flush()
for event in events:
if self.event_on(event):
self._events.append(event)
if on:
self.on()
def __del__(self, flush=True, events=None, on=True):
"""Deconstructor for KernelTrace class"""
for event in self._events:
self.event_off(event)
if self._on:
self.off()
def _onoff(self, val):
"""Turn tracing on or off.
Arguments:
val: integer, 1 for on, 0 for off
Raises:
error.TestFail: If unable to turn tracing on or off.
"""
utils.write_one_line(self._TRACE_ON_PATH, val)
result = int(utils.read_one_line(self._TRACE_ON_PATH).strip())
if not result == val:
raise error.TestFail("Unable to %sable tracing" %
'en' if val == 1 else 'dis')
def on(self):
"""Enable tracing."""
return self._onoff(1)
def off(self):
"""Disable tracing."""
self._onoff(0)
def is_tracing(self):
"""Is tracing on?
Returns:
True if tracing enabled and at least one event is enabled.
"""
result = int(utils.read_one_line(self._TRACE_ON_PATH).strip())
if result == 1 and len(self._events) > 0:
return True
return False
def _event_onoff(self, event, val):
"""Enable/Disable tracing event.
TODO(tbroch) Consider allowing wild card enabling of trace events via
/sys/kernel/debug/tracing/set_event although it makes filling buffer
really easy
Arguments:
event: list of events.
See kernel(Documentation/trace/events.txt) for formatting.
val: integer, 1 for on, 0 for off
Returns:
True if success, false otherwise
"""
logging.debug("event_onoff: event:%s val:%d", event, val)
event_path = event.replace(':', '/')
fname = os.path.join(self._TRACE_ROOT, 'events', event_path, 'enable')
if not os.path.exists(fname):
logging.warning("Unable to locate tracing event %s", fname)
return False
utils.write_one_line(fname, val)
fname = os.path.join(self._TRACE_ROOT, "set_event")
found = False
with open(fname) as fd:
for ln in fd.readlines():
logging.debug("set_event ln:%s", ln)
if re.findall(event, ln):
found = True
break
if val == 1 and not found:
logging.warning("Event %s not enabled", event)
return False
if val == 0 and found:
logging.warning("Event %s not disabled", event)
return False
return True
def event_on(self, event):
return self._event_onoff(event, 1)
def event_off(self, event):
return self._event_onoff(event, 0)
def flush(self):
"""Flush trace buffer.
Raises:
error.TestFail: If unable to flush
"""
self.off()
fname = os.path.join(self._TRACE_ROOT, 'free_buffer')
utils.write_one_line(fname, 1)
self._buffer_ptr = 0
fname = os.path.join(self._TRACE_ROOT, 'buffer_size_kb')
result = utils.read_one_line(fname).strip()
if result is '0':
return True
return False
def read(self, regexp=None):
fname = os.path.join(self._TRACE_ROOT, 'trace')
fd = open(fname)
fd.seek(self._buffer_ptr)
for ln in fd.readlines():
if regexp is None:
self._buffer.append(ln)
continue
results = re.findall(regexp, ln)
if results:
logging.debug(ln)
self._buffer.append(results[0])
self._buffer_ptr = fd.tell()
fd.close()
return self._buffer
@staticmethod
def uptime_secs():
results = utils.read_one_line("/proc/uptime")
return float(results.split()[0])