| # Copyright 2018 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import threading |
| |
| |
class MultithreadedProcessor():
    """
    An instance of this class executes a given function many times.
    The calls are parallelized by using Python threads. The function
    must take exactly one parameter: a unique task id.
    There is a simple example in the file test_multithreaded_processor.py.
    Please note that in CPython the global interpreter lock prevents threads
    from running Python bytecode in parallel. Employing this class makes sense
    only if the given function stalls on some I/O operations and/or spends a
    significant amount of its execution time outside the Python interpreter.

    run() keeps its per-call state on the instance, so a single instance must
    not execute run() from several threads at the same time.

    """

    def __init__(self, number_of_threads):
        """
        @param number_of_threads: number of threads used for parallelization.
                This value must be larger than 0 and is usually NOT directly
                related to the number of available cores.

        """
        assert number_of_threads > 0
        self._number_of_threads = number_of_threads
        # Guards the state shared between worker threads during run():
        # the pending task-id list and the first recorded error.
        self._critical_section = threading.Lock()


    def run(self, function_task, number_of_tasks):
        """
        This method calls a given function many times. Each call may be
        executed in a separate thread (depending on the number of threads
        set in __init__(...)). This is a blocking method; it returns only when
        all threads finish. It also stops on the first error (an exception
        raised in any task). In this case all unstarted tasks are cancelled,
        the method waits for all threads to finish (started tasks cannot be
        stopped) and raises an exception.

        @param function_task: a function to execute, it must take exactly
                one parameter: a task_id. It is a unique number from
                the range [0,number_of_tasks). Each call of the function
                corresponds to a single task. Tasks can be executed in
                parallel and in any order, but every task is executed
                at most once (exactly once if no task fails).
        @param number_of_tasks: a number of tasks to execute

        @return: A list of outputs from all tasks; the element at index i is
                the value returned by the task with id i.

        @throws Exception if at least one of the tasks threw any exception.
                The message contains the string form of the first error.

        """
        # A real list is required: worker threads take ids with pop(), which
        # a Python 3 range object does not support.
        self._tasks_ids = list(range(number_of_tasks))
        self._outputs = [None] * number_of_tasks
        self._error = None

        # Threads beyond the number of tasks would find the task list empty
        # and exit immediately, so do not bother starting them.
        number_of_threads = min(self._number_of_threads, number_of_tasks)

        # creating and starting threads
        threads = []
        for _ in range(number_of_threads):
            thread = threading.Thread(target=self._thread,
                                      args=[function_task])
            threads.append(thread)
            thread.start()

        # waiting for all threads to finish
        for thread in threads:
            thread.join()

        # self._error is set <=> at least one of the tasks failed
        if self._error is not None:
            message = 'One of threads failed with the following error: '
            message += self._error
            raise Exception(message)

        # no errors - the list of outputs is returned
        return self._outputs


    def _thread(self, function_task):
        """
        An internal method representing a single worker thread. It processes
        available tasks and exits when there are no more tasks to process or
        when a task threw an exception. This method is not supposed to throw
        any exceptions; the first error is recorded in self._error instead.

        @param function_task: a task function to execute, it must take exactly
                one parameter: a task_id. These identifiers are taken from
                the list self._tasks_ids.

        """
        try:
            while True:
                # the critical section synchronizes access to the shared
                # task-id list
                with self._critical_section:
                    # exit if there are no more tasks to process
                    if not self._tasks_ids:
                        return
                    # otherwise take a task id to process (ids are handed
                    # out from the end of the list)
                    task_id = self._tasks_ids.pop()
                # run the task outside the lock so tasks execute concurrently;
                # every thread writes a distinct index of the list
                self._outputs[task_id] = function_task(task_id)

        except BaseException as exception:
            # probably the task being processed raised an exception;
            # the critical section synchronizes access to shared fields
            with self._critical_section:
                # if this is the first error ...
                if self._error is None:
                    # ... cancel all unassigned tasks ...
                    self._tasks_ids = []
                    # ... and save the error as a string
                    self._error = str(exception)
            # exit on the first spotted error
            return