@ -1078,21 +1078,22 @@ def decode(stream: T.Union[None, bytes]) -> str:
return stream . decode ( ' iso-8859-1 ' , errors = ' ignore ' )
async def read_decode ( reader : asyncio . StreamReader , console_mode : ConsoleUser ) - > str :
if console_mode is not ConsoleUser . STDOUT :
return decode ( await reader . read ( - 1 ) )
stdo_lines = [ ]
while not reader . at_eof ( ) :
line = decode ( await reader . readline ( ) )
stdo_lines . append ( line )
print ( line , end = ' ' , flush = True )
return ' ' . join ( stdo_lines )
try :
while not reader . at_eof ( ) :
line = decode ( await reader . readline ( ) )
stdo_lines . append ( line )
if console_mode is ConsoleUser . STDOUT :
print ( line , end = ' ' , flush = True )
return ' ' . join ( stdo_lines )
except asyncio . CancelledError :
return ' ' . join ( stdo_lines )
# Extract lines out of the StreamReader. Print them
# along the way if requested, and at the end collect
# them all into a future.
async def read_decode_lines ( reader : asyncio . StreamReader , f : ' asyncio.Future[str ] ' ,
console_mode : ConsoleUser ) - > T . AsyncIterator [ str ] :
async def read_decode_lines ( reader : asyncio . StreamReader , q : ' asyncio.Queue[T.Optional[str] ] ' ,
console_mode : ConsoleUser ) - > str :
stdo_lines = [ ]
try :
while not reader . at_eof ( ) :
@ -1100,11 +1101,12 @@ async def read_decode_lines(reader: asyncio.StreamReader, f: 'asyncio.Future[str
stdo_lines . append ( line )
if console_mode is ConsoleUser . STDOUT :
print ( line , end = ' ' , flush = True )
yield line
except Exception as e :
f . set_exception ( e )
await q . put ( line )
return ' ' . join ( stdo_lines )
except asyncio . CancelledError :
return ' ' . join ( stdo_lines )
finally :
f . set_result ( ' ' . join ( stdo_lines ) )
await q . put ( None )
def run_with_mono ( fname : str ) - > bool :
return fname . endswith ( ' .exe ' ) and not ( is_windows ( ) or is_cygwin ( ) )
@ -1130,6 +1132,14 @@ async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, floa
except asyncio . TimeoutError :
pass
async def queue_iter ( q : ' asyncio.Queue[T.Optional[str]] ' ) - > T . AsyncIterator [ str ] :
while True :
item = await q . get ( )
q . task_done ( )
if item is None :
break
yield item
async def complete ( future : asyncio . Future ) - > None :
""" Wait for completion of the given future, ignoring cancellation. """
try :
@ -1153,13 +1163,15 @@ class TestSubprocess:
self . _process = p
self . stdout = stdout
self . stderr = stderr
self . stdo_task = None # type: T.Optional[T.Awaitabl e[str]]
self . stde_task = None # type: T.Optional[T.Awaitabl e[str]]
self . stdo_task = None # type: T.Optional[asyncio.Futur e[str]]
self . stde_task = None # type: T.Optional[asyncio.Futur e[str]]
self . postwait_fn = postwait_fn # type: T.Callable[[], None]
def stdout_lines ( self , console_mode : ConsoleUser ) - > T . AsyncIterator [ str ] :
self . stdo_task = asyncio . get_event_loop ( ) . create_future ( )
return read_decode_lines ( self . _process . stdout , self . stdo_task , console_mode )
q = asyncio . Queue ( ) # type: asyncio.Queue[T.Optional[str]]
decode_coro = read_decode_lines ( self . _process . stdout , q , console_mode )
self . stdo_task = asyncio . ensure_future ( decode_coro )
return queue_iter ( q )
def communicate ( self , console_mode : ConsoleUser ) - > T . Tuple [ T . Optional [ T . Awaitable [ str ] ] ,
T . Optional [ T . Awaitable [ str ] ] ] :
@ -1213,6 +1225,11 @@ class TestSubprocess:
# for the event loop to pick that up.
await p . wait ( )
return None
finally :
if self . stdo_task :
self . stdo_task . cancel ( )
if self . stde_task :
self . stde_task . cancel ( )
async def wait ( self , timeout : T . Optional [ int ] ) - > T . Tuple [ int , TestResult , T . Optional [ str ] ] :
p = self . _process