# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module containing methods/classes related to running parallel test jobs."""

import multiprocessing
import time

import cros_build_lib as cros_lib


class ParallelJobTimeoutError(Exception):
"""Thrown when a job ran for longer than expected."""
pass
class ParallelJob(multiprocessing.Process):
"""Small wrapper for Process that stores output of its target method."""
MAX_TIMEOUT_SECONDS = 2400
SLEEP_TIMEOUT_SECONDS = 180
def __init__(self, starting_semaphore, target, args):
"""Initializes an instance of a job.
Args:
starting_semaphore: Semaphore used by caller to wait on such that
there isn't more than a certain number of parallel_jobs running. Should
be initialized to a value for the number of parallel_jobs wanting to be
run at a time.
target: The func to run.
args: Args to pass to the fun.
"""
super(ParallelJob, self).__init__(target=target, args=args)
self._target = target
self._args = args
    self._starting_semaphore = starting_semaphore

  def run(self):
    """Process override. Runs the target, then releases the semaphore."""
try:
self._target(*self._args)
finally:
      self._starting_semaphore.release()

  @classmethod
  def WaitUntilJobsComplete(cls, parallel_jobs):
    """Waits until all parallel_jobs have completed before returning.

    Given an array of parallel_jobs, returns once all parallel_jobs have
    completed or the max timeout is reached.

    Raises:
      ParallelJobTimeoutError: if the max timeout is reached.
    """
def GetCurrentActiveCount():
"""Returns the (number of active jobs, first active job)."""
active_count = 0
active_job = None
for parallel_job in parallel_jobs:
if parallel_job.is_alive():
active_count += 1
if not active_job:
active_job = parallel_job
      return (active_count, active_job)

    first_time = True
start_time = time.time()
while (time.time() - start_time) < cls.MAX_TIMEOUT_SECONDS:
      (active_count, active_job) = GetCurrentActiveCount()
      if active_count == 0:
        return

      if not first_time:
        print('Process Pool Active: Waiting on %d/%d jobs to complete' %
              (active_count, len(parallel_jobs)))
      else:
        first_time = False
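      # Block on the first active job, waking periodically so the progress
      # message above gets refreshed while jobs are still running.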
active_job.join(cls.SLEEP_TIMEOUT_SECONDS)
      time.sleep(5)  # Prevents rapid-fire printing while a job is ending.

    for parallel_job in parallel_jobs:
if parallel_job.is_alive():
parallel_job.terminate()
    raise ParallelJobTimeoutError('Exceeded max time of %d seconds to wait for '
                                  'job completion.' % cls.MAX_TIMEOUT_SECONDS)

  def __str__(self):
    return '%s(%s)' % (self._target, self._args)
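
# A usage sketch (an assumption, not from the original module) of the
# semaphore handshake ParallelJob expects, mirroring RunParallelJobs below:
# the caller acquires a pool slot before start(); run() releases the slot in
# its finally block once the target returns. some_func is hypothetical.
#
#   sem = multiprocessing.Semaphore(2)  # pool with two slots
#   job = ParallelJob(sem, target=some_func, args=(1, 2))
#   sem.acquire(block=True)             # take a slot before starting
#   job.start()                         # the slot is freed inside run()
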
def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args):
"""Runs set number of specified jobs in parallel.
Note that there is a bug in Python Queue implementation that doesn't
allow arbitrary sizes to be returned. Instead, the process will just
appear to have hung. Be careful when accepting output.
Args:
number_of_simultaneous_jobs: Max number of parallel_jobs to be run in
parallel.
jobs: Array of methods to run.
jobs_args: Array of args associated with method calls.
Returns:
Returns an array of results corresponding to each parallel_job's output.
"""
def ProcessOutputWrapper(func, args, output_queue):
"""Simple function wrapper that puts the output of a function in a queue."""
try:
output_queue.put(func(*args))
except:
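      # Put a sentinel so the parent's output.get() call does not block
      # forever, then re-raise so the original failure is still reported.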
output_queue.put(None)
raise
finally:
      output_queue.close()

  assert len(jobs) == len(jobs_args), 'jobs_args must match jobs in length.'
# Cache sudo access.
cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'],
print_cmd=False, redirect_stdout=True,
                      redirect_stderr=True)

  parallel_jobs = []
  output_array = []

  # Semaphore used to implement the process pool.
job_start_semaphore = multiprocessing.Semaphore(number_of_simultaneous_jobs)
# Create the parallel jobs.
  for job, args in zip(jobs, jobs_args):
output = multiprocessing.Queue()
parallel_job = ParallelJob(job_start_semaphore,
target=ProcessOutputWrapper,
args=(job, args, output))
parallel_jobs.append(parallel_job)
    output_array.append(output)

  # We use a semaphore so we never run more jobs than allowed at once. When a
  # parallel_job finishes, its run() releases (increments) the semaphore.
for next_parallel_job in parallel_jobs:
job_start_semaphore.acquire(block=True)
next_parallel_job.start()
ParallelJob.WaitUntilJobsComplete(parallel_jobs)
return [output.get() for output in output_array]
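

# A minimal, hypothetical usage sketch, not part of the original module: run
# three _Square jobs with at most two executing at once and print the
# results. _Square is illustrative only; note that RunParallelJobs will also
# try to cache sudo credentials via cros_lib.RunCommand before starting.
def _Square(x):
  """Example job for the demo; defined at module level so it pickles."""
  return x * x


if __name__ == '__main__':
  results = RunParallelJobs(2, [_Square] * 3, [(1,), (2,), (3,)])
  print(results)  # Expected: [1, 4, 9], in the same order as the jobs array.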