// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <numeric>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/config/config_vars.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
namespace grpc {
namespace testing {
namespace {
using ::envoy::config::cluster::v3::CircuitBreakers;
using ::envoy::config::cluster::v3::RoutingPriority;
using ::envoy::config::core::v3::HealthStatus;
using ::envoy::type::v3::FractionalPercent;
using ClientStats = LrsServiceImpl::ClientStats;
constexpr char kLbDropType[] = "lb";
constexpr char kThrottleDropType[] = "throttle";
constexpr char kStatusMessageDropPrefix[] = "EDS-configured drop: ";
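// Drop category names and the status message prefix that the drop tests
// below match against when counting dropped RPCs.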
//
// CDS tests
//
using CdsTest = XdsEnd2endTest;
INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest, ::testing::Values(XdsTestType()),
&XdsTestType::Name);
// Tests that the CDS client sends an ACK upon a correct CDS response.
TEST_P(CdsTest, Vanilla) {
(void)SendRpc();
auto response_state = balancer_->ads_service()->cds_response_state();
ASSERT_TRUE(response_state.has_value());
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
}
// Testing just one example of an invalid resource here.
// Unit tests for XdsClusterResourceType have exhaustive tests for all
// of the invalid cases.
TEST_P(CdsTest, InvalidClusterResource) {
auto cluster = default_cluster_;
cluster.set_type(Cluster::STATIC);
balancer_->ads_service()->SetCdsResource(cluster);
const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_EQ(response_state->error_message,
"xDS response validation errors: ["
"resource index 0: cluster_name: "
"INVALID_ARGUMENT: errors validating Cluster resource: ["
"field:type error:unknown discovery type]]");
}
// Tests that we don't trigger does-not-exist callbacks for a resource
// that was previously valid but is updated to be invalid.
TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) {
CreateAndStartBackends(1);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Check that everything works.
CheckRpcSendOk(DEBUG_LOCATION);
// Now send an update changing the Cluster to be invalid.
auto cluster = default_cluster_;
cluster.set_type(Cluster::STATIC);
balancer_->ads_service()->SetCdsResource(cluster);
const auto response_state =
WaitForCdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_EQ(response_state->error_message,
"xDS response validation errors: ["
"resource index 0: cluster_name: "
"INVALID_ARGUMENT: errors validating Cluster resource: ["
"field:type error:unknown discovery type]]");
CheckRpcSendOk(DEBUG_LOCATION);
}
// Tests that round robin is not impacted by the endpoint weight, and that the
// localities in a locality map are picked according to their weights.
TEST_P(CdsTest, EndpointWeightDoesNotImpactWeightedRoundRobin) {
CreateAndStartBackends(2);
const int kLocalityWeight0 = 2;
const int kLocalityWeight1 = 8;
const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
const double kLocalityWeightRate0 =
static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
const double kLocalityWeightRate1 =
static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
const double kErrorTolerance = 0.05;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
// ADS response contains 2 localities, each of which contains 1 backend.
EdsResourceArgs args({
{"locality0",
{CreateEndpoint(0, HealthStatus::UNKNOWN, 8)},
kLocalityWeight0},
{"locality1",
{CreateEndpoint(1, HealthStatus::UNKNOWN, 2)},
kLocalityWeight1},
});
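// Note that the endpoint weights (8 and 2) are deliberately the inverse of
// the locality weights (2 and 8); the expected traffic split below follows
// the locality weights only.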
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait for both backends to be ready.
WaitForAllBackends(DEBUG_LOCATION, 0, 2);
// Send kNumRpcs RPCs.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
// The locality picking rates should be roughly equal to the expectation.
const double locality_picked_rate_0 =
static_cast<double>(backends_[0]->backend_service()->request_count()) /
kNumRpcs;
const double locality_picked_rate_1 =
static_cast<double>(backends_[1]->backend_service()->request_count()) /
kNumRpcs;
EXPECT_THAT(locality_picked_rate_0,
::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
EXPECT_THAT(locality_picked_rate_1,
::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
}
// In most of our tests, we use different names for different resource
// types, to make sure that there are no cut-and-paste errors in the code
// that cause us to look at data for the wrong resource type. So we add
// this test to make sure that the EDS resource name defaults to the
// cluster name if not specified in the CDS resource.
TEST_P(CdsTest, EdsServiceNameDefaultsToClusterName) {
CreateAndStartBackends(1);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kDefaultClusterName));
Cluster cluster = default_cluster_;
cluster.mutable_eds_cluster_config()->clear_service_name();
balancer_->ads_service()->SetCdsResource(cluster);
CheckRpcSendOk(DEBUG_LOCATION, /*times=*/1,
RpcOptions().set_timeout_ms(5000));
}
// Tests switching over from one cluster to another.
TEST_P(CdsTest, ChangeClusters) {
CreateAndStartBackends(2);
const char* kNewClusterName = "new_cluster_name";
const char* kNewEdsServiceName = "new_eds_service_name";
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// We need to wait for all backends to come online.
WaitForAllBackends(DEBUG_LOCATION, 0, 1);
// Populate new EDS resource.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsServiceName));
// Populate new CDS resource.
Cluster new_cluster = default_cluster_;
new_cluster.set_name(kNewClusterName);
new_cluster.mutable_eds_cluster_config()->set_service_name(
kNewEdsServiceName);
balancer_->ads_service()->SetCdsResource(new_cluster);
// Change RDS resource to point to new cluster.
RouteConfiguration new_route_config = default_route_config_;
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Wait for all new backends to be used.
WaitForAllBackends(DEBUG_LOCATION, 1, 2);
}
TEST_P(CdsTest, CircuitBreaking) {
CreateAndStartBackends(1);
constexpr size_t kMaxConcurrentRequests = 10;
// Populate new EDS resources.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Update CDS resource to set max concurrent requests.
CircuitBreakers circuit_breaks;
Cluster cluster = default_cluster_;
auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
threshold->set_priority(RoutingPriority::DEFAULT);
threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
balancer_->ads_service()->SetCdsResource(cluster);
// Send exactly max_concurrent_requests long RPCs.
LongRunningRpc rpcs[kMaxConcurrentRequests];
for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
rpcs[i].StartRpc(stub_.get());
}
// Wait for all RPCs to be in flight.
while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
kMaxConcurrentRequests) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
}
// Sending an RPC now should fail; the error message should tell us that
// we hit the max concurrent requests limit and the RPC got dropped.
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
"circuit breaker drop");
// Cancel one RPC to allow another one through.
rpcs[0].CancelRpc();
// Add a sleep here to ensure the RPC cancellation has completed correctly
// before trying the next RPC. There may be a slight delay between the return
// of the CANCELLED RPC status and the update of the internal state tracking
// the number of concurrent active requests.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1000, GPR_TIMESPAN)));
CheckRpcSendOk(DEBUG_LOCATION);
// Clean up.
for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
rpcs[i].CancelRpc();
}
}
TEST_P(CdsTest, CircuitBreakingMultipleChannelsShareCallCounter) {
CreateAndStartBackends(1);
constexpr size_t kMaxConcurrentRequests = 10;
// Populate new EDS resources.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Update CDS resource to set max concurrent requests.
CircuitBreakers circuit_breaks;
Cluster cluster = default_cluster_;
auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
threshold->set_priority(RoutingPriority::DEFAULT);
threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
balancer_->ads_service()->SetCdsResource(cluster);
auto channel2 = CreateChannel();
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
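// Create a second channel to the same target. Both channels should use the
// same cluster and therefore share the circuit breaker call counter.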
// Send exactly max_concurrent_requests long RPCs, alternating between
// the two channels.
LongRunningRpc rpcs[kMaxConcurrentRequests];
for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
rpcs[i].StartRpc(i % 2 == 0 ? stub_.get() : stub2.get());
}
// Wait for all RPCs to be in flight.
while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
kMaxConcurrentRequests) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
}
// Sending an RPC now should fail; the error message should tell us that
// we hit the max concurrent requests limit and the RPC got dropped.
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
"circuit breaker drop");
// Cancel one RPC to allow another one through.
rpcs[0].CancelRpc();
// Add a sleep here to ensure the RPC cancellation has completed correctly
// before trying the next RPC. There may be a slight delay between the return
// of the CANCELLED RPC status and the update of the internal state tracking
// the number of concurrent active requests.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1000, GPR_TIMESPAN)));
CheckRpcSendOk(DEBUG_LOCATION);
// Clean up.
for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
rpcs[i].CancelRpc();
}
}
TEST_P(CdsTest, ClusterChangeAfterAdsCallFails) {
CreateAndStartBackends(2);
const char* kNewEdsResourceName = "new_eds_resource_name";
// Populate EDS resources.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Check that the channel is working.
CheckRpcSendOk(DEBUG_LOCATION);
// Stop and restart the balancer.
balancer_->Shutdown();
balancer_->Start();
// Create new EDS resource.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsResourceName));
// Change CDS resource to point to new EDS resource.
auto cluster = default_cluster_;
cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
balancer_->ads_service()->SetCdsResource(cluster);
// Make sure client sees the change.
WaitForBackend(DEBUG_LOCATION, 1);
}
//
// CDS deletion tests
//
class CdsDeletionTest : public XdsEnd2endTest {
protected:
void SetUp() override {} // Individual tests call InitClient().
};
INSTANTIATE_TEST_SUITE_P(XdsTest, CdsDeletionTest,
::testing::Values(XdsTestType()), &XdsTestType::Name);
// Tests that we go into TRANSIENT_FAILURE if the Cluster is deleted.
TEST_P(CdsDeletionTest, ClusterDeleted) {
InitClient();
CreateAndStartBackends(1);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// We need to wait for all backends to come online.
WaitForAllBackends(DEBUG_LOCATION);
// Unset CDS resource.
balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
// Wait for RPCs to start failing.
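// The predicate below returns true to keep sending RPCs and false to stop.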
SendRpcsUntil(DEBUG_LOCATION, [](const RpcResult& result) {
if (result.status.ok()) return true; // Keep going.
EXPECT_EQ(StatusCode::UNAVAILABLE, result.status.error_code());
EXPECT_EQ(absl::StrCat("CDS resource \"", kDefaultClusterName,
"\" does not exist"),
result.status.error_message());
return false;
});
// Make sure we ACK'ed the update.
auto response_state = balancer_->ads_service()->cds_response_state();
ASSERT_TRUE(response_state.has_value());
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
}
// Tests that we ignore Cluster deletions if configured to do so.
TEST_P(CdsDeletionTest, ClusterDeletionIgnored) {
InitClient(BootstrapBuilder().SetIgnoreResourceDeletion());
CreateAndStartBackends(2);
// Bring up client pointing to backend 0 and wait for it to connect.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForAllBackends(DEBUG_LOCATION, 0, 1);
// Make sure we ACKed the CDS update.
auto response_state = balancer_->ads_service()->cds_response_state();
ASSERT_TRUE(response_state.has_value());
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Unset CDS resource and wait for client to ACK the update.
balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
const auto deadline = absl::Now() + absl::Seconds(30);
while (true) {
ASSERT_LT(absl::Now(), deadline) << "timed out waiting for CDS ACK";
response_state = balancer_->ads_service()->cds_response_state();
if (response_state.has_value()) break;
}
EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
// Make sure we can still send RPCs.
CheckRpcSendOk(DEBUG_LOCATION);
// Now recreate the CDS resource pointing to a new EDS resource that
// specifies backend 1, and make sure the client uses it.
const char* kNewEdsResourceName = "new_eds_resource_name";
auto cluster = default_cluster_;
cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
balancer_->ads_service()->SetCdsResource(cluster);
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsResourceName));
// Wait for client to start using backend 1.
WaitForAllBackends(DEBUG_LOCATION, 1, 2);
}
//
// EDS tests
//
using EdsTest = XdsEnd2endTest;
INSTANTIATE_TEST_SUITE_P(
XdsTest, EdsTest,
::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
&XdsTestType::Name);
// Tests that the balancer sends the correct response to the client, and the
// client sends RPCs to the backends using the default child policy.
TEST_P(EdsTest, Vanilla) {
CreateAndStartBackends(3);
const size_t kNumRpcsPerAddress = 100;
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
WaitForAllBackends(DEBUG_LOCATION);
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress,
backends_[i]->backend_service()->request_count());
}
// Check LB policy name for the channel.
EXPECT_EQ("xds_cluster_manager_experimental",
channel_->GetLoadBalancingPolicyName());
}
TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
CreateAndStartBackends(2);
const size_t kNumRpcsPerAddress = 100;
auto endpoints = CreateEndpointsForBackends();
endpoints.push_back(MakeNonExistantEndpoint());
endpoints.back().health_status = HealthStatus::DRAINING;
EdsResourceArgs args({
{"locality0", std::move(endpoints), kDefaultLocalityWeight,
kDefaultLocalityPriority},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
WaitForAllBackends(DEBUG_LOCATION);
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress,
backends_[i]->backend_service()->request_count());
}
}
TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
CreateAndStartBackends(1);
// Initial EDS resource has one locality with no endpoints.
EdsResourceArgs::Locality empty_locality("locality0", {});
EdsResourceArgs args({std::move(empty_locality)});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// RPCs should fail.
constexpr char kErrorMessage[] =
"no children in weighted_target policy: "
"EDS resource eds_service_name contains empty localities: "
"\\[\\{region=\"xds_default_locality_region\", "
"zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
// Send EDS resource that has an endpoint.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// RPCs should eventually succeed.
WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_THAT(result.status.error_message(),
::testing::MatchesRegex(kErrorMessage));
}
});
}
// This tests the bug described in https://github.com/grpc/grpc/issues/32486.
TEST_P(EdsTest, LocalityBecomesEmptyWithDeactivatedChildStateUpdate) {
CreateAndStartBackends(1);
// Initial EDS resource has one locality with one endpoint.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForAllBackends(DEBUG_LOCATION);
// EDS update removes all endpoints from the locality.
EdsResourceArgs::Locality empty_locality("locality0", {});
args = EdsResourceArgs({std::move(empty_locality)});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait for RPCs to start failing.
constexpr char kErrorMessage[] =
"no children in weighted_target policy: "
"EDS resource eds_service_name contains empty localities: "
"\\[\\{region=\"xds_default_locality_region\", "
"zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
SendRpcsUntil(DEBUG_LOCATION, [&](const RpcResult& result) {
if (result.status.ok()) return true;
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_THAT(result.status.error_message(),
::testing::MatchesRegex(kErrorMessage));
return false;
});
// Shut down backend. This triggers a connectivity state update from the
// deactivated child of the weighted_target policy.
ShutdownAllBackends();
// Now restart the backend.
StartAllBackends();
// Re-add endpoint.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// RPCs should eventually succeed.
WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_THAT(result.status.error_message(),
::testing::MatchesRegex(absl::StrCat(
// The error message we see here depends on whether
// the client sees the EDS update before or after it
// sees the backend come back up.
MakeConnectionFailureRegex(
"connections to all backends failing; last error: "),
"|", kErrorMessage)));
}
});
}
TEST_P(EdsTest, NoLocalities) {
CreateAndStartBackends(1);
// Initial EDS resource has no localities.
EdsResourceArgs args;
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// RPCs should fail.
constexpr char kErrorMessage[] =
"no children in weighted_target policy: EDS resource eds_service_name "
"contains no localities";
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
// Send EDS resource that has an endpoint.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// RPCs should eventually succeed.
WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_THAT(result.status.error_message(),
::testing::MatchesRegex(kErrorMessage));
}
});
}
// Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if
// all the servers are unreachable.
TEST_P(EdsTest, AllServersUnreachableFailFast) {
// Set the RPC timeout to 5 seconds to ensure there is enough time
// for communication with the xDS server to take place upon test startup.
const uint32_t kRpcTimeoutMs = 5000;
const size_t kNumUnreachableServers = 5;
std::vector<EdsResourceArgs::Endpoint> endpoints;
for (size_t i = 0; i < kNumUnreachableServers; ++i) {
endpoints.emplace_back(MakeNonExistantEndpoint());
}
EdsResourceArgs args({{"locality0", std::move(endpoints)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// The error shouldn't be DEADLINE_EXCEEDED because the timeout is set to 5
// seconds, and we should discover in that time that the target backend is
// down.
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
MakeConnectionFailureRegex(
"connections to all backends failing; last error: "),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
}
// Tests that RPCs fail when the backends are down, and will succeed again
// after the backends are restarted.
TEST_P(EdsTest, BackendsRestart) {
CreateAndStartBackends(3);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForAllBackends(DEBUG_LOCATION);
// Stop backends. RPCs should fail.
ShutdownAllBackends();
// Wait for channel to transition out of READY, so that we know it has
// noticed that all of the subchannels have failed. Note that it may
// be reporting either CONNECTING or TRANSIENT_FAILURE at this point.
EXPECT_TRUE(channel_->WaitForStateChange(
GRPC_CHANNEL_READY, grpc_timeout_seconds_to_deadline(5)));
EXPECT_THAT(channel_->GetState(false),
::testing::AnyOf(::testing::Eq(GRPC_CHANNEL_TRANSIENT_FAILURE),
::testing::Eq(GRPC_CHANNEL_CONNECTING)));
// RPCs should fail.
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
MakeConnectionFailureRegex(
"connections to all backends failing; last error: "));
// Restart all backends. RPCs should start succeeding again.
StartAllBackends();
CheckRpcSendOk(DEBUG_LOCATION, 1,
RpcOptions().set_timeout_ms(2000).set_wait_for_ready(true));
}
TEST_P(EdsTest, IgnoresDuplicateUpdates) {
CreateAndStartBackends(1);
const size_t kNumRpcsPerAddress = 100;
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait for all backends to come online.
WaitForAllBackends(DEBUG_LOCATION);
// Send kNumRpcsPerAddress RPCs per server, but send an EDS update in
// between. If the update is not ignored, this will cause the
// round_robin policy to see an update, which will randomly reset its
// position in the address list.
for (size_t i = 0; i < kNumRpcsPerAddress; ++i) {
CheckRpcSendOk(DEBUG_LOCATION, 2);
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
CheckRpcSendOk(DEBUG_LOCATION, 2);
}
// Each backend should have gotten the right number of requests.
for (size_t i = 1; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress,
backends_[i]->backend_service()->request_count());
}
}
// Testing just one example of an invalid resource here.
// Unit tests for XdsEndpointResourceType have exhaustive tests for all
// of the invalid cases.
TEST_P(EdsTest, NacksInvalidResource) {
EdsResourceArgs args({
{"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
const auto response_state = WaitForEdsNack(DEBUG_LOCATION);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_EQ(response_state->error_message,
"xDS response validation errors: ["
"resource index 0: eds_service_name: "
"INVALID_ARGUMENT: errors parsing EDS resource: ["
"field:endpoints error:priority 0 empty]]");
}
// Tests that if the balancer is down, the RPCs will still be sent to the
// backends according to the last balancer response, until a new balancer is
// reachable.
TEST_P(EdsTest, KeepUsingLastDataIfBalancerGoesDown) {
CreateAndStartBackends(2);
// Set up EDS resource pointing to backend 0.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Start the client and make sure it sees the backend.
WaitForBackend(DEBUG_LOCATION, 0);
// Stop the balancer, and verify that RPCs continue to flow to backend 0.
balancer_->Shutdown();
auto deadline = grpc_timeout_seconds_to_deadline(5);
do {
CheckRpcSendOk(DEBUG_LOCATION);
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
// Update the EDS resource to point to backend 1 and bring the balancer
// back up.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
balancer_->Start();
// Wait for client to see backend 1.
WaitForBackend(DEBUG_LOCATION, 1);
}
// Tests that the localities in a locality map are picked according to their
// weights.
TEST_P(EdsTest, WeightedRoundRobin) {
CreateAndStartBackends(2);
const int kLocalityWeight0 = 2;
const int kLocalityWeight1 = 8;
const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
const double kLocalityWeightRate0 =
static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
const double kLocalityWeightRate1 =
static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
const double kErrorTolerance = 0.05;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
// ADS response contains 2 localities, each of which contains 1 backend.
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
{"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait for both backends to be ready.
WaitForAllBackends(DEBUG_LOCATION, 0, 2);
// Send kNumRpcs RPCs.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
// The locality picking rates should be roughly equal to the expectation.
const double locality_picked_rate_0 =
static_cast<double>(backends_[0]->backend_service()->request_count()) /
kNumRpcs;
const double locality_picked_rate_1 =
static_cast<double>(backends_[1]->backend_service()->request_count()) /
kNumRpcs;
EXPECT_THAT(locality_picked_rate_0,
::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
EXPECT_THAT(locality_picked_rate_1,
::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
}
// Tests that we don't suffer from integer overflow in locality weights.
TEST_P(EdsTest, NoIntegerOverflowInLocalityWeights) {
CreateAndStartBackends(2);
const uint32_t kLocalityWeight1 = std::numeric_limits<uint32_t>::max() / 3;
const uint32_t kLocalityWeight0 =
std::numeric_limits<uint32_t>::max() - kLocalityWeight1;
const uint64_t kTotalLocalityWeight =
static_cast<uint64_t>(kLocalityWeight0) +
static_cast<uint64_t>(kLocalityWeight1);
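// The two locality weights sum to exactly uint32_t max, so the total is
// accumulated in 64 bits here; the test verifies that the client's weight
// calculations likewise avoid 32-bit overflow.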
const double kLocalityWeightRate0 =
static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
const double kLocalityWeightRate1 =
static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
const double kErrorTolerance = 0.05;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
// ADS response contains 2 localities, each of which contains 1 backend.
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
{"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait for both backends to be ready.
WaitForAllBackends(DEBUG_LOCATION, 0, 2);
// Send kNumRpcs RPCs.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
// The locality picking rates should be roughly equal to the expectation.
const double locality_picked_rate_0 =
static_cast<double>(backends_[0]->backend_service()->request_count()) /
kNumRpcs;
const double locality_picked_rate_1 =
static_cast<double>(backends_[1]->backend_service()->request_count()) /
kNumRpcs;
EXPECT_THAT(locality_picked_rate_0,
::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
EXPECT_THAT(locality_picked_rate_1,
::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
}
// Tests that we correctly handle a locality containing no endpoints.
TEST_P(EdsTest, LocalityContainingNoEndpoints) {
CreateAndStartBackends(2);
const size_t kNumRpcs = 5000;
// EDS response contains 2 localities, one with no endpoints.
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
{"locality1", {}},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait for both backends to be ready.
WaitForAllBackends(DEBUG_LOCATION);
// Send kNumRpcs RPCs.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
// All traffic should go to the reachable locality.
EXPECT_EQ(backends_[0]->backend_service()->request_count(),
kNumRpcs / backends_.size());
EXPECT_EQ(backends_[1]->backend_service()->request_count(),
kNumRpcs / backends_.size());
}
// Tests that the locality map can work properly even when it contains a large
// number of localities.
TEST_P(EdsTest, ManyLocalitiesStressTest) {
const size_t kNumLocalities = 50;
CreateAndStartBackends(kNumLocalities + 1);
const uint32_t kRpcTimeoutMs = 5000;
// The first ADS response contains kNumLocalities localities, each of which
// contains its own backend.
EdsResourceArgs args;
for (size_t i = 0; i < kNumLocalities; ++i) {
std::string name = absl::StrCat("locality", i);
EdsResourceArgs::Locality locality(name,
CreateEndpointsForBackends(i, i + 1));
args.locality_list.emplace_back(std::move(locality));
}
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends are ready.
WaitForAllBackends(DEBUG_LOCATION, 0, kNumLocalities,
/*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false),
RpcOptions().set_timeout_ms(kRpcTimeoutMs));
// The second ADS response contains 1 locality, which contains backend 50.
args =
EdsResourceArgs({{"locality0", CreateEndpointsForBackends(
kNumLocalities, kNumLocalities + 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until backend 50 is ready.
WaitForBackend(DEBUG_LOCATION, kNumLocalities);
}
// Tests that the localities in a locality map are picked correctly after
// update (addition, modification, deletion).
TEST_P(EdsTest, LocalityMapUpdateChurn) {
CreateAndStartBackends(4);
const size_t kNumRpcs = 3000;
// The locality weight for the first 3 localities.
const std::vector<int> kLocalityWeights0 = {2, 3, 4};
const double kTotalLocalityWeight0 =
std::accumulate(kLocalityWeights0.begin(), kLocalityWeights0.end(), 0);
std::vector<double> locality_weight_rate_0;
locality_weight_rate_0.reserve(kLocalityWeights0.size());
for (int weight : kLocalityWeights0) {
locality_weight_rate_0.push_back(weight / kTotalLocalityWeight0);
}
// Delete the first locality, keep the second locality, change the third
// locality's weight from 4 to 2, and add a new locality with weight 6.
const std::vector<int> kLocalityWeights1 = {3, 2, 6};
const double kTotalLocalityWeight1 =
std::accumulate(kLocalityWeights1.begin(), kLocalityWeights1.end(), 0);
std::vector<double> locality_weight_rate_1 = {
0 /* placeholder for locality 0 */};
for (int weight : kLocalityWeights1) {
locality_weight_rate_1.push_back(weight / kTotalLocalityWeight1);
}
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1), 2},
{"locality1", CreateEndpointsForBackends(1, 2), 3},
{"locality2", CreateEndpointsForBackends(2, 3), 4},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait for the first 3 backends to be ready.
WaitForAllBackends(DEBUG_LOCATION, 0, 3);
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
// Send kNumRpcs RPCs.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// The picking rates of the first 3 backends should be roughly equal to the
// expectation.
std::vector<double> locality_picked_rates;
for (size_t i = 0; i < 3; ++i) {
locality_picked_rates.push_back(
static_cast<double>(backends_[i]->backend_service()->request_count()) /
kNumRpcs);
}
const double kErrorTolerance = 0.2;
for (size_t i = 0; i < 3; ++i) {
gpr_log(GPR_INFO, "Locality %" PRIuPTR " rate %f", i,
locality_picked_rates[i]);
EXPECT_THAT(
locality_picked_rates[i],
::testing::AllOf(
::testing::Ge(locality_weight_rate_0[i] * (1 - kErrorTolerance)),
::testing::Le(locality_weight_rate_0[i] * (1 + kErrorTolerance))));
}
args = EdsResourceArgs({
{"locality1", CreateEndpointsForBackends(1, 2), 3},
{"locality2", CreateEndpointsForBackends(2, 3), 2},
{"locality3", CreateEndpointsForBackends(3, 4), 6},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Backend 3 hasn't received any request.
EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
// Wait until the locality update has been processed, as signaled by backend
// 3 receiving a request.
WaitForAllBackends(DEBUG_LOCATION, 3, 4);
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
// Send kNumRpcs RPCs.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// Backend 0 no longer receives any request.
EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
// The picking rates of the last 3 backends should be roughly equal to the
// expectation.
locality_picked_rates = {0 /* placeholder for backend 0 */};
for (size_t i = 1; i < 4; ++i) {
locality_picked_rates.push_back(
static_cast<double>(backends_[i]->backend_service()->request_count()) /
kNumRpcs);
}
for (size_t i = 1; i < 4; ++i) {
gpr_log(GPR_INFO, "Locality %" PRIuPTR " rate %f", i,
locality_picked_rates[i]);
EXPECT_THAT(
locality_picked_rates[i],
::testing::AllOf(
::testing::Ge(locality_weight_rate_1[i] * (1 - kErrorTolerance)),
::testing::Le(locality_weight_rate_1[i] * (1 + kErrorTolerance))));
}
}
// Tests that we don't fail RPCs when replacing all of the localities in
// a given priority.
TEST_P(EdsTest, ReplaceAllLocalitiesInPriority) {
CreateAndStartBackends(2);
// Initial EDS update has backend 0.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait for the first backend to be ready.
WaitForBackend(DEBUG_LOCATION, 0);
// Send EDS update that replaces the locality and switches to backend 1.
args = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// When the client sees the update, RPCs should start going to backend 1.
// No RPCs should fail during this change.
WaitForBackend(DEBUG_LOCATION, 1);
}
TEST_P(EdsTest, ConsistentWeightedTargetUpdates) {
CreateAndStartBackends(4);
// Initial update has two localities.
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(1, 2)},
{"locality1", CreateEndpointsForBackends(2, 3)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForAllBackends(DEBUG_LOCATION, 1, 3);
// Next update removes locality1.
// Also add backend 0 to locality0, so that we can tell when the
// update has been seen.
args = EdsResourceArgs({
{"locality0", CreateEndpointsForBackends(0, 2)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 0);
// Next update re-adds locality1.
// Also add backend 3 to locality1, so that we can tell when the
// update has been seen.
args = EdsResourceArgs({
{"locality0", CreateEndpointsForBackends(0, 2)},
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 3);
}
// Tests that RPCs are dropped according to the drop config.
TEST_P(EdsTest, Drops) {
CreateAndStartBackends(1);
const uint32_t kDropPerMillionForLb = 100000;
const uint32_t kDropPerMillionForThrottle = 200000;
const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
const double kDropRateForLbAndThrottle =
kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
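// An RPC survives only if it is not dropped by either category, so the
// expected combined drop rate is
// kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle.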
const double kErrorTolerance = 0.05;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
// The ADS response contains two drop categories.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}};
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Send kNumRpcs RPCs and count the drops.
size_t num_drops = SendRpcsAndCountFailuresWithMessage(
DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
kStatusMessageDropPrefix);
// The drop rate should be roughly equal to the expectation.
const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
kErrorTolerance));
}
// Tests that drop config is converted correctly from per hundred.
TEST_P(EdsTest, DropPerHundred) {
CreateAndStartBackends(1);
const uint32_t kDropPerHundredForLb = 10;
const double kDropRateForLb = kDropPerHundredForLb / 100.0;
const double kErrorTolerance = 0.05;
const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
// The ADS response contains one drop category.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
args.drop_categories = {{kLbDropType, kDropPerHundredForLb}};
args.drop_denominator = FractionalPercent::HUNDRED;
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Send kNumRpcs RPCs and count the drops.
size_t num_drops = SendRpcsAndCountFailuresWithMessage(
DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
kStatusMessageDropPrefix);
// The drop rate should be roughly equal to the expectation.
const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
EXPECT_THAT(seen_drop_rate,
::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
}
// Tests that drop config is converted correctly from per ten thousand.
TEST_P(EdsTest, DropPerTenThousand) {
CreateAndStartBackends(1);
const uint32_t kDropPerTenThousandForLb = 1000;
const double kDropRateForLb = kDropPerTenThousandForLb / 10000.0;
const double kErrorTolerance = 0.05;
const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
// The ADS response contains one drop category.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
args.drop_categories = {{kLbDropType, kDropPerTenThousandForLb}};
args.drop_denominator = FractionalPercent::TEN_THOUSAND;
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Send kNumRpcs RPCs and count the drops.
size_t num_drops = SendRpcsAndCountFailuresWithMessage(
DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
kStatusMessageDropPrefix);
// The drop rate should be roughly equal to the expectation.
const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
EXPECT_THAT(seen_drop_rate,
::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
}
// Tests that drops work correctly after an update.
TEST_P(EdsTest, DropConfigUpdate) {
CreateAndStartBackends(1);
const uint32_t kDropPerMillionForLb = 100000;
const uint32_t kDropPerMillionForThrottle = 200000;
const double kErrorTolerance = 0.05;
const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
const double kDropRateForLbAndThrottle =
kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
const size_t kNumRpcsLbOnly =
ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
const size_t kNumRpcsBoth =
ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
// The first ADS response contains one drop category.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
args.drop_categories = {{kLbDropType, kDropPerMillionForLb}};
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Send kNumRpcsLbOnly RPCs and count the drops.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
size_t num_drops = SendRpcsAndCountFailuresWithMessage(
DEBUG_LOCATION, kNumRpcsLbOnly, StatusCode::UNAVAILABLE,
kStatusMessageDropPrefix);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
// The drop rate should be roughly equal to the expectation.
double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsLbOnly;
gpr_log(GPR_INFO, "First batch drop rate %f", seen_drop_rate);
EXPECT_THAT(seen_drop_rate,
::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
// The second ADS response contains two drop categories; send an updated EDS
// response.
args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}};
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until the drop rate increases to the middle of the two configs,
// which implies that the update has taken effect.
const double kDropRateThreshold =
(kDropRateForLb + kDropRateForLbAndThrottle) / 2;
size_t num_rpcs = kNumRpcsBoth;
SendRpcsUntil(
DEBUG_LOCATION,
[&](const RpcResult& result) {
++num_rpcs;
if (result.status.ok()) {
EXPECT_EQ(result.response.message(), kRequestMessage);
} else {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_THAT(result.status.error_message(),
::testing::StartsWith(kStatusMessageDropPrefix));
++num_drops;
}
seen_drop_rate = static_cast<double>(num_drops) / num_rpcs;
return seen_drop_rate < kDropRateThreshold;
},
/*timeout_ms=*/40000);
// Send kNumRpcsBoth RPCs and count the drops.
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
num_drops = SendRpcsAndCountFailuresWithMessage(DEBUG_LOCATION, kNumRpcsBoth,
StatusCode::UNAVAILABLE,
kStatusMessageDropPrefix);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
// The new drop rate should be roughly equal to the expectation.
seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsBoth;
gpr_log(GPR_INFO, "Second batch drop rate %f", seen_drop_rate);
EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
kErrorTolerance));
}
// Tests that all the RPCs are dropped if any drop category drops 100%.
TEST_P(EdsTest, DropAll) {
const size_t kNumRpcs = 1000;
const uint32_t kDropPerMillionForLb = 100000;
const uint32_t kDropPerMillionForThrottle = 1000000;
// The ADS response contains two drop categories.
EdsResourceArgs args;
args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}};
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Send kNumRpcs RPCs; all of them should be dropped.
size_t num_drops = SendRpcsAndCountFailuresWithMessage(
DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
kStatusMessageDropPrefix);
EXPECT_EQ(num_drops, kNumRpcs);
}
//
// EDS failover tests
//
class FailoverTest : public XdsEnd2endTest {
public:
void SetUp() override {
XdsEnd2endTest::SetUp();
ResetStub(/*failover_timeout_ms=*/500);
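// The short failover timeout lets failover between priorities happen
// quickly in these tests.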
}
};
INSTANTIATE_TEST_SUITE_P(
XdsTest, FailoverTest,
::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
&XdsTestType::Name);
// Localities with the highest priority are used when multiple priorities exist.
TEST_P(FailoverTest, ChooseHighestPriority) {
CreateAndStartBackends(4);
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1},
{"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
2},
{"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
3},
{"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
0},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
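  // Priority 0 is the highest priority, so only locality3 (backend 3)
  // should receive traffic.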
WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
}
}
// Does not choose priority with no endpoints.
TEST_P(FailoverTest, DoesNotUsePriorityWithNoEndpoints) {
CreateAndStartBackends(3);
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1},
{"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
2},
{"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
3},
{"locality3", {}, kDefaultLocalityWeight, 0},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
for (size_t i = 1; i < 3; ++i) {
EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
}
}
// Does not choose locality with no endpoints.
TEST_P(FailoverTest, DoesNotUseLocalityWithNoEndpoints) {
CreateAndStartBackends(1);
EdsResourceArgs args({
{"locality0", {}, kDefaultLocalityWeight, 0},
{"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 0},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait for all backends to be used.
WaitForAllBackends(DEBUG_LOCATION);
}
// If the higher-priority localities are not reachable, fail over to the
// highest priority among the rest.
TEST_P(FailoverTest, Failover) {
CreateAndStartBackends(2);
EdsResourceArgs args({
{"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
{"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
2},
{"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
3},
{"locality3", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
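  // Priorities 0 and 1 contain only unreachable endpoints, so traffic
  // should fail over to priority 2 (locality1 -> backend 0); priority 3
  // (locality2 -> backend 1) should see no traffic.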
WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
}
// Reports CONNECTING when failing over to a lower priority.
TEST_P(FailoverTest, ReportsConnectingDuringFailover) {
CreateAndStartBackends(1);
// Priority 0 will be unreachable, so we'll use priority 1.
EdsResourceArgs args({
{"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
{"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
ConnectionAttemptInjector injector;
auto hold = injector.AddHold(backends_[0]->port());
// Start an RPC in the background, which should cause the channel to
// try to connect.
LongRunningRpc rpc;
rpc.StartRpc(stub_.get(), RpcOptions());
// Wait for connection attempt to start to the backend.
hold->Wait();
// Channel state should be CONNECTING here, and any RPC should be
// queued.
EXPECT_EQ(channel_->GetState(false), GRPC_CHANNEL_CONNECTING);
// Allow the connection attempt to complete.
hold->Resume();
// Now the RPC should complete successfully.
gpr_log(GPR_INFO, "=== WAITING FOR RPC TO FINISH ===");
Status status = rpc.GetStatus();
gpr_log(GPR_INFO, "=== RPC FINISHED ===");
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
}
// If a locality with higher priority than the current one becomes ready,
// switch to it.
TEST_P(FailoverTest, SwitchBackToHigherPriority) {
CreateAndStartBackends(4);
const size_t kNumRpcs = 100;
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1},
{"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
2},
{"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
3},
{"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
0},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 3);
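  // Shut down priorities 0 (backend 3) and 1 (backend 0); traffic should
  // fail over to priority 2 (backend 1).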
backends_[3]->StopListeningAndSendGoaways();
backends_[0]->StopListeningAndSendGoaways();
WaitForBackend(DEBUG_LOCATION, 1);
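  // Restart backend 0, making priority 1 reachable again; the client
  // should switch back to the higher priority.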
ShutdownBackend(0);
StartBackend(0);
WaitForBackend(DEBUG_LOCATION, 0);
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
EXPECT_EQ(kNumRpcs, backends_[0]->backend_service()->request_count());
}
// The first update only contains unavailable priorities. The second update
// contains available priorities.
TEST_P(FailoverTest, UpdateInitialUnavailable) {
CreateAndStartBackends(2);
EdsResourceArgs args({
{"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
{"locality1", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
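  // With no reachable priorities, RPCs should fail with UNAVAILABLE.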
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
MakeConnectionFailureRegex(
"connections to all backends failing; last error: "));
args = EdsResourceArgs({
{"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
0},
{"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 0, [&](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_THAT(result.status.error_message(),
::testing::MatchesRegex(MakeConnectionFailureRegex(
"connections to all backends failing; last error: ")));
}
});
}
// Tests that after the localities' priorities are updated, we still choose
// the highest READY priority with the updated localities.
TEST_P(FailoverTest, UpdatePriority) {
CreateAndStartBackends(4);
const size_t kNumRpcs = 100;
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1},
{"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
2},
{"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
3},
{"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
0},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
EXPECT_EQ(0U, backends_[2]->backend_service()->request_count());
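  // Swap the priorities so that locality1 (backend 1) becomes the highest
  // priority.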
args = EdsResourceArgs({
{"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
2},
{"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
0},
{"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1},
{"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
3},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 1);
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
EXPECT_EQ(kNumRpcs, backends_[1]->backend_service()->request_count());
}
// Moves all localities in the current priority to a higher priority.
TEST_P(FailoverTest, MoveAllLocalitiesInCurrentPriorityToHigherPriority) {
CreateAndStartBackends(3);
auto non_existant_endpoint = MakeNonExistantEndpoint();
// First update:
// - Priority 0 is locality 0, containing an unreachable backend.
// - Priority 1 is locality 1, containing backends 0 and 1.
EdsResourceArgs args({
{"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
{"locality1", CreateEndpointsForBackends(0, 2), kDefaultLocalityWeight,
1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// When we get the first update, all backends in priority 0 are down,
// so we will create priority 1. Backends 0 and 1 should have traffic,
// but backend 2 should not.
WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
// Second update:
// - Priority 0 contains both localities 0 and 1.
// - Priority 1 is not present.
// - We add backend 2 to locality 1, just so we have a way to know
// when the update has been seen by the client.
args = EdsResourceArgs({
{"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
{"locality1", CreateEndpointsForBackends(0, 3), kDefaultLocalityWeight,
0},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// When backend 2 gets traffic, we know the second update has been seen.
WaitForBackend(DEBUG_LOCATION, 2);
// The xDS server got at least 1 response.
EXPECT_TRUE(balancer_->ads_service()->eds_response_state().has_value());
}
// This tests a bug triggered by the xds_cluster_resolver policy reusing
// a child name for the priority policy when that child name was still
// present but deactivated.
TEST_P(FailoverTest, PriorityChildNameChurn) {
CreateAndStartBackends(4);
auto non_existant_endpoint = MakeNonExistantEndpoint();
// Initial update:
// - P0:locality0, child number 0 (unreachable)
// - P1:locality1, child number 1
// - P2:locality2, child number 2
EdsResourceArgs args({
{"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
{"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1},
{"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
2},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 0);
// Next update:
// - P0:locality0, child number 0 (still unreachable)
// - P1:locality2, child number 2 (moved from P2 to P1)
// - P2:locality3, child number 3 (new child)
// Child number 1 will be deactivated.
args = EdsResourceArgs({
{"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
{"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1},
{"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
2},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 1);
// Next update:
// - P0:locality0, child number 0 (still unreachable)
// - P1:locality4, child number 4 (new child number -- should not reuse #1)
// - P2:locality3, child number 3
// Child number 1 will be deactivated.
args = EdsResourceArgs({
{"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
{"locality4", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1},
{"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
2},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
  // P2 should not have received any traffic as a result of this change.
EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
}
//
// EDS client load reporting tests
//
using ClientLoadReportingTest = XdsEnd2endTest;
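// Load reporting is enabled for all of these tests.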
INSTANTIATE_TEST_SUITE_P(
XdsTest, ClientLoadReportingTest,
::testing::Values(XdsTestType().set_enable_load_reporting()),
&XdsTestType::Name);
// Tests that the load report received at the balancer is correct.
TEST_P(ClientLoadReportingTest, Vanilla) {
CreateAndStartBackends(4);
const size_t kNumRpcsPerAddress = 10;
const size_t kNumFailuresPerAddress = 3;
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 2)},
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends are ready.
size_t num_warmup_rpcs =
WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
RpcOptions().set_server_fail(true));
}
const size_t total_successful_rpcs_sent =
(kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
const size_t total_failed_rpcs_sent =
kNumFailuresPerAddress * backends_.size();
// Check that the backends got the right number of requests.
size_t total_rpcs_sent = 0;
for (const auto& backend : backends_) {
total_rpcs_sent += backend->backend_service()->request_count();
}
EXPECT_EQ(total_rpcs_sent,
total_successful_rpcs_sent + total_failed_rpcs_sent);
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats& client_stats = load_report.front();
EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
EXPECT_EQ(total_successful_rpcs_sent,
client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
ASSERT_THAT(
client_stats.locality_stats(),
::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
::testing::Pair("locality1", ::testing::_)));
size_t num_successful_rpcs = 0;
size_t num_failed_rpcs = 0;
for (const auto& p : client_stats.locality_stats()) {
EXPECT_EQ(p.second.total_requests_in_progress, 0U);
EXPECT_EQ(
p.second.total_issued_requests,
p.second.total_successful_requests + p.second.total_error_requests);
num_successful_rpcs += p.second.total_successful_requests;
num_failed_rpcs += p.second.total_error_requests;
}
EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
// Tests send_all_clusters.
TEST_P(ClientLoadReportingTest, SendAllClusters) {
CreateAndStartBackends(2);
balancer_->lrs_service()->set_send_all_clusters(true);
const size_t kNumRpcsPerAddress = 10;
const size_t kNumFailuresPerAddress = 3;
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends are ready.
size_t num_warmup_rpcs = WaitForAllBackends(DEBUG_LOCATION);
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
RpcOptions().set_server_fail(true));
}
// Check that each backend got the right number of requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress + kNumFailuresPerAddress,
backends_[i]->backend_service()->request_count());
}
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats& client_stats = load_report.front();
EXPECT_EQ(kNumRpcsPerAddress * backends_.size() + num_warmup_rpcs,
client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ((kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size() +
num_warmup_rpcs,
client_stats.total_issued_requests());
EXPECT_EQ(kNumFailuresPerAddress * backends_.size(),
client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
// Tests that we don't include stats for clusters that are not requested
// by the LRS server.
TEST_P(ClientLoadReportingTest, HonorsClustersRequestedByLrsServer) {
CreateAndStartBackends(1);
balancer_->lrs_service()->set_cluster_names({"bogus"});
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends are ready.
WaitForAllBackends(DEBUG_LOCATION);
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 0UL);
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
// Tests that if the balancer restarts, the client load report contains the
// stats before and after the restart correctly.
TEST_P(ClientLoadReportingTest, BalancerRestart) {
CreateAndStartBackends(4);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 2)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends returned by the balancer are ready.
size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats client_stats = std::move(load_report.front());
EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(0U, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
// Shut down the balancer.
balancer_->Shutdown();
// We should continue using the last EDS response we received from the
// balancer before it was shut down.
  // Note: We need to use WaitForAllBackends() here instead of just
  // CheckRpcSendOk(), because when the balancer
// shuts down, the XdsClient will generate an error to the
// ListenerWatcher, which will cause the xds resolver to send a
// no-op update to the LB policy. When this update gets down to the
// round_robin child policy for the locality, it will generate a new
// subchannel list, which resets the start index randomly. So we need
// to be a little more permissive here to avoid spurious failures.
ResetBackendCounters();
num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
// Now restart the balancer, this time pointing to the new backends.
balancer_->Start();
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(2, 4)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait for queries to start going to one of the new backends.
// This tells us that we're now using the new serverlist.
num_rpcs += WaitForAllBackends(DEBUG_LOCATION, 2, 4);
// Send one RPC per backend.
CheckRpcSendOk(DEBUG_LOCATION, 2);
num_rpcs += 2;
// Check client stats.
load_report = balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
client_stats = std::move(load_report.front());
EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(0U, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
}
// Tests load reporting when switching over from one cluster to another.
TEST_P(ClientLoadReportingTest, ChangeClusters) {
CreateAndStartBackends(4);
const char* kNewClusterName = "new_cluster_name";
const char* kNewEdsServiceName = "new_eds_service_name";
balancer_->lrs_service()->set_cluster_names(
{kDefaultClusterName, kNewClusterName});
// cluster kDefaultClusterName -> locality0 -> backends 0 and 1
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 2)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// cluster kNewClusterName -> locality1 -> backends 2 and 3
EdsResourceArgs args2({
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args2, kNewEdsServiceName));
// CDS resource for kNewClusterName.
Cluster new_cluster = default_cluster_;
new_cluster.set_name(kNewClusterName);
new_cluster.mutable_eds_cluster_config()->set_service_name(
kNewEdsServiceName);
balancer_->ads_service()->SetCdsResource(new_cluster);
// Wait for all backends to come online.
size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
EXPECT_THAT(
load_report,
::testing::ElementsAre(::testing::AllOf(
::testing::Property(&ClientStats::cluster_name, kDefaultClusterName),
::testing::Property(&ClientStats::eds_service_name,
kDefaultEdsServiceName),
::testing::Property(
&ClientStats::locality_stats,
::testing::ElementsAre(::testing::Pair(
"locality0",
::testing::AllOf(
::testing::Field(&ClientStats::LocalityStats::
total_successful_requests,
num_rpcs),
::testing::Field(&ClientStats::LocalityStats::
total_requests_in_progress,
0UL),
::testing::Field(
&ClientStats::LocalityStats::total_error_requests,
0UL),
::testing::Field(
&ClientStats::LocalityStats::total_issued_requests,
num_rpcs))))),
::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
// Change RDS resource to point to new cluster.
RouteConfiguration new_route_config = default_route_config_;
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Wait for all new backends to be used.
num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 2, 4);
// The load report received at the balancer should be correct.
load_report = balancer_->lrs_service()->WaitForLoadReport();
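  // During the switchover, some of the RPCs counted by WaitForAllBackends()
  // may still have gone to the old cluster, so the per-cluster counts are
  // checked with Lt/Le matchers and only the combined total is required to
  // match num_rpcs exactly.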
EXPECT_THAT(
load_report,
::testing::ElementsAre(
::testing::AllOf(
::testing::Property(&ClientStats::cluster_name,
kDefaultClusterName),
::testing::Property(&ClientStats::eds_service_name,
kDefaultEdsServiceName),
::testing::Property(
&ClientStats::locality_stats,
::testing::ElementsAre(::testing::Pair(
"locality0",
::testing::AllOf(
::testing::Field(&ClientStats::LocalityStats::
total_successful_requests,
::testing::Lt(num_rpcs)),
::testing::Field(&ClientStats::LocalityStats::
total_requests_in_progress,
0UL),
::testing::Field(
&ClientStats::LocalityStats::total_error_requests,
0UL),
::testing::Field(&ClientStats::LocalityStats::
total_issued_requests,
::testing::Le(num_rpcs)))))),
::testing::Property(&ClientStats::total_dropped_requests, 0UL)),
::testing::AllOf(
::testing::Property(&ClientStats::cluster_name, kNewClusterName),
::testing::Property(&ClientStats::eds_service_name,
kNewEdsServiceName),
::testing::Property(
&ClientStats::locality_stats,
::testing::ElementsAre(::testing::Pair(
"locality1",
::testing::AllOf(
::testing::Field(&ClientStats::LocalityStats::
total_successful_requests,
::testing::Le(num_rpcs)),
::testing::Field(&ClientStats::LocalityStats::
total_requests_in_progress,
0UL),
::testing::Field(
&ClientStats::LocalityStats::total_error_requests,
0UL),
::testing::Field(&ClientStats::LocalityStats::
total_issued_requests,
::testing::Le(num_rpcs)))))),
::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
size_t total_ok = 0;
for (const ClientStats& client_stats : load_report) {
total_ok += client_stats.total_successful_requests();
}
EXPECT_EQ(total_ok, num_rpcs);
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
// Tests that the drop stats are correctly reported by client load reporting.
TEST_P(ClientLoadReportingTest, DropStats) {
CreateAndStartBackends(1);
const uint32_t kDropPerMillionForLb = 100000;
const uint32_t kDropPerMillionForThrottle = 200000;
const double kErrorTolerance = 0.05;
const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
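  // An RPC is subject to the throttle drop only if it survives the lb drop,
  // so the combined drop rate compounds the two probabilities.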
const double kDropRateForLbAndThrottle =
kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
const size_t kNumRpcs =
ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
// The ADS response contains two drop categories.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}};
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Send kNumRpcs RPCs and count the drops.
size_t num_drops = SendRpcsAndCountFailuresWithMessage(
DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
kStatusMessageDropPrefix);
// The drop rate should be roughly equal to the expectation.
const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
kErrorTolerance));
// Check client stats.
ClientStats client_stats;
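  // Load reports may arrive split across multiple LRS intervals, so
  // aggregate them until they account for all of the RPCs we sent.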
do {
std::vector<ClientStats> load_reports =
balancer_->lrs_service()->WaitForLoadReport();
for (const auto& load_report : load_reports) {
client_stats += load_report;
}
} while (client_stats.total_issued_requests() +
client_stats.total_dropped_requests() <
kNumRpcs);
EXPECT_EQ(num_drops, client_stats.total_dropped_requests());
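  // The lb category is evaluated against all RPCs, but the throttle category
  // only sees RPCs that were not already dropped by the lb category, hence
  // the (1 - kDropRateForLb) scaling in the denominator below.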
EXPECT_THAT(static_cast<double>(client_stats.dropped_requests(kLbDropType)) /
kNumRpcs,
::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
EXPECT_THAT(
static_cast<double>(client_stats.dropped_requests(kThrottleDropType)) /
(kNumRpcs * (1 - kDropRateForLb)),
::testing::DoubleNear(kDropRateForThrottle, kErrorTolerance));
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
// Make the backup poller poll very frequently in order to pick up
  // updates from all the subchannels' FDs.
grpc_core::ConfigVars::Overrides overrides;
overrides.client_channel_backup_poll_interval_ms = 1;
grpc_core::ConfigVars::SetOverrides(overrides);
#if TARGET_OS_IPHONE
// Workaround Apple CFStream bug
grpc_core::SetEnv("grpc_cfstream", "0");
#endif
grpc_init();
grpc::testing::ConnectionAttemptInjector::Init();
const auto result = RUN_ALL_TESTS();
grpc_shutdown();
return result;
}