Merge pull request #20426 from gnossen/fake_servicer_context

Make grpc_testing's Servicer context abort method stop execution of the servicer handler
pull/20431/head
Richard Belleville 5 years ago committed by GitHub
commit 536fc0aaff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py
  2. 5
      src/python/grpcio_tests/tests/testing/_application_common.py
  3. 21
      src/python/grpcio_tests/tests/testing/_server_application.py
  4. 13
      src/python/grpcio_tests/tests/testing/_server_test.py

@ -76,6 +76,7 @@ class ServicerContext(grpc.ServicerContext):
def abort(self, code, details): def abort(self, code, details):
with self._rpc._condition: with self._rpc._condition:
self._rpc._abort(code, details) self._rpc._abort(code, details)
raise Exception()
def abort_with_status(self, status): def abort_with_status(self, status):
raise NotImplementedError() raise NotImplementedError()

@ -32,5 +32,10 @@ STREAM_UNARY_RESPONSE = services_pb2.Strange(first_strange_field=17)
STREAM_STREAM_REQUEST = requests_pb2.Top(first_top_field=19) STREAM_STREAM_REQUEST = requests_pb2.Top(first_top_field=19)
STREAM_STREAM_RESPONSE = services_pb2.Bottom(first_bottom_field=23) STREAM_STREAM_RESPONSE = services_pb2.Bottom(first_bottom_field=23)
TWO_STREAM_STREAM_RESPONSES = (STREAM_STREAM_RESPONSE,) * 2 TWO_STREAM_STREAM_RESPONSES = (STREAM_STREAM_RESPONSE,) * 2
ABORT_REQUEST = requests_pb2.Up(first_up_field=42)
ABORT_SUCCESS_QUERY = requests_pb2.Up(first_up_field=43)
ABORT_NO_STATUS_RESPONSE = services_pb2.Down(first_down_field=50)
ABORT_SUCCESS_RESPONSE = services_pb2.Down(first_down_field=51)
ABORT_FAILURE_RESPONSE = services_pb2.Down(first_down_field=52)
INFINITE_REQUEST_STREAM_TIMEOUT = 0.2 INFINITE_REQUEST_STREAM_TIMEOUT = 0.2

@ -15,6 +15,8 @@
import grpc import grpc
import threading
# requests_pb2 is a semantic dependency of this module. # requests_pb2 is a semantic dependency of this module.
from tests.testing import _application_common from tests.testing import _application_common
from tests.testing.proto import requests_pb2 # pylint: disable=unused-import from tests.testing.proto import requests_pb2 # pylint: disable=unused-import
@ -25,9 +27,26 @@ from tests.testing.proto import services_pb2_grpc
class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer): class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
"""Services RPCs.""" """Services RPCs."""
def __init__(self):
self._abort_lock = threading.RLock()
self._abort_response = _application_common.ABORT_NO_STATUS_RESPONSE
def UnUn(self, request, context): def UnUn(self, request, context):
if _application_common.UNARY_UNARY_REQUEST == request: if request == _application_common.UNARY_UNARY_REQUEST:
return _application_common.UNARY_UNARY_RESPONSE return _application_common.UNARY_UNARY_RESPONSE
elif request == _application_common.ABORT_REQUEST:
with self._abort_lock:
try:
context.abort(grpc.StatusCode.PERMISSION_DENIED,
"Denying permission to test abort.")
except Exception as e: # pylint: disable=broad-except
self._abort_response = _application_common.ABORT_SUCCESS_RESPONSE
else:
self._abort_status = _application_common.ABORT_FAILURE_RESPONSE
return None # NOTE: For the linter.
elif request == _application_common.ABORT_SUCCESS_QUERY:
with self._abort_lock:
return self._abort_response
else: else:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT) context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details('Something is wrong with your request!') context.set_details('Something is wrong with your request!')

@ -164,6 +164,19 @@ class FirstServiceServicerTest(unittest.TestCase):
self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED) self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
def test_servicer_context_abort(self):
rpc = self._real_time_server.invoke_unary_unary(
_application_testing_common.FIRST_SERVICE_UNUN, (),
_application_common.ABORT_REQUEST, None)
_, _, code, _ = rpc.termination()
self.assertIs(code, grpc.StatusCode.PERMISSION_DENIED)
rpc = self._real_time_server.invoke_unary_unary(
_application_testing_common.FIRST_SERVICE_UNUN, (),
_application_common.ABORT_SUCCESS_QUERY, None)
response, _, code, _ = rpc.termination()
self.assertEqual(_application_common.ABORT_SUCCESS_RESPONSE, response)
self.assertIs(code, grpc.StatusCode.OK)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main(verbosity=2) unittest.main(verbosity=2)

Loading…
Cancel
Save