blob: 9ab410a9e6a7262a05ff023fb29cdf19905cd89e [file] [log] [blame]
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import time
from portage.tests import TestCase
from portage.util._async.ForkProcess import ForkProcess
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util.futures import asyncio
from portage.util.futures.iter_completed import (
iter_completed,
async_iter_completed,
)
class SleepProcess(ForkProcess):
__slots__ = ('future', 'seconds')
def _start(self):
self.addExitListener(self._future_done)
ForkProcess._start(self)
def _future_done(self, task):
if not self.future.cancelled():
self.future.set_result(self.seconds)
def _run(self):
time.sleep(self.seconds)
class IterCompletedTestCase(TestCase):
def testIterCompleted(self):
# Mark this as todo, since we don't want to fail if heavy system
# load causes the tasks to finish in an unexpected order.
self.todo = True
loop = global_event_loop()
tasks = [
SleepProcess(seconds=0.200),
SleepProcess(seconds=0.100),
SleepProcess(seconds=0.001),
]
expected_order = sorted(task.seconds for task in tasks)
def future_generator():
for task in tasks:
task.future = loop.create_future()
task.scheduler = loop
task.start()
yield task.future
for seconds, future in zip(expected_order, iter_completed(future_generator(),
max_jobs=True, max_load=None, loop=loop)):
self.assertEqual(seconds, future.result())
def testAsyncCancel(self):
loop = global_event_loop()
input_futures = set()
future_count = 3
def future_generator():
for i in range(future_count):
future = loop.create_future()
loop.call_soon(lambda future: None if future.done()
else future.set_result(None), future)
input_futures.add(future)
yield future
for future_done_set in async_iter_completed(future_generator(),
max_jobs=True, max_load=True, loop=loop):
future_done_set.cancel()
break
# With max_jobs=True, async_iter_completed should have executed
# the generator until it raised StopIteration.
self.assertEqual(future_count, len(input_futures))
loop.run_until_complete(asyncio.wait(input_futures, loop=loop))
# The futures may have results or they may have been cancelled
# by TaskScheduler, and behavior varies depending on the python
# interpreter.
for future in input_futures:
future.cancelled() or future.result()