|
|
|
@ -31,6 +31,7 @@ |
|
|
|
|
#include "absl/status/status.h" |
|
|
|
|
#include "absl/status/statusor.h" |
|
|
|
|
#include "absl/strings/str_cat.h" |
|
|
|
|
#include "absl/strings/str_join.h" |
|
|
|
|
#include "absl/strings/string_view.h" |
|
|
|
|
#include "absl/types/optional.h" |
|
|
|
|
|
|
|
|
@ -224,16 +225,44 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { |
|
|
|
|
// in methods of this class rather than in lambdas to work around an MSVC
|
|
|
|
|
// bug.
|
|
|
|
|
void OnResourceChangedHelper(XdsEndpointResource update) { |
|
|
|
|
std::string resolution_note; |
|
|
|
|
if (update.priorities.empty()) { |
|
|
|
|
resolution_note = absl::StrCat( |
|
|
|
|
"EDS resource ", discovery_mechanism_->GetEdsResourceName(), |
|
|
|
|
" contains no localities"); |
|
|
|
|
} else { |
|
|
|
|
std::set<std::string> empty_localities; |
|
|
|
|
for (const auto& priority : update.priorities) { |
|
|
|
|
for (const auto& p : priority.localities) { |
|
|
|
|
if (p.second.endpoints.empty()) { |
|
|
|
|
empty_localities.insert(p.first->AsHumanReadableString()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!empty_localities.empty()) { |
|
|
|
|
resolution_note = absl::StrCat( |
|
|
|
|
"EDS resource ", discovery_mechanism_->GetEdsResourceName(), |
|
|
|
|
" contains empty localities: [", |
|
|
|
|
absl::StrJoin(empty_localities, "; "), "]"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
discovery_mechanism_->parent()->OnEndpointChanged( |
|
|
|
|
discovery_mechanism_->index(), std::move(update)); |
|
|
|
|
discovery_mechanism_->index(), std::move(update), |
|
|
|
|
std::move(resolution_note)); |
|
|
|
|
} |
|
|
|
|
void OnErrorHelper(absl::Status status) { |
|
|
|
|
discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), |
|
|
|
|
status); |
|
|
|
|
discovery_mechanism_->parent()->OnError( |
|
|
|
|
discovery_mechanism_->index(), |
|
|
|
|
absl::StrCat("EDS watcher error for resource ", |
|
|
|
|
discovery_mechanism_->GetEdsResourceName(), " (", |
|
|
|
|
status.ToString(), ")")); |
|
|
|
|
} |
|
|
|
|
void OnResourceDoesNotExistHelper() { |
|
|
|
|
discovery_mechanism_->parent()->OnResourceDoesNotExist( |
|
|
|
|
discovery_mechanism_->index()); |
|
|
|
|
discovery_mechanism_->index(), |
|
|
|
|
absl::StrCat("EDS resource ", |
|
|
|
|
discovery_mechanism_->GetEdsResourceName(), |
|
|
|
|
" does not exist")); |
|
|
|
|
} |
|
|
|
|
RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_; |
|
|
|
|
}; |
|
|
|
@ -288,6 +317,11 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { |
|
|
|
|
// access protected member in base class.
|
|
|
|
|
friend class ResolverResultHandler; |
|
|
|
|
|
|
|
|
|
absl::string_view GetDnsHostname() const { |
|
|
|
|
auto& config = parent()->config_->discovery_mechanisms()[index()]; |
|
|
|
|
return config.dns_hostname; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
OrphanablePtr<Resolver> resolver_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
@ -295,6 +329,8 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { |
|
|
|
|
OrphanablePtr<DiscoveryMechanism> discovery_mechanism; |
|
|
|
|
// Most recent update reported by the discovery mechanism.
|
|
|
|
|
absl::optional<XdsEndpointResource> latest_update; |
|
|
|
|
// Last resolution note reported by the discovery mechanism, if any.
|
|
|
|
|
std::string resolution_note; |
|
|
|
|
// State used to retain child policy names for priority policy.
|
|
|
|
|
std::vector<size_t /*child_number*/> priority_child_numbers; |
|
|
|
|
size_t next_available_child_number = 0; |
|
|
|
@ -335,9 +371,10 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { |
|
|
|
|
|
|
|
|
|
void ShutdownLocked() override; |
|
|
|
|
|
|
|
|
|
void OnEndpointChanged(size_t index, XdsEndpointResource update); |
|
|
|
|
void OnError(size_t index, absl::Status status); |
|
|
|
|
void OnResourceDoesNotExist(size_t index); |
|
|
|
|
void OnEndpointChanged(size_t index, XdsEndpointResource update, |
|
|
|
|
std::string resolution_note); |
|
|
|
|
void OnError(size_t index, std::string resolution_note); |
|
|
|
|
void OnResourceDoesNotExist(size_t index, std::string resolution_note); |
|
|
|
|
|
|
|
|
|
void MaybeDestroyChildPolicyLocked(); |
|
|
|
|
|
|
|
|
@ -345,6 +382,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
|
|
|
|
const ChannelArgs& args); |
|
|
|
|
ServerAddressList CreateChildPolicyAddressesLocked(); |
|
|
|
|
std::string CreateChildPolicyResolutionNoteLocked(); |
|
|
|
|
RefCountedPtr<Config> CreateChildPolicyConfigLocked(); |
|
|
|
|
ChannelArgs CreateChildPolicyArgsLocked(const ChannelArgs& args_in); |
|
|
|
|
|
|
|
|
@ -440,17 +478,16 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() { |
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() { |
|
|
|
|
std::string target = |
|
|
|
|
parent()->config_->discovery_mechanisms()[index()].dns_hostname; |
|
|
|
|
std::string target; |
|
|
|
|
ChannelArgs args = parent()->args_; |
|
|
|
|
auto* fake_resolver_response_generator = |
|
|
|
|
args.GetPointer<FakeResolverResponseGenerator>( |
|
|
|
|
GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR); |
|
|
|
|
if (fake_resolver_response_generator != nullptr) { |
|
|
|
|
target = absl::StrCat("fake:", target); |
|
|
|
|
target = absl::StrCat("fake:", GetDnsHostname()); |
|
|
|
|
args = args.SetObject(fake_resolver_response_generator->Ref()); |
|
|
|
|
} else { |
|
|
|
|
target = absl::StrCat("dns:", target); |
|
|
|
|
target = absl::StrCat("dns:", GetDnsHostname()); |
|
|
|
|
} |
|
|
|
|
resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver( |
|
|
|
|
target.c_str(), args, parent()->interested_parties(), |
|
|
|
@ -458,7 +495,9 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() { |
|
|
|
|
absl::make_unique<ResolverResultHandler>( |
|
|
|
|
Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism"))); |
|
|
|
|
if (resolver_ == nullptr) { |
|
|
|
|
parent()->OnResourceDoesNotExist(index()); |
|
|
|
|
parent()->OnResourceDoesNotExist( |
|
|
|
|
index(), |
|
|
|
|
absl::StrCat("error creating DNS resolver for ", GetDnsHostname())); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
resolver_->StartLocked(); |
|
|
|
@ -488,14 +527,18 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Orphan() { |
|
|
|
|
|
|
|
|
|
void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: |
|
|
|
|
ReportResult(Resolver::Result result) { |
|
|
|
|
XdsClusterResolverLb* lb_policy = discovery_mechanism_->parent(); |
|
|
|
|
size_t index = discovery_mechanism_->index(); |
|
|
|
|
if (!result.addresses.ok()) { |
|
|
|
|
discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), |
|
|
|
|
result.addresses.status()); |
|
|
|
|
if (result.resolution_note.empty()) { |
|
|
|
|
result.resolution_note = absl::StrCat( |
|
|
|
|
"DNS resolution failed for ", discovery_mechanism_->GetDnsHostname(), |
|
|
|
|
" (", result.addresses.status().ToString(), ")"); |
|
|
|
|
} |
|
|
|
|
lb_policy->OnError(index, result.resolution_note); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Convert resolver result to EDS update.
|
|
|
|
|
// TODO(roth): Figure out a way to pass resolution_note through to the
|
|
|
|
|
// child policy.
|
|
|
|
|
XdsEndpointResource update; |
|
|
|
|
XdsEndpointResource::Priority::Locality locality; |
|
|
|
|
locality.name = MakeRefCounted<XdsLocalityName>("", "", ""); |
|
|
|
@ -504,8 +547,8 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: |
|
|
|
|
XdsEndpointResource::Priority priority; |
|
|
|
|
priority.localities.emplace(locality.name.get(), std::move(locality)); |
|
|
|
|
update.priorities.emplace_back(std::move(priority)); |
|
|
|
|
discovery_mechanism_->parent()->OnEndpointChanged( |
|
|
|
|
discovery_mechanism_->index(), std::move(update)); |
|
|
|
|
lb_policy->OnEndpointChanged(index, std::move(update), |
|
|
|
|
std::move(result.resolution_note)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -615,13 +658,14 @@ void XdsClusterResolverLb::ExitIdleLocked() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClusterResolverLb::OnEndpointChanged(size_t index, |
|
|
|
|
XdsEndpointResource update) { |
|
|
|
|
XdsEndpointResource update, |
|
|
|
|
std::string resolution_note) { |
|
|
|
|
if (shutting_down_) return; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xds_cluster_resolver_lb %p] Received update from xds client" |
|
|
|
|
" for discovery mechanism %" PRIuPTR "", |
|
|
|
|
this, index); |
|
|
|
|
" for discovery mechanism %" PRIuPTR " (resolution_note=\"%s\")", |
|
|
|
|
this, index, resolution_note.c_str()); |
|
|
|
|
} |
|
|
|
|
DiscoveryMechanismEntry& discovery_entry = discovery_mechanisms_[index]; |
|
|
|
|
// We need at least one priority for each discovery mechanism, just so that we
|
|
|
|
@ -694,6 +738,7 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index, |
|
|
|
|
} |
|
|
|
|
// Save update.
|
|
|
|
|
discovery_entry.latest_update = std::move(update); |
|
|
|
|
discovery_entry.resolution_note = std::move(resolution_note); |
|
|
|
|
discovery_entry.priority_child_numbers = std::move(priority_child_numbers); |
|
|
|
|
// If any discovery mechanism has not received its first update,
|
|
|
|
|
// wait until that happens before creating the child policy.
|
|
|
|
@ -711,27 +756,28 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index, |
|
|
|
|
UpdateChildPolicyLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClusterResolverLb::OnError(size_t index, absl::Status status) { |
|
|
|
|
void XdsClusterResolverLb::OnError(size_t index, std::string resolution_note) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR |
|
|
|
|
" xds watcher reported error: %s", |
|
|
|
|
this, index, status.ToString().c_str()); |
|
|
|
|
" reported error: %s", |
|
|
|
|
this, index, resolution_note.c_str()); |
|
|
|
|
if (shutting_down_) return; |
|
|
|
|
if (!discovery_mechanisms_[index].latest_update.has_value()) { |
|
|
|
|
// Call OnEndpointChanged with an empty update just like
|
|
|
|
|
// OnResourceDoesNotExist.
|
|
|
|
|
OnEndpointChanged(index, XdsEndpointResource()); |
|
|
|
|
// Call OnEndpointChanged() with an empty update just like
|
|
|
|
|
// OnResourceDoesNotExist().
|
|
|
|
|
OnEndpointChanged(index, XdsEndpointResource(), std::move(resolution_note)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index) { |
|
|
|
|
void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index, |
|
|
|
|
std::string resolution_note) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR |
|
|
|
|
" resource does not exist", |
|
|
|
|
this, index); |
|
|
|
|
" resource does not exist: %s", |
|
|
|
|
this, index, resolution_note.c_str()); |
|
|
|
|
if (shutting_down_) return; |
|
|
|
|
// Call OnEndpointChanged with an empty update.
|
|
|
|
|
OnEndpointChanged(index, XdsEndpointResource()); |
|
|
|
|
// Call OnEndpointChanged() with an empty update.
|
|
|
|
|
OnEndpointChanged(index, XdsEndpointResource(), std::move(resolution_note)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -780,6 +826,16 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() { |
|
|
|
|
return addresses; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::string XdsClusterResolverLb::CreateChildPolicyResolutionNoteLocked() { |
|
|
|
|
std::vector<absl::string_view> resolution_notes; |
|
|
|
|
for (const auto& discovery_entry : discovery_mechanisms_) { |
|
|
|
|
if (!discovery_entry.resolution_note.empty()) { |
|
|
|
|
resolution_notes.push_back(discovery_entry.resolution_note); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return absl::StrJoin(resolution_notes, "; "); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::Config> |
|
|
|
|
XdsClusterResolverLb::CreateChildPolicyConfigLocked() { |
|
|
|
|
Json::Object priority_children; |
|
|
|
@ -948,6 +1004,7 @@ void XdsClusterResolverLb::UpdateChildPolicyLocked() { |
|
|
|
|
update_args.config = CreateChildPolicyConfigLocked(); |
|
|
|
|
if (update_args.config == nullptr) return; |
|
|
|
|
update_args.addresses = CreateChildPolicyAddressesLocked(); |
|
|
|
|
update_args.resolution_note = CreateChildPolicyResolutionNoteLocked(); |
|
|
|
|
update_args.args = CreateChildPolicyArgsLocked(args_); |
|
|
|
|
if (child_policy_ == nullptr) { |
|
|
|
|
child_policy_ = CreateChildPolicyLocked(update_args.args); |
|
|
|
|