| # Copyright 2010 Google Inc. All Rights Reserved. |
| # |
| |
| import copy |
| import logging |
| import threading |
| |
| from automation.common import command as cmd |
| from automation.common import logger |
| from automation.common.command_executer import CommandExecuter |
| from automation.common import job |
| from automation.common import job_group |
| from automation.server.job_manager import IdProducerPolicy |
| |
| |
class JobGroupManager(object):
    """Keeps track of job groups and submits their jobs to a job manager.

    The manager registers itself as a listener on the job manager it is given
    and updates group status in NotifyJobComplete (presumably invoked by the
    job manager whenever a job finishes -- confirm against the job manager).

    All access to `all_job_groups` and to group status is serialized through
    `_lock`; `_job_group_finished` is a condition bound to that same lock and
    is used to wake callers blocked in KillJobGroup.
    """

    def __init__(self, job_manager):
        """Initializes the manager and subscribes it to `job_manager`.

        Args:
          job_manager: object used to add, kill and clean up jobs; this
            instance is registered on it via AddListener.
        """
        self.all_job_groups = []

        self.job_manager = job_manager
        self.job_manager.AddListener(self)

        self._lock = threading.Lock()
        self._job_group_finished = threading.Condition(self._lock)

        self._id_producer = IdProducerPolicy()
        # Raw string: '\d' in a plain literal is an invalid escape sequence
        # and triggers a warning on modern Python. The pattern is unchanged.
        self._id_producer.Initialize(job_group.JobGroup.HOMEDIR_PREFIX,
                                     r'job-group-(?P<id>\d+)')

        self._logger = logging.getLogger(self.__class__.__name__)

    def GetJobGroup(self, group_id):
        """Returns the tracked group whose id equals `group_id`.

        Args:
          group_id: identifier to look for.

        Returns:
          The matching group object itself (not a copy), or None when no
          tracked group has that id.
        """
        with self._lock:
            for group in self.all_job_groups:
                if group.id == group_id:
                    return group

            return None

    def GetAllJobGroups(self):
        """Returns a deep copy of the list of all tracked job groups."""
        with self._lock:
            return copy.deepcopy(self.all_job_groups)

    def AddJobGroup(self, group):
        """Registers `group`, prepares its home directory and queues its jobs.

        Args:
          group: job group to add; its `id` and `status` are modified.

        Returns:
          The id newly assigned to the group.
        """
        with self._lock:
            group.id = self._id_producer.GetNextId()

        self._logger.debug('Creating runtime environment for %r.', group)

        # Recreate the group's home directory from scratch. This runs without
        # holding the lock.
        CommandExecuter().RunCommand(cmd.Chain(
            cmd.RmTree(group.home_dir), cmd.MakeDir(group.home_dir)))

        with self._lock:
            self.all_job_groups.append(group)

            for job_ in group.jobs:
                self.job_manager.AddJob(job_)

            group.status = job_group.STATUS_EXECUTING

        self._logger.info('Added %r to queue.', group)

        return group.id

    def KillJobGroup(self, group):
        """Kills every job of `group` and blocks until the group is finished.

        Args:
          group: job group whose jobs should be killed.
        """
        with self._lock:
            self._logger.debug('Killing all jobs that belong to %r.', group)

            for job_ in group.jobs:
                self.job_manager.KillJob(job_)

            self._logger.debug('Waiting for jobs to quit.')

            # Lets block until the group is killed so we know it is completed
            # when we return. wait() releases the lock while blocked, which
            # lets NotifyJobComplete update the status and notify us.
            while group.status not in (job_group.STATUS_SUCCEEDED,
                                       job_group.STATUS_FAILED):
                self._job_group_finished.wait()

    def NotifyJobComplete(self, job_):
        """Updates the status of the owning group after `job_` completes.

        A failed job marks the whole group as failed (optionally killing and
        cleaning up all of its jobs). A succeeded job marks the group as
        succeeded once every job in the group has succeeded (optionally
        cleaning up all of its jobs).

        Args:
          job_: the job that completed; its `group` and `status` are read.
        """
        self._logger.debug('Handling %r completion event.', job_)

        group = job_.group

        with self._lock:
            # We need to perform an action only if the group hasn't already
            # failed.
            if group.status != job_group.STATUS_FAILED:
                if job_.status == job.STATUS_FAILED:
                    # We have a failed job, abort the job group.
                    group.status = job_group.STATUS_FAILED
                    if group.cleanup_on_failure:
                        # A separate loop variable keeps the `job_` argument
                        # from being rebound.
                        for member in group.jobs:
                            # TODO(bjanakiraman): We should probably only kill
                            # dependent jobs instead of the whole job group.
                            self.job_manager.KillJob(member)
                            self.job_manager.CleanUpJob(member)
                else:
                    # The job succeeded -- lets check to see if we are done.
                    assert job_.status == job.STATUS_SUCCEEDED
                    finished = True
                    for other_job in group.jobs:
                        assert other_job.status != job.STATUS_FAILED
                        if other_job.status != job.STATUS_SUCCEEDED:
                            finished = False
                            break

                    if (finished and
                            group.status != job_group.STATUS_SUCCEEDED):
                        # TODO(kbaclawski): Without check performed above
                        # following code could be called more than once. This
                        # would trigger StateMachine crash, because it cannot
                        # transition from STATUS_SUCCEEDED to
                        # STATUS_SUCCEEDED. Need to address that bug in near
                        # future.
                        group.status = job_group.STATUS_SUCCEEDED
                        if group.cleanup_on_completion:
                            for member in group.jobs:
                                self.job_manager.CleanUpJob(member)

            # Wake anyone blocked in KillJobGroup; waiters re-check the group
            # status themselves, so an extra wake-up is harmless.
            self._job_group_finished.notify_all()