Eliminate uses of Queue. Causes serious contention/slowdown as was currently used. Using explicit locks and shared resources scales a lot better locally.
diff --git a/gtest-parallel b/gtest-parallel index 7a2d423..58affcb 100755 --- a/gtest-parallel +++ b/gtest-parallel
@@ -12,13 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import Queue import optparse import subprocess import sys import threading import time +stdout_lock = threading.Lock() class FilterFormat: total_tests = 0 finished_tests = 0 @@ -50,35 +50,35 @@ print line elif command == "TESTCNT": self.total_tests = int(arg.split(' ', 1)[1]) + print "[0/%d] Running tests...\r" % self.total_tests, def add_stdout(self, job_id, output): self.outputs[job_id].append(output) - def log(self): - print "[0/?] Running tests...\r", - while True: - line = log.get() - if line == "": - break - (prefix, output) = line.split(' ', 1) + def log(self, line): + stdout_lock.acquire() + (prefix, output) = line.split(' ', 1) - if prefix[-1] == ':': - self.handle_meta(int(prefix[:-1]), output) - else: - self.add_stdout(int(prefix[:-1]), output) + if prefix[-1] == ':': + self.handle_meta(int(prefix[:-1]), output) + else: + self.add_stdout(int(prefix[:-1]), output) + stdout_lock.release() + + def end(self): if self.failures: print "FAILED TESTS (%d/%d):" % (len(self.failures), self.total_tests) for (binary, test) in self.failures: print " ", binary + ": " + test class RawFormat: - def log(self): - while True: - line = log.get() - if line == "": - return - sys.stdout.write(line + "\n") - sys.stdout.flush() + def log(self, line): + stdout_lock.acquire() + sys.stdout.write(line + "\n") + sys.stdout.flush() + stdout_lock.release() + def end(self): + pass parser = optparse.OptionParser( usage = 'usage: %prog [options] executable [executable ...]') @@ -138,14 +138,12 @@ # Repeat tests (-r flag). tests *= options.repeat - -log = Queue.Queue() -test_queue = Queue.Queue() +test_lock = threading.Lock() +job_id = 0 +logger.log(str(-1) + ': TESTCNT ' + ' ' + str(len(tests))) exit_code = 0 def run_job((command, job_id, test)): - if test == '': - raise Queue.Empty begin = time.time() sub = subprocess.Popen(command + ['--gtest_filter=' + test] + ['--gtest_color=' + options.gtest_color], @@ -156,22 +154,29 @@ line = sub.stdout.readline() if line == '': break - log.put(str(job_id) + '> ' + line.rstrip()) + logger.log(str(job_id) + '> ' + line.rstrip()) code = sub.wait() runtime_ms = int(1000 * (time.time() - begin)) - log.put(str(job_id) + ': EXIT ' + str(code) + ' ' + str(runtime_ms)) + logger.log(str(job_id) + ': EXIT ' + str(code) + ' ' + str(runtime_ms)) if code != 0: global exit_code exit_code = code def worker(): + global job_id while True: - try: - run_job(test_queue.get()) - test_queue.task_done() - except Queue.Empty: + job = None + test_lock.acquire() + if job_id < len(tests): + (test_binary, command, test) = tests[job_id] + logger.log(str(job_id) + ': TEST ' + test_binary + ' ' + test) + job = (command, job_id, test) + job_id += 1 + test_lock.release() + if job is None: return + run_job(job) def start_daemon(func): t = threading.Thread(target=func) @@ -180,17 +185,7 @@ return t workers = [start_daemon(worker) for i in range(options.workers)] -printer = start_daemon(logger.log) - -log.put(str(-1) + ': TESTCNT ' + ' ' + str(len(tests))) -for job_id, (test_binary, command, test) in enumerate(tests): - log.put(str(job_id) + ': TEST ' + test_binary + ' ' + test) - test_queue.put((command, job_id, test)) - -# Signal to tell the workers to stop. -[test_queue.put(('', '', '')) for i in range(options.workers)] [t.join() for t in workers] -log.put("") -printer.join() +logger.end() sys.exit(exit_code)