XdsClient: remove resource-type-specific methods from XdsClient API (#28231)

* WIP

* introduce XdsResourceType API and change Listener parsing to use it

* converted RouteConfig parsing

* convert cluster and endpoint parsing

* cleanup

* clang-format

* attempt to work around compiler problems

* move XdsResourceType to its own file, and move endpoint code out of XdsApi

* move cluster parsing to its own file

* move route config parsing to its own file

* move listener parsing to its own file

* clang-format

* minor cleanup

* plumbed XdsResourceType throughout XdsClient

* a bit of cleanup

* more cleanup

* construct full resource names before calling XdsApi::CreateAdsRequest()

* remove some unneeded code

* clean up includes and have XdsResourceType initialize the upb symtab

* more cleanup of unnecessary code

* more cleanup

* update comment

* clang-format

* add missing virtual dtor

* fix build

* remove resource-type-specific methods from XdsClient API

* have each resource type register itself upon instantiation

* remove comment

* add missing virtual dtor

* clang-format
pull/28301/head
Mark D. Roth 3 years ago committed by GitHub
parent 3f1d4de5d5
commit 747a5d8c24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  2. 1
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  3. 11
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  4. 29
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  5. 92
      src/core/ext/xds/xds_client.cc
  6. 119
      src/core/ext/xds/xds_client.h
  7. 38
      src/core/ext/xds/xds_cluster.h
  8. 38
      src/core/ext/xds/xds_endpoint.h
  9. 38
      src/core/ext/xds/xds_listener.h
  10. 10
      src/core/ext/xds/xds_resource_type.cc
  11. 7
      src/core/ext/xds/xds_resource_type.h
  12. 39
      src/core/ext/xds/xds_route_config.h
  13. 28
      src/core/ext/xds/xds_server_config_fetcher.cc
  14. 1
      test/cpp/end2end/xds/xds_end2end_test.cc

@ -25,6 +25,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/xds/xds_certificate_provider.h" #include "src/core/ext/xds/xds_certificate_provider.h"
#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
@ -66,7 +67,7 @@ class CdsLb : public LoadBalancingPolicy {
private: private:
// Watcher for getting cluster data from XdsClient. // Watcher for getting cluster data from XdsClient.
class ClusterWatcher : public XdsClient::ClusterWatcherInterface { class ClusterWatcher : public XdsClusterResourceType::WatcherInterface {
public: public:
ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name) ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name)
: parent_(std::move(parent)), name_(std::move(name)) {} : parent_(std::move(parent)), name_(std::move(name)) {}
@ -147,7 +148,7 @@ class CdsLb : public LoadBalancingPolicy {
const std::string& cluster_name, const XdsClusterResource& cluster_data); const std::string& cluster_name, const XdsClusterResource& cluster_data);
void CancelClusterDataWatch(absl::string_view cluster_name, void CancelClusterDataWatch(absl::string_view cluster_name,
XdsClient::ClusterWatcherInterface* watcher, ClusterWatcher* watcher,
bool delay_unsubscription = false); bool delay_unsubscription = false);
void MaybeDestroyChildPolicyLocked(); void MaybeDestroyChildPolicyLocked();
@ -301,7 +302,8 @@ void CdsLb::UpdateLocked(UpdateArgs args) {
} }
auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), config_->cluster()); auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), config_->cluster());
watchers_[config_->cluster()].watcher = watcher.get(); watchers_[config_->cluster()].watcher = watcher.get();
xds_client_->WatchClusterData(config_->cluster(), std::move(watcher)); XdsClusterResourceType::StartWatch(xds_client_.get(), config_->cluster(),
std::move(watcher));
} }
} }
@ -326,7 +328,8 @@ bool CdsLb::GenerateDiscoveryMechanismForCluster(
name.c_str()); name.c_str());
} }
state.watcher = watcher.get(); state.watcher = watcher.get();
xds_client_->WatchClusterData(name, std::move(watcher)); XdsClusterResourceType::StartWatch(xds_client_.get(), name,
std::move(watcher));
return false; return false;
} }
// Don't have the update we need yet. // Don't have the update we need yet.
@ -610,7 +613,7 @@ grpc_error_handle CdsLb::UpdateXdsCertificateProvider(
} }
void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name, void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name,
XdsClient::ClusterWatcherInterface* watcher, ClusterWatcher* watcher,
bool delay_unsubscription) { bool delay_unsubscription) {
if (xds_certificate_provider_ != nullptr) { if (xds_certificate_provider_ != nullptr) {
std::string name(cluster_name); std::string name(cluster_name);
@ -620,7 +623,7 @@ void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name,
nullptr); nullptr);
xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {}); xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {});
} }
xds_client_->CancelClusterDataWatch(cluster_name, watcher, XdsClusterResourceType::CancelWatch(xds_client_.get(), cluster_name, watcher,
delay_unsubscription); delay_unsubscription);
} }
// //

@ -30,6 +30,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"

@ -39,6 +39,7 @@
#include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
@ -165,7 +166,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
bool disable_reresolution() override { return true; } bool disable_reresolution() override { return true; }
private: private:
class EndpointWatcher : public XdsClient::EndpointWatcherInterface { class EndpointWatcher : public XdsEndpointResourceType::WatcherInterface {
public: public:
explicit EndpointWatcher( explicit EndpointWatcher(
RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism) RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism)
@ -414,8 +415,8 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::Start() {
auto watcher = MakeRefCounted<EndpointWatcher>( auto watcher = MakeRefCounted<EndpointWatcher>(
Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism")); Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"));
watcher_ = watcher.get(); watcher_ = watcher.get();
parent()->xds_client_->WatchEndpointData(GetEdsResourceName(), XdsEndpointResourceType::StartWatch(parent()->xds_client_.get(),
std::move(watcher)); GetEdsResourceName(), std::move(watcher));
} }
void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() { void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() {
@ -425,8 +426,8 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() {
":%p cancelling xds watch for %s", ":%p cancelling xds watch for %s",
parent(), index(), this, std::string(GetEdsResourceName()).c_str()); parent(), index(), this, std::string(GetEdsResourceName()).c_str());
} }
parent()->xds_client_->CancelEndpointDataWatch(GetEdsResourceName(), XdsEndpointResourceType::CancelWatch(parent()->xds_client_.get(),
watcher_); GetEdsResourceName(), watcher_);
Unref(); Unref();
} }

@ -31,6 +31,8 @@
#include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_http_filters.h" #include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/ext/xds/xds_route_config.h"
#include "src/core/ext/xds/xds_routing.h" #include "src/core/ext/xds/xds_routing.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
@ -80,7 +82,7 @@ class XdsResolver : public Resolver {
} }
private: private:
class ListenerWatcher : public XdsClient::ListenerWatcherInterface { class ListenerWatcher : public XdsListenerResourceType::WatcherInterface {
public: public:
explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver) explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {} : resolver_(std::move(resolver)) {}
@ -118,7 +120,8 @@ class XdsResolver : public Resolver {
RefCountedPtr<XdsResolver> resolver_; RefCountedPtr<XdsResolver> resolver_;
}; };
class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface { class RouteConfigWatcher
: public XdsRouteConfigResourceType::WatcherInterface {
public: public:
explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver) explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {} : resolver_(std::move(resolver)) {}
@ -291,14 +294,14 @@ class XdsResolver : public Resolver {
RefCountedPtr<XdsClient> xds_client_; RefCountedPtr<XdsClient> xds_client_;
XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr; ListenerWatcher* listener_watcher_ = nullptr;
// This will not contain the RouteConfiguration, even if it comes with the // This will not contain the RouteConfiguration, even if it comes with the
// LDS response; instead, the relevant VirtualHost from the // LDS response; instead, the relevant VirtualHost from the
// RouteConfiguration will be saved in current_virtual_host_. // RouteConfiguration will be saved in current_virtual_host_.
XdsListenerResource current_listener_; XdsListenerResource current_listener_;
std::string route_config_name_; std::string route_config_name_;
XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr; RouteConfigWatcher* route_config_watcher_ = nullptr;
XdsRouteConfigResource::VirtualHost current_virtual_host_; XdsRouteConfigResource::VirtualHost current_virtual_host_;
ClusterState::ClusterStateMap cluster_state_map_; ClusterState::ClusterStateMap cluster_state_map_;
@ -694,7 +697,8 @@ void XdsResolver::StartLocked() {
interested_parties_); interested_parties_);
auto watcher = MakeRefCounted<ListenerWatcher>(Ref()); auto watcher = MakeRefCounted<ListenerWatcher>(Ref());
listener_watcher_ = watcher.get(); listener_watcher_ = watcher.get();
xds_client_->WatchListenerData(server_name_, std::move(watcher)); XdsListenerResourceType::StartWatch(xds_client_.get(), server_name_,
std::move(watcher));
} }
void XdsResolver::ShutdownLocked() { void XdsResolver::ShutdownLocked() {
@ -703,12 +707,14 @@ void XdsResolver::ShutdownLocked() {
} }
if (xds_client_ != nullptr) { if (xds_client_ != nullptr) {
if (listener_watcher_ != nullptr) { if (listener_watcher_ != nullptr) {
xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_, XdsListenerResourceType::CancelWatch(xds_client_.get(), server_name_,
listener_watcher_,
/*delay_unsubscription=*/false); /*delay_unsubscription=*/false);
} }
if (route_config_watcher_ != nullptr) { if (route_config_watcher_ != nullptr) {
xds_client_->CancelRouteConfigDataWatch( XdsRouteConfigResourceType::CancelWatch(
server_name_, route_config_watcher_, /*delay_unsubscription=*/false); xds_client_.get(), route_config_name_, route_config_watcher_,
/*delay_unsubscription=*/false);
} }
grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
interested_parties_); interested_parties_);
@ -726,8 +732,8 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
if (listener.http_connection_manager.route_config_name != if (listener.http_connection_manager.route_config_name !=
route_config_name_) { route_config_name_) {
if (route_config_watcher_ != nullptr) { if (route_config_watcher_ != nullptr) {
xds_client_->CancelRouteConfigDataWatch( XdsRouteConfigResourceType::CancelWatch(
route_config_name_, route_config_watcher_, xds_client_.get(), route_config_name_, route_config_watcher_,
/*delay_unsubscription=*/ /*delay_unsubscription=*/
!listener.http_connection_manager.route_config_name.empty()); !listener.http_connection_manager.route_config_name.empty());
route_config_watcher_ = nullptr; route_config_watcher_ = nullptr;
@ -738,7 +744,8 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
current_virtual_host_.routes.clear(); current_virtual_host_.routes.clear();
auto watcher = MakeRefCounted<RouteConfigWatcher>(Ref()); auto watcher = MakeRefCounted<RouteConfigWatcher>(Ref());
route_config_watcher_ = watcher.get(); route_config_watcher_ = watcher.get();
xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher)); XdsRouteConfigResourceType::StartWatch(
xds_client_.get(), route_config_name_, std::move(watcher));
} }
} }
current_listener_ = std::move(listener); current_listener_ = std::move(listener);

@ -37,7 +37,10 @@
#include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/ext/xds/xds_http_filters.h" #include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
@ -76,11 +79,6 @@ const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr; XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr;
char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
const char* kLdsTypeUrl = "envoy.config.listener.v3.Listener";
const char* kRdsTypeUrl = "envoy.config.route.v3.RouteConfiguration";
const char* kCdsTypeUrl = "envoy.config.cluster.v3.Cluster";
const char* kEdsTypeUrl = "envoy.config.endpoint.v3.ClusterLoadAssignment";
} // namespace } // namespace
class XdsClient::Notifier { class XdsClient::Notifier {
@ -1924,66 +1922,6 @@ void XdsClient::CancelResourceWatch(const XdsResourceType* type,
} }
} }
void XdsClient::WatchListenerData(
absl::string_view listener_name,
RefCountedPtr<ListenerWatcherInterface> watcher) {
WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kLdsTypeUrl),
listener_name, std::move(watcher));
}
void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
ListenerWatcherInterface* watcher,
bool delay_unsubscription) {
CancelResourceWatch(
XdsResourceTypeRegistry::GetOrCreate()->GetType(kLdsTypeUrl),
listener_name, watcher, delay_unsubscription);
}
void XdsClient::WatchRouteConfigData(
absl::string_view route_config_name,
RefCountedPtr<RouteConfigWatcherInterface> watcher) {
WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kRdsTypeUrl),
route_config_name, std::move(watcher));
}
void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
RouteConfigWatcherInterface* watcher,
bool delay_unsubscription) {
CancelResourceWatch(
XdsResourceTypeRegistry::GetOrCreate()->GetType(kRdsTypeUrl),
route_config_name, watcher, delay_unsubscription);
}
void XdsClient::WatchClusterData(
absl::string_view cluster_name,
RefCountedPtr<ClusterWatcherInterface> watcher) {
WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl),
cluster_name, std::move(watcher));
}
void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
ClusterWatcherInterface* watcher,
bool delay_unsubscription) {
CancelResourceWatch(
XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl),
cluster_name, watcher, delay_unsubscription);
}
void XdsClient::WatchEndpointData(
absl::string_view eds_service_name,
RefCountedPtr<EndpointWatcherInterface> watcher) {
WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kEdsTypeUrl),
eds_service_name, std::move(watcher));
}
void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
EndpointWatcherInterface* watcher,
bool delay_unsubscription) {
CancelResourceWatch(
XdsResourceTypeRegistry::GetOrCreate()->GetType(kEdsTypeUrl),
eds_service_name, watcher, delay_unsubscription);
}
absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName( absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName(
absl::string_view name, const XdsResourceType* type) { absl::string_view name, const XdsResourceType* type) {
// Old-style names use the empty string for authority. // Old-style names use the empty string for authority.
@ -2050,9 +1988,8 @@ RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
it->first.second /*eds_service_name*/); it->first.second /*eds_service_name*/);
load_report_state.drop_stats = cluster_drop_stats.get(); load_report_state.drop_stats = cluster_drop_stats.get();
} }
auto resource_name = ParseXdsResourceName( auto resource_name =
cluster_name, ParseXdsResourceName(cluster_name, XdsClusterResourceType::Get());
XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl));
GPR_ASSERT(resource_name.ok()); GPR_ASSERT(resource_name.ok());
auto a = authority_state_map_.find(resource_name->authority); auto a = authority_state_map_.find(resource_name->authority);
if (a != authority_state_map_.end()) { if (a != authority_state_map_.end()) {
@ -2114,9 +2051,8 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
std::move(locality)); std::move(locality));
locality_state.locality_stats = cluster_locality_stats.get(); locality_state.locality_stats = cluster_locality_stats.get();
} }
auto resource_name = ParseXdsResourceName( auto resource_name =
cluster_name, ParseXdsResourceName(cluster_name, XdsClusterResourceType::Get());
XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl));
GPR_ASSERT(resource_name.ok()); GPR_ASSERT(resource_name.ok());
auto a = authority_state_map_.find(resource_name->authority); auto a = authority_state_map_.find(resource_name->authority);
if (a != authority_state_map_.end()) { if (a != authority_state_map_.end()) {
@ -2285,23 +2221,9 @@ std::string XdsClient::DumpClientConfigBinary() {
// accessors for global state // accessors for global state
// //
namespace {
void InitResourceTypeRegistry() {
auto* registry = XdsResourceTypeRegistry::GetOrCreate();
registry->RegisterType(absl::make_unique<XdsListenerResourceType>());
registry->RegisterType(absl::make_unique<XdsRouteConfigResourceType>());
registry->RegisterType(absl::make_unique<XdsClusterResourceType>());
registry->RegisterType(absl::make_unique<XdsEndpointResourceType>());
}
} // namespace
void XdsClientGlobalInit() { void XdsClientGlobalInit() {
g_mu = new Mutex; g_mu = new Mutex;
XdsHttpFilterRegistry::Init(); XdsHttpFilterRegistry::Init();
static gpr_once once = GPR_ONCE_INIT;
gpr_once_init(&once, InitResourceTypeRegistry);
} }
// TODO(roth): Find a better way to clear the fallback config that does // TODO(roth): Find a better way to clear the fallback config that does

@ -28,10 +28,7 @@
#include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_cluster.h" #include "src/core/ext/xds/xds_resource_type.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/ext/xds/xds_route_config.h"
#include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
@ -39,6 +36,7 @@
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/work_serializer.h"
namespace grpc_core { namespace grpc_core {
@ -59,67 +57,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
}; };
// TODO(roth): Consider removing these resource-type-specific APIs in
// favor of some mechanism for automatic type-deduction for the generic
// API.
// Listener data watcher interface. Implemented by callers.
class ListenerWatcherInterface : public ResourceWatcherInterface {
public:
virtual void OnListenerChanged(XdsListenerResource listener) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnListenerChanged(
static_cast<const XdsListenerResourceType::ListenerData*>(resource)
->resource);
}
};
// RouteConfiguration data watcher interface. Implemented by callers.
class RouteConfigWatcherInterface : public ResourceWatcherInterface {
public:
virtual void OnRouteConfigChanged(XdsRouteConfigResource route_config) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnRouteConfigChanged(
static_cast<const XdsRouteConfigResourceType::RouteConfigData*>(
resource)
->resource);
}
};
// Cluster data watcher interface. Implemented by callers.
class ClusterWatcherInterface : public ResourceWatcherInterface {
public:
virtual void OnClusterChanged(XdsClusterResource cluster_data) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnClusterChanged(
static_cast<const XdsClusterResourceType::ClusterData*>(resource)
->resource);
}
};
// Endpoint data watcher interface. Implemented by callers.
class EndpointWatcherInterface : public ResourceWatcherInterface {
public:
virtual void OnEndpointChanged(XdsEndpointResource update) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnEndpointChanged(
static_cast<const XdsEndpointResourceType::EndpointData*>(resource)
->resource);
}
};
// Factory function to get or create the global XdsClient instance. // Factory function to get or create the global XdsClient instance.
// If *error is not GRPC_ERROR_NONE upon return, then there was // If *error is not GRPC_ERROR_NONE upon return, then there was
// an error initializing the client. // an error initializing the client.
@ -159,58 +96,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
ResourceWatcherInterface* watcher, ResourceWatcherInterface* watcher,
bool delay_unsubscription = false); bool delay_unsubscription = false);
// Start and cancel listener data watch for a listener.
// The XdsClient takes ownership of the watcher, but the caller may
// keep a raw pointer to the watcher, which may be used only for
// cancellation. (Because the caller does not own the watcher, the
// pointer must not be used for any other purpose.)
// If the caller is going to start a new watch after cancelling the
// old one, it should set delay_unsubscription to true.
void WatchListenerData(absl::string_view listener_name,
RefCountedPtr<ListenerWatcherInterface> watcher);
void CancelListenerDataWatch(absl::string_view listener_name,
ListenerWatcherInterface* watcher,
bool delay_unsubscription = false);
// Start and cancel route config data watch for a listener.
// The XdsClient takes ownership of the watcher, but the caller may
// keep a raw pointer to the watcher, which may be used only for
// cancellation. (Because the caller does not own the watcher, the
// pointer must not be used for any other purpose.)
// If the caller is going to start a new watch after cancelling the
// old one, it should set delay_unsubscription to true.
void WatchRouteConfigData(absl::string_view route_config_name,
RefCountedPtr<RouteConfigWatcherInterface> watcher);
void CancelRouteConfigDataWatch(absl::string_view route_config_name,
RouteConfigWatcherInterface* watcher,
bool delay_unsubscription = false);
// Start and cancel cluster data watch for a cluster.
// The XdsClient takes ownership of the watcher, but the caller may
// keep a raw pointer to the watcher, which may be used only for
// cancellation. (Because the caller does not own the watcher, the
// pointer must not be used for any other purpose.)
// If the caller is going to start a new watch after cancelling the
// old one, it should set delay_unsubscription to true.
void WatchClusterData(absl::string_view cluster_name,
RefCountedPtr<ClusterWatcherInterface> watcher);
void CancelClusterDataWatch(absl::string_view cluster_name,
ClusterWatcherInterface* watcher,
bool delay_unsubscription = false);
// Start and cancel endpoint data watch for a cluster.
// The XdsClient takes ownership of the watcher, but the caller may
// keep a raw pointer to the watcher, which may be used only for
// cancellation. (Because the caller does not own the watcher, the
// pointer must not be used for any other purpose.)
// If the caller is going to start a new watch after cancelling the
// old one, it should set delay_unsubscription to true.
void WatchEndpointData(absl::string_view eds_service_name,
RefCountedPtr<EndpointWatcherInterface> watcher);
void CancelEndpointDataWatch(absl::string_view eds_service_name,
EndpointWatcherInterface* watcher,
bool delay_unsubscription = false);
// Adds and removes drop stats for cluster_name and eds_service_name. // Adds and removes drop stats for cluster_name and eds_service_name.
RefCountedPtr<XdsClusterDropStats> AddClusterDropStats( RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
absl::string_view lrs_server, absl::string_view cluster_name, absl::string_view lrs_server, absl::string_view cluster_name,

@ -27,6 +27,7 @@
#include "envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h" #include "envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h"
#include "envoy/extensions/transport_sockets/tls/v3/tls.upbdefs.h" #include "envoy/extensions/transport_sockets/tls/v3/tls.upbdefs.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_common_types.h"
#include "src/core/ext/xds/xds_resource_type.h" #include "src/core/ext/xds/xds_resource_type.h"
@ -87,6 +88,25 @@ class XdsClusterResourceType : public XdsResourceType {
XdsClusterResource resource; XdsClusterResource resource;
}; };
class WatcherInterface : public XdsClient::ResourceWatcherInterface {
public:
virtual void OnClusterChanged(XdsClusterResource cluster_data) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnClusterChanged(
static_cast<const XdsClusterResourceType::ClusterData*>(resource)
->resource);
}
};
static const XdsClusterResourceType* Get() {
static const XdsClusterResourceType* g_instance =
new XdsClusterResourceType();
return g_instance;
}
absl::string_view type_url() const override { absl::string_view type_url() const override {
return "envoy.config.cluster.v3.Cluster"; return "envoy.config.cluster.v3.Cluster";
} }
@ -94,6 +114,19 @@ class XdsClusterResourceType : public XdsResourceType {
return "envoy.api.v2.Cluster"; return "envoy.api.v2.Cluster";
} }
static void StartWatch(XdsClient* xds_client, absl::string_view resource_name,
RefCountedPtr<WatcherInterface> watcher) {
xds_client->WatchResource(Get(), resource_name, std::move(watcher));
}
static void CancelWatch(XdsClient* xds_client,
absl::string_view resource_name,
WatcherInterface* watcher,
bool delay_unsubscription = false) {
xds_client->CancelResourceWatch(Get(), resource_name, watcher,
delay_unsubscription);
}
absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context, absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context,
absl::string_view serialized_resource, absl::string_view serialized_resource,
bool is_v2) const override; bool is_v2) const override;
@ -120,6 +153,11 @@ class XdsClusterResourceType : public XdsResourceType {
envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_getmsgdef( envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_getmsgdef(
symtab); symtab);
} }
private:
XdsClusterResourceType() {
XdsResourceTypeRegistry::GetOrCreate()->RegisterType(this);
}
}; };
} // namespace grpc_core } // namespace grpc_core

@ -27,6 +27,7 @@
#include "envoy/config/endpoint/v3/endpoint.upbdefs.h" #include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_resource_type.h" #include "src/core/ext/xds/xds_resource_type.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -116,6 +117,25 @@ class XdsEndpointResourceType : public XdsResourceType {
XdsEndpointResource resource; XdsEndpointResource resource;
}; };
class WatcherInterface : public XdsClient::ResourceWatcherInterface {
public:
virtual void OnEndpointChanged(XdsEndpointResource update) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnEndpointChanged(
static_cast<const XdsEndpointResourceType::EndpointData*>(resource)
->resource);
}
};
static const XdsEndpointResourceType* Get() {
static const XdsEndpointResourceType* g_instance =
new XdsEndpointResourceType();
return g_instance;
}
absl::string_view type_url() const override { absl::string_view type_url() const override {
return "envoy.config.endpoint.v3.ClusterLoadAssignment"; return "envoy.config.endpoint.v3.ClusterLoadAssignment";
} }
@ -123,6 +143,19 @@ class XdsEndpointResourceType : public XdsResourceType {
return "envoy.api.v2.ClusterLoadAssignment"; return "envoy.api.v2.ClusterLoadAssignment";
} }
static void StartWatch(XdsClient* xds_client, absl::string_view resource_name,
RefCountedPtr<WatcherInterface> watcher) {
xds_client->WatchResource(Get(), resource_name, std::move(watcher));
}
static void CancelWatch(XdsClient* xds_client,
absl::string_view resource_name,
WatcherInterface* watcher,
bool delay_unsubscription = false) {
xds_client->CancelResourceWatch(Get(), resource_name, watcher,
delay_unsubscription);
}
absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context, absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context,
absl::string_view serialized_resource, absl::string_view serialized_resource,
bool is_v2) const override; bool is_v2) const override;
@ -144,6 +177,11 @@ class XdsEndpointResourceType : public XdsResourceType {
void InitUpbSymtab(upb_symtab* symtab) const override { void InitUpbSymtab(upb_symtab* symtab) const override {
envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(symtab); envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(symtab);
} }
private:
XdsEndpointResourceType() {
XdsResourceTypeRegistry::GetOrCreate()->RegisterType(this);
}
}; };
} // namespace grpc_core } // namespace grpc_core

@ -30,6 +30,7 @@
#include "envoy/config/listener/v3/listener.upbdefs.h" #include "envoy/config/listener/v3/listener.upbdefs.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_common_types.h"
#include "src/core/ext/xds/xds_http_filters.h" #include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/ext/xds/xds_route_config.h" #include "src/core/ext/xds/xds_route_config.h"
@ -195,6 +196,25 @@ class XdsListenerResourceType : public XdsResourceType {
XdsListenerResource resource; XdsListenerResource resource;
}; };
class WatcherInterface : public XdsClient::ResourceWatcherInterface {
public:
virtual void OnListenerChanged(XdsListenerResource listener) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnListenerChanged(
static_cast<const XdsListenerResourceType::ListenerData*>(resource)
->resource);
}
};
static const XdsListenerResourceType* Get() {
static const XdsListenerResourceType* g_instance =
new XdsListenerResourceType();
return g_instance;
}
absl::string_view type_url() const override { absl::string_view type_url() const override {
return "envoy.config.listener.v3.Listener"; return "envoy.config.listener.v3.Listener";
} }
@ -202,6 +222,19 @@ class XdsListenerResourceType : public XdsResourceType {
return "envoy.api.v2.Listener"; return "envoy.api.v2.Listener";
} }
static void StartWatch(XdsClient* xds_client, absl::string_view resource_name,
RefCountedPtr<WatcherInterface> watcher) {
xds_client->WatchResource(Get(), resource_name, std::move(watcher));
}
static void CancelWatch(XdsClient* xds_client,
absl::string_view resource_name,
WatcherInterface* watcher,
bool delay_unsubscription = false) {
xds_client->CancelResourceWatch(Get(), resource_name, watcher,
delay_unsubscription);
}
absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context, absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context,
absl::string_view serialized_resource, absl::string_view serialized_resource,
bool is_v2) const override; bool is_v2) const override;
@ -228,6 +261,11 @@ class XdsListenerResourceType : public XdsResourceType {
symtab); symtab);
XdsHttpFilterRegistry::PopulateSymtab(symtab); XdsHttpFilterRegistry::PopulateSymtab(symtab);
} }
private:
XdsListenerResourceType() {
XdsResourceTypeRegistry::GetOrCreate()->RegisterType(this);
}
}; };
} // namespace grpc_core } // namespace grpc_core

@ -45,26 +45,26 @@ XdsResourceTypeRegistry* XdsResourceTypeRegistry::GetOrCreate() {
const XdsResourceType* XdsResourceTypeRegistry::GetType( const XdsResourceType* XdsResourceTypeRegistry::GetType(
absl::string_view resource_type) { absl::string_view resource_type) {
auto it = resource_types_.find(resource_type); auto it = resource_types_.find(resource_type);
if (it != resource_types_.end()) return it->second.get(); if (it != resource_types_.end()) return it->second;
auto it2 = v2_resource_types_.find(resource_type); auto it2 = v2_resource_types_.find(resource_type);
if (it2 != v2_resource_types_.end()) return it2->second; if (it2 != v2_resource_types_.end()) return it2->second;
return nullptr; return nullptr;
} }
void XdsResourceTypeRegistry::RegisterType( void XdsResourceTypeRegistry::RegisterType(
std::unique_ptr<XdsResourceType> resource_type) { const XdsResourceType* resource_type) {
GPR_ASSERT(resource_types_.find(resource_type->type_url()) == GPR_ASSERT(resource_types_.find(resource_type->type_url()) ==
resource_types_.end()); resource_types_.end());
GPR_ASSERT(v2_resource_types_.find(resource_type->v2_type_url()) == GPR_ASSERT(v2_resource_types_.find(resource_type->v2_type_url()) ==
v2_resource_types_.end()); v2_resource_types_.end());
v2_resource_types_.emplace(resource_type->v2_type_url(), resource_type.get()); v2_resource_types_.emplace(resource_type->v2_type_url(), resource_type);
resource_types_.emplace(resource_type->type_url(), std::move(resource_type)); resource_types_.emplace(resource_type->type_url(), resource_type);
} }
void XdsResourceTypeRegistry::ForEach( void XdsResourceTypeRegistry::ForEach(
std::function<void(const XdsResourceType*)> func) { std::function<void(const XdsResourceType*)> func) {
for (const auto& p : resource_types_) { for (const auto& p : resource_types_) {
func(p.second.get()); func(p.second);
} }
} }

@ -105,16 +105,15 @@ class XdsResourceTypeRegistry {
// Registers a resource type. // Registers a resource type.
// All types must be registered before they can be used in the XdsClient. // All types must be registered before they can be used in the XdsClient.
void RegisterType(std::unique_ptr<XdsResourceType> resource_type); void RegisterType(const XdsResourceType* resource_type);
// Calls func for each resource type. // Calls func for each resource type.
void ForEach(std::function<void(const XdsResourceType*)> func); void ForEach(std::function<void(const XdsResourceType*)> func);
private: private:
std::map<absl::string_view /*resource_type*/, std::map<absl::string_view /*resource_type*/, const XdsResourceType*>
std::unique_ptr<XdsResourceType>>
resource_types_; resource_types_;
std::map<absl::string_view /*v2_resource_type*/, XdsResourceType*> std::map<absl::string_view /*v2_resource_type*/, const XdsResourceType*>
v2_resource_types_; v2_resource_types_;
}; };

@ -29,6 +29,7 @@
#include "envoy/config/route/v3/route.upbdefs.h" #include "envoy/config/route/v3/route.upbdefs.h"
#include "re2/re2.h" #include "re2/re2.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_common_types.h"
#include "src/core/ext/xds/xds_http_filters.h" #include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/ext/xds/xds_resource_type.h" #include "src/core/ext/xds/xds_resource_type.h"
@ -195,6 +196,26 @@ class XdsRouteConfigResourceType : public XdsResourceType {
XdsRouteConfigResource resource; XdsRouteConfigResource resource;
}; };
class WatcherInterface : public XdsClient::ResourceWatcherInterface {
public:
virtual void OnRouteConfigChanged(XdsRouteConfigResource route_config) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnRouteConfigChanged(
static_cast<const XdsRouteConfigResourceType::RouteConfigData*>(
resource)
->resource);
}
};
static const XdsRouteConfigResourceType* Get() {
static const XdsRouteConfigResourceType* g_instance =
new XdsRouteConfigResourceType();
return g_instance;
}
absl::string_view type_url() const override { absl::string_view type_url() const override {
return "envoy.config.route.v3.RouteConfiguration"; return "envoy.config.route.v3.RouteConfiguration";
} }
@ -202,6 +223,19 @@ class XdsRouteConfigResourceType : public XdsResourceType {
return "envoy.api.v2.RouteConfiguration"; return "envoy.api.v2.RouteConfiguration";
} }
static void StartWatch(XdsClient* xds_client, absl::string_view resource_name,
RefCountedPtr<WatcherInterface> watcher) {
xds_client->WatchResource(Get(), resource_name, std::move(watcher));
}
static void CancelWatch(XdsClient* xds_client,
absl::string_view resource_name,
WatcherInterface* watcher,
bool delay_unsubscription = false) {
xds_client->CancelResourceWatch(Get(), resource_name, watcher,
delay_unsubscription);
}
absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context, absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context,
absl::string_view serialized_resource, absl::string_view serialized_resource,
bool /*is_v2*/) const override; bool /*is_v2*/) const override;
@ -223,6 +257,11 @@ class XdsRouteConfigResourceType : public XdsResourceType {
void InitUpbSymtab(upb_symtab* symtab) const override { void InitUpbSymtab(upb_symtab* symtab) const override {
envoy_config_route_v3_RouteConfiguration_getmsgdef(symtab); envoy_config_route_v3_RouteConfiguration_getmsgdef(symtab);
} }
private:
XdsRouteConfigResourceType() {
XdsResourceTypeRegistry::GetOrCreate()->RegisterType(this);
}
}; };
} // namespace grpc_core } // namespace grpc_core

@ -26,6 +26,8 @@
#include "src/core/ext/xds/xds_certificate_provider.h" #include "src/core/ext/xds/xds_certificate_provider.h"
#include "src/core/ext/xds/xds_channel_stack_modifier.h" #include "src/core/ext/xds/xds_channel_stack_modifier.h"
#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/ext/xds/xds_route_config.h"
#include "src/core/ext/xds/xds_routing.h" #include "src/core/ext/xds/xds_routing.h"
#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
@ -86,7 +88,7 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
// update received was a fatal error (resource does not exist), the server // update received was a fatal error (resource does not exist), the server
// listener is made to stop listening. // listener is made to stop listening.
class XdsServerConfigFetcher::ListenerWatcher class XdsServerConfigFetcher::ListenerWatcher
: public XdsClient::ListenerWatcherInterface { : public XdsListenerResourceType::WatcherInterface {
public: public:
ListenerWatcher(RefCountedPtr<XdsClient> xds_client, ListenerWatcher(RefCountedPtr<XdsClient> xds_client,
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
@ -217,7 +219,7 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager
// with the latest updates and new connections do not need to wait for the RDS // with the latest updates and new connections do not need to wait for the RDS
// resources to be fetched. // resources to be fetched.
class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface { RouteConfigWatcher : public XdsRouteConfigResourceType::WatcherInterface {
public: public:
RouteConfigWatcher( RouteConfigWatcher(
std::string resource_name, std::string resource_name,
@ -381,7 +383,7 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
// DynamicXdsServerConfigSelectorProvider // DynamicXdsServerConfigSelectorProvider
class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
DynamicXdsServerConfigSelectorProvider::RouteConfigWatcher DynamicXdsServerConfigSelectorProvider::RouteConfigWatcher
: public XdsClient::RouteConfigWatcherInterface { : public XdsRouteConfigResourceType::WatcherInterface {
public: public:
explicit RouteConfigWatcher( explicit RouteConfigWatcher(
RefCountedPtr<DynamicXdsServerConfigSelectorProvider> parent) RefCountedPtr<DynamicXdsServerConfigSelectorProvider> parent)
@ -418,7 +420,8 @@ void XdsServerConfigFetcher::StartWatch(
xds_client_, std::move(watcher), serving_status_notifier_, xds_client_, std::move(watcher), serving_status_notifier_,
listening_address); listening_address);
auto* listener_watcher_ptr = listener_watcher.get(); auto* listener_watcher_ptr = listener_watcher.get();
xds_client_->WatchListenerData( XdsListenerResourceType::StartWatch(
xds_client_.get(),
absl::StrReplaceAll( absl::StrReplaceAll(
xds_client_->bootstrap().server_listener_resource_name_template(), xds_client_->bootstrap().server_listener_resource_name_template(),
{{"%s", listening_address}}), {{"%s", listening_address}}),
@ -433,7 +436,8 @@ void XdsServerConfigFetcher::CancelWatch(
auto it = listener_watchers_.find(watcher); auto it = listener_watchers_.find(watcher);
if (it != listener_watchers_.end()) { if (it != listener_watchers_.end()) {
// Cancel the watch on the listener before erasing // Cancel the watch on the listener before erasing
xds_client_->CancelListenerDataWatch( XdsListenerResourceType::CancelWatch(
xds_client_.get(),
absl::StrReplaceAll( absl::StrReplaceAll(
xds_client_->bootstrap().server_listener_resource_name_template(), xds_client_->bootstrap().server_listener_resource_name_template(),
{{"%s", it->second->listening_address()}}), {{"%s", it->second->listening_address()}}),
@ -618,8 +622,8 @@ void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
MakeRefCounted<RouteConfigWatcher>(resource_name, WeakRef()); MakeRefCounted<RouteConfigWatcher>(resource_name, WeakRef());
rds_map_.emplace(resource_name, RdsUpdateState{route_config_watcher.get(), rds_map_.emplace(resource_name, RdsUpdateState{route_config_watcher.get(),
absl::nullopt}); absl::nullopt});
xds_client_->WatchRouteConfigData(resource_name, XdsRouteConfigResourceType::StartWatch(xds_client_.get(), resource_name,
std::move(route_config_watcher)); std::move(route_config_watcher));
} }
if (rds_resources_yet_to_fetch_ != 0) { if (rds_resources_yet_to_fetch_ != 0) {
listener_watcher_ = std::move(listener_watcher); listener_watcher_ = std::move(listener_watcher);
@ -638,7 +642,8 @@ void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
MutexLock lock(&mu_); MutexLock lock(&mu_);
// Cancel the RDS watches to clear up the weak refs // Cancel the RDS watches to clear up the weak refs
for (const auto& entry : rds_map_) { for (const auto& entry : rds_map_) {
xds_client_->CancelRouteConfigDataWatch(entry.first, entry.second.watcher, XdsRouteConfigResourceType::CancelWatch(xds_client_.get(), entry.first,
entry.second.watcher,
false /* delay_unsubscription */); false /* delay_unsubscription */);
} }
// Also give up the ref on ListenerWatcher since it won't be needed anymore // Also give up the ref on ListenerWatcher since it won't be needed anymore
@ -1159,8 +1164,8 @@ XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
GPR_ASSERT(!resource_name_.empty()); GPR_ASSERT(!resource_name_.empty());
auto route_config_watcher = MakeRefCounted<RouteConfigWatcher>(Ref()); auto route_config_watcher = MakeRefCounted<RouteConfigWatcher>(Ref());
route_config_watcher_ = route_config_watcher.get(); route_config_watcher_ = route_config_watcher.get();
xds_client_->WatchRouteConfigData(resource_name_, XdsRouteConfigResourceType::StartWatch(xds_client_.get(), resource_name_,
std::move(route_config_watcher)); std::move(route_config_watcher));
} }
absl::StatusOr<RefCountedPtr<ServerConfigSelector>> absl::StatusOr<RefCountedPtr<ServerConfigSelector>>
@ -1184,7 +1189,8 @@ XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
DynamicXdsServerConfigSelectorProvider::CancelWatch() { DynamicXdsServerConfigSelectorProvider::CancelWatch() {
xds_client_->CancelRouteConfigDataWatch(resource_name_, route_config_watcher_, XdsRouteConfigResourceType::CancelWatch(xds_client_.get(), resource_name_,
route_config_watcher_,
false /* delay_unsubscription */); false /* delay_unsubscription */);
MutexLock lock(&mu_); MutexLock lock(&mu_);
watcher_.reset(); watcher_.reset();

@ -64,6 +64,7 @@
#include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/env.h"

Loading…
Cancel
Save