From f07e0da6dd5ef1e9a634b67714eaf342984b3022 Mon Sep 17 00:00:00 2001 From: Pau Freixes Date: Mon, 20 Apr 2020 23:25:59 +0200 Subject: [PATCH] Make YAPF happy --- .../grpcio/grpc/experimental/aio/_channel.py | 15 +++++---- .../grpc/experimental/aio/_interceptor.py | 21 ++++++------ .../grpcio_tests/tests_aio/unit/_common.py | 1 + .../client_unary_stream_interceptor_test.py | 33 +++++++++---------- 4 files changed, 36 insertions(+), 34 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py index f783999dd2a..edd956b973a 100644 --- a/src/python/grpcio/grpc/experimental/aio/_channel.py +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -224,16 +224,17 @@ class Channel(_base_channel.Channel): self._unary_stream_interceptors = [] if interceptors: - attrs_and_interceptor_classes = ( - (self._unary_unary_interceptors, UnaryUnaryClientInterceptor), - (self._unary_stream_interceptors, UnaryStreamClientInterceptor) - ) + attrs_and_interceptor_classes = ((self._unary_unary_interceptors, + UnaryUnaryClientInterceptor), + (self._unary_stream_interceptors, + UnaryStreamClientInterceptor)) # pylint: disable=cell-var-from-loop for attr, interceptor_class in attrs_and_interceptor_classes: - attr.extend( - [interceptor for interceptor in interceptors if isinstance(interceptor, interceptor_class)] - ) + attr.extend([ + interceptor for interceptor in interceptors + if isinstance(interceptor, interceptor_class) + ]) invalid_interceptors = set(interceptors) - set( self._unary_unary_interceptors) - set( diff --git a/src/python/grpcio/grpc/experimental/aio/_interceptor.py b/src/python/grpcio/grpc/experimental/aio/_interceptor.py index 469585de51a..80d17e04ce9 100644 --- a/src/python/grpcio/grpc/experimental/aio/_interceptor.py +++ b/src/python/grpcio/grpc/experimental/aio/_interceptor.py @@ -125,10 +125,11 @@ class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta): """Affords intercepting unary-stream invocations.""" @abstractmethod - async def intercept_unary_stream(self, continuation: Callable[[ - ClientCallDetails, RequestType], UnaryStreamCall], - client_call_details: ClientCallDetails, - request: RequestType) -> Union[AsyncIterable[ResponseType], UnaryStreamCall]: + async def intercept_unary_stream( + self, continuation: Callable[[ClientCallDetails, RequestType], + UnaryStreamCall], + client_call_details: ClientCallDetails, request: RequestType + ) -> Union[AsyncIterable[ResponseType], UnaryStreamCall]: """Intercepts a unary-stream invocation asynchronously. Args: @@ -186,8 +187,8 @@ class InterceptedCall: self.cancel() def _fire_or_add_pending_done_callbacks(self, - interceptors_task: asyncio.Task - ) -> None: + interceptors_task: asyncio.Task + ) -> None: if not self._pending_add_done_callbacks: return @@ -449,7 +450,6 @@ class InterceptedUnaryStreamCall(InterceptedCall, _base_call.UnaryStreamCall): ) -> UnaryStreamCall: """Run the RPC call wrapped in interceptors""" - async def _run_interceptor( interceptors: Iterator[UnaryStreamClientInterceptor], client_call_details: ClientCallDetails, @@ -464,12 +464,13 @@ class InterceptedUnaryStreamCall(InterceptedCall, _base_call.UnaryStreamCall): call_or_response_iterator = await interceptor.intercept_unary_stream( continuation, client_call_details, request) - if isinstance(call_or_response_iterator, _base_call.UnaryUnaryCall): + if isinstance(call_or_response_iterator, + _base_call.UnaryUnaryCall): self._last_returned_call_from_interceptors = call_or_response_iterator else: self._last_returned_call_from_interceptors = UnaryStreamCallResponseIterator( - self._last_returned_call_from_interceptors, - call_or_response_iterator) + self._last_returned_call_from_interceptors, + call_or_response_iterator) return self._last_returned_call_from_interceptors else: self._last_returned_call_from_interceptors = UnaryStreamCall( diff --git a/src/python/grpcio_tests/tests_aio/unit/_common.py b/src/python/grpcio_tests/tests_aio/unit/_common.py index e820a18dd77..97cbe759ed0 100644 --- a/src/python/grpcio_tests/tests_aio/unit/_common.py +++ b/src/python/grpcio_tests/tests_aio/unit/_common.py @@ -36,6 +36,7 @@ async def block_until_certain_state(channel: aio.Channel, await channel.wait_for_state_change(state) state = channel.get_state() + def inject_callbacks(call): first_callback_ran = asyncio.Event() diff --git a/src/python/grpcio_tests/tests_aio/unit/client_unary_stream_interceptor_test.py b/src/python/grpcio_tests/tests_aio/unit/client_unary_stream_interceptor_test.py index fc9c8d81ad0..3d14b496a8a 100644 --- a/src/python/grpcio_tests/tests_aio/unit/client_unary_stream_interceptor_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/client_unary_stream_interceptor_test.py @@ -86,8 +86,8 @@ class TestUnaryStreamClientInterceptor(AioTestBase): request = messages_pb2.StreamingOutputCallRequest() request.response_parameters.extend([ - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE) + ] * _NUM_STREAM_RESPONSES) channel = aio.insecure_channel(self._server_target, interceptors=[interceptor]) @@ -116,7 +116,7 @@ class TestUnaryStreamClientInterceptor(AioTestBase): if interceptor_class == _UnaryStreamInterceptorWithResponseIterator: self.assertEqual(interceptor.response_iterator.response_cnt, - _NUM_STREAM_RESPONSES) + _NUM_STREAM_RESPONSES) await channel.close() @@ -129,8 +129,8 @@ class TestUnaryStreamClientInterceptor(AioTestBase): request = messages_pb2.StreamingOutputCallRequest() request.response_parameters.extend([ - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE) + ] * _NUM_STREAM_RESPONSES) channel = aio.insecure_channel(self._server_target, interceptors=[interceptor]) @@ -155,8 +155,8 @@ class TestUnaryStreamClientInterceptor(AioTestBase): request = messages_pb2.StreamingOutputCallRequest() request.response_parameters.extend([ - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE) + ] * _NUM_STREAM_RESPONSES) channel = aio.insecure_channel(self._server_target, interceptors=[interceptor]) @@ -185,9 +185,9 @@ class TestUnaryStreamClientInterceptor(AioTestBase): stub = test_pb2_grpc.TestServiceStub(channel) request = messages_pb2.StreamingOutputCallRequest() - request.response_parameters.extend([ - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) + request.response_parameters.extend( + [messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)] * + _NUM_STREAM_RESPONSES) call = stub.StreamingOutputCall(request) @@ -201,7 +201,7 @@ class TestUnaryStreamClientInterceptor(AioTestBase): self.assertEqual(response_cnt, _NUM_STREAM_RESPONSES) self.assertEqual(interceptor.response_iterator.response_cnt, - _NUM_STREAM_RESPONSES) + _NUM_STREAM_RESPONSES) self.assertEqual(await call.code(), grpc.StatusCode.OK) await channel.close() @@ -220,8 +220,8 @@ class TestUnaryStreamClientInterceptor(AioTestBase): request = messages_pb2.StreamingOutputCallRequest() request.response_parameters.extend([ - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE) + ] * _NUM_STREAM_RESPONSES) call = stub.StreamingOutputCall(request) @@ -334,9 +334,9 @@ class TestUnaryStreamClientInterceptor(AioTestBase): async def test_cancel_consuming_response_iterator(self): request = messages_pb2.StreamingOutputCallRequest() - request.response_parameters.extend([ - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE)] * _NUM_STREAM_RESPONSES) + request.response_parameters.extend( + [messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)] * + _NUM_STREAM_RESPONSES) channel = aio.insecure_channel( self._server_target, @@ -402,7 +402,6 @@ class TestUnaryStreamClientInterceptor(AioTestBase): await channel.close() - if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) unittest.main(verbosity=2)