From 025ebf7d0721bd06adf0e8dbe8bce01a647a0ad9 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 24 Apr 2020 10:31:40 -0700 Subject: [PATCH] Revert "Revert "[Aio] Add AsyncIO support to grpcio-status"" --- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 3 + .../grpcio_status/grpc_status/BUILD.bazel | 2 +- .../grpcio_status/grpc_status/_async.py | 56 ++++++ .../grpcio_status/grpc_status/_common.py | 27 +++ .../grpcio_status/grpc_status/rpc_status.py | 32 ++-- .../grpcio_tests/tests_aio/status/BUILD.bazel | 30 +++ .../grpcio_tests/tests_aio/status/__init__.py | 13 ++ .../tests_aio/status/grpc_status_test.py | 175 ++++++++++++++++++ src/python/grpcio_tests/tests_aio/tests.json | 1 + 9 files changed, 322 insertions(+), 17 deletions(-) create mode 100644 src/python/grpcio_status/grpc_status/_async.py create mode 100644 src/python/grpcio_status/grpc_status/_common.py create mode 100644 src/python/grpcio_tests/tests_aio/status/BUILD.bazel create mode 100644 src/python/grpcio_tests/tests_aio/status/__init__.py create mode 100644 src/python/grpcio_tests/tests_aio/status/grpc_status_test.py diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 0e407172806..a0d51a14325 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -189,6 +189,9 @@ cdef class _ServicerContext: raise self._rpc_state.abort_exception + async def abort_with_status(self, object status): + await self.abort(status.code, status.details, status.trailing_metadata) + def set_trailing_metadata(self, tuple metadata): self._rpc_state.trailing_metadata = metadata diff --git a/src/python/grpcio_status/grpc_status/BUILD.bazel b/src/python/grpcio_status/grpc_status/BUILD.bazel index 122a94f411a..a6abdd3ef56 100644 --- a/src/python/grpcio_status/grpc_status/BUILD.bazel +++ b/src/python/grpcio_status/grpc_status/BUILD.bazel @@ -4,7 +4,7 @@ package(default_visibility = ["//visibility:public"]) py_library( name = "grpc_status", - srcs = ["rpc_status.py"], + srcs = glob(["*.py"]), imports = ["../"], deps = [ "//src/python/grpcio/grpc:grpcio", diff --git a/src/python/grpcio_status/grpc_status/_async.py b/src/python/grpcio_status/grpc_status/_async.py new file mode 100644 index 00000000000..a6a6f7ef6ad --- /dev/null +++ b/src/python/grpcio_status/grpc_status/_async.py @@ -0,0 +1,56 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Reference implementation for status mapping in gRPC Python.""" + +from grpc.experimental import aio + +from google.rpc import status_pb2 + +from ._common import code_to_grpc_status_code, GRPC_DETAILS_METADATA_KEY + + +async def from_call(call: aio.Call): + """Returns a google.rpc.status.Status message from a given grpc.aio.Call. + + This is an EXPERIMENTAL API. + + Args: + call: An grpc.aio.Call instance. + + Returns: + A google.rpc.status.Status message representing the status of the RPC. + """ + code = await call.code() + details = await call.details() + trailing_metadata = await call.trailing_metadata() + if trailing_metadata is None: + return None + for key, value in trailing_metadata: + if key == GRPC_DETAILS_METADATA_KEY: + rich_status = status_pb2.Status.FromString(value) + if code.value[0] != rich_status.code: + raise ValueError( + 'Code in Status proto (%s) doesn\'t match status code (%s)' + % (code_to_grpc_status_code(rich_status.code), code)) + if details != rich_status.message: + raise ValueError( + 'Message in Status proto (%s) doesn\'t match status details (%s)' + % (rich_status.message, details)) + return rich_status + return None + + +__all__ = [ + 'from_call', +] diff --git a/src/python/grpcio_status/grpc_status/_common.py b/src/python/grpcio_status/grpc_status/_common.py new file mode 100644 index 00000000000..4bec0ba1372 --- /dev/null +++ b/src/python/grpcio_status/grpc_status/_common.py @@ -0,0 +1,27 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Reference implementation for status mapping in gRPC Python.""" + +import grpc + +_CODE_TO_GRPC_CODE_MAPPING = {x.value[0]: x for x in grpc.StatusCode} + +GRPC_DETAILS_METADATA_KEY = 'grpc-status-details-bin' + + +def code_to_grpc_status_code(code): + try: + return _CODE_TO_GRPC_CODE_MAPPING[code] + except KeyError: + raise ValueError('Invalid status code %s' % code) diff --git a/src/python/grpcio_status/grpc_status/rpc_status.py b/src/python/grpcio_status/grpc_status/rpc_status.py index ec78c477694..d0ec08e3a5d 100644 --- a/src/python/grpcio_status/grpc_status/rpc_status.py +++ b/src/python/grpcio_status/grpc_status/rpc_status.py @@ -14,14 +14,12 @@ """Reference implementation for status mapping in gRPC Python.""" import collections +import sys import grpc from google.rpc import status_pb2 - -_CODE_TO_GRPC_CODE_MAPPING = {x.value[0]: x for x in grpc.StatusCode} - -_GRPC_DETAILS_METADATA_KEY = 'grpc-status-details-bin' +from ._common import code_to_grpc_status_code, GRPC_DETAILS_METADATA_KEY class _Status( @@ -31,13 +29,6 @@ class _Status( pass -def _code_to_grpc_status_code(code): - try: - return _CODE_TO_GRPC_CODE_MAPPING[code] - except KeyError: - raise ValueError('Invalid status code %s' % code) - - def from_call(call): """Returns a google.rpc.status.Status message corresponding to a given grpc.Call. @@ -56,13 +47,12 @@ def from_call(call): if call.trailing_metadata() is None: return None for key, value in call.trailing_metadata(): - if key == _GRPC_DETAILS_METADATA_KEY: + if key == GRPC_DETAILS_METADATA_KEY: rich_status = status_pb2.Status.FromString(value) if call.code().value[0] != rich_status.code: raise ValueError( 'Code in Status proto (%s) doesn\'t match status code (%s)' - % - (_code_to_grpc_status_code(rich_status.code), call.code())) + % (code_to_grpc_status_code(rich_status.code), call.code())) if call.details() != rich_status.message: raise ValueError( 'Message in Status proto (%s) doesn\'t match status details (%s)' @@ -83,7 +73,17 @@ def to_status(status): Returns: A grpc.Status instance representing the input google.rpc.status.Status message. """ - return _Status(code=_code_to_grpc_status_code(status.code), + return _Status(code=code_to_grpc_status_code(status.code), details=status.message, - trailing_metadata=((_GRPC_DETAILS_METADATA_KEY, + trailing_metadata=((GRPC_DETAILS_METADATA_KEY, status.SerializeToString()),)) + + +__all__ = [ + 'from_call', + 'to_status', +] + +if sys.version_info[0] >= 3 and sys.version_info[1] >= 6: + from . import _async as aio # pylint: disable=unused-import + __all__.append('aio') diff --git a/src/python/grpcio_tests/tests_aio/status/BUILD.bazel b/src/python/grpcio_tests/tests_aio/status/BUILD.bazel new file mode 100644 index 00000000000..2fd82f2684c --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/status/BUILD.bazel @@ -0,0 +1,30 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("@grpc_python_dependencies//:requirements.bzl", "requirement") + +py_test( + name = "grpc_status_test", + size = "small", + srcs = ["grpc_status_test.py"], + imports = ["../../"], + python_version = "PY3", + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_status/grpc_status", + "//src/python/grpcio_tests/tests_aio/unit:_test_base", + requirement("protobuf"), + requirement("googleapis-common-protos"), + ], +) diff --git a/src/python/grpcio_tests/tests_aio/status/__init__.py b/src/python/grpcio_tests/tests_aio/status/__init__.py new file mode 100644 index 00000000000..1517f71d093 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/status/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests_aio/status/grpc_status_test.py b/src/python/grpcio_tests/tests_aio/status/grpc_status_test.py new file mode 100644 index 00000000000..980cf5a67e7 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/status/grpc_status_test.py @@ -0,0 +1,175 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests of grpc_status with gRPC AsyncIO stack.""" + +import logging +import traceback +import unittest + +import grpc +from google.protobuf import any_pb2 +from google.rpc import code_pb2, error_details_pb2, status_pb2 +from grpc.experimental import aio + +from grpc_status import rpc_status +from tests_aio.unit._test_base import AioTestBase + +_STATUS_OK = '/test/StatusOK' +_STATUS_NOT_OK = '/test/StatusNotOk' +_ERROR_DETAILS = '/test/ErrorDetails' +_INCONSISTENT = '/test/Inconsistent' +_INVALID_CODE = '/test/InvalidCode' + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x01\x01\x01' + +_GRPC_DETAILS_METADATA_KEY = 'grpc-status-details-bin' + +_STATUS_DETAILS = 'This is an error detail' +_STATUS_DETAILS_ANOTHER = 'This is another error detail' + + +async def _ok_unary_unary(request, servicer_context): + return _RESPONSE + + +async def _not_ok_unary_unary(request, servicer_context): + await servicer_context.abort(grpc.StatusCode.INTERNAL, _STATUS_DETAILS) + + +async def _error_details_unary_unary(request, servicer_context): + details = any_pb2.Any() + details.Pack( + error_details_pb2.DebugInfo(stack_entries=traceback.format_stack(), + detail='Intentionally invoked')) + rich_status = status_pb2.Status( + code=code_pb2.INTERNAL, + message=_STATUS_DETAILS, + details=[details], + ) + await servicer_context.abort_with_status(rpc_status.to_status(rich_status)) + + +async def _inconsistent_unary_unary(request, servicer_context): + rich_status = status_pb2.Status( + code=code_pb2.INTERNAL, + message=_STATUS_DETAILS, + ) + servicer_context.set_code(grpc.StatusCode.NOT_FOUND) + servicer_context.set_details(_STATUS_DETAILS_ANOTHER) + # User put inconsistent status information in trailing metadata + servicer_context.set_trailing_metadata( + ((_GRPC_DETAILS_METADATA_KEY, rich_status.SerializeToString()),)) + + +async def _invalid_code_unary_unary(request, servicer_context): + rich_status = status_pb2.Status( + code=42, + message='Invalid code', + ) + await servicer_context.abort_with_status(rpc_status.to_status(rich_status)) + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == _STATUS_OK: + return grpc.unary_unary_rpc_method_handler(_ok_unary_unary) + elif handler_call_details.method == _STATUS_NOT_OK: + return grpc.unary_unary_rpc_method_handler(_not_ok_unary_unary) + elif handler_call_details.method == _ERROR_DETAILS: + return grpc.unary_unary_rpc_method_handler( + _error_details_unary_unary) + elif handler_call_details.method == _INCONSISTENT: + return grpc.unary_unary_rpc_method_handler( + _inconsistent_unary_unary) + elif handler_call_details.method == _INVALID_CODE: + return grpc.unary_unary_rpc_method_handler( + _invalid_code_unary_unary) + else: + return None + + +class StatusTest(AioTestBase): + + async def setUp(self): + self._server = aio.server() + self._server.add_generic_rpc_handlers((_GenericHandler(),)) + port = self._server.add_insecure_port('[::]:0') + await self._server.start() + + self._channel = aio.insecure_channel('localhost:%d' % port) + + async def tearDown(self): + await self._server.stop(None) + await self._channel.close() + + async def test_status_ok(self): + call = self._channel.unary_unary(_STATUS_OK)(_REQUEST) + + # Succeed RPC doesn't have status + status = await rpc_status.aio.from_call(call) + self.assertIs(status, None) + + async def test_status_not_ok(self): + call = self._channel.unary_unary(_STATUS_NOT_OK)(_REQUEST) + with self.assertRaises(aio.AioRpcError) as exception_context: + await call + rpc_error = exception_context.exception + + self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL) + # Failed RPC doesn't automatically generate status + status = await rpc_status.aio.from_call(call) + self.assertIs(status, None) + + async def test_error_details(self): + call = self._channel.unary_unary(_ERROR_DETAILS)(_REQUEST) + with self.assertRaises(aio.AioRpcError) as exception_context: + await call + rpc_error = exception_context.exception + + status = await rpc_status.aio.from_call(call) + self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL) + self.assertEqual(status.code, code_pb2.Code.Value('INTERNAL')) + + # Check if the underlying proto message is intact + self.assertTrue(status.details[0].Is( + error_details_pb2.DebugInfo.DESCRIPTOR)) + info = error_details_pb2.DebugInfo() + status.details[0].Unpack(info) + self.assertIn('_error_details_unary_unary', info.stack_entries[-1]) + + async def test_code_message_validation(self): + call = self._channel.unary_unary(_INCONSISTENT)(_REQUEST) + with self.assertRaises(aio.AioRpcError) as exception_context: + await call + rpc_error = exception_context.exception + self.assertEqual(rpc_error.code(), grpc.StatusCode.NOT_FOUND) + + # Code/Message validation failed + with self.assertRaises(ValueError): + await rpc_status.aio.from_call(call) + + async def test_invalid_code(self): + with self.assertRaises(aio.AioRpcError) as exception_context: + await self._channel.unary_unary(_INVALID_CODE)(_REQUEST) + rpc_error = exception_context.exception + self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN) + # Invalid status code exception raised during coversion + self.assertIn('Invalid status code', rpc_error.details()) + + +if __name__ == '__main__': + logging.basicConfig() + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json index e657cfa7e4d..a1e6f88d38f 100644 --- a/src/python/grpcio_tests/tests_aio/tests.json +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -4,6 +4,7 @@ "interop.local_interop_test.InsecureLocalInteropTest", "interop.local_interop_test.SecureLocalInteropTest", "reflection.reflection_servicer_test.ReflectionServicerTest", + "status.grpc_status_test.StatusTest", "unit._metadata_test.TestTypeMetadata", "unit.abort_test.TestAbort", "unit.aio_rpc_error_test.TestAioRpcError",