From b22a45cca9a0797324d2406c99fa3ea9a5a3877d Mon Sep 17 00:00:00 2001 From: Xuan Wang Date: Thu, 18 Jul 2024 17:01:26 +0000 Subject: [PATCH] Refactor delete call path --- .../grpc/_cython/_cygrpc/channel.pxd.pxi | 1 + .../grpc/_cython/_cygrpc/channel.pyx.pxi | 31 +++++++------------ 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi index 86a98b6803a..ecefbd9ea81 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi @@ -31,6 +31,7 @@ cdef class _CallState: cdef void maybe_save_registered_method(self, bytes method_name) except * cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except * cdef void maybe_delete_call_tracer(self) except * + cdef void delete_call(self) except * cdef class _ChannelState: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 3c0fef744fb..d2e98250d53 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -72,6 +72,12 @@ cdef class _CallState: def __cinit__(self): self.due = set() + cdef void delete_call(self) except *: + with nogil: + grpc_call_unref(self.c_call) + self.c_call = NULL + self.maybe_delete_call_tracer() + cdef void maybe_delete_call_tracer(self) except *: if not self.call_tracer_capsule: return @@ -291,10 +297,7 @@ cdef void _call( grpc_call_credentials_release(c_call_credentials) if c_call_error != GRPC_CALL_OK: #TODO(xuanwn): Expand the scope of nogil - with nogil: - grpc_call_unref(call_state.c_call) - call_state.c_call = NULL - call_state.maybe_delete_call_tracer() + call_state.delete_call() _raise_call_error_no_metadata(c_call_error) started_tags = set() for operations, user_tag in operationses_and_user_tags: @@ -304,10 +307,7 @@ cdef void _call( else: grpc_call_cancel(call_state.c_call, NULL) #TODO(xuanwn): Expand the scope of nogil - with nogil: - grpc_call_unref(call_state.c_call) - call_state.c_call = NULL - call_state.maybe_delete_call_tracer() + call_state.delete_call() _raise_call_error(c_call_error, metadata) else: call_state.due.update(started_tags) @@ -321,10 +321,7 @@ cdef void _process_integrated_call_tag( cdef _CallState call_state = state.integrated_call_states.pop(tag) call_state.due.remove(tag) if not call_state.due: - with nogil: - grpc_call_unref(call_state.c_call) - call_state.c_call = NULL - call_state.maybe_delete_call_tracer() + call_state.delete_call() cdef class IntegratedCall: @@ -364,10 +361,7 @@ cdef object _process_segregated_call_tag( call_state.due.remove(tag) if not call_state.due: #TODO(xuanwn): Expand the scope of nogil - with nogil: - grpc_call_unref(call_state.c_call) - call_state.c_call = NULL - call_state.maybe_delete_call_tracer() + call_state.delete_call() state.segregated_call_states.remove(call_state) _destroy_c_completion_queue(c_completion_queue) return True @@ -394,10 +388,7 @@ cdef class SegregatedCall: self._channel_state, self._call_state, self._c_completion_queue, tag) def on_failure(): self._call_state.due.clear() - with nogil: - grpc_call_unref(self._call_state.c_call) - self._call_state.c_call = NULL - self._call_state.maybe_delete_call_tracer() + self._call_state.delete_call() self._channel_state.segregated_call_states.remove(self._call_state) _destroy_c_completion_queue(self._c_completion_queue) return _next_call_event(