Change RR to consider a subchannel in TF if it has failed since it was READY.

pull/20245/head
Mark D. Roth 5 years ago
parent 1bfdbc1f6c
commit 01eeec72c6
  1. src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc (28 lines changed)
  2. test/cpp/end2end/client_lb_end2end_test.cc (89 lines changed)

src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc

@@ -88,14 +88,21 @@ class RoundRobin : public LoadBalancingPolicy {
       return last_connectivity_state_;
     }
 
+    bool seen_failure_since_ready() const { return seen_failure_since_ready_; }
+
+    // Performs connectivity state updates that need to be done both when we
+    // first start watching and when a watcher notification is received.
     void UpdateConnectivityStateLocked(
         grpc_connectivity_state connectivity_state);
 
    private:
+    // Performs connectivity state updates that need to be done only
+    // after we have started watching.
     void ProcessConnectivityChangeLocked(
         grpc_connectivity_state connectivity_state) override;
 
     grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
+    bool seen_failure_since_ready_ = false;
   };
 
   // A list of subchannels.
@@ -375,8 +382,25 @@ void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
             grpc_connectivity_state_name(last_connectivity_state_),
             grpc_connectivity_state_name(connectivity_state));
   }
-  subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
-                                               connectivity_state);
+  // Decide what state to report for aggregation purposes.
+  // If we haven't seen a failure since the last time we were in state
+  // READY, then we report the state change as-is. However, once we do see
+  // a failure, we report TRANSIENT_FAILURE and do not report any subsequent
+  // state changes until we go back into state READY.
+  if (!seen_failure_since_ready_) {
+    if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+      seen_failure_since_ready_ = true;
+    }
+    subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
+                                                 connectivity_state);
+  } else {
+    if (connectivity_state == GRPC_CHANNEL_READY) {
+      seen_failure_since_ready_ = false;
+      subchannel_list()->UpdateStateCountersLocked(
+          GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state);
+    }
+  }
+  // Record last seen connectivity state.
   last_connectivity_state_ = connectivity_state;
 }
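The comment block in the hunk above amounts to a small per-subchannel state machine: transitions are counted as-is until the subchannel hits TRANSIENT_FAILURE, and from then on it keeps counting as TRANSIENT_FAILURE until it becomes READY again. The sketch below is a minimal, self-contained model of that rule for readers skimming the diff; the State enum, SubchannelReporter type, and Report() function are illustrative stand-ins for this note only, not gRPC APIs.

// Illustrative model of the sticky TRANSIENT_FAILURE reporting rule.
#include <cassert>

enum class State { kIdle, kConnecting, kReady, kTransientFailure, kShutdown };

struct SubchannelReporter {
  State last_reported = State::kIdle;
  bool seen_failure_since_ready = false;

  // Returns the state this subchannel should count as after observing
  // new_state (the real code feeds this into the policy's state counters).
  State Report(State new_state) {
    if (!seen_failure_since_ready) {
      if (new_state == State::kTransientFailure) seen_failure_since_ready = true;
      last_reported = new_state;
    } else if (new_state == State::kReady) {
      // Only a transition back to READY clears the sticky failure.
      seen_failure_since_ready = false;
      last_reported = new_state;
    }
    // While the failure is sticky, ignore intermediate CONNECTING/IDLE
    // transitions and keep counting as TRANSIENT_FAILURE.
    return seen_failure_since_ready ? State::kTransientFailure : last_reported;
  }
};

int main() {
  SubchannelReporter r;
  assert(r.Report(State::kReady) == State::kReady);
  assert(r.Report(State::kTransientFailure) == State::kTransientFailure);
  // CONNECTING after a failure is still counted as TRANSIENT_FAILURE.
  assert(r.Report(State::kConnecting) == State::kTransientFailure);
  assert(r.Report(State::kReady) == State::kReady);  // READY clears the flag.
  return 0;
}

In the actual policy the result is not returned; it determines the arguments passed to subchannel_list()->UpdateStateCountersLocked(), which is what drives the channel-level aggregation.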

test/cpp/end2end/client_lb_end2end_test.cc

@@ -399,26 +399,31 @@ class ClientLbEnd2endTest : public ::testing::Test {
     ResetCounters();
   }
 
-  bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
+  bool WaitForChannelState(
+      Channel* channel, std::function<bool(grpc_connectivity_state)> predicate,
+      bool try_to_connect = false, int timeout_seconds = 5) {
     const gpr_timespec deadline =
         grpc_timeout_seconds_to_deadline(timeout_seconds);
-    grpc_connectivity_state state;
-    while ((state = channel->GetState(false /* try_to_connect */)) ==
-           GRPC_CHANNEL_READY) {
+    while (true) {
+      grpc_connectivity_state state = channel->GetState(try_to_connect);
+      if (predicate(state)) break;
       if (!channel->WaitForStateChange(state, deadline)) return false;
     }
     return true;
   }
 
+  bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
+    auto predicate = [](grpc_connectivity_state state) {
+      return state != GRPC_CHANNEL_READY;
+    };
+    return WaitForChannelState(channel, predicate, false, timeout_seconds);
+  }
+
   bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
-    const gpr_timespec deadline =
-        grpc_timeout_seconds_to_deadline(timeout_seconds);
-    grpc_connectivity_state state;
-    while ((state = channel->GetState(true /* try_to_connect */)) !=
-           GRPC_CHANNEL_READY) {
-      if (!channel->WaitForStateChange(state, deadline)) return false;
-    }
-    return true;
+    auto predicate = [](grpc_connectivity_state state) {
+      return state == GRPC_CHANNEL_READY;
+    };
+    return WaitForChannelState(channel, predicate, true, timeout_seconds);
   }
 
   bool SeenAllServers() {
@@ -1176,7 +1181,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
   auto channel = BuildChannel("round_robin", response_generator);
   auto stub = BuildStub(channel);
   std::vector<int> ports;
-
   // Start with a single server.
   ports.emplace_back(servers_[0]->port_);
   response_generator.SetNextResolution(ports);
@@ -1187,7 +1191,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
   EXPECT_EQ(0, servers_[1]->service_.request_count());
   EXPECT_EQ(0, servers_[2]->service_.request_count());
   servers_[0]->service_.ResetCounters();
-
   // Shutdown one of the servers to be sent in the update.
   servers_[1]->Shutdown();
   ports.emplace_back(servers_[1]->port_);
@@ -1195,7 +1198,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
   response_generator.SetNextResolution(ports);
   WaitForServer(stub, 0, DEBUG_LOCATION);
   WaitForServer(stub, 2, DEBUG_LOCATION);
-
   // Send three RPCs, one per server.
   for (size_t i = 0; i < kNumServers; ++i) SendRpc(stub);
   // The server in shutdown shouldn't receive any.
@@ -1279,6 +1281,63 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
     ASSERT_GT(gpr_time_cmp(deadline, now), 0);
   }
 
+TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailure) {
+  // Start servers and create channel. Channel should go to READY state.
+  const int kNumServers = 3;
+  StartServers(kNumServers);
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
+  auto stub = BuildStub(channel);
+  response_generator.SetNextResolution(GetServersPorts());
+  EXPECT_TRUE(WaitForChannelReady(channel.get()));
+  // Now kill the servers. The channel should transition to TRANSIENT_FAILURE.
+  // TODO(roth): This test should ideally check that even when the
+  // subchannels are in state CONNECTING for an extended period of time,
+  // we will still report TRANSIENT_FAILURE. Unfortunately, we don't
+  // currently have a good way to get a subchannel to report CONNECTING
+  // for a long period of time, since the servers in this test framework
+  // are on the loopback interface, which will immediately return a
+  // "Connection refused" error, so the subchannels will only be in
+  // CONNECTING state very briefly. When we have time, see if we can
+  // find a way to fix this.
+  for (size_t i = 0; i < servers_.size(); ++i) {
+    servers_[i]->Shutdown();
+  }
+  auto predicate = [](grpc_connectivity_state state) {
+    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
+  };
+  EXPECT_TRUE(WaitForChannelState(channel.get(), predicate));
+}
+
+TEST_F(ClientLbEnd2endTest, RoundRobinTransientFailureAtStartup) {
+  // Create channel and return servers that don't exist. Channel should
+  // quickly transition into TRANSIENT_FAILURE.
+  // TODO(roth): This test should ideally check that even when the
+  // subchannels are in state CONNECTING for an extended period of time,
+  // we will still report TRANSIENT_FAILURE. Unfortunately, we don't
+  // currently have a good way to get a subchannel to report CONNECTING
+  // for a long period of time, since the servers in this test framework
+  // are on the loopback interface, which will immediately return a
+  // "Connection refused" error, so the subchannels will only be in
+  // CONNECTING state very briefly. When we have time, see if we can
+  // find a way to fix this.
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
+  auto stub = BuildStub(channel);
+  response_generator.SetNextResolution({
+      grpc_pick_unused_port_or_die(),
+      grpc_pick_unused_port_or_die(),
+      grpc_pick_unused_port_or_die(),
+  });
+  for (size_t i = 0; i < servers_.size(); ++i) {
+    servers_[i]->Shutdown();
+  }
+  auto predicate = [](grpc_connectivity_state state) {
+    return state == GRPC_CHANNEL_TRANSIENT_FAILURE;
+  };
+  EXPECT_TRUE(WaitForChannelState(channel.get(), predicate, true));
+}
+
 TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
   const int kNumServers = 3;
   StartServers(kNumServers);
