[Python Observability] Make sure call_tracers are deleted (#37247)

Make sure call_tracers are deleted as long as c_call is unrefed.

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #37247

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37247 from XuanWang-Amos:fixHangingCallTracers b22a45cca9
PiperOrigin-RevId: 654865870
pull/37281/head
Xuan Wang 4 months ago committed by Copybara-Service
parent bbd37c3197
commit 40c0419982
  1. 1
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
  2. 28
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi

@ -31,6 +31,7 @@ cdef class _CallState:
cdef void maybe_save_registered_method(self, bytes method_name) except *
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *
cdef void maybe_delete_call_tracer(self) except *
cdef void delete_call(self) except *
cdef class _ChannelState:

@ -72,6 +72,12 @@ cdef class _CallState:
def __cinit__(self):
self.due = set()
cdef void delete_call(self) except *:
with nogil:
grpc_call_unref(self.c_call)
self.c_call = NULL
self.maybe_delete_call_tracer()
cdef void maybe_delete_call_tracer(self) except *:
if not self.call_tracer_capsule:
return
@ -291,9 +297,7 @@ cdef void _call(
grpc_call_credentials_release(c_call_credentials)
if c_call_error != GRPC_CALL_OK:
#TODO(xuanwn): Expand the scope of nogil
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
call_state.delete_call()
_raise_call_error_no_metadata(c_call_error)
started_tags = set()
for operations, user_tag in operationses_and_user_tags:
@ -303,9 +307,7 @@ cdef void _call(
else:
grpc_call_cancel(call_state.c_call, NULL)
#TODO(xuanwn): Expand the scope of nogil
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
call_state.delete_call()
_raise_call_error(c_call_error, metadata)
else:
call_state.due.update(started_tags)
@ -319,10 +321,7 @@ cdef void _process_integrated_call_tag(
cdef _CallState call_state = state.integrated_call_states.pop(tag)
call_state.due.remove(tag)
if not call_state.due:
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
call_state.maybe_delete_call_tracer()
call_state.delete_call()
cdef class IntegratedCall:
@ -362,10 +361,7 @@ cdef object _process_segregated_call_tag(
call_state.due.remove(tag)
if not call_state.due:
#TODO(xuanwn): Expand the scope of nogil
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
call_state.maybe_delete_call_tracer()
call_state.delete_call()
state.segregated_call_states.remove(call_state)
_destroy_c_completion_queue(c_completion_queue)
return True
@ -392,9 +388,7 @@ cdef class SegregatedCall:
self._channel_state, self._call_state, self._c_completion_queue, tag)
def on_failure():
self._call_state.due.clear()
with nogil:
grpc_call_unref(self._call_state.c_call)
self._call_state.c_call = NULL
self._call_state.delete_call()
self._channel_state.segregated_call_states.remove(self._call_state)
_destroy_c_completion_queue(self._c_completion_queue)
return _next_call_event(

Loading…
Cancel
Save