Make YAPF happy

pull/22713/head
Pau Freixes 5 years ago
parent dae80a4977
commit f07e0da6dd
  1. 15
      src/python/grpcio/grpc/experimental/aio/_channel.py
  2. 21
      src/python/grpcio/grpc/experimental/aio/_interceptor.py
  3. 1
      src/python/grpcio_tests/tests_aio/unit/_common.py
  4. 33
      src/python/grpcio_tests/tests_aio/unit/client_unary_stream_interceptor_test.py

@ -224,16 +224,17 @@ class Channel(_base_channel.Channel):
self._unary_stream_interceptors = [] self._unary_stream_interceptors = []
if interceptors: if interceptors:
attrs_and_interceptor_classes = ( attrs_and_interceptor_classes = ((self._unary_unary_interceptors,
(self._unary_unary_interceptors, UnaryUnaryClientInterceptor), UnaryUnaryClientInterceptor),
(self._unary_stream_interceptors, UnaryStreamClientInterceptor) (self._unary_stream_interceptors,
) UnaryStreamClientInterceptor))
# pylint: disable=cell-var-from-loop # pylint: disable=cell-var-from-loop
for attr, interceptor_class in attrs_and_interceptor_classes: for attr, interceptor_class in attrs_and_interceptor_classes:
attr.extend( attr.extend([
[interceptor for interceptor in interceptors if isinstance(interceptor, interceptor_class)] interceptor for interceptor in interceptors
) if isinstance(interceptor, interceptor_class)
])
invalid_interceptors = set(interceptors) - set( invalid_interceptors = set(interceptors) - set(
self._unary_unary_interceptors) - set( self._unary_unary_interceptors) - set(

@ -125,10 +125,11 @@ class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
"""Affords intercepting unary-stream invocations.""" """Affords intercepting unary-stream invocations."""
@abstractmethod @abstractmethod
async def intercept_unary_stream(self, continuation: Callable[[ async def intercept_unary_stream(
ClientCallDetails, RequestType], UnaryStreamCall], self, continuation: Callable[[ClientCallDetails, RequestType],
client_call_details: ClientCallDetails, UnaryStreamCall],
request: RequestType) -> Union[AsyncIterable[ResponseType], UnaryStreamCall]: client_call_details: ClientCallDetails, request: RequestType
) -> Union[AsyncIterable[ResponseType], UnaryStreamCall]:
"""Intercepts a unary-stream invocation asynchronously. """Intercepts a unary-stream invocation asynchronously.
Args: Args:
@ -186,8 +187,8 @@ class InterceptedCall:
self.cancel() self.cancel()
def _fire_or_add_pending_done_callbacks(self, def _fire_or_add_pending_done_callbacks(self,
interceptors_task: asyncio.Task interceptors_task: asyncio.Task
) -> None: ) -> None:
if not self._pending_add_done_callbacks: if not self._pending_add_done_callbacks:
return return
@ -449,7 +450,6 @@ class InterceptedUnaryStreamCall(InterceptedCall, _base_call.UnaryStreamCall):
) -> UnaryStreamCall: ) -> UnaryStreamCall:
"""Run the RPC call wrapped in interceptors""" """Run the RPC call wrapped in interceptors"""
async def _run_interceptor( async def _run_interceptor(
interceptors: Iterator[UnaryStreamClientInterceptor], interceptors: Iterator[UnaryStreamClientInterceptor],
client_call_details: ClientCallDetails, client_call_details: ClientCallDetails,
@ -464,12 +464,13 @@ class InterceptedUnaryStreamCall(InterceptedCall, _base_call.UnaryStreamCall):
call_or_response_iterator = await interceptor.intercept_unary_stream( call_or_response_iterator = await interceptor.intercept_unary_stream(
continuation, client_call_details, request) continuation, client_call_details, request)
if isinstance(call_or_response_iterator, _base_call.UnaryUnaryCall): if isinstance(call_or_response_iterator,
_base_call.UnaryUnaryCall):
self._last_returned_call_from_interceptors = call_or_response_iterator self._last_returned_call_from_interceptors = call_or_response_iterator
else: else:
self._last_returned_call_from_interceptors = UnaryStreamCallResponseIterator( self._last_returned_call_from_interceptors = UnaryStreamCallResponseIterator(
self._last_returned_call_from_interceptors, self._last_returned_call_from_interceptors,
call_or_response_iterator) call_or_response_iterator)
return self._last_returned_call_from_interceptors return self._last_returned_call_from_interceptors
else: else:
self._last_returned_call_from_interceptors = UnaryStreamCall( self._last_returned_call_from_interceptors = UnaryStreamCall(

@ -36,6 +36,7 @@ async def block_until_certain_state(channel: aio.Channel,
await channel.wait_for_state_change(state) await channel.wait_for_state_change(state)
state = channel.get_state() state = channel.get_state()
def inject_callbacks(call): def inject_callbacks(call):
first_callback_ran = asyncio.Event() first_callback_ran = asyncio.Event()

@ -86,8 +86,8 @@ class TestUnaryStreamClientInterceptor(AioTestBase):
request = messages_pb2.StreamingOutputCallRequest() request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.extend([ request.response_parameters.extend([
messages_pb2.ResponseParameters( messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)
size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) ] * _NUM_STREAM_RESPONSES)
channel = aio.insecure_channel(self._server_target, channel = aio.insecure_channel(self._server_target,
interceptors=[interceptor]) interceptors=[interceptor])
@ -116,7 +116,7 @@ class TestUnaryStreamClientInterceptor(AioTestBase):
if interceptor_class == _UnaryStreamInterceptorWithResponseIterator: if interceptor_class == _UnaryStreamInterceptorWithResponseIterator:
self.assertEqual(interceptor.response_iterator.response_cnt, self.assertEqual(interceptor.response_iterator.response_cnt,
_NUM_STREAM_RESPONSES) _NUM_STREAM_RESPONSES)
await channel.close() await channel.close()
@ -129,8 +129,8 @@ class TestUnaryStreamClientInterceptor(AioTestBase):
request = messages_pb2.StreamingOutputCallRequest() request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.extend([ request.response_parameters.extend([
messages_pb2.ResponseParameters( messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)
size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) ] * _NUM_STREAM_RESPONSES)
channel = aio.insecure_channel(self._server_target, channel = aio.insecure_channel(self._server_target,
interceptors=[interceptor]) interceptors=[interceptor])
@ -155,8 +155,8 @@ class TestUnaryStreamClientInterceptor(AioTestBase):
request = messages_pb2.StreamingOutputCallRequest() request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.extend([ request.response_parameters.extend([
messages_pb2.ResponseParameters( messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)
size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) ] * _NUM_STREAM_RESPONSES)
channel = aio.insecure_channel(self._server_target, channel = aio.insecure_channel(self._server_target,
interceptors=[interceptor]) interceptors=[interceptor])
@ -185,9 +185,9 @@ class TestUnaryStreamClientInterceptor(AioTestBase):
stub = test_pb2_grpc.TestServiceStub(channel) stub = test_pb2_grpc.TestServiceStub(channel)
request = messages_pb2.StreamingOutputCallRequest() request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.extend([ request.response_parameters.extend(
messages_pb2.ResponseParameters( [messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)] *
size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) _NUM_STREAM_RESPONSES)
call = stub.StreamingOutputCall(request) call = stub.StreamingOutputCall(request)
@ -201,7 +201,7 @@ class TestUnaryStreamClientInterceptor(AioTestBase):
self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES) self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES)
self.assertEqual(interceptor.response_iterator.response_cnt, self.assertEqual(interceptor.response_iterator.response_cnt,
_NUM_STREAM_RESPONSES) _NUM_STREAM_RESPONSES)
self.assertEqual(await call.code(), grpc.StatusCode.OK) self.assertEqual(await call.code(), grpc.StatusCode.OK)
await channel.close() await channel.close()
@ -220,8 +220,8 @@ class TestUnaryStreamClientInterceptor(AioTestBase):
request = messages_pb2.StreamingOutputCallRequest() request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.extend([ request.response_parameters.extend([
messages_pb2.ResponseParameters( messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)
size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) ] * _NUM_STREAM_RESPONSES)
call = stub.StreamingOutputCall(request) call = stub.StreamingOutputCall(request)
@ -334,9 +334,9 @@ class TestUnaryStreamClientInterceptor(AioTestBase):
async def test_cancel_consuming_response_iterator(self): async def test_cancel_consuming_response_iterator(self):
request = messages_pb2.StreamingOutputCallRequest() request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.extend([ request.response_parameters.extend(
messages_pb2.ResponseParameters( [messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)] *
size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) _NUM_STREAM_RESPONSES)
channel = aio.insecure_channel( channel = aio.insecure_channel(
self._server_target, self._server_target,
@ -402,7 +402,6 @@ class TestUnaryStreamClientInterceptor(AioTestBase):
await channel.close() await channel.close()
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
unittest.main(verbosity=2) unittest.main(verbosity=2)

Loading…
Cancel
Save