|
|
|
@ -391,11 +391,11 @@ def _call_behavior(rpc_event, |
|
|
|
|
behavior, |
|
|
|
|
argument, |
|
|
|
|
request_deserializer, |
|
|
|
|
stream_observer=None): |
|
|
|
|
on_next_callback=None): |
|
|
|
|
context = _Context(rpc_event, state, request_deserializer) |
|
|
|
|
try: |
|
|
|
|
if stream_observer is not None: |
|
|
|
|
return behavior(argument, context, stream_observer), True |
|
|
|
|
if on_next_callback is not None: |
|
|
|
|
return behavior(argument, context, on_next_callback), True |
|
|
|
|
else: |
|
|
|
|
return behavior(argument, context), True |
|
|
|
|
except Exception as exception: # pylint: disable=broad-except |
|
|
|
@ -530,7 +530,7 @@ def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk, |
|
|
|
|
behavior, |
|
|
|
|
argument, |
|
|
|
|
request_deserializer, |
|
|
|
|
stream_observer=on_next) |
|
|
|
|
on_next_callback=on_next) |
|
|
|
|
else: |
|
|
|
|
response_iterator, proceed = _call_behavior( |
|
|
|
|
rpc_event, state, behavior, argument, request_deserializer) |
|
|
|
@ -545,13 +545,13 @@ def _is_rpc_state_active(state): |
|
|
|
|
return state.client is not _CANCELLED and not state.statused |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _stream_response_iterator_adapter(rpc_event, state, stream_observer, |
|
|
|
|
def _stream_response_iterator_adapter(rpc_event, state, on_next_callback, |
|
|
|
|
response_iterator): |
|
|
|
|
while True: |
|
|
|
|
response, proceed = _take_response_from_response_iterator( |
|
|
|
|
rpc_event, state, response_iterator) |
|
|
|
|
if proceed: |
|
|
|
|
stream_observer(response) |
|
|
|
|
on_next_callback(response) |
|
|
|
|
if not _is_rpc_state_active(state): |
|
|
|
|
break |
|
|
|
|
else: |
|
|
|
|