Merge pull request #22688 from lidizheng/aio-status

[Aio] Add AsyncIO support to grpcio-status
pull/22740/head
Lidi Zheng 5 years ago committed by GitHub
commit 4e7f3376d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  2. 2
      src/python/grpcio_status/grpc_status/BUILD.bazel
  3. 56
      src/python/grpcio_status/grpc_status/_async.py
  4. 27
      src/python/grpcio_status/grpc_status/_common.py
  5. 32
      src/python/grpcio_status/grpc_status/rpc_status.py
  6. 30
      src/python/grpcio_tests/tests_aio/status/BUILD.bazel
  7. 13
      src/python/grpcio_tests/tests_aio/status/__init__.py
  8. 175
      src/python/grpcio_tests/tests_aio/status/grpc_status_test.py
  9. 1
      src/python/grpcio_tests/tests_aio/tests.json

@ -189,6 +189,9 @@ cdef class _ServicerContext:
raise self._rpc_state.abort_exception raise self._rpc_state.abort_exception
async def abort_with_status(self, object status):
await self.abort(status.code, status.details, status.trailing_metadata)
def set_trailing_metadata(self, tuple metadata): def set_trailing_metadata(self, tuple metadata):
self._rpc_state.trailing_metadata = metadata self._rpc_state.trailing_metadata = metadata

@ -4,7 +4,7 @@ package(default_visibility = ["//visibility:public"])
py_library( py_library(
name = "grpc_status", name = "grpc_status",
srcs = ["rpc_status.py"], srcs = glob(["*.py"]),
imports = ["../"], imports = ["../"],
deps = [ deps = [
"//src/python/grpcio/grpc:grpcio", "//src/python/grpcio/grpc:grpcio",

@ -0,0 +1,56 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reference implementation for status mapping in gRPC Python."""
from grpc.experimental import aio
from google.rpc import status_pb2
from ._common import code_to_grpc_status_code, GRPC_DETAILS_METADATA_KEY
async def from_call(call: aio.Call):
"""Returns a google.rpc.status.Status message from a given grpc.aio.Call.
This is an EXPERIMENTAL API.
Args:
call: An grpc.aio.Call instance.
Returns:
A google.rpc.status.Status message representing the status of the RPC.
"""
code = await call.code()
details = await call.details()
trailing_metadata = await call.trailing_metadata()
if trailing_metadata is None:
return None
for key, value in trailing_metadata:
if key == GRPC_DETAILS_METADATA_KEY:
rich_status = status_pb2.Status.FromString(value)
if code.value[0] != rich_status.code:
raise ValueError(
'Code in Status proto (%s) doesn\'t match status code (%s)'
% (code_to_grpc_status_code(rich_status.code), code))
if details != rich_status.message:
raise ValueError(
'Message in Status proto (%s) doesn\'t match status details (%s)'
% (rich_status.message, details))
return rich_status
return None
__all__ = [
'from_call',
]

@ -0,0 +1,27 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reference implementation for status mapping in gRPC Python."""
import grpc
_CODE_TO_GRPC_CODE_MAPPING = {x.value[0]: x for x in grpc.StatusCode}
GRPC_DETAILS_METADATA_KEY = 'grpc-status-details-bin'
def code_to_grpc_status_code(code):
try:
return _CODE_TO_GRPC_CODE_MAPPING[code]
except KeyError:
raise ValueError('Invalid status code %s' % code)

@ -14,14 +14,12 @@
"""Reference implementation for status mapping in gRPC Python.""" """Reference implementation for status mapping in gRPC Python."""
import collections import collections
import sys
import grpc import grpc
from google.rpc import status_pb2 from google.rpc import status_pb2
from ._common import code_to_grpc_status_code, GRPC_DETAILS_METADATA_KEY
_CODE_TO_GRPC_CODE_MAPPING = {x.value[0]: x for x in grpc.StatusCode}
_GRPC_DETAILS_METADATA_KEY = 'grpc-status-details-bin'
class _Status( class _Status(
@ -31,13 +29,6 @@ class _Status(
pass pass
def _code_to_grpc_status_code(code):
try:
return _CODE_TO_GRPC_CODE_MAPPING[code]
except KeyError:
raise ValueError('Invalid status code %s' % code)
def from_call(call): def from_call(call):
"""Returns a google.rpc.status.Status message corresponding to a given grpc.Call. """Returns a google.rpc.status.Status message corresponding to a given grpc.Call.
@ -56,13 +47,12 @@ def from_call(call):
if call.trailing_metadata() is None: if call.trailing_metadata() is None:
return None return None
for key, value in call.trailing_metadata(): for key, value in call.trailing_metadata():
if key == _GRPC_DETAILS_METADATA_KEY: if key == GRPC_DETAILS_METADATA_KEY:
rich_status = status_pb2.Status.FromString(value) rich_status = status_pb2.Status.FromString(value)
if call.code().value[0] != rich_status.code: if call.code().value[0] != rich_status.code:
raise ValueError( raise ValueError(
'Code in Status proto (%s) doesn\'t match status code (%s)' 'Code in Status proto (%s) doesn\'t match status code (%s)'
% % (code_to_grpc_status_code(rich_status.code), call.code()))
(_code_to_grpc_status_code(rich_status.code), call.code()))
if call.details() != rich_status.message: if call.details() != rich_status.message:
raise ValueError( raise ValueError(
'Message in Status proto (%s) doesn\'t match status details (%s)' 'Message in Status proto (%s) doesn\'t match status details (%s)'
@ -83,7 +73,17 @@ def to_status(status):
Returns: Returns:
A grpc.Status instance representing the input google.rpc.status.Status message. A grpc.Status instance representing the input google.rpc.status.Status message.
""" """
return _Status(code=_code_to_grpc_status_code(status.code), return _Status(code=code_to_grpc_status_code(status.code),
details=status.message, details=status.message,
trailing_metadata=((_GRPC_DETAILS_METADATA_KEY, trailing_metadata=((GRPC_DETAILS_METADATA_KEY,
status.SerializeToString()),)) status.SerializeToString()),))
__all__ = [
'from_call',
'to_status',
]
if sys.version_info[0] >= 3 and sys.version_info[1] >= 6:
from . import _async as aio # pylint: disable=unused-import
__all__.append('aio')

@ -0,0 +1,30 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
py_test(
name = "grpc_status_test",
size = "small",
srcs = ["grpc_status_test.py"],
imports = ["../../"],
python_version = "PY3",
deps = [
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_status/grpc_status",
"//src/python/grpcio_tests/tests_aio/unit:_test_base",
requirement("protobuf"),
requirement("googleapis-common-protos"),
],
)

@ -0,0 +1,13 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,175 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc_status with gRPC AsyncIO stack."""
import logging
import traceback
import unittest
import grpc
from google.protobuf import any_pb2
from google.rpc import code_pb2, error_details_pb2, status_pb2
from grpc.experimental import aio
from grpc_status import rpc_status
from tests_aio.unit._test_base import AioTestBase
_STATUS_OK = '/test/StatusOK'
_STATUS_NOT_OK = '/test/StatusNotOk'
_ERROR_DETAILS = '/test/ErrorDetails'
_INCONSISTENT = '/test/Inconsistent'
_INVALID_CODE = '/test/InvalidCode'
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x01\x01\x01'
_GRPC_DETAILS_METADATA_KEY = 'grpc-status-details-bin'
_STATUS_DETAILS = 'This is an error detail'
_STATUS_DETAILS_ANOTHER = 'This is another error detail'
async def _ok_unary_unary(request, servicer_context):
return _RESPONSE
async def _not_ok_unary_unary(request, servicer_context):
await servicer_context.abort(grpc.StatusCode.INTERNAL, _STATUS_DETAILS)
async def _error_details_unary_unary(request, servicer_context):
details = any_pb2.Any()
details.Pack(
error_details_pb2.DebugInfo(stack_entries=traceback.format_stack(),
detail='Intentionally invoked'))
rich_status = status_pb2.Status(
code=code_pb2.INTERNAL,
message=_STATUS_DETAILS,
details=[details],
)
await servicer_context.abort_with_status(rpc_status.to_status(rich_status))
async def _inconsistent_unary_unary(request, servicer_context):
rich_status = status_pb2.Status(
code=code_pb2.INTERNAL,
message=_STATUS_DETAILS,
)
servicer_context.set_code(grpc.StatusCode.NOT_FOUND)
servicer_context.set_details(_STATUS_DETAILS_ANOTHER)
# User put inconsistent status information in trailing metadata
servicer_context.set_trailing_metadata(
((_GRPC_DETAILS_METADATA_KEY, rich_status.SerializeToString()),))
async def _invalid_code_unary_unary(request, servicer_context):
rich_status = status_pb2.Status(
code=42,
message='Invalid code',
)
await servicer_context.abort_with_status(rpc_status.to_status(rich_status))
class _GenericHandler(grpc.GenericRpcHandler):
def service(self, handler_call_details):
if handler_call_details.method == _STATUS_OK:
return grpc.unary_unary_rpc_method_handler(_ok_unary_unary)
elif handler_call_details.method == _STATUS_NOT_OK:
return grpc.unary_unary_rpc_method_handler(_not_ok_unary_unary)
elif handler_call_details.method == _ERROR_DETAILS:
return grpc.unary_unary_rpc_method_handler(
_error_details_unary_unary)
elif handler_call_details.method == _INCONSISTENT:
return grpc.unary_unary_rpc_method_handler(
_inconsistent_unary_unary)
elif handler_call_details.method == _INVALID_CODE:
return grpc.unary_unary_rpc_method_handler(
_invalid_code_unary_unary)
else:
return None
class StatusTest(AioTestBase):
async def setUp(self):
self._server = aio.server()
self._server.add_generic_rpc_handlers((_GenericHandler(),))
port = self._server.add_insecure_port('[::]:0')
await self._server.start()
self._channel = aio.insecure_channel('localhost:%d' % port)
async def tearDown(self):
await self._server.stop(None)
await self._channel.close()
async def test_status_ok(self):
call = self._channel.unary_unary(_STATUS_OK)(_REQUEST)
# Succeed RPC doesn't have status
status = await rpc_status.aio.from_call(call)
self.assertIs(status, None)
async def test_status_not_ok(self):
call = self._channel.unary_unary(_STATUS_NOT_OK)(_REQUEST)
with self.assertRaises(aio.AioRpcError) as exception_context:
await call
rpc_error = exception_context.exception
self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
# Failed RPC doesn't automatically generate status
status = await rpc_status.aio.from_call(call)
self.assertIs(status, None)
async def test_error_details(self):
call = self._channel.unary_unary(_ERROR_DETAILS)(_REQUEST)
with self.assertRaises(aio.AioRpcError) as exception_context:
await call
rpc_error = exception_context.exception
status = await rpc_status.aio.from_call(call)
self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
self.assertEqual(status.code, code_pb2.Code.Value('INTERNAL'))
# Check if the underlying proto message is intact
self.assertTrue(status.details[0].Is(
error_details_pb2.DebugInfo.DESCRIPTOR))
info = error_details_pb2.DebugInfo()
status.details[0].Unpack(info)
self.assertIn('_error_details_unary_unary', info.stack_entries[-1])
async def test_code_message_validation(self):
call = self._channel.unary_unary(_INCONSISTENT)(_REQUEST)
with self.assertRaises(aio.AioRpcError) as exception_context:
await call
rpc_error = exception_context.exception
self.assertEqual(rpc_error.code(), grpc.StatusCode.NOT_FOUND)
# Code/Message validation failed
with self.assertRaises(ValueError):
await rpc_status.aio.from_call(call)
async def test_invalid_code(self):
with self.assertRaises(aio.AioRpcError) as exception_context:
await self._channel.unary_unary(_INVALID_CODE)(_REQUEST)
rpc_error = exception_context.exception
self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN)
# Invalid status code exception raised during coversion
self.assertIn('Invalid status code', rpc_error.details())
if __name__ == '__main__':
logging.basicConfig()
unittest.main(verbosity=2)

@ -3,6 +3,7 @@
"health_check.health_servicer_test.HealthServicerTest", "health_check.health_servicer_test.HealthServicerTest",
"interop.local_interop_test.InsecureLocalInteropTest", "interop.local_interop_test.InsecureLocalInteropTest",
"interop.local_interop_test.SecureLocalInteropTest", "interop.local_interop_test.SecureLocalInteropTest",
"status.grpc_status_test.StatusTest",
"unit._metadata_test.TestTypeMetadata", "unit._metadata_test.TestTypeMetadata",
"unit.abort_test.TestAbort", "unit.abort_test.TestAbort",
"unit.aio_rpc_error_test.TestAioRpcError", "unit.aio_rpc_error_test.TestAioRpcError",

Loading…
Cancel
Save