Removed in_connectivity_shutdown from RR

pull/12303/head
David Garcia Quintas 8 years ago
parent f769bf1273
commit 52ba957f83
  1. 6
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
  2. 77
      test/cpp/end2end/client_lb_end2end_test.cc

@ -74,9 +74,6 @@ typedef struct round_robin_lb_policy {
bool started_picking;
/** are we shutting down? */
bool shutdown;
/** has the policy gotten into the GRPC_CHANNEL_SHUTDOWN? No picks can be
* service after this point, the policy will never transition out. */
bool in_connectivity_shutdown;
/** List of picks that are waiting on connectivity */
pending_pick *pending_picks;
@ -424,7 +421,6 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
GPR_ASSERT(!p->shutdown);
GPR_ASSERT(!p->in_connectivity_shutdown);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol);
}
@ -537,7 +533,7 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
p->in_connectivity_shutdown = true;
p->shutdown = true;
new_state = GRPC_CHANNEL_SHUTDOWN;
} else if (subchannel_list->num_transient_failures ==
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */

@ -85,7 +85,8 @@ class MyTestServiceImpl : public TestServiceImpl {
class ClientLbEnd2endTest : public ::testing::Test {
protected:
ClientLbEnd2endTest() : server_host_("localhost") {}
ClientLbEnd2endTest()
: server_host_("localhost"), kRequestMessage_("Live long and prosper.") {}
void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create();
@ -139,6 +140,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
} // else, default to pick first
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_);
args.SetInt("grpc.testing.fixed_reconnect_backoff_ms", 2000);
std::ostringstream uri;
uri << "fake:///";
for (size_t i = 0; i < servers_.size() - 1; ++i) {
@ -150,18 +152,27 @@ class ClientLbEnd2endTest : public ::testing::Test {
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
void SendRpc(bool expect_ok = true) {
Status SendRpc(EchoResponse* response = nullptr) {
const bool local_response = (response == nullptr);
if (local_response) response = new EchoResponse;
EchoRequest request;
EchoResponse response;
request.set_message("Live long and prosper.");
request.set_message(kRequestMessage_);
ClientContext context;
Status status = stub_->Echo(&context, request, &response);
if (expect_ok) {
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.message(), request.message());
} else {
EXPECT_FALSE(status.ok());
}
Status status = stub_->Echo(&context, request, response);
if (local_response) delete response;
return status;
}
void CheckRpcSendOk() {
EchoResponse response;
const Status status = SendRpc(&response);
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.message(), kRequestMessage_);
}
void CheckRpcSendFailure() {
const Status status = SendRpc();
EXPECT_FALSE(status.ok());
}
struct ServerData {
@ -207,7 +218,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
void WaitForServer(size_t server_idx) {
do {
SendRpc();
CheckRpcSendOk();
} while (servers_[server_idx]->service_.request_count() == 0);
ResetCounters();
}
@ -217,6 +228,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::vector<std::unique_ptr<ServerData>> servers_;
grpc_fake_resolver_response_generator* response_generator_;
const grpc::string kRequestMessage_;
};
TEST_F(ClientLbEnd2endTest, PickFirst) {
@ -230,7 +242,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
}
SetNextResolution(ports);
for (size_t i = 0; i < servers_.size(); ++i) {
SendRpc();
CheckRpcSendOk();
}
// All requests should have gone to a single server.
bool found = false;
@ -258,7 +270,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
SendRpc();
CheckRpcSendOk();
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
// An empty update will result in the channel going into TRANSIENT_FAILURE.
@ -304,7 +316,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
SendRpc();
CheckRpcSendOk();
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
servers_[0]->service_.ResetCounters();
@ -314,7 +326,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET superset *******");
SendRpc();
CheckRpcSendOk();
// We stick to the previously connected server.
WaitForServer(0);
EXPECT_EQ(0, servers_[1]->service_.request_count());
@ -338,7 +350,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
for (size_t i = 0; i < 1000; ++i) {
std::random_shuffle(ports.begin(), ports.end());
SetNextResolution(ports);
if (i % 10 == 0) SendRpc();
if (i % 10 == 0) CheckRpcSendOk();
}
}
// Check LB policy name for the channel.
@ -356,7 +368,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
}
SetNextResolution(ports);
for (size_t i = 0; i < servers_.size(); ++i) {
SendRpc();
CheckRpcSendOk();
}
// One request should have gone to each server.
for (size_t i = 0; i < servers_.size(); ++i) {
@ -378,7 +390,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
SetNextResolution(ports);
WaitForServer(0);
// Send RPCs. They should all go servers_[0]
for (size_t i = 0; i < 10; ++i) SendRpc();
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
EXPECT_EQ(10, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@ -394,7 +406,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
EXPECT_EQ(0, servers_[1]->service_.request_count());
WaitForServer(1);
for (size_t i = 0; i < 10; ++i) SendRpc();
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(10, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@ -406,7 +418,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
SetNextResolution(ports);
WaitForServer(2);
for (size_t i = 0; i < 10; ++i) SendRpc();
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(10, servers_[2]->service_.request_count());
@ -423,7 +435,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
WaitForServer(2);
// Send three RPCs, one per server.
for (size_t i = 0; i < 3; ++i) SendRpc();
for (size_t i = 0; i < 3; ++i) CheckRpcSendOk();
EXPECT_EQ(1, servers_[0]->service_.request_count());
EXPECT_EQ(1, servers_[1]->service_.request_count());
EXPECT_EQ(1, servers_[2]->service_.request_count());
@ -493,7 +505,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
for (size_t i = 0; i < 1000; ++i) {
std::random_shuffle(ports.begin(), ports.end());
SetNextResolution(ports);
if (i % 10 == 0) SendRpc();
if (i % 10 == 0) CheckRpcSendOk();
}
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
@ -504,11 +516,13 @@ TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
// update provisions of RR.
}
TEST_F(ClientLbEnd2endTest, RoundRobinReconnect) {
TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
// Start servers and send one RPC per server.
const int kNumServers = 1;
const int kNumServers = 3;
std::vector<int> ports;
ports.push_back(grpc_pick_unused_port_or_die());
for (int i = 0; i < kNumServers; ++i) {
ports.push_back(grpc_pick_unused_port_or_die());
}
StartServers(kNumServers, ports);
ResetStub("round_robin");
SetNextResolution(ports);
@ -517,24 +531,19 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReconnect) {
// state READY in the order in which the addresses are specified,
// which is only true because the backends are all local.
for (size_t i = 0; i < servers_.size(); ++i) {
SendRpc();
CheckRpcSendOk();
EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
}
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
// Kill all servers
for (size_t i = 0; i < servers_.size(); ++i) {
servers_[i]->Shutdown(false);
}
// Client request should fail.
SendRpc(false);
CheckRpcSendFailure();
// Bring servers back up on the same port (we aren't recreating the channel).
StartServers(kNumServers, ports);
// Client request should succeed.
SendRpc();
CheckRpcSendOk();
}
} // namespace

Loading…
Cancel
Save