diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index defff7fe881..7447f4be217 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -19,7 +19,7 @@ _EMPTY_FLAGS = 0 _EMPTY_MASK = 0 _EMPTY_METADATA = None -_UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled due to unknown reason.' +_UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled for unknown reason.' cdef class _AioCall: @@ -80,12 +80,13 @@ cdef class _AioCall: self._grpc_call_wrapper.call = NULL cdef AioRpcStatus _cancel_and_create_status(self, object cancellation_future): - """Cancels the RPC in C-Core, and return the final RPC status.""" + """Cancels the RPC in Core, and return the final RPC status.""" cdef AioRpcStatus status cdef object details cdef char *c_details + cdef grpc_call_error error # Try to fetch application layer cancellation details in the future. - # * If calcellation details present, cancel with status; + # * If cancellation details present, cancel with status; # * If details not present, cancel with unknown reason. if cancellation_future.done(): status = cancellation_future.result() @@ -93,16 +94,18 @@ cdef class _AioCall: self._references.append(details) c_details = details # By implementation, grpc_call_cancel_with_status always return OK - grpc_call_cancel_with_status( + error = grpc_call_cancel_with_status( self._grpc_call_wrapper.call, status.c_code(), c_details, NULL, ) + assert error == GRPC_CALL_OK return status else: # By implementation, grpc_call_cancel always return OK - grpc_call_cancel(self._grpc_call_wrapper.call, NULL) + error = grpc_call_cancel(self._grpc_call_wrapper.call, NULL) + assert error == GRPC_CALL_OK status = AioRpcStatus( StatusCode.cancelled, _UNKNOWN_CANCELLATION_DETAILS, @@ -148,7 +151,7 @@ cdef class _AioCall: try: self._create_grpc_call(deadline, method) try: - await callback_start_batch(self._grpc_call_wrapper, + await async_start_batch(self._grpc_call_wrapper, ops, self._loop) except asyncio.CancelledError: @@ -156,7 +159,8 @@ cdef class _AioCall: status_observer(status) raise finally: - # If the RPC failed, this method will return None instead of crash. + # If the RPC failed, receive_initial_metadata_op.initial_metadata + # will return None instead of crash. initial_metadata_observer( receive_initial_metadata_op.initial_metadata() ) @@ -181,7 +185,7 @@ cdef class _AioCall: """Handles the status sent by peer once received.""" cdef ReceiveStatusOnClientOperation op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) cdef tuple ops = (op,) - await callback_start_batch(self._grpc_call_wrapper, ops, self._loop) + await async_start_batch(self._grpc_call_wrapper, ops, self._loop) cdef AioRpcStatus status = AioRpcStatus( op.code(), op.details(), @@ -218,7 +222,7 @@ cdef class _AioCall: self._loop ) if received_message is None: - # The read operation failed, C-Core should explain why it fails + # The read operation failed, Core should explain why it fails await self._status_received.wait() return else: @@ -253,12 +257,12 @@ cdef class _AioCall: send_close_op, ) - # Creates the grpc_call C-Core object, it needs to be deleted explicitly + # Creates the grpc_call Core object, it needs to be deleted explicitly # through _destroy_grpc_call call in other methods. self._create_grpc_call(deadline, method) # Actually sends out the request message. - await callback_start_batch(self._grpc_call_wrapper, + await async_start_batch(self._grpc_call_wrapper, outbound_ops, self._loop) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi index 8bed055fdb1..6c653fa42e6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi @@ -28,10 +28,10 @@ cdef struct CallbackContext: # # Attributes: # functor: A grpc_experimental_completion_queue_functor represents the - # callback function in the only way C-Core understands. + # callback function in the only way Core understands. # waiter: An asyncio.Future object that fulfills when the callback is - # invoked by C-Core. - # failure_handler: A CallbackFailureHandler object that called when C-Core + # invoked by Core. + # failure_handler: A CallbackFailureHandler object that called when Core # returns 'success == 0' state. grpc_experimental_completion_queue_functor functor cpython.PyObject *waiter diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index 796a6ab9bf5..4d02d399dc2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -88,7 +88,7 @@ cdef class CallbackCompletionQueue: class CallbackStartBatchError(Exception): pass -async def callback_start_batch(GrpcCallWrapper grpc_call_wrapper, +async def async_start_batch(GrpcCallWrapper grpc_call_wrapper, tuple operations, object loop): """The callback version of start batch operations.""" @@ -98,7 +98,7 @@ async def callback_start_batch(GrpcCallWrapper grpc_call_wrapper, cdef object future = loop.create_future() cdef CallbackWrapper wrapper = CallbackWrapper( future, - CallbackFailureHandler('callback_start_batch', operations, CallbackStartBatchError)) + CallbackFailureHandler('async_start_batch', operations, CallbackStartBatchError)) # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed # when calling "await". This is an over-optimization by Cython. cpython.Py_INCREF(wrapper) @@ -120,23 +120,23 @@ async def callback_start_batch(GrpcCallWrapper grpc_call_wrapper, async def _receive_message(GrpcCallWrapper grpc_call_wrapper, object loop): - """Retrives parsed messages from C-Core. - - The messages maybe already in C-Core's buffer, so there isn't a 1-to-1 + """Retrives parsed messages from Core. + + The messages maybe already in Core's buffer, so there isn't a 1-to-1 mapping between this and the underlying "socket.read()". Also, eventually, this function will end with an EOF, which reads empty message. """ cdef ReceiveMessageOperation receive_op = ReceiveMessageOperation(_EMPTY_FLAG) cdef tuple ops = (receive_op,) try: - await callback_start_batch(grpc_call_wrapper, ops, loop) + await async_start_batch(grpc_call_wrapper, ops, loop) except CallbackStartBatchError as e: # NOTE(lidiz) The receive message operation has two ways to indicate # finish state : 1) returns empty message due to EOF; 2) fails inside # the callback (e.g. cancelled). # # Since they all indicates finish, they are better be merged. - _LOGGER.exception(e) + _LOGGER.debug(e) return receive_op.message() @@ -154,7 +154,7 @@ async def _send_message(GrpcCallWrapper grpc_call_wrapper, SendInitialMetadataOperation(None, _EMPTY_FLAG), op, ) - await callback_start_batch(grpc_call_wrapper, ops, loop) + await async_start_batch(grpc_call_wrapper, ops, loop) async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper, @@ -164,12 +164,12 @@ async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper, metadata, _EMPTY_FLAG) cdef tuple ops = (op,) - await callback_start_batch(grpc_call_wrapper, ops, loop) + await async_start_batch(grpc_call_wrapper, ops, loop) async def _receive_initial_metadata(GrpcCallWrapper grpc_call_wrapper, object loop): cdef ReceiveInitialMetadataOperation op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS) cdef tuple ops = (op,) - await callback_start_batch(grpc_call_wrapper, ops, loop) + await async_start_batch(grpc_call_wrapper, ops, loop) return op.initial_metadata() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 367148afe8a..5e13daa1043 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -134,9 +134,9 @@ cdef class _AsyncioSocket: cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb): """Performs write to network socket in AsyncIO. - For each socket, C-Core guarantees there'll be only one ongoing write. + For each socket, Core guarantees there'll be only one ongoing write. When the write is finished, we need to call grpc_write_cb to notify - C-Core that the work is done. + Core that the work is done. """ cdef char* start cdef bytearray outbound_buffer = bytearray() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 30e8447c0e5..58515038484 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -77,7 +77,7 @@ cdef class _ServicerContext: async def send_initial_metadata(self, tuple metadata): if self._metadata_sent: - raise ValueError('Send initial metadata failed: already sent') + raise RuntimeError('Send initial metadata failed: already sent') else: _send_initial_metadata(self._rpc_state, self._loop) self._metadata_sent = True @@ -135,7 +135,7 @@ async def _handle_unary_unary_rpc(object method_handler, SendInitialMetadataOperation(None, _EMPTY_FLAGS), SendMessageOperation(response_raw, _EMPTY_FLAGS), ) - await callback_start_batch(rpc_state, send_ops, loop) + await async_start_batch(rpc_state, send_ops, loop) async def _handle_unary_stream_rpc(object method_handler, @@ -185,7 +185,7 @@ async def _handle_unary_stream_rpc(object method_handler, ) cdef tuple ops = (op,) - await callback_start_batch(rpc_state, ops, loop) + await async_start_batch(rpc_state, ops, loop) async def _handle_cancellation_from_core(object rpc_task, @@ -193,7 +193,7 @@ async def _handle_cancellation_from_core(object rpc_task, object loop): cdef ReceiveCloseOnServerOperation op = ReceiveCloseOnServerOperation(_EMPTY_FLAG) cdef tuple ops = (op,) - await callback_start_batch(rpc_state, ops, loop) + await async_start_batch(rpc_state, ops, loop) if op.cancelled() and not rpc_task.done(): rpc_task.cancel() @@ -318,7 +318,7 @@ cdef class AioServer: if self._status != AIO_SERVER_STATUS_RUNNING: break - # Accepts new request from C-Core + # Accepts new request from Core rpc_state = await _server_call_request_call( self._server, self._cq, @@ -333,7 +333,7 @@ cdef class AioServer: ) ) - # Fires off a task that listening on the cancellation from client. + # Fires off a task that listens on the cancellation from client. self._loop.create_task( _handle_cancellation_from_core( rpc_task, @@ -387,7 +387,7 @@ cdef class AioServer: pass async def shutdown(self, grace): - """Gracefully shutdown the C-Core server. + """Gracefully shutdown the Core server. Application should only call shutdown once. diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 7db9630d6b4..be7a48157d0 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -202,7 +202,7 @@ class Call(_base_call.Call): """Private method to set final status of the RPC. This method may be called multiple time due to data race between local - cancellation (by application) and C-Core receiving status from peer. We + cancellation (by application) and Core receiving status from peer. We make no promise here which one will win. """ if self._status.done(): @@ -388,12 +388,12 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): """Forwards the application cancellation reasoning. Async generator will receive an exception. The cancellation will go - deep down into C-Core, and then propagates backup as the + deep down into Core, and then propagates backup as the `cygrpc.AioRpcStatus` exception. So, under race condition, e.g. the server sent out final state headers and the client calling "cancel" at the same time, this method respects - the winner in C-Core. + the winner in Core. """ if not self._status.done() and not self._cancellation.done(): self._cancellation.set_result(status)