| # Copyright 2010 Google Inc. All Rights Reserved. |
| # |
| |
| import logging |
| import os |
| import re |
| import threading |
| |
| from automation.common import job |
| from automation.common import logger |
| from automation.server.job_executer import JobExecuter |
| |
| |
class IdProducerPolicy(object):
  """Produces series of unique integer IDs.

  Example:
      id_producer = IdProducerPolicy()
      id_a = id_producer.GetNextId()
      id_b = id_producer.GetNextId()
      assert id_a != id_b
  """

  def __init__(self):
    # Next ID to hand out; numbering starts at 1.
    self._counter = 1

  def Initialize(self, home_prefix, home_pattern):
    # NOTE: raw docstring -- the '\d' below is an invalid escape sequence in
    # a plain string literal and triggers a SyntaxWarning on modern Python.
    r"""Find first available ID based on a directory listing.

    Scans the direct subdirectories of home_prefix whose names match
    home_pattern and resumes numbering after the highest ID found, so IDs
    stay unique across process restarts. Plain files are ignored.

    Args:
      home_prefix: A directory to be traversed.
      home_pattern: A regexp describing all files/directories that will be
          considered. The regexp must contain exactly one match group with
          name "id", which must match an integer number.

    Example:
      id_producer.Initialize(JOBDIR_PREFIX, r'job-(?P<id>\d+)')
    """
    harvested_ids = []

    if os.path.isdir(home_prefix):
      for filename in os.listdir(home_prefix):
        path = os.path.join(home_prefix, filename)

        # Only directories count; matching is anchored at the name start.
        if os.path.isdir(path):
          match = re.match(home_pattern, filename)

          if match:
            harvested_ids.append(int(match.group('id')))

    # Resume after the largest harvested ID; default to 1 when none found.
    self._counter = max(harvested_ids or [0]) + 1

  def GetNextId(self):
    """Calculates another ID considered to be unique."""
    new_id = self._counter
    self._counter += 1
    return new_id
| |
| |
class JobManager(threading.Thread):
  """Schedules jobs onto machines, running each job in a JobExecuter thread.

  The manager is itself a thread: run() sleeps on a condition variable and
  performs a scheduling pass whenever a job is added, a job completes (its
  machines may have been returned), or shutdown is requested.
  """

  def __init__(self, machine_manager):
    threading.Thread.__init__(self, name=self.__class__.__name__)
    self.all_jobs = []
    self.ready_jobs = []
    self.job_executer_mapping = {}

    self.machine_manager = machine_manager

    self._lock = threading.Lock()
    self._jobs_available = threading.Condition(self._lock)
    # True when a scheduling pass has been requested but not yet consumed by
    # run(). A plain Condition does not queue notifications, so without this
    # flag a notify fired while run() is not inside wait() would be lost
    # (e.g. StartJobManager notifies before the new thread reaches wait()).
    self._wake_pending = False
    self._exit_request = False

    self.listeners = []
    self.listeners.append(self)

    self._id_producer = IdProducerPolicy()
    # Raw string: '\d' in a plain literal is an invalid escape sequence and
    # triggers a SyntaxWarning on modern Python.
    self._id_producer.Initialize(job.Job.WORKDIR_PREFIX, r'job-(?P<id>\d+)')

    self._logger = logging.getLogger(self.__class__.__name__)

  def _SignalJobsAvailable(self):
    """Requests a scheduling pass. Caller must hold self._lock."""
    self._wake_pending = True
    self._jobs_available.notify_all()

  def StartJobManager(self):
    """Starts the scheduler thread and requests an initial scheduling pass."""
    self._logger.info('Starting...')

    with self._lock:
      self.start()
      self._SignalJobsAvailable()

  def StopJobManager(self):
    """Kills all jobs, stops the scheduler and waits for executer threads."""
    self._logger.info('Shutdown request received.')

    with self._lock:
      for job_ in self.all_jobs:
        self._KillJob(job_.id)

      # Signal to die
      self._exit_request = True
      self._SignalJobsAvailable()

    # Wait for all job threads to finish. Join over a snapshot: executer
    # threads may delete entries concurrently (via CleanUpJob), and
    # iterating a dict while it is mutated raises RuntimeError.
    for executer in list(self.job_executer_mapping.values()):
      executer.join()

  def KillJob(self, job_id):
    """Kill a job by id.

    Does not block until the job is completed.
    """
    with self._lock:
      self._KillJob(job_id)

  def GetJob(self, job_id):
    """Returns the job with the given id, or None if unknown."""
    for job_ in self.all_jobs:
      if job_.id == job_id:
        return job_
    return None

  def _KillJob(self, job_id):
    """Kills the job's executer and/or dequeues it. Caller holds self._lock."""
    self._logger.info('Killing [Job: %d].', job_id)

    if job_id in self.job_executer_mapping:
      self.job_executer_mapping[job_id].Kill()
    for job_ in self.ready_jobs:
      if job_.id == job_id:
        self.ready_jobs.remove(job_)
        break

  def AddJob(self, job_):
    """Registers a job, queues it if ready, and returns its assigned id."""
    with self._lock:
      job_.id = self._id_producer.GetNextId()

      self.all_jobs.append(job_)
      # Only queue a job as ready if it has no dependencies
      if job_.is_ready:
        self.ready_jobs.append(job_)

      self._SignalJobsAvailable()

    return job_.id

  def CleanUpJob(self, job_):
    """Removes a finished job's work directory and executer bookkeeping."""
    with self._lock:
      if job_.id in self.job_executer_mapping:
        self.job_executer_mapping[job_.id].CleanUpWorkDir()
        del self.job_executer_mapping[job_.id]
      # TODO(raymes): remove job from self.all_jobs

  def NotifyJobComplete(self, job_):
    """Listener callback: returns machines, queues newly-ready successors."""
    self.machine_manager.ReturnMachines(job_.machines)

    with self._lock:
      self._logger.debug('Handling %r completion event.', job_)

      if job_.status == job.STATUS_SUCCEEDED:
        for succ in job_.successors:
          if succ.is_ready:
            if succ not in self.ready_jobs:
              self.ready_jobs.append(succ)

      # Wake the scheduler: machines were returned, and successors may now
      # be runnable.
      self._SignalJobsAvailable()

  def AddListener(self, listener):
    """Registers an object to receive job lifecycle callbacks."""
    self.listeners.append(listener)

  @logger.HandleUncaughtExceptions
  def run(self):
    """Scheduler loop: starts ready jobs whenever machines are available."""
    self._logger.info('Started.')

    while not self._exit_request:
      with self._lock:
        # Sleep until a pass is requested. The _wake_pending flag guards
        # against spurious wakeups AND against notifications fired while
        # this thread was not waiting, which a bare wait() would lose.
        # (ready_jobs itself can't be the predicate: when machines are
        # unavailable the queue stays non-empty while we must still sleep.)
        while not self._wake_pending:
          self._jobs_available.wait()
        self._wake_pending = False

        while self.ready_jobs:
          ready_job = self.ready_jobs.pop()

          required_machines = ready_job.machine_dependencies
          for pred in ready_job.predecessors:
            required_machines[0].AddPreferredMachine(
                pred.primary_machine.hostname)

          machines = self.machine_manager.GetMachines(required_machines)
          if not machines:
            # If we can't get the necessary machines right now, simply wait
            # for some jobs to complete
            self.ready_jobs.insert(0, ready_job)
            break
          else:
            # Mark as executing
            executer = JobExecuter(ready_job, machines, self.listeners)
            executer.start()
            self.job_executer_mapping[ready_job.id] = executer

    self._logger.info('Stopped.')