From 9b72650125cd0f8b21ebd77825388e7dc3a9d191 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Thu, 9 Aug 2018 18:50:53 -0700 Subject: [PATCH] PF: Check connectivity state before watching --- .../lb_policy/pick_first/pick_first.cc | 103 ++++++++++++------ .../lb_policy/subchannel_list.h | 20 +++- test/cpp/end2end/client_lb_end2end_test.cc | 40 +++++++ 3 files changed, 124 insertions(+), 39 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 2b6a9ba8c53..2c08245a8e8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -80,6 +80,11 @@ class PickFirst : public LoadBalancingPolicy { void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) override; + + // Processes the connectivity change to READY for an unselected subchannel. + void ProcessUnselectedReadyLocked(); + + void StartConnectivityWatchLocked() override; }; class PickFirstSubchannelList @@ -519,41 +524,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // select in place of the current one. switch (connectivity_state) { case GRPC_CHANNEL_READY: { - // Case 2. Promote p->latest_pending_subchannel_list_ to - // p->subchannel_list_. - if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, - "Pick First %p promoting pending subchannel list %p to " - "replace %p", - p, p->latest_pending_subchannel_list_.get(), - p->subchannel_list_.get()); - } - p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); - } - // Cases 1 and 2. - grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, - GRPC_ERROR_NONE, "connecting_ready"); - p->selected_ = this; - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, - subchannel()); - } - // Drop all other subchannels, since we are now connected. - p->DestroyUnselectedSubchannelsLocked(); - // Update any calls that were waiting for a pick. - PickState* pick; - while ((pick = p->pending_picks_)) { - p->pending_picks_ = pick->next; - pick->connected_subchannel = - p->selected_->connected_subchannel()->Ref(); - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, - "Servicing pending pick with selected subchannel %p", - p->selected_->subchannel()); - } - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); - } + ProcessUnselectedReadyLocked(); // Renew notification. RenewConnectivityWatchLocked(); break; @@ -595,6 +566,68 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( GRPC_ERROR_UNREF(error); } +void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { + PickFirst* p = static_cast(subchannel_list()->policy()); + GPR_ASSERT(p->selected_ != this); + GPR_ASSERT(connectivity_state() == GRPC_CHANNEL_READY); + // If we get here, there are two possible cases: + // 1. We do not currently have a selected subchannel, and the update is + // for a subchannel in p->subchannel_list_ that we're trying to + // connect to. The goal here is to find a subchannel that we can + // select. + // 2. We do currently have a selected subchannel, and the update is + // for a subchannel in p->latest_pending_subchannel_list_. The + // goal here is to find a subchannel from the update that we can + // select in place of the current one. + GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || + subchannel_list() == p->latest_pending_subchannel_list_.get()); + // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_. + if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, + "Pick First %p promoting pending subchannel list %p to " + "replace %p", + p, p->latest_pending_subchannel_list_.get(), + p->subchannel_list_.get()); + } + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); + } + // Cases 1 and 2. + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, + GRPC_ERROR_NONE, "subchannel_ready"); + p->selected_ = this; + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel()); + } + // Drop all other subchannels, since we are now connected. + p->DestroyUnselectedSubchannelsLocked(); + // Update any calls that were waiting for a pick. + PickState* pick; + while ((pick = p->pending_picks_)) { + p->pending_picks_ = pick->next; + pick->connected_subchannel = p->selected_->connected_subchannel()->Ref(); + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", + p->selected_->subchannel()); + } + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + } +} + +void PickFirst::PickFirstSubchannelData::StartConnectivityWatchLocked() { + PickFirst* p = static_cast(subchannel_list()->policy()); + grpc_error* error = GRPC_ERROR_NONE; + if (p->selected_ != this && + CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) { + // We must process the READY subchannel before we start watching it. + // Otherwise, we won't know it's READY because we will be waiting for its + // connectivity state to change from READY. + ProcessUnselectedReadyLocked(); + } + GRPC_ERROR_UNREF(error); + SubchannelData::StartConnectivityWatchLocked(); +} + // // factory // diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 0fa2f04e73b..0a75fc10c09 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -115,7 +115,7 @@ class SubchannelData { // Starts watching the connectivity state of the subchannel. // ProcessConnectivityChangeLocked() will be called when the // connectivity state changes. - void StartConnectivityWatchLocked(); + virtual void StartConnectivityWatchLocked(); // Renews watching the connectivity state of the subchannel. void RenewConnectivityWatchLocked(); @@ -154,6 +154,10 @@ class SubchannelData { grpc_connectivity_state connectivity_state, grpc_error* error) GRPC_ABSTRACT; + // Returns the connectivity state. Must be called only while there is no + // connectivity notification pending. + grpc_connectivity_state connectivity_state() const; + private: // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_. // Returns true if the connectivity state should be reported. @@ -316,6 +320,13 @@ void SubchannelData +grpc_connectivity_state SubchannelData< + SubchannelListType, SubchannelDataType>::connectivity_state() const { + GPR_ASSERT(!connectivity_notification_pending_); + return pending_connectivity_state_unsafe_; +} + template void SubchannelData::StartConnectivityWatchLocked() { @@ -350,7 +361,8 @@ void SubchannelDatapolicy()->interested_parties(), &pending_connectivity_state_unsafe_, &connectivity_changed_closure_); @@ -367,8 +379,7 @@ void SubchannelDatanum_subchannels(), subchannel_); } - GPR_ASSERT(connectivity_notification_pending_); - connectivity_notification_pending_ = false; + GPR_ASSERT(!connectivity_notification_pending_); subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch"); } @@ -442,6 +453,7 @@ void SubchannelData:: grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_), grpc_error_string(error), sd->subchannel_list_->shutting_down()); } + sd->connectivity_notification_pending_ = false; // If shutting down, unref subchannel and stop watching. if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) { sd->UnrefSubchannelLocked("connectivity_shutdown"); diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 7fe0da8aaec..94f62637ec1 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -573,6 +573,46 @@ TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) { EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName()); } +TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) { + std::vector ports = {grpc_pick_unused_port_or_die()}; + StartServers(1, ports); + auto channel_1 = BuildChannel("pick_first"); + auto stub_1 = BuildStub(channel_1); + SetNextResolution(ports); + gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******"); + WaitForServer(stub_1, 0, DEBUG_LOCATION); + gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******"); + servers_[0]->Shutdown(); + // Channel 1 will receive a re-resolution containing the same server. It will + // create a new subchannel and hold a ref to it. + servers_.clear(); + StartServers(1, ports); + gpr_log(GPR_INFO, "****** SERVER RESTARTED *******"); + auto channel_2 = BuildChannel("pick_first"); + auto stub_2 = BuildStub(channel_2); + SetNextResolution(ports); + gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******"); + WaitForServer(stub_2, 0, DEBUG_LOCATION, true); + gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******"); + servers_[0]->Shutdown(); + // Channel 2 will also receive a re-resolution containing the same server. + // Both channels will ref the same subchannel that failed. + servers_.clear(); + StartServers(1, ports); + // Wait for a while so that the disconnection has triggered the connectivity + // notification. Otherwise, the subchannel may be picked but will fail soon. + sleep(1); + gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******"); + gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******"); + // The first call after the server restart will succeed. + CheckRpcSendOk(stub_2, DEBUG_LOCATION); + gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******"); + // Check LB policy name for the channel. + EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName()); + // Check LB policy name for the channel. + EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName()); +} + TEST_F(ClientLbEnd2endTest, RoundRobin) { // Start servers and send one RPC per server. const int kNumServers = 3;