From bb5741f9c006957e72e6a91e3389c0c3bc34d6b8 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 6 Dec 2018 10:18:44 -0800 Subject: [PATCH] Change pick_first to immediately select the first subchannel in READY state. --- .../lb_policy/pick_first/pick_first.cc | 74 +++++++++---------- test/cpp/end2end/client_lb_end2end_test.cc | 48 +++++++++--- 2 files changed, 74 insertions(+), 48 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index d454401a669..d1a05f12554 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -380,6 +380,31 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, selected_ = nullptr; return; } + // If one of the subchannels in the new list is already in state + // READY, then select it immediately. This can happen when the + // currently selected subchannel is also present in the update. It + // can also happen if one of the subchannels in the update is already + // in the subchannel index because it's in use by another channel. + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + PickFirstSubchannelData* sd = subchannel_list->subchannel(i); + grpc_error* error = GRPC_ERROR_NONE; + grpc_connectivity_state state = sd->CheckConnectivityStateLocked(&error); + GRPC_ERROR_UNREF(error); + if (state == GRPC_CHANNEL_READY) { + subchannel_list_ = std::move(subchannel_list); + sd->ProcessUnselectedReadyLocked(); + sd->StartConnectivityWatchLocked(); + // If there was a previously pending update (which may or may + // not have contained the currently selected subchannel), drop + // it, so that it doesn't override what we've done here. + latest_pending_subchannel_list_.reset(); + // Make sure that subsequent calls to ExitIdleLocked() don't cause + // us to start watching a subchannel other than the one we've + // selected. + started_picking_ = true; + return; + } + } if (selected_ == nullptr) { // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. @@ -387,46 +412,14 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { - subchannel_list_->subchannel(0) - ->CheckConnectivityStateAndStartWatchingLocked(); + // Note: No need to use CheckConnectivityStateAndStartWatchingLocked() + // here, since we've already checked the initial connectivity + // state of all subchannels above. + subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); } } else { - // We do have a selected subchannel. - // Check if it's present in the new list. If so, we're done. - for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { - PickFirstSubchannelData* sd = subchannel_list->subchannel(i); - if (sd->subchannel() == selected_->subchannel()) { - // The currently selected subchannel is in the update: we are done. - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, - "Pick First %p found already selected subchannel %p " - "at update index %" PRIuPTR " of %" PRIuPTR "; update done", - this, selected_->subchannel(), i, - subchannel_list->num_subchannels()); - } - // Make sure it's in state READY. It might not be if we grabbed - // the combiner while a connectivity state notification - // informing us otherwise is pending. - // Note that CheckConnectivityStateLocked() also takes a ref to - // the connected subchannel. - grpc_error* error = GRPC_ERROR_NONE; - if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) { - selected_ = sd; - subchannel_list_ = std::move(subchannel_list); - sd->StartConnectivityWatchLocked(); - // If there was a previously pending update (which may or may - // not have contained the currently selected subchannel), drop - // it, so that it doesn't override what we've done here. - latest_pending_subchannel_list_.reset(); - return; - } - GRPC_ERROR_UNREF(error); - } - } - // Not keeping the previous selected subchannel, so set the latest - // pending subchannel list to the new subchannel list. We will wait - // for it to report READY before swapping it into the current - // subchannel list. + // We do have a selected subchannel, so keep using it until one of + // the subchannels in the new list reports READY. if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, @@ -440,8 +433,11 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { + // Note: No need to use CheckConnectivityStateAndStartWatchingLocked() + // here, since we've already checked the initial connectivity + // state of all subchannels above. latest_pending_subchannel_list_->subchannel(0) - ->CheckConnectivityStateAndStartWatchingLocked(); + ->StartConnectivityWatchLocked(); } } } diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index b667460cf08..759847be3e9 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -116,7 +116,10 @@ class MyTestServiceImpl : public TestServiceImpl { class ClientLbEnd2endTest : public ::testing::Test { protected: ClientLbEnd2endTest() - : server_host_("localhost"), kRequestMessage_("Live long and prosper.") { + : server_host_("localhost"), + kRequestMessage_("Live long and prosper."), + creds_(new SecureChannelCredentials( + grpc_fake_transport_security_credentials_create())) { // Make the backup poller poll very frequently in order to pick up // updates from all the subchannels's FDs. gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1"); @@ -215,9 +218,7 @@ class ClientLbEnd2endTest : public ::testing::Test { } // else, default to pick first args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_.get()); - std::shared_ptr creds(new SecureChannelCredentials( - grpc_fake_transport_security_credentials_create())); - return CreateCustomChannel("fake:///", std::move(creds), args); + return CreateCustomChannel("fake:///", creds_, args); } bool SendRpc( @@ -265,6 +266,7 @@ class ClientLbEnd2endTest : public ::testing::Test { MyTestServiceImpl service_; std::unique_ptr thread_; bool server_ready_ = false; + bool started_ = false; explicit ServerData(int port = 0) { port_ = port > 0 ? port : grpc_pick_unused_port_or_die(); @@ -272,6 +274,7 @@ class ClientLbEnd2endTest : public ::testing::Test { void Start(const grpc::string& server_host) { gpr_log(GPR_INFO, "starting server on port %d", port_); + started_ = true; std::mutex mu; std::unique_lock lock(mu); std::condition_variable cond; @@ -297,9 +300,11 @@ class ClientLbEnd2endTest : public ::testing::Test { cond->notify_one(); } - void Shutdown(bool join = true) { + void Shutdown() { + if (!started_) return; server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); - if (join) thread_->join(); + thread_->join(); + started_ = false; } void SetServingStatus(const grpc::string& service, bool serving) { @@ -378,6 +383,7 @@ class ClientLbEnd2endTest : public ::testing::Test { grpc_core::RefCountedPtr response_generator_; const grpc::string kRequestMessage_; + std::shared_ptr creds_; }; TEST_F(ClientLbEnd2endTest, PickFirst) { @@ -422,6 +428,30 @@ TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) { CheckRpcSendOk(second_stub, DEBUG_LOCATION); } +TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) { + ChannelArguments args; + constexpr int kInitialBackOffMs = 5000; + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); + // Create 2 servers, but start only the second one. + std::vector ports = {grpc_pick_unused_port_or_die(), + grpc_pick_unused_port_or_die()}; + CreateServers(2, ports); + StartServer(1); + auto channel1 = BuildChannel("pick_first", args); + auto stub1 = BuildStub(channel1); + SetNextResolution(ports); + // Wait for second server to be ready. + WaitForServer(stub1, 1, DEBUG_LOCATION); + // Create a second channel with the same addresses. Its PF instance + // should immediately pick the second subchannel, since it's already + // in READY state. + auto channel2 = BuildChannel("pick_first", args); + SetNextResolution(ports); + // Check that the channel reports READY without waiting for the + // initial backoff. + EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1 /* timeout_seconds */)); +} + TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) { ChannelArguments args; constexpr int kInitialBackOffMs = 100; @@ -899,7 +929,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { servers_[0]->service_.ResetCounters(); // Shutdown one of the servers to be sent in the update. - servers_[1]->Shutdown(false); + servers_[1]->Shutdown(); ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[2]->port_); SetNextResolution(ports); @@ -958,7 +988,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { // Kill all servers gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******"); for (size_t i = 0; i < servers_.size(); ++i) { - servers_[i]->Shutdown(true); + servers_[i]->Shutdown(); } gpr_log(GPR_INFO, "****** SERVERS KILLED *******"); gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******"); @@ -1006,7 +1036,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { } const auto pre_death = servers_[0]->service_.request_count(); // Kill the first server. - servers_[0]->Shutdown(true); + servers_[0]->Shutdown(); // Client request still succeed. May need retrying if RR had returned a pick // before noticing the change in the server's connectivity. while (!SendRpc(stub)) {