diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index be53eee5332..9de50af109d 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1347,11 +1347,8 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) { // Otherwise, we go into TRANSIENT_FAILURE. if (lb_policy_ == nullptr) { // Update connectivity state. - // TODO(roth): We should be updating the connectivity state here but - // not the picker. - UpdateStateAndPickerLocked( - GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure", - MakeRefCounted(status)); + UpdateStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, status, + "resolver failure"); { MutexLock lock(&resolution_mu_); // Update resolver transient failure. @@ -1395,6 +1392,14 @@ absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked( // Creates a new LB policy. OrphanablePtr ClientChannel::CreateLbPolicyLocked( const ChannelArgs& args) { + // The LB policy will start in state CONNECTING but will not + // necessarily send us an update synchronously, so set state to + // CONNECTING (in case the resolver had previously failed and put the + // channel into TRANSIENT_FAILURE) and make sure we have a queueing picker. + UpdateStateAndPickerLocked( + GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving", + MakeRefCounted(nullptr)); + // Now create the LB policy. LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.work_serializer = work_serializer_; lb_policy_args.channel_control_helper = @@ -1495,13 +1500,8 @@ void ClientChannel::CreateResolverLocked() { // Since the validity of the args was checked when the channel was created, // CreateResolver() must return a non-null result. GPR_ASSERT(resolver_ != nullptr); - // TODO(roth): We should be updating the connectivity state here but - // not the picker. But we need to make sure that we are initializing - // the picker to a queueing picker somewhere, in case the LB policy - // does not immediately return a new picker. - UpdateStateAndPickerLocked( - GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving", - MakeRefCounted(nullptr)); + UpdateStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status(), + "started resolving"); resolver_->StartLocked(); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get()); @@ -1515,24 +1515,7 @@ void ClientChannel::DestroyResolverAndLbPolicyLocked() { resolver_.get()); } resolver_.reset(); - if (lb_policy_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { - gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this, - lb_policy_.get()); - } - grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), - interested_parties_); - lb_policy_.reset(); - } - } -} - -void ClientChannel::UpdateStateAndPickerLocked( - grpc_connectivity_state state, const absl::Status& status, - const char* reason, - RefCountedPtr picker) { - // Special case for IDLE and SHUTDOWN states. - if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) { + // Clear resolution state. saved_service_config_.reset(); saved_config_selector_.reset(); // Acquire resolution lock to update config selector and associated state. @@ -1548,8 +1531,22 @@ void ClientChannel::UpdateStateAndPickerLocked( config_selector_to_unref = std::move(config_selector_); dynamic_filters_to_unref = std::move(dynamic_filters_); } + // Clear LB policy if set. + if (lb_policy_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { + gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this, + lb_policy_.get()); + } + grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), + interested_parties_); + lb_policy_.reset(); + } } - // Update connectivity state. +} + +void ClientChannel::UpdateStateLocked(grpc_connectivity_state state, + const absl::Status& status, + const char* reason) { state_tracker_.SetState(state, status, reason); if (channelz_node_ != nullptr) { channelz_node_->SetConnectivityState(state); @@ -1559,19 +1556,24 @@ void ClientChannel::UpdateStateAndPickerLocked( channelz::ChannelNode::GetChannelConnectivityStateChangeString( state))); } +} + +void ClientChannel::UpdateStateAndPickerLocked( + grpc_connectivity_state state, const absl::Status& status, + const char* reason, + RefCountedPtr picker) { + UpdateStateLocked(state, status, reason); // Grab the LB lock to update the picker and trigger reprocessing of the // queued picks. // Old picker will be unreffed after releasing the lock. - { - MutexLock lock(&lb_mu_); - picker_.swap(picker); - // Reprocess queued picks. - for (LoadBalancedCall* call : lb_queued_calls_) { - call->RemoveCallFromLbQueuedCallsLocked(); - call->RetryPickLocked(); - } - lb_queued_calls_.clear(); + MutexLock lock(&lb_mu_); + picker_.swap(picker); + // Reprocess queued picks. + for (LoadBalancedCall* call : lb_queued_calls_) { + call->RemoveCallFromLbQueuedCallsLocked(); + call->RetryPickLocked(); } + lb_queued_calls_.clear(); } namespace { @@ -1685,10 +1687,13 @@ void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) { StatusIntProperty::ChannelConnectivityState, &value) && static_cast(value) == GRPC_CHANNEL_IDLE) { - if (disconnect_error_.ok()) { + if (disconnect_error_.ok()) { // Ignore if we're shutting down. // Enter IDLE state. UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(), "channel entering IDLE", nullptr); + // TODO(roth): Do we need to check for any queued picks here, in + // case there's a race condition in the client_idle filter? + // And maybe also check for calls in the resolver queue? } } else { // Disconnect. diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 1152b86eebc..112b274376a 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -247,6 +247,10 @@ class ClientChannel { OrphanablePtr CreateLbPolicyLocked( const ChannelArgs& args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); + void UpdateStateLocked(grpc_connectivity_state state, + const absl::Status& status, const char* reason) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); + void UpdateStateAndPickerLocked( grpc_connectivity_state state, const absl::Status& status, const char* reason, diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index dddfd65a35e..05009e412f6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -240,12 +240,6 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { MakeRefCounted(status)); channel_control_helper()->RequestReresolution(); } - // Otherwise, if this is the initial update, report CONNECTING. - else if (subchannel_list_.get() == nullptr) { - channel_control_helper()->UpdateState( - GRPC_CHANNEL_CONNECTING, absl::Status(), - MakeRefCounted(Ref(DEBUG_LOCATION, "QueuePicker"))); - } // If the new update is empty or we don't yet have a selected subchannel in // the current list, replace the current subchannel list immediately. if (latest_pending_subchannel_list_->num_subchannels() == 0 || diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc index 7cda9b74a35..47a9908e1ee 100644 --- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc +++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc @@ -357,7 +357,7 @@ class RlsLb : public LoadBalancingPolicy { RefCountedPtr pending_config_; grpc_connectivity_state connectivity_state_ ABSL_GUARDED_BY(&RlsLb::mu_) = - GRPC_CHANNEL_IDLE; + GRPC_CHANNEL_CONNECTING; RefCountedPtr picker_ ABSL_GUARDED_BY(&RlsLb::mu_); }; @@ -732,9 +732,9 @@ RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr lb_policy, : DualRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace) ? "ChildPolicyWrapper" : nullptr), - lb_policy_(lb_policy), + lb_policy_(std::move(lb_policy)), target_(std::move(target)), - picker_(MakeRefCounted(std::move(lb_policy))) { + picker_(MakeRefCounted(nullptr)) { lb_policy_->child_policy_map_.emplace(target_, this); } @@ -895,6 +895,8 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState( { MutexLock lock(&wrapper_->lb_policy_->mu_); if (wrapper_->is_shutdown_) return; + // TODO(roth): It looks like this ignores subsequent TF updates that + // might change the status used to fail picks, which seems wrong. if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && state != GRPC_CHANNEL_READY) { return; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 0c439e6819d..03b15c0ac8f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -318,12 +318,9 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) { return status; } // Otherwise, if this is the initial update, immediately promote it to - // subchannel_list_ and report CONNECTING. + // subchannel_list_. if (subchannel_list_.get() == nullptr) { subchannel_list_ = std::move(latest_pending_subchannel_list_); - channel_control_helper()->UpdateState( - GRPC_CHANNEL_CONNECTING, absl::Status(), - MakeRefCounted(Ref(DEBUG_LOCATION, "QueuePicker"))); } return absl::OkStatus(); } diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc index 95260a6281d..5b52714060d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc @@ -729,12 +729,9 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) { return status; } // Otherwise, if this is the initial update, immediately promote it to - // subchannel_list_ and report CONNECTING. + // subchannel_list_. if (subchannel_list_.get() == nullptr) { subchannel_list_ = std::move(latest_pending_subchannel_list_); - channel_control_helper()->UpdateState( - GRPC_CHANNEL_CONNECTING, absl::Status(), - MakeRefCounted(Ref(DEBUG_LOCATION, "QueuePicker"))); } return absl::OkStatus(); } diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc index ac42aaa2923..bcf771bd7fa 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc @@ -519,7 +519,9 @@ void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked() { WeightedTargetLb::WeightedChild::WeightedChild( RefCountedPtr weighted_target_policy, const std::string& name) - : weighted_target_policy_(std::move(weighted_target_policy)), name_(name) { + : weighted_target_policy_(std::move(weighted_target_policy)), + name_(name), + picker_(MakeRefCounted(nullptr)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s", weighted_target_policy_.get(), this, name_.c_str()); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc index 13482aff9a5..1770544a22c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc @@ -200,7 +200,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { OrphanablePtr child_policy_; RefCountedPtr picker_; - grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; + grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING; // States for delayed removal. absl::optional delayed_removal_timer_handle_; @@ -409,7 +409,8 @@ XdsClusterManagerLb::ClusterChild::ClusterChild( RefCountedPtr xds_cluster_manager_policy, const std::string& name) : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)), - name_(name) { + name_(name), + picker_(MakeRefCounted(nullptr)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] created ClusterChild %p for %s", diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc index 4fa540ad4b0..e1915e0f8e3 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc @@ -319,7 +319,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { OrphanablePtr child_policy_; // Latest state and picker reported by the child policy. - grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE; + grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING; absl::Status status_; RefCountedPtr picker_; Mutex subchannel_map_mu_; diff --git a/src/core/lib/load_balancing/lb_policy.h b/src/core/lib/load_balancing/lb_policy.h index 090a301fd73..c1fa4e71c6c 100644 --- a/src/core/lib/load_balancing/lb_policy.h +++ b/src/core/lib/load_balancing/lb_policy.h @@ -366,6 +366,19 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// whether the LB policy accepted the update; if non-OK, informs /// polling-based resolvers that they should go into backoff delay and /// eventually reattempt the resolution. + /// + /// The first time that UpdateLocked() is called, the LB policy will + /// generally not be able to determine the appropriate connectivity + /// state by the time UpdateLocked() returns (e.g., it will need to + /// wait for connectivity state notifications from each subchannel, + /// which will be delivered asynchronously). In this case, the LB + /// policy should not call the helper's UpdateState() method until it + /// does have a clear picture of the connectivity state (e.g., it + /// should wait for all subchannels to report connectivity state + /// before calling the helper's UpdateState() method), although it is + /// expected to do so within some short period of time. The parent of + /// the LB policy will assume that the policy's initial state is + /// CONNECTING and that picks should be queued. virtual absl::Status UpdateLocked(UpdateArgs) = 0; // NOLINT /// Tries to enter a READY connectivity state. diff --git a/test/core/client_channel/lb_policy/pick_first_test.cc b/test/core/client_channel/lb_policy/pick_first_test.cc index fd9ace9de89..09d3b6170bb 100644 --- a/test/core/client_channel/lb_policy/pick_first_test.cc +++ b/test/core/client_channel/lb_policy/pick_first_test.cc @@ -46,8 +46,6 @@ TEST_F(PickFirstTest, Basic) { absl::Status status = ApplyUpdate(BuildUpdate({kAddressUri}), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; - // LB policy should have reported CONNECTING state. - ExpectConnectingUpdate(); // LB policy should have created a subchannel for the address with the // GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. auto* subchannel = FindSubchannel( @@ -58,6 +56,8 @@ TEST_F(PickFirstTest, Basic) { EXPECT_TRUE(subchannel->ConnectionRequested()); // This causes the subchannel to start to connect, so it reports CONNECTING. subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // LB policy should have reported CONNECTING state. + ExpectConnectingUpdate(); // When the subchannel becomes connected, it reports READY. subchannel->SetConnectivityState(GRPC_CHANNEL_READY); // The LB policy will report CONNECTING some number of times (doesn't