blob: 6b878b30f1e8c1f81ba420eb9e30a5d2cf5b8e02 [file] [log] [blame]
"""Pipeline process that encapsulates the actual content.
The actual stages include the Steering algorithm, the builder and the executor.
"""
__author__ = 'yuhenglong@google.com (Yuheng Long)'
import multiprocessing
class PipelineProcess(multiprocessing.Process):
    """A process that wraps one stage of the pipeline.

    It continuously pulls tasks from its input queue until a poison pill is
    received. Each task that is pulled is handed to the stage method for
    processing and then forwarded to the output queue.
    """

    # Sentinel placed on a queue to tell the consuming stage to shut down.
    POISON_PILL = None

    def __init__(self, method, task_queue, result_queue):
        """Set up the input/output queues and the stage method to be called.

        Args:
          method: The actual pipeline stage to be invoked. It is called with
            a single argument, the task pulled from task_queue; its return
            value is ignored.
          task_queue: The input task queue for this pipeline stage.
          result_queue: The output task queue for this pipeline stage.
        """
        multiprocessing.Process.__init__(self)
        self._method = method
        self._task_queue = task_queue
        self._result_queue = result_queue

    def run(self):
        """Pull tasks from the input queue and execute them until shutdown.

        Once a task is pulled, this stage invokes the stage method on it and
        then submits that same task object to the next pipeline stage.

        The process terminates on receiving the poison pill from the previous
        stage, after forwarding the pill to the next stage.
        """
        while True:
            # Blocks until the previous stage provides a task or the pill.
            next_task = self._task_queue.get()
            if next_task is self.POISON_PILL:
                # Propagate the shutdown request downstream before exiting.
                self._result_queue.put(self.POISON_PILL)
                break
            # The return value of the stage method is deliberately not used:
            # the task object itself is what gets passed on.
            self._method(next_task)
            self._result_queue.put(next_task)