diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 2c08245a8e8..1cd5ef065de 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -84,7 +84,7 @@ class PickFirst : public LoadBalancingPolicy { // Processes the connectivity change to READY for an unselected subchannel. void ProcessUnselectedReadyLocked(); - void StartConnectivityWatchLocked() override; + void CheckConnectivityStateAndStartWatchingLocked(); }; class PickFirstSubchannelList @@ -252,7 +252,8 @@ void PickFirst::StartPickingLocked() { if (subchannel_list_ != nullptr) { for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { - subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); + subchannel_list_->subchannel(i) + ->CheckConnectivityStateAndStartWatchingLocked(); break; } } @@ -391,7 +392,8 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { - subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); + subchannel_list_->subchannel(0) + ->CheckConnectivityStateAndStartWatchingLocked(); } } else { // We do have a selected subchannel. @@ -445,7 +447,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // subchannel in the new list. if (started_picking_) { latest_pending_subchannel_list_->subchannel(0) - ->StartConnectivityWatchLocked(); + ->CheckConnectivityStateAndStartWatchingLocked(); } } } @@ -545,7 +547,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "exhausted_subchannels"); } - sd->StartConnectivityWatchLocked(); + sd->CheckConnectivityStateAndStartWatchingLocked(); break; } case GRPC_CHANNEL_CONNECTING: @@ -568,8 +570,15 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { PickFirst* p = static_cast(subchannel_list()->policy()); - GPR_ASSERT(p->selected_ != this); - GPR_ASSERT(connectivity_state() == GRPC_CHANNEL_READY); + if (p->selected_ == this) { + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_ERROR, + "Pick First %p calling ProcessUnselectedReadyLocked() on " + "selected subchannel %p", + p, subchannel()); + } + return; + } // If we get here, there are two possible cases: // 1. We do not currently have a selected subchannel, and the update is // for a subchannel in p->subchannel_list_ that we're trying to @@ -614,7 +623,8 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { } } -void PickFirst::PickFirstSubchannelData::StartConnectivityWatchLocked() { +void PickFirst::PickFirstSubchannelData:: + CheckConnectivityStateAndStartWatchingLocked() { PickFirst* p = static_cast(subchannel_list()->policy()); grpc_error* error = GRPC_ERROR_NONE; if (p->selected_ != this && @@ -625,7 +635,7 @@ void PickFirst::PickFirstSubchannelData::StartConnectivityWatchLocked() { ProcessUnselectedReadyLocked(); } GRPC_ERROR_UNREF(error); - SubchannelData::StartConnectivityWatchLocked(); + StartConnectivityWatchLocked(); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 0a75fc10c09..0fa2f04e73b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -115,7 +115,7 @@ class SubchannelData { // Starts watching the connectivity state of the subchannel. // ProcessConnectivityChangeLocked() will be called when the // connectivity state changes. - virtual void StartConnectivityWatchLocked(); + void StartConnectivityWatchLocked(); // Renews watching the connectivity state of the subchannel. void RenewConnectivityWatchLocked(); @@ -154,10 +154,6 @@ class SubchannelData { grpc_connectivity_state connectivity_state, grpc_error* error) GRPC_ABSTRACT; - // Returns the connectivity state. Must be called only while there is no - // connectivity notification pending. - grpc_connectivity_state connectivity_state() const; - private: // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_. // Returns true if the connectivity state should be reported. @@ -320,13 +316,6 @@ void SubchannelData -grpc_connectivity_state SubchannelData< - SubchannelListType, SubchannelDataType>::connectivity_state() const { - GPR_ASSERT(!connectivity_notification_pending_); - return pending_connectivity_state_unsafe_; -} - template void SubchannelData::StartConnectivityWatchLocked() { @@ -361,8 +350,7 @@ void SubchannelDatapolicy()->interested_parties(), &pending_connectivity_state_unsafe_, &connectivity_changed_closure_); @@ -379,7 +367,8 @@ void SubchannelDatanum_subchannels(), subchannel_); } - GPR_ASSERT(!connectivity_notification_pending_); + GPR_ASSERT(connectivity_notification_pending_); + connectivity_notification_pending_ = false; subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch"); } @@ -453,7 +442,6 @@ void SubchannelData:: grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_), grpc_error_string(error), sd->subchannel_list_->shutting_down()); } - sd->connectivity_notification_pending_ = false; // If shutting down, unref subchannel and stop watching. if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) { sd->UnrefSubchannelLocked("connectivity_shutdown"); diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 94f62637ec1..f2368ae7dd6 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -291,6 +291,17 @@ class ClientLbEnd2endTest : public ::testing::Test { ResetCounters(); } + bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) { + const gpr_timespec deadline = + grpc_timeout_seconds_to_deadline(timeout_seconds); + grpc_connectivity_state state; + while ((state = channel->GetState(false /* try_to_connect */)) == + GRPC_CHANNEL_READY) { + if (!channel->WaitForStateChange(state, deadline)) return false; + } + return true; + } + bool SeenAllServers() { for (const auto& server : servers_) { if (server->service_.request_count() == 0) return false; @@ -590,18 +601,22 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) { gpr_log(GPR_INFO, "****** SERVER RESTARTED *******"); auto channel_2 = BuildChannel("pick_first"); auto stub_2 = BuildStub(channel_2); + // TODO(juanlishen): This resolution result will only be visible to channel 2 + // since the response generator is only associated with channel 2 now. We + // should change the response generator to be able to deliver updates to + // multiple channels at once. SetNextResolution(ports); gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******"); WaitForServer(stub_2, 0, DEBUG_LOCATION, true); gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******"); servers_[0]->Shutdown(); + // Wait until the disconnection has triggered the connectivity notification. + // Otherwise, the subchannel may be picked for next call but will fail soon. + EXPECT_TRUE(WaitForChannelNotReady(channel_2.get())); // Channel 2 will also receive a re-resolution containing the same server. // Both channels will ref the same subchannel that failed. servers_.clear(); StartServers(1, ports); - // Wait for a while so that the disconnection has triggered the connectivity - // notification. Otherwise, the subchannel may be picked but will fail soon. - sleep(1); gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******"); gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******"); // The first call after the server restart will succeed.