diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index be19d6b8c8c..0133a713ec1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -31,6 +31,7 @@ #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_client.h" @@ -129,6 +130,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { RefCountedPtr xds_cluster_resolver_lb, size_t index) : parent_(std::move(xds_cluster_resolver_lb)), index_(index) {} + virtual void Start() = 0; void Orphan() override = 0; // Caller must ensure that config_ is set before calling. @@ -166,7 +168,9 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { public: EdsDiscoveryMechanism( RefCountedPtr xds_cluster_resolver_lb, - size_t index); + size_t index) + : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {} + void Start() override; void Orphan() override; private: @@ -218,6 +222,37 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { EndpointWatcher* watcher_ = nullptr; }; + class LogicalDNSDiscoveryMechanism : public DiscoveryMechanism { + public: + LogicalDNSDiscoveryMechanism( + RefCountedPtr xds_cluster_resolver_lb, + size_t index) + : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {} + void Start() override; + void Orphan() override; + + private: + class ResolverResultHandler : public Resolver::ResultHandler { + public: + explicit ResolverResultHandler( + RefCountedPtr discovery_mechanism) + : discovery_mechanism_(std::move(discovery_mechanism)) {} + + ~ResolverResultHandler() override {} + + void ReturnResult(Resolver::Result result) override; + + void ReturnError(grpc_error* error) override; + + private: + RefCountedPtr discovery_mechanism_; + }; + // This is only necessary because of a bug in msvc where nested class cannot + // access protected member in base class. + friend class ResolverResultHandler; + OrphanablePtr resolver_; + }; + struct DiscoveryMechanismEntry { OrphanablePtr discovery_mechanism; bool first_update_received = false; @@ -341,14 +376,12 @@ void XdsClusterResolverLb::Helper::AddTraceEvent(TraceSeverity severity, // XdsClusterResolverLb::EdsDiscoveryMechanism // -XdsClusterResolverLb::EdsDiscoveryMechanism::EdsDiscoveryMechanism( - RefCountedPtr xds_cluster_resolver_lb, size_t index) - : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) { +void XdsClusterResolverLb::EdsDiscoveryMechanism::Start() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { gpr_log(GPR_INFO, - "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR + "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR ":%p starting xds watch for %s", - parent(), index, this, + parent(), index(), this, std::string(GetXdsClusterResolverResourceName()).c_str()); } auto watcher = absl::make_unique( @@ -361,7 +394,7 @@ XdsClusterResolverLb::EdsDiscoveryMechanism::EdsDiscoveryMechanism( void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { gpr_log(GPR_INFO, - "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR + "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR ":%p cancelling xds watch for %s", parent(), index(), this, std::string(GetXdsClusterResolverResourceName()).c_str()); @@ -431,6 +464,63 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: delete this; } +// +// XdsClusterResolverLb::LogicalDNSDiscoveryMechanism +// + +void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() { + resolver_ = ResolverRegistry::CreateResolver( + parent()->server_name_.c_str(), parent()->args_, + grpc_pollset_set_create(), parent()->work_serializer(), + absl::make_unique( + Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism"))); + if (resolver_ == nullptr) { + parent()->OnResourceDoesNotExist(index()); + return; + } + resolver_->StartLocked(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log(GPR_INFO, + "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism " + "%" PRIuPTR ":%p starting dns resolver %p", + parent(), index(), this, resolver_.get()); + } +} + +void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Orphan() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) { + gpr_log( + GPR_INFO, + "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism %" PRIuPTR + ":%p shutting down dns resolver %p", + parent(), index(), this, resolver_.get()); + } + resolver_.reset(); + Unref(); +} + +// +// XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler +// + +void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: + ReturnResult(Resolver::Result result) { + // convert result to eds update + XdsApi::EdsUpdate update; + XdsApi::EdsUpdate::Priority::Locality locality; + locality.name = MakeRefCounted("", "", ""); + locality.endpoints = std::move(result.addresses); + update.priorities[0].localities.emplace(locality.name.get(), + std::move(locality)); + discovery_mechanism_->parent()->OnEndpointChanged( + discovery_mechanism_->index(), std::move(update)); +} + +void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler:: + ReturnError(grpc_error* error) { + discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), error); +} + // // XdsClusterResolverLb public methods // @@ -530,16 +620,29 @@ void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) { if (child_policy_ != nullptr) UpdateChildPolicyLocked(); // Create endpoint watcher if needed. if (is_initial_update) { - for (auto config : config_->discovery_mechanisms()) { - // TODO(donnadionne): need to add new types of - // watchers. + for (const auto& config : config_->discovery_mechanisms()) { DiscoveryMechanismEntry entry; - entry.discovery_mechanism = - grpc_core::MakeOrphanable( - Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"), - discovery_mechanisms_.size()); + if (config.type == XdsClusterResolverLbConfig::DiscoveryMechanism:: + DiscoveryMechanismType::EDS) { + entry.discovery_mechanism = + grpc_core::MakeOrphanable( + Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"), + discovery_mechanisms_.size()); + } else if (config.type == XdsClusterResolverLbConfig::DiscoveryMechanism:: + DiscoveryMechanismType::LOGICAL_DNS) { + entry.discovery_mechanism = + grpc_core::MakeOrphanable( + Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism"), + discovery_mechanisms_.size()); + } else { + GPR_ASSERT(0); + } discovery_mechanisms_.push_back(std::move(entry)); } + // Call start() on all discovery mechanisms after creation. + for (const auto& discovery_mechanism : discovery_mechanisms_) { + discovery_mechanism.discovery_mechanism->Start(); + } } }