xds_e2e_test: clean up WaitForAllBackends() and add timeout (#28514)

* xds_e2e_test: clean up WaitForAllBackends() and add timeout

* fix drop tests

* restore resetting of backend counters

* fix BalancerRestart test

* fix StressTest

* clang-format
Mark D. Roth, commit bccd1c7c22 (parent 95b313de82), committed via GitHub
      test/cpp/end2end/xds/xds_end2end_test.cc
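For reference, the reworked helper surface that the hunks below converge on is roughly the following (signatures and comments lifted from the diff; bodies omitted, so treat this as a summary sketch rather than the literal code):

    // Sends num_rpcs RPCs; any failure must carry drop_error_message_prefix,
    // otherwise the test fails. Returns the number of failed RPCs.
    size_t SendRpcsAndCountFailuresWithMessage(
        size_t num_rpcs, const char* drop_error_message_prefix,
        const RpcOptions& rpc_options = RpcOptions());

    // Waits until every backend in [start_index, stop_index) has seen an RPC,
    // or until wait_options.timeout_ms (default 5000, scaled by the test
    // slowdown factor) expires. Returns the total number of RPCs sent.
    size_t WaitForAllBackends(
        size_t start_index = 0, size_t stop_index = 0,
        const WaitForBackendOptions& wait_options = WaitForBackendOptions(),
        const RpcOptions& rpc_options = RpcOptions());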

@@ -1090,26 +1090,30 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
     return true;
   }
-  void SendRpcAndCount(
-      int* num_total, int* num_ok, int* num_failure, int* num_drops,
-      const RpcOptions& rpc_options = RpcOptions(),
-      const char* drop_error_message_prefix = "EDS-configured drop: ") {
-    const Status status = SendRpc(rpc_options);
-    if (status.ok()) {
-      ++*num_ok;
-    } else {
-      if (absl::StartsWith(status.error_message(), drop_error_message_prefix)) {
-        ++*num_drops;
-      } else {
-        ++*num_failure;
-      }
-    }
-    ++*num_total;
-  }
+  // Sends num_rpcs RPCs, counting how many of them fail with a message
+  // matching the specfied drop_error_message_prefix.
+  // Any failure with a non-matching message is a test failure.
+  size_t SendRpcsAndCountFailuresWithMessage(
+      size_t num_rpcs, const char* drop_error_message_prefix,
+      const RpcOptions& rpc_options = RpcOptions()) {
+    size_t num_failed = 0;
+    for (size_t i = 0; i < num_rpcs; ++i) {
+      Status status = SendRpc(rpc_options);
+      if (!status.ok()) {
+        EXPECT_THAT(status.error_message(),
+                    ::testing::StartsWith(drop_error_message_prefix))
+            << "code=" << status.error_code()
+            << " message=" << status.error_message();
+        ++num_failed;
+      }
+    }
+    return num_failed;
+  }
   struct WaitForBackendOptions {
     bool reset_counters = true;
     bool allow_failures = false;
+    int timeout_ms = 5000;
     WaitForBackendOptions() {}
@@ -1122,48 +1126,46 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
       allow_failures = enable;
       return *this;
     }
+    WaitForBackendOptions& set_timeout_ms(int ms) {
+      timeout_ms = ms;
+      return *this;
+    }
   };
-  std::tuple<int, int, int> WaitForAllBackends(
+  // Returns the total number of RPCs sent.
+  size_t WaitForAllBackends(
       size_t start_index = 0, size_t stop_index = 0,
       const WaitForBackendOptions& wait_options = WaitForBackendOptions(),
       const RpcOptions& rpc_options = RpcOptions()) {
-    int num_ok = 0;
-    int num_failure = 0;
-    int num_drops = 0;
-    int num_total = 0;
-    gpr_log(GPR_INFO, "========= WAITING FOR All BACKEND %lu TO %lu ==========",
-            static_cast<unsigned long>(start_index),
-            static_cast<unsigned long>(stop_index));
+    size_t num_rpcs = 0;
+    auto deadline = absl::Now() + (absl::Milliseconds(wait_options.timeout_ms) *
+                                   grpc_test_slowdown_factor());
+    gpr_log(GPR_INFO,
+            "========= WAITING FOR BACKENDS [%" PRIuPTR ", %" PRIuPTR
+            ") ==========",
+            start_index, stop_index);
     while (!SeenAllBackends(start_index, stop_index, rpc_options.service)) {
-      SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops,
-                      rpc_options);
+      Status status = SendRpc(rpc_options);
+      if (!wait_options.allow_failures) {
+        EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                                 << " message=" << status.error_message();
+      }
+      EXPECT_LE(absl::Now(), deadline);
+      if (absl::Now() >= deadline) break;
+      ++num_rpcs;
     }
     if (wait_options.reset_counters) ResetBackendCounters();
-    gpr_log(GPR_INFO,
-            "Performed %d warm up requests against the backends. "
-            "%d succeeded, %d failed, %d dropped.",
-            num_total, num_ok, num_failure, num_drops);
-    if (!wait_options.allow_failures) EXPECT_EQ(num_failure, 0);
-    return std::make_tuple(num_ok, num_failure, num_drops);
+    gpr_log(GPR_INFO, "Backends up; sent %" PRIuPTR " warm up requests",
+            num_rpcs);
+    return num_rpcs;
   }
   void WaitForBackend(
       size_t backend_idx,
       const WaitForBackendOptions& wait_options = WaitForBackendOptions(),
      const RpcOptions& rpc_options = RpcOptions()) {
-    gpr_log(GPR_INFO, "========= WAITING FOR BACKEND %lu ==========",
-            static_cast<unsigned long>(backend_idx));
-    do {
-      Status status = SendRpc(rpc_options);
-      if (!wait_options.allow_failures) {
-        EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
-                                 << " message=" << status.error_message();
-      }
-    } while (!SeenBackend(backend_idx, rpc_options.service));
-    if (wait_options.reset_counters) ResetBackendCounters();
-    gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========",
-            static_cast<unsigned long>(backend_idx));
+    WaitForAllBackends(backend_idx, backend_idx + 1, wait_options, rpc_options);
   }
   grpc_core::ServerAddressList CreateAddressListFromPortList(
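This change adds the set_timeout_ms() knob, but no call site in the diff exercises it yet; a hypothetical caller (sketch only, not part of the commit) might look like this:

    // Hypothetical usage of the new option, assuming the fixture above:
    // wait up to 10s (scaled by grpc_test_slowdown_factor()) for backends
    // 0-3, keep their request counters, and record the warm-up RPC count.
    size_t warmup_rpcs = WaitForAllBackends(
        /*start_index=*/0, /*stop_index=*/4,
        WaitForBackendOptions().set_reset_counters(false).set_timeout_ms(10000));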
@@ -2177,9 +2179,7 @@ TEST_P(XdsResolverOnlyTest, ChangeClusters) {
   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                    new_route_config);
   // Wait for all new backends to be used.
-  std::tuple<int, int, int> counts = WaitForAllBackends(2, 4);
-  // Make sure no RPCs failed in the transition.
-  EXPECT_EQ(0, std::get<1>(counts));
+  WaitForAllBackends(2, 4);
 }
 // Tests that we go into TRANSIENT_FAILURE if the Cluster disappears.
@@ -2251,9 +2251,7 @@ TEST_P(XdsResolverOnlyTest, RestartsRequestsUponReconnection) {
       ->set_cluster(kNewClusterName);
   balancer_->ads_service()->SetRdsResource(new_route_config);
   // Wait for all new backends to be used.
-  std::tuple<int, int, int> counts = WaitForAllBackends(2, 4);
-  // Make sure no RPCs failed in the transition.
-  EXPECT_EQ(0, std::get<1>(counts));
+  WaitForAllBackends(2, 4);
 }
 TEST_P(XdsResolverOnlyTest, DefaultRouteSpecifiesSlashPrefix) {
@@ -11194,22 +11192,19 @@ TEST_P(LocalityMapTest, StressTest) {
     args.locality_list.emplace_back(std::move(locality));
   }
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
-  // The second ADS response contains 1 locality, which contains backend 1.
-  args = EdsResourceArgs({
-      {"locality0", CreateEndpointsForBackends(1, 2)},
-  });
-  std::thread delayed_resource_setter(
-      std::bind(&BasicTest::SetEdsResourceWithDelay, this, balancer_.get(),
-                BuildEdsResource(args), 60 * 1000));
   // Wait until backend 0 is ready, before which kNumLocalities localities are
   // received and handled by the xds policy.
   WaitForBackend(0, WaitForBackendOptions().set_reset_counters(false),
                  RpcOptions().set_timeout_ms(kRpcTimeoutMs));
   EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
+  // The second ADS response contains 1 locality, which contains backend 1.
+  args = EdsResourceArgs({
+      {"locality0", CreateEndpointsForBackends(1, 2)},
+  });
+  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
   // Wait until backend 1 is ready, before which kNumLocalities localities are
   // removed by the xds policy.
   WaitForBackend(1);
-  delayed_resource_setter.join();
 }
 // Tests that the localities in a locality map are picked correctly after
@@ -11376,9 +11371,7 @@ TEST_P(FailoverTest, DoesNotUseLocalityWithNoEndpoints) {
   });
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
   // Wait for all backends to be used.
-  std::tuple<int, int, int> counts = WaitForAllBackends();
-  // Make sure no RPCs failed in the transition.
-  EXPECT_EQ(0, std::get<1>(counts));
+  WaitForAllBackends();
 }
 // If the higher priority localities are not reachable, failover to the
@@ -11574,21 +11567,9 @@ TEST_P(DropTest, Vanilla) {
   args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
                           {kThrottleDropType, kDropPerMillionForThrottle}};
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
-  WaitForAllBackends();
   // Send kNumRpcs RPCs and count the drops.
-  size_t num_drops = 0;
-  for (size_t i = 0; i < kNumRpcs; ++i) {
-    EchoResponse response;
-    const Status status = SendRpc(RpcOptions(), &response);
-    if (!status.ok() &&
-        absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
-      ++num_drops;
-    } else {
-      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
-                               << " message=" << status.error_message();
-      EXPECT_EQ(response.message(), kRequestMessage);
-    }
-  }
+  size_t num_drops =
+      SendRpcsAndCountFailuresWithMessage(kNumRpcs, "EDS-configured drop: ");
   // The drop rate should be roughly equal to the expectation.
   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
   EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
@@ -11608,21 +11589,9 @@ TEST_P(DropTest, DropPerHundred) {
   args.drop_categories = {{kLbDropType, kDropPerHundredForLb}};
   args.drop_denominator = FractionalPercent::HUNDRED;
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
-  WaitForAllBackends();
   // Send kNumRpcs RPCs and count the drops.
-  size_t num_drops = 0;
-  for (size_t i = 0; i < kNumRpcs; ++i) {
-    EchoResponse response;
-    const Status status = SendRpc(RpcOptions(), &response);
-    if (!status.ok() &&
-        absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
-      ++num_drops;
-    } else {
-      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
-                               << " message=" << status.error_message();
-      EXPECT_EQ(response.message(), kRequestMessage);
-    }
-  }
+  size_t num_drops =
+      SendRpcsAndCountFailuresWithMessage(kNumRpcs, "EDS-configured drop: ");
   // The drop rate should be roughly equal to the expectation.
   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
   EXPECT_THAT(seen_drop_rate,
@@ -11642,21 +11611,9 @@ TEST_P(DropTest, DropPerTenThousand) {
   args.drop_categories = {{kLbDropType, kDropPerTenThousandForLb}};
   args.drop_denominator = FractionalPercent::TEN_THOUSAND;
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
-  WaitForAllBackends();
   // Send kNumRpcs RPCs and count the drops.
-  size_t num_drops = 0;
-  for (size_t i = 0; i < kNumRpcs; ++i) {
-    EchoResponse response;
-    const Status status = SendRpc(RpcOptions(), &response);
-    if (!status.ok() &&
-        absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
-      ++num_drops;
-    } else {
-      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
-                               << " message=" << status.error_message();
-      EXPECT_EQ(response.message(), kRequestMessage);
-    }
-  }
+  size_t num_drops =
+      SendRpcsAndCountFailuresWithMessage(kNumRpcs, "EDS-configured drop: ");
   // The drop rate should be roughly equal to the expectation.
   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
   EXPECT_THAT(seen_drop_rate,
@@ -11682,22 +11639,10 @@ TEST_P(DropTest, Update) {
   });
   args.drop_categories = {{kLbDropType, kDropPerMillionForLb}};
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
-  WaitForAllBackends();
   // Send kNumRpcsLbOnly RPCs and count the drops.
-  size_t num_drops = 0;
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
-  for (size_t i = 0; i < kNumRpcsLbOnly; ++i) {
-    EchoResponse response;
-    const Status status = SendRpc(RpcOptions(), &response);
-    if (!status.ok() &&
-        absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
-      ++num_drops;
-    } else {
-      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
-                               << " message=" << status.error_message();
-      EXPECT_EQ(response.message(), kRequestMessage);
-    }
-  }
+  size_t num_drops = SendRpcsAndCountFailuresWithMessage(
+      kNumRpcsLbOnly, "EDS-configured drop: ");
   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
   // The drop rate should be roughly equal to the expectation.
   double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsLbOnly;
@@ -11729,20 +11674,9 @@ TEST_P(DropTest, Update) {
     seen_drop_rate = static_cast<double>(num_drops) / num_rpcs;
   }
   // Send kNumRpcsBoth RPCs and count the drops.
-  num_drops = 0;
   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
-  for (size_t i = 0; i < kNumRpcsBoth; ++i) {
-    EchoResponse response;
-    const Status status = SendRpc(RpcOptions(), &response);
-    if (!status.ok() &&
-        absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
-      ++num_drops;
-    } else {
-      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
-                               << " message=" << status.error_message();
-      EXPECT_EQ(response.message(), kRequestMessage);
-    }
-  }
+  num_drops = SendRpcsAndCountFailuresWithMessage(kNumRpcsBoth,
+                                                  "EDS-configured drop: ");
   gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
   // The new drop rate should be roughly equal to the expectation.
   seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsBoth;
@@ -11762,13 +11696,9 @@ TEST_P(DropTest, DropAll) {
                           {kThrottleDropType, kDropPerMillionForThrottle}};
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
   // Send kNumRpcs RPCs and all of them are dropped.
-  for (size_t i = 0; i < kNumRpcs; ++i) {
-    EchoResponse response;
-    const Status status = SendRpc(RpcOptions(), &response);
-    EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
-    EXPECT_THAT(status.error_message(),
-                ::testing::StartsWith("EDS-configured drop: "));
-  }
+  size_t num_drops =
+      SendRpcsAndCountFailuresWithMessage(kNumRpcs, "EDS-configured drop: ");
+  EXPECT_EQ(num_drops, kNumRpcs);
 }
 class ClientLoadReportingTest : public XdsEnd2endTest {
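Background for the DoubleNear checks in the drop tests above: an RPC survives only if neither the LB category nor the throttle category drops it, so the expected combined rate is kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle, which is the constant the Vanilla test compares against. A worked sketch with illustrative per-million values (the constants in the real test may differ):

    // Illustrative values only; the tests derive their rates from
    // kDropPerMillionForLb and kDropPerMillionForThrottle.
    const double kDropRateForLb = 100000 / 1000000.0;        // 10%
    const double kDropRateForThrottle = 200000 / 1000000.0;  // 20%
    const double kDropRateForLbAndThrottle =
        kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;  // 0.28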
@@ -11791,10 +11721,7 @@ TEST_P(ClientLoadReportingTest, Vanilla) {
   });
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
   // Wait until all backends are ready.
-  int num_ok = 0;
-  int num_failure = 0;
-  int num_drops = 0;
-  std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
+  size_t num_warmup_rpcs = WaitForAllBackends();
   // Send kNumRpcsPerAddress RPCs per server.
   CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
   CheckRpcSendFailure(CheckRpcSendFailureOptions()
@@ -11810,13 +11737,13 @@ TEST_P(ClientLoadReportingTest, Vanilla) {
       balancer_->lrs_service()->WaitForLoadReport();
   ASSERT_EQ(load_report.size(), 1UL);
   ClientStats& client_stats = load_report.front();
-  EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
+  EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_warmup_rpcs,
             client_stats.total_successful_requests());
   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
   EXPECT_EQ((kNumRpcsPerAddress + kNumFailuresPerAddress) * num_backends_ +
-                num_ok + num_failure,
+                num_warmup_rpcs,
             client_stats.total_issued_requests());
-  EXPECT_EQ(kNumFailuresPerAddress * num_backends_ + num_failure,
+  EXPECT_EQ(kNumFailuresPerAddress * num_backends_,
             client_stats.total_error_requests());
   EXPECT_EQ(0U, client_stats.total_dropped_requests());
   // The LRS service got a single request, and sent a single response.
@@ -11836,10 +11763,7 @@ TEST_P(ClientLoadReportingTest, SendAllClusters) {
   });
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
   // Wait until all backends are ready.
-  int num_ok = 0;
-  int num_failure = 0;
-  int num_drops = 0;
-  std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
+  size_t num_warmup_rpcs = WaitForAllBackends();
   // Send kNumRpcsPerAddress RPCs per server.
   CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
   CheckRpcSendFailure(CheckRpcSendFailureOptions()
@@ -11855,13 +11779,13 @@ TEST_P(ClientLoadReportingTest, SendAllClusters) {
       balancer_->lrs_service()->WaitForLoadReport();
   ASSERT_EQ(load_report.size(), 1UL);
   ClientStats& client_stats = load_report.front();
-  EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
+  EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_warmup_rpcs,
             client_stats.total_successful_requests());
   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
   EXPECT_EQ((kNumRpcsPerAddress + kNumFailuresPerAddress) * num_backends_ +
-                num_ok + num_failure,
+                num_warmup_rpcs,
             client_stats.total_issued_requests());
-  EXPECT_EQ(kNumFailuresPerAddress * num_backends_ + num_failure,
+  EXPECT_EQ(kNumFailuresPerAddress * num_backends_,
             client_stats.total_error_requests());
   EXPECT_EQ(0U, client_stats.total_dropped_requests());
   // The LRS service got a single request, and sent a single response.
@@ -11879,10 +11803,7 @@ TEST_P(ClientLoadReportingTest, HonorsClustersRequestedByLrsServer) {
   });
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
   // Wait until all backends are ready.
-  int num_ok = 0;
-  int num_failure = 0;
-  int num_drops = 0;
-  std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
+  WaitForAllBackends();
   // Send kNumRpcsPerAddress RPCs per server.
   CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
   // Each backend should have gotten 100 requests.
@@ -11910,18 +11831,13 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) {
   });
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
   // Wait until all backends returned by the balancer are ready.
-  int num_ok = 0;
-  int num_failure = 0;
-  int num_drops = 0;
-  std::tie(num_ok, num_failure, num_drops) =
-      WaitForAllBackends(/* start_index */ 0,
-                         /* stop_index */ kNumBackendsFirstPass);
+  size_t num_rpcs = WaitForAllBackends(
+      /*start_index=*/0, /*stop_index=*/kNumBackendsFirstPass);
   std::vector<ClientStats> load_report =
       balancer_->lrs_service()->WaitForLoadReport();
   ASSERT_EQ(load_report.size(), 1UL);
   ClientStats client_stats = std::move(load_report.front());
-  EXPECT_EQ(static_cast<size_t>(num_ok),
-            client_stats.total_successful_requests());
+  EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
   EXPECT_EQ(0U, client_stats.total_error_requests());
   EXPECT_EQ(0U, client_stats.total_dropped_requests());
@@ -11938,8 +11854,8 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) {
   // subchannel list, which resets the start index randomly. So we need
   // to be a little more permissive here to avoid spurious failures.
   ResetBackendCounters();
-  int num_started = std::get<0>(WaitForAllBackends(
-      /* start_index */ 0, /* stop_index */ kNumBackendsFirstPass));
+  num_rpcs = WaitForAllBackends(/*start_index=*/0,
+                                /*stop_index=*/kNumBackendsFirstPass);
   // Now restart the balancer, this time pointing to the new backends.
   balancer_->Start();
   args = EdsResourceArgs({
@@ -11948,17 +11864,15 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) {
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
   // Wait for queries to start going to one of the new backends.
   // This tells us that we're now using the new serverlist.
-  std::tie(num_ok, num_failure, num_drops) =
-      WaitForAllBackends(/* start_index */ kNumBackendsFirstPass);
-  num_started += num_ok + num_failure + num_drops;
+  num_rpcs += WaitForAllBackends(/*start_index=*/kNumBackendsFirstPass);
   // Send one RPC per backend.
   CheckRpcSendOk(kNumBackendsSecondPass);
-  num_started += kNumBackendsSecondPass;
+  num_rpcs += kNumBackendsSecondPass;
   // Check client stats.
   load_report = balancer_->lrs_service()->WaitForLoadReport();
   ASSERT_EQ(load_report.size(), 1UL);
   client_stats = std::move(load_report.front());
-  EXPECT_EQ(num_started, client_stats.total_successful_requests());
+  EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
   EXPECT_EQ(0U, client_stats.total_error_requests());
   EXPECT_EQ(0U, client_stats.total_dropped_requests());
@@ -11988,10 +11902,7 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) {
                                    kNewEdsServiceName);
   balancer_->ads_service()->SetCdsResource(new_cluster);
   // Wait for all backends to come online.
-  int num_ok = 0;
-  int num_failure = 0;
-  int num_drops = 0;
-  std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends(0, 2);
+  size_t num_rpcs = WaitForAllBackends(0, 2);
   // The load report received at the balancer should be correct.
   std::vector<ClientStats> load_report =
       balancer_->lrs_service()->WaitForLoadReport();
@@ -12006,18 +11917,17 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) {
               ::testing::AllOf(
                   ::testing::Field(&ClientStats::LocalityStats::
                                        total_successful_requests,
-                                   num_ok),
+                                   num_rpcs),
                   ::testing::Field(&ClientStats::LocalityStats::
                                        total_requests_in_progress,
                                    0UL),
                   ::testing::Field(
                       &ClientStats::LocalityStats::total_error_requests,
-                      num_failure),
+                      0UL),
                   ::testing::Field(
                       &ClientStats::LocalityStats::total_issued_requests,
-                      num_failure + num_ok))))),
-          ::testing::Property(&ClientStats::total_dropped_requests,
-                              num_drops))));
+                      num_rpcs))))),
+          ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
   // Change RDS resource to point to new cluster.
   RouteConfiguration new_route_config = default_route_config_;
   new_route_config.mutable_virtual_hosts(0)
@@ -12027,7 +11937,7 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) {
   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                    new_route_config);
   // Wait for all new backends to be used.
-  std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends(2, 4);
+  num_rpcs = WaitForAllBackends(2, 4);
   // The load report received at the balancer should be correct.
   load_report = balancer_->lrs_service()->WaitForLoadReport();
   EXPECT_THAT(
@@ -12043,19 +11953,17 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) {
                   ::testing::AllOf(
                       ::testing::Field(&ClientStats::LocalityStats::
                                            total_successful_requests,
-                                       ::testing::Lt(num_ok)),
+                                       ::testing::Lt(num_rpcs)),
                       ::testing::Field(&ClientStats::LocalityStats::
                                            total_requests_in_progress,
                                        0UL),
                       ::testing::Field(
                           &ClientStats::LocalityStats::total_error_requests,
-                          ::testing::Le(num_failure)),
-                      ::testing::Field(
-                          &ClientStats::LocalityStats::
-                              total_issued_requests,
-                          ::testing::Le(num_failure + num_ok)))))),
-              ::testing::Property(&ClientStats::total_dropped_requests,
-                                  num_drops)),
+                          0UL),
+                      ::testing::Field(&ClientStats::LocalityStats::
+                                           total_issued_requests,
+                                       ::testing::Le(num_rpcs)))))),
+              ::testing::Property(&ClientStats::total_dropped_requests, 0UL)),
           ::testing::AllOf(
               ::testing::Property(&ClientStats::cluster_name, kNewClusterName),
               ::testing::Property(
@@ -12065,27 +11973,22 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) {
                   ::testing::AllOf(
                       ::testing::Field(&ClientStats::LocalityStats::
                                            total_successful_requests,
-                                       ::testing::Le(num_ok)),
+                                       ::testing::Le(num_rpcs)),
                       ::testing::Field(&ClientStats::LocalityStats::
                                            total_requests_in_progress,
                                        0UL),
                       ::testing::Field(
                           &ClientStats::LocalityStats::total_error_requests,
-                          ::testing::Le(num_failure)),
-                      ::testing::Field(
-                          &ClientStats::LocalityStats::
-                              total_issued_requests,
-                          ::testing::Le(num_failure + num_ok)))))),
-          ::testing::Property(&ClientStats::total_dropped_requests,
-                              num_drops))));
-  int total_ok = 0;
-  int total_failure = 0;
+                          0UL),
+                      ::testing::Field(&ClientStats::LocalityStats::
+                                           total_issued_requests,
+                                       ::testing::Le(num_rpcs)))))),
+          ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
+  size_t total_ok = 0;
   for (const ClientStats& client_stats : load_report) {
     total_ok += client_stats.total_successful_requests();
-    total_failure += client_stats.total_error_requests();
   }
-  EXPECT_EQ(total_ok, num_ok);
-  EXPECT_EQ(total_failure, num_failure);
+  EXPECT_EQ(total_ok, num_rpcs);
   // The LRS service got a single request, and sent a single response.
   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
@@ -12112,6 +12015,7 @@ TEST_P(ClientLoadReportingWithDropTest, Vanilla) {
       kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
   const size_t kNumRpcs =
       ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
+  const char kStatusMessageDropPrefix[] = "EDS-configured drop: ";
   // The ADS response contains two drop categories.
   EdsResourceArgs args({
       {"locality0", CreateEndpointsForBackends()},
@@ -12119,30 +12023,14 @@ TEST_P(ClientLoadReportingWithDropTest, Vanilla) {
   args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
                           {kThrottleDropType, kDropPerMillionForThrottle}};
   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
-  int num_ok = 0;
-  int num_failure = 0;
-  int num_drops = 0;
-  std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
-  const size_t num_warmup = num_ok + num_failure + num_drops;
   // Send kNumRpcs RPCs and count the drops.
-  for (size_t i = 0; i < kNumRpcs; ++i) {
-    EchoResponse response;
-    const Status status = SendRpc(RpcOptions(), &response);
-    if (!status.ok() &&
-        absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
-      ++num_drops;
-    } else {
-      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
-                               << " message=" << status.error_message();
-      EXPECT_EQ(response.message(), kRequestMessage);
-    }
-  }
+  size_t num_drops =
+      SendRpcsAndCountFailuresWithMessage(kNumRpcs, kStatusMessageDropPrefix);
   // The drop rate should be roughly equal to the expectation.
   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
   EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
                                                     kErrorTolerance));
   // Check client stats.
-  const size_t total_rpc = num_warmup + kNumRpcs;
   ClientStats client_stats;
   do {
     std::vector<ClientStats> load_reports =
@@ -12152,14 +12040,14 @@ TEST_P(ClientLoadReportingWithDropTest, Vanilla) {
     }
   } while (client_stats.total_issued_requests() +
                client_stats.total_dropped_requests() <
-           total_rpc);
+           kNumRpcs);
   EXPECT_EQ(num_drops, client_stats.total_dropped_requests());
   EXPECT_THAT(static_cast<double>(client_stats.dropped_requests(kLbDropType)) /
-                  total_rpc,
+                  kNumRpcs,
               ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
   EXPECT_THAT(
       static_cast<double>(client_stats.dropped_requests(kThrottleDropType)) /
-          (total_rpc * (1 - kDropRateForLb)),
+          (kNumRpcs * (1 - kDropRateForLb)),
       ::testing::DoubleNear(kDropRateForThrottle, kErrorTolerance));
 }
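An editorial note on the denominators in the hunk above: with the warm-up counting removed, every RPC the LRS report covers is one of the kNumRpcs sent by the helper. LB drops are measured against all kNumRpcs, while throttle drops only apply to RPCs that survived the LB drop, hence the kNumRpcs * (1 - kDropRateForLb) denominator. Using the illustrative 10%/20% rates from the earlier sketch:

    // Expected counts under the illustrative rates (not part of the diff):
    //   LB drops       ~= kNumRpcs * 0.10
    //   throttle drops ~= (kNumRpcs * (1 - 0.10)) * 0.20 = kNumRpcs * 0.18
    //   total drops    ~= kNumRpcs * 0.28, i.e. kNumRpcs * kDropRateForLbAndThrottle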
@@ -12287,13 +12175,8 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionPercentageAbort) {
   // Config fault injection via different setup
   SetFilterConfig(http_fault);
   // Send kNumRpcs RPCs and count the aborts.
-  int num_total = 0, num_ok = 0, num_failure = 0, num_aborted = 0;
-  for (size_t i = 0; i < kNumRpcs; ++i) {
-    SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_aborted,
-                    RpcOptions(), "Fault injected");
-  }
-  EXPECT_EQ(kNumRpcs, num_total);
-  EXPECT_EQ(0, num_failure);
+  size_t num_aborted =
+      SendRpcsAndCountFailuresWithMessage(kNumRpcs, "Fault injected");
   // The abort rate should be roughly equal to the expectation.
   const double seen_abort_rate = static_cast<double>(num_aborted) / kNumRpcs;
   EXPECT_THAT(seen_abort_rate,
@@ -12323,14 +12206,8 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionPercentageAbortViaHeaders) {
       {"x-envoy-fault-abort-grpc-request", "10"},
       {"x-envoy-fault-abort-percentage", std::to_string(kAbortPercentage)},
   };
-  int num_total = 0, num_ok = 0, num_failure = 0, num_aborted = 0;
-  RpcOptions options = RpcOptions().set_metadata(metadata);
-  for (size_t i = 0; i < kNumRpcs; ++i) {
-    SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_aborted, options,
-                    "Fault injected");
-  }
-  EXPECT_EQ(kNumRpcs, num_total);
-  EXPECT_EQ(0, num_failure);
+  size_t num_aborted = SendRpcsAndCountFailuresWithMessage(
+      kNumRpcs, "Fault injected", RpcOptions().set_metadata(metadata));
   // The abort rate should be roughly equal to the expectation.
   const double seen_abort_rate = static_cast<double>(num_aborted) / kNumRpcs;
   EXPECT_THAT(seen_abort_rate,
