From 08f145da990aaabdfc9b6226c12b97eaf913e6e5 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 5 Nov 2019 16:57:04 -0800 Subject: [PATCH] Unify the logic around Core Callback API for both sides --- src/python/grpcio/grpc/_cython/BUILD.bazel | 69 +-------- .../grpc/_cython/_cygrpc/aio/call.pxd.pxi | 12 +- .../grpc/_cython/_cygrpc/aio/call.pyx.pxi | 132 ++++++++---------- ...ontext.pxd.pxi => callback_common.pxd.pxi} | 34 +++++ .../_cygrpc/aio/callback_common.pyx.pxi | 113 +++++++++++++++ .../grpc/_cython/_cygrpc/aio/channel.pxd.pxi | 1 + .../grpc/_cython/_cygrpc/aio/channel.pyx.pxi | 1 + .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 26 +--- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 117 +--------------- src/python/grpcio/grpc/_cython/cygrpc.pxd | 2 +- src/python/grpcio/grpc/_cython/cygrpc.pyx | 1 + .../grpcio_tests/tests_aio/unit/call_test.py | 4 +- 12 files changed, 222 insertions(+), 290 deletions(-) rename src/python/grpcio/grpc/_cython/_cygrpc/aio/{callbackcontext.pxd.pxi => callback_common.pxd.pxi} (60%) create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index 0ed004bb690..8f7dcdacb62 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -4,72 +4,11 @@ load("//bazel:cython_library.bzl", "pyx_library") pyx_library( name = "cygrpc", - srcs = [ - "__init__.py", - "_cygrpc/_hooks.pxd.pxi", - "_cygrpc/_hooks.pyx.pxi", - "_cygrpc/aio/call.pxd.pxi", - "_cygrpc/aio/call.pyx.pxi", - "_cygrpc/aio/callbackcontext.pxd.pxi", - "_cygrpc/aio/cancel_status.pxd.pxi", - "_cygrpc/aio/cancel_status.pyx.pxi", - "_cygrpc/aio/channel.pxd.pxi", - "_cygrpc/aio/channel.pyx.pxi", - "_cygrpc/aio/grpc_aio.pxd.pxi", - "_cygrpc/aio/grpc_aio.pyx.pxi", - "_cygrpc/aio/iomgr/iomgr.pyx.pxi", - "_cygrpc/aio/iomgr/resolver.pxd.pxi", - "_cygrpc/aio/iomgr/resolver.pyx.pxi", - "_cygrpc/aio/iomgr/socket.pxd.pxi", - "_cygrpc/aio/iomgr/socket.pyx.pxi", - "_cygrpc/aio/iomgr/timer.pxd.pxi", - "_cygrpc/aio/iomgr/timer.pyx.pxi", - "_cygrpc/aio/rpc_error.pxd.pxi", - "_cygrpc/aio/rpc_error.pyx.pxi", - "_cygrpc/aio/server.pxd.pxi", - "_cygrpc/aio/server.pyx.pxi", - "_cygrpc/arguments.pxd.pxi", - "_cygrpc/arguments.pyx.pxi", - "_cygrpc/call.pxd.pxi", - "_cygrpc/call.pyx.pxi", - "_cygrpc/channel.pxd.pxi", - "_cygrpc/channel.pyx.pxi", - "_cygrpc/channelz.pyx.pxi", - "_cygrpc/completion_queue.pxd.pxi", - "_cygrpc/completion_queue.pyx.pxi", - "_cygrpc/credentials.pxd.pxi", - "_cygrpc/credentials.pyx.pxi", - "_cygrpc/event.pxd.pxi", - "_cygrpc/event.pyx.pxi", - "_cygrpc/fork_posix.pxd.pxi", - "_cygrpc/fork_posix.pyx.pxi", - "_cygrpc/grpc.pxi", - "_cygrpc/grpc_gevent.pxd.pxi", - "_cygrpc/grpc_gevent.pyx.pxi", - "_cygrpc/grpc_string.pyx.pxi", - "_cygrpc/iomgr.pxd.pxi", - "_cygrpc/iomgr.pyx.pxi", - "_cygrpc/metadata.pxd.pxi", - "_cygrpc/metadata.pyx.pxi", - "_cygrpc/operation.pxd.pxi", - "_cygrpc/operation.pyx.pxi", - "_cygrpc/propagation_bits.pxd.pxi", - "_cygrpc/propagation_bits.pyx.pxi", - "_cygrpc/records.pxd.pxi", - "_cygrpc/records.pyx.pxi", - "_cygrpc/security.pxd.pxi", - "_cygrpc/security.pyx.pxi", - "_cygrpc/server.pxd.pxi", - "_cygrpc/server.pyx.pxi", - "_cygrpc/tag.pxd.pxi", - "_cygrpc/tag.pyx.pxi", - "_cygrpc/time.pxd.pxi", - "_cygrpc/time.pyx.pxi", - "_cygrpc/vtable.pxd.pxi", - "_cygrpc/vtable.pyx.pxi", + srcs = glob([ + "**/*.pxi", "cygrpc.pxd", - "cygrpc.pyx", - ], + "cygrpc.pyx" + ]), deps = [ "//:grpc", ], diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi index 687c5999d0b..c795cba7a68 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi @@ -16,13 +16,9 @@ cdef class _AioCall: cdef: AioChannel _channel - CallbackContext _watcher_call - grpc_completion_queue * _cq - grpc_experimental_completion_queue_functor _functor - object _waiter_call + list _references + GrpcCallWrapper _grpc_call_wrapper - @staticmethod - cdef void functor_run(grpc_experimental_completion_queue_functor* functor, int success) with gil - @staticmethod - cdef void watcher_call_functor_run(grpc_experimental_completion_queue_functor* functor, int success) with gil + cdef grpc_call* _create_grpc_call(self, object timeout, bytes method) except * + cdef void _destroy_grpc_call(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index 0ce716a9d6e..fba1244a813 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -16,149 +16,127 @@ cimport cpython import grpc _EMPTY_FLAGS = 0 +_EMPTY_MASK = 0 _EMPTY_METADATA = None -_OP_ARRAY_LENGTH = 6 cdef class _AioCall: def __cinit__(self, AioChannel channel): self._channel = channel - self._functor.functor_run = _AioCall.functor_run - - self._cq = grpc_completion_queue_create_for_callback( - &self._functor, - NULL - ) - - self._watcher_call.functor.functor_run = _AioCall.watcher_call_functor_run - self._watcher_call.waiter = self - self._waiter_call = None self._references = [] - - def __dealloc__(self): - grpc_completion_queue_shutdown(self._cq) - grpc_completion_queue_destroy(self._cq) + self._grpc_call_wrapper = GrpcCallWrapper() def __repr__(self): class_name = self.__class__.__name__ id_ = id(self) return f"<{class_name} {id_}>" - @staticmethod - cdef void functor_run(grpc_experimental_completion_queue_functor* functor, int success) with gil: - pass - - @staticmethod - cdef void watcher_call_functor_run(grpc_experimental_completion_queue_functor* functor, int success) with gil: - call = <_AioCall>(functor).waiter - - if not call._waiter_call.done(): - if success == 0: - call._waiter_call.set_exception(Exception("Some error occurred")) - else: - call._waiter_call.set_result(None) - - async def unary_unary(self, bytes method, bytes request, object timeout, AioCancelStatus cancel_status): - cdef grpc_call * call + cdef grpc_call* _create_grpc_call(self, + object timeout, + bytes method) except *: + """Creates the corresponding Core object for this RPC. + + For unary calls, the grpc_call lives shortly and can be destroied after + invoke start_batch. However, if either side is streaming, the grpc_call + life span will be longer than one function. So, it would better save it + as an instance variable than a stack variable, which reflects its + nature in Core. + """ cdef grpc_slice method_slice - cdef grpc_op * ops - - cdef Operation initial_metadata_operation - cdef Operation send_message_operation - cdef Operation send_close_from_client_operation - cdef Operation receive_initial_metadata_operation - cdef Operation receive_message_operation - cdef Operation receive_status_on_client_operation - - cdef grpc_call_error call_status cdef gpr_timespec deadline = _timespec_from_time(timeout) - cdef char *c_details = NULL method_slice = grpc_slice_from_copied_buffer( method, len(method) ) - - call = grpc_channel_create_call( + self._grpc_call_wrapper.call = grpc_channel_create_call( self._channel.channel, NULL, - 0, - self._cq, + _EMPTY_MASK, + self._channel.cq.c_ptr(), method_slice, NULL, deadline, NULL ) - grpc_slice_unref(method_slice) - ops = gpr_malloc(sizeof(grpc_op) * _OP_ARRAY_LENGTH) + cdef void _destroy_grpc_call(self): + """Destroys the corresponding Core object for this RPC.""" + grpc_call_unref(self._grpc_call_wrapper.call) + + async def unary_unary(self, bytes method, bytes request, object timeout, AioCancelStatus cancel_status): + cdef object loop = asyncio.get_event_loop() + + cdef tuple operations + cdef Operation initial_metadata_operation + cdef Operation send_message_operation + cdef Operation send_close_from_client_operation + cdef Operation receive_initial_metadata_operation + cdef Operation receive_message_operation + cdef Operation receive_status_on_client_operation + + cdef char *c_details = NULL initial_metadata_operation = SendInitialMetadataOperation(_EMPTY_METADATA, GRPC_INITIAL_METADATA_USED_MASK) initial_metadata_operation.c() - ops[0] = initial_metadata_operation.c_op send_message_operation = SendMessageOperation(request, _EMPTY_FLAGS) send_message_operation.c() - ops[1] = send_message_operation.c_op send_close_from_client_operation = SendCloseFromClientOperation(_EMPTY_FLAGS) send_close_from_client_operation.c() - ops[2] = send_close_from_client_operation.c_op receive_initial_metadata_operation = ReceiveInitialMetadataOperation(_EMPTY_FLAGS) receive_initial_metadata_operation.c() - ops[3] = receive_initial_metadata_operation.c_op receive_message_operation = ReceiveMessageOperation(_EMPTY_FLAGS) receive_message_operation.c() - ops[4] = receive_message_operation.c_op receive_status_on_client_operation = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) receive_status_on_client_operation.c() - ops[5] = receive_status_on_client_operation.c_op - self._waiter_call = asyncio.get_event_loop().create_future() - - call_status = grpc_call_start_batch( - call, - ops, - _OP_ARRAY_LENGTH, - &self._watcher_call.functor, - NULL + operations = ( + initial_metadata_operation, + send_message_operation, + send_close_from_client_operation, + receive_initial_metadata_operation, + receive_message_operation, + receive_status_on_client_operation, ) try: - if call_status != GRPC_CALL_OK: - self._waiter_call = None - raise Exception("Error with grpc_call_start_batch {}".format(call_status)) + self._create_grpc_call( + timeout, + method, + ) try: - await self._waiter_call + await callback_start_batch( + self._grpc_call_wrapper, + operations, + loop + ) except asyncio.CancelledError: if cancel_status: details = str_to_bytes(cancel_status.details()) self._references.append(details) c_details = details call_status = grpc_call_cancel_with_status( - call, cancel_status.code(), c_details, NULL) + self._grpc_call_wrapper.call, + cancel_status.code(), + c_details, + NULL, + ) else: call_status = grpc_call_cancel( - call, NULL) + self._grpc_call_wrapper.call, NULL) if call_status != GRPC_CALL_OK: raise Exception("RPC call couldn't be cancelled. Error {}".format(call_status)) raise finally: - initial_metadata_operation.un_c() - send_message_operation.un_c() - send_close_from_client_operation.un_c() - receive_initial_metadata_operation.un_c() - receive_message_operation.un_c() - receive_status_on_client_operation.un_c() - - grpc_call_unref(call) - gpr_free(ops) + self._destroy_grpc_call() if receive_status_on_client_operation.code() == StatusCode.ok: return receive_message_operation.message() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi similarity index 60% rename from src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi rename to src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi index beada919f4d..8bed055fdb1 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi @@ -14,6 +14,15 @@ cimport cpython + +cdef class CallbackFailureHandler: + cdef str _core_function_name + cdef object _error_details + cdef object _exception_type + + cdef handle(self, object future) + + cdef struct CallbackContext: # C struct to store callback context in the form of pointers. # @@ -27,3 +36,28 @@ cdef struct CallbackContext: grpc_experimental_completion_queue_functor functor cpython.PyObject *waiter cpython.PyObject *failure_handler + + +cdef class CallbackWrapper: + cdef CallbackContext context + cdef object _reference_of_future + cdef object _reference_of_failure_handler + + @staticmethod + cdef void functor_run( + grpc_experimental_completion_queue_functor* functor, + int succeed) + + cdef grpc_experimental_completion_queue_functor *c_functor(self) + + +cdef class CallbackCompletionQueue: + cdef grpc_completion_queue *_cq + cdef object _shutdown_completed # asyncio.Future + cdef CallbackWrapper _wrapper + + cdef grpc_completion_queue* c_ptr(self) + + +cdef class GrpcCallWrapper: + cdef grpc_call* call diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi new file mode 100644 index 00000000000..63f4b59ecd8 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -0,0 +1,113 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class CallbackFailureHandler: + + def __cinit__(self, + str core_function_name, + object error_details, + object exception_type): + """Handles failure by raising exception.""" + self._core_function_name = core_function_name + self._error_details = error_details + self._exception_type = exception_type + + cdef handle(self, object future): + future.set_exception(self._exception_type( + 'Failed "%s": %s' % (self._core_function_name, self._error_details) + )) + + +cdef class CallbackWrapper: + + def __cinit__(self, object future, CallbackFailureHandler failure_handler): + self.context.functor.functor_run = self.functor_run + self.context.waiter = future + self.context.failure_handler = failure_handler + # NOTE(lidiz) Not using a list here, because this class is critical in + # data path. We should make it as efficient as possible. + self._reference_of_future = future + self._reference_of_failure_handler = failure_handler + + @staticmethod + cdef void functor_run( + grpc_experimental_completion_queue_functor* functor, + int success): + cdef CallbackContext *context = functor + if success == 0: + (context.failure_handler).handle( + context.waiter) + else: + (context.waiter).set_result(None) + + cdef grpc_experimental_completion_queue_functor *c_functor(self): + return &self.context.functor + + +cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler( + 'grpc_completion_queue_shutdown', + 'Unknown', + RuntimeError) + + +cdef class CallbackCompletionQueue: + + def __cinit__(self): + self._shutdown_completed = asyncio.get_event_loop().create_future() + self._wrapper = CallbackWrapper( + self._shutdown_completed, + CQ_SHUTDOWN_FAILURE_HANDLER) + self._cq = grpc_completion_queue_create_for_callback( + self._wrapper.c_functor(), + NULL + ) + + cdef grpc_completion_queue* c_ptr(self): + return self._cq + + async def shutdown(self): + grpc_completion_queue_shutdown(self._cq) + await self._shutdown_completed + grpc_completion_queue_destroy(self._cq) + + +async def callback_start_batch(GrpcCallWrapper grpc_call_wrapper, + tuple operations, + object loop): + """The callback version of start batch operations.""" + cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None) + batch_operation_tag.prepare() + + cdef object future = loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper( + future, + CallbackFailureHandler('callback_start_batch', operations, RuntimeError)) + # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed + # when calling "await". This is an over-optimization by Cython. + cpython.Py_INCREF(wrapper) + cdef grpc_call_error error = grpc_call_start_batch( + grpc_call_wrapper.call, + batch_operation_tag.c_ops, + batch_operation_tag.c_nops, + wrapper.c_functor(), NULL) + + if error != GRPC_CALL_OK: + raise RuntimeError("Error with callback_start_batch {}".format(error)) + + await future + cpython.Py_DECREF(wrapper) + cdef grpc_event c_event + # Tag.event must be called, otherwise messages won't be parsed from C + batch_operation_tag.event(c_event) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi index f5e1a5d3095..6e187d65c92 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi @@ -15,4 +15,5 @@ cdef class AioChannel: cdef: grpc_channel * channel + CallbackCompletionQueue cq bytes _target diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi index 526dade7f51..d31df71a316 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi @@ -15,6 +15,7 @@ cdef class AioChannel: def __cinit__(self, bytes target): self.channel = grpc_insecure_channel_create(target, NULL, NULL) + self.cq = CallbackCompletionQueue() self._target = target def __repr__(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index ca4a6a837ea..4d85bafc338 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -17,27 +17,13 @@ cdef class _HandlerCallDetails: cdef readonly tuple invocation_metadata -cdef class RPCState: - cdef grpc_call* call, +cdef class RPCState(GrpcCallWrapper): cdef grpc_call_details details cdef grpc_metadata_array request_metadata cdef bytes method(self) -cdef class CallbackWrapper: - cdef CallbackContext context - cdef object _reference_of_future - cdef object _reference_of_failure_handler - - @staticmethod - cdef void functor_run( - grpc_experimental_completion_queue_functor* functor, - int succeed) - - cdef grpc_experimental_completion_queue_functor *c_functor(self) - - cdef enum AioServerStatus: AIO_SERVER_STATUS_UNKNOWN AIO_SERVER_STATUS_READY @@ -46,17 +32,9 @@ cdef enum AioServerStatus: AIO_SERVER_STATUS_STOPPING -cdef class _CallbackCompletionQueue: - cdef grpc_completion_queue *_cq - cdef grpc_completion_queue* c_ptr(self) - cdef object _shutdown_completed # asyncio.Future - cdef CallbackWrapper _wrapper - cdef object _loop # asyncio.EventLoop - - cdef class AioServer: cdef Server _server - cdef _CallbackCompletionQueue _cq + cdef CallbackCompletionQueue _cq cdef list _generic_handlers cdef AioServerStatus _status cdef object _loop # asyncio.EventLoop diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 61335ca9e60..d332ecbd384 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -26,54 +26,6 @@ cdef class _HandlerCallDetails: class _ServicerContextPlaceHolder(object): pass -cdef class _CallbackFailureHandler: - cdef str _core_function_name - cdef object _error_details - cdef object _exception_type - - def __cinit__(self, - str core_function_name, - object error_details, - object exception_type): - """Handles failure by raising exception.""" - self._core_function_name = core_function_name - self._error_details = error_details - self._exception_type = exception_type - - cdef handle(self, object future): - future.set_exception(self._exception_type( - 'Failed "%s": %s' % (self._core_function_name, self._error_details) - )) - - -# TODO(https://github.com/grpc/grpc/issues/20669) -# Apply this to the client-side -cdef class CallbackWrapper: - - def __cinit__(self, object future, _CallbackFailureHandler failure_handler): - self.context.functor.functor_run = self.functor_run - self.context.waiter = future - self.context.failure_handler = failure_handler - # NOTE(lidiz) Not using a list here, because this class is critical in - # data path. We should make it as efficient as possible. - self._reference_of_future = future - self._reference_of_failure_handler = failure_handler - - @staticmethod - cdef void functor_run( - grpc_experimental_completion_queue_functor* functor, - int success): - cdef CallbackContext *context = functor - if success == 0: - (<_CallbackFailureHandler>context.failure_handler).handle( - context.waiter) - else: - (context.waiter).set_result(None) - - cdef grpc_experimental_completion_queue_functor *c_functor(self): - return &self.context.functor - - cdef class RPCState: def __cinit__(self): @@ -105,36 +57,6 @@ cdef _find_method_handler(str method, list generic_handlers): return None -async def callback_start_batch(RPCState rpc_state, - tuple operations, - object loop): - """The callback version of start batch operations.""" - cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None) - batch_operation_tag.prepare() - - cdef object future = loop.create_future() - cdef CallbackWrapper wrapper = CallbackWrapper( - future, - _CallbackFailureHandler('callback_start_batch', operations, RuntimeError)) - # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed - # when calling "await". This is an over-optimization by Cython. - cpython.Py_INCREF(wrapper) - cdef grpc_call_error error = grpc_call_start_batch( - rpc_state.call, - batch_operation_tag.c_ops, - batch_operation_tag.c_nops, - wrapper.c_functor(), NULL) - - if error != GRPC_CALL_OK: - raise RuntimeError("Error with callback_start_batch {}".format(error)) - - await future - cpython.Py_DECREF(wrapper) - cdef grpc_event c_event - # Tag.event must be called, otherwise messages won't be parsed from C - batch_operation_tag.event(c_event) - - async def _handle_unary_unary_rpc(object method_handler, RPCState rpc_state, object loop): @@ -172,9 +94,6 @@ async def _handle_unary_unary_rpc(object method_handler, await callback_start_batch(rpc_state, send_ops, loop) - - - async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): # Finds the method handler (application logic) cdef object method_handler = _find_method_handler( @@ -198,12 +117,12 @@ async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): class _RequestCallError(Exception): pass -cdef _CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = _CallbackFailureHandler( +cdef CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = CallbackFailureHandler( 'grpc_server_request_call', 'server shutdown', _RequestCallError) async def _server_call_request_call(Server server, - _CallbackCompletionQueue cq, + CallbackCompletionQueue cq, object loop): cdef grpc_call_error error cdef RPCState rpc_state = RPCState() @@ -238,35 +157,7 @@ async def _handle_cancellation_from_core(object rpc_task, rpc_task.cancel() -cdef _CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = _CallbackFailureHandler( - 'grpc_completion_queue_shutdown', - 'Unknown', - RuntimeError) - - -cdef class _CallbackCompletionQueue: - - def __cinit__(self, object loop): - self._loop = loop - self._shutdown_completed = loop.create_future() - self._wrapper = CallbackWrapper( - self._shutdown_completed, - CQ_SHUTDOWN_FAILURE_HANDLER) - self._cq = grpc_completion_queue_create_for_callback( - self._wrapper.c_functor(), - NULL - ) - - cdef grpc_completion_queue* c_ptr(self): - return self._cq - - async def shutdown(self): - grpc_completion_queue_shutdown(self._cq) - await self._shutdown_completed - grpc_completion_queue_destroy(self._cq) - - -cdef _CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = _CallbackFailureHandler( +cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler( 'grpc_server_shutdown_and_notify', 'Unknown', RuntimeError) @@ -279,7 +170,7 @@ cdef class AioServer: # NOTE(lidiz) Core objects won't be deallocated automatically. # If AioServer.shutdown is not called, those objects will leak. self._server = Server(options) - self._cq = _CallbackCompletionQueue(loop) + self._cq = CallbackCompletionQueue() grpc_server_register_completion_queue( self._server.c_server, self._cq.c_ptr(), diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 9ab15528a6b..b9a85e3619b 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -44,7 +44,7 @@ include "_cygrpc/aio/iomgr/socket.pxd.pxi" include "_cygrpc/aio/iomgr/timer.pxd.pxi" include "_cygrpc/aio/iomgr/resolver.pxd.pxi" include "_cygrpc/aio/grpc_aio.pxd.pxi" -include "_cygrpc/aio/callbackcontext.pxd.pxi" +include "_cygrpc/aio/callback_common.pxd.pxi" include "_cygrpc/aio/call.pxd.pxi" include "_cygrpc/aio/cancel_status.pxd.pxi" include "_cygrpc/aio/channel.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 7be32661c87..b123a9b7bd6 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -62,6 +62,7 @@ include "_cygrpc/aio/iomgr/timer.pyx.pxi" include "_cygrpc/aio/iomgr/resolver.pyx.pxi" include "_cygrpc/aio/grpc_aio.pyx.pxi" include "_cygrpc/aio/call.pyx.pxi" +include "_cygrpc/aio/callback_common.pyx.pxi" include "_cygrpc/aio/cancel_status.pyx.pxi" include "_cygrpc/aio/channel.pyx.pxi" include "_cygrpc/aio/rpc_error.pyx.pxi" diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index b1f470a3756..cadd7e416bb 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -163,8 +163,8 @@ class TestCall(AioTestBase): self.assertFalse(call.cancelled()) - # Force the loop to execute the RPC task, cython - # code is executed. + # TODO(https://github.com/grpc/grpc/issues/20869) remove sleep. + # Force the loop to execute the RPC task. await asyncio.sleep(0) self.assertTrue(call.cancel())