From 8cc1534c895c96c4c3dffbc2afad1c1c63a473e9 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 16 Jul 2021 16:58:06 -0700 Subject: [PATCH] support call tracer in client channel code --- .../filters/client_channel/client_channel.cc | 125 ++++++++++++++---- .../filters/client_channel/client_channel.h | 30 ++++- .../filters/client_channel/retry_filter.cc | 4 +- src/core/lib/channel/call_tracer.h | 6 +- src/core/lib/channel/context.h | 3 + .../filters/census/open_census_call_tracer.h | 6 +- 6 files changed, 134 insertions(+), 40 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index d073978cdb9..58dc5342cea 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -352,7 +352,8 @@ class DynamicTerminationFilter::CallData { calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); calld->lb_call_ = client_channel->CreateLoadBalancedCall( args, pollent, nullptr, - service_config_call_data->call_dispatch_controller()); + service_config_call_data->call_dispatch_controller(), + /*is_transparent_retry=*/false); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand, @@ -1174,10 +1175,11 @@ RefCountedPtr ClientChannel::CreateLoadBalancedCall( const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller) { - return args.arena->New(this, args, pollent, - on_call_destruction_complete, - call_dispatch_controller); + ConfigSelector::CallDispatchController* call_dispatch_controller, + bool is_transparent_retry) { + return args.arena->New( + this, args, pollent, on_call_destruction_complete, + call_dispatch_controller, is_transparent_retry); } namespace { @@ -2553,10 +2555,23 @@ class ClientChannel::LoadBalancedCall::LbCallState // LoadBalancedCall // +namespace { + +CallTracer::CallAttemptTracer* GetCallAttemptTracer( + grpc_call_context_element* context, bool is_transparent_retry) { + auto* call_tracer = + static_cast(context[GRPC_CONTEXT_CALL_TRACER].value); + if (call_tracer == nullptr) return nullptr; + return call_tracer->RecordNewAttempt(is_transparent_retry); +} + +} // namespace + ClientChannel::LoadBalancedCall::LoadBalancedCall( ClientChannel* chand, const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller) + ConfigSelector::CallDispatchController* call_dispatch_controller, + bool is_transparent_retry) : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace) ? "LoadBalancedCall" : nullptr), @@ -2570,7 +2585,9 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall( call_context_(args.context), pollent_(pollent), on_call_destruction_complete_(on_call_destruction_complete), - call_dispatch_controller_(call_dispatch_controller) {} + call_dispatch_controller_(call_dispatch_controller), + call_attempt_tracer_( + GetCallAttemptTracer(args.context, is_transparent_retry)) {} ClientChannel::LoadBalancedCall::~LoadBalancedCall() { grpc_slice_unref_internal(path_); @@ -2705,9 +2722,54 @@ void ClientChannel::LoadBalancedCall::PendingBatchesResume() { void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch( grpc_transport_stream_op_batch* batch) { - // Intercept recv_trailing_metadata_ready for LB callback. + // Record send ops in tracer. + if (call_attempt_tracer_ != nullptr) { + if (batch->send_initial_metadata) { + call_attempt_tracer_->RecordSendInitialMetadata( + batch->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata_flags); + } + if (batch->send_message) { + call_attempt_tracer_->RecordSendMessage( + batch->payload->send_message.send_message.get()); + } + if (batch->send_trailing_metadata) { + call_attempt_tracer_->RecordSendTrailingMetadata( + batch->payload->send_trailing_metadata.send_trailing_metadata); + } + // Intercept recv ops. + if (batch->recv_initial_metadata) { + recv_initial_metadata_ = + batch->payload->recv_initial_metadata.recv_initial_metadata; + recv_initial_metadata_flags_ = + batch->payload->recv_initial_metadata.recv_flags; + peer_string_ = batch->payload->recv_initial_metadata.peer_string; + original_recv_initial_metadata_ready_ = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady, + this, nullptr); + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &recv_initial_metadata_ready_; + } + if (batch->recv_message) { + recv_message_ = batch->payload->recv_message.recv_message; + original_recv_message_ready_ = + batch->payload->recv_message.recv_message_ready; + GRPC_CLOSURE_INIT(&recv_message_ready_, RecvMessageReady, this, nullptr); + batch->payload->recv_message.recv_message_ready = &recv_message_ready_; + } + } + // Intercept recv_trailing_metadata even if there is no call tracer, + // since we may need to notify the LB policy about trailing metadata. if (batch->recv_trailing_metadata) { - InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch); + recv_trailing_metadata_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + original_recv_trailing_metadata_ready_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, + this, nullptr); + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &recv_trailing_metadata_ready_; } // If we've previously been cancelled, immediately fail any new batches. if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) { @@ -2784,10 +2846,35 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch( } } -void ClientChannel::LoadBalancedCall:: - RecvTrailingMetadataReadyForLoadBalancingPolicy(void* arg, - grpc_error_handle error) { +void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady( + void* arg, grpc_error_handle error) { + auto* self = static_cast(arg); + self->call_attempt_tracer_->RecordReceivedInitialMetadata( + self->recv_initial_metadata_, *self->recv_initial_metadata_flags_, + self->peer_string_); + Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_, + GRPC_ERROR_REF(error)); +} + +void ClientChannel::LoadBalancedCall::RecvMessageReady( + void* arg, grpc_error_handle error) { + auto* self = static_cast(arg); + self->call_attempt_tracer_->RecordReceivedMessage( + self->recv_message_->get()); + Closure::Run(DEBUG_LOCATION, self->original_recv_message_ready_, + GRPC_ERROR_REF(error)); +} + +void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady( + void* arg, grpc_error_handle error) { auto* self = static_cast(arg); + // Notify the call tracer about trailing metadata. + if (self->call_attempt_tracer_ != nullptr) { + self->call_attempt_tracer_->RecordReceivedTrailingMetadata( + self->recv_trailing_metadata_); + } + // If the LB policy requested a callback for trailing metadata, invoke + // the callback. if (self->lb_recv_trailing_metadata_ready_ != nullptr) { // Set error if call did not succeed. grpc_error_handle error_for_lb = GRPC_ERROR_NONE; @@ -2828,20 +2915,6 @@ void ClientChannel::LoadBalancedCall:: error); } -void ClientChannel::LoadBalancedCall:: - InjectRecvTrailingMetadataReadyForLoadBalancingPolicy( - grpc_transport_stream_op_batch* batch) { - recv_trailing_metadata_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata; - original_recv_trailing_metadata_ready_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, - RecvTrailingMetadataReadyForLoadBalancingPolicy, this, - grpc_schedule_on_exec_ctx); - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &recv_trailing_metadata_ready_; -} - void ClientChannel::LoadBalancedCall::CreateSubchannelCall() { SubchannelCall::Args call_args = { std::move(connected_subchannel_), pollent_, path_, call_start_time_, diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 2096a203e7f..c60c88013b5 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -39,6 +39,7 @@ #include "src/core/ext/filters/client_channel/service_config.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" +#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/polling_entity.h" @@ -135,7 +136,8 @@ class ClientChannel { RefCountedPtr CreateLoadBalancedCall( const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller); + ConfigSelector::CallDispatchController* call_dispatch_controller, + bool is_transparent_retry); private: class CallData; @@ -386,7 +388,8 @@ class ClientChannel::LoadBalancedCall LoadBalancedCall( ClientChannel* chand, const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller); + ConfigSelector::CallDispatchController* call_dispatch_controller, + bool is_transparent_retry); ~LoadBalancedCall() override; void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); @@ -440,10 +443,9 @@ class ClientChannel::LoadBalancedCall // Resumes all pending batches on subchannel_call_. void PendingBatchesResume(); - static void RecvTrailingMetadataReadyForLoadBalancingPolicy( - void* arg, grpc_error_handle error); - void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy( - grpc_transport_stream_op_batch* batch); + static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); + static void RecvMessageReady(void* arg, grpc_error_handle error); + static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); void CreateSubchannelCall(); // Invoked when a pick is completed, on both success or failure. @@ -471,6 +473,8 @@ class ClientChannel::LoadBalancedCall grpc_closure* on_call_destruction_complete_; ConfigSelector::CallDispatchController* call_dispatch_controller_; + CallTracer::CallAttemptTracer* call_attempt_tracer_; + // Set when we get a cancel_stream op. grpc_error_handle cancel_error_ = GRPC_ERROR_NONE; @@ -495,7 +499,19 @@ class ClientChannel::LoadBalancedCall RefCountedPtr subchannel_call_; - // For intercepting recv_trailing_metadata_ready for the LB policy. + // For intercepting recv_initial_metadata_ready. + grpc_metadata_batch* recv_initial_metadata_ = nullptr; + uint32_t* recv_initial_metadata_flags_ = nullptr; + gpr_atm* peer_string_ = nullptr; + grpc_closure recv_initial_metadata_ready_; + grpc_closure* original_recv_initial_metadata_ready_ = nullptr; + + // For intercepting recv_message_ready. + OrphanablePtr* recv_message_ = nullptr; + grpc_closure recv_message_ready_; + grpc_closure* original_recv_message_ready_ = nullptr; + + // For intercepting recv_trailing_metadata_ready. grpc_metadata_batch* recv_trailing_metadata_ = nullptr; grpc_closure recv_trailing_metadata_ready_; grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index 6e6d3bb370d..17a7b4a8afa 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -2179,7 +2179,9 @@ RetryFilter::CallData::CreateLoadBalancedCall( // This callback holds a ref to the CallStackDestructionBarrier // object until the LB call is destroyed. call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this), - call_dispatch_controller); + call_dispatch_controller, + // TODO(roth): Change this when we support transparent retries. + /*is_transparent_retry=*/false); } void RetryFilter::CallData::CreateCallAttempt() { diff --git a/src/core/lib/channel/call_tracer.h b/src/core/lib/channel/call_tracer.h index 2712a9d4768..0bd2638c47f 100644 --- a/src/core/lib/channel/call_tracer.h +++ b/src/core/lib/channel/call_tracer.h @@ -46,11 +46,11 @@ class CallTracer { virtual void RecordOnDoneSendInitialMetadata(gpr_atm* peer_string) = 0; virtual void RecordSendTrailingMetadata( grpc_metadata_batch* send_trailing_metadata) = 0; - virtual void RecordSendMessage(const ByteStream& send_message) = 0; + virtual void RecordSendMessage(const ByteStream* send_message) = 0; virtual void RecordReceivedInitialMetadata( - grpc_metadata_batch* recv_initial_metadata, uint32_t* flags, + grpc_metadata_batch* recv_initial_metadata, uint32_t flags, gpr_atm* peer_string) = 0; - virtual void RecordReceivedMessage(const ByteStream& recv_message) = 0; + virtual void RecordReceivedMessage(const ByteStream* recv_message) = 0; virtual void RecordReceivedTrailingMetadata( grpc_metadata_batch* recv_trailing_metadata) = 0; virtual void RecordCancel(grpc_error_handle cancel_error) = 0; diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index bd7fd495e4a..0c0556abcdc 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -32,6 +32,9 @@ typedef enum { /// Value is a \a census_context. GRPC_CONTEXT_TRACING, + /// Value is a CallTracer object. + GRPC_CONTEXT_CALL_TRACER, + /// Reserved for traffic_class_context. GRPC_CONTEXT_TRAFFIC, diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h index c5cfa940438..f80dc31b6ac 100644 --- a/src/cpp/ext/filters/census/open_census_call_tracer.h +++ b/src/cpp/ext/filters/census/open_census_call_tracer.h @@ -39,12 +39,12 @@ class OpenCensusCallTracer : public grpc_core::CallTracer { void RecordSendTrailingMetadata( grpc_metadata_batch* /* send_trailing_metadata */) override {} void RecordSendMessage( - const grpc_core::ByteStream& /* send_message */) override {} + const grpc_core::ByteStream* /* send_message */) override {} void RecordReceivedInitialMetadata( - grpc_metadata_batch* /* recv_initial_metadata */, uint32_t* /* flags */, + grpc_metadata_batch* /* recv_initial_metadata */, uint32_t /* flags */, gpr_atm* /* peer_string */) override {} void RecordReceivedMessage( - const grpc_core::ByteStream& /* recv_message */) override {} + const grpc_core::ByteStream* /* recv_message */) override {} void RecordReceivedTrailingMetadata( grpc_metadata_batch* /* recv_trailing_metadata */) override {} void RecordCancel(grpc_error_handle /* cancel_error */) override {}