From 53540ae5d610820c0959d7788a42c097ce38927f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 31 May 2024 13:23:09 -0700 Subject: [PATCH] [context] Move legacy tracing contexts to arena contexts (#36776) Closes #36776 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36776 from ctiller:ctx2 8be4cdcf43f00a371dfd4ce0fb01594ecd5483a8 PiperOrigin-RevId: 639133808 --- CMakeLists.txt | 42 +++++++ build_autogenerated.yaml | 110 ++++++++++++++++++ src/core/BUILD | 2 + .../client_channel/client_channel_filter.cc | 36 +++--- .../client_channel/client_channel_filter.h | 12 +- .../load_balanced_call_destination.cc | 4 +- .../message_compress/compression_filter.cc | 8 +- .../chttp2/transport/chttp2_transport.cc | 43 ++++--- .../ext/transport/chttp2/transport/internal.h | 7 +- .../ext/transport/chttp2/transport/parsing.cc | 9 +- .../ext/transport/chttp2/transport/writing.cc | 8 +- src/core/lib/channel/context.h | 23 ---- src/core/lib/promise/cancel_callback.h | 10 ++ src/core/lib/surface/call.cc | 27 +++-- src/core/lib/surface/call.h | 4 + src/core/server/server_call_tracer_filter.cc | 15 +-- src/core/telemetry/call_tracer.cc | 46 +++----- src/core/telemetry/call_tracer.h | 16 ++- src/core/telemetry/metrics.cc | 9 +- src/core/telemetry/metrics.h | 4 +- src/cpp/ext/filters/census/client_filter.cc | 26 ++--- .../grpcio/grpc/_cython/_cygrpc/grpc.pxi | 13 +-- .../_cython/_cygrpc/observability.pyx.pxi | 4 +- test/core/telemetry/call_tracer_test.cc | 47 +++----- test/core/test_util/fake_stats_plugin.cc | 6 +- test/cpp/ext/otel/otel_test_library.cc | 4 +- 26 files changed, 322 insertions(+), 213 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6595ee1d494..984cb875e22 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9128,7 +9128,43 @@ endif() if(gRPC_BUILD_TESTS) add_executable(cancel_callback_test + src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c + src/core/ext/upb-gen/google/rpc/status.upb_minitable.c + src/core/lib/debug/trace.cc + src/core/lib/experiments/config.cc + src/core/lib/experiments/experiments.cc + src/core/lib/gprpp/status_helper.cc + src/core/lib/gprpp/time.cc + src/core/lib/iomgr/closure.cc + src/core/lib/iomgr/combiner.cc + src/core/lib/iomgr/error.cc + src/core/lib/iomgr/exec_ctx.cc + src/core/lib/iomgr/executor.cc + src/core/lib/iomgr/iomgr_internal.cc + src/core/lib/promise/activity.cc + src/core/lib/promise/trace.cc + src/core/lib/resource_quota/arena.cc + src/core/lib/resource_quota/connection_quota.cc + src/core/lib/resource_quota/memory_quota.cc + src/core/lib/resource_quota/periodic_update.cc + src/core/lib/resource_quota/resource_quota.cc + src/core/lib/resource_quota/thread_quota.cc + src/core/lib/resource_quota/trace.cc + src/core/lib/slice/percent_encoding.cc + src/core/lib/slice/slice.cc + src/core/lib/slice/slice_refcount.cc + src/core/lib/slice/slice_string_helpers.cc test/core/promise/cancel_callback_test.cc + third_party/upb/upb/mini_descriptor/build_enum.c + third_party/upb/upb/mini_descriptor/decode.c + third_party/upb/upb/mini_descriptor/internal/base92.c + third_party/upb/upb/mini_descriptor/internal/encode.c + third_party/upb/upb/mini_descriptor/link.c + third_party/upb/upb/wire/decode.c + third_party/upb/upb/wire/encode.c + third_party/upb/upb/wire/eps_copy_input_stream.c + third_party/upb/upb/wire/internal/decode_fast.c + third_party/upb/upb/wire/reader.c ) if(WIN32 AND MSVC) if(BUILD_SHARED_LIBS) @@ -9161,7 +9197,13 @@ target_include_directories(cancel_callback_test target_link_libraries(cancel_callback_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + utf8_range_lib + upb_message_lib + absl::config + absl::function_ref + absl::hash absl::type_traits + absl::statusor gpr ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 26b1d9d6beb..6c1a2db5f15 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -6974,14 +6974,124 @@ targets: build: test language: c++ headers: + - src/core/ext/upb-gen/google/protobuf/any.upb.h + - src/core/ext/upb-gen/google/protobuf/any.upb_minitable.h + - src/core/ext/upb-gen/google/rpc/status.upb.h + - src/core/ext/upb-gen/google/rpc/status.upb_minitable.h + - src/core/lib/debug/trace.h + - src/core/lib/event_engine/event_engine_context.h + - src/core/lib/experiments/config.h + - src/core/lib/experiments/experiments.h + - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/bitset.h + - src/core/lib/gprpp/cpp_impl_of.h + - src/core/lib/gprpp/down_cast.h + - src/core/lib/gprpp/manual_constructor.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h + - src/core/lib/gprpp/status_helper.h + - src/core/lib/gprpp/time.h + - src/core/lib/iomgr/closure.h + - src/core/lib/iomgr/combiner.h + - src/core/lib/iomgr/error.h + - src/core/lib/iomgr/exec_ctx.h + - src/core/lib/iomgr/executor.h + - src/core/lib/iomgr/iomgr_internal.h + - src/core/lib/promise/activity.h - src/core/lib/promise/cancel_callback.h + - src/core/lib/promise/context.h + - src/core/lib/promise/detail/basic_seq.h + - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h + - src/core/lib/promise/detail/seq_state.h + - src/core/lib/promise/detail/status.h + - src/core/lib/promise/exec_ctx_wakeup_scheduler.h + - src/core/lib/promise/loop.h + - src/core/lib/promise/map.h - src/core/lib/promise/poll.h + - src/core/lib/promise/race.h + - src/core/lib/promise/seq.h + - src/core/lib/promise/trace.h + - src/core/lib/resource_quota/arena.h + - src/core/lib/resource_quota/connection_quota.h + - src/core/lib/resource_quota/memory_quota.h + - src/core/lib/resource_quota/periodic_update.h + - src/core/lib/resource_quota/resource_quota.h + - src/core/lib/resource_quota/thread_quota.h + - src/core/lib/resource_quota/trace.h + - src/core/lib/slice/percent_encoding.h + - src/core/lib/slice/slice.h + - src/core/lib/slice/slice_internal.h + - src/core/lib/slice/slice_refcount.h + - src/core/lib/slice/slice_string_helpers.h + - src/core/util/spinlock.h + - third_party/upb/upb/generated_code_support.h + - third_party/upb/upb/mini_descriptor/build_enum.h + - third_party/upb/upb/mini_descriptor/decode.h + - third_party/upb/upb/mini_descriptor/internal/base92.h + - third_party/upb/upb/mini_descriptor/internal/decoder.h + - third_party/upb/upb/mini_descriptor/internal/encode.h + - third_party/upb/upb/mini_descriptor/internal/encode.hpp + - third_party/upb/upb/mini_descriptor/internal/modifiers.h + - third_party/upb/upb/mini_descriptor/internal/wire_constants.h + - third_party/upb/upb/mini_descriptor/link.h + - third_party/upb/upb/wire/decode.h + - third_party/upb/upb/wire/encode.h + - third_party/upb/upb/wire/eps_copy_input_stream.h + - third_party/upb/upb/wire/internal/constants.h + - third_party/upb/upb/wire/internal/decode_fast.h + - third_party/upb/upb/wire/internal/decoder.h + - third_party/upb/upb/wire/internal/reader.h + - third_party/upb/upb/wire/reader.h + - third_party/upb/upb/wire/types.h src: + - src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c + - src/core/ext/upb-gen/google/rpc/status.upb_minitable.c + - src/core/lib/debug/trace.cc + - src/core/lib/experiments/config.cc + - src/core/lib/experiments/experiments.cc + - src/core/lib/gprpp/status_helper.cc + - src/core/lib/gprpp/time.cc + - src/core/lib/iomgr/closure.cc + - src/core/lib/iomgr/combiner.cc + - src/core/lib/iomgr/error.cc + - src/core/lib/iomgr/exec_ctx.cc + - src/core/lib/iomgr/executor.cc + - src/core/lib/iomgr/iomgr_internal.cc + - src/core/lib/promise/activity.cc + - src/core/lib/promise/trace.cc + - src/core/lib/resource_quota/arena.cc + - src/core/lib/resource_quota/connection_quota.cc + - src/core/lib/resource_quota/memory_quota.cc + - src/core/lib/resource_quota/periodic_update.cc + - src/core/lib/resource_quota/resource_quota.cc + - src/core/lib/resource_quota/thread_quota.cc + - src/core/lib/resource_quota/trace.cc + - src/core/lib/slice/percent_encoding.cc + - src/core/lib/slice/slice.cc + - src/core/lib/slice/slice_refcount.cc + - src/core/lib/slice/slice_string_helpers.cc - test/core/promise/cancel_callback_test.cc + - third_party/upb/upb/mini_descriptor/build_enum.c + - third_party/upb/upb/mini_descriptor/decode.c + - third_party/upb/upb/mini_descriptor/internal/base92.c + - third_party/upb/upb/mini_descriptor/internal/encode.c + - third_party/upb/upb/mini_descriptor/link.c + - third_party/upb/upb/wire/decode.c + - third_party/upb/upb/wire/encode.c + - third_party/upb/upb/wire/eps_copy_input_stream.c + - third_party/upb/upb/wire/internal/decode_fast.c + - third_party/upb/upb/wire/reader.c deps: - gtest + - utf8_range_lib + - upb_message_lib + - absl/base:config + - absl/functional:function_ref + - absl/hash:hash - absl/meta:type_traits + - absl/status:statusor - gpr uses_polling: false - name: cancel_in_a_vacuum_test diff --git a/src/core/BUILD b/src/core/BUILD index 9d23ed2f1a4..42f6fdba028 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -685,6 +685,8 @@ grpc_cc_library( "lib/promise/cancel_callback.h", ], deps = [ + "arena", + "context", "promise_like", "//:gpr_platform", ], diff --git a/src/core/client_channel/client_channel_filter.cc b/src/core/client_channel/client_channel_filter.cc index f0c58c1580f..5eceb724703 100644 --- a/src/core/client_channel/client_channel_filter.cc +++ b/src/core/client_channel/client_channel_filter.cc @@ -2133,8 +2133,7 @@ absl::optional ClientChannelFilter::CallData::CheckResolution( } // If the call was queued, add trace annotation. if (was_queued) { - auto* call_tracer = static_cast( - call_context()[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); + auto* call_tracer = arena()->GetContext(); if (call_tracer != nullptr) { call_tracer->RecordAnnotation("Delayed name resolution complete."); } @@ -2574,7 +2573,7 @@ class ClientChannelFilter::LoadBalancedCall::LbCallState final public: explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {} - void* Alloc(size_t size) override { return lb_call_->arena()->Alloc(size); } + void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); } // Internal API to allow first-party LB policies to access per-call // attributes set by the ConfigSelector. @@ -2696,7 +2695,7 @@ class ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor final recv_trailing_metadata_ != nullptr) { if (const auto* md = recv_trailing_metadata_->get_pointer( EndpointLoadMetricsBinMetadata())) { - BackendMetricAllocator allocator(lb_call_->arena()); + BackendMetricAllocator allocator(lb_call_->arena_); lb_call_->backend_metric_data_ = ParseBackendMetricData(md->as_string_view(), &allocator); } @@ -2731,28 +2730,29 @@ class ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor final namespace { -void CreateCallAttemptTracer(grpc_call_context_element* context, - bool is_transparent_retry) { - auto* call_tracer = static_cast( - context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); +void CreateCallAttemptTracer(Arena* arena, bool is_transparent_retry) { + auto* call_tracer = DownCast( + arena->GetContext()); if (call_tracer == nullptr) return; auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry); - context[GRPC_CONTEXT_CALL_TRACER].value = tracer; + arena->SetContext(tracer); } } // namespace ClientChannelFilter::LoadBalancedCall::LoadBalancedCall( ClientChannelFilter* chand, grpc_call_context_element* call_context, - absl::AnyInvocable on_commit, bool is_transparent_retry) + Arena* arena, absl::AnyInvocable on_commit, + bool is_transparent_retry) : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace) ? "LoadBalancedCall" : nullptr), chand_(chand), on_commit_(std::move(on_commit)), - call_context_(call_context) { - CreateCallAttemptTracer(call_context, is_transparent_retry); + call_context_(call_context), + arena_(arena) { + CreateCallAttemptTracer(arena, is_transparent_retry); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: created", chand_, this); } @@ -3019,9 +3019,8 @@ ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall( ClientChannelFilter* chand, const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, absl::AnyInvocable on_commit, bool is_transparent_retry) - : LoadBalancedCall(chand, args.context, std::move(on_commit), + : LoadBalancedCall(chand, args.context, args.arena, std::move(on_commit), is_transparent_retry), - arena_(args.arena), owning_call_(args.call_stack), call_combiner_(args.call_combiner), pollent_(pollent), @@ -3464,7 +3463,7 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() { SubchannelCall::Args call_args = { connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0, static_cast(call_context()[GRPC_CONTEXT_CALL].value)->deadline(), - arena_, + arena(), // TODO(roth): When we implement hedging support, we will probably // need to use a separate call context for each subchannel call. call_context(), call_combiner_}; @@ -3494,7 +3493,8 @@ ClientChannelFilter::PromiseBasedLoadBalancedCall::PromiseBasedLoadBalancedCall( ClientChannelFilter* chand, absl::AnyInvocable on_commit, bool is_transparent_retry) : LoadBalancedCall(chand, GetContext(), - std::move(on_commit), is_transparent_retry) {} + GetContext(), std::move(on_commit), + is_transparent_retry) {} ArenaPromise ClientChannelFilter::PromiseBasedLoadBalancedCall::MakeCallPromise( @@ -3610,10 +3610,6 @@ ClientChannelFilter::PromiseBasedLoadBalancedCall::MakeCallPromise( }); } -Arena* ClientChannelFilter::PromiseBasedLoadBalancedCall::arena() const { - return GetContext(); -} - grpc_metadata_batch* ClientChannelFilter::PromiseBasedLoadBalancedCall::send_initial_metadata() const { diff --git a/src/core/client_channel/client_channel_filter.h b/src/core/client_channel/client_channel_filter.h index 9233c846c6f..e1c5e9aee25 100644 --- a/src/core/client_channel/client_channel_filter.h +++ b/src/core/client_channel/client_channel_filter.h @@ -372,7 +372,7 @@ class ClientChannelFilter::LoadBalancedCall : public InternallyRefCounted { public: LoadBalancedCall(ClientChannelFilter* chand, - grpc_call_context_element* call_context, + grpc_call_context_element* call_context, Arena* arena, absl::AnyInvocable on_commit, bool is_transparent_retry); ~LoadBalancedCall() override; @@ -391,8 +391,8 @@ class ClientChannelFilter::LoadBalancedCall protected: ClientChannelFilter* chand() const { return chand_; } ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const { - return static_cast( - call_context_[GRPC_CONTEXT_CALL_TRACER].value); + return DownCast( + arena_->GetContext()); } ConnectedSubchannel* connected_subchannel() const { return connected_subchannel_.get(); @@ -401,6 +401,7 @@ class ClientChannelFilter::LoadBalancedCall lb_subchannel_call_tracker() const { return lb_subchannel_call_tracker_.get(); } + Arena* arena() const { return arena_; } void Commit() { auto on_commit = std::move(on_commit_); @@ -433,7 +434,6 @@ class ClientChannelFilter::LoadBalancedCall class Metadata; class BackendMetricAccessor; - virtual Arena* arena() const = 0; virtual grpc_polling_entity* pollent() = 0; virtual grpc_metadata_batch* send_initial_metadata() const = 0; @@ -460,6 +460,7 @@ class ClientChannelFilter::LoadBalancedCall std::unique_ptr lb_subchannel_call_tracker_; grpc_call_context_element* const call_context_; + Arena* const arena_; }; class ClientChannelFilter::FilterBasedLoadBalancedCall final @@ -495,7 +496,6 @@ class ClientChannelFilter::FilterBasedLoadBalancedCall final using LoadBalancedCall::chand; using LoadBalancedCall::Commit; - Arena* arena() const override { return arena_; } grpc_polling_entity* pollent() override { return pollent_; } grpc_metadata_batch* send_initial_metadata() const override { return pending_batches_[0] @@ -550,7 +550,6 @@ class ClientChannelFilter::FilterBasedLoadBalancedCall final // TODO(roth): Instead of duplicating these fields in every filter // that uses any one of them, we should store them in the call // context. This will save per-call memory overhead. - Arena* arena_; grpc_call_stack* owning_call_; CallCombiner* call_combiner_; grpc_polling_entity* pollent_; @@ -598,7 +597,6 @@ class ClientChannelFilter::PromiseBasedLoadBalancedCall final CallArgs call_args, OrphanablePtr lb_call); private: - Arena* arena() const override; grpc_polling_entity* pollent() override { return &pollent_; } grpc_metadata_batch* send_initial_metadata() const override; diff --git a/src/core/client_channel/load_balanced_call_destination.cc b/src/core/client_channel/load_balanced_call_destination.cc index 4d702a5858c..002a778e142 100644 --- a/src/core/client_channel/load_balanced_call_destination.cc +++ b/src/core/client_channel/load_balanced_call_destination.cc @@ -121,9 +121,7 @@ class LbCallState : public ClientChannelLbCallState { } ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override { - auto* legacy_context = GetContext(); - return static_cast( - legacy_context[GRPC_CONTEXT_CALL_TRACER].value); + return GetContext(); } }; diff --git a/src/core/ext/filters/http/message_compress/compression_filter.cc b/src/core/ext/filters/http/message_compress/compression_filter.cc index 43fa643bbb8..96272570aa4 100644 --- a/src/core/ext/filters/http/message_compress/compression_filter.cc +++ b/src/core/ext/filters/http/message_compress/compression_filter.cc @@ -119,9 +119,7 @@ MessageHandle ChannelCompression::CompressMessage( gpr_log(GPR_INFO, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d", message->payload()->Length(), algorithm, message->flags()); } - auto* call_context = GetContext(); - auto* call_tracer = static_cast( - call_context[GRPC_CONTEXT_CALL_TRACER].value); + auto* call_tracer = MaybeGetContext(); if (call_tracer != nullptr) { call_tracer->RecordSendMessage(*message->payload()); } @@ -178,9 +176,7 @@ absl::StatusOr ChannelCompression::DecompressMessage( message->payload()->Length(), args.max_recv_message_length.value_or(-1), args.algorithm); } - auto* call_context = GetContext(); - auto* call_tracer = static_cast( - call_context[GRPC_CONTEXT_CALL_TRACER].value); + auto* call_tracer = MaybeGetContext(); if (call_tracer != nullptr) { call_tracer->RecordReceivedMessage(*message->payload()); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 90cfdf3ad5e..a20e4c71c0d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -227,14 +227,13 @@ namespace { using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle; -grpc_core::CallTracerInterface* CallTracerIfSampled(grpc_chttp2_stream* s) { - if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) { +grpc_core::CallTracerAnnotationInterface* CallTracerIfSampled( + grpc_chttp2_stream* s) { + if (!grpc_core::IsTraceRecordCallopsEnabled()) { return nullptr; } - auto* call_tracer = static_cast( - static_cast( - s->context)[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE] - .value); + auto* call_tracer = + s->arena->GetContext(); if (call_tracer == nullptr || !call_tracer->IsSampled()) { return nullptr; } @@ -243,13 +242,11 @@ grpc_core::CallTracerInterface* CallTracerIfSampled(grpc_chttp2_stream* s) { std::shared_ptr TcpTracerIfSampled( grpc_chttp2_stream* s) { - if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) { + if (!grpc_core::IsTraceRecordCallopsEnabled()) { return nullptr; } - auto* call_attempt_tracer = static_cast( - static_cast( - s->context)[GRPC_CONTEXT_CALL_TRACER] - .value); + auto* call_attempt_tracer = + s->arena->GetContext(); if (call_attempt_tracer == nullptr || !call_attempt_tracer->IsSampled()) { return nullptr; } @@ -391,10 +388,10 @@ grpc_chttp2_transport::~grpc_chttp2_transport() { grpc_error_handle error = GRPC_ERROR_CREATE("Transport destroyed"); // ContextList::Execute follows semantics of a callback function and does not // take a ref on error - if (cl != nullptr) { - grpc_core::ForEachContextListEntryExecute(cl, nullptr, error); + if (context_list != nullptr) { + grpc_core::ForEachContextListEntryExecute(context_list, nullptr, error); } - cl = nullptr; + context_list = nullptr; grpc_slice_buffer_destroy(&read_buffer); grpc_chttp2_goaway_parser_destroy(&goaway_parser); @@ -617,7 +614,7 @@ grpc_chttp2_transport::grpc_chttp2_transport( &memory_owner), deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0), is_client(is_client) { - cl = new grpc_core::ContextList(); + context_list = new grpc_core::ContextList(); CHECK(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); @@ -784,7 +781,8 @@ void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) { grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount, - const void* server_data) + const void* server_data, + grpc_core::Arena* arena) : t(t->Ref()), refcount([refcount]() { // We reserve one 'active stream' that's dropped when the stream is @@ -798,6 +796,7 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, #endif return refcount; }()), + arena(arena), flow_control(&t->flow_control) { t->streams_allocated.fetch_add(1, std::memory_order_relaxed); if (server_data) { @@ -855,8 +854,8 @@ grpc_chttp2_stream::~grpc_chttp2_stream() { void grpc_chttp2_transport::InitStream(grpc_stream* gs, grpc_stream_refcount* refcount, const void* server_data, - grpc_core::Arena*) { - new (gs) grpc_chttp2_stream(this, refcount, server_data); + grpc_core::Arena* arena) { + new (gs) grpc_chttp2_stream(this, refcount, server_data, arena); } static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) { @@ -1015,13 +1014,13 @@ static void write_action_begin_locked( } static void write_action(grpc_chttp2_transport* t) { - void* cl = t->cl; - if (!t->cl->empty()) { + void* cl = t->context_list; + if (!t->context_list->empty()) { // Transfer the ownership of the context list to the endpoint and create and // associate a new context list with the transport. // The old context list is stored in the cl local variable which is passed // to the endpoint. Its upto the endpoint to manage its lifetime. - t->cl = new grpc_core::ContextList(); + t->context_list = new grpc_core::ContextList(); } else { // t->cl is Empty. There is nothing to trace in this endpoint_write. set cl // to nullptr. @@ -1376,7 +1375,7 @@ static void perform_stream_op_locked(void* stream_op, } if (op->send_initial_metadata) { - if (s->call_tracer) { + if (s->call_tracer != nullptr) { s->call_tracer->RecordAnnotation( grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart, gpr_now(GPR_CLOCK_REALTIME)) diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index ed596278686..ea68725210d 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -466,7 +466,7 @@ struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport, grpc_chttp2_keepalive_state keepalive_state; // Soft limit on max header size. uint32_t max_header_list_size_soft_limit = 0; - grpc_core::ContextList* cl = nullptr; + grpc_core::ContextList* context_list = nullptr; grpc_core::RefCountedPtr channelz_socket; uint32_t num_messages_in_next_write = 0; /// The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and @@ -545,12 +545,13 @@ typedef enum { struct grpc_chttp2_stream { grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount, - const void* server_data); + const void* server_data, grpc_core::Arena* arena); ~grpc_chttp2_stream(); void* context = nullptr; const grpc_core::RefCountedPtr t; grpc_stream_refcount* refcount; + grpc_core::Arena* const arena; grpc_closure destroy_stream; grpc_closure* destroy_stream_arg; @@ -643,7 +644,7 @@ struct grpc_chttp2_stream { int64_t write_counter = 0; /// Only set when enabled. - grpc_core::CallTracerInterface* call_tracer = nullptr; + grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr; /// Only set when enabled. std::shared_ptr tcp_tracer; diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 856d3bcb220..8e81482d24d 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -942,13 +942,8 @@ grpc_error_handle grpc_chttp2_header_parser_parse(void* hpack_parser, grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr; if (s != nullptr) { s->stats.incoming.header_bytes += GRPC_SLICE_LENGTH(slice); - - if (s->context != nullptr) { - call_tracer = static_cast( - static_cast( - s->context)[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE] - .value); - } + call_tracer = + s->arena->GetContext(); } grpc_error_handle error = parser->Parse( slice, is_last != 0, absl::BitGenRef(t->bitgen), call_tracer); diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 0cc28270314..a40d7c54359 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -682,10 +682,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_core::GrpcHttp2GetCopyContextFn(); if (copy_context_fn != nullptr && grpc_core::GrpcHttp2GetWriteTimestampsCallback() != nullptr) { - t->cl->emplace_back(copy_context_fn(s->context), - outbuf_relative_start_pos, num_stream_bytes, - s->byte_counter, s->write_counter - 1, - s->tcp_tracer); + t->context_list->emplace_back(copy_context_fn(s->context), + outbuf_relative_start_pos, + num_stream_bytes, s->byte_counter, + s->write_counter - 1, s->tcp_tracer); } } outbuf_relative_start_pos += num_stream_bytes; diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index 4e3cdae4bfd..100f3927def 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -35,16 +35,6 @@ typedef enum { /// Value is a \a census_context. GRPC_CONTEXT_TRACING, - /// Value is a CallTracerAnnotationInterface. (ClientCallTracer object on the - /// client-side call, or ServerCallTracer on the server-side.) - GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, - - /// Value is a CallTracerInterface (ServerCallTracer on the server-side, - /// CallAttemptTracer on a subchannel call.) - /// TODO(yashykt): Maybe come up with a better name. This will go away in the - /// future anyway, so not super important. - GRPC_CONTEXT_CALL_TRACER, - /// Holds a pointer to ServiceConfigCallData associated with this call. GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA, @@ -62,8 +52,6 @@ struct grpc_call_context_element { namespace grpc_core { class Call; -class CallTracerAnnotationInterface; -class CallTracerInterface; class ServiceConfigCallData; // Bind the legacy context array into the new style structure @@ -82,17 +70,6 @@ struct OldStyleContext { static constexpr grpc_context_index kIndex = GRPC_CONTEXT_CALL; }; -template <> -struct OldStyleContext { - static constexpr grpc_context_index kIndex = - GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE; -}; - -template <> -struct OldStyleContext { - static constexpr grpc_context_index kIndex = GRPC_CONTEXT_CALL_TRACER; -}; - template <> struct OldStyleContext { static constexpr grpc_context_index kIndex = diff --git a/src/core/lib/promise/cancel_callback.h b/src/core/lib/promise/cancel_callback.h index f4f002b8f37..92ac2019d4f 100644 --- a/src/core/lib/promise/cancel_callback.h +++ b/src/core/lib/promise/cancel_callback.h @@ -17,7 +17,9 @@ #include +#include "src/core/lib/promise/context.h" #include "src/core/lib/promise/detail/promise_like.h" +#include "src/core/lib/resource_quota/arena.h" namespace grpc_core { @@ -31,6 +33,7 @@ class Handler { Handler& operator=(const Handler&) = delete; ~Handler() { if (!done_) { + promise_detail::Context ctx(arena_.get()); fn_(); } } @@ -48,6 +51,13 @@ class Handler { private: Fn fn_; + // Since cancellation happens at destruction time we need to either capture + // context here (via the arena), or make sure that no promise is destructed + // without an Arena context on the stack. The latter is an eternal game of + // whackamole, so we're choosing the former for now. + // TODO(ctiller): re-evaluate at some point in the future. + RefCountedPtr arena_ = + HasContext() ? GetContext()->Ref() : nullptr; bool done_ = false; }; diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 5f0d4a99310..b793bba7df9 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -755,7 +755,7 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args, GrpcRegisteredMethod(), reinterpret_cast(static_cast( args->registered_method))); channel_stack->stats_plugin_group->AddClientCallTracers( - Slice(CSliceRef(path)), args->registered_method, call->context_); + Slice(CSliceRef(path)), args->registered_method, call->GetArena()); } else { global_stats().IncrementServerCallsCreated(); call->final_op_.server.cancelled = nullptr; @@ -778,12 +778,11 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args, // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future // promise-based world, we would just a single tracer object for each // stack (call, subchannel_call, server_call.) - call->ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, - server_call_tracer, nullptr); - call->ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr); + arena->SetContext(server_call_tracer); + arena->SetContext(server_call_tracer); } } - channel_stack->stats_plugin_group->AddServerCallTracers(call->context_); + channel_stack->stats_plugin_group->AddServerCallTracers(arena.get()); } Call* parent = Call::FromC(args->parent); @@ -1230,8 +1229,7 @@ FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl( *pslot = bctl; } bctl->call_ = this; - bctl->call_tracer_ = static_cast( - ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE)); + bctl->call_tracer_ = arena()->GetContext(); bctl->op_.payload = &stream_op_payload_; return bctl; } @@ -2612,7 +2610,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall { } ScopedContext context(this); args->channel->channel_stack()->stats_plugin_group->AddClientCallTracers( - *args->path, args->registered_method, this->context()); + *args->path, args->registered_method, GetArena()); send_initial_metadata_ = Arena::MakePooled(); send_initial_metadata_->Set(HttpPathMetadata(), std::move(*args->path)); if (args->authority.has_value()) { @@ -3762,6 +3760,19 @@ void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) { return grpc_core::Call::FromC(call)->ContextGet(elem); } +void grpc_call_tracer_set(grpc_call* call, + grpc_core::ClientCallTracer* tracer) { + grpc_core::Arena* arena = grpc_call_get_arena(call); + return arena->SetContext(tracer); +} + +void* grpc_call_tracer_get(grpc_call* call) { + grpc_core::Arena* arena = grpc_call_get_arena(call); + auto* call_tracer = + arena->GetContext(); + return call_tracer; +} + uint8_t grpc_call_is_client(grpc_call* call) { return grpc_core::Call::FromC(call)->is_client(); } diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index d464de2ec89..0d557629388 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -338,6 +338,10 @@ void grpc_call_context_set(grpc_call* call, grpc_context_index elem, // Get a context pointer. void* grpc_call_context_get(grpc_call* call, grpc_context_index elem); +void grpc_call_tracer_set(grpc_call* call, grpc_core::ClientCallTracer* tracer); + +void* grpc_call_tracer_get(grpc_call* call); + #define GRPC_CALL_LOG_BATCH(sev, ops, nops) \ do { \ if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace)) { \ diff --git a/src/core/server/server_call_tracer_filter.cc b/src/core/server/server_call_tracer_filter.cc index 7a63b4768b6..efe05fc1f0c 100644 --- a/src/core/server/server_call_tracer_filter.cc +++ b/src/core/server/server_call_tracer_filter.cc @@ -56,25 +56,25 @@ class ServerCallTracerFilter class Call { public: void OnClientInitialMetadata(ClientMetadata& client_initial_metadata) { - auto* call_tracer = CallTracer(); + auto* call_tracer = MaybeGetContext(); if (call_tracer == nullptr) return; call_tracer->RecordReceivedInitialMetadata(&client_initial_metadata); } void OnServerInitialMetadata(ServerMetadata& server_initial_metadata) { - auto* call_tracer = CallTracer(); + auto* call_tracer = MaybeGetContext(); if (call_tracer == nullptr) return; call_tracer->RecordSendInitialMetadata(&server_initial_metadata); } void OnFinalize(const grpc_call_final_info* final_info) { - auto* call_tracer = CallTracer(); + auto* call_tracer = MaybeGetContext(); if (call_tracer == nullptr) return; call_tracer->RecordEnd(final_info); } void OnServerTrailingMetadata(ServerMetadata& server_trailing_metadata) { - auto* call_tracer = CallTracer(); + auto* call_tracer = MaybeGetContext(); if (call_tracer == nullptr) return; call_tracer->RecordSendTrailingMetadata(&server_trailing_metadata); } @@ -82,13 +82,6 @@ class ServerCallTracerFilter static const NoInterceptor OnClientToServerMessage; static const NoInterceptor OnClientToServerHalfClose; static const NoInterceptor OnServerToClientMessage; - - private: - static ServerCallTracer* CallTracer() { - auto* call_context = GetContext(); - return static_cast( - call_context[GRPC_CONTEXT_CALL_TRACER].value); - } }; }; diff --git a/src/core/telemetry/call_tracer.cc b/src/core/telemetry/call_tracer.cc index 093664e85f6..4b70f220395 100644 --- a/src/core/telemetry/call_tracer.cc +++ b/src/core/telemetry/call_tracer.cc @@ -299,63 +299,53 @@ class DelegatingServerCallTracer : public ServerCallTracer { std::vector tracers_; }; -void AddClientCallTracerToContext(grpc_call_context_element* call_context, - ClientCallTracer* tracer) { - if (call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value == - nullptr) { +void AddClientCallTracerToContext(Arena* arena, ClientCallTracer* tracer) { + if (arena->GetContext() == nullptr) { // This is the first call tracer. Set it directly. - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer; - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy = - nullptr; + arena->SetContext(tracer); } else { // There was already a call tracer present. - auto* orig_tracer = static_cast( - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); + auto* orig_tracer = DownCast( + arena->GetContext()); if (orig_tracer->IsDelegatingTracer()) { // We already created a delegating tracer. Just add the new tracer to the // list. - static_cast(orig_tracer)->AddTracer(tracer); + DownCast(orig_tracer)->AddTracer(tracer); } else { // Create a new delegating tracer and add the first tracer and the new // tracer to the list. auto* delegating_tracer = GetContext()->ManagedNew( orig_tracer); - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = - delegating_tracer; + arena->SetContext(delegating_tracer); delegating_tracer->AddTracer(tracer); } } } -void AddServerCallTracerToContext(grpc_call_context_element* call_context, - ServerCallTracer* tracer) { - DCHECK(call_context[GRPC_CONTEXT_CALL_TRACER].value == - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); - if (call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value == - nullptr) { +void AddServerCallTracerToContext(Arena* arena, ServerCallTracer* tracer) { + DCHECK_EQ(arena->GetContext(), + arena->GetContext()); + if (arena->GetContext() == nullptr) { // This is the first call tracer. Set it directly. - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer; - call_context[GRPC_CONTEXT_CALL_TRACER].value = tracer; - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy = - nullptr; + arena->SetContext(tracer); + arena->SetContext(tracer); } else { // There was already a call tracer present. - auto* orig_tracer = static_cast( - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); + auto* orig_tracer = DownCast( + arena->GetContext()); if (orig_tracer->IsDelegatingTracer()) { // We already created a delegating tracer. Just add the new tracer to the // list. - static_cast(orig_tracer)->AddTracer(tracer); + DownCast(orig_tracer)->AddTracer(tracer); } else { // Create a new delegating tracer and add the first tracer and the new // tracer to the list. auto* delegating_tracer = GetContext()->ManagedNew( orig_tracer); - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = - delegating_tracer; - call_context[GRPC_CONTEXT_CALL_TRACER].value = delegating_tracer; + arena->SetContext(delegating_tracer); + arena->SetContext(delegating_tracer); delegating_tracer->AddTracer(tracer); } } diff --git a/src/core/telemetry/call_tracer.h b/src/core/telemetry/call_tracer.h index 967658fa36b..1053777d85d 100644 --- a/src/core/telemetry/call_tracer.h +++ b/src/core/telemetry/call_tracer.h @@ -214,13 +214,21 @@ class ServerCallTracerFactory { // Convenience functions to add call tracers to a call context. Allows setting // multiple call tracers to a single call. It is only valid to add client call // tracers before the client_channel filter sees the send_initial_metadata op. -void AddClientCallTracerToContext(grpc_call_context_element* call_context, - ClientCallTracer* tracer); +void AddClientCallTracerToContext(Arena* arena, ClientCallTracer* tracer); // TODO(yashykt): We want server call tracers to be registered through the // ServerCallTracerFactory, which has yet to be made into a list. -void AddServerCallTracerToContext(grpc_call_context_element* call_context, - ServerCallTracer* tracer); +void AddServerCallTracerToContext(Arena* arena, ServerCallTracer* tracer); + +template <> +struct ArenaContextType { + static void Destroy(CallTracerAnnotationInterface*) {} +}; + +template <> +struct ArenaContextType { + static void Destroy(CallTracerAnnotationInterface*) {} +}; template <> struct ContextSubclass { diff --git a/src/core/telemetry/metrics.cc b/src/core/telemetry/metrics.cc index b403a8649f2..db47f1f8608 100644 --- a/src/core/telemetry/metrics.cc +++ b/src/core/telemetry/metrics.cc @@ -100,23 +100,22 @@ RegisteredMetricCallback::~RegisteredMetricCallback() { } void GlobalStatsPluginRegistry::StatsPluginGroup::AddClientCallTracers( - const Slice& path, bool registered_method, - grpc_call_context_element* call_context) { + const Slice& path, bool registered_method, Arena* arena) { for (auto& state : plugins_state_) { auto* call_tracer = state.plugin->GetClientCallTracer( path, registered_method, state.scope_config); if (call_tracer != nullptr) { - AddClientCallTracerToContext(call_context, call_tracer); + AddClientCallTracerToContext(arena, call_tracer); } } } void GlobalStatsPluginRegistry::StatsPluginGroup::AddServerCallTracers( - grpc_call_context_element* call_context) { + Arena* arena) { for (auto& state : plugins_state_) { auto* call_tracer = state.plugin->GetServerCallTracer(state.scope_config); if (call_tracer != nullptr) { - AddServerCallTracerToContext(call_context, call_tracer); + AddServerCallTracerToContext(arena, call_tracer); } } } diff --git a/src/core/telemetry/metrics.h b/src/core/telemetry/metrics.h index 2fb92d4597e..8551b74bc62 100644 --- a/src/core/telemetry/metrics.h +++ b/src/core/telemetry/metrics.h @@ -464,10 +464,10 @@ class GlobalStatsPluginRegistry { // Adds all available client call tracers associated with the stats plugins // within the group to \a call_context. void AddClientCallTracers(const Slice& path, bool registered_method, - grpc_call_context_element* call_context); + Arena* arena); // Adds all available server call tracers associated with the stats plugins // within the group to \a call_context. - void AddServerCallTracers(grpc_call_context_element* call_context); + void AddServerCallTracers(Arena* arena); private: friend class RegisteredMetricCallback; diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc index efb191da95d..4733040f152 100644 --- a/src/cpp/ext/filters/census/client_filter.cc +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -100,17 +100,15 @@ OpenCensusClientFilter::MakeCallPromise( grpc_core::NextPromiseFactory next_promise_factory) { auto* path = call_args.client_initial_metadata->get_pointer( grpc_core::HttpPathMetadata()); - auto* call_context = grpc_core::GetContext(); - auto* tracer = - grpc_core::GetContext() - ->ManagedNew( - call_context, path != nullptr ? path->Ref() : grpc_core::Slice(), - grpc_core::GetContext(), - OpenCensusTracingEnabled() && tracing_enabled_); - DCHECK(call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value == - nullptr); - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer; - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy = nullptr; + auto* arena = grpc_core::GetContext(); + auto* tracer = arena->ManagedNew( + grpc_core::GetContext(), + path != nullptr ? path->Ref() : grpc_core::Slice(), + grpc_core::GetContext(), + OpenCensusTracingEnabled() && tracing_enabled_); + DCHECK_EQ(arena->GetContext(), + nullptr); + grpc_core::SetContext(tracer); return next_promise_factory(std::move(call_args)); } @@ -424,9 +422,9 @@ class OpenCensusClientInterceptor : public grpc::experimental::Interceptor { grpc::experimental::InterceptorBatchMethods* methods) override { if (methods->QueryInterceptionHookPoint( grpc::experimental::InterceptionHookPoints::POST_RECV_STATUS)) { - auto* tracer = static_cast( - grpc_call_context_get(info_->client_context()->c_call(), - GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE)); + auto* tracer = grpc_core::DownCast( + grpc_call_get_arena(info_->client_context()->c_call()) + ->GetContext()); if (tracer != nullptr) { tracer->RecordApiLatency(absl::Now() - start_time_, static_cast( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 5f5afcfc944..e1467aeb565 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -63,6 +63,9 @@ cdef extern from "src/core/telemetry/call_tracer.h" namespace "grpc_core": cdef cppclass ClientCallTracer: pass + cdef cppclass CallTracerAnnotationInterface: + pass + cdef cppclass ServerCallTracer: string TraceId() nogil string SpanId() nogil @@ -72,14 +75,10 @@ cdef extern from "src/core/telemetry/call_tracer.h" namespace "grpc_core": @staticmethod void RegisterGlobal(ServerCallTracerFactory* factory) nogil -cdef extern from "src/core/lib/channel/context.h": - ctypedef enum grpc_context_index: - GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE - cdef extern from "src/core/lib/surface/call.h": - void grpc_call_context_set(grpc_call* call, grpc_context_index elem, - void* value, void (*destroy)(void* value)) nogil - void *grpc_call_context_get(grpc_call* call, grpc_context_index elem) nogil + void grpc_call_tracer_set(grpc_call* call, void* value) nogil + + void* grpc_call_tracer_get(grpc_call* call) nogil cdef extern from "grpc/support/alloc.h": diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi index a29ccdd9376..5f62a0efe64 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi @@ -50,11 +50,11 @@ def maybe_save_server_trace_context(RequestCallEvent event) -> None: cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr): cdef ClientCallTracer* call_tracer = capsule_ptr - grpc_call_context_set(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, call_tracer, NULL) + grpc_call_tracer_set(call, call_tracer) cdef void* _get_call_tracer(grpc_call* call): - cdef void* call_tracer = grpc_call_context_get(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE) + cdef void* call_tracer = grpc_call_tracer_get(call) return call_tracer diff --git a/test/core/telemetry/call_tracer_test.cc b/test/core/telemetry/call_tracer_test.cc index 3adbc5d6204..c9af4d745b4 100644 --- a/test/core/telemetry/call_tracer_test.cc +++ b/test/core/telemetry/call_tracer_test.cc @@ -38,16 +38,13 @@ namespace { class CallTracerTest : public ::testing::Test { protected: RefCountedPtr arena_ = SimpleArenaAllocator()->MakeArena(); - grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; std::vector annotation_logger_; }; TEST_F(CallTracerTest, BasicClientCallTracer) { FakeClientCallTracer client_call_tracer(&annotation_logger_); - AddClientCallTracerToContext(context_, &client_call_tracer); - static_cast( - context_[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value) - ->RecordAnnotation("Test"); + AddClientCallTracerToContext(arena_.get(), &client_call_tracer); + arena_->GetContext()->RecordAnnotation("Test"); EXPECT_EQ(annotation_logger_, std::vector{"Test"}); } @@ -56,12 +53,10 @@ TEST_F(CallTracerTest, MultipleClientCallTracers) { FakeClientCallTracer client_call_tracer1(&annotation_logger_); FakeClientCallTracer client_call_tracer2(&annotation_logger_); FakeClientCallTracer client_call_tracer3(&annotation_logger_); - AddClientCallTracerToContext(context_, &client_call_tracer1); - AddClientCallTracerToContext(context_, &client_call_tracer2); - AddClientCallTracerToContext(context_, &client_call_tracer3); - static_cast( - context_[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value) - ->RecordAnnotation("Test"); + AddClientCallTracerToContext(arena_.get(), &client_call_tracer1); + AddClientCallTracerToContext(arena_.get(), &client_call_tracer2); + AddClientCallTracerToContext(arena_.get(), &client_call_tracer3); + arena_->GetContext()->RecordAnnotation("Test"); EXPECT_EQ(annotation_logger_, std::vector({"Test", "Test", "Test"})); } @@ -71,12 +66,12 @@ TEST_F(CallTracerTest, MultipleClientCallAttemptTracers) { FakeClientCallTracer client_call_tracer1(&annotation_logger_); FakeClientCallTracer client_call_tracer2(&annotation_logger_); FakeClientCallTracer client_call_tracer3(&annotation_logger_); - AddClientCallTracerToContext(context_, &client_call_tracer1); - AddClientCallTracerToContext(context_, &client_call_tracer2); - AddClientCallTracerToContext(context_, &client_call_tracer3); + AddClientCallTracerToContext(arena_.get(), &client_call_tracer1); + AddClientCallTracerToContext(arena_.get(), &client_call_tracer2); + AddClientCallTracerToContext(arena_.get(), &client_call_tracer3); auto* attempt_tracer = - static_cast( - context_[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value) + DownCast( + arena_->GetContext()) ->StartNewAttempt(true /* is_transparent_retry */); attempt_tracer->RecordAnnotation("Test"); EXPECT_EQ(annotation_logger_, @@ -86,13 +81,9 @@ TEST_F(CallTracerTest, MultipleClientCallAttemptTracers) { TEST_F(CallTracerTest, BasicServerCallTracerTest) { FakeServerCallTracer server_call_tracer(&annotation_logger_); - AddServerCallTracerToContext(context_, &server_call_tracer); - static_cast( - context_[GRPC_CONTEXT_CALL_TRACER].value) - ->RecordAnnotation("Test"); - static_cast( - context_[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value) - ->RecordAnnotation("Test"); + AddServerCallTracerToContext(arena_.get(), &server_call_tracer); + arena_->GetContext()->RecordAnnotation("Test"); + arena_->GetContext()->RecordAnnotation("Test"); EXPECT_EQ(annotation_logger_, std::vector({"Test", "Test"})); } @@ -101,12 +92,10 @@ TEST_F(CallTracerTest, MultipleServerCallTracers) { FakeServerCallTracer server_call_tracer1(&annotation_logger_); FakeServerCallTracer server_call_tracer2(&annotation_logger_); FakeServerCallTracer server_call_tracer3(&annotation_logger_); - AddServerCallTracerToContext(context_, &server_call_tracer1); - AddServerCallTracerToContext(context_, &server_call_tracer2); - AddServerCallTracerToContext(context_, &server_call_tracer3); - static_cast( - context_[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value) - ->RecordAnnotation("Test"); + AddServerCallTracerToContext(arena_.get(), &server_call_tracer1); + AddServerCallTracerToContext(arena_.get(), &server_call_tracer2); + AddServerCallTracerToContext(arena_.get(), &server_call_tracer3); + arena_->GetContext()->RecordAnnotation("Test"); EXPECT_EQ(annotation_logger_, std::vector({"Test", "Test", "Test"})); } diff --git a/test/core/test_util/fake_stats_plugin.cc b/test/core/test_util/fake_stats_plugin.cc index abc0a6a82ff..59546f9ce45 100644 --- a/test/core/test_util/fake_stats_plugin.cc +++ b/test/core/test_util/fake_stats_plugin.cc @@ -57,11 +57,7 @@ ArenaPromise FakeStatsClientFilter::MakeCallPromise( FakeClientCallTracer* client_call_tracer = fake_client_call_tracer_factory_->CreateFakeClientCallTracer(); if (client_call_tracer != nullptr) { - auto* call_context = GetContext(); - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = - client_call_tracer; - call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy = - nullptr; + SetContext(client_call_tracer); } return next_promise_factory(std::move(call_args)); } diff --git a/test/cpp/ext/otel/otel_test_library.cc b/test/cpp/ext/otel/otel_test_library.cc index 40153f13dda..338527903b4 100644 --- a/test/cpp/ext/otel/otel_test_library.cc +++ b/test/cpp/ext/otel/otel_test_library.cc @@ -68,9 +68,7 @@ class AddLabelsFilter : public grpc_core::ChannelFilter { grpc_core::CallArgs call_args, grpc_core::NextPromiseFactory next_promise_factory) override { using CallAttemptTracer = grpc_core::ClientCallTracer::CallAttemptTracer; - auto* call_context = grpc_core::GetContext(); - auto* call_tracer = static_cast( - call_context[GRPC_CONTEXT_CALL_TRACER].value); + auto* call_tracer = grpc_core::GetContext(); EXPECT_NE(call_tracer, nullptr); for (const auto& pair : labels_to_inject_) { call_tracer->SetOptionalLabel(pair.first, pair.second);