From 98f57af91cb739a88dd5d4a5d36d7ca8f54b05b2 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 26 Jul 2021 06:49:28 -0700 Subject: [PATCH] Revert "use CallTracer API in client channel code (#26714)" (#26772) This reverts commit 011fdefe3615880a48775ea4c5d541ae92dec3d2. --- .../filters/client_channel/client_channel.cc | 238 +++++------------- .../filters/client_channel/client_channel.h | 63 ++--- .../filters/client_channel/retry_filter.cc | 8 +- src/core/lib/channel/call_tracer.h | 21 +- src/core/lib/channel/channel_stack.h | 2 +- src/core/lib/channel/context.h | 3 - .../filters/census/open_census_call_tracer.h | 13 +- 7 files changed, 108 insertions(+), 240 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 27982e46cdb..acc2d31b0ac 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -199,7 +199,7 @@ class ClientChannel::CallData { grpc_polling_entity* pollent_ = nullptr; - grpc_closure resolution_done_closure_; + grpc_closure pick_closure_; // Accessed while holding ClientChannel::resolution_mu_. bool service_config_applied_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = @@ -319,11 +319,13 @@ class DynamicTerminationFilter::CallData { const grpc_call_final_info* /*final_info*/, grpc_closure* then_schedule_closure) { auto* calld = static_cast(elem->call_data); - RefCountedPtr lb_call = - std::move(calld->lb_call_); + RefCountedPtr subchannel_call; + if (GPR_LIKELY(calld->lb_call_ != nullptr)) { + subchannel_call = calld->lb_call_->subchannel_call(); + } calld->~CallData(); - if (GPR_LIKELY(lb_call != nullptr)) { - lb_call->set_on_call_destruction_complete(then_schedule_closure); + if (GPR_LIKELY(subchannel_call != nullptr)) { + subchannel_call->SetAfterCallStackDestroy(then_schedule_closure); } else { // TODO(yashkt) : This can potentially be a Closure::Run ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE); @@ -341,16 +343,16 @@ class DynamicTerminationFilter::CallData { auto* calld = static_cast(elem->call_data); auto* chand = static_cast(elem->channel_data); ClientChannel* client_channel = chand->chand_; - grpc_call_element_args args = {calld->owning_call_, nullptr, - calld->call_context_, calld->path_, - /*start_time=*/0, calld->deadline_, - calld->arena_, calld->call_combiner_}; + grpc_call_element_args args = { + calld->owning_call_, nullptr, + calld->call_context_, calld->path_, + calld->call_start_time_, calld->deadline_, + calld->arena_, calld->call_combiner_}; auto* service_config_call_data = static_cast( calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); calld->lb_call_ = client_channel->CreateLoadBalancedCall( args, pollent, nullptr, - service_config_call_data->call_dispatch_controller(), - /*is_transparent_retry=*/false); + service_config_call_data->call_dispatch_controller()); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand, @@ -361,6 +363,7 @@ class DynamicTerminationFilter::CallData { private: explicit CallData(const grpc_call_element_args& args) : path_(grpc_slice_ref_internal(args.path)), + call_start_time_(args.start_time), deadline_(args.deadline), arena_(args.arena), owning_call_(args.call_stack), @@ -370,6 +373,7 @@ class DynamicTerminationFilter::CallData { ~CallData() { grpc_slice_unref_internal(path_); } grpc_slice path_; // Request path. + gpr_cycle_counter call_start_time_; grpc_millis deadline_; Arena* arena_; grpc_call_stack* owning_call_; @@ -1165,11 +1169,10 @@ RefCountedPtr ClientChannel::CreateLoadBalancedCall( const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry) { - return args.arena->New( - this, args, pollent, on_call_destruction_complete, - call_dispatch_controller, is_transparent_retry); + ConfigSelector::CallDispatchController* call_dispatch_controller) { + return args.arena->New(this, args, pollent, + on_call_destruction_complete, + call_dispatch_controller); } namespace { @@ -2310,9 +2313,8 @@ void ClientChannel::CallData:: void ClientChannel::CallData::AsyncResolutionDone(grpc_call_element* elem, grpc_error_handle error) { - // TODO(roth): Does this callback need to hold a ref to the call stack? - GRPC_CLOSURE_INIT(&resolution_done_closure_, ResolutionDone, elem, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &resolution_done_closure_, error); + GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error); } void ClientChannel::CallData::ResolutionDone(void* arg, @@ -2548,28 +2550,16 @@ class ClientChannel::LoadBalancedCall::LbCallState // LoadBalancedCall // -namespace { - -CallTracer::CallAttemptTracer* GetCallAttemptTracer( - grpc_call_context_element* context, bool is_transparent_retry) { - auto* call_tracer = - static_cast(context[GRPC_CONTEXT_CALL_TRACER].value); - if (call_tracer == nullptr) return nullptr; - return call_tracer->StartNewAttempt(is_transparent_retry); -} - -} // namespace - ClientChannel::LoadBalancedCall::LoadBalancedCall( ClientChannel* chand, const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry) + ConfigSelector::CallDispatchController* call_dispatch_controller) : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace) ? "LoadBalancedCall" : nullptr), chand_(chand), path_(grpc_slice_ref_internal(args.path)), + call_start_time_(args.start_time), deadline_(args.deadline), arena_(args.arena), owning_call_(args.call_stack), @@ -2577,9 +2567,7 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall( call_context_(args.context), pollent_(pollent), on_call_destruction_complete_(on_call_destruction_complete), - call_dispatch_controller_(call_dispatch_controller), - call_attempt_tracer_( - GetCallAttemptTracer(args.context, is_transparent_retry)) {} + call_dispatch_controller_(call_dispatch_controller) {} ClientChannel::LoadBalancedCall::~LoadBalancedCall() { grpc_slice_unref_internal(path_); @@ -2593,17 +2581,7 @@ ClientChannel::LoadBalancedCall::~LoadBalancedCall() { for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { GPR_ASSERT(pending_batches_[i] == nullptr); } - // Compute latency and report it to the tracer. - if (call_attempt_tracer_ != nullptr) { - gpr_timespec latency = - gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_); - call_attempt_tracer_->RecordEnd(latency); - } - // Arrange to invoke on_call_destruction_complete_ once we know that - // the call stack has been destroyed. - if (GPR_LIKELY(subchannel_call_ != nullptr)) { - subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_); - } else { + if (on_call_destruction_complete_ != nullptr) { ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_, GRPC_ERROR_NONE); } @@ -2724,65 +2702,9 @@ void ClientChannel::LoadBalancedCall::PendingBatchesResume() { void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch( grpc_transport_stream_op_batch* batch) { - // Handle call tracing. - if (call_attempt_tracer_ != nullptr) { - // Record send ops in tracer. - if (batch->cancel_stream) { - call_attempt_tracer_->RecordCancel( - GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error)); - } - if (batch->send_initial_metadata) { - call_attempt_tracer_->RecordSendInitialMetadata( - batch->payload->send_initial_metadata.send_initial_metadata, - batch->payload->send_initial_metadata.send_initial_metadata_flags); - peer_string_ = batch->payload->send_initial_metadata.peer_string; - original_send_initial_metadata_on_complete_ = batch->on_complete; - GRPC_CLOSURE_INIT(&send_initial_metadata_on_complete_, - SendInitialMetadataOnComplete, this, nullptr); - batch->on_complete = &send_initial_metadata_on_complete_; - } - if (batch->send_message) { - call_attempt_tracer_->RecordSendMessage( - *batch->payload->send_message.send_message); - } - if (batch->send_trailing_metadata) { - call_attempt_tracer_->RecordSendTrailingMetadata( - batch->payload->send_trailing_metadata.send_trailing_metadata); - } - // Intercept recv ops. - if (batch->recv_initial_metadata) { - recv_initial_metadata_ = - batch->payload->recv_initial_metadata.recv_initial_metadata; - recv_initial_metadata_flags_ = - batch->payload->recv_initial_metadata.recv_flags; - original_recv_initial_metadata_ready_ = - batch->payload->recv_initial_metadata.recv_initial_metadata_ready; - GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady, - this, nullptr); - batch->payload->recv_initial_metadata.recv_initial_metadata_ready = - &recv_initial_metadata_ready_; - } - if (batch->recv_message) { - recv_message_ = batch->payload->recv_message.recv_message; - original_recv_message_ready_ = - batch->payload->recv_message.recv_message_ready; - GRPC_CLOSURE_INIT(&recv_message_ready_, RecvMessageReady, this, nullptr); - batch->payload->recv_message.recv_message_ready = &recv_message_ready_; - } - } - // Intercept recv_trailing_metadata even if there is no call tracer, - // since we may need to notify the LB policy about trailing metadata. + // Intercept recv_trailing_metadata_ready for LB callback. if (batch->recv_trailing_metadata) { - recv_trailing_metadata_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata; - transport_stream_stats_ = - batch->payload->recv_trailing_metadata.collect_stats; - original_recv_trailing_metadata_ready_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, - this, nullptr); - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &recv_trailing_metadata_ready_; + InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch); } // If we've previously been cancelled, immediately fail any new batches. if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) { @@ -2859,79 +2781,38 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch( } } -void ClientChannel::LoadBalancedCall::SendInitialMetadataOnComplete( - void* arg, grpc_error_handle error) { - auto* self = static_cast(arg); - self->call_attempt_tracer_->RecordOnDoneSendInitialMetadata( - self->peer_string_); - Closure::Run(DEBUG_LOCATION, - self->original_send_initial_metadata_on_complete_, - GRPC_ERROR_REF(error)); -} - -void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady( - void* arg, grpc_error_handle error) { +void ClientChannel::LoadBalancedCall:: + RecvTrailingMetadataReadyForLoadBalancingPolicy(void* arg, + grpc_error_handle error) { auto* self = static_cast(arg); - self->call_attempt_tracer_->RecordReceivedInitialMetadata( - self->recv_initial_metadata_, *self->recv_initial_metadata_flags_); - Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_, - GRPC_ERROR_REF(error)); -} - -void ClientChannel::LoadBalancedCall::RecvMessageReady( - void* arg, grpc_error_handle error) { - auto* self = static_cast(arg); - self->call_attempt_tracer_->RecordReceivedMessage(**self->recv_message_); - Closure::Run(DEBUG_LOCATION, self->original_recv_message_ready_, - GRPC_ERROR_REF(error)); -} - -void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady( - void* arg, grpc_error_handle error) { - auto* self = static_cast(arg); - // Check if we have a tracer or an LB callback to invoke. - if (self->call_attempt_tracer_ != nullptr || - self->lb_recv_trailing_metadata_ready_ != nullptr) { - // Get the call's status. - absl::Status status; + if (self->lb_recv_trailing_metadata_ready_ != nullptr) { + // Set error if call did not succeed. + grpc_error_handle error_for_lb = GRPC_ERROR_NONE; if (error != GRPC_ERROR_NONE) { - // Get status from error. - grpc_status_code code; - grpc_slice message = grpc_empty_slice(); - grpc_error_get_status(error, self->deadline_, &code, &message, - /*http_error=*/nullptr, /*error_string=*/nullptr); - status = absl::Status(static_cast(code), - StringViewFromSlice(message)); + error_for_lb = error; } else { - // Get status from headers. const auto& fields = self->recv_trailing_metadata_->idx.named; GPR_ASSERT(fields.grpc_status != nullptr); - grpc_status_code code = + grpc_status_code status = grpc_get_status_code_from_metadata(fields.grpc_status->md); - if (code != GRPC_STATUS_OK) { - absl::string_view message; + std::string msg; + if (status != GRPC_STATUS_OK) { + error_for_lb = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"), + GRPC_ERROR_INT_GRPC_STATUS, status); if (fields.grpc_message != nullptr) { - message = StringViewFromSlice(GRPC_MDVALUE(fields.grpc_message->md)); + error_for_lb = grpc_error_set_str( + error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md))); } - status = absl::Status(static_cast(code), message); } } - // If we have a tracer, notify it. - if (self->call_attempt_tracer_ != nullptr) { - self->call_attempt_tracer_->RecordReceivedTrailingMetadata( - status, self->recv_trailing_metadata_, - *self->transport_stream_stats_); - } - // If the LB policy requested a callback for trailing metadata, invoke - // the callback. - if (self->lb_recv_trailing_metadata_ready_ != nullptr) { - grpc_error_handle error_for_lb = absl_status_to_grpc_error(status); - Metadata trailing_metadata(self, self->recv_trailing_metadata_); - LbCallState lb_call_state(self); - self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata, - &lb_call_state); - GRPC_ERROR_UNREF(error_for_lb); - } + // Invoke callback to LB policy. + Metadata trailing_metadata(self, self->recv_trailing_metadata_); + LbCallState lb_call_state(self); + self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata, + &lb_call_state); + if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb); } // Chain to original callback. if (self->failure_error_ != GRPC_ERROR_NONE) { @@ -2944,9 +2825,23 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady( error); } +void ClientChannel::LoadBalancedCall:: + InjectRecvTrailingMetadataReadyForLoadBalancingPolicy( + grpc_transport_stream_op_batch* batch) { + recv_trailing_metadata_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + original_recv_trailing_metadata_ready_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, + RecvTrailingMetadataReadyForLoadBalancingPolicy, this, + grpc_schedule_on_exec_ctx); + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &recv_trailing_metadata_ready_; +} + void ClientChannel::LoadBalancedCall::CreateSubchannelCall() { SubchannelCall::Args call_args = { - std::move(connected_subchannel_), pollent_, path_, /*start_time=*/0, + std::move(connected_subchannel_), pollent_, path_, call_start_time_, deadline_, arena_, // TODO(roth): When we implement hedging support, we will probably // need to use a separate call context for each subchannel call. @@ -2958,6 +2853,10 @@ void ClientChannel::LoadBalancedCall::CreateSubchannelCall() { "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_, this, subchannel_call_.get(), grpc_error_std_string(error).c_str()); } + if (on_call_destruction_complete_ != nullptr) { + subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_); + on_call_destruction_complete_ = nullptr; + } if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { PendingBatchesFail(error, YieldCallCombiner); } else { @@ -3038,7 +2937,6 @@ void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() { } void ClientChannel::LoadBalancedCall::AsyncPickDone(grpc_error_handle error) { - // TODO(roth): Does this callback need to hold a ref to LoadBalancedCall? GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx); ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error); } diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index b07ce45a398..1add6061f8d 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -39,7 +39,6 @@ #include "src/core/ext/filters/client_channel/service_config.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" -#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/polling_entity.h" @@ -136,8 +135,7 @@ class ClientChannel { RefCountedPtr CreateLoadBalancedCall( const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry); + ConfigSelector::CallDispatchController* call_dispatch_controller); private: class CallData; @@ -371,30 +369,25 @@ class ClientChannel { // ClientChannel::LoadBalancedCall // -// TODO(roth): Consider whether this actually needs to be RefCounted<>. -// Ideally, it should be single-owner, or at least InternallyRefCounted<>. +// This object is ref-counted, but it cannot inherit from RefCounted<>, +// because it is allocated on the arena and can't free its memory when +// its refcount goes to zero. So instead, it manually implements the +// same API as RefCounted<>, so that it can be used with RefCountedPtr<>. class ClientChannel::LoadBalancedCall : public RefCounted { public: // If on_call_destruction_complete is non-null, then it will be // invoked once the LoadBalancedCall is completely destroyed. - // If it is null, then the caller is responsible for calling - // set_on_call_destruction_complete() before the LoadBalancedCall is - // destroyed. + // If it is null, then the caller is responsible for checking whether + // the LB call has a subchannel call and ensuring that the + // on_call_destruction_complete closure passed down from the surface + // is not invoked until after the subchannel call stack is destroyed. LoadBalancedCall( ClientChannel* chand, const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry); + ConfigSelector::CallDispatchController* call_dispatch_controller); ~LoadBalancedCall() override; - // Callers must call this before unreffing if they did not set the - // closure via the ctor. - void set_on_call_destruction_complete( - grpc_closure* on_call_destruction_complete) { - on_call_destruction_complete_ = on_call_destruction_complete; - } - void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); // Invoked by channel for queued LB picks when the picker is updated. @@ -408,6 +401,10 @@ class ClientChannel::LoadBalancedCall // will not run until after this method returns. void AsyncPickDone(grpc_error_handle error); + RefCountedPtr subchannel_call() const { + return subchannel_call_; + } + private: class LbQueuedCallCanceller; class Metadata; @@ -442,10 +439,10 @@ class ClientChannel::LoadBalancedCall // Resumes all pending batches on subchannel_call_. void PendingBatchesResume(); - static void SendInitialMetadataOnComplete(void* arg, grpc_error_handle error); - static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); - static void RecvMessageReady(void* arg, grpc_error_handle error); - static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); + static void RecvTrailingMetadataReadyForLoadBalancingPolicy( + void* arg, grpc_error_handle error); + void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy( + grpc_transport_stream_op_batch* batch); void CreateSubchannelCall(); // Invoked when a pick is completed, on both success or failure. @@ -463,6 +460,7 @@ class ClientChannel::LoadBalancedCall // that uses any one of them, we should store them in the call // context. This will save per-call memory overhead. grpc_slice path_; // Request path. + gpr_cycle_counter call_start_time_; grpc_millis deadline_; Arena* arena_; grpc_call_stack* owning_call_; @@ -472,10 +470,6 @@ class ClientChannel::LoadBalancedCall grpc_closure* on_call_destruction_complete_; ConfigSelector::CallDispatchController* call_dispatch_controller_; - CallTracer::CallAttemptTracer* call_attempt_tracer_; - - gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter(); - // Set when we get a cancel_stream op. grpc_error_handle cancel_error_ = GRPC_ERROR_NONE; @@ -500,25 +494,8 @@ class ClientChannel::LoadBalancedCall RefCountedPtr subchannel_call_; - // For intercepting send_initial_metadata on_complete. - gpr_atm* peer_string_ = nullptr; - grpc_closure send_initial_metadata_on_complete_; - grpc_closure* original_send_initial_metadata_on_complete_ = nullptr; - - // For intercepting recv_initial_metadata_ready. - grpc_metadata_batch* recv_initial_metadata_ = nullptr; - uint32_t* recv_initial_metadata_flags_ = nullptr; - grpc_closure recv_initial_metadata_ready_; - grpc_closure* original_recv_initial_metadata_ready_ = nullptr; - - // For intercepting recv_message_ready. - OrphanablePtr* recv_message_ = nullptr; - grpc_closure recv_message_ready_; - grpc_closure* original_recv_message_ready_ = nullptr; - - // For intercepting recv_trailing_metadata_ready. + // For intercepting recv_trailing_metadata_ready for the LB policy. grpc_metadata_batch* recv_trailing_metadata_ = nullptr; - grpc_transport_stream_stats* transport_stream_stats_ = nullptr; grpc_closure recv_trailing_metadata_ready_; grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index 111bd91a314..d4a4e2b056e 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -512,6 +512,7 @@ class RetryFilter::CallData { BackOff retry_backoff_; grpc_slice path_; // Request path. + gpr_cycle_counter call_start_time_; grpc_millis deadline_; Arena* arena_; grpc_call_stack* owning_call_; @@ -2043,6 +2044,7 @@ RetryFilter::CallData::CallData(RetryFilter* chand, .set_max_backoff( retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff())), path_(grpc_slice_ref_internal(args.path)), + call_start_time_(args.start_time), deadline_(args.deadline), arena_(args.arena), owning_call_(args.call_stack), @@ -2176,16 +2178,14 @@ RefCountedPtr RetryFilter::CallData::CreateLoadBalancedCall( ConfigSelector::CallDispatchController* call_dispatch_controller) { grpc_call_element_args args = {owning_call_, nullptr, call_context_, - path_, /*start_time=*/0, deadline_, + path_, call_start_time_, deadline_, arena_, call_combiner_}; return chand_->client_channel_->CreateLoadBalancedCall( args, pollent_, // This callback holds a ref to the CallStackDestructionBarrier // object until the LB call is destroyed. call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this), - call_dispatch_controller, - // TODO(roth): Change this when we support transparent retries. - /*is_transparent_retry=*/false); + call_dispatch_controller); } void RetryFilter::CallData::CreateCallAttempt() { diff --git a/src/core/lib/channel/call_tracer.h b/src/core/lib/channel/call_tracer.h index 577104ab012..1e9244f6619 100644 --- a/src/core/lib/channel/call_tracer.h +++ b/src/core/lib/channel/call_tracer.h @@ -45,27 +45,26 @@ class CallTracer { // arguments. virtual void RecordSendInitialMetadata( grpc_metadata_batch* send_initial_metadata, uint32_t flags) = 0; + virtual void RecordOnDoneSendInitialMetadata() = 0; + virtual void RecordSendTrailingMetadata( + grpc_metadata_batch* send_trailing_metadata) = 0; + virtual void RecordSendMessage(const ByteStream& send_message) = 0; + // The `RecordReceivedxx()` methods should only be invoked when the + // metadata/message was successfully received, i.e., without any error. // TODO(yashkt): We are using gpr_atm here instead of absl::string_view // since that's what the transport API uses, and performing an atomic load // is unnecessary if the census tracer does not need it at present. Fix this // when the transport API changes. - virtual void RecordOnDoneSendInitialMetadata(gpr_atm* peer_string) = 0; - virtual void RecordSendTrailingMetadata( - grpc_metadata_batch* send_trailing_metadata) = 0; - virtual void RecordSendMessage(const ByteStream& send_message) = 0; - // The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()` - // methods should only be invoked when the metadata/message was - // successfully received, i.e., without any error. virtual void RecordReceivedInitialMetadata( - grpc_metadata_batch* recv_initial_metadata, uint32_t flags) = 0; + grpc_metadata_batch* recv_initial_metadata, uint32_t flags, + gpr_atm* peer_string) = 0; virtual void RecordReceivedMessage(const ByteStream& recv_message) = 0; virtual void RecordReceivedTrailingMetadata( - absl::Status status, grpc_metadata_batch* recv_trailing_metadata, - const grpc_transport_stream_stats& transport_stream_stats) = 0; + grpc_metadata_batch* recv_trailing_metadata) = 0; virtual void RecordCancel(grpc_error_handle cancel_error) = 0; // Should be the last API call to the object. Once invoked, the tracer // library is free to destroy the object. - virtual void RecordEnd(const gpr_timespec& latency) = 0; + virtual void RecordEnd(const grpc_call_final_info& final_info) = 0; }; virtual ~CallTracer() {} diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 5e6b3594db8..4832303caba 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -78,7 +78,7 @@ struct grpc_call_element_args { const void* server_transport_data; grpc_call_context_element* context; const grpc_slice& path; - gpr_cycle_counter start_time; // Note: not populated in subchannel stack. + gpr_cycle_counter start_time; grpc_millis deadline; grpc_core::Arena* arena; grpc_core::CallCombiner* call_combiner; diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index 0c0556abcdc..bd7fd495e4a 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -32,9 +32,6 @@ typedef enum { /// Value is a \a census_context. GRPC_CONTEXT_TRACING, - /// Value is a CallTracer object. - GRPC_CONTEXT_CALL_TRACER, - /// Reserved for traffic_class_context. GRPC_CONTEXT_TRAFFIC, diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h index 0b95f14c162..26bd08bbd69 100644 --- a/src/cpp/ext/filters/census/open_census_call_tracer.h +++ b/src/cpp/ext/filters/census/open_census_call_tracer.h @@ -33,23 +33,20 @@ class OpenCensusCallTracer : public grpc_core::CallTracer { void RecordSendInitialMetadata( grpc_metadata_batch* /* send_initial_metadata */, uint32_t /* flags */) override {} - void RecordOnDoneSendInitialMetadata(gpr_atm* /* peer_string */) override {} + void RecordOnDoneSendInitialMetadata() override {} void RecordSendTrailingMetadata( grpc_metadata_batch* /* send_trailing_metadata */) override {} void RecordSendMessage( const grpc_core::ByteStream& /* send_message */) override {} void RecordReceivedInitialMetadata( - grpc_metadata_batch* /* recv_initial_metadata */, - uint32_t /* flags */) override {} + grpc_metadata_batch* /* recv_initial_metadata */, uint32_t /* flags */, + gpr_atm* /* peer_string */) override {} void RecordReceivedMessage( const grpc_core::ByteStream& /* recv_message */) override {} void RecordReceivedTrailingMetadata( - absl::Status /* status */, - grpc_metadata_batch* /* recv_trailing_metadata */, - const grpc_transport_stream_stats& /* transport_stream_stats */) - override {} + grpc_metadata_batch* /* recv_trailing_metadata */) override {} void RecordCancel(grpc_error_handle /* cancel_error */) override {} - void RecordEnd(const gpr_timespec& /* latency */) override {} + void RecordEnd(const grpc_call_final_info& /* final_info */) override {} CensusContext* context() { return &context_; }