@ -1148,14 +1148,37 @@ async def complete(future: asyncio.Future) -> None:
except asyncio . CancelledError :
pass
async def complete_all ( futures : T . Iterable [ asyncio . Future ] ) - > None :
""" Wait for completion of all the given futures, ignoring cancellation. """
while futures :
done , futures = await asyncio . wait ( futures , return_when = asyncio . FIRST_EXCEPTION )
# Raise exceptions if needed for all the "done" futures
for f in done :
if not f . cancelled ( ) :
async def complete_all ( futures : T . Iterable [ asyncio . Future ] ,
timeout : T . Optional [ T . Union [ int , float ] ] = None ) - > None :
""" Wait for completion of all the given futures, ignoring cancellation.
If timeout is not None , raise an asyncio . TimeoutError after the given
time has passed . asyncio . TimeoutError is only raised if some futures
have not completed and none have raised exceptions , even if timeout
is zero . """
def check_futures ( futures : T . Iterable [ asyncio . Future ] ) - > None :
# Raise exceptions if needed
left = False
for f in futures :
if not f . done ( ) :
left = True
elif not f . cancelled ( ) :
f . result ( )
if left :
raise asyncio . TimeoutError
# Python is silly and does not have a variant of asyncio.wait with an
# absolute time as deadline.
deadline = None if timeout is None else asyncio . get_event_loop ( ) . time ( ) + timeout
while futures and ( timeout is None or timeout > 0 ) :
done , futures = await asyncio . wait ( futures , timeout = timeout ,
return_when = asyncio . FIRST_EXCEPTION )
check_futures ( done )
if deadline :
timeout = deadline - asyncio . get_event_loop ( ) . time ( )
check_futures ( futures )
class TestSubprocess :
def __init__ ( self , p : asyncio . subprocess . Process ,
@ -1167,6 +1190,7 @@ class TestSubprocess:
self . stdo_task = None # type: T.Optional[asyncio.Future[str]]
self . stde_task = None # type: T.Optional[asyncio.Future[str]]
self . postwait_fn = postwait_fn # type: T.Callable[[], None]
self . all_futures = [ ] # type: T.List[asyncio.Future]
def stdout_lines ( self , console_mode : ConsoleUser ) - > T . AsyncIterator [ str ] :
q = asyncio . Queue ( ) # type: asyncio.Queue[T.Optional[str]]
@ -1181,9 +1205,11 @@ class TestSubprocess:
if self . stdo_task is None and self . stdout is not None :
decode_coro = read_decode ( self . _process . stdout , console_mode )
self . stdo_task = asyncio . ensure_future ( decode_coro )
self . all_futures . append ( self . stdo_task )
if self . stderr is not None and self . stderr != asyncio . subprocess . STDOUT :
decode_coro = read_decode ( self . _process . stderr , console_mode )
self . stde_task = asyncio . ensure_future ( decode_coro )
self . all_futures . append ( self . stde_task )
return self . stdo_task , self . stde_task
@ -1236,11 +1262,13 @@ class TestSubprocess:
p = self . _process
result = None
additional_error = None
self . all_futures . append ( asyncio . ensure_future ( p . wait ( ) ) )
try :
await try_wait_one ( p . wait ( ) , timeout = timeout )
if p . returncode is None :
additional_error = await self . _kill ( )
result = TestResult . TIMEOUT
await complete_all ( self . all_futures , timeout = timeout )
except asyncio . TimeoutError :
additional_error = await self . _kill ( )
result = TestResult . TIMEOUT
except asyncio . CancelledError :
# The main loop must have seen Ctrl-C.
additional_error = await self . _kill ( )