From 01eeec72c638f2ba6349be1bebe0243aa845780f Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 12 Sep 2019 11:12:08 -0700 Subject: [PATCH] Change RR to consider a subchannel in TF if it has failed since it was READY. --- .../lb_policy/round_robin/round_robin.cc | 28 +++++- test/cpp/end2end/client_lb_end2end_test.cc | 89 +++++++++++++++---- 2 files changed, 100 insertions(+), 17 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 04308ee254c..5f69a657b61 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -88,14 +88,21 @@ class RoundRobin : public LoadBalancingPolicy { return last_connectivity_state_; } + bool seen_failure_since_ready() const { return seen_failure_since_ready_; } + + // Performs connectivity state updates that need to be done both when we + // first start watching and when a watcher notification is received. void UpdateConnectivityStateLocked( grpc_connectivity_state connectivity_state); private: + // Performs connectivity state updates that need to be done only + // after we have started watching. void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state) override; grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; + bool seen_failure_since_ready_ = false; }; // A list of subchannels. @@ -375,8 +382,25 @@ void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( grpc_connectivity_state_name(last_connectivity_state_), grpc_connectivity_state_name(connectivity_state)); } - subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, - connectivity_state); + // Decide what state to report for aggregation purposes. + // If we haven't seen a failure since the last time we were in state + // READY, then we report the state change as-is. 
However, once we do see + a failure, we report TRANSIENT_FAILURE and do not report any subsequent + state changes until we go back into state READY. + if (!seen_failure_since_ready_) { + if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + seen_failure_since_ready_ = true; + } + subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, + connectivity_state); + } else { + if (connectivity_state == GRPC_CHANNEL_READY) { + seen_failure_since_ready_ = false; + subchannel_list()->UpdateStateCountersLocked( + GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state); + } + } + // Record last seen connectivity state. last_connectivity_state_ = connectivity_state; } diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index e0b4b074ff2..ab543511bb7 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -399,26 +399,31 @@ class ClientLbEnd2endTest : public ::testing::Test { ResetCounters(); } - bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) { + bool WaitForChannelState( + Channel* channel, std::function<bool(grpc_connectivity_state)> predicate, + bool try_to_connect = false, int timeout_seconds = 5) { const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(timeout_seconds); - grpc_connectivity_state state; - while ((state = channel->GetState(false /* try_to_connect */)) == - GRPC_CHANNEL_READY) { + while (true) { + grpc_connectivity_state state = channel->GetState(try_to_connect); + if (predicate(state)) break; if (!channel->WaitForStateChange(state, deadline)) return false; } return true; } + bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) { + auto predicate = [](grpc_connectivity_state state) { + return state != GRPC_CHANNEL_READY; + }; + return WaitForChannelState(channel, predicate, false, timeout_seconds); + } + bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) { - const gpr_timespec deadline = -
grpc_timeout_seconds_to_deadline(timeout_seconds); - grpc_connectivity_state state; - while ((state = channel->GetState(true /* try_to_connect */)) != - GRPC_CHANNEL_READY) { - if (!channel->WaitForStateChange(state, deadline)) return false; - } - return true; + auto predicate = [](grpc_connectivity_state state) { + return state == GRPC_CHANNEL_READY; + }; + return WaitForChannelState(channel, predicate, true, timeout_seconds); } bool SeenAllServers() { @@ -1176,7 +1181,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { auto channel = BuildChannel("round_robin", response_generator); auto stub = BuildStub(channel); std::vector<int> ports; - // Start with a single server. ports.emplace_back(servers_[0]->port_); response_generator.SetNextResolution(ports); @@ -1187,7 +1191,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); servers_[0]->service_.ResetCounters(); - // Shutdown one of the servers to be sent in the update. servers_[1]->Shutdown(); ports.emplace_back(servers_[1]->port_); @@ -1195,7 +1198,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { response_generator.SetNextResolution(ports); WaitForServer(stub, 0, DEBUG_LOCATION); WaitForServer(stub, 2, DEBUG_LOCATION); - // Send three RPCs, one per server. for (size_t i = 0; i < kNumServers; ++i) SendRpc(stub); // The server in shutdown shouldn't receive any. @@ -1279,6 +1281,63 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { ASSERT_GT(gpr_time_cmp(deadline, now), 0); } +TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) { + // Start servers and create channel. Channel should go to READY state.
+ const int kNumServers = 3; + StartServers(kNumServers); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); + auto stub = BuildStub(channel); + response_generator.SetNextResolution(GetServersPorts()); + EXPECT_TRUE(WaitForChannelReady(channel.get())); + // Now kill the servers. The channel should transition to TRANSIENT_FAILURE. + // TODO(roth): This test should ideally check that even when the + // subchannels are in state CONNECTING for an extended period of time, + // we will still report TRANSIENT_FAILURE. Unfortunately, we don't + // currently have a good way to get a subchannel to report CONNECTING + // for a long period of time, since the servers in this test framework + // are on the loopback interface, which will immediately return a + // "Connection refused" error, so the subchannels will only be in + // CONNECTING state very briefly. When we have time, see if we can + // find a way to fix this. + for (size_t i = 0; i < servers_.size(); ++i) { + servers_[i]->Shutdown(); + } + auto predicate = [](grpc_connectivity_state state) { + return state == GRPC_CHANNEL_TRANSIENT_FAILURE; + }; + EXPECT_TRUE(WaitForChannelState(channel.get(), predicate)); +} + +TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) { + // Create channel and return servers that don't exist. Channel should + // quickly transition into TRANSIENT_FAILURE. + // TODO(roth): This test should ideally check that even when the + // subchannels are in state CONNECTING for an extended period of time, + // we will still report TRANSIENT_FAILURE. Unfortunately, we don't + // currently have a good way to get a subchannel to report CONNECTING + // for a long period of time, since the servers in this test framework + // are on the loopback interface, which will immediately return a + // "Connection refused" error, so the subchannels will only be in + // CONNECTING state very briefly. 
When we have time, see if we can + // find a way to fix this. + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); + auto stub = BuildStub(channel); + response_generator.SetNextResolution({ + grpc_pick_unused_port_or_die(), + grpc_pick_unused_port_or_die(), + grpc_pick_unused_port_or_die(), + }); + for (size_t i = 0; i < servers_.size(); ++i) { + servers_[i]->Shutdown(); + } + auto predicate = [](grpc_connectivity_state state) { + return state == GRPC_CHANNEL_TRANSIENT_FAILURE; + }; + EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true)); +} + TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { const int kNumServers = 3; StartServers(kNumServers);