# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import threading


class MultithreadedProcessor():
"""
An instance of this class allows to execute a given function many times.
Function's calls are parallelized by using Python threads. The function
must take exactly one parameter: unique task id.
There is a simple example in the file test_multithreaded_processor.py.
Please note that there is no Python implementation that allows for "real"
multithreading. Employing this class makes sense only if the given function
stalls on some I/O operations and/or significant amount of execution time
is beyond Python interpreter.
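
    Example (an illustrative sketch; the square() helper below is not part
    of this module):

    >>> def square(task_id):
    ...     return task_id * task_id
    >>> MultithreadedProcessor(4).run(square, 8)
    [0, 1, 4, 9, 16, 25, 36, 49]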
"""

    def __init__(self, number_of_threads):
        """
        @param number_of_threads: number of threads used for parallelization.
                This value must be larger than 0 and is usually NOT directly
                related to the number of available cores.
        """
        assert number_of_threads > 0
        self._number_of_threads = number_of_threads
        self._critical_section = threading.Lock()

    def run(self, function_task, number_of_tasks):
        """
        This method calls a given function many times. Each call may be
        executed in a separate thread (depending on the number of threads
        set in __init__(...)). This is a blocking method; it returns only
        when all threads have finished. It also stops on the first error
        (an exception raised in any task). In that case it cancels all
        unstarted tasks, waits for all threads to finish (started tasks
        cannot be stopped) and raises an exception.

        @param function_task: a function to execute; it must take exactly
                one parameter: a task_id. It is a unique number from the
                range [0,number_of_tasks). Each call of the function
                corresponds to a single task. Tasks can be executed in
                parallel and in any order, but every task is executed
                exactly once.
        @param number_of_tasks: the number of tasks to execute

        @return: A list of outputs from all tasks; the index of each
                element corresponds to a task id.

        @throws Exception if at least one of the tasks raised an exception.
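
        Example of the error behavior (an illustrative sketch; the
        fail_on_two() helper below is not part of this module):

        >>> def fail_on_two(task_id):
        ...     if task_id == 2:
        ...         raise ValueError('boom')
        ...     return task_id
        >>> MultithreadedProcessor(2).run(fail_on_two, 4)
        Traceback (most recent call last):
            ...
        Exception: One of threads failed with the following error: boom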
"""
        # list of task ids to process
        self._tasks_ids = list(range(number_of_tasks))
        self._outputs = [None] * number_of_tasks
        self._error = None

        # create and start threads
        threads = []
        while len(threads) < self._number_of_threads:
            thread = threading.Thread(
                    target=self._thread, args=[function_task])
            threads.append(thread)
            thread.start()

        # wait for all threads to finish
        for thread in threads:
            thread.join()

        # self._error is set <=> at least one of the tasks failed
        if self._error is not None:
            message = 'One of threads failed with the following error: '
            message += self._error
            raise Exception(message)

        # no errors - return the list of outputs
        return self._outputs

    def _thread(self, function_task):
        """
        An internal method representing a single thread. It processes
        available tasks and exits when there are no more tasks to process
        or when a task raised an exception. This method is not supposed to
        raise any exceptions itself.

        @param function_task: a task function to execute; it must take
                exactly one parameter: a task_id. These identifiers are
                taken from the list self._tasks_ids.
        """
        try:
            while True:
                # the critical section is used here to synchronize access
                # to shared variables
                with self._critical_section:
                    # exit if there are no more tasks to process
                    if len(self._tasks_ids) == 0:
                        return
                    # otherwise take a task id to process
                    task_id = self._tasks_ids.pop()
                # run the task with the assigned task id
                self._outputs[task_id] = function_task(task_id)
        except BaseException as exception:
            # most likely the task being processed raised an exception;
            # the critical section is used to synchronize access to shared
            # fields
            with self._critical_section:
                # if this is the first error ...
                if self._error is None:
                    # ... cancel all unassigned tasks ...
                    self._tasks_ids = []
                    # ... and save the error as a string
                    self._error = str(exception)
            # exit on the first spotted error
            return
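

# The block below is an illustrative sketch and not part of the original
# module: it demonstrates the I/O-bound use case described in the class
# docstring. Each task sleeps instead of doing CPU-bound work, so the
# threads overlap and the whole run takes roughly the duration of a single
# task rather than the sum of all of them. The sleepy_task() name and the
# chosen numbers are arbitrary.
if __name__ == '__main__':
    import time

    def sleepy_task(task_id):
        time.sleep(0.1)  # stands in for a blocking I/O operation
        return task_id

    processor = MultithreadedProcessor(number_of_threads=10)
    start_time = time.time()
    outputs = processor.run(sleepy_task, 10)
    print(outputs)  # prints [0, 1, 2, ..., 9]
    print(time.time() - start_time)  # roughly 0.1s, not 1.0s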