| # Copyright 2011 Google Inc. All Rights Reserved. |
| # |
| """Classes that help running commands in a subshell. |
| |
| Commands can be run locally, or remotly using SSH connection. You may log the |
| output of a command to a terminal or a file, or any other destination. |
| """ |
| |
| __author__ = 'kbaclawski@google.com (Krystian Baclawski)' |
| |
| import fcntl |
| import logging |
| import os |
| import select |
| import subprocess |
| import time |
| |
| from automation.common import logger |
| |
| |
| class CommandExecuter(object): |
| DRY_RUN = False |
| |
| def __init__(self, dry_run=False): |
| self._logger = logging.getLogger(self.__class__.__name__) |
| self._dry_run = dry_run or self.DRY_RUN |
| |
| @classmethod |
| def Configure(cls, dry_run): |
| cls.DRY_RUN = dry_run |
| |
| def RunCommand(self, |
| cmd, |
| machine=None, |
| username=None, |
| command_terminator=None, |
| command_timeout=None): |
| cmd = str(cmd) |
| |
| if self._dry_run: |
| return 0 |
| |
| if not command_terminator: |
| command_terminator = CommandTerminator() |
| |
| if command_terminator.IsTerminated(): |
| self._logger.warning('Command has been already terminated!') |
| return 1 |
| |
| # Rewrite command for remote execution. |
| if machine: |
| if username: |
| login = '%s@%s' % (username, machine) |
| else: |
| login = machine |
| |
| self._logger.debug("Executing '%s' on %s.", cmd, login) |
| |
| # FIXME(asharif): Remove this after crosbug.com/33007 is fixed. |
| cmd = "ssh -t -t %s -- '%s'" % (login, cmd) |
| else: |
| self._logger.debug("Executing: '%s'.", cmd) |
| |
| child = self._SpawnProcess(cmd, command_terminator, command_timeout) |
| |
| self._logger.debug('{PID: %d} Finished with %d code.', child.pid, |
| child.returncode) |
| |
| return child.returncode |
| |
| def _Terminate(self, child, command_timeout, wait_timeout=10): |
| """Gracefully shutdown the child by sending SIGTERM.""" |
| |
| if command_timeout: |
| self._logger.warning('{PID: %d} Timeout of %s seconds reached since ' |
| 'process started.', child.pid, command_timeout) |
| |
| self._logger.warning('{PID: %d} Terminating child.', child.pid) |
| |
| try: |
| child.terminate() |
| except OSError: |
| pass |
| |
| wait_started = time.time() |
| |
| while not child.poll(): |
| if time.time() - wait_started >= wait_timeout: |
| break |
| time.sleep(0.1) |
| |
| return child.poll() |
| |
| def _Kill(self, child): |
| """Kill the child with immediate result.""" |
| self._logger.warning('{PID: %d} Process still alive.', child.pid) |
| self._logger.warning('{PID: %d} Killing child.', child.pid) |
| child.kill() |
| child.wait() |
| |
| def _SpawnProcess(self, cmd, command_terminator, command_timeout): |
| # Create a child process executing provided command. |
| child = subprocess.Popen(cmd, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| stdin=subprocess.PIPE, |
| shell=True) |
| |
| # Close stdin so the child won't be able to block on read. |
| child.stdin.close() |
| |
| started_time = time.time() |
| |
| # Watch for data on process stdout, stderr. |
| pipes = [child.stdout, child.stderr] |
| |
| # Put pipes into non-blocking mode. |
| for pipe in pipes: |
| fd = pipe.fileno() |
| fd_flags = fcntl.fcntl(fd, fcntl.F_GETFL) |
| fcntl.fcntl(fd, fcntl.F_SETFL, fd_flags | os.O_NONBLOCK) |
| |
| already_terminated = False |
| |
| while pipes: |
| # Maybe timeout reached? |
| if command_timeout and time.time() - started_time > command_timeout: |
| command_terminator.Terminate() |
| |
| # Check if terminate request was received. |
| if command_terminator.IsTerminated() and not already_terminated: |
| if not self._Terminate(child, command_timeout): |
| self._Kill(child) |
| # Don't exit the loop immediately. Firstly try to read everything that |
| # was left on stdout and stderr. |
| already_terminated = True |
| |
| # Wait for pipes to become ready. |
| ready_pipes, _, _ = select.select(pipes, [], [], 0.1) |
| |
| # Handle file descriptors ready to be read. |
| for pipe in ready_pipes: |
| fd = pipe.fileno() |
| |
| data = os.read(fd, 4096) |
| |
| # check for end-of-file |
| if not data: |
| pipes.remove(pipe) |
| continue |
| |
| # read all data that's available |
| while data: |
| if pipe == child.stdout: |
| self.DataReceivedOnOutput(data) |
| elif pipe == child.stderr: |
| self.DataReceivedOnError(data) |
| |
| try: |
| data = os.read(fd, 4096) |
| except OSError: |
| # terminate loop if EWOULDBLOCK (EAGAIN) is received |
| data = '' |
| |
| if not already_terminated: |
| self._logger.debug('Waiting for command to finish.') |
| child.wait() |
| |
| return child |
| |
| def DataReceivedOnOutput(self, data): |
| """Invoked when the child process wrote data to stdout.""" |
| sys.stdout.write(data) |
| |
| def DataReceivedOnError(self, data): |
| """Invoked when the child process wrote data to stderr.""" |
| sys.stderr.write(data) |
| |
| |
| class LoggingCommandExecuter(CommandExecuter): |
| |
| def __init__(self, *args, **kwargs): |
| super(LoggingCommandExecuter, self).__init__(*args, **kwargs) |
| |
| # Create a logger for command's stdout/stderr streams. |
| self._output = logging.getLogger('%s.%s' % (self._logger.name, 'Output')) |
| |
| def OpenLog(self, log_path): |
| """The messages are going to be saved to gzip compressed file.""" |
| formatter = logging.Formatter('%(asctime)s %(prefix)s: %(message)s', |
| '%Y-%m-%d %H:%M:%S') |
| handler = logger.CompressedFileHandler(log_path, delay=True) |
| handler.setFormatter(formatter) |
| self._output.addHandler(handler) |
| |
| # Set a flag to prevent log records from being propagated up the logger |
| # hierarchy tree. We don't want for command output messages to appear in |
| # the main log. |
| self._output.propagate = 0 |
| |
| def CloseLog(self): |
| """Remove handlers and reattach the logger to its parent.""" |
| for handler in list(self._output.handlers): |
| self._output.removeHandler(handler) |
| handler.flush() |
| handler.close() |
| |
| self._output.propagate = 1 |
| |
| def DataReceivedOnOutput(self, data): |
| """Invoked when the child process wrote data to stdout.""" |
| for line in data.splitlines(): |
| self._output.info(line, extra={'prefix': 'STDOUT'}) |
| |
| def DataReceivedOnError(self, data): |
| """Invoked when the child process wrote data to stderr.""" |
| for line in data.splitlines(): |
| self._output.warning(line, extra={'prefix': 'STDERR'}) |
| |
| |
| class CommandTerminator(object): |
| |
| def __init__(self): |
| self.terminated = False |
| |
| def Terminate(self): |
| self.terminated = True |
| |
| def IsTerminated(self): |
| return self.terminated |