diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 2569932891f..8db3029e267 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -117,9 +117,9 @@ namespace { class ChannelData { public: - struct QueuedPick { + struct QueuedCall { grpc_call_element* elem; - QueuedPick* next = nullptr; + QueuedCall* next = nullptr; }; static grpc_error* Init(grpc_channel_element* elem, @@ -142,14 +142,11 @@ class ChannelData { return disconnect_error_.Load(MemoryOrder::ACQUIRE); } - Mutex* data_plane_mu() const { return &data_plane_mu_; } - - LoadBalancingPolicy::SubchannelPicker* picker() const { - return picker_.get(); - } - void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent); - void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent); - + Mutex* resolution_mu() const { return &resolution_mu_; } + // These methods all require holding resolution_mu_. + void AddResolverQueuedCall(QueuedCall* call, grpc_polling_entity* pollent); + void RemoveResolverQueuedCall(QueuedCall* to_remove, + grpc_polling_entity* pollent); bool received_service_config_data() const { return received_service_config_data_; } @@ -163,11 +160,19 @@ class ChannelData { return service_config_; } ConfigSelector* config_selector() const { return config_selector_.get(); } - WorkSerializer* work_serializer() const { return work_serializer_.get(); } + Mutex* data_plane_mu() const { return &data_plane_mu_; } + // These methods all require holding data_plane_mu_. + LoadBalancingPolicy::SubchannelPicker* picker() const { + return picker_.get(); + } + void AddLbQueuedCall(QueuedCall* call, grpc_polling_entity* pollent); + void RemoveLbQueuedCall(QueuedCall* to_remove, grpc_polling_entity* pollent); RefCountedPtr GetConnectedSubchannelInDataPlane( SubchannelInterface* subchannel) const; + WorkSerializer* work_serializer() const { return work_serializer_.get(); } + grpc_connectivity_state CheckConnectivityState(bool try_to_connect); void AddExternalConnectivityWatcher(grpc_polling_entity pollent, @@ -255,7 +260,7 @@ class ChannelData { } void ReturnError(grpc_error* error) override { - chand_->OnResolverError(error); + chand_->OnResolverErrorLocked(error); } private: @@ -265,8 +270,11 @@ class ChannelData { ChannelData(grpc_channel_element_args* args, grpc_error** error); ~ChannelData(); + // Note: All methods with "Locked" suffix must be invoked from within + // work_serializer_. + void OnResolverResultChangedLocked(Resolver::Result result); - void OnResolverError(grpc_error* error); + void OnResolverErrorLocked(grpc_error* error); void CreateOrUpdateLbPolicyLocked( RefCountedPtr lb_policy_config, @@ -277,8 +285,7 @@ class ChannelData { void UpdateStateAndPickerLocked( grpc_connectivity_state state, const absl::Status& status, const char* reason, - std::unique_ptr picker, - grpc_error* resolver_transient_failure_error = GRPC_ERROR_NONE); + std::unique_ptr picker); void UpdateServiceConfigInControlPlaneLocked( RefCountedPtr service_config, @@ -312,11 +319,10 @@ class ChannelData { channelz::ChannelNode* channelz_node_; // - // Fields used in the data plane. Guarded by data_plane_mu. + // Fields related to name resolution. Guarded by resolution_mu_. // - mutable Mutex data_plane_mu_; - std::unique_ptr picker_; - QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks. + mutable Mutex resolution_mu_; + QueuedCall* resolver_queued_calls_ = nullptr; // Linked list of queued calls. // Data from service config. grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE; bool received_service_config_data_ = false; @@ -324,6 +330,13 @@ class ChannelData { RefCountedPtr service_config_; RefCountedPtr config_selector_; + // + // Fields used in the data plane. Guarded by data_plane_mu_. + // + mutable Mutex data_plane_mu_; + std::unique_ptr picker_; + QueuedCall* lb_queued_calls_ = nullptr; // Linked list of queued calls. + // // Fields used in the control plane. Guarded by work_serializer. // @@ -388,22 +401,34 @@ class CallData { grpc_call_element* elem, grpc_transport_stream_op_batch* batch); static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent); - RefCountedPtr subchannel_call() { return subchannel_call_; } - - // Invoked by channel for queued picks when the picker is updated. + // Invoked by channel for queued calls when name resolution is completed. + static void CheckResolution(void* arg, grpc_error* error); + // Helper function for applying the service config to a call while + // holding ChannelData::resolution_mu_. + // Returns true if the service config has been applied to the call, in which + // case the caller must invoke ResolutionDone() or AsyncResolutionDone() + // with the returned error. + bool CheckResolutionLocked(grpc_call_element* elem, grpc_error** error); + // Schedules a callback to continue processing the call once + // resolution is complete. The callback will not run until after this + // method returns. + void AsyncResolutionDone(grpc_call_element* elem, grpc_error* error); + + // Invoked by channel for queued LB picks when the picker is updated. static void PickSubchannel(void* arg, grpc_error* error); - - // Helper function for performing a pick while holding the data plane + // Helper function for performing an LB pick while holding the data plane // mutex. Returns true if the pick is complete, in which case the caller // must invoke PickDone() or AsyncPickDone() with the returned error. bool PickSubchannelLocked(grpc_call_element* elem, grpc_error** error); - // Schedules a callback to process the completed pick. The callback // will not run until after this method returns. void AsyncPickDone(grpc_call_element* elem, grpc_error* error); + RefCountedPtr subchannel_call() { return subchannel_call_; } + private: - class QueuedPickCanceller; + class ResolverQueuedCallCanceller; + class LbQueuedCallCanceller; class Metadata : public LoadBalancingPolicy::MetadataInterface { public: @@ -783,21 +808,31 @@ class CallData { // subchannel call. static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored); - void CreateSubchannelCall(grpc_call_element* elem); - // Invoked when a pick is completed, on both success or failure. - static void PickDone(void* arg, grpc_error* error); - // Removes the call from the channel's list of queued picks if present. - void MaybeRemoveCallFromQueuedPicksLocked(grpc_call_element* elem); - // Adds the call to the channel's list of queued picks if not already present. - void MaybeAddCallToQueuedPicksLocked(grpc_call_element* elem); // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. // If an error is returned, the error indicates the status with which // the call should be failed. grpc_error* ApplyServiceConfigToCallLocked( grpc_call_element* elem, grpc_metadata_batch* initial_metadata); + // Invoked when the resolver result is applied to the caller, on both + // success or failure. + static void ResolutionDone(void* arg, grpc_error* error); + // Removes the call (if present) from the channel's list of calls queued + // for name resolution. + void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem); + // Adds the call (if not already present) to the channel's list of + // calls queued for name resolution. + void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem); void MaybeInvokeConfigSelectorCommitCallback(); + void CreateSubchannelCall(grpc_call_element* elem); + // Invoked when a pick is completed, on both success or failure. + static void PickDone(void* arg, grpc_error* error); + // Removes the call from the channel's list of queued picks if present. + void MaybeRemoveCallFromLbQueuedCallsLocked(grpc_call_element* elem); + // Adds the call to the channel's list of queued picks if not already present. + void MaybeAddCallToLbQueuedCallsLocked(grpc_call_element* elem); + // State for handling deadlines. // The code in deadline_filter.c requires this to be the first field. // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state @@ -814,35 +849,37 @@ class CallData { CallCombiner* call_combiner_; grpc_call_context_element* call_context_; + grpc_polling_entity* pollent_ = nullptr; + + grpc_closure pick_closure_; + + // Accessed while holding ChannelData::resolution_mu_. + bool queued_pending_resolver_result_ = false; + bool service_config_applied_ = false; RefCountedPtr retry_throttle_data_; const ClientChannelMethodParsedConfig* method_params_ = nullptr; std::map call_attributes_; std::function on_call_committed_; + ResolverQueuedCallCanceller* resolver_call_canceller_ = nullptr; - RefCountedPtr subchannel_call_; - - // Set when we get a cancel_stream op. - grpc_error* cancel_error_ = GRPC_ERROR_NONE; - - ChannelData::QueuedPick pick_; - bool pick_queued_ = false; - bool service_config_applied_ = false; - QueuedPickCanceller* pick_canceller_ = nullptr; + // Accessed while holding ChannelData::data_plane_mu_. + ChannelData::QueuedCall queued_call_; + bool queued_pending_lb_pick_ = false; LbCallState lb_call_state_; const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr; RefCountedPtr connected_subchannel_; std::function lb_recv_trailing_metadata_ready_; - grpc_closure pick_closure_; + LbQueuedCallCanceller* lb_call_canceller_ = nullptr; + + RefCountedPtr subchannel_call_; // For intercepting recv_trailing_metadata_ready for the LB policy. grpc_metadata_batch* recv_trailing_metadata_ = nullptr; grpc_closure recv_trailing_metadata_ready_; grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; - grpc_polling_entity* pollent_ = nullptr; - // Batches are added to this list when received from above. // They are removed when we are done handling the batch (i.e., when // either we have invoked all of the batch's callbacks or we have @@ -853,6 +890,9 @@ class CallData { bool pending_send_message_ : 1; bool pending_send_trailing_metadata_ : 1; + // Set when we get a cancel_stream op. + grpc_error* cancel_error_ = GRPC_ERROR_NONE; + // Retry state. bool enable_retries_ : 1; bool retry_committed_ : 1; @@ -1709,7 +1749,7 @@ void ChannelData::OnResolverResultChangedLocked(Resolver::Result result) { // We received an invalid service config and we don't have a // previous service config to fall back to. Put the channel into // TRANSIENT_FAILURE. - OnResolverError(GRPC_ERROR_REF(service_config_error)); + OnResolverErrorLocked(GRPC_ERROR_REF(service_config_error)); trace_strings.push_back("no valid service config"); } } else if (result.service_config == nullptr) { @@ -1777,7 +1817,7 @@ void ChannelData::OnResolverResultChangedLocked(Resolver::Result result) { GRPC_ERROR_UNREF(service_config_error); } -void ChannelData::OnResolverError(grpc_error* error) { +void ChannelData::OnResolverErrorLocked(grpc_error* error) { if (resolver_ == nullptr) { GRPC_ERROR_UNREF(error); return; @@ -1792,12 +1832,28 @@ void ChannelData::OnResolverError(grpc_error* error) { if (lb_policy_ == nullptr) { grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Resolver transient failure", &error, 1); + { + MutexLock lock(&resolution_mu_); + // Update resolver transient failure. + GRPC_ERROR_UNREF(resolver_transient_failure_error_); + resolver_transient_failure_error_ = GRPC_ERROR_REF(state_error); + // Process calls that were queued waiting for the resolver result. + for (QueuedCall* call = resolver_queued_calls_; call != nullptr; + call = call->next) { + grpc_call_element* elem = call->elem; + CallData* calld = static_cast(elem->call_data); + grpc_error* error = GRPC_ERROR_NONE; + if (calld->CheckResolutionLocked(elem, &error)) { + calld->AsyncResolutionDone(elem, error); + } + } + } + // Update connectivity state. UpdateStateAndPickerLocked( GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error), "resolver failure", absl::make_unique( - GRPC_ERROR_REF(state_error)), - state_error); + state_error)); } GRPC_ERROR_UNREF(error); } @@ -1847,84 +1903,28 @@ OrphanablePtr ChannelData::CreateLbPolicyLocked( return lb_policy; } -void ChannelData::UpdateStateAndPickerLocked( - grpc_connectivity_state state, const absl::Status& status, - const char* reason, - std::unique_ptr picker, - grpc_error* resolver_transient_failure_error) { - // Clean the control plane when entering IDLE. - if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) { - health_check_service_name_.reset(); - saved_service_config_.reset(); - saved_config_selector_.reset(); - } - // Update connectivity state. - state_tracker_.SetState(state, status, reason); - if (channelz_node_ != nullptr) { - channelz_node_->SetConnectivityState(state); - channelz_node_->AddTraceEvent( - channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string( - channelz::ChannelNode::GetChannelConnectivityStateChangeString( - state))); - } - // Grab data plane lock to do subchannel updates and update the picker. - // - // Note that we want to minimize the work done while holding the data - // plane lock, to keep the critical section small. So, for all of the - // objects that we might wind up unreffing here, we actually hold onto - // the refs until after we release the lock, and then unref them at - // that point. This includes the following: - // - refs to subchannel wrappers in the keys of pending_subchannel_updates_ - // - ref stored in retry_throttle_data_ - // - ref stored in service_config_ - // - ref stored in config_selector_ - // - ownership of the existing picker in picker_ - RefCountedPtr retry_throttle_data_to_unref; - RefCountedPtr service_config_to_unref; - RefCountedPtr config_selector_to_unref; - { - MutexLock lock(&data_plane_mu_); - // Update resolver transient failure. - GRPC_ERROR_UNREF(resolver_transient_failure_error_); - resolver_transient_failure_error_ = resolver_transient_failure_error; - // Handle subchannel updates. - for (auto& p : pending_subchannel_updates_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p: updating subchannel wrapper %p data plane " - "connected_subchannel to %p", - this, p.first.get(), p.second.get()); - } - // Note: We do not remove the entry from pending_subchannel_updates_ - // here, since this would unref the subchannel wrapper; instead, - // we wait until we've released the lock to clear the map. - p.first->set_connected_subchannel_in_data_plane(std::move(p.second)); - } - // Swap out the picker. - // Note: Original value will be destroyed after the lock is released. - picker_.swap(picker); - // Clean the data plane if the updated picker is nullptr. - if (picker_ == nullptr || state == GRPC_CHANNEL_SHUTDOWN) { - received_service_config_data_ = false; - // Note: We save the objects to unref until after the lock is released. - retry_throttle_data_to_unref = std::move(retry_throttle_data_); - service_config_to_unref = std::move(service_config_); - config_selector_to_unref = std::move(config_selector_); - } - // Re-process queued picks. - for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) { - grpc_call_element* elem = pick->elem; - CallData* calld = static_cast(elem->call_data); - grpc_error* error = GRPC_ERROR_NONE; - if (calld->PickSubchannelLocked(elem, &error)) { - calld->AsyncPickDone(elem, error); - } +void ChannelData::AddResolverQueuedCall(QueuedCall* call, + grpc_polling_entity* pollent) { + // Add call to queued calls list. + call->next = resolver_queued_calls_; + resolver_queued_calls_ = call; + // Add call's pollent to channel's interested_parties, so that I/O + // can be done under the call's CQ. + grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_); +} + +void ChannelData::RemoveResolverQueuedCall(QueuedCall* to_remove, + grpc_polling_entity* pollent) { + // Remove call's pollent from channel's interested_parties. + grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_); + // Remove from queued calls list. + for (QueuedCall** call = &resolver_queued_calls_; *call != nullptr; + call = &(*call)->next) { + if (*call == to_remove) { + *call = to_remove->next; + return; } } - // Clear the pending update map after releasing the lock, to keep the - // critical section small. - pending_subchannel_updates_.clear(); } void ChannelData::UpdateServiceConfigInControlPlaneLocked( @@ -2000,8 +2000,9 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() { // // We defer unreffing the old values (and deallocating memory) until // after releasing the lock to keep the critical section small. + std::set calls_pending_resolver_result; { - MutexLock lock(&data_plane_mu_); + MutexLock lock(&resolution_mu_); GRPC_ERROR_UNREF(resolver_transient_failure_error_); resolver_transient_failure_error_ = GRPC_ERROR_NONE; // Update service config. @@ -2010,13 +2011,14 @@ void ChannelData::UpdateServiceConfigInDataPlaneLocked() { retry_throttle_data_.swap(retry_throttle_data); service_config_.swap(service_config); config_selector_.swap(config_selector); - // Re-process queued picks. - for (QueuedPick* pick = queued_picks_; pick != nullptr; pick = pick->next) { - grpc_call_element* elem = pick->elem; + // Process calls that were queued waiting for the resolver result. + for (QueuedCall* call = resolver_queued_calls_; call != nullptr; + call = call->next) { + grpc_call_element* elem = call->elem; CallData* calld = static_cast(elem->call_data); grpc_error* error = GRPC_ERROR_NONE; - if (calld->PickSubchannelLocked(elem, &error)) { - calld->AsyncPickDone(elem, error); + if (calld->CheckResolutionLocked(elem, &error)) { + calld->AsyncResolutionDone(elem, error); } } } @@ -2062,6 +2064,83 @@ void ChannelData::DestroyResolverAndLbPolicyLocked() { } } +void ChannelData::UpdateStateAndPickerLocked( + grpc_connectivity_state state, const absl::Status& status, + const char* reason, + std::unique_ptr picker) { + // Clean the control plane when entering IDLE. + if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) { + health_check_service_name_.reset(); + saved_service_config_.reset(); + saved_config_selector_.reset(); + } + // Update connectivity state. + state_tracker_.SetState(state, status, reason); + if (channelz_node_ != nullptr) { + channelz_node_->SetConnectivityState(state); + channelz_node_->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string( + channelz::ChannelNode::GetChannelConnectivityStateChangeString( + state))); + } + // Grab data plane lock to do subchannel updates and update the picker. + // + // Note that we want to minimize the work done while holding the data + // plane lock, to keep the critical section small. So, for all of the + // objects that we might wind up unreffing here, we actually hold onto + // the refs until after we release the lock, and then unref them at + // that point. This includes the following: + // - refs to subchannel wrappers in the keys of pending_subchannel_updates_ + // - ref stored in retry_throttle_data_ + // - ref stored in service_config_ + // - ref stored in config_selector_ + // - ownership of the existing picker in picker_ + RefCountedPtr retry_throttle_data_to_unref; + RefCountedPtr service_config_to_unref; + RefCountedPtr config_selector_to_unref; + { + MutexLock lock(&data_plane_mu_); + // Handle subchannel updates. + for (auto& p : pending_subchannel_updates_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: updating subchannel wrapper %p data plane " + "connected_subchannel to %p", + this, p.first.get(), p.second.get()); + } + // Note: We do not remove the entry from pending_subchannel_updates_ + // here, since this would unref the subchannel wrapper; instead, + // we wait until we've released the lock to clear the map. + p.first->set_connected_subchannel_in_data_plane(std::move(p.second)); + } + // Swap out the picker. + // Note: Original value will be destroyed after the lock is released. + picker_.swap(picker); + // Clean the data plane if the updated picker is nullptr. + if (picker_ == nullptr || state == GRPC_CHANNEL_SHUTDOWN) { + received_service_config_data_ = false; + // Note: We save the objects to unref until after the lock is released. + retry_throttle_data_to_unref = std::move(retry_throttle_data_); + service_config_to_unref = std::move(service_config_); + config_selector_to_unref = std::move(config_selector_); + } + // Re-process queued picks. + for (QueuedCall* call = lb_queued_calls_; call != nullptr; + call = call->next) { + grpc_call_element* elem = call->elem; + CallData* calld = static_cast(elem->call_data); + grpc_error* error = GRPC_ERROR_NONE; + if (calld->PickSubchannelLocked(elem, &error)) { + calld->AsyncPickDone(elem, error); + } + } + } + // Clear the pending update map after releasing the lock, to keep the + // critical section small. + pending_subchannel_updates_.clear(); +} + grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { if (state_tracker_.state() != GRPC_CHANNEL_READY) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected"); @@ -2171,25 +2250,25 @@ void ChannelData::GetChannelInfo(grpc_channel_element* elem, } } -void ChannelData::AddQueuedPick(QueuedPick* pick, - grpc_polling_entity* pollent) { +void ChannelData::AddLbQueuedCall(QueuedCall* call, + grpc_polling_entity* pollent) { // Add call to queued picks list. - pick->next = queued_picks_; - queued_picks_ = pick; + call->next = lb_queued_calls_; + lb_queued_calls_ = call; // Add call's pollent to channel's interested_parties, so that I/O // can be done under the call's CQ. grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_); } -void ChannelData::RemoveQueuedPick(QueuedPick* to_remove, - grpc_polling_entity* pollent) { +void ChannelData::RemoveLbQueuedCall(QueuedCall* to_remove, + grpc_polling_entity* pollent) { // Remove call's pollent from channel's interested_parties. grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_); // Remove from queued picks list. - for (QueuedPick** pick = &queued_picks_; *pick != nullptr; - pick = &(*pick)->next) { - if (*pick == to_remove) { - *pick = to_remove->next; + for (QueuedCall** call = &lb_queued_calls_; *call != nullptr; + call = &(*call)->next) { + if (*call == to_remove) { + *call = to_remove->next; return; } } @@ -2404,7 +2483,7 @@ void CallData::StartTransportStreamOpBatch( "chand=%p calld=%p: grabbing data plane mutex to perform pick", chand, calld); } - PickSubchannel(elem, GRPC_ERROR_NONE); + CheckResolution(elem, GRPC_ERROR_NONE); } else { // For all other batches, release the call combiner. if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -3939,64 +4018,16 @@ void CallData::StartRetriableSubchannelBatches(void* arg, } // -// LB pick +// name resolution // -void CallData::CreateSubchannelCall(grpc_call_element* elem) { - ChannelData* chand = static_cast(elem->channel_data); - const size_t parent_data_size = - enable_retries_ ? sizeof(SubchannelCallRetryState) : 0; - SubchannelCall::Args call_args = { - std::move(connected_subchannel_), pollent_, path_, call_start_time_, - deadline_, arena_, - // TODO(roth): When we implement hedging support, we will probably - // need to use a separate call context for each subchannel call. - call_context_, call_combiner_, parent_data_size}; - grpc_error* error = GRPC_ERROR_NONE; - subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error); - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", - chand, this, subchannel_call_.get(), grpc_error_string(error)); - } - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { - PendingBatchesFail(elem, error, YieldCallCombiner); - } else { - if (parent_data_size > 0) { - new (subchannel_call_->GetParentData()) - SubchannelCallRetryState(call_context_); - } - PendingBatchesResume(elem); - } -} - -void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) { - GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx); - ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error); -} - -void CallData::PickDone(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast(arg); - ChannelData* chand = static_cast(elem->channel_data); - CallData* calld = static_cast(elem->call_data); - if (error != GRPC_ERROR_NONE) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: failed to pick subchannel: error=%s", chand, - calld, grpc_error_string(error)); - } - calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner); - return; - } - calld->CreateSubchannelCall(elem); -} - // A class to handle the call combiner cancellation callback for a // queued pick. -class CallData::QueuedPickCanceller { +class CallData::ResolverQueuedCallCanceller { public: - explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) { + explicit ResolverQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) { auto* calld = static_cast(elem->call_data); - GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller"); + GRPC_CALL_STACK_REF(calld->owning_call_, "ResolverQueuedCallCanceller"); GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, grpc_schedule_on_exec_ctx); calld->call_combiner_->SetNotifyOnCancel(&closure_); @@ -4004,26 +4035,27 @@ class CallData::QueuedPickCanceller { private: static void CancelLocked(void* arg, grpc_error* error) { - auto* self = static_cast(arg); + auto* self = static_cast(arg); auto* chand = static_cast(self->elem_->channel_data); auto* calld = static_cast(self->elem_->call_data); - MutexLock lock(chand->data_plane_mu()); - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: cancelling queued pick: " - "error=%s self=%p calld->pick_canceller=%p", - chand, calld, grpc_error_string(error), self, - calld->pick_canceller_); - } - if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) { - // Remove pick from list of queued picks. - calld->MaybeInvokeConfigSelectorCommitCallback(); - calld->MaybeRemoveCallFromQueuedPicksLocked(self->elem_); - // Fail pending batches on the call. - calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error), - YieldCallCombinerIfPendingBatchesFound); + { + MutexLock lock(chand->resolution_mu()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: cancelling resolver queued pick: " + "error=%s self=%p calld->resolver_pick_canceller=%p", + chand, calld, grpc_error_string(error), self, + calld->resolver_call_canceller_); + } + if (calld->resolver_call_canceller_ == self && error != GRPC_ERROR_NONE) { + // Remove pick from list of queued picks. + calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_); + // Fail pending batches on the call. + calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error), + YieldCallCombinerIfPendingBatchesFound); + } } - GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller"); + GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResolvingQueuedCallCanceller"); delete self; } @@ -4031,31 +4063,34 @@ class CallData::QueuedPickCanceller { grpc_closure closure_; }; -void CallData::MaybeRemoveCallFromQueuedPicksLocked(grpc_call_element* elem) { - if (!pick_queued_) return; +void CallData::MaybeRemoveCallFromResolverQueuedCallsLocked( + grpc_call_element* elem) { + if (!queued_pending_resolver_result_) return; auto* chand = static_cast(elem->channel_data); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list", + gpr_log(GPR_INFO, + "chand=%p calld=%p: removing from resolver queued picks list", chand, this); } - chand->RemoveQueuedPick(&pick_, pollent_); - pick_queued_ = false; + chand->RemoveResolverQueuedCall(&queued_call_, pollent_); + queued_pending_resolver_result_ = false; // Lame the call combiner canceller. - pick_canceller_ = nullptr; + resolver_call_canceller_ = nullptr; } -void CallData::MaybeAddCallToQueuedPicksLocked(grpc_call_element* elem) { - if (pick_queued_) return; +void CallData::MaybeAddCallToResolverQueuedCallsLocked( + grpc_call_element* elem) { + if (queued_pending_resolver_result_) return; auto* chand = static_cast(elem->channel_data); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { - gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand, - this); + gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list", + chand, this); } - pick_queued_ = true; - pick_.elem = elem; - chand->AddQueuedPick(&pick_, pollent_); + queued_pending_resolver_result_ = true; + queued_call_.elem = elem; + chand->AddResolverQueuedCall(&queued_call_, pollent_); // Register call combiner cancellation callback. - pick_canceller_ = new QueuedPickCanceller(elem); + resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem); } grpc_error* CallData::ApplyServiceConfigToCallLocked( @@ -4132,46 +4167,52 @@ void CallData::MaybeInvokeConfigSelectorCommitCallback() { } } -const char* PickResultTypeName( - LoadBalancingPolicy::PickResult::ResultType type) { - switch (type) { - case LoadBalancingPolicy::PickResult::PICK_COMPLETE: - return "COMPLETE"; - case LoadBalancingPolicy::PickResult::PICK_QUEUE: - return "QUEUE"; - case LoadBalancingPolicy::PickResult::PICK_FAILED: - return "FAILED"; +void CallData::AsyncResolutionDone(grpc_call_element* elem, grpc_error* error) { + GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr); + ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error); +} + +void CallData::ResolutionDone(void* arg, grpc_error* error) { + grpc_call_element* elem = static_cast(arg); + ChannelData* chand = static_cast(elem->channel_data); + CallData* calld = static_cast(elem->call_data); + if (error != GRPC_ERROR_NONE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: error applying config to call: error=%s", + chand, calld, grpc_error_string(error)); + } + calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner); + return; } - GPR_UNREACHABLE_CODE(return "UNKNOWN"); + calld->PickSubchannel(elem, GRPC_ERROR_NONE); } -void CallData::PickSubchannel(void* arg, grpc_error* error) { +void CallData::CheckResolution(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); CallData* calld = static_cast(elem->call_data); ChannelData* chand = static_cast(elem->channel_data); - bool pick_complete; + bool resolution_complete; { - MutexLock lock(chand->data_plane_mu()); - pick_complete = calld->PickSubchannelLocked(elem, &error); + MutexLock lock(chand->resolution_mu()); + resolution_complete = calld->CheckResolutionLocked(elem, &error); } - if (pick_complete) { - PickDone(elem, error); + if (resolution_complete) { + ResolutionDone(elem, error); GRPC_ERROR_UNREF(error); } } -bool CallData::PickSubchannelLocked(grpc_call_element* elem, - grpc_error** error) { +bool CallData::CheckResolutionLocked(grpc_call_element* elem, + grpc_error** error) { ChannelData* chand = static_cast(elem->channel_data); - GPR_ASSERT(connected_subchannel_ == nullptr); - GPR_ASSERT(subchannel_call_ == nullptr); - // The picker being null means that the channel is currently in IDLE state. - // The incoming call will make the channel exit IDLE. - if (chand->picker() == nullptr) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack(), "PickSubchannelLocked"); - // Bounce into the control plane work serializer to exit IDLE. Since we are - // holding on to the data plane mutex here, we offload it on the ExecCtx so - // that we don't deadlock with ourselves. + // If we're still in IDLE, we need to start resolving. + if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) { + // Bounce into the control plane work serializer to start resolving, + // in case we are still in IDLE state. Since we are holding on to the + // resolution mutex here, we offload it on the ExecCtx so that we don't + // deadlock with ourselves. + GRPC_CHANNEL_STACK_REF(chand->owning_stack(), "CheckResolutionLocked"); ExecCtx::Run( DEBUG_LOCATION, GRPC_CLOSURE_CREATE( @@ -4181,30 +4222,22 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, [chand]() { chand->CheckConnectivityState(/*try_to_connect=*/true); GRPC_CHANNEL_STACK_UNREF(chand->owning_stack(), - "PickSubchannelLocked"); + "CheckResolutionLocked"); }, DEBUG_LOCATION); }, chand, nullptr), GRPC_ERROR_NONE); - // Queue the pick, so that it will be attempted once the channel - // becomes connected. - MaybeAddCallToQueuedPicksLocked(elem); - return false; } + // Get send_initial_metadata batch and flags. + auto& send_initial_metadata = + pending_batches_[0].batch->payload->send_initial_metadata; grpc_metadata_batch* initial_metadata_batch = - seen_send_initial_metadata_ - ? &send_initial_metadata_ - : pending_batches_[0] - .batch->payload->send_initial_metadata.send_initial_metadata; - // Grab initial metadata flags so that we can check later if the call has - // wait_for_ready enabled. + send_initial_metadata.send_initial_metadata; const uint32_t send_initial_metadata_flags = - seen_send_initial_metadata_ ? send_initial_metadata_flags_ - : pending_batches_[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - // Avoid picking if we haven't yet received service config data. + send_initial_metadata.send_initial_metadata_flags; + // If we don't yet have a resolver result, we need to queue the call + // until we get one. if (GPR_UNLIKELY(!chand->received_service_config_data())) { // If the resolver returned transient failure before returning the // first service config, fail any non-wait_for_ready calls. @@ -4212,22 +4245,180 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, if (resolver_error != GRPC_ERROR_NONE && (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { - MaybeRemoveCallFromQueuedPicksLocked(elem); + MaybeRemoveCallFromResolverQueuedCallsLocked(elem); *error = GRPC_ERROR_REF(resolver_error); return true; } // Either the resolver has not yet returned a result, or it has // returned transient failure but the call is wait_for_ready. In // either case, queue the call. - MaybeAddCallToQueuedPicksLocked(elem); + MaybeAddCallToResolverQueuedCallsLocked(elem); return false; } // Apply service config to call if not yet applied. if (GPR_LIKELY(!service_config_applied_)) { service_config_applied_ = true; *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch); - if (*error != GRPC_ERROR_NONE) return true; } + MaybeRemoveCallFromResolverQueuedCallsLocked(elem); + return true; +} + +// +// LB pick +// + +void CallData::CreateSubchannelCall(grpc_call_element* elem) { + ChannelData* chand = static_cast(elem->channel_data); + const size_t parent_data_size = + enable_retries_ ? sizeof(SubchannelCallRetryState) : 0; + SubchannelCall::Args call_args = { + std::move(connected_subchannel_), pollent_, path_, call_start_time_, + deadline_, arena_, + // TODO(roth): When we implement hedging support, we will probably + // need to use a separate call context for each subchannel call. + call_context_, call_combiner_, parent_data_size}; + grpc_error* error = GRPC_ERROR_NONE; + subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", + chand, this, subchannel_call_.get(), grpc_error_string(error)); + } + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { + PendingBatchesFail(elem, error, YieldCallCombiner); + } else { + if (parent_data_size > 0) { + new (subchannel_call_->GetParentData()) + SubchannelCallRetryState(call_context_); + } + PendingBatchesResume(elem); + } +} + +// A class to handle the call combiner cancellation callback for a +// queued pick. +class CallData::LbQueuedCallCanceller { + public: + explicit LbQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) { + auto* calld = static_cast(elem->call_data); + GRPC_CALL_STACK_REF(calld->owning_call_, "LbQueuedCallCanceller"); + GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, + grpc_schedule_on_exec_ctx); + calld->call_combiner_->SetNotifyOnCancel(&closure_); + } + + private: + static void CancelLocked(void* arg, grpc_error* error) { + auto* self = static_cast(arg); + auto* chand = static_cast(self->elem_->channel_data); + auto* calld = static_cast(self->elem_->call_data); + { + MutexLock lock(chand->data_plane_mu()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: cancelling queued pick: " + "error=%s self=%p calld->pick_canceller=%p", + chand, calld, grpc_error_string(error), self, + calld->lb_call_canceller_); + } + if (calld->lb_call_canceller_ == self && error != GRPC_ERROR_NONE) { + // Remove pick from list of queued picks. + calld->MaybeInvokeConfigSelectorCommitCallback(); + calld->MaybeRemoveCallFromLbQueuedCallsLocked(self->elem_); + // Fail pending batches on the call. + calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error), + YieldCallCombinerIfPendingBatchesFound); + } + } + GRPC_CALL_STACK_UNREF(calld->owning_call_, "LbQueuedCallCanceller"); + delete self; + } + + grpc_call_element* elem_; + grpc_closure closure_; +}; + +void CallData::MaybeRemoveCallFromLbQueuedCallsLocked(grpc_call_element* elem) { + if (!queued_pending_lb_pick_) return; + auto* chand = static_cast(elem->channel_data); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list", + chand, this); + } + chand->RemoveLbQueuedCall(&queued_call_, pollent_); + queued_pending_lb_pick_ = false; + // Lame the call combiner canceller. + lb_call_canceller_ = nullptr; +} + +void CallData::MaybeAddCallToLbQueuedCallsLocked(grpc_call_element* elem) { + if (queued_pending_lb_pick_) return; + auto* chand = static_cast(elem->channel_data); + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand, + this); + } + queued_pending_lb_pick_ = true; + queued_call_.elem = elem; + chand->AddLbQueuedCall(&queued_call_, pollent_); + // Register call combiner cancellation callback. + lb_call_canceller_ = new LbQueuedCallCanceller(elem); +} + +void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) { + GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx); + ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error); +} + +void CallData::PickDone(void* arg, grpc_error* error) { + grpc_call_element* elem = static_cast(arg); + ChannelData* chand = static_cast(elem->channel_data); + CallData* calld = static_cast(elem->call_data); + if (error != GRPC_ERROR_NONE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: failed to pick subchannel: error=%s", chand, + calld, grpc_error_string(error)); + } + calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner); + return; + } + calld->CreateSubchannelCall(elem); +} + +const char* PickResultTypeName( + LoadBalancingPolicy::PickResult::ResultType type) { + switch (type) { + case LoadBalancingPolicy::PickResult::PICK_COMPLETE: + return "COMPLETE"; + case LoadBalancingPolicy::PickResult::PICK_QUEUE: + return "QUEUE"; + case LoadBalancingPolicy::PickResult::PICK_FAILED: + return "FAILED"; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + +void CallData::PickSubchannel(void* arg, grpc_error* error) { + grpc_call_element* elem = static_cast(arg); + CallData* calld = static_cast(elem->call_data); + ChannelData* chand = static_cast(elem->channel_data); + bool pick_complete; + { + MutexLock lock(chand->data_plane_mu()); + pick_complete = calld->PickSubchannelLocked(elem, &error); + } + if (pick_complete) { + PickDone(elem, error); + GRPC_ERROR_UNREF(error); + } +} + +bool CallData::PickSubchannelLocked(grpc_call_element* elem, + grpc_error** error) { + ChannelData* chand = static_cast(elem->channel_data); + GPR_ASSERT(connected_subchannel_ == nullptr); + GPR_ASSERT(subchannel_call_ == nullptr); // If this is a retry, use the send_initial_metadata payload that // we've cached; otherwise, use the pending batch. The // send_initial_metadata batch will be the first pending batch in the @@ -4238,12 +4429,24 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, // allocate the subchannel batch earlier so that we can give the // subchannel's copy of the metadata batch (which is copied for each // attempt) to the LB policy instead the one from the parent channel. + grpc_metadata_batch* initial_metadata_batch = + seen_send_initial_metadata_ + ? &send_initial_metadata_ + : pending_batches_[0] + .batch->payload->send_initial_metadata.send_initial_metadata; + // Grab initial metadata flags so that we can check later if the call has + // wait_for_ready enabled. + const uint32_t send_initial_metadata_flags = + seen_send_initial_metadata_ ? send_initial_metadata_flags_ + : pending_batches_[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + // Perform LB pick. LoadBalancingPolicy::PickArgs pick_args; pick_args.path = StringViewFromSlice(path_); pick_args.call_state = &lb_call_state_; Metadata initial_metadata(this, initial_metadata_batch); pick_args.initial_metadata = &initial_metadata; - // Attempt pick. auto result = chand->picker()->Pick(pick_args); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, @@ -4257,7 +4460,7 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, grpc_error* disconnect_error = chand->disconnect_error(); if (disconnect_error != GRPC_ERROR_NONE) { GRPC_ERROR_UNREF(result.error); - MaybeRemoveCallFromQueuedPicksLocked(elem); + MaybeRemoveCallFromLbQueuedCallsLocked(elem); MaybeInvokeConfigSelectorCommitCallback(); *error = GRPC_ERROR_REF(disconnect_error); return true; @@ -4281,7 +4484,7 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, *error = new_error; MaybeInvokeConfigSelectorCommitCallback(); } - MaybeRemoveCallFromQueuedPicksLocked(elem); + MaybeRemoveCallFromLbQueuedCallsLocked(elem); return !retried; } // If wait_for_ready is true, then queue to retry when we get a new @@ -4290,10 +4493,10 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, } // Fallthrough case LoadBalancingPolicy::PickResult::PICK_QUEUE: - MaybeAddCallToQueuedPicksLocked(elem); + MaybeAddCallToLbQueuedCallsLocked(elem); return false; default: // PICK_COMPLETE - MaybeRemoveCallFromQueuedPicksLocked(elem); + MaybeRemoveCallFromLbQueuedCallsLocked(elem); // Handle drops. if (GPR_UNLIKELY(result.subchannel == nullptr)) { result.error = grpc_error_set_int( diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 7776d8f8f96..61b9c17bf24 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -1651,14 +1651,17 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) { // The initial channel state should be IDLE. EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); // After sending RPC, channel state should be READY. + gpr_log(GPR_INFO, "*** SENDING RPC, CHANNEL SHOULD CONNECT ***"); response_generator.SetNextResolution(GetServersPorts()); CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); // After a period time not using the channel, the channel state should switch // to IDLE. + gpr_log(GPR_INFO, "*** WAITING FOR CHANNEL TO GO IDLE ***"); gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200)); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); // Sending a new RPC should awake the IDLE channel. + gpr_log(GPR_INFO, "*** SENDING ANOTHER RPC, CHANNEL SHOULD RECONNECT ***"); response_generator.SetNextResolution(GetServersPorts()); CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);