diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index a1824c5c40c..7c150c33054 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -32,11 +32,11 @@ cdef class CallbackFailureHandler: cdef class CallbackWrapper: - def __cinit__(self, object future, CallbackFailureHandler failure_handler): + def __cinit__(self, object future, object loop, CallbackFailureHandler failure_handler): self.context.functor.functor_run = self.functor_run self.context.waiter = future # TODO(lidiz) switch to future.get_loop() which is available 3.7+. - self.context.loop = future._loop + self.context.loop = loop self.context.failure_handler = failure_handler self.context.callback_wrapper = self # NOTE(lidiz) Not using a list here, because this class is critical in @@ -84,6 +84,7 @@ async def execute_batch(GrpcCallWrapper grpc_call_wrapper, cdef object future = loop.create_future() cdef CallbackWrapper wrapper = CallbackWrapper( future, + loop, CallbackFailureHandler('execute_batch', operations, ExecuteBatchError)) cdef grpc_call_error error = grpc_call_start_batch( grpc_call_wrapper.call, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi index 4f64b50b355..a2882e64b7f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi @@ -81,6 +81,7 @@ cdef class AioChannel: cdef object future = self.loop.create_future() cdef CallbackWrapper wrapper = CallbackWrapper( future, + self.loop, _WATCH_CONNECTIVITY_FAILURE_HANDLER) grpc_channel_watch_connectivity_state( self.channel, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index 093e8868bf7..e0e3dd6abc7 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -74,6 +74,7 @@ cdef class CallbackCompletionQueue(BaseCompletionQueue): self._shutdown_completed = self._loop.create_future() self._wrapper = CallbackWrapper( self._shutdown_completed, + self._loop, CQ_SHUTDOWN_FAILURE_HANDLER) self._cq = grpc_completion_queue_create_for_callback( self._wrapper.c_functor(), diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 07d1e45577f..37c5cffb952 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -631,6 +631,7 @@ cdef class AioServer: self._shutdown_completed = self._loop.create_future() self._shutdown_callback_wrapper = CallbackWrapper( self._shutdown_completed, + self._loop, SERVER_SHUTDOWN_FAILURE_HANDLER) self._crash_exception = None @@ -659,6 +660,7 @@ cdef class AioServer: cdef object future = self._loop.create_future() cdef CallbackWrapper wrapper = CallbackWrapper( future, + self._loop, REQUEST_CALL_FAILURE_HANDLER) error = grpc_server_request_call( self._server.c_server, &rpc_state.call, &rpc_state.details, diff --git a/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py b/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py index 06631ecb6fb..acf404ed51b 100644 --- a/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py @@ -71,8 +71,7 @@ class TestCompatibility(AioTestBase): func() self.loop.call_soon_threadsafe(work_done.set) - thread = threading.Thread(target=thread_work) - thread.daemon = True + thread = threading.Thread(target=thread_work, daemon=True) thread.start() await work_done.wait() thread.join()