Merge pull request #14170 from dgquintas/fake_resolver_dont_push_rr_reresolve2

Make RR re-resolve when any of its subchannels fail.
David G. Quintas committed 7 years ago via GitHub
commit c8f572b443
  1. src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc (82 changes)
  2. src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc (16 changes)
  3. src/core/ext/filters/client_channel/lb_policy/subchannel_list.h (2 changes)
  4. test/cpp/end2end/client_lb_end2end_test.cc (238 changes)

@@ -328,18 +328,11 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
* 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests
* re-resolution).
* CHECK: subchannel_list->num_shutdown ==
* subchannel_list->num_subchannels.
*
* 4) RULE: ALL subchannels are SHUTDOWN or TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
* CHECK: subchannel_list->num_shutdown +
* subchannel_list->num_transient_failures ==
* 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
* CHECK: subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
*/
// TODO(juanlishen): For rule 4, we may want to re-resolve instead.
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy;
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE);
@@ -351,22 +344,12 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
/* 2) CONNECTING */
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "rr_connecting");
} else if (subchannel_list->num_shutdown ==
} else if (subchannel_list->num_transient_failures ==
subchannel_list->num_subchannels) {
/* 3) IDLE and re-resolve */
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"rr_exhausted_subchannels+reresolve");
p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_NONE);
} else if (subchannel_list->num_shutdown +
subchannel_list->num_transient_failures ==
subchannel_list->num_subchannels) {
/* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
/* 3) TRANSIENT_FAILURE */
grpc_connectivity_state_set(
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_exhausted_subchannels");
}
GRPC_ERROR_UNREF(error);
}
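
In isolation, the new aggregation logic reduces to the following minimal sketch, with plain counters standing in for the grpc_lb_subchannel_list fields (illustrative names, not the gRPC-core API):

#include <cstddef>

enum class AggState { kReady, kConnecting, kTransientFailure, kIdle };

// Mirrors the three rules documented in update_lb_connectivity_status_locked.
AggState AggregateRoundRobinState(size_t num_ready, size_t num_connecting,
                                  size_t num_transient_failures,
                                  size_t num_subchannels) {
  if (num_ready > 0) return AggState::kReady;            // rule 1
  if (num_connecting > 0) return AggState::kConnecting;  // rule 2
  if (num_transient_failures == num_subchannels) {
    return AggState::kTransientFailure;                  // rule 3
  }
  return AggState::kIdle;  // nothing has started connecting yet
}

With the old rules 3 and 4 collapsed into the single TRANSIENT_FAILURE rule, exhausting the subchannels no longer flips the policy to IDLE here; instead, re-resolution is requested whenever an individual subchannel reports TRANSIENT_FAILURE, as the rr_connectivity_changed_locked() hunks below show.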
@@ -387,6 +370,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
p->shutdown, sd->subchannel_list->shutting_down,
grpc_error_string(error));
}
GPR_ASSERT(sd->subchannel != nullptr);
// If the policy is shutting down, unref and return.
if (p->shutdown) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
@@ -412,14 +396,19 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Update state counters and new overall state.
update_state_counters_locked(sd);
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
// If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
// subchannel, if any.
switch (sd->curr_connectivity_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
sd->connected_subchannel.reset();
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
"Requesting re-resolution",
p, sd->subchannel);
}
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_NONE);
break;
}
case GRPC_CHANNEL_READY: {
@@ -442,8 +431,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
gpr_log(GPR_DEBUG,
"[RR %p] phasing out subchannel list %p (size %lu) in favor "
"of %p (size %lu)",
(void*)p, (void*)p->subchannel_list, num_subchannels,
(void*)sd->subchannel_list, num_subchannels);
p, p->subchannel_list, num_subchannels, sd->subchannel_list,
num_subchannels);
}
if (p->subchannel_list != nullptr) {
// dispose of the current subchannel_list
@@ -455,7 +444,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
}
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemptively replicates rr_pick()'s actions. */
* p->pending_picks. This preemptively replicates rr_pick()'s actions.
*/
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
grpc_lb_subchannel_data* selected =
@@ -488,6 +478,12 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:; // fallthrough
}
// Update state counters and new overall state.
update_state_counters_locked(sd);
// Only update connectivity based on the selected subchannel list.
if (sd->subchannel_list == p->subchannel_list) {
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
}
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
@@ -562,6 +558,30 @@ static void rr_update_locked(grpc_lb_policy* policy,
return;
}
if (p->started_picking) {
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
const grpc_connectivity_state subchannel_state =
grpc_subchannel_check_connectivity(
subchannel_list->subchannels[i].subchannel, nullptr);
// Override the default setting of IDLE for connectivity notification
// purposes if the subchannel is already in transient failure. Otherwise
// we'd be immediately notified of the IDLE-TRANSIENT_FAILURE
// discrepancy, attempt to re-resolve and end up here again.
// TODO(roth): As part of C++-ifying the subchannel_list API, design a
// better API for notifying the LB policy of subchannel states, which can
// be used both for the subchannel's initial state and for subsequent
// state changes. This will allow us to handle this more generally instead
// of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any
// pending picks across all READY subchannels rather than sending them all
// to the first one).
if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
subchannel_list->subchannels[i].pending_connectivity_state_unsafe =
subchannel_list->subchannels[i].curr_connectivity_state =
subchannel_list->subchannels[i].prev_connectivity_state =
subchannel_state;
--subchannel_list->num_idle;
++subchannel_list->num_transient_failures;
}
}
if (p->latest_pending_subchannel_list != nullptr) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,

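The seeding described in the comment above amounts to the following sketch: a subchannel that is already in TRANSIENT_FAILURE when the new list is built gets that state recorded up front, and the list's counters are adjusted away from the default IDLE accounting (illustrative types; the real code mutates grpc_lb_subchannel_data in place):

#include <cstddef>

enum class SubchState { kIdle, kTransientFailure };

struct SketchSubchannelData {
  SubchState pending_connectivity_state_unsafe = SubchState::kIdle;
  SubchState curr_connectivity_state = SubchState::kIdle;
  SubchState prev_connectivity_state = SubchState::kIdle;
};

void SeedInitialState(SketchSubchannelData* sd, SubchState observed,
                      size_t* num_idle, size_t* num_transient_failures) {
  if (observed == SubchState::kTransientFailure) {
    // Record the real state so the first watcher callback does not look like
    // an IDLE -> TRANSIENT_FAILURE transition, which would trigger another
    // re-resolution and loop straight back here.
    sd->pending_connectivity_state_unsafe = observed;
    sd->curr_connectivity_state = observed;
    sd->prev_connectivity_state = observed;
    --*num_idle;
    ++*num_transient_failures;
  }
}
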
@@ -54,13 +54,15 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
void grpc_lb_subchannel_data_start_connectivity_watch(
grpc_lb_subchannel_data* sd) {
if (sd->subchannel_list->tracer->enabled()) {
gpr_log(GPR_DEBUG,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): requesting connectivity change notification",
sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
sd->subchannel_list,
(size_t)(sd - sd->subchannel_list->subchannels),
sd->subchannel_list->num_subchannels, sd->subchannel);
gpr_log(
GPR_DEBUG,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): requesting connectivity change "
"notification (from %s)",
sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
sd->subchannel_list->num_subchannels, sd->subchannel,
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe));
}
sd->connectivity_notification_pending = true;
grpc_subchannel_notify_on_state_change(

@@ -101,8 +101,6 @@ struct grpc_lb_subchannel_list {
size_t num_ready;
/** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
/** how many subchannels are in state SHUTDOWN */
size_t num_shutdown;
/** how many subchannels are in state IDLE */
size_t num_idle;

@@ -39,6 +39,7 @@
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@@ -162,44 +163,82 @@ class ClientLbEnd2endTest : public ::testing::Test {
grpc_lb_addresses_destroy(addresses);
}
void SetNextResolutionUponError(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_lb_addresses* addresses =
grpc_lb_addresses_create(ports.size(), nullptr);
for (size_t i = 0; i < ports.size(); ++i) {
char* lb_uri_str;
gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]);
grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
GPR_ASSERT(lb_uri != nullptr);
grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri,
false /* is balancer */,
"" /* balancer name */, nullptr);
grpc_uri_destroy(lb_uri);
gpr_free(lb_uri_str);
}
const grpc_arg fake_addresses =
grpc_lb_addresses_create_channel_arg(addresses);
grpc_channel_args* fake_result =
grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1);
grpc_fake_resolver_response_generator_set_response_upon_error(
response_generator_, fake_result);
grpc_channel_args_destroy(fake_result);
grpc_lb_addresses_destroy(addresses);
}
std::vector<int> GetServersPorts() {
std::vector<int> ports;
for (const auto& server : servers_) ports.push_back(server->port_);
return ports;
}
void ResetStub(const grpc::string& lb_policy_name,
ChannelArguments args = ChannelArguments()) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
const std::shared_ptr<Channel>& channel) {
return grpc::testing::EchoTestService::NewStub(channel);
}
std::shared_ptr<Channel> BuildChannel(
const grpc::string& lb_policy_name,
ChannelArguments args = ChannelArguments()) {
if (lb_policy_name.size() > 0) {
args.SetLoadBalancingPolicyName(lb_policy_name);
} // else, default to pick first
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_);
channel_ =
CreateCustomChannel("fake:///", InsecureChannelCredentials(), args);
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
return CreateCustomChannel("fake:///", InsecureChannelCredentials(), args);
}
bool SendRpc(EchoResponse* response = nullptr) {
bool SendRpc(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
EchoResponse* response = nullptr, int timeout_ms = 1000) {
const bool local_response = (response == nullptr);
if (local_response) response = new EchoResponse;
EchoRequest request;
request.set_message(kRequestMessage_);
ClientContext context;
Status status = stub_->Echo(&context, request, response);
context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
Status status = stub->Echo(&context, request, response);
if (local_response) delete response;
return status.ok();
}
void CheckRpcSendOk() {
void CheckRpcSendOk(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
const grpc_core::DebugLocation& location) {
EchoResponse response;
const bool success = SendRpc(&response);
EXPECT_TRUE(success);
EXPECT_EQ(response.message(), kRequestMessage_);
const bool success = SendRpc(stub, &response);
if (!success) abort();
ASSERT_TRUE(success) << "From " << location.file() << ":"
<< location.line();
ASSERT_EQ(response.message(), kRequestMessage_)
<< "From " << location.file() << ":" << location.line();
}
void CheckRpcSendFailure() {
const bool success = SendRpc();
void CheckRpcSendFailure(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
const bool success = SendRpc(stub);
EXPECT_FALSE(success);
}
@@ -238,7 +277,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
void Shutdown(bool join = true) {
server_->Shutdown();
server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
if (join) thread_->join();
}
};
@@ -247,9 +286,11 @@ class ClientLbEnd2endTest : public ::testing::Test {
for (const auto& server : servers_) server->service_.ResetCounters();
}
void WaitForServer(size_t server_idx) {
void WaitForServer(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
size_t server_idx, const grpc_core::DebugLocation& location) {
do {
CheckRpcSendOk();
CheckRpcSendOk(stub, location);
} while (servers_[server_idx]->service_.request_count() == 0);
ResetCounters();
}
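
The test helpers now thread a grpc_core::DebugLocation through so that a failing expectation reports the test-body call site rather than a line inside the shared helper. A simplified stand-in for the pattern (FROM_HERE below plays the role of the real DEBUG_LOCATION macro):

#include <cstdio>

// Simplified stand-in for grpc_core::DebugLocation (gprpp/debug_location.h),
// which captures the caller's file and line.
class Location {
 public:
  Location(const char* file, int line) : file_(file), line_(line) {}
  const char* file() const { return file_; }
  int line() const { return line_; }

 private:
  const char* file_;
  int line_;
};
#define FROM_HERE Location(__FILE__, __LINE__)

void CheckOk(bool ok, const Location& location) {
  if (!ok) {
    std::fprintf(stderr, "RPC failed, called from %s:%d\n", location.file(),
                 location.line());
  }
}
// Usage inside a hypothetical test body: CheckOk(SendSomeRpc(), FROM_HERE);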
@@ -280,7 +321,6 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
const grpc::string server_host_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::vector<std::unique_ptr<ServerData>> servers_;
grpc_fake_resolver_response_generator* response_generator_;
@@ -291,14 +331,15 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub(""); // test that pick first is the default.
auto channel = BuildChannel(""); // test that pick first is the default.
auto stub = BuildStub(channel);
std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_);
}
SetNextResolution(ports);
for (size_t i = 0; i < servers_.size(); ++i) {
CheckRpcSendOk();
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
// All requests should have gone to a single server.
bool found = false;
@@ -312,7 +353,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
}
EXPECT_TRUE(found);
// Check LB policy name for the channel.
EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
@@ -321,15 +362,16 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
ResetStub("pick_first", args);
auto channel = BuildChannel("pick_first", args);
auto stub = BuildStub(channel);
SetNextResolution(ports);
// The channel won't become connected (there's no server).
ASSERT_FALSE(channel_->WaitForConnected(
ASSERT_FALSE(channel->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
// Bring up a server on the chosen port.
StartServers(1, ports);
// Now it will.
ASSERT_TRUE(channel_->WaitForConnected(
ASSERT_TRUE(channel->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
@@ -349,14 +391,15 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
constexpr int kMinReconnectBackOffMs = 1000;
args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
ResetStub("pick_first", args);
auto channel = BuildChannel("pick_first", args);
auto stub = BuildStub(channel);
SetNextResolution(ports);
// Make the connection delay 10% longer than the min reconnect backoff to make
// sure we are hitting the codepath that waits for the min reconnect backoff.
gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
grpc_tcp_client_connect_impl = tcp_client_connect_with_delay;
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
channel_->WaitForConnected(
channel->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
@@ -371,14 +414,16 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("pick_first");
auto channel = BuildChannel("pick_first");
auto stub = BuildStub(channel);
std::vector<int> ports;
// Perform one RPC against the first server.
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
CheckRpcSendOk();
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
// An empty update will result in the channel going into TRANSIENT_FAILURE.
@@ -387,7 +432,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
gpr_log(GPR_INFO, "****** SET none *******");
grpc_connectivity_state channel_state;
do {
channel_state = channel_->GetState(true /* try to connect */);
channel_state = channel->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
servers_[0]->service_.ResetCounters();
@@ -397,7 +442,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
ports.emplace_back(servers_[1]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [1] *******");
WaitForServer(1);
WaitForServer(stub, 1, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 0);
// And again for servers_[2]
@@ -405,26 +450,28 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [2] *******");
WaitForServer(2);
WaitForServer(stub, 2, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 0);
EXPECT_EQ(servers_[1]->service_.request_count(), 0);
// Check LB policy name for the channel.
EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("pick_first");
auto channel = BuildChannel("pick_first");
auto stub = BuildStub(channel);
std::vector<int> ports;
// Perform one RPC against the first server.
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
CheckRpcSendOk();
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
servers_[0]->service_.ResetCounters();
@@ -434,20 +481,21 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET superset *******");
CheckRpcSendOk();
CheckRpcSendOk(stub, DEBUG_LOCATION);
// We stick to the previously connected server.
WaitForServer(0);
WaitForServer(stub, 0, DEBUG_LOCATION);
EXPECT_EQ(0, servers_[1]->service_.request_count());
// Check LB policy name for the channel.
EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("pick_first");
auto channel = BuildChannel("pick_first");
auto stub = BuildStub(channel);
std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_);
@@ -459,18 +507,19 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
std::shuffle(ports.begin(), ports.end(),
std::mt19937(std::random_device()()));
SetNextResolution(ports);
if (i % 10 == 0) CheckRpcSendOk();
if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
}
}
// Check LB policy name for the channel.
EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobin) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("round_robin");
auto channel = BuildChannel("round_robin");
auto stub = BuildStub(channel);
std::vector<int> ports;
for (const auto& server : servers_) {
ports.emplace_back(server->port_);
@@ -478,15 +527,15 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
SetNextResolution(ports);
// Wait until all backends are ready.
do {
CheckRpcSendOk();
CheckRpcSendOk(stub, DEBUG_LOCATION);
} while (!SeenAllServers());
ResetCounters();
// "Sync" to the end of the list. Next sequence of picks will start at the
// first server (index 0).
WaitForServer(servers_.size() - 1);
WaitForServer(stub, servers_.size() - 1, DEBUG_LOCATION);
std::vector<int> connection_order;
for (size_t i = 0; i < servers_.size(); ++i) {
CheckRpcSendOk();
CheckRpcSendOk(stub, DEBUG_LOCATION);
UpdateConnectionOrder(servers_, &connection_order);
}
// Backends should be iterated over in the order in which the addresses were
@@ -494,22 +543,23 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
const auto expected = std::vector<int>{0, 1, 2};
EXPECT_EQ(expected, connection_order);
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("round_robin");
auto channel = BuildChannel("round_robin");
auto stub = BuildStub(channel);
std::vector<int> ports;
// Start with a single server.
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
WaitForServer(0);
WaitForServer(stub, 0, DEBUG_LOCATION);
// Send RPCs. They should all go to servers_[0]
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(10, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@@ -523,9 +573,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
// Wait until update has been processed, as signaled by the second backend
// receiving a request.
EXPECT_EQ(0, servers_[1]->service_.request_count());
WaitForServer(1);
WaitForServer(stub, 1, DEBUG_LOCATION);
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(10, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@@ -535,9 +585,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
ports.clear();
ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports);
WaitForServer(2);
WaitForServer(stub, 2, DEBUG_LOCATION);
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(10, servers_[2]->service_.request_count());
@@ -549,12 +599,12 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports);
WaitForServer(0);
WaitForServer(1);
WaitForServer(2);
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(stub, 1, DEBUG_LOCATION);
WaitForServer(stub, 2, DEBUG_LOCATION);
// Send three RPCs, one per server.
for (size_t i = 0; i < 3; ++i) CheckRpcSendOk();
for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(1, servers_[0]->service_.request_count());
EXPECT_EQ(1, servers_[1]->service_.request_count());
EXPECT_EQ(1, servers_[2]->service_.request_count());
@@ -564,7 +614,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
SetNextResolution(ports);
grpc_connectivity_state channel_state;
do {
channel_state = channel_->GetState(true /* try to connect */);
channel_state = channel->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
servers_[0]->service_.ResetCounters();
@@ -573,26 +623,27 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
ports.clear();
ports.emplace_back(servers_[1]->port_);
SetNextResolution(ports);
WaitForServer(1);
channel_state = channel_->GetState(false /* try to connect */);
WaitForServer(stub, 1, DEBUG_LOCATION);
channel_state = channel->GetState(false /* try to connect */);
GPR_ASSERT(channel_state == GRPC_CHANNEL_READY);
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("round_robin");
auto channel = BuildChannel("round_robin");
auto stub = BuildStub(channel);
std::vector<int> ports;
// Start with a single server.
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
WaitForServer(0);
WaitForServer(stub, 0, DEBUG_LOCATION);
// Send RPCs. They should all go to servers_[0]
for (size_t i = 0; i < 10; ++i) SendRpc();
for (size_t i = 0; i < 10; ++i) SendRpc(stub);
EXPECT_EQ(10, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@@ -603,11 +654,11 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports);
WaitForServer(0);
WaitForServer(2);
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(stub, 2, DEBUG_LOCATION);
// Send three RPCs, one per server.
for (size_t i = 0; i < kNumServers; ++i) SendRpc();
for (size_t i = 0; i < kNumServers; ++i) SendRpc(stub);
// The server in shutdown shouldn't receive any.
EXPECT_EQ(0, servers_[1]->service_.request_count());
}
@@ -616,7 +667,8 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
ResetStub("round_robin");
auto channel = BuildChannel("round_robin");
auto stub = BuildStub(channel);
std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_);
@@ -625,10 +677,10 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
std::shuffle(ports.begin(), ports.end(),
std::mt19937(std::random_device()()));
SetNextResolution(ports);
if (i % 10 == 0) CheckRpcSendOk();
if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
}
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
@@ -639,16 +691,21 @@ TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
std::vector<int> ports;
std::vector<int> first_ports;
std::vector<int> second_ports;
for (int i = 0; i < kNumServers; ++i) {
ports.push_back(grpc_pick_unused_port_or_die());
first_ports.push_back(grpc_pick_unused_port_or_die());
}
StartServers(kNumServers, ports);
ResetStub("round_robin");
SetNextResolution(ports);
for (int i = 0; i < kNumServers; ++i) {
second_ports.push_back(grpc_pick_unused_port_or_die());
}
StartServers(kNumServers, first_ports);
auto channel = BuildChannel("round_robin");
auto stub = BuildStub(channel);
SetNextResolution(first_ports);
// Send a number of RPCs, which succeed.
for (size_t i = 0; i < 100; ++i) {
CheckRpcSendOk();
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
// Kill all servers
gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
@@ -658,18 +715,25 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
// Client requests should fail. Send enough to tickle all subchannels.
for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure();
for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure(stub);
gpr_log(GPR_INFO, "****** DOOMED REQUESTS SENT *******");
// Bring servers back up on the same port (we aren't recreating the channel).
// Bring servers back up on a different set of ports. We need to do this to be
// sure that the eventual success is *not* due to subchannel reconnection
// attempts and that an actual re-resolution has happened as a result of the
// RR policy going into transient failure when all its subchannels become
// unavailable (in transient failure as well).
gpr_log(GPR_INFO, "****** RESTARTING SERVERS *******");
StartServers(kNumServers, ports);
StartServers(kNumServers, second_ports);
// Don't notify of the update. Wait for the LB policy's re-resolution to
// "pull" the new ports.
SetNextResolutionUponError(second_ports);
gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
// Client request should eventually (but still fairly soon) succeed.
const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
while (gpr_time_cmp(deadline, now) > 0) {
if (SendRpc()) break;
if (SendRpc(stub)) break;
now = gpr_now(GPR_CLOCK_MONOTONIC);
}
GPR_ASSERT(gpr_time_cmp(deadline, now) > 0);
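
The test hinges on the contract of SetNextResolutionUponError(): the fake resolver holds the second address list back until the channel re-resolves after an error, so the final successful RPC can only be explained by an actual re-resolution, not by subchannel reconnects. A hedged model of that contract (illustrative class, not the real fake-resolver API):

#include <optional>
#include <string>
#include <utility>
#include <vector>

using Addresses = std::vector<std::string>;  // e.g. {"ipv4:127.0.0.1:1234"}

class SketchResponseGenerator {
 public:
  void SetResponse(Addresses a) { next_ = std::move(a); }
  void SetResponseUponError(Addresses a) { upon_error_ = std::move(a); }

  // Invoked when the LB policy asks the resolver to re-resolve after a
  // failure, e.g. round_robin seeing its subchannels in TRANSIENT_FAILURE.
  std::optional<Addresses> TakeResponseUponError() {
    std::optional<Addresses> a = std::move(upon_error_);
    upon_error_.reset();
    return a;
  }

 private:
  std::optional<Addresses> next_;
  std::optional<Addresses> upon_error_;
};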
@@ -679,11 +743,13 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
const int kNumServers = 3;
StartServers(kNumServers);
const auto ports = GetServersPorts();
ResetStub("round_robin");
auto channel = BuildChannel("round_robin");
auto stub = BuildStub(channel);
SetNextResolution(ports);
for (size_t i = 0; i < kNumServers; ++i) WaitForServer(i);
for (size_t i = 0; i < kNumServers; ++i)
WaitForServer(stub, i, DEBUG_LOCATION);
for (size_t i = 0; i < servers_.size(); ++i) {
CheckRpcSendOk();
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
}
// One request should have gone to each server.
@@ -695,10 +761,14 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
servers_[0]->Shutdown(true);
// Client requests should still succeed. May need retrying if RR had returned a pick
// before noticing the change in the server's connectivity.
while (!SendRpc())
while (!SendRpc(stub)) {
; // Retry until success.
}
gpr_log(GPR_INFO, "------------------------------------------------------");
// Send a bunch of RPCs that should succeed.
for (int i = 0; i < 10 * kNumServers; ++i) CheckRpcSendOk();
for (int i = 0; i < 10 * kNumServers; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
const auto post_death = servers_[0]->service_.request_count();
// No requests have gone to the deceased server.
EXPECT_EQ(pre_death, post_death);
@@ -708,7 +778,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
// the server managed to start before the RR policy retried the subchannel) or
// after the subchannel retry delay otherwise (RR's subchannel retried before
// the server was fully back up).
WaitForServer(0);
WaitForServer(stub, 0, DEBUG_LOCATION);
}
} // namespace
