From 2996c03114726f9607819c1419e38478c355eb7b Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 20 Nov 2020 12:08:23 -0800 Subject: [PATCH 1/3] Fix the emtpy response handling in streaming RPC --- .../grpc/_cython/_cygrpc/aio/call.pyx.pxi | 2 +- .../_cygrpc/aio/callback_common.pyx.pxi | 2 ++ .../tests_aio/unit/_test_server.py | 22 ++++++++----- .../grpcio_tests/tests_aio/unit/call_test.py | 31 +++++++++++++++++++ 4 files changed, 48 insertions(+), 9 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index 10c024e1b39..2c2a3ff3f6e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -360,7 +360,7 @@ cdef class _AioCall(GrpcCallWrapper): self, self._loop ) - if received_message: + if received_message is not None: return received_message else: return EOF diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index 86fc91e76a4..b10382ef7ee 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -130,6 +130,8 @@ async def _receive_message(GrpcCallWrapper grpc_call_wrapper, # # Since they all indicates finish, they are better be merged. _LOGGER.debug('Failed to receive any message from Core') + # NOTE(lidiz) The returned message might be an empty bytess (aka. b''). + # Please explicitly check if it is None or falsified string object! return receive_op.message() diff --git a/src/python/grpcio_tests/tests_aio/unit/_test_server.py b/src/python/grpcio_tests/tests_aio/unit/_test_server.py index 5e5081a38d0..ee137dedb68 100644 --- a/src/python/grpcio_tests/tests_aio/unit/_test_server.py +++ b/src/python/grpcio_tests/tests_aio/unit/_test_server.py @@ -67,10 +67,13 @@ class TestServiceServicer(test_pb2_grpc.TestServiceServicer): await asyncio.sleep( datetime.timedelta(microseconds=response_parameters. interval_us).total_seconds()) - yield messages_pb2.StreamingOutputCallResponse( - payload=messages_pb2.Payload(type=request.response_type, - body=b'\x00' * - response_parameters.size)) + if response_parameters.size != 0: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload(type=request.response_type, + body=b'\x00' * + response_parameters.size)) + else: + yield messages_pb2.StreamingOutputCallResponse() # Next methods are extra ones that are registred programatically # when the sever is instantiated. They are not being provided by @@ -96,10 +99,13 @@ class TestServiceServicer(test_pb2_grpc.TestServiceServicer): await asyncio.sleep( datetime.timedelta(microseconds=response_parameters. interval_us).total_seconds()) - yield messages_pb2.StreamingOutputCallResponse( - payload=messages_pb2.Payload(type=request.payload.type, - body=b'\x00' * - response_parameters.size)) + if response_parameters.size != 0: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload(type=request.payload.type, + body=b'\x00' * + response_parameters.size)) + else: + yield messages_pb2.StreamingOutputCallResponse() def _create_extra_generic_handler(servicer: TestServiceServicer): diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 1961226fa6d..c7d99a20c48 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -472,6 +472,24 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): self.assertEqual(grpc.StatusCode.OK, await call.code()) + async def test_empty_responses(self): + # Prepares the request + request = messages_pb2.StreamingOutputCallRequest() + for _ in range(_NUM_STREAM_RESPONSES): + request.response_parameters.append( + messages_pb2.ResponseParameters()) + + # Invokes the actual RPC + call = self._stub.StreamingOutputCall(request) + + for _ in range(_NUM_STREAM_RESPONSES): + response = await call.read() + self.assertIs(type(response), + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(b'', response.SerializeToString()) + + self.assertEqual(grpc.StatusCode.OK, await call.code()) + class TestStreamUnaryCall(_MulticallableTestMixin, AioTestBase): @@ -624,6 +642,10 @@ class TestStreamUnaryCall(_MulticallableTestMixin, AioTestBase): _STREAM_OUTPUT_REQUEST_ONE_RESPONSE = messages_pb2.StreamingOutputCallRequest() _STREAM_OUTPUT_REQUEST_ONE_RESPONSE.response_parameters.append( messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) +_STREAM_OUTPUT_REQUEST_ONE_EMPTY_RESPONSE = messages_pb2.StreamingOutputCallRequest( +) +_STREAM_OUTPUT_REQUEST_ONE_EMPTY_RESPONSE.response_parameters.append( + messages_pb2.ResponseParameters()) class TestStreamStreamCall(_MulticallableTestMixin, AioTestBase): @@ -808,6 +830,15 @@ class TestStreamStreamCall(_MulticallableTestMixin, AioTestBase): self.assertEqual(await call.code(), grpc.StatusCode.OK) + async def test_empty_ping_pong(self): + call = self._stub.FullDuplexCall() + for _ in range(_NUM_STREAM_RESPONSES): + await call.write(_STREAM_OUTPUT_REQUEST_ONE_EMPTY_RESPONSE) + response = await call.read() + self.assertEqual(b'', response.SerializeToString()) + await call.done_writing() + self.assertEqual(await call.code(), grpc.StatusCode.OK) + if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) From 06ce4cb51cf884e2decaa3ec35ca1d75b7e29c32 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 20 Nov 2020 13:04:09 -0800 Subject: [PATCH 2/3] Use the correct adjective for false --- .../grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index b10382ef7ee..41d2ea5012d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -131,7 +131,7 @@ async def _receive_message(GrpcCallWrapper grpc_call_wrapper, # Since they all indicates finish, they are better be merged. _LOGGER.debug('Failed to receive any message from Core') # NOTE(lidiz) The returned message might be an empty bytess (aka. b''). - # Please explicitly check if it is None or falsified string object! + # Please explicitly check if it is None or falsey string object! return receive_op.message() From a0883ea81ae0586804eaec72b7bcd589744a0f5c Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 20 Nov 2020 13:24:45 -0800 Subject: [PATCH 3/3] Found another typo in the comment --- .../grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index 41d2ea5012d..bc25c2e4bae 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -130,7 +130,7 @@ async def _receive_message(GrpcCallWrapper grpc_call_wrapper, # # Since they all indicates finish, they are better be merged. _LOGGER.debug('Failed to receive any message from Core') - # NOTE(lidiz) The returned message might be an empty bytess (aka. b''). + # NOTE(lidiz) The returned message might be an empty bytes (aka. b''). # Please explicitly check if it is None or falsey string object! return receive_op.message()