Merge pull request #25298 from donnadionne/logical_dns_cases

Fixing Logical DNS case:
reviewable/pr25108/r12^2
donnadionne 4 years ago committed by GitHub
commit b7cceab3b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  2. 91
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  3. 64
      test/cpp/end2end/xds_end2end_test.cc

@ -63,6 +63,7 @@ class CdsLb : public LoadBalancingPolicy {
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
void ExitIdleLocked() override;
private:
// Watcher for getting cluster data from XdsClient.
@ -316,6 +317,10 @@ void CdsLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
void CdsLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
}
void CdsLb::UpdateLocked(UpdateArgs args) {
// Update config.
auto old_config = std::move(config_);

@ -115,6 +115,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
void ExitIdleLocked() override;
private:
// Discovery Mechanism Base class
@ -134,6 +135,8 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
: parent_(std::move(xds_cluster_resolver_lb)), index_(index) {}
virtual void Start() = 0;
void Orphan() override = 0;
virtual Json::Array override_child_policy() = 0;
virtual bool disable_reresolution() = 0;
// Caller must ensure that config_ is set before calling.
const absl::string_view GetXdsClusterResolverResourceName() const {
@ -174,6 +177,8 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
: DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
void Start() override;
void Orphan() override;
Json::Array override_child_policy() override { return Json::Array{}; }
bool disable_reresolution() override { return true; }
private:
class EndpointWatcher : public XdsClient::EndpointWatcherInterface {
@ -232,6 +237,14 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
: DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
void Start() override;
void Orphan() override;
Json::Array override_child_policy() override {
return Json::Array{
Json::Object{
{"pick_first", Json::Object()},
},
};
}
bool disable_reresolution() override { return false; };
private:
class ResolverResultHandler : public Resolver::ResultHandler {
@ -674,6 +687,10 @@ void XdsClusterResolverLb::ResetBackoffLocked() {
}
}
void XdsClusterResolverLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
}
void XdsClusterResolverLb::OnEndpointChanged(size_t index,
XdsApi::EdsUpdate update) {
if (shutting_down_) return;
@ -851,36 +868,44 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
size_t num_priorities_remaining_in_discovery =
discovery_mechanisms_[discovery_index].num_priorities;
for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
const auto& localities = priority_list_[priority].localities;
Json::Object weighted_targets;
for (const auto& p : localities) {
XdsLocalityName* locality_name = p.first;
const auto& locality = p.second;
// Construct JSON object containing locality name.
Json::Object locality_name_json;
if (!locality_name->region().empty()) {
locality_name_json["region"] = locality_name->region();
}
if (!locality_name->zone().empty()) {
locality_name_json["zone"] = locality_name->zone();
}
if (!locality_name->sub_zone().empty()) {
locality_name_json["subzone"] = locality_name->sub_zone();
Json child_policy;
if (!discovery_mechanisms_[discovery_index]
.discovery_mechanism->override_child_policy()
.empty()) {
child_policy = discovery_mechanisms_[discovery_index]
.discovery_mechanism->override_child_policy();
} else {
const auto& localities = priority_list_[priority].localities;
Json::Object weighted_targets;
for (const auto& p : localities) {
XdsLocalityName* locality_name = p.first;
const auto& locality = p.second;
// Construct JSON object containing locality name.
Json::Object locality_name_json;
if (!locality_name->region().empty()) {
locality_name_json["region"] = locality_name->region();
}
if (!locality_name->zone().empty()) {
locality_name_json["zone"] = locality_name->zone();
}
if (!locality_name->sub_zone().empty()) {
locality_name_json["subzone"] = locality_name->sub_zone();
}
// Add weighted target entry.
weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{
{"weight", locality.lb_weight},
{"childPolicy", config_->endpoint_picking_policy()},
};
}
// Add weighted target entry.
weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{
{"weight", locality.lb_weight},
{"childPolicy", config_->endpoint_picking_policy()},
};
// Construct locality-picking policy.
// Start with field from our config and add the "targets" field.
child_policy = config_->locality_picking_policy();
Json::Object& config =
*(*child_policy.mutable_array())[0].mutable_object();
auto it = config.begin();
GPR_ASSERT(it != config.end());
(*it->second.mutable_object())["targets"] = std::move(weighted_targets);
}
// Construct locality-picking policy.
// Start with field from our config and add the "targets" field.
Json locality_picking_config = config_->locality_picking_policy();
Json::Object& config =
*(*locality_picking_config.mutable_array())[0].mutable_object();
auto it = config.begin();
GPR_ASSERT(it != config.end());
(*it->second.mutable_object())["targets"] = std::move(weighted_targets);
// Wrap it in the drop policy.
Json::Array drop_categories;
if (discovery_mechanisms_[discovery_index].drop_config != nullptr) {
@ -896,7 +921,7 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
.discovery_mechanism->GetLrsClusterKey();
Json::Object xds_cluster_impl_config = {
{"clusterName", std::string(lrs_key.first)},
{"childPolicy", std::move(locality_picking_config)},
{"childPolicy", std::move(child_policy)},
{"dropCategories", std::move(drop_categories)},
{"maxConcurrentRequests",
config_->discovery_mechanisms()[discovery_index]
@ -918,10 +943,14 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
const size_t child_number = priority_child_numbers_[priority];
std::string child_name = absl::StrCat("child", child_number);
priority_priorities.emplace_back(child_name);
priority_children[child_name] = Json::Object{
Json::Object child_config = {
{"config", std::move(locality_picking_policy)},
{"ignore_reresolution_requests", true},
};
if (discovery_mechanisms_[discovery_index]
.discovery_mechanism->disable_reresolution()) {
child_config["ignore_reresolution_requests"] = true;
}
priority_children[child_name] = std::move(child_config);
// Each priority in the priority_list_ should correspond to a priority in a
// discovery mechanism in discovery_mechanisms_ (both in the same order).
// Keeping track of the discovery_mechanism each priority belongs to.

@ -5402,11 +5402,70 @@ TEST_P(CdsTest, AggregateClusterType) {
WaitForBackend(2);
EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state().state,
AdsServiceImpl::ResponseState::ACKED);
// Bring backend 1 back and ensure all traffic go back to it.
StartBackend(1);
WaitForBackend(1);
gpr_unsetenv(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
}
TEST_P(CdsTest, AggregateClusterEdsToLogicalDns) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER",
"true");
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewEdsService1Name = "new_eds_service_name_1";
const char* kLogicalDNSClusterName = "logical_dns_cluster";
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args1({
{"locality0", GetBackendPorts(1, 2)},
});
balancers_[0]->ads_service()->SetEdsResource(
BuildEdsResource(args1, kNewEdsService1Name));
// Populate new CDS resources.
Cluster new_cluster1 = default_cluster_;
new_cluster1.set_name(kNewCluster1Name);
new_cluster1.mutable_eds_cluster_config()->set_service_name(
kNewEdsService1Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster1);
// Create Logical DNS Cluster
auto logical_dns_cluster = default_cluster_;
logical_dns_cluster.set_name(kLogicalDNSClusterName);
logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
balancers_[0]->ads_service()->SetCdsResource(logical_dns_cluster);
// Create Aggregate Cluster
auto cluster = default_cluster_;
CustomClusterType* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
ClusterConfig cluster_config;
cluster_config.add_clusters(kNewCluster1Name);
cluster_config.add_clusters(kLogicalDNSClusterName);
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Set Logical DNS result
{
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts(2, 3));
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// Wait for traffic to go to backend 1.
WaitForBackend(1);
// Shutdown backend 1 and wait for all traffic to go to backend 2.
ShutdownBackend(1);
WaitForBackend(2);
EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state().state,
AdsServiceImpl::ResponseState::ACKED);
// Bring backend 1 back and ensure all traffic go back to it.
StartBackend(1);
WaitForBackend(1);
gpr_unsetenv(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
}
TEST_P(CdsTest, AggregateClusterMixedType) {
TEST_P(CdsTest, AggregateClusterLogicalDnsToEds) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER",
"true");
SetNextResolution({});
@ -5455,6 +5514,9 @@ TEST_P(CdsTest, AggregateClusterMixedType) {
WaitForBackend(2);
EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state().state,
AdsServiceImpl::ResponseState::ACKED);
// Bring backend 1 back and ensure all traffic go back to it.
StartBackend(1);
WaitForBackend(1);
gpr_unsetenv(
"GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER");
}

Loading…
Cancel
Save