diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index e97861c639d..662a2f42dc8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -146,6 +146,8 @@ class PickFirst : public LoadBalancingPolicy { // Cancels any pending connectivity watch and unrefs the subchannel. void ShutdownLocked(); + bool seen_transient_failure() const { return seen_transient_failure_; } + private: // Watcher for subchannel connectivity state. class Watcher @@ -199,6 +201,7 @@ class PickFirst : public LoadBalancingPolicy { // Data updated by the watcher. absl::optional connectivity_state_; absl::Status connectivity_status_; + bool seen_transient_failure_ = false; }; SubchannelList(RefCountedPtr policy, @@ -214,6 +217,17 @@ class PickFirst : public LoadBalancingPolicy { void Orphan() override; + bool IsHappyEyeballsPassComplete() const { + // Checking attempting_index_ here is just an optimization -- if + // we haven't actually tried all subchannels yet, then we don't + // need to iterate. + if (attempting_index_ < size()) return false; + for (const SubchannelData& sd : subchannels_) { + if (!sd.seen_transient_failure()) return false; + } + return true; + } + private: // Returns true if all subchannels have seen their initial // connectivity state notifications. @@ -222,11 +236,16 @@ class PickFirst : public LoadBalancingPolicy { // Looks through subchannels_ starting from attempting_index_ to // find the first one not currently in TRANSIENT_FAILURE, then // triggers a connection attempt for that subchannel. If there are - // no more subchannels not in TRANSIENT_FAILURE (i.e., the Happy - // Eyeballs pass is complete), transitions to a mode where we - // try to connect to all subchannels in parallel. + // no more subchannels not in TRANSIENT_FAILURE, calls + // MaybeFinishHappyEyeballsPass(). void StartConnectingNextSubchannel(); + // Checks to see if the initial Happy Eyeballs pass is complete -- + // i.e., all subchannels have seen TRANSIENT_FAILURE state at least once. + // If so, transitions to a mode where we try to connect to all subchannels + // in parallel and returns true. + void MaybeFinishHappyEyeballsPass(); + // Backpointer to owning policy. RefCountedPtr policy_; @@ -254,6 +273,9 @@ class PickFirst : public LoadBalancingPolicy { // After the initial Happy Eyeballs pass, the number of failures // we've seen. Every size() failures, we trigger re-resolution. size_t num_failures_ = 0; + + // The status from the last subchannel that reported TRANSIENT_FAILURE. + absl::Status last_failure_; }; class HealthWatcher @@ -607,16 +629,17 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( "[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): connectivity changed: old_state=%s, new_state=%s, " "status=%s, shutting_down=%d, pending_watcher=%p, " - "p->selected_=%p, p->subchannel_list_=%p, " - "p->latest_pending_subchannel_list_=%p", + "seen_transient_failure=%d, p->selected_=%p, " + "p->subchannel_list_=%p, p->latest_pending_subchannel_list_=%p", p, subchannel_list_, Index(), subchannel_list_->size(), subchannel_.get(), (connectivity_state_.has_value() ? ConnectivityStateName(*connectivity_state_) : "N/A"), ConnectivityStateName(new_state), status.ToString().c_str(), - subchannel_list_->shutting_down_, pending_watcher_, p->selected_, - p->subchannel_list_.get(), p->latest_pending_subchannel_list_.get()); + subchannel_list_->shutting_down_, pending_watcher_, + seen_transient_failure_, p->selected_, p->subchannel_list_.get(), + p->latest_pending_subchannel_list_.get()); } if (subchannel_list_->shutting_down_ || pending_watcher_ == nullptr) return; // The notification must be for a subchannel in either the current or @@ -626,7 +649,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); absl::optional old_state = connectivity_state_; connectivity_state_ = new_state; - connectivity_status_ = status; + connectivity_status_ = std::move(status); // Handle updates for the currently selected subchannel. if (p->selected_ == this) { GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get()); @@ -657,14 +680,12 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); // Set our state to that of the pending subchannel list. if (IsPickFirstHappyEyeballsEnabled() - ? (p->subchannel_list_->attempting_index_ == - p->subchannel_list_->size()) + ? p->subchannel_list_->IsHappyEyeballsPassComplete() : p->subchannel_list_->in_transient_failure_) { - absl::Status status = absl::UnavailableError(absl::StrCat( + status = absl::UnavailableError(absl::StrCat( "selected subchannel failed; switching to pending update; " "last failure: ", - p->subchannel_list_->subchannels_.back() - .connectivity_status_.ToString())); + p->subchannel_list_->last_failure_.ToString())); p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status, MakeRefCounted(status)); } else if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) { @@ -698,6 +719,12 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( ProcessUnselectedReadyLocked(); return; } + // Make sure we note when a subchannel has seen TRANSIENT_FAILURE. + bool prev_seen_transient_failure = seen_transient_failure_; + if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + seen_transient_failure_ = true; + subchannel_list_->last_failure_ = connectivity_status_; + } // If we haven't yet seen the initial connectivity state notification // for all subchannels, do nothing. if (!subchannel_list_->AllSubchannelsSeenInitialState()) return; @@ -724,17 +751,24 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( // Otherwise, process connectivity state change. switch (*connectivity_state_) { case GRPC_CHANNEL_TRANSIENT_FAILURE: { - // If a connection attempt fails before the timer fires, then - // cancel the timer and start connecting on the next subchannel. - if (Index() == subchannel_list_->attempting_index_) { - if (subchannel_list_->timer_handle_.has_value()) { - p->channel_control_helper()->GetEventEngine()->Cancel( - *subchannel_list_->timer_handle_); + // If this is the first failure we've seen on this subchannel, + // then we're still in the Happy Eyeballs pass. + if (!prev_seen_transient_failure && seen_transient_failure_) { + // If a connection attempt fails before the timer fires, then + // cancel the timer and start connecting on the next subchannel. + if (Index() == subchannel_list_->attempting_index_) { + if (subchannel_list_->timer_handle_.has_value()) { + p->channel_control_helper()->GetEventEngine()->Cancel( + *subchannel_list_->timer_handle_); + } + ++subchannel_list_->attempting_index_; + subchannel_list_->StartConnectingNextSubchannel(); + } else { + // If this was the last subchannel to fail, check if the Happy + // Eyeballs pass is complete. + subchannel_list_->MaybeFinishHappyEyeballsPass(); } - ++subchannel_list_->attempting_index_; - subchannel_list_->StartConnectingNextSubchannel(); - } else if (subchannel_list_->attempting_index_ == - subchannel_list_->size()) { + } else if (subchannel_list_->IsHappyEyeballsPassComplete()) { // We're done with the initial Happy Eyeballs pass and in a mode // where we're attempting to connect to every subchannel in // parallel. We count the number of failed connection attempts, @@ -749,7 +783,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( ++subchannel_list_->num_failures_; if (subchannel_list_->num_failures_ % subchannel_list_->size() == 0) { p->channel_control_helper()->RequestReresolution(); - absl::Status status = absl::UnavailableError(absl::StrCat( + status = absl::UnavailableError(absl::StrCat( (p->omit_status_message_prefix_ ? "" : "failed to connect to all addresses; last error: "), @@ -764,7 +798,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( // If we've finished the first Happy Eyeballs pass, then we go // into a mode where we immediately try to connect to every // subchannel in parallel. - if (subchannel_list_->attempting_index_ == subchannel_list_->size()) { + if (subchannel_list_->IsHappyEyeballsPassComplete()) { subchannel_->RequestConnection(); } break; @@ -1082,6 +1116,15 @@ void PickFirst::SubchannelList::StartConnectingNextSubchannel() { return; } } + // If we didn't find a subchannel to request a connection on, check to + // see if the Happy Eyeballs pass is complete. + MaybeFinishHappyEyeballsPass(); +} + +void PickFirst::SubchannelList::MaybeFinishHappyEyeballsPass() { + // Make sure all subchannels have finished a connection attempt before + // we consider the Happy Eyeballs pass complete. + if (!IsHappyEyeballsPassComplete()) return; // We didn't find another subchannel not in state TRANSIENT_FAILURE, // so report TRANSIENT_FAILURE and switch to a mode in which we try to // connect to all addresses in parallel. @@ -1115,7 +1158,7 @@ void PickFirst::SubchannelList::StartConnectingNextSubchannel() { absl::StrCat((policy_->omit_status_message_prefix_ ? "" : "failed to connect to all addresses; last error: "), - subchannels_.back().connectivity_status().ToString())); + last_failure_.ToString())); policy_->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status, MakeRefCounted(status)); } diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 380fe6c89f5..81365410c48 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -1008,18 +1008,18 @@ class LoadBalancingPolicyTest : public ::testing::Test { // Expects a state update for the specified state and status, and then // expects the resulting picker to queue picks. - void ExpectStateAndQueuingPicker( + bool ExpectStateAndQueuingPicker( grpc_connectivity_state expected_state, absl::Status expected_status = absl::OkStatus(), SourceLocation location = SourceLocation()) { auto picker = ExpectState(expected_state, expected_status, location); - ExpectPickQueued(picker.get(), {}, location); + return ExpectPickQueued(picker.get(), {}, location); } // Convenient frontend to ExpectStateAndQueuingPicker() for CONNECTING. - void ExpectConnectingUpdate(SourceLocation location = SourceLocation()) { - ExpectStateAndQueuingPicker(GRPC_CHANNEL_CONNECTING, absl::OkStatus(), - location); + bool ExpectConnectingUpdate(SourceLocation location = SourceLocation()) { + return ExpectStateAndQueuingPicker(GRPC_CHANNEL_CONNECTING, + absl::OkStatus(), location); } static std::unique_ptr MakeMetadata( @@ -1038,15 +1038,18 @@ class LoadBalancingPolicyTest : public ::testing::Test { } // Requests a pick on picker and expects a Queue result. - void ExpectPickQueued(LoadBalancingPolicy::SubchannelPicker* picker, + bool ExpectPickQueued(LoadBalancingPolicy::SubchannelPicker* picker, const CallAttributes call_attributes = {}, SourceLocation location = SourceLocation()) { - ASSERT_NE(picker, nullptr); + EXPECT_NE(picker, nullptr) << location.file() << ":" << location.line(); + if (picker == nullptr) return false; auto pick_result = DoPick(picker, call_attributes); - ASSERT_TRUE(absl::holds_alternative( + EXPECT_TRUE(absl::holds_alternative( pick_result.result)) << PickResultString(pick_result) << "\nat " << location.file() << ":" << location.line(); + return absl::holds_alternative( + pick_result.result); } // Requests a pick on picker and expects a Complete result. @@ -1257,7 +1260,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { void DrainConnectingUpdates(SourceLocation location = SourceLocation()) { gpr_log(GPR_INFO, "Draining CONNECTING updates..."); while (!helper_->QueueEmpty()) { - ExpectConnectingUpdate(location); + ASSERT_TRUE(ExpectConnectingUpdate(location)); } gpr_log(GPR_INFO, "Done draining CONNECTING updates"); } diff --git a/test/core/client_channel/lb_policy/pick_first_test.cc b/test/core/client_channel/lb_policy/pick_first_test.cc index 0612711b6e7..4effca242e1 100644 --- a/test/core/client_channel/lb_policy/pick_first_test.cc +++ b/test/core/client_channel/lb_policy/pick_first_test.cc @@ -528,6 +528,10 @@ TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) { subchannel3->SetConnectivityState( GRPC_CHANNEL_TRANSIENT_FAILURE, absl::UnavailableError("failed to connect")); + ExpectQueueEmpty(); + // Eventually, the first subchannel fails as well. + subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, + absl::UnavailableError("failed to connect")); // The LB policy should request re-resolution. ExpectReresolutionRequest(); // The LB policy should report TRANSIENT_FAILURE. @@ -540,14 +544,19 @@ TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) { // mode where we try to connect to all subchannels in parallel. // Subchannel 2 was already in state IDLE, so the LB policy will // immediately trigger a connection request on it. It will not do so - // for subchannels 1 (in CONNECTING) or 3 (in TRANSIENT_FAILURE). + // for subchannels 1 or 3, which are in TRANSIENT_FAILURE. EXPECT_FALSE(subchannel->ConnectionRequested()); EXPECT_TRUE(subchannel2->ConnectionRequested()); EXPECT_FALSE(subchannel3->ConnectionRequested()); // Subchannel 2 reports CONNECTING. subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING); - // Now subchannel 1 reports TF. This is the first failure since we - // finished Happy Eyeballs. + // Now subchannel 1 reports IDLE. This should trigger another + // connection attempt. + subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE); + EXPECT_TRUE(subchannel->ConnectionRequested()); + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // Now subchannel 1 reports TRANSIENT_FAILURE. This is the first failure + // since we finished Happy Eyeballs. subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, absl::UnavailableError("failed to connect")); EXPECT_FALSE(subchannel->ConnectionRequested()); @@ -589,6 +598,76 @@ TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) { } } +TEST_F(PickFirstTest, + HappyEyeballsLastSubchannelFailsWhileAnotherIsStillPending) { + if (!IsPickFirstHappyEyeballsEnabled()) return; + // Send an update containing three addresses. + constexpr std::array kAddresses = { + "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; + absl::Status status = ApplyUpdate( + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + EXPECT_TRUE(status.ok()) << status; + // LB policy should have created a subchannel for both addresses. + auto* subchannel = FindSubchannel(kAddresses[0]); + ASSERT_NE(subchannel, nullptr); + auto* subchannel2 = FindSubchannel(kAddresses[1]); + ASSERT_NE(subchannel2, nullptr); + // When the LB policy receives the first subchannel's initial connectivity + // state notification (IDLE), it will request a connection. + EXPECT_TRUE(subchannel->ConnectionRequested()); + // This causes the subchannel to start to connect, so it reports + // CONNECTING. + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // LB policy should have reported CONNECTING state. + ExpectConnectingUpdate(); + // The second subchannel should not be connecting. + EXPECT_FALSE(subchannel2->ConnectionRequested()); + // The timer fires before the connection attempt completes. + IncrementTimeBy(Duration::Milliseconds(250)); + // This causes the LB policy to start connecting to the second subchannel. + EXPECT_TRUE(subchannel2->ConnectionRequested()); + subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // The second subchannel fails. + subchannel2->SetConnectivityState( + GRPC_CHANNEL_TRANSIENT_FAILURE, + absl::UnavailableError("failed to connect")); + // The LB policy should not yet report TRANSIENT_FAILURE, because the + // first subchannel is still CONNECTING. + DrainConnectingUpdates(); + // Set subchannel 2 back to IDLE, so it's already in that state when + // Happy Eyeballs fails. + subchannel2->SetConnectivityState(GRPC_CHANNEL_IDLE); + // Now the first subchannel fails. + subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, + absl::UnavailableError("failed to connect")); + // The LB policy should request re-resolution. + ExpectReresolutionRequest(); + // The LB policy should report TRANSIENT_FAILURE. + WaitForConnectionFailed([&](const absl::Status& status) { + EXPECT_EQ(status, absl::UnavailableError( + "failed to connect to all addresses; " + "last error: UNAVAILABLE: failed to connect")); + }); + // We are now done with the Happy Eyeballs pass, and we move into a + // mode where we try to connect to all subchannels in parallel. + // Subchannel 2 was already in state IDLE, so the LB policy will + // immediately trigger a connection request on it. It will not do so + // for subchannel 1, which is still in TRANSIENT_FAILURE. + EXPECT_FALSE(subchannel->ConnectionRequested()); + EXPECT_TRUE(subchannel2->ConnectionRequested()); + // Subchannel 2 reports CONNECTING. + subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // Subchannel 2 reports READY. + subchannel2->SetConnectivityState(GRPC_CHANNEL_READY); + // The LB policy will report READY. + auto picker = ExpectState(GRPC_CHANNEL_READY); + ASSERT_NE(picker, nullptr); + // Picker should return the same subchannel repeatedly. + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[1]); + } +} + TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) { if (!IsPickFirstHappyEyeballsEnabled()) return; // Send an update containing three IPv4 addresses followed by three