New abort with grpc.Status API

* Add `abort_with_status` method in ServicerContext
* Add `Status` interface similar to the design of Details in interceptor
* Add 3 unit test cases for abort mechanism
pull/17481/head
Lidi Zheng 6 years ago
parent 352fbc19e7
commit fd74fcf2a0
  1. 36
      src/python/grpcio/grpc/__init__.py
  2. 4
      src/python/grpcio/grpc/_server.py
  3. 3
      src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py
  4. 1
      src/python/grpcio_tests/tests/tests.json
  5. 1
      src/python/grpcio_tests/tests/unit/BUILD.bazel
  6. 124
      src/python/grpcio_tests/tests/unit/_abort_test.py
  7. 1
      src/python/grpcio_tests/tests/unit/_api_test.py

@ -266,6 +266,23 @@ class StatusCode(enum.Enum):
UNAUTHENTICATED = (_cygrpc.StatusCode.unauthenticated, 'unauthenticated')
############################# gRPC Status ################################
class Status(six.with_metaclass(abc.ABCMeta)):
"""Describes the status of an RPC.
This is an EXPERIMENTAL API.
Attributes:
code: A StatusCode object to be sent to the client.
It must not be StatusCode.OK.
details: An ASCII-encodable string to be sent to the client upon
termination of the RPC.
trailing_metadata: The trailing :term:`metadata` in the RPC.
"""
############################# gRPC Exceptions ################################
@ -1118,6 +1135,24 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
"""
raise NotImplementedError()
@abc.abstractmethod
def abort_with_status(self, status):
"""Raises an exception to terminate the RPC with a non-OK status.
The status passed as argument will supercede any existing status code,
status message and trailing metadata.
This is an EXPERIMENTAL API.
Args:
status: A grpc.Status object.
Raises:
Exception: An exception is always raised to signal the abortion the
RPC to the gRPC runtime.
"""
raise NotImplementedError()
@abc.abstractmethod
def set_code(self, code):
"""Sets the value to be used as status code upon RPC completion.
@ -1747,6 +1782,7 @@ __all__ = (
'Future',
'ChannelConnectivity',
'StatusCode',
'Status',
'RpcError',
'RpcContext',
'Call',

@ -291,6 +291,10 @@ class _Context(grpc.ServicerContext):
self._state.abortion = Exception()
raise self._state.abortion
def abort_with_status(self, status):
self._state.trailing_metadata = status.trailing_metadata
self.abort(status.code, status.details)
def set_code(self, code):
with self._state.condition:
self._state.code = code

@ -70,6 +70,9 @@ class ServicerContext(grpc.ServicerContext):
def abort(self, code, details):
raise NotImplementedError()
def abort_with_status(self, status):
raise NotImplementedError()
def set_code(self, code):
self._rpc.set_code(code)

@ -19,6 +19,7 @@
"testing._server_test.FirstServiceServicerTest",
"testing._time_test.StrictFakeTimeTest",
"testing._time_test.StrictRealTimeTest",
"unit._abort_test.AbortTest",
"unit._api_test.AllTest",
"unit._api_test.ChannelConnectivityTest",
"unit._api_test.ChannelTest",

@ -3,6 +3,7 @@ load("@grpc_python_dependencies//:requirements.bzl", "requirement")
package(default_visibility = ["//visibility:public"])
GRPCIO_TESTS_UNIT = [
"_abort_test.py",
"_api_test.py",
"_auth_context_test.py",
"_auth_test.py",

@ -0,0 +1,124 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests server context abort mechanism"""
import unittest
import collections
import logging
import grpc
from tests.unit import test_common
from tests.unit.framework.common import test_constants
_ABORT = '/test/abort'
_ABORT_WITH_STATUS = '/test/AbortWithStatus'
_INVALID_CODE = '/test/InvalidCode'
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x00'
_ABORT_DETAILS = 'Abandon ship!'
_ABORT_METADATA = (('a-trailing-metadata', '42'),)
class _Status(
collections.namedtuple(
'_Status', ('code', 'details', 'trailing_metadata')), grpc.Status):
pass
def abort_unary_unary(request, servicer_context):
servicer_context.abort(
grpc.StatusCode.INTERNAL,
_ABORT_DETAILS,
)
raise Exception('This line should not be executed!')
def abort_with_status_unary_unary(request, servicer_context):
servicer_context.abort_with_status(
_Status(
code=grpc.StatusCode.INTERNAL,
details=_ABORT_DETAILS,
trailing_metadata=_ABORT_METADATA,
))
raise Exception('This line should not be executed!')
def invalid_code_unary_unary(request, servicer_context):
servicer_context.abort(
42,
_ABORT_DETAILS,
)
class _GenericHandler(grpc.GenericRpcHandler):
def service(self, handler_call_details):
if handler_call_details.method == _ABORT:
return grpc.unary_unary_rpc_method_handler(abort_unary_unary)
elif handler_call_details.method == _ABORT_WITH_STATUS:
return grpc.unary_unary_rpc_method_handler(
abort_with_status_unary_unary)
elif handler_call_details.method == _INVALID_CODE:
return grpc.stream_stream_rpc_method_handler(
invalid_code_unary_unary)
else:
return None
class AbortTest(unittest.TestCase):
def setUp(self):
self._server = test_common.test_server()
port = self._server.add_insecure_port('[::]:0')
self._server.add_generic_rpc_handlers((_GenericHandler(),))
self._server.start()
self._channel = grpc.insecure_channel('localhost:%d' % port)
def tearDown(self):
self._channel.close()
self._server.stop(0)
def test_abort(self):
with self.assertRaises(grpc.RpcError) as exception_context:
self._channel.unary_unary(_ABORT)(_REQUEST)
rpc_error = exception_context.exception
self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
def test_abort_with_status(self):
with self.assertRaises(grpc.RpcError) as exception_context:
self._channel.unary_unary(_ABORT_WITH_STATUS)(_REQUEST)
rpc_error = exception_context.exception
self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL)
self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
self.assertEqual(rpc_error.trailing_metadata(), _ABORT_METADATA)
def test_invalid_code(self):
with self.assertRaises(grpc.RpcError) as exception_context:
self._channel.unary_unary(_INVALID_CODE)(_REQUEST)
rpc_error = exception_context.exception
self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN)
self.assertEqual(rpc_error.details(), _ABORT_DETAILS)
if __name__ == '__main__':
logging.basicConfig()
unittest.main(verbosity=2)

@ -32,6 +32,7 @@ class AllTest(unittest.TestCase):
'Future',
'ChannelConnectivity',
'StatusCode',
'Status',
'RpcError',
'RpcContext',
'Call',

Loading…
Cancel
Save