blob: f44b1329d7b43644a7e063ebdf3f37ff56ea8269 [file] [log] [blame] [edit]
#!/usr/bin/python
"""
Usage: ./cron_scripts/log_distiller.py job_id path_to_logfile
If the job_id is a suite it will find all subjobs.
You need to change the location of the log it will parse.
The job_id needs to be in the afe database.
"""
import abc
import datetime
import os
import re
import pprint
import subprocess
import sys
import time
import common
from autotest_lib.server import frontend
LOGFIE = './logs/scheduler.log.2014-04-17-16.51.47'
# logfile name format: scheduler.log.2014-02-14-18.10.56
time_format = '%Y-%m-%d-%H.%M.%S'
logfile_regex = r'scheduler.log.([0-9,.,-]+)'
logdir = os.path.join('/usr/local/autotest', 'logs')
class StateMachineViolation(Exception):
pass
class LogLineException(Exception):
pass
def should_process_log(time_str, time_format, cutoff_days=7):
"""Returns true if the logs was created after cutoff days.
@param time_str: A string representing the time.
eg: 2014-02-14-18.10.56
@param time_format: A string representing the format of the time string.
ref: http://docs.python.org/2/library/datetime.html#strftime-strptime-behavior
@param cutoff_days: Int representind the cutoff in days.
@return: Returns True if time_str has aged more than cutoff_days.
"""
log_time = datetime.datetime.strptime(time_str, time_format)
now = datetime.datetime.strptime(time.strftime(time_format), time_format)
cutoff = now - datetime.timedelta(days=cutoff_days)
return log_time < cutoff
def apply_regex(regex, line):
"""Simple regex applicator.
@param regex: Regex to apply.
@param line: The line to apply regex on.
@return: A tuple with the matching groups, if there was a match.
"""
log_match = re.match(regex, line)
if log_match:
return log_match.groups()
class StateMachineParser(object):
"""Abstract class that enforces state transition ordering.
Classes inheriting from StateMachineParser need to define an
expected_transitions dictionary. The SMP will pop 'to' states
from the dictionary as they occur, so you cannot same state transitions
unless you specify 2 of them.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __init__(self):
self.visited_states = []
self.expected_transitions = {}
def advance_state(self, from_state, to_state):
"""Checks that a transition is valid.
@param from_state: A string representind the state the host is leaving.
@param to_state: The state The host is going to, represented as a string.
@raises LogLineException: If an invalid state transition was
detected.
"""
# TODO: Updating to the same state is a waste of bw.
if from_state and from_state == to_state:
return ('Updating to the same state is a waste of BW: %s->%s' %
(from_state, to_state))
return
if (from_state in self.expected_transitions and
to_state in self.expected_transitions[from_state]):
self.expected_transitions[from_state].remove(to_state)
self.visited_states.append(to_state)
else:
return (from_state, to_state)
class SingleJobHostSMP(StateMachineParser):
def __init__(self):
self.visited_states = []
self.expected_transitions = {
'Ready': ['Resetting', 'Verifying', 'Pending', 'Provisioning'],
'Resetting': ['Ready', 'Provisioning'],
'Pending': ['Running'],
'Provisioning': ['Repairing'],
'Running': ['Ready']
}
def check_transitions(self, hostline):
if hostline.line_info['field'] == 'status':
self.advance_state(hostline.line_info['state'],
hostline.line_info['value'])
class SingleJobHqeSMP(StateMachineParser):
def __init__(self):
self.visited_states = []
self.expected_transitions = {
'Queued': ['Starting', 'Resetting', 'Aborted'],
'Resetting': ['Pending', 'Provisioning'],
'Provisioning': ['Pending', 'Queued', 'Repairing'],
'Pending': ['Starting'],
'Starting': ['Running'],
'Running': ['Gathering', 'Parsing'],
'Gathering': ['Parsing'],
'Parsing': ['Completed', 'Aborted']
}
def check_transitions(self, hqeline):
invalid_states = self.advance_state(
hqeline.line_info['from_state'], hqeline.line_info['to_state'])
if not invalid_states:
return
# Deal with repair.
if (invalid_states[0] == 'Queued' and
'Running' in self.visited_states):
raise StateMachineViolation('Unrecognized state transition '
'%s->%s, expected transitions are %s' %
(invalid_states[0], invalid_states[1],
self.expected_transitions))
class LogLine(object):
"""Line objects.
All classes inheriting from LogLine represent a line of some sort.
A line is responsible for parsing itself, and invoking an SMP to
validate state transitions. A line can be part of several state machines.
"""
line_format = '%s'
def __init__(self, state_machine_parsers):
"""
@param state_machine_parsers: A list of smp objects to use to validate
state changes on these types of lines..
"""
self.smps = state_machine_parsers
# Because, this is easier to flush.
self.line_info = {}
def parse_line(self, line):
"""Apply a line regex and save any information the parsed line contains.
@param line: A string representing a line.
"""
# Regex for all the things.
line_rgx = '(.*)'
parsed_line = apply_regex(line_rgx, line)
if parsed_line:
self.line_info['line'] = parsed_line[0]
def flush(self):
"""Call any state machine parsers, persist line info if needed.
"""
for smp in self.smps:
smp.check_transitions(self)
# TODO: persist this?
self.line_info={}
def format_line(self):
try:
return self.line_format % self.line_info
except KeyError:
return self.line_info['line']
class TimeLine(LogLine):
"""Filters timestamps for scheduler logs.
"""
def parse_line(self, line):
super(TimeLine, self).parse_line(line)
# Regex for isolating the date and time from scheduler logs, eg:
# 02/16 16:04:36.573 INFO |scheduler_:0574|...
line_rgx = '([0-9,/,:,., ]+)(.*)'
parsed_line = apply_regex(line_rgx, self.line_info['line'])
if parsed_line:
self.line_info['time'] = parsed_line[0]
self.line_info['line'] = parsed_line[1]
class HostLine(TimeLine):
"""Manages hosts line parsing.
"""
line_format = (' \t\t %(time)s %(host)s, currently in %(state)s, '
'updated %(field)s->%(value)s')
def record_state_transition(self, line):
"""Apply the state_transition_rgx to a line and record state changes.
@param line: The line we're expecting to contain a state transition.
"""
state_transition_rgx = ".* ([a-zA-Z]+) updating {'([a-zA-Z]+)': ('[a-zA-Z]+'|[0-9])}.*"
match = apply_regex(state_transition_rgx, line)
if match:
self.line_info['state'] = match[0]
self.line_info['field'] = match[1]
self.line_info['value'] = match[2].replace("'", "")
def parse_line(self, line):
super(HostLine, self).parse_line(line)
# Regex for getting host status. Eg:
# 172.22.4 in Running updating {'status': 'Running'}
line_rgx = '.*Host (([0-9,.,a-z,-]+).*)'
parsed_line = apply_regex(line_rgx, self.line_info['line'])
if parsed_line:
self.line_info['line'] = parsed_line[0]
self.line_info['host'] = parsed_line[1]
self.record_state_transition(self.line_info['line'])
return self.format_line()
class HQELine(TimeLine):
"""Manages HQE line parsing.
"""
line_format = ('%(time)s %(hqe)s, currently in %(from_state)s, '
'updated to %(to_state)s. Flags: %(flags)s')
def record_state_transition(self, line):
"""Apply the state_transition_rgx to a line and record state changes.
@param line: The line we're expecting to contain a state transition.
"""
# Regex for getting hqe status. Eg:
# status:Running [active] -> Gathering
state_transition_rgx = ".*status:([a-zA-Z]+)( \[[a-z\,]+\])? -> ([a-zA-Z]+)"
match = apply_regex(state_transition_rgx, line)
if match:
self.line_info['from_state'] = match[0]
self.line_info['flags'] = match[1]
self.line_info['to_state'] = match[2]
def parse_line(self, line):
super(HQELine, self).parse_line(line)
line_rgx = r'.*\| HQE: (([0-9]+).*)'
parsed_line = apply_regex(line_rgx, self.line_info['line'])
if parsed_line:
self.line_info['line'] = parsed_line[0]
self.line_info['hqe'] = parsed_line[1]
self.record_state_transition(self.line_info['line'])
return self.format_line()
class LogCrawler(object):
"""Crawl logs.
Log crawlers are meant to apply some basic preprocessing to a log, and crawl
the output validating state changes. They manage line and state machine
creation. The initial filtering applied to the log needs to be grab all lines
that match an action, such as the running of a job.
"""
def __init__(self, log_name):
self.log = log_name
self.filter_command = 'cat %s' % log_name
def preprocess_log(self):
"""Apply some basic filtering to the log.
"""
proc = subprocess.Popen(self.filter_command,
shell=True, stdout=subprocess.PIPE)
out, err = proc.communicate()
return out
class SchedulerLogCrawler(LogCrawler):
"""A log crawler for the scheduler logs.
This crawler is only capable of processing information about a single job.
"""
def __init__(self, log_name, **kwargs):
super(SchedulerLogCrawler, self).__init__(log_name)
self.job_id = kwargs['job_id']
self.line_processors = [HostLine([SingleJobHostSMP()]),
HQELine([SingleJobHqeSMP()])]
self.filter_command = ('%s | grep "for job: %s"' %
(self.filter_command, self.job_id))
def parse_log(self):
"""Parse each line of the preprocessed log output.
Pass each line through each possible line_processor. The one that matches
will populate itself, call flush, this will walk the state machine of that
line to the next step.
"""
out = self.preprocess_log()
response = []
for job_line in out.split('\n'):
parsed_line = None
for processor in self.line_processors:
line = processor.parse_line(job_line)
if line and parsed_line:
raise LogLineException('Multiple Parsers claiming the line %s: '
'previous parsing: %s, current parsing: %s ' %
(job_line, parsed_line, line))
elif line:
parsed_line = line
try:
processor.flush()
except StateMachineViolation as e:
response.append(str(e))
raise StateMachineViolation(response)
response.append(parsed_line if parsed_line else job_line)
return response
def process_logs():
if len(sys.argv) < 2:
print ('Usage: ./cron_scripts/log_distiller.py 0 8415620 '
'You need to change the location of the log it will parse.'
'The job_id needs to be in the afe database.')
sys.exit(1)
job_id = int(sys.argv[1])
rpc = frontend.AFE()
suite_jobs = rpc.run('get_jobs', id=job_id)
if not suite_jobs[0]['parent_job']:
suite_jobs = rpc.run('get_jobs', parent_job=job_id)
try:
logfile = sys.argv[2]
except Exception:
logfile = LOGFILE
for job in suite_jobs:
log_crawler = SchedulerLogCrawler(logfile, job_id=job['id'])
for line in log_crawler.parse_log():
print line
return
if __name__ == '__main__':
process_logs()