| # Shared utility functions across monitors scripts. |
| |
| import fcntl, os, re, select, signal, subprocess, sys, time |
| |
# Line written to the log by process_input when a read from its input
# stream returns no data (the console connection went away).
TERM_MSG = 'Console connection unexpectedly lost. Terminating monitor.'
| |
| |
class Error(Exception):
    """Root of the exception hierarchy defined by this module."""
| |
| |
class InvalidTimestampFormat(Error):
    """Raised when a timestamp format is neither a str nor a callable."""
| |
| |
def prepend_timestamp(msg, format):
    """Prepend timestamp to a message in a standard way.

    Args:
      msg: str; Message to prepend timestamp to.
      format: str or callable; Either a format string that is passed to
          time.strftime together with the current local time, or a
          zero-argument callable whose result is converted with str()
          and used as the timestamp.

    Returns:
      str; The timestamp and msg joined by a single tab character.

    Raises:
      InvalidTimestampFormat: format is neither a str nor a callable.
    """
    # isinstance (not an exact type comparison) so that str subclasses are
    # accepted as format strings as well.
    if isinstance(format, str):
        timestamp = time.strftime(format, time.localtime())
    elif callable(format):
        timestamp = str(format())
    else:
        raise InvalidTimestampFormat(
                'timestamp format must be a str or a callable, got %r'
                % (format,))

    return '%s\t%s' % (timestamp, msg)
| |
| |
def write_logline(logfile, msg, timestamp_format=None):
    """Emit msg to logfile as a single newline-terminated line.

    Newlines already trailing msg are dropped before the terminating
    newline is appended.

    Args:
      logfile: file; Object whose .write() receives the finished line.
      msg: str; Text of the line.
      timestamp_format: str or callable; When truthy, msg is first run
          through prepend_timestamp with this format.
    """
    stripped = msg.rstrip('\n')
    if timestamp_format:
        line = prepend_timestamp(stripped, timestamp_format)
    else:
        line = stripped
    logfile.write(line + '\n')
| |
| |
def make_alert(warnfile, msg_type, msg_template, timestamp_format=None):
    """Create an alert generation function that writes to warnfile.

    Args:
      warnfile: file; File object the alerts are written to; only its
          .write() method is used.
      msg_type: str; String describing the message type.
      msg_template: str; String template that the alert function's
          params are interpolated into (msg_template % params).
      timestamp_format: str or callable; Passed to prepend_timestamp
          along with the formatted message. Defaults to a callable
          returning int(time.time()).

    Returns:
      function with a signature of (*params); Each call writes one line
      to warnfile made of the timestamp, msg_type and the interpolated
      message, separated by tabs and terminated by a newline.
    """
    if timestamp_format is None:
        timestamp_format = lambda: int(time.time())

    def alert(*params):
        formatted_msg = msg_type + '\t' + msg_template % params
        timestamped_msg = prepend_timestamp(formatted_msg, timestamp_format)
        # Plain write() rather than the "print >> file" statement: it
        # produces the same line, matches write_logline, and does not
        # depend on Python 2-only syntax.
        warnfile.write(timestamped_msg + '\n')
    return alert
| |
| |
| def _assert_is_all_blank_lines(lines, source_file): |
| if sum(len(line.strip()) for line in lines) > 0: |
| raise ValueError('warning patterns are not separated by blank lines ' |
| 'in %s' % source_file) |
| |
| |
| def _read_overrides(overrides_file): |
| """ |
| Read pattern overrides from overrides_file, which may be None. Overrides |
| files are expected to have the format: |
| <old regex> <newline> <new regex> <newline> <newline> |
| old regex = a regex from the patterns file |
| new regex = the regex to replace it |
| Lines beginning with # are ignored. |
| |
| Returns a dict mapping old regexes to their replacements. |
| """ |
| if not overrides_file: |
| return {} |
| overrides_lines = [line for line in overrides_file.readlines() |
| if not line.startswith('#')] |
| overrides_pairs = zip(overrides_lines[0::3], overrides_lines[1::3]) |
| _assert_is_all_blank_lines(overrides_lines[2::3], overrides_file) |
| return dict(overrides_pairs) |
| |
| |
def build_alert_hooks(patterns_file, warnfile, overrides_file=None):
    """Turn the definitions in a patterns file into a list of alert hooks.

    The patterns file is a sequence of four-line groups:
        <msgtype> <newline> <regex> <newline> <alert> <newline> <newline>
        msgtype = a string categorizing the type of the message - used for
            enabling/disabling specific categories of warnings
        regex = a python regular expression
        alert = a string describing the alert message
            if the regex matches the line, the displayed warning will
            be the result of (alert % match.groups())

    Args:
      patterns_file: file; File to read alert pattern definitions from.
      warnfile: file; File the generated alert functions write to.
      overrides_file: file or None; Optional overrides file. A regex
          line found in it is swapped for its replacement before being
          compiled.

    Returns:
      list; Compiled regex to alert function mapping.
          [(regex, alert_function), ...]

    Raises:
      ValueError: a group in either file is not followed by a blank line.
    """
    raw_lines = patterns_file.readlines()
    groups = zip(raw_lines[0::4], raw_lines[1::4], raw_lines[2::4])
    _assert_is_all_blank_lines(raw_lines[3::4], patterns_file)

    replacements = _read_overrides(overrides_file)

    hooks = []
    for msgtype_line, regex_line, template_line in groups:
        # The lookup uses the raw line (newline included) because that is
        # how the overrides map is keyed.
        chosen = replacements.get(regex_line, regex_line)
        compiled = re.compile(chosen.rstrip('\n'))
        alert_function = make_alert(warnfile, msgtype_line.rstrip('\n'),
                                    template_line.rstrip('\n'))
        hooks.append((compiled, alert_function))
    return hooks
| |
| |
def build_alert_hooks_from_path(patterns_path, warnfile):
    """Build alert hooks from a patterns file given by path.

    Works like build_alert_hooks, but opens the patterns file itself and
    also uses a site overrides file when one exists next to it. The
    overrides file is looked up in the same directory under the name
    'site_<patterns basename>_overrides'.

    Args:
      patterns_path: str; Path of the patterns file.
      warnfile: file; File the generated alert functions write to.

    Returns:
      list; The result of build_alert_hooks.
    """
    directory, filename = os.path.split(patterns_path)
    overrides_path = os.path.join(directory,
                                  'site_%s_overrides' % filename)
    overrides_file = None
    patterns_file = open(patterns_path)
    try:
        if os.path.exists(overrides_path):
            overrides_file = open(overrides_path)
        try:
            hooks = build_alert_hooks(patterns_file, warnfile,
                                      overrides_file=overrides_file)
        finally:
            if overrides_file:
                overrides_file.close()
    finally:
        patterns_file.close()
    return hooks
| |
| |
def process_input(
        input, logfile, log_timestamp_format=None, alert_hooks=()):
    """Copy lines from input to the log and fire alerts on matches.

    Runs until a read from input returns no data. For every line read:

      - Lines consisting of just a newline are dropped.
      - Any other line is written to logfile, possibly timestamped.
      - The line, stripped of surrounding whitespace, is matched against
        each alert regex; on a match the paired alert function is called
        with the match groups.

    Args:
      input: file; Stream to read from.
      logfile: file; Log file to write to.
      log_timestamp_format: str; Format to use for timestamping entries.
          No timestamp is added if None.
      alert_hooks: list; Generated from build_alert_hooks.
          [(regex, alert_function), ...]
    """
    while True:
        line = input.readline()
        if not line:
            # A zero-length read should only happen if the remote console
            # unexpectedly goes away. Log that and stop, rather than spin
            # forever doing 0-length reads off of input.
            write_logline(logfile, TERM_MSG, log_timestamp_format)
            return

        if line != '\n':
            write_logline(logfile, line, log_timestamp_format)

            stripped = line.strip()
            for pattern, fire_alert in alert_hooks:
                found = re.match(pattern, stripped)
                if found:
                    fire_alert(*found.groups())
| |
| |
def lookup_lastlines(lastlines_dirpath, path):
    """Find how many lines of path follow the last lines seen earlier.

    The lastlines file for path lives in lastlines_dirpath and is named
    after path with every '/' replaced by '_'. It is read and then
    deleted. Its contents are searched for, scanning from the end, in the
    current contents of path.

    Args:
      lastlines_dirpath: str; Dirpath the lastlines files are stored in.
      path: str; Filepath to source file that lastlines came from.

    Returns:
      int; Number of newline characters in path after the last
          occurrence of the saved lastlines data.
        - Or -
      None; If the lastlines file or path cannot be opened, the
          lastlines file is empty, or its data is not found in path.
    """
    underscored = path.replace('/', '_')
    lastlines_path = os.path.join(lastlines_dirpath, underscored)
    try:
        lastlines_file = open(lastlines_path)
    except (OSError, IOError):
        return None

    # Close the handle even if the read fails.
    try:
        lastlines = lastlines_file.read()
    finally:
        lastlines_file.close()
    os.remove(lastlines_path)
    if not lastlines:
        return None

    try:
        target_file = open(path)
    except (OSError, IOError):
        return None

    # Load it all in for now
    try:
        target_data = target_file.read()
    finally:
        target_file.close()

    # Get start loc in the target_data string, scanning from right
    loc = target_data.rfind(lastlines)
    if loc == -1:
        return None

    # Then translate this into a reverse line number
    # (count newlines that occur afterward)
    return target_data.count('\n', loc + len(lastlines))
| |
| |
def write_lastlines_file(lastlines_dirpath, path, data):
    """Write data to lastlines file for path.

    The lastlines file is created (or overwritten) in lastlines_dirpath
    and is named after path with every '/' replaced by '_'.

    Args:
      lastlines_dirpath: str; Dirpath to store lastlines files to.
      path: str; Filepath to source file that data comes from.
      data: str; Contents to store in the lastlines file.

    Returns:
      str; Filepath that lastline data was written to.
    """
    underscored = path.replace('/', '_')
    dest_path = os.path.join(lastlines_dirpath, underscored)
    dest_file = open(dest_path, 'w')
    # Close explicitly so the data is flushed to disk before returning,
    # instead of whenever the file object happens to be collected.
    try:
        dest_file.write(data)
    finally:
        dest_file.close()
    return dest_path
| |
| |
def nonblocking(pipe):
    """Switch a file object to non-blocking mode.

    With O_NONBLOCK set, pipe.read() can be called without a buffer
    length, which saves a few lines we'd otherwise have to maintain.

    Args:
      pipe: file; File object (or anything fcntl accepts) to modify.

    Returns: pipe, the same object that was passed in.
    """
    current_flags = fcntl.fcntl(pipe, fcntl.F_GETFL)
    updated_flags = current_flags | os.O_NONBLOCK
    fcntl.fcntl(pipe, fcntl.F_SETFL, updated_flags)
    return pipe
| |
| |
def launch_tails(follow_paths, lastlines_dirpath=None):
    """Start one tail subprocess per path in follow_paths.

    Each tail is run as '/usr/bin/tail --retry --follow=name'. When
    lastlines_dirpath is given, the directory is created if missing and
    a --lines option is added, set to the value lookup_lastlines returns
    for the path, or to 1 when it returns None.

    Args:
      follow_paths: list; Paths of the files to follow.
      lastlines_dirpath: str; Directory holding the lastlines files.

    Returns:
      tuple; (procs, pipes) or
          ({path: subprocess.Popen, ...}, {file: path, ...})
          where each file is a tail's stdout, set to non-blocking mode.
    """
    if lastlines_dirpath and not os.path.exists(lastlines_dirpath):
        os.makedirs(lastlines_dirpath)

    procs = {}  # path -> tail_proc
    pipes = {}  # tail_proc.stdout -> path
    for path in follow_paths:
        cmd = ['/usr/bin/tail', '--retry', '--follow=name']
        if lastlines_dirpath:
            start_lines = lookup_lastlines(lastlines_dirpath, path)
            if start_lines is None:
                start_lines = 1
            cmd.append('--lines=%d' % start_lines)
        cmd.append(path)

        tail_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        procs[path] = tail_proc
        pipes[nonblocking(tail_proc.stdout)] = path

    return procs, pipes
| |
| |
def poll_tail_pipes(pipes, lastlines_dirpath=None, waitsecs=5):
    """Wait up to waitsecs for data on the tail pipes and collect it.

    Args:
      pipes: dict; {file: follow_path, ...} where each key is a readable
          pipe, as in the pipes mapping returned by launch_tails.
      lastlines_dirpath: str; When set, the data read from a pipe
          overwrites the lastlines file of its follow_path.
      waitsecs: int; Timeout to pass to select.

    Returns:
      tuple; (lines, bad_pipes) or ([line, ...], [file, ...])
          Every line is formatted as '[follow_path]<tab>text<newline>'.
          bad_pipes holds the pipes that were reported readable but
          returned no data.
    """
    lines = []
    bad_pipes = []
    # Block until at least one is ready to read or waitsecs elapses
    readable = select.select(pipes.keys(), (), (), waitsecs)[0]
    for pipe in readable:
        source_path = pipes[pipe]
        chunk = pipe.read()
        if len(chunk) == 0:
            # If no data, process is probably dead, add to bad_pipes
            bad_pipes.append(pipe)
            continue

        if lastlines_dirpath:
            # Overwrite the lastlines file for this source path
            # Probably just want to write the last 1-3 lines.
            write_lastlines_file(lastlines_dirpath, source_path, chunk)

        lines.extend(['[%s]\t%s\n' % (source_path, text)
                      for text in chunk.splitlines()])

    return lines, bad_pipes
| |
| |
def snuff(subprocs):
    """Kill every subprocess in subprocs that is still running.

    Processes that have already exited are left alone. Live ones are
    sent SIGKILL and then waited on.

    Args:
      subprocs: list; [subprocess.Popen, ...]
    """
    for proc in subprocs:
        if proc.poll() is not None:
            # Already exited
            continue
        os.kill(proc.pid, signal.SIGKILL)
        proc.wait()
| |
| |
def follow_files(follow_paths, outstream, lastlines_dirpath=None, waitsecs=5):
    """Launch tail on a set of files and merge their output into outstream.

    Loops until every tail pipe has been reported bad or a write to
    outstream fails. The tail subprocesses are always killed before
    returning, including when an unexpected exception propagates.

    Args:
      follow_paths: list; Local paths to launch tail on.
      outstream: file; Output stream to write aggregated lines to.
      lastlines_dirpath: Local dirpath to record last lines seen in.
      waitsecs: int; Timeout for poll_tail_pipes.
    """
    procs, pipes = launch_tails(follow_paths, lastlines_dirpath)
    try:
        while pipes:
            lines, bad_pipes = poll_tail_pipes(pipes, lastlines_dirpath,
                                               waitsecs)
            for bad in bad_pipes:
                pipes.pop(bad)

            try:
                # A newline is written on every pass, even when no lines
                # were read. NOTE(review): presumably so that a broken
                # outstream is noticed while the followed files are
                # idle - confirm.
                outstream.writelines(['\n'] + lines)
                outstream.flush()
            except (IOError, OSError):
                # Something is wrong. Stop looping.
                break
    finally:
        # Never leave tail processes behind, whatever ended the loop.
        snuff(procs.values())