blob: 3ec57d42c4e7a689df1d060dffb0c79e45e8bdb5 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A module to abstract the shell execution environment on DUT."""
import subprocess
import time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
class UnsupportedSuccessToken(Exception):
"""Unsupported character found."""
pass
class LocalShell(object):
"""An object to wrap the local shell environment."""
def __init__(self, os_if):
"""Initialize the LocalShell object."""
self._os_if = os_if
def _run_command(self, cmd, block=True):
"""Helper function of run_command() methods.
Return the subprocess.Popen() instance to provide access to console
output in case command succeeded. If block=False, will not wait for
process to return before returning.
"""
stdout = None
stderr = None
if cmd and cmd.rstrip()[-1] == '&' and block:
errormsg = ('Remove & from command \'%s\', '
'use block=True instead, '
'refer to b/172325331 for more details' % cmd)
raise UnsupportedSuccessToken(errormsg)
self._os_if.log('Executing: %s' % cmd)
process = subprocess.Popen(
cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if block:
stdout, stderr = process.communicate()
return process, stdout, stderr
def run_command(self, cmd, block=True):
"""Run a shell command.
In case of the command returning an error print its stdout and stderr
outputs on the console and dump them into the log. Otherwise suppress
all output.
@param block: if True (default), wait for command to finish
@raise error.CmdError: if block is True and command fails (rc!=0)
"""
start_time = time.time()
process, stdout, stderr = self._run_command(cmd, block)
if block and process.returncode:
# Grab output only if an error occurred
returncode = process.returncode
duration = time.time() - start_time
result = utils.CmdResult(cmd, stdout, stderr, returncode, duration)
self._os_if.log('Command failed.\n%s' % result)
raise error.CmdError(cmd, result)
def run_command_get_result(self, cmd, ignore_status=False):
"""Run a shell command, and get the result (output and returncode).
@param ignore_status: if True, do not raise CmdError, even if rc != 0.
@raise error.CmdError: if command fails (rc!=0) and not ignore_result
@return the result of the command
@rtype: utils.CmdResult
"""
start_time = time.time()
process, stdout, stderr = self._run_command(cmd, block=True)
returncode = process.returncode
duration = time.time() - start_time
result = utils.CmdResult(cmd, stdout, stderr, returncode, duration)
if returncode and not ignore_status:
self._os_if.log('Command failed:\n%s' % result)
raise error.CmdError(cmd, result)
self._os_if.log('Command result:\n%s' % result)
return result
def run_command_check_output(self, cmd, success_token):
"""Run a command and check whether standard output contains some string.
The sucess token is assumed to not contain newlines.
@param cmd: A string of the command to make a blocking call with.
@param success_token: A string to search the standard output of the
command for.
@returns a Boolean indicating whthere the success_token was in the
stdout of the cmd.
@raises UnsupportedSuccessToken if a newline is found in the
success_token.
"""
# The run_command_get_outuput method strips newlines from stdout.
if '\n' in success_token:
raise UnsupportedSuccessToken()
cmd_stdout = ''.join(self.run_command_get_output(cmd))
self._os_if.log('Checking for %s in %s' % (success_token, cmd_stdout))
return success_token in cmd_stdout
def run_command_get_status(self, cmd):
"""Run a shell command and return its return code.
The return code of the command is returned, in case of any error.
"""
process, stdout, stderr = self._run_command(cmd)
return process.returncode
def run_command_get_output(self, cmd, include_stderr=False):
"""Run shell command and return stdout (and possibly stderr) to the caller.
The output is returned as a list of strings stripped of the newline
characters.
"""
process, stdout, stderr = self._run_command(cmd)
text = [x.rstrip() for x in stdout.splitlines()]
if include_stderr:
text.extend([x.rstrip() for x in stderr.splitlines()])
return text
def read_file(self, path):
"""Read the content of the file."""
with open(path) as f:
return f.read()
def write_file(self, path, data):
"""Write the data to the file."""
with open(path, 'w') as f:
f.write(data)
def append_file(self, path, data):
"""Append the data to the file."""
with open(path, 'a') as f:
f.write(data)