blob: 0fdea3fb8cf17522f88a1d886215f6826b1d8f5d [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tools for capturing program output at a low level.
Mostly useful for capturing stdout/stderr as directly assigning to those
variables won't work everywhere.
"""
from __future__ import print_function
import os
import re
import sys
import tempfile
class _FdCapturer(object):
"""Helper class to capture output at the file descriptor level.
This is meant to be used with sys.stdout or sys.stderr. By capturing
file descriptors, this will also intercept subprocess output, which
reassigning sys.stdout or sys.stderr will not do.
Output will only be captured, it will no longer be printed while
the capturer is active.
"""
def __init__(self, source, output=None):
"""Construct the _FdCapturer object.
Does not start capturing until Start() is called.
Args:
source: A file object to capture. Typically sys.stdout or
sys.stderr, but will work with anything that implements flush()
and fileno().
output: A file name where the captured output is to be stored. If None,
then the output will be stored to a temporary file.
"""
self._source = source
self._captured = ''
self._saved_fd = None
self._tempfile = None
self._capturefile = None
self._capturefile_reader = None
self._capturefile_name = output
def _SafeCreateTempfile(self, tempfile_obj):
"""Ensure that the tempfile is created safely.
(1) Stash away a reference to the tempfile.
(2) Unlink the file from the filesystem.
(2) ensures that if we crash, the file gets deleted. (1) ensures that while
we are running, we hold a reference to the file so the system does not close
the file.
Args:
tempfile_obj: A tempfile object.
"""
self._tempfile = tempfile_obj
os.unlink(tempfile_obj.name)
def Start(self):
"""Begin capturing output."""
if self._capturefile_name is None:
tempfile_obj = tempfile.NamedTemporaryFile(delete=False)
self._capturefile = tempfile_obj.file
self._capturefile_name = tempfile_obj.name
self._capturefile_reader = open(self._capturefile_name)
self._SafeCreateTempfile(tempfile_obj)
else:
# Open file passed in for writing. Set buffering=1 for line level
# buffering.
self._capturefile = open(self._capturefile_name, 'w', buffering=1)
self._capturefile_reader = open(self._capturefile_name)
# Save the original fd so we can revert in Stop().
self._saved_fd = os.dup(self._source.fileno())
os.dup2(self._capturefile.fileno(), self._source.fileno())
def Stop(self):
"""Stop capturing output."""
self.GetCaptured()
if self._saved_fd is not None:
os.dup2(self._saved_fd, self._source.fileno())
os.close(self._saved_fd)
self._saved_fd = None
# If capturefile and capturefile_reader exist, close them as they were
# opened in self.Start().
if self._capturefile_reader is not None:
self._capturefile_reader.close()
self._capturefile_reader = None
if self._capturefile is not None:
self._capturefile.close()
self._capturefile = None
def GetCaptured(self):
"""Return all output captured up to this point.
Can be used while capturing or after Stop() has been called.
"""
self._source.flush()
if self._capturefile_reader is not None:
self._captured += self._capturefile_reader.read()
return self._captured
def ClearCaptured(self):
"""Erase all captured output."""
self.GetCaptured()
self._captured = ''
class OutputCapturer(object):
"""Class for capturing stdout/stderr output.
Class is designed as a 'ContextManager'.
Examples:
with cros_build_lib.OutputCapturer() as output:
# Capturing of stdout/stderr automatically starts now.
# Do stuff that sends output to stdout/stderr.
# Capturing automatically stops at end of 'with' block.
# stdout/stderr can be retrieved from the OutputCapturer object:
stdout = output.GetStdoutLines() # Or other access methods
# Some Assert methods are only valid if capturing was used in test.
self.AssertOutputContainsError() # Or other related methods
# OutputCapturer can also be used to capture output to specified files.
with self.OutputCapturer(stdout_path='/tmp/stdout.txt') as output:
# Do stuff.
# stdout will be captured to /tmp/stdout.txt.
"""
OPER_MSG_SPLIT_RE = re.compile(r'^\033\[1;.*?\033\[0m$|^[^\n]*$',
re.DOTALL | re.MULTILINE)
__slots__ = ['_stdout_capturer', '_stderr_capturer', '_quiet_fail']
def __init__(self, stdout_path=None, stderr_path=None, quiet_fail=False):
"""Initalize OutputCapturer with capture files.
If OutputCapturer is initialized with filenames to capture stdout and stderr
to, then those files are used. Otherwise, temporary files are created.
Args:
stdout_path: File to capture stdout to. If None, a temporary file is used.
stderr_path: File to capture stderr to. If None, a temporary file is used.
quiet_fail: If True fail quietly without printing the captured stdout and
stderr.
"""
self._stdout_capturer = _FdCapturer(sys.stdout, output=stdout_path)
self._stderr_capturer = _FdCapturer(sys.stderr, output=stderr_path)
self._quiet_fail = quiet_fail
def __enter__(self):
# This method is called with entering 'with' block.
self.StartCapturing()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# This method is called when exiting 'with' block.
self.StopCapturing()
if exc_type and not self._quiet_fail:
print('Exception during output capturing: %r' % (exc_val,))
stdout = self.GetStdout()
if stdout:
print('Captured stdout was:\n%s' % stdout)
else:
print('No captured stdout')
stderr = self.GetStderr()
if stderr:
print('Captured stderr was:\n%s' % stderr)
else:
print('No captured stderr')
def StartCapturing(self):
"""Begin capturing stdout and stderr."""
self._stdout_capturer.Start()
self._stderr_capturer.Start()
def StopCapturing(self):
"""Stop capturing stdout and stderr."""
self._stdout_capturer.Stop()
self._stderr_capturer.Stop()
def ClearCaptured(self):
"""Clear any captured stdout/stderr content."""
self._stdout_capturer.ClearCaptured()
self._stderr_capturer.ClearCaptured()
def GetStdout(self):
"""Return captured stdout so far."""
return self._stdout_capturer.GetCaptured()
def GetStderr(self):
"""Return captured stderr so far."""
return self._stderr_capturer.GetCaptured()
def _GetOutputLines(self, output, include_empties):
"""Split |output| into lines, optionally |include_empties|.
Return array of lines.
"""
lines = self.OPER_MSG_SPLIT_RE.findall(output)
if not include_empties:
lines = [ln for ln in lines if ln]
return lines
def GetStdoutLines(self, include_empties=True):
"""Return captured stdout so far as array of lines.
If |include_empties| is false filter out all empty lines.
"""
return self._GetOutputLines(self.GetStdout(), include_empties)
def GetStderrLines(self, include_empties=True):
"""Return captured stderr so far as array of lines.
If |include_empties| is false filter out all empty lines.
"""
return self._GetOutputLines(self.GetStderr(), include_empties)