Merge pull request #17383 from markdroth/pick_first_choose_ready_subchannel_at_startup

Change pick_first to immediately select the first subchannel in READY state.
pull/17432/head
Mark D. Roth 6 years ago committed by GitHub
commit e9efc6fa02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 74
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  2. 48
      test/cpp/end2end/client_lb_end2end_test.cc

@ -380,6 +380,31 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args,
selected_ = nullptr; selected_ = nullptr;
return; return;
} }
// If one of the subchannels in the new list is already in state
// READY, then select it immediately. This can happen when the
// currently selected subchannel is also present in the update. It
// can also happen if one of the subchannels in the update is already
// in the subchannel index because it's in use by another channel.
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
grpc_error* error = GRPC_ERROR_NONE;
grpc_connectivity_state state = sd->CheckConnectivityStateLocked(&error);
GRPC_ERROR_UNREF(error);
if (state == GRPC_CHANNEL_READY) {
subchannel_list_ = std::move(subchannel_list);
sd->ProcessUnselectedReadyLocked();
sd->StartConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
latest_pending_subchannel_list_.reset();
// Make sure that subsequent calls to ExitIdleLocked() don't cause
// us to start watching a subchannel other than the one we've
// selected.
started_picking_ = true;
return;
}
}
if (selected_ == nullptr) { if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current // We don't yet have a selected subchannel, so replace the current
// subchannel list immediately. // subchannel list immediately.
@ -387,46 +412,14 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args,
// If we've started picking, start trying to connect to the first // If we've started picking, start trying to connect to the first
// subchannel in the new list. // subchannel in the new list.
if (started_picking_) { if (started_picking_) {
subchannel_list_->subchannel(0) // Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
->CheckConnectivityStateAndStartWatchingLocked(); // here, since we've already checked the initial connectivity
// state of all subchannels above.
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
} }
} else { } else {
// We do have a selected subchannel. // We do have a selected subchannel, so keep using it until one of
// Check if it's present in the new list. If so, we're done. // the subchannels in the new list reports READY.
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
if (sd->subchannel() == selected_->subchannel()) {
// The currently selected subchannel is in the update: we are done.
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p found already selected subchannel %p "
"at update index %" PRIuPTR " of %" PRIuPTR "; update done",
this, selected_->subchannel(), i,
subchannel_list->num_subchannels());
}
// Make sure it's in state READY. It might not be if we grabbed
// the combiner while a connectivity state notification
// informing us otherwise is pending.
// Note that CheckConnectivityStateLocked() also takes a ref to
// the connected subchannel.
grpc_error* error = GRPC_ERROR_NONE;
if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
selected_ = sd;
subchannel_list_ = std::move(subchannel_list);
sd->StartConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
latest_pending_subchannel_list_.reset();
return;
}
GRPC_ERROR_UNREF(error);
}
}
// Not keeping the previous selected subchannel, so set the latest
// pending subchannel list to the new subchannel list. We will wait
// for it to report READY before swapping it into the current
// subchannel list.
if (latest_pending_subchannel_list_ != nullptr) { if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_pick_first_trace.enabled()) { if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
@ -440,8 +433,11 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args,
// If we've started picking, start trying to connect to the first // If we've started picking, start trying to connect to the first
// subchannel in the new list. // subchannel in the new list.
if (started_picking_) { if (started_picking_) {
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
latest_pending_subchannel_list_->subchannel(0) latest_pending_subchannel_list_->subchannel(0)
->CheckConnectivityStateAndStartWatchingLocked(); ->StartConnectivityWatchLocked();
} }
} }
} }

@ -116,7 +116,10 @@ class MyTestServiceImpl : public TestServiceImpl {
class ClientLbEnd2endTest : public ::testing::Test { class ClientLbEnd2endTest : public ::testing::Test {
protected: protected:
ClientLbEnd2endTest() ClientLbEnd2endTest()
: server_host_("localhost"), kRequestMessage_("Live long and prosper.") { : server_host_("localhost"),
kRequestMessage_("Live long and prosper."),
creds_(new SecureChannelCredentials(
grpc_fake_transport_security_credentials_create())) {
// Make the backup poller poll very frequently in order to pick up // Make the backup poller poll very frequently in order to pick up
// updates from all the subchannels's FDs. // updates from all the subchannels's FDs.
gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1"); gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1");
@ -215,9 +218,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
} // else, default to pick first } // else, default to pick first
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_.get()); response_generator_.get());
std::shared_ptr<ChannelCredentials> creds(new SecureChannelCredentials( return CreateCustomChannel("fake:///", creds_, args);
grpc_fake_transport_security_credentials_create()));
return CreateCustomChannel("fake:///", std::move(creds), args);
} }
bool SendRpc( bool SendRpc(
@ -265,6 +266,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
MyTestServiceImpl service_; MyTestServiceImpl service_;
std::unique_ptr<std::thread> thread_; std::unique_ptr<std::thread> thread_;
bool server_ready_ = false; bool server_ready_ = false;
bool started_ = false;
explicit ServerData(int port = 0) { explicit ServerData(int port = 0) {
port_ = port > 0 ? port : grpc_pick_unused_port_or_die(); port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
@ -272,6 +274,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
void Start(const grpc::string& server_host) { void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_); gpr_log(GPR_INFO, "starting server on port %d", port_);
started_ = true;
std::mutex mu; std::mutex mu;
std::unique_lock<std::mutex> lock(mu); std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond; std::condition_variable cond;
@ -297,9 +300,11 @@ class ClientLbEnd2endTest : public ::testing::Test {
cond->notify_one(); cond->notify_one();
} }
void Shutdown(bool join = true) { void Shutdown() {
if (!started_) return;
server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
if (join) thread_->join(); thread_->join();
started_ = false;
} }
void SetServingStatus(const grpc::string& service, bool serving) { void SetServingStatus(const grpc::string& service, bool serving) {
@ -378,6 +383,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator> grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
response_generator_; response_generator_;
const grpc::string kRequestMessage_; const grpc::string kRequestMessage_;
std::shared_ptr<ChannelCredentials> creds_;
}; };
TEST_F(ClientLbEnd2endTest, PickFirst) { TEST_F(ClientLbEnd2endTest, PickFirst) {
@ -422,6 +428,30 @@ TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
CheckRpcSendOk(second_stub, DEBUG_LOCATION); CheckRpcSendOk(second_stub, DEBUG_LOCATION);
} }
TEST_F(ClientLbEnd2endTest, PickFirstSelectsReadyAtStartup) {
  // Use a long initial backoff so that if the second channel reports
  // READY quickly, it can only be because it selected an
  // already-READY subchannel, not because a fresh connection attempt
  // happened to succeed within the timeout.
  constexpr int kInitialBackOffMs = 5000;
  ChannelArguments args;
  args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
  // Bring up two servers, leaving the first one down.
  std::vector<int> ports;
  ports.push_back(grpc_pick_unused_port_or_die());
  ports.push_back(grpc_pick_unused_port_or_die());
  CreateServers(2, ports);
  StartServer(1);
  // First channel: connect and wait until the running server's
  // subchannel has reached READY state.
  auto first_channel = BuildChannel("pick_first", args);
  auto first_stub = BuildStub(first_channel);
  SetNextResolution(ports);
  WaitForServer(first_stub, 1, DEBUG_LOCATION);
  // Second channel with the same addresses: its pick_first instance
  // should find the second subchannel already READY in the subchannel
  // index and select it immediately.
  auto second_channel = BuildChannel("pick_first", args);
  SetNextResolution(ports);
  // READY must be reported well before the initial backoff elapses.
  EXPECT_TRUE(WaitForChannelReady(second_channel.get(), 1 /* timeout_seconds */));
}
TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) { TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
ChannelArguments args; ChannelArguments args;
constexpr int kInitialBackOffMs = 100; constexpr int kInitialBackOffMs = 100;
@ -899,7 +929,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
servers_[0]->service_.ResetCounters(); servers_[0]->service_.ResetCounters();
// Shutdown one of the servers to be sent in the update. // Shutdown one of the servers to be sent in the update.
servers_[1]->Shutdown(false); servers_[1]->Shutdown();
ports.emplace_back(servers_[1]->port_); ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_); ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports); SetNextResolution(ports);
@ -958,7 +988,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
// Kill all servers // Kill all servers
gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******"); gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
for (size_t i = 0; i < servers_.size(); ++i) { for (size_t i = 0; i < servers_.size(); ++i) {
servers_[i]->Shutdown(true); servers_[i]->Shutdown();
} }
gpr_log(GPR_INFO, "****** SERVERS KILLED *******"); gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******"); gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
@ -1006,7 +1036,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
} }
const auto pre_death = servers_[0]->service_.request_count(); const auto pre_death = servers_[0]->service_.request_count();
// Kill the first server. // Kill the first server.
servers_[0]->Shutdown(true); servers_[0]->Shutdown();
// Client request still succeed. May need retrying if RR had returned a pick // Client request still succeed. May need retrying if RR had returned a pick
// before noticing the change in the server's connectivity. // before noticing the change in the server's connectivity.
while (!SendRpc(stub)) { while (!SendRpc(stub)) {

Loading…
Cancel
Save