blob: 3c2b9136b2eceb17ad98137efab7f5f138f81f86 [file] [log] [blame]
"""Unittest for the pipeline_worker functions in the build/test stage.
Part of the Chrome build flags optimization.
This module tests the helper method and the worker method.
"""
__author__ = 'yuhenglong@google.com (Yuheng Long)'
import multiprocessing
import random
import sys
import unittest
import pipeline_process
import pipeline_worker
TESTSTAGE = 0
def MockTaskCostGenerator():
"""Calls a random number generator and returns a negative number."""
return random.randint(-sys.maxint - 1, -1)
class MockTask(object):
"""This class emulates an actual task.
It does not do the actual work, but simply returns the result as given when
this task is constructed.
"""
def __init__(self, identifier, cost):
"""Set up the results for this task.
Args:
identifier: the identifier of this task.
cost: the mock cost of this task.
The _pre_cost field stores the cost. Once this task is performed, i.e., by
calling the work method , the _cost field will have this cost.
"""
self._identifier = identifier
self._pre_cost = cost
def get_identifier(self, stage):
assert stage == TESTSTAGE
return self._identifier
def __eq__(self, other):
if isinstance(other, MockTask):
return self._identifier == other._identifier and self._cost == other._cost
return False
def set_result(self, stage, cost):
assert stage == TESTSTAGE
self._cost = cost
def work(self, stage):
assert stage == TESTSTAGE
self._cost = self._pre_cost
def get_result(self, stage):
assert stage == TESTSTAGE
return self._cost
def done(self, stage):
"""Indicates whether the task has been performed."""
assert stage == TESTSTAGE
return '_cost' in self.__dict__
class AuxiliaryTest(unittest.TestCase):
"""This class tests the pipeline_worker functions.
Given the same identifier, the cost should result the same from the
pipeline_worker functions.
"""
def testHelper(self):
""""Test the helper.
Call the helper method twice, and test the results. The results should be
the same, i.e., the cost should be the same.
"""
# Set up the input, helper and output queue for the helper method.
manager = multiprocessing.Manager()
helper_queue = manager.Queue()
result_queue = manager.Queue()
completed_queue = manager.Queue()
# Set up the helper process that holds the helper method.
helper_process = multiprocessing.Process(target=pipeline_worker.helper,
args=(TESTSTAGE, {}, helper_queue,
completed_queue,
result_queue))
helper_process.start()
# A dictionary defines the mock result to the helper.
mock_result = {1: 1995, 2: 59, 9: 1027}
# Test if there is a task that is done before, whether the duplicate task
# will have the same result. Here, two different scenarios are tested. That
# is the mock results are added to the completed_queue before and after the
# corresponding mock tasks being added to the input queue.
completed_queue.put((9, mock_result[9]))
# The output of the helper should contain all the following tasks.
results = [1, 1, 2, 9]
# Testing the correctness of having tasks having the same identifier, here
# 1.
for result in results:
helper_queue.put(MockTask(result, MockTaskCostGenerator()))
completed_queue.put((2, mock_result[2]))
completed_queue.put((1, mock_result[1]))
# Signal there is no more duplicate task.
helper_queue.put(pipeline_process.POISONPILL)
helper_process.join()
while results:
task = result_queue.get()
identifier = task._identifier
cost = task._cost
self.assertTrue(identifier in results)
if identifier in mock_result:
self.assertTrue(cost, mock_result[identifier])
results.remove(task._identifier)
def testWorker(self):
""""Test the worker method.
The worker should process all the input tasks and output the tasks to the
helper and result queue.
"""
manager = multiprocessing.Manager()
result_queue = manager.Queue()
completed_queue = manager.Queue()
# A dictionary defines the mock tasks and their corresponding results.
mock_work_tasks = {1: 86, 2: 788}
mock_tasks = []
for flag, cost in mock_work_tasks.iteritems():
mock_tasks.append(MockTask(flag, cost))
# Submit the mock tasks to the worker.
for mock_task in mock_tasks:
pipeline_worker.worker(TESTSTAGE, mock_task, completed_queue,
result_queue)
# The tasks, from the output queue, should be the same as the input and
# should be performed.
for task in mock_tasks:
output = result_queue.get()
self.assertEqual(output, task)
self.assertTrue(output.done(TESTSTAGE))
# The tasks, from the completed_queue, should be defined in the
# mock_work_tasks dictionary.
for flag, cost in mock_work_tasks.iteritems():
helper_input = completed_queue.get()
self.assertEqual(helper_input, (flag, cost))
if __name__ == '__main__':
unittest.main()