From 9bba7b7af53c17ed22283b871c2708ca6b201af3 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 8 Jan 2020 10:20:26 -0800 Subject: [PATCH] Improve readability --- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 46594924dea..ed1a61e46f2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -141,15 +141,25 @@ cdef class _ServicerContext: object code, str details='', tuple trailing_metadata=_EMPTY_METADATA): - await _perform_abort( - self._rpc_state, - code.value[0], - details, - trailing_metadata, - self._loop - ) + if self._rpc_state.abort_exception is not None: + raise RuntimeError('Abort already called!') + else: + # Keeps track of the exception object. After abort happen, the RPC + # should stop execution. However, if users decided to suppress it, it + # could lead to undefined behavior. + self._rpc_state.abort_exception = AbortError('Locally aborted.') + + self._rpc_state.status_sent = True + await _send_error_status_from_server( + self._rpc_state, + code.value[0], + details, + trailing_metadata, + self._rpc_state.metadata_sent, + self._loop + ) - raise self._rpc_state.abort_exception + raise self._rpc_state.abort_exception cdef _find_method_handler(str method, list generic_handlers): @@ -207,8 +217,8 @@ async def _handle_unary_unary_rpc(object method_handler, SendInitialMetadataOperation(None, _EMPTY_FLAGS), SendMessageOperation(response_raw, _EMPTY_FLAGS), ) - await execute_batch(rpc_state, send_ops, loop) rpc_state.status_sent = True + await execute_batch(rpc_state, send_ops, loop) async def _handle_unary_stream_rpc(object method_handler, @@ -270,8 +280,8 @@ async def _handle_unary_stream_rpc(object method_handler, ) cdef tuple ops = (op,) - await execute_batch(rpc_state, ops, loop) rpc_state.status_sent = True + await execute_batch(rpc_state, ops, loop) async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop): @@ -292,11 +302,12 @@ async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop): except Exception as e: _LOGGER.exception(e) if not rpc_state.status_sent and rpc_state.server._status != AIO_SERVER_STATUS_STOPPED: - await _perform_abort( + await _send_error_status_from_server( rpc_state, StatusCode.unknown, '%s: %s' % (type(e), e), _EMPTY_METADATA, + rpc_state.metadata_sent, loop ) @@ -333,11 +344,13 @@ async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): generic_handlers, ) if method_handler is None: - await _perform_abort( + rpc_state.status_sent = True + await _send_error_status_from_server( rpc_state, StatusCode.unimplemented, b'Method not found!', _EMPTY_METADATA, + rpc_state.metadata_sent, loop ) return