|
|
|
@ -85,10 +85,10 @@ cdef class CallbackCompletionQueue: |
|
|
|
|
grpc_completion_queue_destroy(self._cq) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CallbackStartBatchError(Exception): pass |
|
|
|
|
class ExecuteBatchError(Exception): pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def async_start_batch(GrpcCallWrapper grpc_call_wrapper, |
|
|
|
|
async def execute_batch(GrpcCallWrapper grpc_call_wrapper, |
|
|
|
|
tuple operations, |
|
|
|
|
object loop): |
|
|
|
|
"""The callback version of start batch operations.""" |
|
|
|
@ -98,7 +98,7 @@ async def async_start_batch(GrpcCallWrapper grpc_call_wrapper, |
|
|
|
|
cdef object future = loop.create_future() |
|
|
|
|
cdef CallbackWrapper wrapper = CallbackWrapper( |
|
|
|
|
future, |
|
|
|
|
CallbackFailureHandler('async_start_batch', operations, CallbackStartBatchError)) |
|
|
|
|
CallbackFailureHandler('execute_batch', operations, ExecuteBatchError)) |
|
|
|
|
# NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed |
|
|
|
|
# when calling "await". This is an over-optimization by Cython. |
|
|
|
|
cpython.Py_INCREF(wrapper) |
|
|
|
@ -109,7 +109,7 @@ async def async_start_batch(GrpcCallWrapper grpc_call_wrapper, |
|
|
|
|
wrapper.c_functor(), NULL) |
|
|
|
|
|
|
|
|
|
if error != GRPC_CALL_OK: |
|
|
|
|
raise CallbackStartBatchError("Failed grpc_call_start_batch: {}".format(error)) |
|
|
|
|
raise ExecuteBatchError("Failed grpc_call_start_batch: {}".format(error)) |
|
|
|
|
|
|
|
|
|
await future |
|
|
|
|
cpython.Py_DECREF(wrapper) |
|
|
|
@ -129,8 +129,8 @@ async def _receive_message(GrpcCallWrapper grpc_call_wrapper, |
|
|
|
|
cdef ReceiveMessageOperation receive_op = ReceiveMessageOperation(_EMPTY_FLAG) |
|
|
|
|
cdef tuple ops = (receive_op,) |
|
|
|
|
try: |
|
|
|
|
await async_start_batch(grpc_call_wrapper, ops, loop) |
|
|
|
|
except CallbackStartBatchError as e: |
|
|
|
|
await execute_batch(grpc_call_wrapper, ops, loop) |
|
|
|
|
except ExecuteBatchError as e: |
|
|
|
|
# NOTE(lidiz) The receive message operation has two ways to indicate |
|
|
|
|
# finish state : 1) returns empty message due to EOF; 2) fails inside |
|
|
|
|
# the callback (e.g. cancelled). |
|
|
|
@ -154,7 +154,7 @@ async def _send_message(GrpcCallWrapper grpc_call_wrapper, |
|
|
|
|
SendInitialMetadataOperation(None, _EMPTY_FLAG), |
|
|
|
|
op, |
|
|
|
|
) |
|
|
|
|
await async_start_batch(grpc_call_wrapper, ops, loop) |
|
|
|
|
await execute_batch(grpc_call_wrapper, ops, loop) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper, |
|
|
|
@ -164,12 +164,12 @@ async def _send_initial_metadata(GrpcCallWrapper grpc_call_wrapper, |
|
|
|
|
metadata, |
|
|
|
|
_EMPTY_FLAG) |
|
|
|
|
cdef tuple ops = (op,) |
|
|
|
|
await async_start_batch(grpc_call_wrapper, ops, loop) |
|
|
|
|
await execute_batch(grpc_call_wrapper, ops, loop) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _receive_initial_metadata(GrpcCallWrapper grpc_call_wrapper, |
|
|
|
|
object loop): |
|
|
|
|
cdef ReceiveInitialMetadataOperation op = ReceiveInitialMetadataOperation(_EMPTY_FLAGS) |
|
|
|
|
cdef tuple ops = (op,) |
|
|
|
|
await async_start_batch(grpc_call_wrapper, ops, loop) |
|
|
|
|
await execute_batch(grpc_call_wrapper, ops, loop) |
|
|
|
|
return op.initial_metadata() |
|
|
|
|