Merge pull request #11591 from dgquintas/rr_state_fix_1.4

Fix RR policy connectivity state upon subchannels shutdown (1.4.x branch)
commit 52e68e9383 by David G. Quintas (committed via GitHub)
Files changed:
  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c (23 lines changed)
  test/cpp/end2end/round_robin_end2end_test.cc (51 lines changed)

src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c

@@ -138,6 +138,8 @@ struct round_robin_lb_policy {
   size_t num_ready;
   /** how many subchannels are in state TRANSIENT_FAILURE */
   size_t num_transient_failures;
+  /** how many subchannels are in state SHUTDOWN */
+  size_t num_shutdown;
   /** how many subchannels are in state IDLE */
   size_t num_idle;
@@ -381,6 +383,8 @@ static void update_state_counters_locked(subchannel_data *sd) {
     ++p->num_ready;
   } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
     ++p->num_transient_failures;
+  } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+    ++p->num_shutdown;
  } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
     ++p->num_idle;
   }
@@ -401,7 +405,7 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
    *    CHECK: sd->curr_connectivity_state == CONNECTING.
    *
    * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
-   *    CHECK: p->num_subchannels = 0.
+   *    CHECK: p->num_shutdown == p->num_subchannels.
    *
    * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
    *          TRANSIENT_FAILURE.
@@ -411,34 +415,35 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
    *    CHECK: p->num_idle == p->num_subchannels.
    */
   round_robin_lb_policy *p = sd->policy;
+  grpc_connectivity_state new_state = sd->curr_connectivity_state;
   if (p->num_ready > 0) { /* 1) READY */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
                                 GRPC_ERROR_NONE, "rr_ready");
-    return GRPC_CHANNEL_READY;
+    new_state = GRPC_CHANNEL_READY;
   } else if (sd->curr_connectivity_state ==
              GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                                 GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
                                 "rr_connecting");
-    return GRPC_CHANNEL_CONNECTING;
-  } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */
+    new_state = GRPC_CHANNEL_CONNECTING;
+  } else if (p->num_shutdown == p->num_subchannels) { /* 3) SHUTDOWN */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                                 GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
                                 "rr_shutdown");
-    return GRPC_CHANNEL_SHUTDOWN;
+    new_state = GRPC_CHANNEL_SHUTDOWN;
   } else if (p->num_transient_failures ==
              p->num_subchannels) { /* 4) TRANSIENT_FAILURE */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                                 GRPC_CHANNEL_TRANSIENT_FAILURE,
                                 GRPC_ERROR_REF(error), "rr_transient_failure");
-    return GRPC_CHANNEL_TRANSIENT_FAILURE;
+    new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
   } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
                                 GRPC_ERROR_NONE, "rr_idle");
-    return GRPC_CHANNEL_IDLE;
+    new_state = GRPC_CHANNEL_IDLE;
   }
-  /* no change */
-  return sd->curr_connectivity_state;
+  GRPC_ERROR_UNREF(error);
+  return new_state;
 }
 
 static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
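Taken together, the hunks above change how the policy aggregates per-subchannel states. The old rule 3 checked p->num_subchannels == 0, which never fires when subchannels merely transition to SHUTDOWN, so a policy whose backends all went away kept reporting a stale state; counting SHUTDOWN subchannels in num_shutdown lets rule 3 fire once every subchannel has shut down. As a rough standalone sketch of that decision order (not the gRPC implementation; PolicyCounters and AggregateState are hypothetical names used only for illustration):

    // Sketch only: mirrors rules 1)-5) from the comment block in round_robin.c.
    // Names are hypothetical, not gRPC API.
    #include <cstddef>

    enum class State { kIdle, kConnecting, kReady, kTransientFailure, kShutdown };

    struct PolicyCounters {
      size_t num_subchannels = 0;
      size_t num_ready = 0;
      size_t num_transient_failures = 0;
      size_t num_shutdown = 0;  // the counter this patch introduces
      size_t num_idle = 0;
    };

    // `subchannel_state` is the state just reported by the triggering subchannel.
    State AggregateState(const PolicyCounters& c, State subchannel_state) {
      if (c.num_ready > 0) return State::kReady;                              // 1)
      if (subchannel_state == State::kConnecting) return State::kConnecting;  // 2)
      if (c.num_shutdown == c.num_subchannels) return State::kShutdown;       // 3)
      if (c.num_transient_failures == c.num_subchannels)
        return State::kTransientFailure;                                      // 4)
      if (c.num_idle == c.num_subchannels) return State::kIdle;               // 5)
      return subchannel_state;  // no rule matched: keep the reported state
    }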

test/cpp/end2end/round_robin_end2end_test.cc

@@ -88,9 +88,12 @@ class RoundRobinEnd2endTest : public ::testing::Test {
  protected:
   RoundRobinEnd2endTest() : server_host_("localhost") {}
 
-  void StartServers(int num_servers) {
-    for (int i = 0; i < num_servers; ++i) {
-      servers_.emplace_back(new ServerData(server_host_));
+  void StartServers(size_t num_servers,
+                    std::vector<int> ports = std::vector<int>()) {
+    for (size_t i = 0; i < num_servers; ++i) {
+      int port = 0;
+      if (ports.size() == num_servers) port = ports[i];
+      servers_.emplace_back(new ServerData(server_host_, port));
     }
   }
@@ -114,15 +117,19 @@ class RoundRobinEnd2endTest : public ::testing::Test {
     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
   }
 
-  void SendRpc(int num_rpcs) {
+  void SendRpc(int num_rpcs, bool expect_ok = true) {
     EchoRequest request;
     EchoResponse response;
     request.set_message("Live long and prosper.");
     for (int i = 0; i < num_rpcs; i++) {
       ClientContext context;
       Status status = stub_->Echo(&context, request, &response);
-      EXPECT_TRUE(status.ok());
-      EXPECT_EQ(response.message(), request.message());
+      if (expect_ok) {
+        EXPECT_TRUE(status.ok());
+        EXPECT_EQ(response.message(), request.message());
+      } else {
+        EXPECT_FALSE(status.ok());
+      }
     }
   }
@@ -131,8 +138,8 @@ class RoundRobinEnd2endTest : public ::testing::Test {
     std::unique_ptr<Server> server_;
     MyTestServiceImpl service_;
 
-    explicit ServerData(const grpc::string& server_host) {
-      port_ = grpc_pick_unused_port_or_die();
+    explicit ServerData(const grpc::string& server_host, int port = 0) {
+      port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
       gpr_log(GPR_INFO, "starting server on port %d", port_);
       std::ostringstream server_address;
       server_address << server_host << ":" << port_;
@@ -191,6 +198,38 @@ TEST_F(RoundRobinEnd2endTest, RoundRobin) {
   EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
 }
 
+TEST_F(RoundRobinEnd2endTest, RoundRobinReconnect) {
+  // Start servers and send one RPC per server.
+  const int kNumServers = 1;
+  std::vector<int> ports;
+  ports.push_back(grpc_pick_unused_port_or_die());
+  StartServers(kNumServers, ports);
+  ResetStub(true /* round_robin */);
+  // Send one RPC per backend and make sure they are used in order.
+  // Note: This relies on the fact that the subchannels are reported in
+  // state READY in the order in which the addresses are specified,
+  // which is only true because the backends are all local.
+  for (size_t i = 0; i < servers_.size(); ++i) {
+    SendRpc(1);
+    EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
+  }
+  // Check LB policy name for the channel.
+  EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+
+  // Kill all servers.
+  for (size_t i = 0; i < servers_.size(); ++i) {
+    servers_[i]->Shutdown();
+  }
+  // Client request should fail.
+  SendRpc(1, false);
+
+  // Bring servers back up on the same port (we aren't recreating the channel).
+  StartServers(kNumServers, ports);
+
+  // Client request should succeed.
+  SendRpc(1);
+}
+
 }  // namespace
 }  // namespace testing
 }  // namespace grpc
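The new RoundRobinReconnect test drives the fix end to end: with every backend shut down, RPCs on the channel must fail, and once the backends come back on the same ports the channel must recover without being recreated. For reference, a client observing the same recovery could poll the channel's connectivity state. A minimal sketch, assuming a channel already created with the round_robin policy; the helper name WaitUntilReady and the deadline handling are illustrative, not part of this PR:

    // Sketch only: wait for a round_robin channel to report READY again after
    // its backends restart. Uses the public grpc::Channel connectivity API.
    #include <chrono>
    #include <memory>
    #include <grpc++/grpc++.h>

    bool WaitUntilReady(const std::shared_ptr<grpc::Channel>& channel,
                        std::chrono::seconds timeout) {
      const auto deadline = std::chrono::system_clock::now() + timeout;
      // try_to_connect=true nudges IDLE/TRANSIENT_FAILURE subchannels to retry.
      grpc_connectivity_state state = channel->GetState(/*try_to_connect=*/true);
      while (state != GRPC_CHANNEL_READY) {
        // Blocks until the state changes or the deadline expires.
        if (!channel->WaitForStateChange(state, deadline)) return false;
        state = channel->GetState(/*try_to_connect=*/true);
      }
      return true;
    }

This is essentially what the test relies on implicitly: with the fix, the policy's aggregated state leaves READY when its last subchannel shuts down and returns to READY once a backend is reachable again on the original port.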
