blob: de5bc47c5795badc1f23c9487b3bf63e55641090 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright 2010 Google Inc. All Rights Reserved.
#
import logging
import os
import re
import threading
from automation.common import job
from automation.common import logger
from automation.server.job_executer import JobExecuter
class IdProducerPolicy(object):
"""Produces series of unique integer IDs.
Example:
id_producer = IdProducerPolicy()
id_a = id_producer.GetNextId()
id_b = id_producer.GetNextId()
assert id_a != id_b
"""
def __init__(self):
self._counter = 1
def Initialize(self, home_prefix, home_pattern):
"""Find first available ID based on a directory listing.
Args:
home_prefix: A directory to be traversed.
home_pattern: A regexp describing all files/directories that will be
considered. The regexp must contain exactly one match group with name
"id", which must match an integer number.
Example:
id_producer.Initialize(JOBDIR_PREFIX, 'job-(?P<id>\d+)')
"""
harvested_ids = []
if os.path.isdir(home_prefix):
for filename in os.listdir(home_prefix):
path = os.path.join(home_prefix, filename)
if os.path.isdir(path):
match = re.match(home_pattern, filename)
if match:
harvested_ids.append(int(match.group('id')))
self._counter = max(harvested_ids or [0]) + 1
def GetNextId(self):
"""Calculates another ID considered to be unique."""
new_id = self._counter
self._counter += 1
return new_id
class JobManager(threading.Thread):
def __init__(self, machine_manager):
threading.Thread.__init__(self, name=self.__class__.__name__)
self.all_jobs = []
self.ready_jobs = []
self.job_executer_mapping = {}
self.machine_manager = machine_manager
self._lock = threading.Lock()
self._jobs_available = threading.Condition(self._lock)
self._exit_request = False
self.listeners = []
self.listeners.append(self)
self._id_producer = IdProducerPolicy()
self._id_producer.Initialize(job.Job.WORKDIR_PREFIX, 'job-(?P<id>\d+)')
self._logger = logging.getLogger(self.__class__.__name__)
def StartJobManager(self):
self._logger.info("Starting...")
with self._lock:
self.start()
self._jobs_available.notifyAll()
def StopJobManager(self):
self._logger.info("Shutdown request received.")
with self._lock:
for job_ in self.all_jobs:
self._KillJob(job_.id)
# Signal to die
self._exit_request = True
self._jobs_available.notifyAll()
# Wait for all job threads to finish
for executer in self.job_executer_mapping.values():
executer.join()
def KillJob(self, job_id):
"""Kill a job by id.
Does not block until the job is completed.
"""
with self._lock:
self._KillJob(job_id)
def GetJob(self, job_id):
for job_ in self.all_jobs:
if job_.id == job_id:
return job_
return None
def _KillJob(self, job_id):
self._logger.info("Killing [Job: %d].", job_id)
if job_id in self.job_executer_mapping:
self.job_executer_mapping[job_id].Kill()
for job_ in self.ready_jobs:
if job_.id == job_id:
self.ready_jobs.remove(job_)
break
def AddJob(self, job_):
with self._lock:
job_.id = self._id_producer.GetNextId()
self.all_jobs.append(job_)
# Only queue a job as ready if it has no dependencies
if job_.is_ready:
self.ready_jobs.append(job_)
self._jobs_available.notifyAll()
return job_.id
def CleanUpJob(self, job_):
with self._lock:
if job_.id in self.job_executer_mapping:
self.job_executer_mapping[job_.id].CleanUpWorkDir()
del self.job_executer_mapping[job_.id]
# TODO(raymes): remove job from self.all_jobs
def NotifyJobComplete(self, job_):
self.machine_manager.ReturnMachines(job_.machines)
with self._lock:
self._logger.debug('Handling %r completion event.', job_)
if job_.status == job.STATUS_SUCCEEDED:
for succ in job_.successors:
if succ.is_ready:
if succ not in self.ready_jobs:
self.ready_jobs.append(succ)
self._jobs_available.notifyAll()
def AddListener(self, listener):
self.listeners.append(listener)
@logger.HandleUncaughtExceptions
def run(self):
self._logger.info("Started.")
while not self._exit_request:
with self._lock:
# Get the next ready job, block if there are none
self._jobs_available.wait()
while self.ready_jobs:
ready_job = self.ready_jobs.pop()
required_machines = ready_job.machine_dependencies
for pred in ready_job.predecessors:
required_machines[0].AddPreferredMachine(
pred.primary_machine.hostname)
machines = self.machine_manager.GetMachines(required_machines)
if not machines:
# If we can't get the necessary machines right now, simply wait
# for some jobs to complete
self.ready_jobs.insert(0, ready_job)
break
else:
# Mark as executing
executer = JobExecuter(ready_job, machines, self.listeners)
executer.start()
self.job_executer_mapping[ready_job.id] = executer
self._logger.info("Stopped.")