Adopt advices from gnossen@:

* Fixes typos in comments
* Replaces terms in comments
* Adjusts logging level
* Adds more assertions
pull/21232/head
Lidi Zheng 5 years ago
parent 25f6723a7e
commit 6f9b77103a
  1. 26
      src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi
  2. 6
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
  3. 20
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
  4. 4
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  5. 14
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  6. 6
      src/python/grpcio/grpc/experimental/aio/_call.py

@ -19,7 +19,7 @@ _EMPTY_FLAGS = 0
_EMPTY_MASK = 0
_EMPTY_METADATA = None
_UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled due to unknown reason.'
_UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled for unknown reason.'
cdef class _AioCall:
@ -80,12 +80,13 @@ cdef class _AioCall:
self._grpc_call_wrapper.call = NULL
cdef AioRpcStatus _cancel_and_create_status(self, object cancellation_future):
"""Cancels the RPC in C-Core, and return the final RPC status."""
"""Cancels the RPC in Core, and return the final RPC status."""
cdef AioRpcStatus status
cdef object details
cdef char *c_details
cdef grpc_call_error error
# Try to fetch application layer cancellation details in the future.
# * If calcellation details present, cancel with status;
# * If cancellation details present, cancel with status;
# * If details not present, cancel with unknown reason.
if cancellation_future.done():
status = cancellation_future.result()
@ -93,16 +94,18 @@ cdef class _AioCall:
self._references.append(details)
c_details = <char *>details
# By implementation, grpc_call_cancel_with_status always return OK
grpc_call_cancel_with_status(
error = grpc_call_cancel_with_status(
self._grpc_call_wrapper.call,
status.c_code(),
c_details,
NULL,
)
assert error == GRPC_CALL_OK
return status
else:
# By implementation, grpc_call_cancel always return OK
grpc_call_cancel(self._grpc_call_wrapper.call, NULL)
error = grpc_call_cancel(self._grpc_call_wrapper.call, NULL)
assert error == GRPC_CALL_OK
status = AioRpcStatus(
StatusCode.cancelled,
_UNKNOWN_CANCELLATION_DETAILS,
@ -148,7 +151,7 @@ cdef class _AioCall:
try:
self._create_grpc_call(deadline, method)
try:
await callback_start_batch(self._grpc_call_wrapper,
await async_start_batch(self._grpc_call_wrapper,
ops,
self._loop)
except asyncio.CancelledError:
@ -156,7 +159,8 @@ cdef class _AioCall:
status_observer(status)
raise
finally:
# If the RPC failed, this method will return None instead of crash.
# If the RPC failed, receive_initial_metadata_op.initial_metadata
# will return None instead of crash.
initial_metadata_observer(
receive_initial_metadata_op.initial_metadata()
)
@ -181,7 +185,7 @@ cdef class _AioCall:
"""Handles the status sent by peer once received."""
cdef ReceiveStatusOnClientOperation op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
cdef tuple ops = (op,)
await callback_start_batch(self._grpc_call_wrapper, ops, self._loop)
await async_start_batch(self._grpc_call_wrapper, ops, self._loop)
cdef AioRpcStatus status = AioRpcStatus(
op.code(),
op.details(),
@ -218,7 +222,7 @@ cdef class _AioCall:
self._loop
)
if received_message is None:
# The read operation failed, C-Core should explain why it fails
# The read operation failed, Core should explain why it fails
await self._status_received.wait()
return
else:
@ -253,12 +257,12 @@ cdef class _AioCall:
send_close_op,
)
# Creates the grpc_call C-Core object, it needs to be deleted explicitly
# Creates the grpc_call Core object, it needs to be deleted explicitly
# through _destroy_grpc_call call in other methods.
self._create_grpc_call(deadline, method)
# Actually sends out the request message.
await callback_start_batch(self._grpc_call_wrapper,
await async_start_batch(self._grpc_call_wrapper,
outbound_ops,
self._loop)

@ -28,10 +28,10 @@ cdef struct CallbackContext:
#
# Attributes:
# functor: A grpc_experimental_completion_queue_functor represents the
# callback function in the only way C-Core understands.
# callback function in the only way Core understands.
# waiter: An asyncio.Future object that fulfills when the callback is
# invoked by C-Core.
# failure_handler: A CallbackFailureHandler object that called when C-Core
# invoked by Core.
# failure_handler: A CallbackFailureHandler object that called when Core
# returns 'success == 0' state.
grpc_experimental_completion_queue_functor functor
cpython.PyObject *waiter

@ -88,7 +88,7 @@ cdef class CallbackCompletionQueue:
class CallbackStartBatchError(Exception): pass
async def callback_start_batch(GrpcCallWrapper grpc_call_wrapper,
async def async_start_batch(GrpcCallWrapper grpc_call_wrapper,
tuple operations,
object loop):
"""The callback version of start batch operations."""
@ -98,7 +98,7 @@ async def callback_start_batch(GrpcCallWrapper grpc_call_wrapper,
cdef object future = loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(
future,
CallbackFailureHandler('callback_start_batch', operations, CallbackStartBatchError))
CallbackFailureHandler('async_start_batch', operations, CallbackStartBatchError))
# NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed
# when calling "await". This is an over-optimization by Cython.
cpython.Py_INCREF(wrapper)
@ -120,23 +120,23 @@ async def callback_start_batch(GrpcCallWrapper grpc_call_wrapper,
async def _receive_message(GrpcCallWrapper grpc_call_wrapper,
object loop):
"""Retrives parsed messages from C-Core.
The messages maybe already in C-Core's buffer, so there isn't a 1-to-1
"""Retrives parsed messages from Core.
The messages maybe already in Core's buffer, so there isn't a 1-to-1
mapping between this and the underlying "socket.read()". Also, eventually,
this function will end with an EOF, which reads empty message.
"""
cdef ReceiveMessageOperation receive_op = ReceiveMessageOperation(_EMPTY_FLAG)
cdef tuple ops = (receive_op,)
try:
await callback_start_batch(grpc_call_wrapper, ops, loop)
await async_start_batch(grpc_call_wrapper, ops, loop)
except CallbackStartBatchError as e:
# NOTE(lidiz) The receive message operation has two ways to indicate
# finish state : 1) returns empty message due to EOF; 2) fails inside
# the callback (e.g. cancelled).
#
# Since they all indicates finish, they are better be merged.
_LOGGER.exception(e)
_LOGGER.debug(e)
return receive_op.message()
@ -154,7 +154,7 @@ async def _send_message(GrpcCallWrapper grpc_call_wrapper,
SendInitialMetadataOperation(None, _EMPTY_FLAG),
op,
)
await callback_start_batch(grpc_call_wrapper, ops, loop)
await async_start_batch(grpc_call_wrapper, ops, loop)
async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
@ -164,12 +164,12 @@ async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
metadata,
_EMPTY_FLAG)
cdef tuple ops = (op,)
await callback_start_batch(grpc_call_wrapper, ops, loop)
await async_start_batch(grpc_call_wrapper, ops, loop)
async def _receive_initial_metadata(GrpcCallWrapper grpc_call_wrapper,
object loop):
cdef ReceiveInitialMetadataOperation op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
cdef tuple ops = (op,)
await callback_start_batch(grpc_call_wrapper, ops, loop)
await async_start_batch(grpc_call_wrapper, ops, loop)
return op.initial_metadata()

@ -134,9 +134,9 @@ cdef class _AsyncioSocket:
cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb):
"""Performs write to network socket in AsyncIO.
For each socket, C-Core guarantees there'll be only one ongoing write.
For each socket, Core guarantees there'll be only one ongoing write.
When the write is finished, we need to call grpc_write_cb to notify
C-Core that the work is done.
Core that the work is done.
"""
cdef char* start
cdef bytearray outbound_buffer = bytearray()

@ -77,7 +77,7 @@ cdef class _ServicerContext:
async def send_initial_metadata(self, tuple metadata):
if self._metadata_sent:
raise ValueError('Send initial metadata failed: already sent')
raise RuntimeError('Send initial metadata failed: already sent')
else:
_send_initial_metadata(self._rpc_state, self._loop)
self._metadata_sent = True
@ -135,7 +135,7 @@ async def _handle_unary_unary_rpc(object method_handler,
SendInitialMetadataOperation(None, _EMPTY_FLAGS),
SendMessageOperation(response_raw, _EMPTY_FLAGS),
)
await callback_start_batch(rpc_state, send_ops, loop)
await async_start_batch(rpc_state, send_ops, loop)
async def _handle_unary_stream_rpc(object method_handler,
@ -185,7 +185,7 @@ async def _handle_unary_stream_rpc(object method_handler,
)
cdef tuple ops = (op,)
await callback_start_batch(rpc_state, ops, loop)
await async_start_batch(rpc_state, ops, loop)
async def _handle_cancellation_from_core(object rpc_task,
@ -193,7 +193,7 @@ async def _handle_cancellation_from_core(object rpc_task,
object loop):
cdef ReceiveCloseOnServerOperation op = ReceiveCloseOnServerOperation(_EMPTY_FLAG)
cdef tuple ops = (op,)
await callback_start_batch(rpc_state, ops, loop)
await async_start_batch(rpc_state, ops, loop)
if op.cancelled() and not rpc_task.done():
rpc_task.cancel()
@ -318,7 +318,7 @@ cdef class AioServer:
if self._status != AIO_SERVER_STATUS_RUNNING:
break
# Accepts new request from C-Core
# Accepts new request from Core
rpc_state = await _server_call_request_call(
self._server,
self._cq,
@ -333,7 +333,7 @@ cdef class AioServer:
)
)
# Fires off a task that listening on the cancellation from client.
# Fires off a task that listens on the cancellation from client.
self._loop.create_task(
_handle_cancellation_from_core(
rpc_task,
@ -387,7 +387,7 @@ cdef class AioServer:
pass
async def shutdown(self, grace):
"""Gracefully shutdown the C-Core server.
"""Gracefully shutdown the Core server.
Application should only call shutdown once.

@ -202,7 +202,7 @@ class Call(_base_call.Call):
"""Private method to set final status of the RPC.
This method may be called multiple time due to data race between local
cancellation (by application) and C-Core receiving status from peer. We
cancellation (by application) and Core receiving status from peer. We
make no promise here which one will win.
"""
if self._status.done():
@ -388,12 +388,12 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall):
"""Forwards the application cancellation reasoning.
Async generator will receive an exception. The cancellation will go
deep down into C-Core, and then propagates backup as the
deep down into Core, and then propagates backup as the
`cygrpc.AioRpcStatus` exception.
So, under race condition, e.g. the server sent out final state headers
and the client calling "cancel" at the same time, this method respects
the winner in C-Core.
the winner in Core.
"""
if not self._status.done() and not self._cancellation.done():
self._cancellation.set_result(status)

Loading…
Cancel
Save