[Python Otel] Manage call tracer life cycle using the call arena. (#37460)

We're seeing a segfault in Python CSM tests:
```
2024-08-03T09:49:45.720555997Z *** SIGSEGV received at time=1722678585 on cpu 0 ***
2024-08-03T09:49:45.721761998Z PC: @     0x7847ffd5c1c9  (unknown)  (unknown)
2024-08-03T09:49:45.722070502Z     @     0x7847fa309d8c         64  absl::lts_20240116::WriteFailureInfo()
2024-08-03T09:49:45.722175904Z     @     0x7847fa309a15        272  absl::lts_20240116::AbslFailureSignalHandler()
2024-08-03T09:49:45.722187675Z     @     0x7847ffc3d050       1592  (unknown)
2024-08-03T09:49:45.723432238Z     @     0x7847e97f9390  (unknown)  (unknown)
2024-08-03T09:49:45.723487349Z     @ ... and at least 1 more frames
2024-08-03T09:49:45.829702781Z [INFO  tini (1)] Spawned child process '/xds_interop_client' with pid '7'
2024-08-03T09:49:45.829766869Z [DEBUG tini (1)] Received SIGCHLD
2024-08-03T09:49:45.829778749Z [DEBUG tini (1)] Reaped child with pid: '7'
2024-08-03T09:49:45.829787070Z [INFO  tini (1)] Main child exited with signal (with signal 'Segmentation fault')
```

### The issue

After investigation, we found that the call tracer was deleted before `RecordEnd` was called.

### Why this fix

* To fix this, we decided to use the call arena to manage the life cycle of the CallTracer.
* Since the CallTracer is created in a separate shared object library (`grpcio_observability`) which doesn't have a dependency on grpc core, we can't use `grpc_core::Arena` directly when creating the call tracer.
* As a workaround, we created a wrapper class `ClientCallTracerWrapper` to wrap the CallTracer and added another core API, `grpc_call_tracer_set_and_manage`, so that we can manage the life cycle of the CallTracer using the wrapper class.

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #37460

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37460 from XuanWang-Amos:fix_otel_segfault 33c0b98c64
PiperOrigin-RevId: 662966853
pull/37480/head
Xuan Wang 3 months ago committed by Copybara-Service
parent d351628769
commit a30347d992
  1. 7
      src/core/lib/surface/call.cc
  2. 19
      src/core/lib/surface/call.h
  3. 1
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
  4. 6
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  5. 2
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  6. 2
      src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi
  7. 33
      src/python/grpcio/grpc/_observability.py
  8. 9
      src/python/grpcio_observability/grpc_observability/_cyobservability.pyx
  9. 5
      src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py

@ -497,6 +497,13 @@ void grpc_call_tracer_set(grpc_call* call,
return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
}
void grpc_call_tracer_set_and_manage(grpc_call* call,
grpc_core::ClientCallTracer* tracer) {
grpc_core::Arena* arena = grpc_call_get_arena(call);
arena->ManagedNew<ClientCallTracerWrapper>(tracer);
return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
}
void* grpc_call_tracer_get(grpc_call* call) {
grpc_core::Arena* arena = grpc_call_get_arena(call);
auto* call_tracer =

@ -265,6 +265,16 @@ void grpc_call_log_batch(const char* file, int line, const grpc_op* ops,
void grpc_call_tracer_set(grpc_call* call, grpc_core::ClientCallTracer* tracer);
// Sets call tracer on the call and manages its life by using the call's arena.
// When using this API, the tracer will be destroyed by grpc_call arena when
// grpc_call is about to be destroyed. The caller of this API SHOULD NOT
// manually destroy the tracer. This API is used by Python as a way of using
// Arena to manage the lifetime of the call tracer. Python needs this API
// because the tracer was created within a separate shared object library which
// doesn't have access to core functions like arena->ManagedNew<>.
void grpc_call_tracer_set_and_manage(grpc_call* call,
grpc_core::ClientCallTracer* tracer);
void* grpc_call_tracer_get(grpc_call* call);
#define GRPC_CALL_LOG_BATCH(ops, nops) \
@ -276,6 +286,15 @@ void* grpc_call_tracer_get(grpc_call* call);
uint8_t grpc_call_is_client(grpc_call* call);
class ClientCallTracerWrapper {
public:
explicit ClientCallTracerWrapper(grpc_core::ClientCallTracer* tracer)
: tracer_(tracer) {}
private:
std::unique_ptr<grpc_core::ClientCallTracer> tracer_;
};
// Return an appropriate compression algorithm for the requested compression \a
// level in the context of \a call.
grpc_compression_algorithm grpc_call_compression_for_level(

@ -30,7 +30,6 @@ cdef class _CallState:
cdef object call_tracer_capsule
cdef void maybe_save_registered_method(self, bytes method_name) except *
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *
cdef void maybe_delete_call_tracer(self) except *
cdef void delete_call(self) except *

@ -76,12 +76,6 @@ cdef class _CallState:
with nogil:
grpc_call_unref(self.c_call)
self.c_call = NULL
self.maybe_delete_call_tracer()
cdef void maybe_delete_call_tracer(self) except *:
if not self.call_tracer_capsule:
return
_observability.delete_call_tracer(self.call_tracer_capsule)
cdef void maybe_save_registered_method(self, bytes method_name) except *:
with _observability.get_plugin() as plugin:

@ -76,7 +76,7 @@ cdef extern from "src/core/telemetry/call_tracer.h" namespace "grpc_core":
void RegisterGlobal(ServerCallTracerFactory* factory) nogil
cdef extern from "src/core/lib/surface/call.h":
void grpc_call_tracer_set(grpc_call* call, void* value) nogil
void grpc_call_tracer_set_and_manage(grpc_call* call, void* value) nogil
void* grpc_call_tracer_get(grpc_call* call) nogil

@ -50,7 +50,7 @@ def maybe_save_server_trace_context(RequestCallEvent event) -> None:
cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr):
cdef ClientCallTracer* call_tracer = <ClientCallTracer*>capsule_ptr
grpc_call_tracer_set(call, call_tracer)
grpc_call_tracer_set_and_manage(call, call_tracer)
cdef void* _get_call_tracer(grpc_call* call):

@ -100,23 +100,6 @@ class ObservabilityPlugin(
"""
raise NotImplementedError()
@abc.abstractmethod
def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule
) -> None:
"""Deletes the ClientCallTracer stored in ClientCallTracerCapsule.
After register the plugin, if tracing or stats is enabled, this method
will be called at the end of the call to destroy the ClientCallTracer.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as name.
Args:
client_call_tracer: A PyCapsule which stores a ClientCallTracer object.
"""
raise NotImplementedError()
@abc.abstractmethod
def save_trace_context(
self, trace_id: str, span_id: str, is_sampled: bool
@ -276,22 +259,6 @@ def observability_deinit() -> None:
_cygrpc.clear_server_call_tracer_factory()
def delete_call_tracer(client_call_tracer_capsule: Any) -> None:
"""Deletes the ClientCallTracer stored in ClientCallTracerCapsule.
This method will be called at the end of the call to destroy the ClientCallTracer.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as the name.
Args:
client_call_tracer_capsule: A PyCapsule which stores a ClientCallTracer object.
"""
with get_plugin() as plugin:
if plugin and plugin.observability_enabled:
plugin.delete_client_call_tracer(client_call_tracer_capsule)
def maybe_record_rpc_latency(state: "_channel._RPCState") -> None:
"""Record the latency of the RPC, if the plugin is registered and stats is enabled.

@ -155,15 +155,6 @@ def create_server_call_tracer_factory_capsule(dict exchange_labels, str identifi
return capsule
def delete_client_call_tracer(object client_call_tracer) -> None:
client_call_tracer: grpc._observability.ClientCallTracerCapsule
if cpython.PyCapsule_IsValid(client_call_tracer, CLIENT_CALL_TRACER):
capsule_ptr = cpython.PyCapsule_GetPointer(client_call_tracer, CLIENT_CALL_TRACER)
call_tracer_ptr = <ClientCallTracer*>capsule_ptr
del call_tracer_ptr
def _c_label_to_labels(vector[Label] c_labels) -> Dict[str, AnyStr]:
py_labels = {}
for label in c_labels:

@ -438,11 +438,6 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
)
return capsule
def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule
) -> None:
_cyobservability.delete_client_call_tracer(client_call_tracer)
def save_trace_context(
self, trace_id: str, span_id: str, is_sampled: bool
) -> None:

Loading…
Cancel
Save