diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 7b0faf41c31..e5b38c15abc 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1,20 +1,18 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #include <grpc/support/port_platform.h> @@ -115,11 +113,17 @@ namespace { // ChannelData definition // +class LoadBalancedCall; + class ChannelData { public: - struct QueuedCall { + struct ResolverQueuedCall { grpc_call_element* elem; - QueuedCall* next = nullptr; + ResolverQueuedCall* next = nullptr; + }; + struct LbQueuedCall { + LoadBalancedCall* lb_call; + LbQueuedCall* next = nullptr; }; static grpc_error* Init(grpc_channel_element* elem, @@ -144,8 +148,9 @@ class ChannelData { Mutex* resolution_mu() const { return &resolution_mu_; } // These methods all require holding resolution_mu_. - void AddResolverQueuedCall(QueuedCall* call, grpc_polling_entity* pollent); - void RemoveResolverQueuedCall(QueuedCall* to_remove, + void AddResolverQueuedCall(ResolverQueuedCall* call, + grpc_polling_entity* pollent); + void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove, grpc_polling_entity* pollent); bool received_service_config_data() const { return received_service_config_data_; @@ -166,8 +171,9 @@ class ChannelData { LoadBalancingPolicy::SubchannelPicker* picker() const { return picker_.get(); } - void AddLbQueuedCall(QueuedCall* call, grpc_polling_entity* pollent); - void RemoveLbQueuedCall(QueuedCall* to_remove, grpc_polling_entity* pollent); + void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent); + void RemoveLbQueuedCall(LbQueuedCall* to_remove, + grpc_polling_entity* pollent); RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane( SubchannelInterface* subchannel) const; @@ -322,7 +328,8 @@ class ChannelData { // Fields related to name resolution. Guarded by resolution_mu_. // mutable Mutex resolution_mu_; - QueuedCall* resolver_queued_calls_ = nullptr; // Linked list of queued calls. + // Linked list of calls queued waiting for resolver result. + ResolverQueuedCall* resolver_queued_calls_ = nullptr; // Data from service config. 
grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE; bool received_service_config_data_ = false; @@ -335,7 +342,8 @@ class ChannelData { // mutable Mutex data_plane_mu_; std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_; - QueuedCall* lb_queued_calls_ = nullptr; // Linked list of queued calls. + // Linked list of calls queued waiting for LB pick. + LbQueuedCall* lb_queued_calls_ = nullptr; // // Fields used in the control plane. Guarded by work_serializer. @@ -414,110 +422,8 @@ class CallData { // method returns. void AsyncResolutionDone(grpc_call_element* elem, grpc_error* error); - // Invoked by channel for queued LB picks when the picker is updated. - static void PickSubchannel(void* arg, grpc_error* error); - // Helper function for performing an LB pick while holding the data plane - // mutex. Returns true if the pick is complete, in which case the caller - // must invoke PickDone() or AsyncPickDone() with the returned error. - bool PickSubchannelLocked(grpc_call_element* elem, grpc_error** error); - // Schedules a callback to process the completed pick. The callback - // will not run until after this method returns. - void AsyncPickDone(grpc_call_element* elem, grpc_error* error); - private: class ResolverQueuedCallCanceller; - class LbQueuedCallCanceller; - - class Metadata : public LoadBalancingPolicy::MetadataInterface { - public: - Metadata(CallData* calld, grpc_metadata_batch* batch) - : calld_(calld), batch_(batch) {} - - void Add(absl::string_view key, absl::string_view value) override { - grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>( - calld_->arena_->Alloc(sizeof(grpc_linked_mdelem))); - linked_mdelem->md = grpc_mdelem_from_slices( - ExternallyManagedSlice(key.data(), key.size()), - ExternallyManagedSlice(value.data(), value.size())); - GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) == - GRPC_ERROR_NONE); - } - - iterator begin() const override { - static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t), - "iterator size too large"); - return iterator( - this, reinterpret_cast<intptr_t>(MaybeSkipEntry(batch_->list.head))); - } - iterator end() const override { - static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t), - "iterator size too large"); - return iterator(this, 0); - } - - iterator erase(iterator it) override { - grpc_linked_mdelem* linked_mdelem = - reinterpret_cast<grpc_linked_mdelem*>(GetIteratorHandle(it)); - intptr_t handle = reinterpret_cast<intptr_t>(linked_mdelem->next); - grpc_metadata_batch_remove(batch_, linked_mdelem); - return iterator(this, handle); - } - - private: - grpc_linked_mdelem* MaybeSkipEntry(grpc_linked_mdelem* entry) const { - if (entry != nullptr && batch_->idx.named.path == entry) { - return entry->next; - } - return entry; - } - - intptr_t IteratorHandleNext(intptr_t handle) const override { - grpc_linked_mdelem* linked_mdelem = - reinterpret_cast<grpc_linked_mdelem*>(handle); - return reinterpret_cast<intptr_t>(MaybeSkipEntry(linked_mdelem->next)); - } - - std::pair<absl::string_view, absl::string_view> IteratorHandleGet( - intptr_t handle) const override { - grpc_linked_mdelem* linked_mdelem = - reinterpret_cast<grpc_linked_mdelem*>(handle); - return std::make_pair( - StringViewFromSlice(GRPC_MDKEY(linked_mdelem->md)), - StringViewFromSlice(GRPC_MDVALUE(linked_mdelem->md))); - } - - CallData* calld_; - grpc_metadata_batch* batch_; - }; - - class LbCallState : public LoadBalancingPolicy::CallState { - public: - explicit LbCallState(CallData* calld) : 
calld_(calld) {} - - void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); } - - const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData() - override { - if (calld_->backend_metric_data_ == nullptr) { - grpc_linked_mdelem* md = calld_->recv_trailing_metadata_->idx.named - .x_endpoint_load_metrics_bin; - if (md != nullptr) { - calld_->backend_metric_data_ = - ParseBackendMetricData(GRPC_MDVALUE(md->md), calld_->arena_); - } - } - return calld_->backend_metric_data_; - } - - absl::string_view ExperimentalGetCallAttribute(const char* key) override { - auto it = calld_->call_attributes_.find(key); - if (it == calld_->call_attributes_.end()) return absl::string_view(); - return it->second; - } - - private: - CallData* calld_; - }; // State used for starting a retryable batch on a subchannel call. // This provides its own grpc_transport_stream_op_batch and other data @@ -548,7 +454,7 @@ class CallData { gpr_refcount refs; grpc_call_element* elem; - RefCountedPtr<SubchannelCall> subchannel_call; + RefCountedPtr<LoadBalancedCall> lb_call; // The batch to use in the subchannel call. // Its payload field points to SubchannelCallRetryState::batch_payload. grpc_transport_stream_op_batch batch; @@ -654,11 +560,6 @@ class CallData { grpc_call_element* elem, SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state); - static void RecvTrailingMetadataReadyForLoadBalancingPolicy( - void* arg, grpc_error* error); - void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy( - grpc_transport_stream_op_batch* batch); - // Returns the index into pending_batches_ to be used for batch. static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch); void PendingBatchesAdd(grpc_call_element* elem, @@ -686,7 +587,7 @@ class CallData { grpc_call_element* elem, grpc_error* error, YieldCallCombinerPredicate yield_call_combiner_predicate); static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored); - // Resumes all pending batches on subchannel_call_. + // Resumes all pending batches on lb_call_. void PendingBatchesResume(grpc_call_element* elem); // Returns a pointer to the first pending batch for which predicate(batch) // returns true, or null if not found. @@ -821,15 +722,13 @@ class CallData { // Adds the call (if not already present) to the channel's list of // calls queued for name resolution. void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem); - void MaybeInvokeConfigSelectorCommitCallback(); - void CreateSubchannelCall(grpc_call_element* elem); - // Invoked when a pick is completed, on both success or failure. - static void PickDone(void* arg, grpc_error* error); - // Removes the call from the channel's list of queued picks if present. - void MaybeRemoveCallFromLbQueuedCallsLocked(grpc_call_element* elem); - // Adds the call to the channel's list of queued picks if not already present. - void MaybeAddCallToLbQueuedCallsLocked(grpc_call_element* elem); + static void RecvInitialMetadataReadyForConfigSelectorCommitCallback( + void* arg, grpc_error* error); + void InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback( + grpc_transport_stream_op_batch* batch); + + static void CreateLbCall(void* arg, grpc_error* error); // State for handling deadlines. // The code in deadline_filter.c requires this to be the first field. @@ -853,36 +752,23 @@ class CallData { // Accessed while holding ChannelData::resolution_mu_. 
bool queued_pending_resolver_result_ = false; + ChannelData::ResolverQueuedCall resolver_queued_call_; bool service_config_applied_ = false; RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_; const ClientChannelMethodParsedConfig* method_params_ = nullptr; - std::map<const char*, absl::string_view> call_attributes_; - std::function<void()> on_call_committed_; ResolverQueuedCallCanceller* resolver_call_canceller_ = nullptr; - // Accessed while holding ChannelData::data_plane_mu_. - ChannelData::QueuedCall queued_call_; - bool queued_pending_lb_pick_ = false; - LbCallState lb_call_state_; - const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr; - RefCountedPtr<ConnectedSubchannel> connected_subchannel_; - std::function<void(grpc_error*, LoadBalancingPolicy::MetadataInterface*, - LoadBalancingPolicy::CallState*)> - lb_recv_trailing_metadata_ready_; - LbQueuedCallCanceller* lb_call_canceller_ = nullptr; - - RefCountedPtr<SubchannelCall> subchannel_call_; + std::function<void()> on_call_committed_; + grpc_closure* original_recv_initial_metadata_ready_ = nullptr; + grpc_closure recv_initial_metadata_ready_; - // For intercepting recv_trailing_metadata_ready for the LB policy. - grpc_metadata_batch* recv_trailing_metadata_ = nullptr; - grpc_closure recv_trailing_metadata_ready_; - grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; + RefCountedPtr<LoadBalancedCall> lb_call_; // Batches are added to this list when received from above. // They are removed when we are done handling the batch (i.e., when // either we have invoked all of the batch's callbacks or we have - // passed the batch down to the subchannel call and are not - // intercepting any of its callbacks). + // passed the batch down to the LB call and are not intercepting any of + // its callbacks). PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {}; bool pending_send_initial_metadata_ : 1; bool pending_send_message_ : 1; @@ -931,6 +817,151 @@ class CallData { grpc_metadata_batch send_trailing_metadata_; }; +// +// LoadBalancedCall definition +// + +// This object is ref-counted, but it cannot inherit from RefCounted<>, +// because it is allocated on the arena and can't free its memory when +// its refcount goes to zero. So instead, it manually implements the +// same API as RefCounted<>, so that it can be used with RefCountedPtr<>. +class LoadBalancedCall { + public: + static RefCountedPtr<LoadBalancedCall> Create( + ChannelData* chand, const grpc_call_element_args& args, + grpc_polling_entity* pollent, size_t parent_data_size); + + LoadBalancedCall(ChannelData* chand, const grpc_call_element_args& args, + grpc_polling_entity* pollent); + ~LoadBalancedCall(); + + // Interface of RefCounted<>. + RefCountedPtr<LoadBalancedCall> Ref() GRPC_MUST_USE_RESULT; + RefCountedPtr<LoadBalancedCall> Ref(const DebugLocation& location, + const char* reason) GRPC_MUST_USE_RESULT; + // When refcount drops to 0, destroys itself and the associated call stack, + // but does NOT free the memory because it's in the call arena. + void Unref(); + void Unref(const DebugLocation& location, const char* reason); + + void* GetParentData(); + + void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); + + // Invoked by channel for queued LB picks when the picker is updated. + static void PickSubchannel(void* arg, grpc_error* error); + // Helper function for performing an LB pick while holding the data plane + // mutex. 
Returns true if the pick is complete, in which case the caller + // must invoke PickDone() or AsyncPickDone() with the returned error. + bool PickSubchannelLocked(grpc_error** error); + // Schedules a callback to process the completed pick. The callback + // will not run until after this method returns. + void AsyncPickDone(grpc_error* error); + + RefCountedPtr<SubchannelCall> subchannel_call() const { + return subchannel_call_; + } + + private: + // Allow RefCountedPtr<> to access IncrementRefCount(). + template <typename T> + friend class ::grpc_core::RefCountedPtr; + + class LbQueuedCallCanceller; + class Metadata; + class LbCallState; + + // Interface of RefCounted<>. + void IncrementRefCount(); + void IncrementRefCount(const DebugLocation& location, const char* reason); + + // Returns the index into pending_batches_ to be used for batch. + static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch); + void PendingBatchesAdd(grpc_transport_stream_op_batch* batch); + static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error); + // A predicate type and some useful implementations for PendingBatchesFail(). + typedef bool (*YieldCallCombinerPredicate)( + const CallCombinerClosureList& closures); + static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) { + return true; + } + static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) { + return false; + } + static bool YieldCallCombinerIfPendingBatchesFound( + const CallCombinerClosureList& closures) { + return closures.size() > 0; + } + // Fails all pending batches. + // If yield_call_combiner_predicate returns true, assumes responsibility for + // yielding the call combiner. + void PendingBatchesFail( + grpc_error* error, + YieldCallCombinerPredicate yield_call_combiner_predicate); + static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored); + // Resumes all pending batches on subchannel_call_. + void PendingBatchesResume(); + + static void RecvTrailingMetadataReadyForLoadBalancingPolicy( + void* arg, grpc_error* error); + void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy( + grpc_transport_stream_op_batch* batch); + + void CreateSubchannelCall(); + // Invoked when a pick is completed, on both success or failure. + static void PickDone(void* arg, grpc_error* error); + // Removes the call from the channel's list of queued picks if present. + void MaybeRemoveCallFromLbQueuedCallsLocked(); + // Adds the call to the channel's list of queued picks if not already present. + void MaybeAddCallToLbQueuedCallsLocked(); + + RefCount refs_; + + ChannelData* chand_; + + // TODO(roth): Instead of duplicating these fields in every filter + // that uses any one of them, we should store them in the call + // context. This will save per-call memory overhead. + grpc_slice path_; // Request path. + gpr_cycle_counter call_start_time_; + grpc_millis deadline_; + Arena* arena_; + grpc_call_stack* owning_call_; + CallCombiner* call_combiner_; + grpc_call_context_element* call_context_; + + // Set when we get a cancel_stream op. + grpc_error* cancel_error_ = GRPC_ERROR_NONE; + + grpc_polling_entity* pollent_ = nullptr; + + grpc_closure pick_closure_; + + // Accessed while holding ChannelData::data_plane_mu_. 
+ ChannelData::LbQueuedCall queued_call_; + bool queued_pending_lb_pick_ = false; + const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr; + RefCountedPtr<ConnectedSubchannel> connected_subchannel_; + std::function<void(grpc_error*, LoadBalancingPolicy::MetadataInterface*, + LoadBalancingPolicy::CallState*)> + lb_recv_trailing_metadata_ready_; + LbQueuedCallCanceller* lb_call_canceller_ = nullptr; + + RefCountedPtr<SubchannelCall> subchannel_call_; + + // For intercepting recv_trailing_metadata_ready for the LB policy. + grpc_metadata_batch* recv_trailing_metadata_ = nullptr; + grpc_closure recv_trailing_metadata_ready_; + grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; + + // Batches are added to this list when received from above. + // They are removed when we are done handling the batch (i.e., when + // either we have invoked all of the batch's callbacks or we have + // passed the batch down to the subchannel call and are not + // intercepting any of its callbacks). + grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {}; +}; + // // ChannelData::SubchannelWrapper // @@ -1834,7 +1865,7 @@ void ChannelData::OnResolverErrorLocked(grpc_error* error) { GRPC_ERROR_UNREF(resolver_transient_failure_error_); resolver_transient_failure_error_ = GRPC_ERROR_REF(state_error); // Process calls that were queued waiting for the resolver result. - for (QueuedCall* call = resolver_queued_calls_; call != nullptr; + for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr; call = call->next) { grpc_call_element* elem = call->elem; CallData* calld = static_cast<CallData*>(elem->call_data); @@ -1899,7 +1930,7 @@ OrphanablePtr<LoadBalancingPolicy> ChannelData::CreateLbPolicyLocked( return lb_policy; } -void ChannelData::AddResolverQueuedCall(QueuedCall* call, +void ChannelData::AddResolverQueuedCall(ResolverQueuedCall* call, grpc_polling_entity* pollent) { // Add call to queued calls list. call->next = resolver_queued_calls_; @@ -1909,12 +1940,12 @@ void ChannelData::AddResolverQueuedCall(QueuedCall* call, grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_); } -void ChannelData::RemoveResolverQueuedCall(QueuedCall* to_remove, +void ChannelData::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove, grpc_polling_entity* pollent) { // Remove call's pollent from channel's interested_parties. grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_); // Remove from queued calls list. - for (QueuedCall** call = &resolver_queued_calls_; *call != nullptr; + for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr; call = &(*call)->next) { if (*call == to_remove) { *call = to_remove->next; @@ -2008,7 +2039,7 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() { service_config_.swap(service_config); config_selector_.swap(config_selector); // Process calls that were queued waiting for the resolver result. - for (QueuedCall* call = resolver_queued_calls_; call != nullptr; + for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr; call = call->next) { grpc_call_element* elem = call->elem; CallData* calld = static_cast<CallData*>(elem->call_data); @@ -2122,13 +2153,11 @@ void ChannelData::UpdateStateAndPickerLocked( config_selector_to_unref = std::move(config_selector_); } // Re-process queued picks. 
- for (QueuedCall* call = lb_queued_calls_; call != nullptr; + for (LbQueuedCall* call = lb_queued_calls_; call != nullptr; call = call->next) { - grpc_call_element* elem = call->elem; - CallData* calld = static_cast<CallData*>(elem->call_data); grpc_error* error = GRPC_ERROR_NONE; - if (calld->PickSubchannelLocked(elem, &error)) { - calld->AsyncPickDone(elem, error); + if (call->lb_call->PickSubchannelLocked(&error)) { + call->lb_call->AsyncPickDone(error); } } } @@ -2246,7 +2275,7 @@ void ChannelData::GetChannelInfo(grpc_channel_element* elem, } } -void ChannelData::AddLbQueuedCall(QueuedCall* call, +void ChannelData::AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent) { // Add call to queued picks list. call->next = lb_queued_calls_; @@ -2256,12 +2285,12 @@ void ChannelData::AddLbQueuedCall(QueuedCall* call, grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_); } -void ChannelData::RemoveLbQueuedCall(QueuedCall* to_remove, +void ChannelData::RemoveLbQueuedCall(LbQueuedCall* to_remove, grpc_polling_entity* pollent) { // Remove call's pollent from channel's interested_parties. grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_); // Remove from queued picks list. - for (QueuedCall** call = &lb_queued_calls_; *call != nullptr; + for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr; call = &(*call)->next) { if (*call == to_remove) { *call = to_remove->next; @@ -2363,7 +2392,6 @@ CallData::CallData(grpc_call_element* elem, const ChannelData& chand, owning_call_(args.call_stack), call_combiner_(args.call_combiner), call_context_(args.context), - lb_call_state_(this), pending_send_initial_metadata_(false), pending_send_message_(false), pending_send_trailing_metadata_(false), @@ -2374,10 +2402,6 @@ CallData::CallData(grpc_call_element* elem, const ChannelData& chand, CallData::~CallData() { grpc_slice_unref_internal(path_); GRPC_ERROR_UNREF(cancel_error_); - if (backend_metric_data_ != nullptr) { - backend_metric_data_ - ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData(); - } // Make sure there are no remaining pending batches. for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { GPR_ASSERT(pending_batches_[i].batch == nullptr); @@ -2395,7 +2419,10 @@ void CallData::Destroy(grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, grpc_closure* then_schedule_closure) { CallData* calld = static_cast<CallData*>(elem->call_data); - RefCountedPtr<SubchannelCall> subchannel_call = calld->subchannel_call_; + RefCountedPtr<SubchannelCall> subchannel_call; + if (GPR_LIKELY(calld->lb_call_ != nullptr)) { + subchannel_call = calld->lb_call_->subchannel_call(); + } calld->~CallData(); if (GPR_LIKELY(subchannel_call != nullptr)) { subchannel_call->SetAfterCallStackDestroy(then_schedule_closure); @@ -2413,6 +2440,10 @@ void CallData::StartTransportStreamOpBatch( if (GPR_LIKELY(chand->deadline_checking_enabled())) { grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch); } + // Intercept recv_initial_metadata for config selector on-committed callback. + if (batch->recv_initial_metadata) { + calld->InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(batch); + } // If we've previously been cancelled, immediately fail any new batches. 
if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -2438,10 +2469,10 @@ void CallData::StartTransportStreamOpBatch( gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand, calld, grpc_error_string(calld->cancel_error_)); } - // If we do not have a subchannel call (i.e., a pick has not yet - // been started), fail all pending batches. Otherwise, send the - // cancellation down to the subchannel call. - if (calld->subchannel_call_ == nullptr) { + // If we do not have an LB call (i.e., a pick has not yet been started), + // fail all pending batches. Otherwise, send the cancellation down to the + // LB call. + if (calld->lb_call_ == nullptr) { // TODO(roth): If there is a pending retry callback, do we need to // cancel it here? calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_), @@ -2451,32 +2482,33 @@ void CallData::StartTransportStreamOpBatch( batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_); } else { // Note: This will release the call combiner. - calld->subchannel_call_->StartTransportStreamOpBatch(batch); + calld->lb_call_->StartTransportStreamOpBatch(batch); } return; } // Add the batch to the pending list. calld->PendingBatchesAdd(elem, batch); - // Check if we've already gotten a subchannel call. - // Note that once we have picked a subchannel, we do not need to acquire - // the channel's data plane mutex, which is more efficient (especially for + // Check if we've already created an LB call. + // Note that once we have created an LB call, we do not need to acquire + // the channel's resolution mutex, which is more efficient (especially for // streaming calls). - if (calld->subchannel_call_ != nullptr) { + if (calld->lb_call_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: starting batch on subchannel_call=%p", chand, - calld, calld->subchannel_call_.get()); + gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on lb_call=%p", + chand, calld, calld->lb_call_.get()); } calld->PendingBatchesResume(elem); return; } - // We do not yet have a subchannel call. + // We do not yet have an LB call. // For batches containing a send_initial_metadata op, acquire the - // channel's data plane mutex to pick a subchannel. + // channel's resolution mutex to apply the service config to the call, + // after which we will create an LB call. if (GPR_LIKELY(batch->send_initial_metadata)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, - "chand=%p calld=%p: grabbing data plane mutex to perform pick", + "chand=%p calld=%p: grabbing resolution mutex to apply service " + "config", chand, calld); } CheckResolution(elem, GRPC_ERROR_NONE); @@ -2597,59 +2629,6 @@ void CallData::FreeCachedSendOpDataForCompletedBatch( } } -// -// LB recv_trailing_metadata_ready handling -// - -void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy( - void* arg, grpc_error* error) { - CallData* calld = static_cast<CallData*>(arg); - // Set error if call did not succeed. 
- grpc_error* error_for_lb = GRPC_ERROR_NONE; - if (error != GRPC_ERROR_NONE) { - error_for_lb = error; - } else { - const auto& fields = calld->recv_trailing_metadata_->idx.named; - GPR_ASSERT(fields.grpc_status != nullptr); - grpc_status_code status = - grpc_get_status_code_from_metadata(fields.grpc_status->md); - std::string msg; - if (status != GRPC_STATUS_OK) { - error_for_lb = grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"), - GRPC_ERROR_INT_GRPC_STATUS, status); - if (fields.grpc_message != nullptr) { - error_for_lb = grpc_error_set_str( - error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE, - grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md))); - } - } - } - // Invoke callback to LB policy. - Metadata trailing_metadata(calld, calld->recv_trailing_metadata_); - calld->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata, - &calld->lb_call_state_); - if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb); - // Chain to original callback. - Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_, - GRPC_ERROR_REF(error)); -} - -void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy( - grpc_transport_stream_op_batch* batch) { - if (lb_recv_trailing_metadata_ready_ != nullptr) { - recv_trailing_metadata_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata; - original_recv_trailing_metadata_ready_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, - RecvTrailingMetadataReadyForLoadBalancingPolicy, this, - grpc_schedule_on_exec_ctx); - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &recv_trailing_metadata_ready_; - } -} - // // pending_batches management // @@ -2706,9 +2685,9 @@ void CallData::PendingBatchesAdd(grpc_call_element* elem, chand, this); } SubchannelCallRetryState* retry_state = - subchannel_call_ == nullptr ? nullptr - : static_cast<SubchannelCallRetryState*>( - subchannel_call_->GetParentData()); + lb_call_ == nullptr ? nullptr + : static_cast<SubchannelCallRetryState*>( + lb_call_->GetParentData()); RetryCommit(elem, retry_state); // If we are not going to retry and have not yet started, pretend // retries are disabled so that we don't bother with retry overhead. @@ -2791,9 +2770,6 @@ void CallData::PendingBatchesFail( PendingBatch* pending = &pending_batches_[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - if (batch->recv_trailing_metadata) { - MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch); - } batch->handler_private.extra_arg = this; GRPC_CLOSURE_INIT(&batch->handler_private.closure, FailPendingBatchInCallCombiner, batch, @@ -2816,10 +2792,10 @@ void CallData::ResumePendingBatchInCallCombiner(void* arg, grpc_error* /*ignored*/) { grpc_transport_stream_op_batch* batch = static_cast<grpc_transport_stream_op_batch*>(arg); - SubchannelCall* subchannel_call = - static_cast<SubchannelCall*>(batch->handler_private.extra_arg); + auto* lb_call = + static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg); // Note: This will release the call combiner. - subchannel_call->StartTransportStreamOpBatch(batch); + lb_call->StartTransportStreamOpBatch(batch); } // This is called via the call combiner, so access to calld is synchronized. 
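Reviewer note: the hunks above remove CallData's recv_trailing_metadata_ready interception (it is re-added on LoadBalancedCall later in this patch). The underlying idiom is worth spelling out: save the callback currently in the batch's slot, substitute your own, do your bookkeeping when it fires, then chain to the original. Below is a minimal, compilable sketch of that swap-and-chain pattern; Closure, Run, and Interceptor are toy stand-ins, not gRPC's grpc_closure API.

#include <cstdio>

// Toy stand-in for grpc_closure: a callback plus its argument.
struct Closure {
  void (*cb)(void* arg, int error);
  void* arg;
};
static void Run(Closure* c, int error) { c->cb(c->arg, error); }

// The interceptor stashes the original callback, installs its own in the
// slot, and chains to the original when invoked.
struct Interceptor {
  Closure self;
  Closure* original = nullptr;

  void Inject(Closure** slot) {
    original = *slot;  // save whatever callback was already registered
    self.cb = &OnReady;
    self.arg = this;
    *slot = &self;     // substitute ours in its place
  }

  static void OnReady(void* arg, int error) {
    auto* intercept = static_cast<Interceptor*>(arg);
    std::printf("intercepted, error=%d\n", error);  // LB bookkeeping goes here
    Run(intercept->original, error);                // then chain to original
  }
};

int main() {
  Closure done{[](void*, int e) { std::printf("original ran, error=%d\n", e); },
               nullptr};
  Closure* slot = &done;
  Interceptor interceptor;
  interceptor.Inject(&slot);
  Run(slot, 0);  // runs the interceptor, which chains to the original
}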
@@ -2837,21 +2813,17 @@ void CallData::PendingBatchesResume(grpc_call_element* elem) { } gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR - " pending batches on subchannel_call=%p", - chand, this, num_batches, subchannel_call_.get()); + " pending batches on lb_call=%p", + chand, this, num_batches, lb_call_.get()); } CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { PendingBatch* pending = &pending_batches_[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - if (batch->recv_trailing_metadata) { - MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch); - } - batch->handler_private.extra_arg = subchannel_call_.get(); + batch->handler_private.extra_arg = lb_call_.get(); GRPC_CLOSURE_INIT(&batch->handler_private.closure, - ResumePendingBatchInCallCombiner, batch, - grpc_schedule_on_exec_ctx); + ResumePendingBatchInCallCombiner, batch, nullptr); closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, "PendingBatchesResume"); PendingBatchClear(pending); @@ -2905,8 +2877,8 @@ void CallData::DoRetry(grpc_call_element* elem, GPR_ASSERT(method_params_ != nullptr); const auto* retry_policy = method_params_->retry_policy(); GPR_ASSERT(retry_policy != nullptr); - // Reset subchannel call. - subchannel_call_.reset(); + // Reset LB call. + lb_call_.reset(); // Compute backoff delay. grpc_millis next_attempt_time; if (server_pushback_ms >= 0) { @@ -2930,8 +2902,7 @@ void CallData::DoRetry(grpc_call_element* elem, this, next_attempt_time - ExecCtx::Get()->Now()); } // Schedule retry after computed delay. - GRPC_CLOSURE_INIT(&pick_closure_, PickSubchannel, elem, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&pick_closure_, CreateLbCall, elem, nullptr); grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_); // Update bookkeeping. 
if (retry_state != nullptr) retry_state->retry_dispatched = true; @@ -2952,7 +2923,7 @@ bool CallData::MaybeRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state = nullptr; if (batch_data != nullptr) { retry_state = static_cast<SubchannelCallRetryState*>( - batch_data->subchannel_call->GetParentData()); + batch_data->lb_call->GetParentData()); if (retry_state->retry_dispatched) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand, @@ -3058,10 +3029,9 @@ CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create( CallData::SubchannelCallBatchData::SubchannelCallBatchData( grpc_call_element* elem, CallData* calld, int refcount, bool set_on_complete) - : elem(elem), subchannel_call(calld->subchannel_call_) { + : elem(elem), lb_call(calld->lb_call_) { SubchannelCallRetryState* retry_state = - static_cast<SubchannelCallRetryState*>( - calld->subchannel_call_->GetParentData()); + static_cast<SubchannelCallRetryState*>(calld->lb_call_->GetParentData()); batch.payload = &retry_state->batch_payload; gpr_ref_init(&refs, refcount); if (set_on_complete) { @@ -3074,7 +3044,7 @@ CallData::SubchannelCallBatchData::SubchannelCallBatchData( void CallData::SubchannelCallBatchData::Destroy() { SubchannelCallRetryState* retry_state = - static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData()); + static_cast<SubchannelCallRetryState*>(lb_call->GetParentData()); if (batch.send_initial_metadata) { grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); } @@ -3087,7 +3057,7 @@ void CallData::SubchannelCallBatchData::Destroy() { if (batch.recv_trailing_metadata) { grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); } - subchannel_call.reset(); + lb_call.reset(); CallData* calld = static_cast<CallData*>(elem->call_data); GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data"); } @@ -3112,7 +3082,7 @@ void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) { // Return metadata. SubchannelCallRetryState* retry_state = static_cast<SubchannelCallRetryState*>( - batch_data->subchannel_call->GetParentData()); + batch_data->lb_call->GetParentData()); grpc_metadata_batch_move( &retry_state->recv_initial_metadata, pending->batch->payload->recv_initial_metadata.recv_initial_metadata); @@ -3144,7 +3114,7 @@ void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) { } SubchannelCallRetryState* retry_state = static_cast<SubchannelCallRetryState*>( - batch_data->subchannel_call->GetParentData()); + batch_data->lb_call->GetParentData()); retry_state->completed_recv_initial_metadata = true; // If a retry was already dispatched, then we're not going to use the // result of this recv_initial_metadata op, so do nothing. @@ -3182,7 +3152,6 @@ void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) { } // Received valid initial metadata, so commit the call. calld->RetryCommit(elem, retry_state); - calld->MaybeInvokeConfigSelectorCommitCallback(); // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. calld->InvokeRecvInitialMetadataCallback(batch_data, error); @@ -3207,7 +3176,7 @@ void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) { // Return payload. 
SubchannelCallRetryState* retry_state = static_cast<SubchannelCallRetryState*>( - batch_data->subchannel_call->GetParentData()); + batch_data->lb_call->GetParentData()); *pending->batch->payload->recv_message.recv_message = std::move(retry_state->recv_message); // Update bookkeeping. @@ -3234,7 +3203,7 @@ void CallData::RecvMessageReady(void* arg, grpc_error* error) { } SubchannelCallRetryState* retry_state = static_cast<SubchannelCallRetryState*>( - batch_data->subchannel_call->GetParentData()); + batch_data->lb_call->GetParentData()); ++retry_state->completed_recv_message_count; // If a retry was already dispatched, then we're not going to use the // result of this recv_message op, so do nothing. @@ -3269,7 +3238,6 @@ void CallData::RecvMessageReady(void* arg, grpc_error* error) { } // Received a valid message, so commit the call. calld->RetryCommit(elem, retry_state); - calld->MaybeInvokeConfigSelectorCommitCallback(); // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. calld->InvokeRecvMessageCallback(batch_data, error); @@ -3316,7 +3284,7 @@ void CallData::AddClosureForRecvTrailingMetadataReady( // Return metadata. SubchannelCallRetryState* retry_state = static_cast<SubchannelCallRetryState*>( - batch_data->subchannel_call->GetParentData()); + batch_data->lb_call->GetParentData()); grpc_metadata_batch_move( &retry_state->recv_trailing_metadata, pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); @@ -3408,7 +3376,7 @@ void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data, grpc_call_element* elem = batch_data->elem; SubchannelCallRetryState* retry_state = static_cast<SubchannelCallRetryState*>( - batch_data->subchannel_call->GetParentData()); + batch_data->lb_call->GetParentData()); // Construct list of closures to execute. CallCombinerClosureList closures; // First, add closure for recv_trailing_metadata_ready. @@ -3441,7 +3409,7 @@ void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) { } SubchannelCallRetryState* retry_state = static_cast<SubchannelCallRetryState*>( - batch_data->subchannel_call->GetParentData()); + batch_data->lb_call->GetParentData()); retry_state->completed_recv_trailing_metadata = true; // Get the call's status and check for server pushback metadata. grpc_status_code status = GRPC_STATUS_OK; @@ -3471,7 +3439,6 @@ void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) { } // Not retrying, so commit the call. calld->RetryCommit(elem, retry_state); - calld->MaybeInvokeConfigSelectorCommitCallback(); // Run any necessary closures. calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error)); } @@ -3555,7 +3522,7 @@ void CallData::OnComplete(void* arg, grpc_error* error) { } SubchannelCallRetryState* retry_state = static_cast<SubchannelCallRetryState*>( - batch_data->subchannel_call->GetParentData()); + batch_data->lb_call->GetParentData()); // Update bookkeeping in retry_state. 
if (batch_data->batch.send_initial_metadata) { retry_state->completed_send_initial_metadata = true; @@ -3610,17 +3577,17 @@ void CallData::OnComplete(void* arg, grpc_error* error) { void CallData::StartBatchInCallCombiner(void* arg, grpc_error* /*ignored*/) { grpc_transport_stream_op_batch* batch = static_cast<grpc_transport_stream_op_batch*>(arg); - SubchannelCall* subchannel_call = - static_cast<SubchannelCall*>(batch->handler_private.extra_arg); + auto* lb_call = + static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg); // Note: This will release the call combiner. - subchannel_call->StartTransportStreamOpBatch(batch); + lb_call->StartTransportStreamOpBatch(batch); } void CallData::AddClosureForSubchannelBatch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch, CallCombinerClosureList* closures) { ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); - batch->handler_private.extra_arg = subchannel_call_.get(); + batch->handler_private.extra_arg = lb_call_.get(); GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner, batch, grpc_schedule_on_exec_ctx); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -3760,8 +3727,6 @@ void CallData::AddRetriableRecvTrailingMetadataOp( batch_data->batch.payload->recv_trailing_metadata .recv_trailing_metadata_ready = &retry_state->recv_trailing_metadata_ready; - MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy( - &batch_data->batch); } void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) { @@ -3773,7 +3738,7 @@ void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) { chand, this); } SubchannelCallRetryState* retry_state = - static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData()); + static_cast<SubchannelCallRetryState*>(lb_call_->GetParentData()); // Create batch_data with 2 refs, since this batch will be unreffed twice: // once for the recv_trailing_metadata_ready callback when the subchannel // batch returns, and again when we actually get a recv_trailing_metadata @@ -3783,7 +3748,7 @@ void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) { AddRetriableRecvTrailingMetadataOp(retry_state, batch_data); retry_state->recv_trailing_metadata_internal_batch = batch_data; // Note: This will release the call combiner. - subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch); + lb_call_->StartTransportStreamOpBatch(&batch_data->batch); } // If there are any cached send ops that need to be replayed on the @@ -3919,8 +3884,6 @@ void CallData::AddSubchannelBatchesForPendingBatches( // If we're not retrying, just send the batch as-is. if (method_params_ == nullptr || method_params_->retry_policy() == nullptr || retry_committed_) { - // TODO(roth) : We should probably call - // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here. AddClosureForSubchannelBatch(elem, batch, closures); PendingBatchClear(pending); continue; @@ -3985,8 +3948,7 @@ void CallData::StartRetriableSubchannelBatches(void* arg, chand, calld); } SubchannelCallRetryState* retry_state = - static_cast<SubchannelCallRetryState*>( - calld->subchannel_call_->GetParentData()); + static_cast<SubchannelCallRetryState*>(calld->lb_call_->GetParentData()); // Construct list of closures to execute, one for each pending batch. CallCombinerClosureList closures; // Replay previously-returned send_* ops if needed. 
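Reviewer note: the batch-resumption hunks above all follow the same shape: walk pending_batches_, collect one closure per batch into a CallCombinerClosureList, then run the whole list in one shot so the call combiner is yielded exactly once rather than per batch. Here is a compilable toy of that collect-then-run pattern; ClosureList and the combiner_held flag are simplifications for illustration, not the real CallCombinerClosureList semantics.

#include <cstdio>
#include <vector>

struct Closure {
  void (*cb)(void* arg, int error);
  void* arg;
  int error;
};

// Toy equivalent of CallCombinerClosureList: batch up work while holding
// the "combiner" (here just a flag), then release it exactly once.
class ClosureList {
 public:
  void Add(void (*cb)(void*, int), void* arg, int error) {
    closures_.push_back({cb, arg, error});
  }
  size_t size() const { return closures_.size(); }
  void RunClosures(bool* combiner_held) {
    for (auto& c : closures_) c.cb(c.arg, c.error);
    closures_.clear();
    *combiner_held = false;  // yield once, after all closures have run
  }

 private:
  std::vector<Closure> closures_;
};

int main() {
  bool combiner_held = true;
  ClosureList closures;
  static int ids[] = {1, 2, 3};
  for (int& id : ids) {
    closures.Add(
        [](void* arg, int) {
          std::printf("batch %d resumed\n", *static_cast<int*>(arg));
        },
        &id, 0);
  }
  std::printf("queued %zu closures\n", closures.size());
  closures.RunClosures(&combiner_held);  // all run; combiner yielded once
}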
@@ -4008,8 +3970,8 @@ void CallData::StartRetriableSubchannelBatches(void* arg, if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR - " retriable batches on subchannel_call=%p", - chand, calld, closures.size(), calld->subchannel_call_.get()); + " retriable batches on lb_call=%p", + chand, calld, closures.size(), calld->lb_call_.get()); } // Note: This will yield the call combiner. closures.RunClosures(calld->call_combiner_); @@ -4070,7 +4032,7 @@ void CallData::MaybeRemoveCallFromResolverQueuedCallsLocked( "chand=%p calld=%p: removing from resolver queued picks list", chand, this); } - chand->RemoveResolverQueuedCall(&queued_call_, pollent_); + chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_); queued_pending_resolver_result_ = false; // Lame the call combiner canceller. resolver_call_canceller_ = nullptr; @@ -4085,8 +4047,8 @@ void CallData::MaybeAddCallToResolverQueuedCallsLocked( chand, this); } queued_pending_resolver_result_ = true; - queued_call_.elem = elem; - chand->AddResolverQueuedCall(&queued_call_, pollent_); + resolver_queued_call_.elem = elem; + chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_); // Register call combiner cancellation callback. resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem); } @@ -4104,7 +4066,6 @@ grpc_error* CallData::ApplyServiceConfigToCallLocked( ConfigSelector::CallConfig call_config = config_selector->GetCallConfig({&path_, initial_metadata, arena_}); if (call_config.error != GRPC_ERROR_NONE) return call_config.error; - call_attributes_ = std::move(call_config.call_attributes); on_call_committed_ = std::move(call_config.on_call_committed); // Create a ServiceConfigCallData for the call. This stores a ref to the // ServiceConfig and caches the right set of parsed configs to use for @@ -4113,7 +4074,7 @@ grpc_error* CallData::ApplyServiceConfigToCallLocked( // will be cleaned up when the call ends. auto* service_config_call_data = arena_->New<ServiceConfigCallData>( std::move(call_config.service_config), call_config.method_configs, - call_context_); + std::move(call_config.call_attributes), call_context_); // Apply our own method params to the call. method_params_ = static_cast<ClientChannelMethodParsedConfig*>( service_config_call_data->GetMethodParsedConfig( @@ -4158,11 +4119,29 @@ grpc_error* CallData::ApplyServiceConfigToCallLocked( return GRPC_ERROR_NONE; } -void CallData::MaybeInvokeConfigSelectorCommitCallback() { - if (on_call_committed_ != nullptr) { - on_call_committed_(); - on_call_committed_ = nullptr; +void CallData::RecvInitialMetadataReadyForConfigSelectorCommitCallback( + void* arg, grpc_error* error) { + auto* self = static_cast<CallData*>(arg); + if (self->on_call_committed_ != nullptr) { + self->on_call_committed_(); + self->on_call_committed_ = nullptr; } + // Chain to original callback. + Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_, + GRPC_ERROR_REF(error)); +} + +// TODO(roth): Consider not intercepting this callback unless we +// actually need to, if this causes a performance problem. 
+void CallData::InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback( + grpc_transport_stream_op_batch* batch) { + original_recv_initial_metadata_ready_ = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, + RecvInitialMetadataReadyForConfigSelectorCommitCallback, + this, nullptr); + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &recv_initial_metadata_ready_; } void CallData::AsyncResolutionDone(grpc_call_element* elem, grpc_error* error) { @@ -4183,7 +4162,7 @@ void CallData::ResolutionDone(void* arg, grpc_error* error) { calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner); return; } - calld->PickSubchannel(elem, GRPC_ERROR_NONE); + calld->CreateLbCall(elem, GRPC_ERROR_NONE); } void CallData::CheckResolution(void* arg, grpc_error* error) { @@ -4262,126 +4241,567 @@ bool CallData::CheckResolutionLocked(grpc_call_element* elem, return true; } +void CallData::CreateLbCall(void* arg, grpc_error* /*error*/) { + auto* elem = static_cast<grpc_call_element*>(arg); + auto* chand = static_cast<ChannelData*>(elem->channel_data); + auto* calld = static_cast<CallData*>(elem->call_data); + const size_t parent_data_size = + calld->enable_retries_ ? sizeof(SubchannelCallRetryState) : 0; + grpc_call_element_args args = { + calld->owning_call_, nullptr, + calld->call_context_, calld->path_, + calld->call_start_time_, calld->deadline_, + calld->arena_, calld->call_combiner_}; + calld->lb_call_ = + LoadBalancedCall::Create(chand, args, calld->pollent_, parent_data_size); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: create lb_call=%p", chand, calld, + calld->lb_call_.get()); + } + if (parent_data_size > 0) { + new (calld->lb_call_->GetParentData()) + SubchannelCallRetryState(calld->call_context_); + } + calld->PendingBatchesResume(elem); +} + // -// LB pick +// LoadBalancedCall::Metadata // -void CallData::CreateSubchannelCall(grpc_call_element* elem) { - ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); - const size_t parent_data_size = - enable_retries_ ? 
sizeof(SubchannelCallRetryState) : 0; +class LoadBalancedCall::Metadata + : public LoadBalancingPolicy::MetadataInterface { + public: + Metadata(LoadBalancedCall* lb_call, grpc_metadata_batch* batch) + : lb_call_(lb_call), batch_(batch) {} + + void Add(absl::string_view key, absl::string_view value) override { + grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>( + lb_call_->arena_->Alloc(sizeof(grpc_linked_mdelem))); + linked_mdelem->md = grpc_mdelem_from_slices( + ExternallyManagedSlice(key.data(), key.size()), + ExternallyManagedSlice(value.data(), value.size())); + GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) == + GRPC_ERROR_NONE); + } + + iterator begin() const override { + static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t), + "iterator size too large"); + return iterator( + this, reinterpret_cast<intptr_t>(MaybeSkipEntry(batch_->list.head))); + } + iterator end() const override { + static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t), + "iterator size too large"); + return iterator(this, 0); + } + + iterator erase(iterator it) override { + grpc_linked_mdelem* linked_mdelem = + reinterpret_cast<grpc_linked_mdelem*>(GetIteratorHandle(it)); + intptr_t handle = reinterpret_cast<intptr_t>(linked_mdelem->next); + grpc_metadata_batch_remove(batch_, linked_mdelem); + return iterator(this, handle); + } + + private: + grpc_linked_mdelem* MaybeSkipEntry(grpc_linked_mdelem* entry) const { + if (entry != nullptr && batch_->idx.named.path == entry) { + return entry->next; + } + return entry; + } + + intptr_t IteratorHandleNext(intptr_t handle) const override { + grpc_linked_mdelem* linked_mdelem = + reinterpret_cast<grpc_linked_mdelem*>(handle); + return reinterpret_cast<intptr_t>(MaybeSkipEntry(linked_mdelem->next)); + } + + std::pair<absl::string_view, absl::string_view> IteratorHandleGet( + intptr_t handle) const override { + grpc_linked_mdelem* linked_mdelem = + reinterpret_cast<grpc_linked_mdelem*>(handle); + return std::make_pair(StringViewFromSlice(GRPC_MDKEY(linked_mdelem->md)), + StringViewFromSlice(GRPC_MDVALUE(linked_mdelem->md))); + } + + LoadBalancedCall* lb_call_; + grpc_metadata_batch* batch_; +}; + +// +// LoadBalancedCall::LbCallState +// + +class LoadBalancedCall::LbCallState : public LoadBalancingPolicy::CallState { + public: + explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {} + + void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); } + + const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData() + override { + if (lb_call_->backend_metric_data_ == nullptr) { + grpc_linked_mdelem* md = lb_call_->recv_trailing_metadata_->idx.named + .x_endpoint_load_metrics_bin; + if (md != nullptr) { + lb_call_->backend_metric_data_ = + ParseBackendMetricData(GRPC_MDVALUE(md->md), lb_call_->arena_); + } + } + return lb_call_->backend_metric_data_; + } + + absl::string_view ExperimentalGetCallAttribute(const char* key) override { + auto* service_config_call_data = static_cast<ServiceConfigCallData*>( + lb_call_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); + auto& call_attributes = service_config_call_data->call_attributes(); + auto it = call_attributes.find(key); + if (it == call_attributes.end()) return absl::string_view(); + return it->second; + } + + private: + LoadBalancedCall* lb_call_; +}; + +// +// LoadBalancedCall +// + +RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Create( + ChannelData* chand, const grpc_call_element_args& args, + grpc_polling_entity* 
pollent, size_t parent_data_size) { + const size_t alloc_size = + parent_data_size > 0 + ? (GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall)) + + parent_data_size) + : sizeof(LoadBalancedCall); + auto* lb_call = static_cast<LoadBalancedCall*>(args.arena->Alloc(alloc_size)); + new (lb_call) LoadBalancedCall(chand, args, pollent); + return lb_call; +} + +LoadBalancedCall::LoadBalancedCall(ChannelData* chand, + const grpc_call_element_args& args, + grpc_polling_entity* pollent) + : refs_(1, GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace) + ? "LoadBalancedCall" + : nullptr), + chand_(chand), + path_(grpc_slice_ref_internal(args.path)), + call_start_time_(args.start_time), + deadline_(args.deadline), + arena_(args.arena), + owning_call_(args.call_stack), + call_combiner_(args.call_combiner), + call_context_(args.context), + pollent_(pollent) {} + +LoadBalancedCall::~LoadBalancedCall() { + grpc_slice_unref_internal(path_); + GRPC_ERROR_UNREF(cancel_error_); + if (backend_metric_data_ != nullptr) { + backend_metric_data_ + ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData(); + } + // Make sure there are no remaining pending batches. + for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { + GPR_ASSERT(pending_batches_[i] == nullptr); + } +} + +RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Ref() { + IncrementRefCount(); + return RefCountedPtr<LoadBalancedCall>(this); +} + +RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Ref( + const DebugLocation& location, const char* reason) { + IncrementRefCount(location, reason); + return RefCountedPtr<LoadBalancedCall>(this); +} + +void LoadBalancedCall::Unref() { + if (GPR_UNLIKELY(refs_.Unref())) { + this->~LoadBalancedCall(); + } +} + +void LoadBalancedCall::Unref(const DebugLocation& location, + const char* reason) { + if (GPR_UNLIKELY(refs_.Unref(location, reason))) { + this->~LoadBalancedCall(); + } +} + +void LoadBalancedCall::IncrementRefCount() { refs_.Ref(); } + +void LoadBalancedCall::IncrementRefCount(const DebugLocation& location, + const char* reason) { + refs_.Ref(location, reason); +} + +void* LoadBalancedCall::GetParentData() { + return reinterpret_cast<char*>(this) + + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall)); +} + +size_t LoadBalancedCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) { + // Note: It is important the send_initial_metadata be the first entry + // here, since the code in pick_subchannel_locked() assumes it will be. + if (batch->send_initial_metadata) return 0; + if (batch->send_message) return 1; + if (batch->send_trailing_metadata) return 2; + if (batch->recv_initial_metadata) return 3; + if (batch->recv_message) return 4; + if (batch->recv_trailing_metadata) return 5; + GPR_UNREACHABLE_CODE(return (size_t)-1); +} + +// This is called via the call combiner, so access to calld is synchronized. +void LoadBalancedCall::PendingBatchesAdd( + grpc_transport_stream_op_batch* batch) { + const size_t idx = GetBatchIndex(batch); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { + gpr_log(GPR_INFO, + "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR, + chand_, this, idx); + } + GPR_ASSERT(pending_batches_[idx] == nullptr); + pending_batches_[idx] = batch; +} + +// This is called via the call combiner, so access to calld is synchronized. 
+void LoadBalancedCall::FailPendingBatchInCallCombiner(void* arg, + grpc_error* error) { + grpc_transport_stream_op_batch* batch = + static_cast<grpc_transport_stream_op_batch*>(arg); + auto* self = static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg); + // Note: This will release the call combiner. + grpc_transport_stream_op_batch_finish_with_failure( + batch, GRPC_ERROR_REF(error), self->call_combiner_); +} + +// This is called via the call combiner, so access to calld is synchronized. +void LoadBalancedCall::PendingBatchesFail( + grpc_error* error, + YieldCallCombinerPredicate yield_call_combiner_predicate) { + GPR_ASSERT(error != GRPC_ERROR_NONE); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { + size_t num_batches = 0; + for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { + if (pending_batches_[i] != nullptr) ++num_batches; + } + gpr_log(GPR_INFO, + "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s", + chand_, this, num_batches, grpc_error_string(error)); + } + CallCombinerClosureList closures; + for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { + grpc_transport_stream_op_batch*& batch = pending_batches_[i]; + if (batch != nullptr) { + batch->handler_private.extra_arg = this; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + FailPendingBatchInCallCombiner, batch, + grpc_schedule_on_exec_ctx); + closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), + "PendingBatchesFail"); + batch = nullptr; + } + } + if (yield_call_combiner_predicate(closures)) { + closures.RunClosures(call_combiner_); + } else { + closures.RunClosuresWithoutYielding(call_combiner_); + } + GRPC_ERROR_UNREF(error); +} + +// This is called via the call combiner, so access to calld is synchronized. +void LoadBalancedCall::ResumePendingBatchInCallCombiner( + void* arg, grpc_error* /*ignored*/) { + grpc_transport_stream_op_batch* batch = + static_cast<grpc_transport_stream_op_batch*>(arg); + SubchannelCall* subchannel_call = + static_cast<SubchannelCall*>(batch->handler_private.extra_arg); + // Note: This will release the call combiner. + subchannel_call->StartTransportStreamOpBatch(batch); +} + +// This is called via the call combiner, so access to calld is synchronized. +void LoadBalancedCall::PendingBatchesResume() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { + size_t num_batches = 0; + for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { + if (pending_batches_[i] != nullptr) ++num_batches; + } + gpr_log(GPR_INFO, + "chand=%p lb_call=%p: starting %" PRIuPTR + " pending batches on subchannel_call=%p", + chand_, this, num_batches, subchannel_call_.get()); + } + CallCombinerClosureList closures; + for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { + grpc_transport_stream_op_batch*& batch = pending_batches_[i]; + if (batch != nullptr) { + batch->handler_private.extra_arg = subchannel_call_.get(); + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + ResumePendingBatchInCallCombiner, batch, + grpc_schedule_on_exec_ctx); + closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, + "PendingBatchesResume"); + batch = nullptr; + } + } + // Note: This will release the call combiner. + closures.RunClosures(call_combiner_); +} + +void LoadBalancedCall::StartTransportStreamOpBatch( + grpc_transport_stream_op_batch* batch) { + // Intercept recv_trailing_metadata_ready for LB callback. 
+
+void LoadBalancedCall::StartTransportStreamOpBatch(
+    grpc_transport_stream_op_batch* batch) {
+  // Intercept recv_trailing_metadata_ready for LB callback.
+  if (batch->recv_trailing_metadata) {
+    InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
+  }
+  // If we've previously been cancelled, immediately fail any new batches.
+  if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
+              chand_, this, grpc_error_string(cancel_error_));
+    }
+    // Note: This will release the call combiner.
+    grpc_transport_stream_op_batch_finish_with_failure(
+        batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
+    return;
+  }
+  // Handle cancellation.
+  if (GPR_UNLIKELY(batch->cancel_stream)) {
+    // Stash a copy of cancel_error in our call data, so that we can use
+    // it for subsequent operations.  This ensures that if the call is
+    // cancelled before any batches are passed down (e.g., if the deadline
+    // is in the past when the call starts), we can return the right
+    // error to the caller when the first batch does get passed down.
+    GRPC_ERROR_UNREF(cancel_error_);
+    cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
+              chand_, this, grpc_error_string(cancel_error_));
+    }
+    // If we do not have a subchannel call (i.e., a pick has not yet
+    // been started), fail all pending batches.  Otherwise, send the
+    // cancellation down to the subchannel call.
+    if (subchannel_call_ == nullptr) {
+      PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
+      // Note: This will release the call combiner.
+      grpc_transport_stream_op_batch_finish_with_failure(
+          batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
+    } else {
+      // Note: This will release the call combiner.
+      subchannel_call_->StartTransportStreamOpBatch(batch);
+    }
+    return;
+  }
+  // Add the batch to the pending list.
+  PendingBatchesAdd(batch);
+  // Check if we've already gotten a subchannel call.
+  // Note that once we have picked a subchannel, we do not need to acquire
+  // the channel's data plane mutex, which is more efficient (especially for
+  // streaming calls).
+  if (subchannel_call_ != nullptr) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
+              chand_, this, subchannel_call_.get());
+    }
+    PendingBatchesResume();
+    return;
+  }
+  // We do not yet have a subchannel call.
+  // For batches containing a send_initial_metadata op, acquire the
+  // channel's data plane mutex to pick a subchannel.
+  if (GPR_LIKELY(batch->send_initial_metadata)) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
+              chand_, this);
+    }
+    PickSubchannel(this, GRPC_ERROR_NONE);
+  } else {
+    // For all other batches, release the call combiner.
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p lb_call=%p: saved batch, yielding call combiner",
+              chand_, this);
+    }
+    GRPC_CALL_COMBINER_STOP(call_combiner_,
+                            "batch does not include send_initial_metadata");
+  }
+}
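One detail of StartTransportStreamOpBatch() above deserves emphasis: the cancellation error is stashed in cancel_error_, and any batch arriving afterwards is failed immediately with that same error instead of being queued, which is what makes a call cancelled before any batch goes down (e.g., a deadline already in the past) report the right status. A tiny sketch of the stash-and-replay idiom, with hypothetical types and std::optional standing in for the owned grpc_error* slot:

#include <cstdio>
#include <optional>
#include <string>

class CallLike {
 public:
  // Keep the most recent cancellation error, as the cancel_stream branch
  // above does when it overwrites cancel_error_.
  void Cancel(std::string why) { cancel_error_ = std::move(why); }

  // Returns false if the op was rejected because of a prior cancellation.
  bool StartOp(const char* op) {
    if (cancel_error_) {
      std::printf("op %s failed: %s\n", op, cancel_error_->c_str());
      return false;
    }
    std::printf("op %s queued\n", op);
    return true;
  }

 private:
  std::optional<std::string> cancel_error_;
};

int main() {
  CallLike call;
  call.StartOp("send_initial_metadata");  // queued
  call.Cancel("deadline exceeded");       // e.g. deadline already expired
  call.StartOp("recv_message");           // fails with the stashed error
}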
+
+void LoadBalancedCall::RecvTrailingMetadataReadyForLoadBalancingPolicy(
+    void* arg, grpc_error* error) {
+  auto* self = static_cast<LoadBalancedCall*>(arg);
+  if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
+    // Set error if call did not succeed.
+    grpc_error* error_for_lb = GRPC_ERROR_NONE;
+    if (error != GRPC_ERROR_NONE) {
+      error_for_lb = error;
+    } else {
+      const auto& fields = self->recv_trailing_metadata_->idx.named;
+      GPR_ASSERT(fields.grpc_status != nullptr);
+      grpc_status_code status =
+          grpc_get_status_code_from_metadata(fields.grpc_status->md);
+      if (status != GRPC_STATUS_OK) {
+        error_for_lb = grpc_error_set_int(
+            GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"),
+            GRPC_ERROR_INT_GRPC_STATUS, status);
+        if (fields.grpc_message != nullptr) {
+          error_for_lb = grpc_error_set_str(
+              error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE,
+              grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md)));
+        }
+      }
+    }
+    // Invoke callback to LB policy.
+    Metadata trailing_metadata(self, self->recv_trailing_metadata_);
+    LbCallState lb_call_state(self);
+    self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
+                                           &lb_call_state);
+    if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb);
+  }
+  // Chain to original callback.
+  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
+               GRPC_ERROR_REF(error));
+}
+
+// TODO(roth): Consider not intercepting this callback unless we
+// actually need to, if this causes a performance problem.
+void LoadBalancedCall::InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
+    grpc_transport_stream_op_batch* batch) {
+  recv_trailing_metadata_ =
+      batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+  original_recv_trailing_metadata_ready_ =
+      batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+  GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
+                    RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
+                    grpc_schedule_on_exec_ctx);
+  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+      &recv_trailing_metadata_ready_;
+}
+
+void LoadBalancedCall::CreateSubchannelCall() {
   SubchannelCall::Args call_args = {
       std::move(connected_subchannel_), pollent_, path_, call_start_time_,
       deadline_, arena_,
       // TODO(roth): When we implement hedging support, we will probably
       // need to use a separate call context for each subchannel call.
-      call_context_, call_combiner_, parent_data_size};
+      call_context_, call_combiner_};
   grpc_error* error = GRPC_ERROR_NONE;
   subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
-    gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
-            chand, this, subchannel_call_.get(), grpc_error_string(error));
+    gpr_log(GPR_INFO,
+            "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
+            this, subchannel_call_.get(), grpc_error_string(error));
   }
   if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
-    PendingBatchesFail(elem, error, YieldCallCombiner);
+    PendingBatchesFail(error, YieldCallCombiner);
   } else {
-    if (parent_data_size > 0) {
-      new (subchannel_call_->GetParentData())
-          SubchannelCallRetryState(call_context_);
-    }
-    PendingBatchesResume(elem);
+    PendingBatchesResume();
   }
 }
 
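InjectRecvTrailingMetadataReadyForLoadBalancingPolicy() above is the classic callback-swap idiom: stash the original recv_trailing_metadata_ready closure, substitute one that first reports the call's outcome to the LB policy, and chain to the original when it fires. A bare-bones sketch with hypothetical types, using plain function pointers where the real code uses grpc_closure:

#include <cstdio>

using Callback = void (*)(void* arg);

struct Batch {
  Callback on_trailing_metadata = nullptr;
  void* arg = nullptr;
};

struct Interceptor {
  Callback original = nullptr;
  void* original_arg = nullptr;

  static void Hook(void* arg) {
    auto* self = static_cast<Interceptor*>(arg);
    std::printf("interceptor: report call outcome to LB policy\n");
    self->original(self->original_arg);  // chain to the original callback
  }

  // Swap ourselves in, remembering what was there before.
  void Inject(Batch* batch) {
    original = batch->on_trailing_metadata;
    original_arg = batch->arg;
    batch->on_trailing_metadata = &Hook;
    batch->arg = this;
  }
};

int main() {
  Batch batch;
  batch.on_trailing_metadata = [](void*) { std::printf("user callback\n"); };
  Interceptor interceptor;
  interceptor.Inject(&batch);
  batch.on_trailing_metadata(batch.arg);  // the transport firing the callback
}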
 // A class to handle the call combiner cancellation callback for a
 // queued pick.
-class CallData::LbQueuedCallCanceller {
+// TODO(roth): When we implement hedging support, we won't be able to
+// register a call combiner cancellation closure for each LB pick,
+// because there may be multiple LB picks happening in parallel.
+// Instead, we will probably need to maintain a list in the
+// LoadBalancedCall object of pending LB picks to be cancelled when the
+// closure runs.
+class LoadBalancedCall::LbQueuedCallCanceller {
  public:
-  explicit LbQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) {
-    auto* calld = static_cast<CallData*>(elem->call_data);
-    GRPC_CALL_STACK_REF(calld->owning_call_, "LbQueuedCallCanceller");
-    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
-                      grpc_schedule_on_exec_ctx);
-    calld->call_combiner_->SetNotifyOnCancel(&closure_);
+  explicit LbQueuedCallCanceller(RefCountedPtr<LoadBalancedCall> lb_call)
+      : lb_call_(std::move(lb_call)) {
+    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
+    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
+    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
   }
 
  private:
   static void CancelLocked(void* arg, grpc_error* error) {
     auto* self = static_cast<LbQueuedCallCanceller*>(arg);
-    auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
-    auto* calld = static_cast<CallData*>(self->elem_->call_data);
+    auto* lb_call = self->lb_call_.get();
+    auto* chand = lb_call->chand_;
     {
       MutexLock lock(chand->data_plane_mu());
       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
         gpr_log(GPR_INFO,
-                "chand=%p calld=%p: cancelling queued pick: "
+                "chand=%p lb_call=%p: cancelling queued pick: "
                 "error=%s self=%p calld->pick_canceller=%p",
-                chand, calld, grpc_error_string(error), self,
-                calld->lb_call_canceller_);
+                chand, lb_call, grpc_error_string(error), self,
+                lb_call->lb_call_canceller_);
       }
-      if (calld->lb_call_canceller_ == self && error != GRPC_ERROR_NONE) {
+      if (lb_call->lb_call_canceller_ == self && error != GRPC_ERROR_NONE) {
         // Remove pick from list of queued picks.
-        calld->MaybeInvokeConfigSelectorCommitCallback();
-        calld->MaybeRemoveCallFromLbQueuedCallsLocked(self->elem_);
+        lb_call->MaybeRemoveCallFromLbQueuedCallsLocked();
         // Fail pending batches on the call.
-        calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
-                                  YieldCallCombinerIfPendingBatchesFound);
+        lb_call->PendingBatchesFail(GRPC_ERROR_REF(error),
+                                    YieldCallCombinerIfPendingBatchesFound);
       }
     }
-    GRPC_CALL_STACK_UNREF(calld->owning_call_, "LbQueuedCallCanceller");
+    GRPC_CALL_STACK_UNREF(lb_call->owning_call_, "LbQueuedCallCanceller");
     delete self;
   }
 
-  grpc_call_element* elem_;
+  RefCountedPtr<LoadBalancedCall> lb_call_;
   grpc_closure closure_;
 };
 
-void CallData::MaybeRemoveCallFromLbQueuedCallsLocked(grpc_call_element* elem) {
+void LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
   if (!queued_pending_lb_pick_) return;
-  auto* chand = static_cast<ChannelData*>(elem->channel_data);
   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
-    gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
-            chand, this);
+    gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
+            chand_, this);
   }
-  chand->RemoveLbQueuedCall(&queued_call_, pollent_);
+  chand_->RemoveLbQueuedCall(&queued_call_, pollent_);
   queued_pending_lb_pick_ = false;
   // Lame the call combiner canceller.
   lb_call_canceller_ = nullptr;
 }
 
-void CallData::MaybeAddCallToLbQueuedCallsLocked(grpc_call_element* elem) {
+void LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
   if (queued_pending_lb_pick_) return;
-  auto* chand = static_cast<ChannelData*>(elem->channel_data);
   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
-    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
-            this);
+    gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
+            chand_, this);
   }
   queued_pending_lb_pick_ = true;
-  queued_call_.elem = elem;
-  chand->AddLbQueuedCall(&queued_call_, pollent_);
+  queued_call_.lb_call = this;
+  chand_->AddLbQueuedCall(&queued_call_, pollent_);
   // Register call combiner cancellation callback.
-  lb_call_canceller_ = new LbQueuedCallCanceller(elem);
+  lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
 }
 
-void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) {
-  GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx);
+void LoadBalancedCall::AsyncPickDone(grpc_error* error) {
+  GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
   ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
 }
 
-void CallData::PickDone(void* arg, grpc_error* error) {
-  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
-  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
-  CallData* calld = static_cast<CallData*>(elem->call_data);
+void LoadBalancedCall::PickDone(void* arg, grpc_error* error) {
+  auto* self = static_cast<LoadBalancedCall*>(arg);
   if (error != GRPC_ERROR_NONE) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
       gpr_log(GPR_INFO,
-              "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
-              calld, grpc_error_string(error));
+              "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
+              self->chand_, self, grpc_error_string(error));
     }
-    calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
+    self->PendingBatchesFail(GRPC_ERROR_REF(error), YieldCallCombiner);
     return;
   }
-  calld->CreateSubchannelCall(elem);
+  self->CreateSubchannelCall();
 }
 
 const char* PickResultTypeName(
@@ -4397,69 +4817,51 @@ const char* PickResultTypeName(
   GPR_UNREACHABLE_CODE(return "UNKNOWN");
 }
 
-void CallData::PickSubchannel(void* arg, grpc_error* error) {
-  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
-  CallData* calld = static_cast<CallData*>(elem->call_data);
-  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+void LoadBalancedCall::PickSubchannel(void* arg, grpc_error* error) {
+  auto* self = static_cast<LoadBalancedCall*>(arg);
   bool pick_complete;
   {
-    MutexLock lock(chand->data_plane_mu());
-    pick_complete = calld->PickSubchannelLocked(elem, &error);
+    MutexLock lock(self->chand_->data_plane_mu());
+    pick_complete = self->PickSubchannelLocked(&error);
   }
   if (pick_complete) {
-    PickDone(elem, error);
+    PickDone(self, error);
     GRPC_ERROR_UNREF(error);
   }
 }
 
-bool CallData::PickSubchannelLocked(grpc_call_element* elem,
-                                    grpc_error** error) {
-  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
+bool LoadBalancedCall::PickSubchannelLocked(grpc_error** error) {
   GPR_ASSERT(connected_subchannel_ == nullptr);
   GPR_ASSERT(subchannel_call_ == nullptr);
-  // If this is a retry, use the send_initial_metadata payload that
-  // we've cached; otherwise, use the pending batch.  The
-  // send_initial_metadata batch will be the first pending batch in the
-  // list, as set by GetBatchIndex() above.
-  // TODO(roth): What if the LB policy needs to add something to the
-  // call's initial metadata, and then there's a retry?  We don't want
-  // the new metadata to be added twice.  We might need to somehow
-  // allocate the subchannel batch earlier so that we can give the
-  // subchannel's copy of the metadata batch (which is copied for each
-  // attempt) to the LB policy instead the one from the parent channel.
+  // Grab initial metadata.
+  auto& send_initial_metadata =
+      pending_batches_[0]->payload->send_initial_metadata;
   grpc_metadata_batch* initial_metadata_batch =
-      seen_send_initial_metadata_
-          ? &send_initial_metadata_
-          : pending_batches_[0]
-                .batch->payload->send_initial_metadata.send_initial_metadata;
-  // Grab initial metadata flags so that we can check later if the call has
-  // wait_for_ready enabled.
+      send_initial_metadata.send_initial_metadata;
   const uint32_t send_initial_metadata_flags =
-      seen_send_initial_metadata_ ? send_initial_metadata_flags_
-                                  : pending_batches_[0]
-                                        .batch->payload->send_initial_metadata
-                                        .send_initial_metadata_flags;
+      send_initial_metadata.send_initial_metadata_flags;
   // Perform LB pick.
   LoadBalancingPolicy::PickArgs pick_args;
   pick_args.path = StringViewFromSlice(path_);
-  pick_args.call_state = &lb_call_state_;
+  LbCallState lb_call_state(this);
+  pick_args.call_state = &lb_call_state;
   Metadata initial_metadata(this, initial_metadata_batch);
   pick_args.initial_metadata = &initial_metadata;
-  auto result = chand->picker()->Pick(pick_args);
+  auto result = chand_->picker()->Pick(pick_args);
   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
-    gpr_log(GPR_INFO,
-            "chand=%p calld=%p: LB pick returned %s (subchannel=%p, error=%s)",
-            chand, this, PickResultTypeName(result.type),
-            result.subchannel.get(), grpc_error_string(result.error));
+    gpr_log(
+        GPR_INFO,
+        "chand=%p lb_call=%p: LB pick returned %s (subchannel=%p, error=%s)",
+        chand_, this, PickResultTypeName(result.type), result.subchannel.get(),
+        grpc_error_string(result.error));
   }
   switch (result.type) {
     case LoadBalancingPolicy::PickResult::PICK_FAILED: {
       // If we're shutting down, fail all RPCs.
-      grpc_error* disconnect_error = chand->disconnect_error();
+      grpc_error* disconnect_error = chand_->disconnect_error();
       if (disconnect_error != GRPC_ERROR_NONE) {
         GRPC_ERROR_UNREF(result.error);
-        MaybeRemoveCallFromLbQueuedCallsLocked(elem);
-        MaybeInvokeConfigSelectorCommitCallback();
+        MaybeRemoveCallFromLbQueuedCallsLocked();
         *error = GRPC_ERROR_REF(disconnect_error);
         return true;
       }
@@ -4467,23 +4869,13 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
       // attempt's final status.
       if ((send_initial_metadata_flags &
           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
-        // Retry if appropriate; otherwise, fail.
-        grpc_status_code status = GRPC_STATUS_OK;
-        grpc_error_get_status(result.error, deadline_, &status, nullptr,
-                              nullptr, nullptr);
-        const bool retried = enable_retries_ &&
-                             MaybeRetry(elem, nullptr /* batch_data */, status,
-                                        nullptr /* server_pushback_md */);
-        if (!retried) {
-          grpc_error* new_error =
-              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                  "Failed to pick subchannel", &result.error, 1);
-          GRPC_ERROR_UNREF(result.error);
-          *error = new_error;
-          MaybeInvokeConfigSelectorCommitCallback();
-        }
-        MaybeRemoveCallFromLbQueuedCallsLocked(elem);
-        return !retried;
+        grpc_error* new_error =
+            GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                "Failed to pick subchannel", &result.error, 1);
+        GRPC_ERROR_UNREF(result.error);
+        *error = new_error;
+        MaybeRemoveCallFromLbQueuedCallsLocked();
+        return true;
       }
       // If wait_for_ready is true, then queue to retry when we get a new
       // picker.
@@ -4491,26 +4883,22 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
     }
     // Fallthrough
     case LoadBalancingPolicy::PickResult::PICK_QUEUE:
-      MaybeAddCallToLbQueuedCallsLocked(elem);
+      MaybeAddCallToLbQueuedCallsLocked();
       return false;
     default:  // PICK_COMPLETE
-      MaybeRemoveCallFromLbQueuedCallsLocked(elem);
+      MaybeRemoveCallFromLbQueuedCallsLocked();
      // Handle drops.
      if (GPR_UNLIKELY(result.subchannel == nullptr)) {
        result.error = grpc_error_set_int(
            GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                "Call dropped by load balancing policy"),
            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
-        MaybeInvokeConfigSelectorCommitCallback();
      } else {
        // Grab a ref to the connected subchannel while we're still
        // holding the data plane mutex.
        connected_subchannel_ =
-            chand->GetConnectedSubchannelInDataPlane(result.subchannel.get());
+            chand_->GetConnectedSubchannelInDataPlane(result.subchannel.get());
        GPR_ASSERT(connected_subchannel_ != nullptr);
-        if (!enable_retries_ || retry_committed_) {
-          MaybeInvokeConfigSelectorCommitCallback();
-        }
      }
      lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
      *error = result.error;
diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc
index 4e49ed651ee..0104395afdc 100644
--- a/src/core/ext/filters/client_channel/health/health_check_client.cc
+++ b/src/core/ext/filters/client_channel/health/health_check_client.cc
@@ -252,7 +252,7 @@ HealthCheckClient::CallState::CallState(
     : health_check_client_(std::move(health_check_client)),
       pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
       arena_(Arena::Create(health_check_client_->connected_subchannel_
-                               ->GetInitialCallSizeEstimate(0))),
+                               ->GetInitialCallSizeEstimate())),
       payload_(context_) {}
 
 HealthCheckClient::CallState::~CallState() {
@@ -291,7 +291,6 @@ void HealthCheckClient::CallState::StartCall() {
       arena_,
       context_,
       &call_combiner_,
-      0,  // parent_data_size
   };
   grpc_error* error = GRPC_ERROR_NONE;
   call_ = SubchannelCall::Create(std::move(args), &error).release();
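With parent_data_size gone from the API, the size estimate that callers like the health checker above rely on collapses to one fixed formula: the aligned SubchannelCall header plus the channel stack's call stack size (see the subchannel.cc hunk below). A quick standalone sketch of that arithmetic, with made-up sizes for illustration:

#include <cstddef>
#include <cstdio>

constexpr size_t RoundUp(size_t n) {
  constexpr size_t kAlign = alignof(std::max_align_t);
  return (n + kAlign - 1) & ~(kAlign - 1);
}

int main() {
  // Hypothetical values; the real ones are sizeof(SubchannelCall) and
  // channel_stack_->call_stack_size.
  const size_t kSubchannelCallSize = 120;
  const size_t kCallStackSize = 4096;
  // Old API: when parent_data_size > 0, the call stack size was also
  // rounded up and parent_data_size appended.  New API: one formula,
  // since LoadBalancedCall now carries its own parent data (see
  // GetParentData() earlier in this patch).
  const size_t estimate = RoundUp(kSubchannelCallSize) + kCallStackSize;
  std::printf("initial call size estimate: %zu bytes\n", estimate);
}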
"src/core/ext/filters/client_channel/service_config_parser.h" #include "src/core/lib/channel/context.h" @@ -35,13 +39,22 @@ class ServiceConfigCallData { ServiceConfigCallData( RefCountedPtr<ServiceConfig> service_config, const ServiceConfigParser::ParsedConfigVector* method_configs, + std::map<const char*, absl::string_view> call_attributes, grpc_call_context_element* call_context) : service_config_(std::move(service_config)), - method_configs_(method_configs) { + method_configs_(method_configs), + call_attributes_(std::move(call_attributes)) { call_context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value = this; call_context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].destroy = Destroy; } + ServiceConfigCallData( + RefCountedPtr<ServiceConfig> service_config, + const ServiceConfigParser::ParsedConfigVector* method_configs, + grpc_call_context_element* call_context) + : ServiceConfigCallData(std::move(service_config), method_configs, {}, + call_context) {} + ServiceConfig* service_config() { return service_config_.get(); } ServiceConfigParser::ParsedConfig* GetMethodParsedConfig(size_t index) const { @@ -53,6 +66,10 @@ class ServiceConfigCallData { return service_config_->GetGlobalParsedConfig(index); } + const std::map<const char*, absl::string_view>& call_attributes() const { + return call_attributes_; + } + private: static void Destroy(void* ptr) { ServiceConfigCallData* self = static_cast<ServiceConfigCallData*>(ptr); @@ -61,6 +78,7 @@ class ServiceConfigCallData { RefCountedPtr<ServiceConfig> service_config_; const ServiceConfigParser::ParsedConfigVector* method_configs_ = nullptr; + std::map<const char*, absl::string_view> call_attributes_; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 9885f4cdcff..d6f9e1fbbc4 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -121,18 +121,9 @@ void ConnectedSubchannel::Ping(grpc_closure* on_initiate, elem->filter->start_transport_op(elem, op); } -size_t ConnectedSubchannel::GetInitialCallSizeEstimate( - size_t parent_data_size) const { - size_t allocation_size = - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)); - if (parent_data_size > 0) { - allocation_size += - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + - parent_data_size; - } else { - allocation_size += channel_stack_->call_stack_size; - } - return allocation_size; +size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const { + return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) + + channel_stack_->call_stack_size; } // @@ -142,8 +133,7 @@ size_t ConnectedSubchannel::GetInitialCallSizeEstimate( RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args, grpc_error** error) { const size_t allocation_size = - args.connected_subchannel->GetInitialCallSizeEstimate( - args.parent_data_size); + args.connected_subchannel->GetInitialCallSizeEstimate(); Arena* arena = args.arena; return RefCountedPtr<SubchannelCall>(new ( arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error)); @@ -187,13 +177,6 @@ void SubchannelCall::StartTransportStreamOpBatch( top_elem->filter->start_transport_stream_op_batch(top_elem, batch); } -void* SubchannelCall::GetParentData() { - grpc_channel_stack* chanstk = connected_subchannel_->channel_stack(); - return reinterpret_cast<char*>(this) + - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) + - 
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 9885f4cdcff..d6f9e1fbbc4 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -121,18 +121,9 @@ void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
   elem->filter->start_transport_op(elem, op);
 }
 
-size_t ConnectedSubchannel::GetInitialCallSizeEstimate(
-    size_t parent_data_size) const {
-  size_t allocation_size =
-      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall));
-  if (parent_data_size > 0) {
-    allocation_size +=
-        GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
-        parent_data_size;
-  } else {
-    allocation_size += channel_stack_->call_stack_size;
-  }
-  return allocation_size;
+size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const {
+  return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
+         channel_stack_->call_stack_size;
 }
 
 //
@@ -142,8 +133,7 @@ size_t ConnectedSubchannel::GetInitialCallSizeEstimate(
 RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
                                                      grpc_error** error) {
   const size_t allocation_size =
-      args.connected_subchannel->GetInitialCallSizeEstimate(
-          args.parent_data_size);
+      args.connected_subchannel->GetInitialCallSizeEstimate();
   Arena* arena = args.arena;
   return RefCountedPtr<SubchannelCall>(new (
       arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
@@ -187,13 +177,6 @@ void SubchannelCall::StartTransportStreamOpBatch(
   top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
 }
 
-void* SubchannelCall::GetParentData() {
-  grpc_channel_stack* chanstk = connected_subchannel_->channel_stack();
-  return reinterpret_cast<char*>(this) +
-         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
-         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(chanstk->call_stack_size);
-}
-
 grpc_call_stack* SubchannelCall::GetCallStack() {
   return SUBCHANNEL_CALL_TO_CALL_STACK(this);
 }
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 46ffb2f25b3..5db222e2613 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -89,7 +89,7 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
     return channelz_subchannel_.get();
   }
 
-  size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
+  size_t GetInitialCallSizeEstimate() const;
 
 private:
   grpc_channel_stack* channel_stack_;
@@ -111,18 +111,12 @@ class SubchannelCall {
     Arena* arena;
     grpc_call_context_element* context;
     CallCombiner* call_combiner;
-    size_t parent_data_size;
   };
 
   static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
 
   // Continues processing a transport stream op batch.
   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
 
-  // Returns a pointer to the parent data associated with the subchannel call.
-  // The data will be of the size specified in \a parent_data_size field of
-  // the args passed to \a ConnectedSubchannel::CreateCall().
-  void* GetParentData();
-
   // Returns the call stack of the subchannel call.
   grpc_call_stack* GetCallStack();
 
@@ -139,8 +133,6 @@ class SubchannelCall {
   void Unref();
   void Unref(const DebugLocation& location, const char* reason);
 
-  static void Destroy(void* arg, grpc_error* error);
-
 private:
   // Allow RefCountedPtr<> to access IncrementRefCount().
   template <typename T>
@@ -159,6 +151,8 @@ class SubchannelCall {
   void IncrementRefCount();
   void IncrementRefCount(const DebugLocation& location, const char* reason);
 
+  static void Destroy(void* arg, grpc_error* error);
+
   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
   grpc_closure* after_call_stack_destroy_ = nullptr;
   // State needed to support channelz interception of recv trailing metadata.