diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi index aae9daefd99..acee440e673 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi @@ -35,8 +35,8 @@ cdef class _AioCall(GrpcCallWrapper): # the initial metadata. Waiters are used for pausing the execution of # tasks that are asking for one of the field when they are not yet # available. - object _status - object _initial_metadata + readonly AioRpcStatus _status + readonly tuple _initial_metadata list _waiters_status list _waiters_initial_metadata diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index 289fce4c303..db1dc2a87dd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -249,6 +249,10 @@ cdef class _AioCall(GrpcCallWrapper): return self._status + def is_ok(self): + """Returns if the RPC is ended with ok.""" + return self.done() and self._status.code() == StatusCode.ok + async def initial_metadata(self): """Returns the initial metadata of the RPC call. diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi index da3e74efdad..76cc2996552 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi @@ -13,9 +13,9 @@ # limitations under the License. -cdef int get_status_code(object code) except *: +cdef grpc_status_code get_status_code(object code) except *: if isinstance(code, int): - if code >=0 and code < 15: + if code >= StatusCode.ok and code <= StatusCode.data_loss: return code else: return StatusCode.unknown diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index 5c79460879d..f6a6f4d7094 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -28,6 +28,8 @@ cdef class RPCState(GrpcCallWrapper): cdef object abort_exception cdef bint metadata_sent cdef bint status_sent + cdef grpc_status_code status_code + cdef str status_details cdef tuple trailing_metadata cdef bytes method(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index e0211201188..cb427226ab8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -42,6 +42,8 @@ cdef class RPCState: self.abort_exception = None self.metadata_sent = False self.status_sent = False + self.status_code = StatusCode.ok + self.status_details = '' self.trailing_metadata = _IMMUTABLE_EMPTY_METADATA cdef bytes method(self): @@ -143,6 +145,9 @@ cdef class _ServicerContext: if trailing_metadata == _IMMUTABLE_EMPTY_METADATA and self._rpc_state.trailing_metadata: trailing_metadata = self._rpc_state.trailing_metadata + if details == '' and self._rpc_state.status_details: + details = self._rpc_state.status_details + actual_code = get_status_code(code) self._rpc_state.status_sent = True @@ -163,6 +168,12 @@ cdef class _ServicerContext: def invocation_metadata(self): return self._rpc_state.invocation_metadata() + def set_code(self, object code): + self._rpc_state.status_code = get_status_code(code) + + def set_details(self, str details): + self._rpc_state.status_details = details + cdef _find_method_handler(str method, tuple metadata, list generic_handlers): cdef _HandlerCallDetails handler_call_details = _HandlerCallDetails(method, @@ -209,8 +220,8 @@ async def _finish_handler_with_unary_response(RPCState rpc_state, SendMessageOperation(response_raw, _EMPTY_FLAGS), SendStatusFromServerOperation( rpc_state.trailing_metadata, - StatusCode.ok, - b'', + rpc_state.status_code, + rpc_state.status_details, _EMPTY_FLAGS, ), ) @@ -262,8 +273,8 @@ async def _finish_handler_with_stream_responses(RPCState rpc_state, # Sends the final status of this RPC cdef SendStatusFromServerOperation op = SendStatusFromServerOperation( rpc_state.trailing_metadata, - StatusCode.ok, - b'', + rpc_state.status_code, + rpc_state.status_details, _EMPTY_FLAGS, ) @@ -419,11 +430,20 @@ async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop): except _ServerStoppedError: _LOGGER.info('Aborting RPC due to server stop.') except Exception as e: - _LOGGER.exception(e) + _LOGGER.exception('Unexpected [%s] raised by servicer method [%s]' % ( + type(e).__name__, + _decode(rpc_state.method()), + )) if not rpc_state.status_sent and rpc_state.server._status != AIO_SERVER_STATUS_STOPPED: + # Allows users to raise other types of exception with specified status code + if rpc_state.status_code == StatusCode.ok: + status_code = StatusCode.unknown + else: + status_code = rpc_state.status_code + await _send_error_status_from_server( rpc_state, - StatusCode.unknown, + status_code, 'Unexpected %s: %s' % (type(e), e), rpc_state.trailing_metadata, rpc_state.metadata_sent, diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 8067a50c096..e237cc8085e 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -260,7 +260,20 @@ class _UnaryResponseMixin(Call): if not self.cancelled(): self.cancel() raise - return response + + # NOTE(lidiz) If we raise RpcError in the task, and users doesn't + # 'await' on it. AsyncIO will log 'Task exception was never retrieved'. + # Instead, if we move the exception raising here, the spam stops. + # Unfortunately, there can only be one 'yield from' in '__await__'. So, + # we need to access the private instance variable. + if response is cygrpc.EOF: + if self._cython_call.is_locally_cancelled(): + raise asyncio.CancelledError() + else: + raise _create_rpc_error(self._cython_call._initial_metadata, + self._cython_call._status) + else: + return response class _StreamResponseMixin(Call): @@ -432,11 +445,11 @@ class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall): if not self.cancelled(): self.cancel() - # Raises here if RPC failed or cancelled - await self._raise_for_status() - - return _common.deserialize(serialized_response, - self._response_deserializer) + if self._cython_call.is_ok(): + return _common.deserialize(serialized_response, + self._response_deserializer) + else: + return cygrpc.EOF class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall): @@ -506,11 +519,11 @@ class StreamUnaryCall(_StreamRequestMixin, _UnaryResponseMixin, Call, if not self.cancelled(): self.cancel() - # Raises RpcError if the RPC failed or cancelled - await self._raise_for_status() - - return _common.deserialize(serialized_response, - self._response_deserializer) + if self._cython_call.is_ok(): + return _common.deserialize(serialized_response, + self._response_deserializer) + else: + return cygrpc.EOF class StreamStreamCall(_StreamRequestMixin, _StreamResponseMixin, Call, diff --git a/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py b/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py index 861a4efbb49..e6dea065a32 100644 --- a/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py +++ b/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py @@ -67,6 +67,26 @@ class InteropTestCaseMixin: await methods.test_interoperability( methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER, self._stub, None) + async def test_empty_stream(self): + await methods.test_interoperability(methods.TestCase.EMPTY_STREAM, + self._stub, None) + + async def test_status_code_and_message(self): + await methods.test_interoperability( + methods.TestCase.STATUS_CODE_AND_MESSAGE, self._stub, None) + + async def test_unimplemented_method(self): + await methods.test_interoperability( + methods.TestCase.UNIMPLEMENTED_METHOD, self._stub, None) + + async def test_unimplemented_service(self): + await methods.test_interoperability( + methods.TestCase.UNIMPLEMENTED_SERVICE, self._stub, None) + + async def test_custom_metadata(self): + await methods.test_interoperability(methods.TestCase.CUSTOM_METADATA, + self._stub, None) + async def test_special_status_message(self): await methods.test_interoperability( methods.TestCase.SPECIAL_STATUS_MESSAGE, self._stub, None) diff --git a/src/python/grpcio_tests/tests_aio/interop/methods.py b/src/python/grpcio_tests/tests_aio/interop/methods.py index d152b914748..7c5c1edfd2b 100644 --- a/src/python/grpcio_tests/tests_aio/interop/methods.py +++ b/src/python/grpcio_tests/tests_aio/interop/methods.py @@ -42,14 +42,14 @@ async def _expect_status_code(call: aio.Call, code = await call.code() if code != expected_code: raise ValueError('expected code %s, got %s' % - (expected_code, call.code())) + (expected_code, await call.code())) async def _expect_status_details(call: aio.Call, expected_details: str) -> None: details = await call.details() if details != expected_details: raise ValueError('expected message %s, got %s' % - (expected_details, call.details())) + (expected_details, await call.details())) async def _validate_status_code_and_details(call: aio.Call, @@ -245,7 +245,6 @@ async def _empty_stream(stub: test_pb2_grpc.TestServiceStub): async def _status_code_and_message(stub: test_pb2_grpc.TestServiceStub): details = 'test status message' - code = 2 status = grpc.StatusCode.UNKNOWN # code = 2 # Test with a UnaryCall @@ -253,7 +252,8 @@ async def _status_code_and_message(stub: test_pb2_grpc.TestServiceStub): response_type=messages_pb2.COMPRESSABLE, response_size=1, payload=messages_pb2.Payload(body=b'\x00'), - response_status=messages_pb2.EchoStatus(code=code, message=details)) + response_status=messages_pb2.EchoStatus(code=status.value[0], + message=details)) call = stub.UnaryCall(request) await _validate_status_code_and_details(call, status, details) @@ -263,7 +263,8 @@ async def _status_code_and_message(stub: test_pb2_grpc.TestServiceStub): response_type=messages_pb2.COMPRESSABLE, response_parameters=(messages_pb2.ResponseParameters(size=1),), payload=messages_pb2.Payload(body=b'\x00'), - response_status=messages_pb2.EchoStatus(code=code, message=details)) + response_status=messages_pb2.EchoStatus(code=status.value[0], + message=details)) await call.write(request) # sends the initial request. await call.done_writing() await _validate_status_code_and_details(call, status, details) diff --git a/src/python/grpcio_tests/tests_aio/unit/_test_server.py b/src/python/grpcio_tests/tests_aio/unit/_test_server.py index 0a2e6a5f65c..32f988a6b90 100644 --- a/src/python/grpcio_tests/tests_aio/unit/_test_server.py +++ b/src/python/grpcio_tests/tests_aio/unit/_test_server.py @@ -87,8 +87,10 @@ class _TestServiceServicer(test_pb2_grpc.TestServiceServicer): return messages_pb2.StreamingInputCallResponse( aggregated_payload_size=aggregate_size) - async def FullDuplexCall(self, request_async_iterator, unused_context): + async def FullDuplexCall(self, request_async_iterator, context): + await _maybe_echo_metadata(context) async for request in request_async_iterator: + await _maybe_echo_status(request, context) for response_parameters in request.response_parameters: if response_parameters.interval_us != 0: await asyncio.sleep( diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 967edff204c..ae2dbc58bc3 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -82,7 +82,7 @@ class TestUnaryUnaryCall(_MulticallableTestMixin, AioTestBase): call = stub.UnaryCall(messages_pb2.SimpleRequest()) - with self.assertRaises(grpc.RpcError) as exception_context: + with self.assertRaises(aio.AioRpcError) as exception_context: await call self.assertEqual(grpc.StatusCode.UNAVAILABLE, @@ -91,14 +91,6 @@ class TestUnaryUnaryCall(_MulticallableTestMixin, AioTestBase): self.assertTrue(call.done()) self.assertEqual(grpc.StatusCode.UNAVAILABLE, await call.code()) - # Exception is cached at call object level, reentrance - # returns again the same exception - with self.assertRaises(grpc.RpcError) as exception_context_retry: - await call - - self.assertIs(exception_context.exception, - exception_context_retry.exception) - async def test_call_code_awaitable(self): call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) self.assertEqual(await call.code(), grpc.StatusCode.OK)