[context] Move legacy tracing contexts to arena contexts (#36776)

Closes #36776

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36776 from ctiller:ctx2 8be4cdcf43
PiperOrigin-RevId: 639133808
pull/36779/head^2
Craig Tiller 6 months ago committed by Copybara-Service
parent d09edb126f
commit 53540ae5d6
  1. 42
      CMakeLists.txt
  2. 110
      build_autogenerated.yaml
  3. 2
      src/core/BUILD
  4. 36
      src/core/client_channel/client_channel_filter.cc
  5. 12
      src/core/client_channel/client_channel_filter.h
  6. 4
      src/core/client_channel/load_balanced_call_destination.cc
  7. 8
      src/core/ext/filters/http/message_compress/compression_filter.cc
  8. 43
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  9. 7
      src/core/ext/transport/chttp2/transport/internal.h
  10. 9
      src/core/ext/transport/chttp2/transport/parsing.cc
  11. 8
      src/core/ext/transport/chttp2/transport/writing.cc
  12. 23
      src/core/lib/channel/context.h
  13. 10
      src/core/lib/promise/cancel_callback.h
  14. 27
      src/core/lib/surface/call.cc
  15. 4
      src/core/lib/surface/call.h
  16. 15
      src/core/server/server_call_tracer_filter.cc
  17. 46
      src/core/telemetry/call_tracer.cc
  18. 16
      src/core/telemetry/call_tracer.h
  19. 9
      src/core/telemetry/metrics.cc
  20. 4
      src/core/telemetry/metrics.h
  21. 26
      src/cpp/ext/filters/census/client_filter.cc
  22. 13
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  23. 4
      src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi
  24. 47
      test/core/telemetry/call_tracer_test.cc
  25. 6
      test/core/test_util/fake_stats_plugin.cc
  26. 4
      test/cpp/ext/otel/otel_test_library.cc

42
CMakeLists.txt generated

@ -9128,7 +9128,43 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(cancel_callback_test
src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c
src/core/ext/upb-gen/google/rpc/status.upb_minitable.c
src/core/lib/debug/trace.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/iomgr/closure.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/trace.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/connection_quota.cc
src/core/lib/resource_quota/memory_quota.cc
src/core/lib/resource_quota/periodic_update.cc
src/core/lib/resource_quota/resource_quota.cc
src/core/lib/resource_quota/thread_quota.cc
src/core/lib/resource_quota/trace.cc
src/core/lib/slice/percent_encoding.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
test/core/promise/cancel_callback_test.cc
third_party/upb/upb/mini_descriptor/build_enum.c
third_party/upb/upb/mini_descriptor/decode.c
third_party/upb/upb/mini_descriptor/internal/base92.c
third_party/upb/upb/mini_descriptor/internal/encode.c
third_party/upb/upb/mini_descriptor/link.c
third_party/upb/upb/wire/decode.c
third_party/upb/upb/wire/encode.c
third_party/upb/upb/wire/eps_copy_input_stream.c
third_party/upb/upb/wire/internal/decode_fast.c
third_party/upb/upb/wire/reader.c
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
@ -9161,7 +9197,13 @@ target_include_directories(cancel_callback_test
target_link_libraries(cancel_callback_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
utf8_range_lib
upb_message_lib
absl::config
absl::function_ref
absl::hash
absl::type_traits
absl::statusor
gpr
)

@ -6974,14 +6974,124 @@ targets:
build: test
language: c++
headers:
- src/core/ext/upb-gen/google/protobuf/any.upb.h
- src/core/ext/upb-gen/google/protobuf/any.upb_minitable.h
- src/core/ext/upb-gen/google/rpc/status.upb.h
- src/core/ext/upb-gen/google/rpc/status.upb_minitable.h
- src/core/lib/debug/trace.h
- src/core/lib/event_engine/event_engine_context.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/time.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/promise/activity.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/seq_state.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/poll.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/trace.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/connection_quota.h
- src/core/lib/resource_quota/memory_quota.h
- src/core/lib/resource_quota/periodic_update.h
- src/core/lib/resource_quota/resource_quota.h
- src/core/lib/resource_quota/thread_quota.h
- src/core/lib/resource_quota/trace.h
- src/core/lib/slice/percent_encoding.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/util/spinlock.h
- third_party/upb/upb/generated_code_support.h
- third_party/upb/upb/mini_descriptor/build_enum.h
- third_party/upb/upb/mini_descriptor/decode.h
- third_party/upb/upb/mini_descriptor/internal/base92.h
- third_party/upb/upb/mini_descriptor/internal/decoder.h
- third_party/upb/upb/mini_descriptor/internal/encode.h
- third_party/upb/upb/mini_descriptor/internal/encode.hpp
- third_party/upb/upb/mini_descriptor/internal/modifiers.h
- third_party/upb/upb/mini_descriptor/internal/wire_constants.h
- third_party/upb/upb/mini_descriptor/link.h
- third_party/upb/upb/wire/decode.h
- third_party/upb/upb/wire/encode.h
- third_party/upb/upb/wire/eps_copy_input_stream.h
- third_party/upb/upb/wire/internal/constants.h
- third_party/upb/upb/wire/internal/decode_fast.h
- third_party/upb/upb/wire/internal/decoder.h
- third_party/upb/upb/wire/internal/reader.h
- third_party/upb/upb/wire/reader.h
- third_party/upb/upb/wire/types.h
src:
- src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c
- src/core/ext/upb-gen/google/rpc/status.upb_minitable.c
- src/core/lib/debug/trace.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/iomgr/closure.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/trace.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/connection_quota.cc
- src/core/lib/resource_quota/memory_quota.cc
- src/core/lib/resource_quota/periodic_update.cc
- src/core/lib/resource_quota/resource_quota.cc
- src/core/lib/resource_quota/thread_quota.cc
- src/core/lib/resource_quota/trace.cc
- src/core/lib/slice/percent_encoding.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/cancel_callback_test.cc
- third_party/upb/upb/mini_descriptor/build_enum.c
- third_party/upb/upb/mini_descriptor/decode.c
- third_party/upb/upb/mini_descriptor/internal/base92.c
- third_party/upb/upb/mini_descriptor/internal/encode.c
- third_party/upb/upb/mini_descriptor/link.c
- third_party/upb/upb/wire/decode.c
- third_party/upb/upb/wire/encode.c
- third_party/upb/upb/wire/eps_copy_input_stream.c
- third_party/upb/upb/wire/internal/decode_fast.c
- third_party/upb/upb/wire/reader.c
deps:
- gtest
- utf8_range_lib
- upb_message_lib
- absl/base:config
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
- gpr
uses_polling: false
- name: cancel_in_a_vacuum_test

@ -685,6 +685,8 @@ grpc_cc_library(
"lib/promise/cancel_callback.h",
],
deps = [
"arena",
"context",
"promise_like",
"//:gpr_platform",
],

@ -2133,8 +2133,7 @@ absl::optional<absl::Status> ClientChannelFilter::CallData::CheckResolution(
}
// If the call was queued, add trace annotation.
if (was_queued) {
auto* call_tracer = static_cast<CallTracerAnnotationInterface*>(
call_context()[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
auto* call_tracer = arena()->GetContext<CallTracerAnnotationInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordAnnotation("Delayed name resolution complete.");
}
@ -2574,7 +2573,7 @@ class ClientChannelFilter::LoadBalancedCall::LbCallState final
public:
explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}
void* Alloc(size_t size) override { return lb_call_->arena()->Alloc(size); }
void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); }
// Internal API to allow first-party LB policies to access per-call
// attributes set by the ConfigSelector.
@ -2696,7 +2695,7 @@ class ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor final
recv_trailing_metadata_ != nullptr) {
if (const auto* md = recv_trailing_metadata_->get_pointer(
EndpointLoadMetricsBinMetadata())) {
BackendMetricAllocator allocator(lb_call_->arena());
BackendMetricAllocator allocator(lb_call_->arena_);
lb_call_->backend_metric_data_ =
ParseBackendMetricData(md->as_string_view(), &allocator);
}
@ -2731,28 +2730,29 @@ class ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor final
namespace {
void CreateCallAttemptTracer(grpc_call_context_element* context,
bool is_transparent_retry) {
auto* call_tracer = static_cast<ClientCallTracer*>(
context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
void CreateCallAttemptTracer(Arena* arena, bool is_transparent_retry) {
auto* call_tracer = DownCast<ClientCallTracer*>(
arena->GetContext<CallTracerAnnotationInterface>());
if (call_tracer == nullptr) return;
auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
arena->SetContext<CallTracerInterface>(tracer);
}
} // namespace
ClientChannelFilter::LoadBalancedCall::LoadBalancedCall(
ClientChannelFilter* chand, grpc_call_context_element* call_context,
absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
Arena* arena, absl::AnyInvocable<void()> on_commit,
bool is_transparent_retry)
: InternallyRefCounted(
GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)
? "LoadBalancedCall"
: nullptr),
chand_(chand),
on_commit_(std::move(on_commit)),
call_context_(call_context) {
CreateCallAttemptTracer(call_context, is_transparent_retry);
call_context_(call_context),
arena_(arena) {
CreateCallAttemptTracer(arena, is_transparent_retry);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: created", chand_, this);
}
@ -3019,9 +3019,8 @@ ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
ClientChannelFilter* chand, const grpc_call_element_args& args,
grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
: LoadBalancedCall(chand, args.context, std::move(on_commit),
: LoadBalancedCall(chand, args.context, args.arena, std::move(on_commit),
is_transparent_retry),
arena_(args.arena),
owning_call_(args.call_stack),
call_combiner_(args.call_combiner),
pollent_(pollent),
@ -3464,7 +3463,7 @@ void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() {
SubchannelCall::Args call_args = {
connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0,
static_cast<Call*>(call_context()[GRPC_CONTEXT_CALL].value)->deadline(),
arena_,
arena(),
// TODO(roth): When we implement hedging support, we will probably
// need to use a separate call context for each subchannel call.
call_context(), call_combiner_};
@ -3494,7 +3493,8 @@ ClientChannelFilter::PromiseBasedLoadBalancedCall::PromiseBasedLoadBalancedCall(
ClientChannelFilter* chand, absl::AnyInvocable<void()> on_commit,
bool is_transparent_retry)
: LoadBalancedCall(chand, GetContext<grpc_call_context_element>(),
std::move(on_commit), is_transparent_retry) {}
GetContext<Arena>(), std::move(on_commit),
is_transparent_retry) {}
ArenaPromise<ServerMetadataHandle>
ClientChannelFilter::PromiseBasedLoadBalancedCall::MakeCallPromise(
@ -3610,10 +3610,6 @@ ClientChannelFilter::PromiseBasedLoadBalancedCall::MakeCallPromise(
});
}
Arena* ClientChannelFilter::PromiseBasedLoadBalancedCall::arena() const {
return GetContext<Arena>();
}
grpc_metadata_batch*
ClientChannelFilter::PromiseBasedLoadBalancedCall::send_initial_metadata()
const {

@ -372,7 +372,7 @@ class ClientChannelFilter::LoadBalancedCall
: public InternallyRefCounted<LoadBalancedCall, UnrefCallDtor> {
public:
LoadBalancedCall(ClientChannelFilter* chand,
grpc_call_context_element* call_context,
grpc_call_context_element* call_context, Arena* arena,
absl::AnyInvocable<void()> on_commit,
bool is_transparent_retry);
~LoadBalancedCall() override;
@ -391,8 +391,8 @@ class ClientChannelFilter::LoadBalancedCall
protected:
ClientChannelFilter* chand() const { return chand_; }
ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const {
return static_cast<ClientCallTracer::CallAttemptTracer*>(
call_context_[GRPC_CONTEXT_CALL_TRACER].value);
return DownCast<ClientCallTracer::CallAttemptTracer*>(
arena_->GetContext<CallTracerInterface>());
}
ConnectedSubchannel* connected_subchannel() const {
return connected_subchannel_.get();
@ -401,6 +401,7 @@ class ClientChannelFilter::LoadBalancedCall
lb_subchannel_call_tracker() const {
return lb_subchannel_call_tracker_.get();
}
Arena* arena() const { return arena_; }
void Commit() {
auto on_commit = std::move(on_commit_);
@ -433,7 +434,6 @@ class ClientChannelFilter::LoadBalancedCall
class Metadata;
class BackendMetricAccessor;
virtual Arena* arena() const = 0;
virtual grpc_polling_entity* pollent() = 0;
virtual grpc_metadata_batch* send_initial_metadata() const = 0;
@ -460,6 +460,7 @@ class ClientChannelFilter::LoadBalancedCall
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
lb_subchannel_call_tracker_;
grpc_call_context_element* const call_context_;
Arena* const arena_;
};
class ClientChannelFilter::FilterBasedLoadBalancedCall final
@ -495,7 +496,6 @@ class ClientChannelFilter::FilterBasedLoadBalancedCall final
using LoadBalancedCall::chand;
using LoadBalancedCall::Commit;
Arena* arena() const override { return arena_; }
grpc_polling_entity* pollent() override { return pollent_; }
grpc_metadata_batch* send_initial_metadata() const override {
return pending_batches_[0]
@ -550,7 +550,6 @@ class ClientChannelFilter::FilterBasedLoadBalancedCall final
// TODO(roth): Instead of duplicating these fields in every filter
// that uses any one of them, we should store them in the call
// context. This will save per-call memory overhead.
Arena* arena_;
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;
grpc_polling_entity* pollent_;
@ -598,7 +597,6 @@ class ClientChannelFilter::PromiseBasedLoadBalancedCall final
CallArgs call_args, OrphanablePtr<PromiseBasedLoadBalancedCall> lb_call);
private:
Arena* arena() const override;
grpc_polling_entity* pollent() override { return &pollent_; }
grpc_metadata_batch* send_initial_metadata() const override;

@ -121,9 +121,7 @@ class LbCallState : public ClientChannelLbCallState {
}
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override {
auto* legacy_context = GetContext<grpc_call_context_element>();
return static_cast<ClientCallTracer::CallAttemptTracer*>(
legacy_context[GRPC_CONTEXT_CALL_TRACER].value);
return GetContext<ClientCallTracer::CallAttemptTracer>();
}
};

@ -119,9 +119,7 @@ MessageHandle ChannelCompression::CompressMessage(
gpr_log(GPR_INFO, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d",
message->payload()->Length(), algorithm, message->flags());
}
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<CallTracerInterface*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
auto* call_tracer = MaybeGetContext<CallTracerInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordSendMessage(*message->payload());
}
@ -178,9 +176,7 @@ absl::StatusOr<MessageHandle> ChannelCompression::DecompressMessage(
message->payload()->Length(),
args.max_recv_message_length.value_or(-1), args.algorithm);
}
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<CallTracerInterface*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
auto* call_tracer = MaybeGetContext<CallTracerInterface>();
if (call_tracer != nullptr) {
call_tracer->RecordReceivedMessage(*message->payload());
}

@ -227,14 +227,13 @@ namespace {
using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
grpc_core::CallTracerInterface* CallTracerIfSampled(grpc_chttp2_stream* s) {
if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) {
grpc_core::CallTracerAnnotationInterface* CallTracerIfSampled(
grpc_chttp2_stream* s) {
if (!grpc_core::IsTraceRecordCallopsEnabled()) {
return nullptr;
}
auto* call_tracer = static_cast<grpc_core::CallTracerInterface*>(
static_cast<grpc_call_context_element*>(
s->context)[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value);
auto* call_tracer =
s->arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
if (call_tracer == nullptr || !call_tracer->IsSampled()) {
return nullptr;
}
@ -243,13 +242,11 @@ grpc_core::CallTracerInterface* CallTracerIfSampled(grpc_chttp2_stream* s) {
std::shared_ptr<grpc_core::TcpTracerInterface> TcpTracerIfSampled(
grpc_chttp2_stream* s) {
if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) {
if (!grpc_core::IsTraceRecordCallopsEnabled()) {
return nullptr;
}
auto* call_attempt_tracer = static_cast<grpc_core::CallTracerInterface*>(
static_cast<grpc_call_context_element*>(
s->context)[GRPC_CONTEXT_CALL_TRACER]
.value);
auto* call_attempt_tracer =
s->arena->GetContext<grpc_core::CallTracerInterface>();
if (call_attempt_tracer == nullptr || !call_attempt_tracer->IsSampled()) {
return nullptr;
}
@ -391,10 +388,10 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
grpc_error_handle error = GRPC_ERROR_CREATE("Transport destroyed");
// ContextList::Execute follows semantics of a callback function and does not
// take a ref on error
if (cl != nullptr) {
grpc_core::ForEachContextListEntryExecute(cl, nullptr, error);
if (context_list != nullptr) {
grpc_core::ForEachContextListEntryExecute(context_list, nullptr, error);
}
cl = nullptr;
context_list = nullptr;
grpc_slice_buffer_destroy(&read_buffer);
grpc_chttp2_goaway_parser_destroy(&goaway_parser);
@ -617,7 +614,7 @@ grpc_chttp2_transport::grpc_chttp2_transport(
&memory_owner),
deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0),
is_client(is_client) {
cl = new grpc_core::ContextList();
context_list = new grpc_core::ContextList();
CHECK(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
@ -784,7 +781,8 @@ void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
grpc_stream_refcount* refcount,
const void* server_data)
const void* server_data,
grpc_core::Arena* arena)
: t(t->Ref()),
refcount([refcount]() {
// We reserve one 'active stream' that's dropped when the stream is
@ -798,6 +796,7 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
#endif
return refcount;
}()),
arena(arena),
flow_control(&t->flow_control) {
t->streams_allocated.fetch_add(1, std::memory_order_relaxed);
if (server_data) {
@ -855,8 +854,8 @@ grpc_chttp2_stream::~grpc_chttp2_stream() {
void grpc_chttp2_transport::InitStream(grpc_stream* gs,
grpc_stream_refcount* refcount,
const void* server_data,
grpc_core::Arena*) {
new (gs) grpc_chttp2_stream(this, refcount, server_data);
grpc_core::Arena* arena) {
new (gs) grpc_chttp2_stream(this, refcount, server_data, arena);
}
static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
@ -1015,13 +1014,13 @@ static void write_action_begin_locked(
}
static void write_action(grpc_chttp2_transport* t) {
void* cl = t->cl;
if (!t->cl->empty()) {
void* cl = t->context_list;
if (!t->context_list->empty()) {
// Transfer the ownership of the context list to the endpoint and create and
// associate a new context list with the transport.
// The old context list is stored in the cl local variable which is passed
// to the endpoint. Its upto the endpoint to manage its lifetime.
t->cl = new grpc_core::ContextList();
t->context_list = new grpc_core::ContextList();
} else {
// t->cl is Empty. There is nothing to trace in this endpoint_write. set cl
// to nullptr.
@ -1376,7 +1375,7 @@ static void perform_stream_op_locked(void* stream_op,
}
if (op->send_initial_metadata) {
if (s->call_tracer) {
if (s->call_tracer != nullptr) {
s->call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
gpr_now(GPR_CLOCK_REALTIME))

@ -466,7 +466,7 @@ struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport,
grpc_chttp2_keepalive_state keepalive_state;
// Soft limit on max header size.
uint32_t max_header_list_size_soft_limit = 0;
grpc_core::ContextList* cl = nullptr;
grpc_core::ContextList* context_list = nullptr;
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
uint32_t num_messages_in_next_write = 0;
/// The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and
@ -545,12 +545,13 @@ typedef enum {
struct grpc_chttp2_stream {
grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
const void* server_data);
const void* server_data, grpc_core::Arena* arena);
~grpc_chttp2_stream();
void* context = nullptr;
const grpc_core::RefCountedPtr<grpc_chttp2_transport> t;
grpc_stream_refcount* refcount;
grpc_core::Arena* const arena;
grpc_closure destroy_stream;
grpc_closure* destroy_stream_arg;
@ -643,7 +644,7 @@ struct grpc_chttp2_stream {
int64_t write_counter = 0;
/// Only set when enabled.
grpc_core::CallTracerInterface* call_tracer = nullptr;
grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr;
/// Only set when enabled.
std::shared_ptr<grpc_core::TcpTracerInterface> tcp_tracer;

@ -942,13 +942,8 @@ grpc_error_handle grpc_chttp2_header_parser_parse(void* hpack_parser,
grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr;
if (s != nullptr) {
s->stats.incoming.header_bytes += GRPC_SLICE_LENGTH(slice);
if (s->context != nullptr) {
call_tracer = static_cast<grpc_core::CallTracerAnnotationInterface*>(
static_cast<grpc_call_context_element*>(
s->context)[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value);
}
call_tracer =
s->arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
}
grpc_error_handle error = parser->Parse(
slice, is_last != 0, absl::BitGenRef(t->bitgen), call_tracer);

@ -682,10 +682,10 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_core::GrpcHttp2GetCopyContextFn();
if (copy_context_fn != nullptr &&
grpc_core::GrpcHttp2GetWriteTimestampsCallback() != nullptr) {
t->cl->emplace_back(copy_context_fn(s->context),
outbuf_relative_start_pos, num_stream_bytes,
s->byte_counter, s->write_counter - 1,
s->tcp_tracer);
t->context_list->emplace_back(copy_context_fn(s->context),
outbuf_relative_start_pos,
num_stream_bytes, s->byte_counter,
s->write_counter - 1, s->tcp_tracer);
}
}
outbuf_relative_start_pos += num_stream_bytes;

@ -35,16 +35,6 @@ typedef enum {
/// Value is a \a census_context.
GRPC_CONTEXT_TRACING,
/// Value is a CallTracerAnnotationInterface. (ClientCallTracer object on the
/// client-side call, or ServerCallTracer on the server-side.)
GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE,
/// Value is a CallTracerInterface (ServerCallTracer on the server-side,
/// CallAttemptTracer on a subchannel call.)
/// TODO(yashykt): Maybe come up with a better name. This will go away in the
/// future anyway, so not super important.
GRPC_CONTEXT_CALL_TRACER,
/// Holds a pointer to ServiceConfigCallData associated with this call.
GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA,
@ -62,8 +52,6 @@ struct grpc_call_context_element {
namespace grpc_core {
class Call;
class CallTracerAnnotationInterface;
class CallTracerInterface;
class ServiceConfigCallData;
// Bind the legacy context array into the new style structure
@ -82,17 +70,6 @@ struct OldStyleContext<Call> {
static constexpr grpc_context_index kIndex = GRPC_CONTEXT_CALL;
};
template <>
struct OldStyleContext<CallTracerAnnotationInterface> {
static constexpr grpc_context_index kIndex =
GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE;
};
template <>
struct OldStyleContext<CallTracerInterface> {
static constexpr grpc_context_index kIndex = GRPC_CONTEXT_CALL_TRACER;
};
template <>
struct OldStyleContext<ServiceConfigCallData> {
static constexpr grpc_context_index kIndex =

@ -17,7 +17,9 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/promise_like.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc_core {
@ -31,6 +33,7 @@ class Handler {
Handler& operator=(const Handler&) = delete;
~Handler() {
if (!done_) {
promise_detail::Context<Arena> ctx(arena_.get());
fn_();
}
}
@ -48,6 +51,13 @@ class Handler {
private:
Fn fn_;
// Since cancellation happens at destruction time we need to either capture
// context here (via the arena), or make sure that no promise is destructed
// without an Arena context on the stack. The latter is an eternal game of
// whackamole, so we're choosing the former for now.
// TODO(ctiller): re-evaluate at some point in the future.
RefCountedPtr<Arena> arena_ =
HasContext<Arena>() ? GetContext<Arena>()->Ref() : nullptr;
bool done_ = false;
};

@ -755,7 +755,7 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
GrpcRegisteredMethod(), reinterpret_cast<void*>(static_cast<uintptr_t>(
args->registered_method)));
channel_stack->stats_plugin_group->AddClientCallTracers(
Slice(CSliceRef(path)), args->registered_method, call->context_);
Slice(CSliceRef(path)), args->registered_method, call->GetArena());
} else {
global_stats().IncrementServerCallsCreated();
call->final_op_.server.cancelled = nullptr;
@ -778,12 +778,11 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
// GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future
// promise-based world, we would just a single tracer object for each
// stack (call, subchannel_call, server_call.)
call->ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE,
server_call_tracer, nullptr);
call->ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr);
arena->SetContext<CallTracerAnnotationInterface>(server_call_tracer);
arena->SetContext<CallTracerInterface>(server_call_tracer);
}
}
channel_stack->stats_plugin_group->AddServerCallTracers(call->context_);
channel_stack->stats_plugin_group->AddServerCallTracers(arena.get());
}
Call* parent = Call::FromC(args->parent);
@ -1230,8 +1229,7 @@ FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl(
*pslot = bctl;
}
bctl->call_ = this;
bctl->call_tracer_ = static_cast<CallTracerAnnotationInterface*>(
ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE));
bctl->call_tracer_ = arena()->GetContext<CallTracerAnnotationInterface>();
bctl->op_.payload = &stream_op_payload_;
return bctl;
}
@ -2612,7 +2610,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
}
ScopedContext context(this);
args->channel->channel_stack()->stats_plugin_group->AddClientCallTracers(
*args->path, args->registered_method, this->context());
*args->path, args->registered_method, GetArena());
send_initial_metadata_ = Arena::MakePooled<ClientMetadata>();
send_initial_metadata_->Set(HttpPathMetadata(), std::move(*args->path));
if (args->authority.has_value()) {
@ -3762,6 +3760,19 @@ void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
return grpc_core::Call::FromC(call)->ContextGet(elem);
}
void grpc_call_tracer_set(grpc_call* call,
grpc_core::ClientCallTracer* tracer) {
grpc_core::Arena* arena = grpc_call_get_arena(call);
return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
}
void* grpc_call_tracer_get(grpc_call* call) {
grpc_core::Arena* arena = grpc_call_get_arena(call);
auto* call_tracer =
arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
return call_tracer;
}
uint8_t grpc_call_is_client(grpc_call* call) {
return grpc_core::Call::FromC(call)->is_client();
}

@ -338,6 +338,10 @@ void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
// Get a context pointer.
void* grpc_call_context_get(grpc_call* call, grpc_context_index elem);
void grpc_call_tracer_set(grpc_call* call, grpc_core::ClientCallTracer* tracer);
void* grpc_call_tracer_get(grpc_call* call);
#define GRPC_CALL_LOG_BATCH(sev, ops, nops) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace)) { \

@ -56,25 +56,25 @@ class ServerCallTracerFilter
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& client_initial_metadata) {
auto* call_tracer = CallTracer();
auto* call_tracer = MaybeGetContext<ServerCallTracer>();
if (call_tracer == nullptr) return;
call_tracer->RecordReceivedInitialMetadata(&client_initial_metadata);
}
void OnServerInitialMetadata(ServerMetadata& server_initial_metadata) {
auto* call_tracer = CallTracer();
auto* call_tracer = MaybeGetContext<ServerCallTracer>();
if (call_tracer == nullptr) return;
call_tracer->RecordSendInitialMetadata(&server_initial_metadata);
}
void OnFinalize(const grpc_call_final_info* final_info) {
auto* call_tracer = CallTracer();
auto* call_tracer = MaybeGetContext<ServerCallTracer>();
if (call_tracer == nullptr) return;
call_tracer->RecordEnd(final_info);
}
void OnServerTrailingMetadata(ServerMetadata& server_trailing_metadata) {
auto* call_tracer = CallTracer();
auto* call_tracer = MaybeGetContext<ServerCallTracer>();
if (call_tracer == nullptr) return;
call_tracer->RecordSendTrailingMetadata(&server_trailing_metadata);
}
@ -82,13 +82,6 @@ class ServerCallTracerFilter
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerToClientMessage;
private:
static ServerCallTracer* CallTracer() {
auto* call_context = GetContext<grpc_call_context_element>();
return static_cast<ServerCallTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
}
};
};

@ -299,63 +299,53 @@ class DelegatingServerCallTracer : public ServerCallTracer {
std::vector<ServerCallTracer*> tracers_;
};
void AddClientCallTracerToContext(grpc_call_context_element* call_context,
ClientCallTracer* tracer) {
if (call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value ==
nullptr) {
void AddClientCallTracerToContext(Arena* arena, ClientCallTracer* tracer) {
if (arena->GetContext<CallTracerAnnotationInterface>() == nullptr) {
// This is the first call tracer. Set it directly.
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer;
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy =
nullptr;
arena->SetContext<CallTracerAnnotationInterface>(tracer);
} else {
// There was already a call tracer present.
auto* orig_tracer = static_cast<ClientCallTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
auto* orig_tracer = DownCast<ClientCallTracer*>(
arena->GetContext<CallTracerAnnotationInterface>());
if (orig_tracer->IsDelegatingTracer()) {
// We already created a delegating tracer. Just add the new tracer to the
// list.
static_cast<DelegatingClientCallTracer*>(orig_tracer)->AddTracer(tracer);
DownCast<DelegatingClientCallTracer*>(orig_tracer)->AddTracer(tracer);
} else {
// Create a new delegating tracer and add the first tracer and the new
// tracer to the list.
auto* delegating_tracer =
GetContext<Arena>()->ManagedNew<DelegatingClientCallTracer>(
orig_tracer);
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value =
delegating_tracer;
arena->SetContext<CallTracerAnnotationInterface>(delegating_tracer);
delegating_tracer->AddTracer(tracer);
}
}
}
void AddServerCallTracerToContext(grpc_call_context_element* call_context,
ServerCallTracer* tracer) {
DCHECK(call_context[GRPC_CONTEXT_CALL_TRACER].value ==
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
if (call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value ==
nullptr) {
void AddServerCallTracerToContext(Arena* arena, ServerCallTracer* tracer) {
DCHECK_EQ(arena->GetContext<CallTracerInterface>(),
arena->GetContext<CallTracerAnnotationInterface>());
if (arena->GetContext<CallTracerAnnotationInterface>() == nullptr) {
// This is the first call tracer. Set it directly.
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer;
call_context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy =
nullptr;
arena->SetContext<CallTracerAnnotationInterface>(tracer);
arena->SetContext<CallTracerInterface>(tracer);
} else {
// There was already a call tracer present.
auto* orig_tracer = static_cast<ServerCallTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
auto* orig_tracer = DownCast<ServerCallTracer*>(
arena->GetContext<CallTracerAnnotationInterface>());
if (orig_tracer->IsDelegatingTracer()) {
// We already created a delegating tracer. Just add the new tracer to the
// list.
static_cast<DelegatingServerCallTracer*>(orig_tracer)->AddTracer(tracer);
DownCast<DelegatingServerCallTracer*>(orig_tracer)->AddTracer(tracer);
} else {
// Create a new delegating tracer and add the first tracer and the new
// tracer to the list.
auto* delegating_tracer =
GetContext<Arena>()->ManagedNew<DelegatingServerCallTracer>(
orig_tracer);
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value =
delegating_tracer;
call_context[GRPC_CONTEXT_CALL_TRACER].value = delegating_tracer;
arena->SetContext<CallTracerAnnotationInterface>(delegating_tracer);
arena->SetContext<CallTracerInterface>(delegating_tracer);
delegating_tracer->AddTracer(tracer);
}
}

@ -214,13 +214,21 @@ class ServerCallTracerFactory {
// Convenience functions to add call tracers to a call context. Allows setting
// multiple call tracers to a single call. It is only valid to add client call
// tracers before the client_channel filter sees the send_initial_metadata op.
void AddClientCallTracerToContext(grpc_call_context_element* call_context,
ClientCallTracer* tracer);
void AddClientCallTracerToContext(Arena* arena, ClientCallTracer* tracer);
// TODO(yashykt): We want server call tracers to be registered through the
// ServerCallTracerFactory, which has yet to be made into a list.
void AddServerCallTracerToContext(grpc_call_context_element* call_context,
ServerCallTracer* tracer);
void AddServerCallTracerToContext(Arena* arena, ServerCallTracer* tracer);
template <>
struct ArenaContextType<CallTracerInterface> {
static void Destroy(CallTracerAnnotationInterface*) {}
};
template <>
struct ArenaContextType<CallTracerAnnotationInterface> {
static void Destroy(CallTracerAnnotationInterface*) {}
};
template <>
struct ContextSubclass<ClientCallTracer::CallAttemptTracer> {

@ -100,23 +100,22 @@ RegisteredMetricCallback::~RegisteredMetricCallback() {
}
void GlobalStatsPluginRegistry::StatsPluginGroup::AddClientCallTracers(
const Slice& path, bool registered_method,
grpc_call_context_element* call_context) {
const Slice& path, bool registered_method, Arena* arena) {
for (auto& state : plugins_state_) {
auto* call_tracer = state.plugin->GetClientCallTracer(
path, registered_method, state.scope_config);
if (call_tracer != nullptr) {
AddClientCallTracerToContext(call_context, call_tracer);
AddClientCallTracerToContext(arena, call_tracer);
}
}
}
void GlobalStatsPluginRegistry::StatsPluginGroup::AddServerCallTracers(
grpc_call_context_element* call_context) {
Arena* arena) {
for (auto& state : plugins_state_) {
auto* call_tracer = state.plugin->GetServerCallTracer(state.scope_config);
if (call_tracer != nullptr) {
AddServerCallTracerToContext(call_context, call_tracer);
AddServerCallTracerToContext(arena, call_tracer);
}
}
}

@ -464,10 +464,10 @@ class GlobalStatsPluginRegistry {
// Adds all available client call tracers associated with the stats plugins
// within the group to \a call_context.
void AddClientCallTracers(const Slice& path, bool registered_method,
grpc_call_context_element* call_context);
Arena* arena);
// Adds all available server call tracers associated with the stats plugins
// within the group to \a call_context.
void AddServerCallTracers(grpc_call_context_element* call_context);
void AddServerCallTracers(Arena* arena);
private:
friend class RegisteredMetricCallback;

@ -100,17 +100,15 @@ OpenCensusClientFilter::MakeCallPromise(
grpc_core::NextPromiseFactory next_promise_factory) {
auto* path = call_args.client_initial_metadata->get_pointer(
grpc_core::HttpPathMetadata());
auto* call_context = grpc_core::GetContext<grpc_call_context_element>();
auto* tracer =
grpc_core::GetContext<grpc_core::Arena>()
->ManagedNew<OpenCensusCallTracer>(
call_context, path != nullptr ? path->Ref() : grpc_core::Slice(),
grpc_core::GetContext<grpc_core::Arena>(),
OpenCensusTracingEnabled() && tracing_enabled_);
DCHECK(call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value ==
nullptr);
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer;
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy = nullptr;
auto* arena = grpc_core::GetContext<grpc_core::Arena>();
auto* tracer = arena->ManagedNew<OpenCensusCallTracer>(
grpc_core::GetContext<grpc_call_context_element>(),
path != nullptr ? path->Ref() : grpc_core::Slice(),
grpc_core::GetContext<grpc_core::Arena>(),
OpenCensusTracingEnabled() && tracing_enabled_);
DCHECK_EQ(arena->GetContext<grpc_core::CallTracerAnnotationInterface>(),
nullptr);
grpc_core::SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
return next_promise_factory(std::move(call_args));
}
@ -424,9 +422,9 @@ class OpenCensusClientInterceptor : public grpc::experimental::Interceptor {
grpc::experimental::InterceptorBatchMethods* methods) override {
if (methods->QueryInterceptionHookPoint(
grpc::experimental::InterceptionHookPoints::POST_RECV_STATUS)) {
auto* tracer = static_cast<OpenCensusCallTracer*>(
grpc_call_context_get(info_->client_context()->c_call(),
GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE));
auto* tracer = grpc_core::DownCast<OpenCensusCallTracer*>(
grpc_call_get_arena(info_->client_context()->c_call())
->GetContext<grpc_core::CallTracerAnnotationInterface>());
if (tracer != nullptr) {
tracer->RecordApiLatency(absl::Now() - start_time_,
static_cast<absl::StatusCode>(

@ -63,6 +63,9 @@ cdef extern from "src/core/telemetry/call_tracer.h" namespace "grpc_core":
cdef cppclass ClientCallTracer:
pass
cdef cppclass CallTracerAnnotationInterface:
pass
cdef cppclass ServerCallTracer:
string TraceId() nogil
string SpanId() nogil
@ -72,14 +75,10 @@ cdef extern from "src/core/telemetry/call_tracer.h" namespace "grpc_core":
@staticmethod
void RegisterGlobal(ServerCallTracerFactory* factory) nogil
cdef extern from "src/core/lib/channel/context.h":
ctypedef enum grpc_context_index:
GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE
cdef extern from "src/core/lib/surface/call.h":
void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
void* value, void (*destroy)(void* value)) nogil
void *grpc_call_context_get(grpc_call* call, grpc_context_index elem) nogil
void grpc_call_tracer_set(grpc_call* call, void* value) nogil
void* grpc_call_tracer_get(grpc_call* call) nogil
cdef extern from "grpc/support/alloc.h":

@ -50,11 +50,11 @@ def maybe_save_server_trace_context(RequestCallEvent event) -> None:
cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr):
cdef ClientCallTracer* call_tracer = <ClientCallTracer*>capsule_ptr
grpc_call_context_set(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, call_tracer, NULL)
grpc_call_tracer_set(call, call_tracer)
cdef void* _get_call_tracer(grpc_call* call):
cdef void* call_tracer = grpc_call_context_get(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE)
cdef void* call_tracer = grpc_call_tracer_get(call)
return call_tracer

@ -38,16 +38,13 @@ namespace {
class CallTracerTest : public ::testing::Test {
protected:
RefCountedPtr<Arena> arena_ = SimpleArenaAllocator()->MakeArena();
grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
std::vector<std::string> annotation_logger_;
};
TEST_F(CallTracerTest, BasicClientCallTracer) {
FakeClientCallTracer client_call_tracer(&annotation_logger_);
AddClientCallTracerToContext(context_, &client_call_tracer);
static_cast<CallTracerAnnotationInterface*>(
context_[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value)
->RecordAnnotation("Test");
AddClientCallTracerToContext(arena_.get(), &client_call_tracer);
arena_->GetContext<CallTracerAnnotationInterface>()->RecordAnnotation("Test");
EXPECT_EQ(annotation_logger_, std::vector<std::string>{"Test"});
}
@ -56,12 +53,10 @@ TEST_F(CallTracerTest, MultipleClientCallTracers) {
FakeClientCallTracer client_call_tracer1(&annotation_logger_);
FakeClientCallTracer client_call_tracer2(&annotation_logger_);
FakeClientCallTracer client_call_tracer3(&annotation_logger_);
AddClientCallTracerToContext(context_, &client_call_tracer1);
AddClientCallTracerToContext(context_, &client_call_tracer2);
AddClientCallTracerToContext(context_, &client_call_tracer3);
static_cast<CallTracerAnnotationInterface*>(
context_[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value)
->RecordAnnotation("Test");
AddClientCallTracerToContext(arena_.get(), &client_call_tracer1);
AddClientCallTracerToContext(arena_.get(), &client_call_tracer2);
AddClientCallTracerToContext(arena_.get(), &client_call_tracer3);
arena_->GetContext<CallTracerAnnotationInterface>()->RecordAnnotation("Test");
EXPECT_EQ(annotation_logger_,
std::vector<std::string>({"Test", "Test", "Test"}));
}
@ -71,12 +66,12 @@ TEST_F(CallTracerTest, MultipleClientCallAttemptTracers) {
FakeClientCallTracer client_call_tracer1(&annotation_logger_);
FakeClientCallTracer client_call_tracer2(&annotation_logger_);
FakeClientCallTracer client_call_tracer3(&annotation_logger_);
AddClientCallTracerToContext(context_, &client_call_tracer1);
AddClientCallTracerToContext(context_, &client_call_tracer2);
AddClientCallTracerToContext(context_, &client_call_tracer3);
AddClientCallTracerToContext(arena_.get(), &client_call_tracer1);
AddClientCallTracerToContext(arena_.get(), &client_call_tracer2);
AddClientCallTracerToContext(arena_.get(), &client_call_tracer3);
auto* attempt_tracer =
static_cast<ClientCallTracer*>(
context_[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value)
DownCast<ClientCallTracer*>(
arena_->GetContext<CallTracerAnnotationInterface>())
->StartNewAttempt(true /* is_transparent_retry */);
attempt_tracer->RecordAnnotation("Test");
EXPECT_EQ(annotation_logger_,
@ -86,13 +81,9 @@ TEST_F(CallTracerTest, MultipleClientCallAttemptTracers) {
TEST_F(CallTracerTest, BasicServerCallTracerTest) {
FakeServerCallTracer server_call_tracer(&annotation_logger_);
AddServerCallTracerToContext(context_, &server_call_tracer);
static_cast<CallTracerAnnotationInterface*>(
context_[GRPC_CONTEXT_CALL_TRACER].value)
->RecordAnnotation("Test");
static_cast<CallTracerAnnotationInterface*>(
context_[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value)
->RecordAnnotation("Test");
AddServerCallTracerToContext(arena_.get(), &server_call_tracer);
arena_->GetContext<CallTracerAnnotationInterface>()->RecordAnnotation("Test");
arena_->GetContext<CallTracerAnnotationInterface>()->RecordAnnotation("Test");
EXPECT_EQ(annotation_logger_, std::vector<std::string>({"Test", "Test"}));
}
@ -101,12 +92,10 @@ TEST_F(CallTracerTest, MultipleServerCallTracers) {
FakeServerCallTracer server_call_tracer1(&annotation_logger_);
FakeServerCallTracer server_call_tracer2(&annotation_logger_);
FakeServerCallTracer server_call_tracer3(&annotation_logger_);
AddServerCallTracerToContext(context_, &server_call_tracer1);
AddServerCallTracerToContext(context_, &server_call_tracer2);
AddServerCallTracerToContext(context_, &server_call_tracer3);
static_cast<CallTracerAnnotationInterface*>(
context_[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value)
->RecordAnnotation("Test");
AddServerCallTracerToContext(arena_.get(), &server_call_tracer1);
AddServerCallTracerToContext(arena_.get(), &server_call_tracer2);
AddServerCallTracerToContext(arena_.get(), &server_call_tracer3);
arena_->GetContext<CallTracerAnnotationInterface>()->RecordAnnotation("Test");
EXPECT_EQ(annotation_logger_,
std::vector<std::string>({"Test", "Test", "Test"}));
}

@ -57,11 +57,7 @@ ArenaPromise<ServerMetadataHandle> FakeStatsClientFilter::MakeCallPromise(
FakeClientCallTracer* client_call_tracer =
fake_client_call_tracer_factory_->CreateFakeClientCallTracer();
if (client_call_tracer != nullptr) {
auto* call_context = GetContext<grpc_call_context_element>();
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value =
client_call_tracer;
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy =
nullptr;
SetContext<CallTracerAnnotationInterface>(client_call_tracer);
}
return next_promise_factory(std::move(call_args));
}

@ -68,9 +68,7 @@ class AddLabelsFilter : public grpc_core::ChannelFilter {
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) override {
using CallAttemptTracer = grpc_core::ClientCallTracer::CallAttemptTracer;
auto* call_context = grpc_core::GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<CallAttemptTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
auto* call_tracer = grpc_core::GetContext<CallAttemptTracer>();
EXPECT_NE(call_tracer, nullptr);
for (const auto& pair : labels_to_inject_) {
call_tracer->SetOptionalLabel(pair.first, pair.second);

Loading…
Cancel
Save