From 747a5d8c24e967e1c66237f51f8522b79f235dc8 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 7 Dec 2021 11:00:08 -0800 Subject: [PATCH] XdsClient: remove resource-type-specific methods from XdsClient API (#28231) * WIP * introduce XdsResourceType API and change Listener parsing to use it * converted RouteConfig parsing * convert cluster and endpoint parsing * cleanup * clang-format * attempt to work around compiler problems * move XdsResourceType to its own file, and move endpoint code out of XdsApi * move cluster parsing to its own file * move route config parsing to its own file * move listener parsing to its own file * clang-format * minor cleanup * plumbed XdsResourceType throughout XdsClient * a bit of cleanup * more cleanup * construct full resource names before calling XdsApi::CreateAdsRequest() * remove some unneeded code * clean up includes and have XdsResourceType initialize the upb symtab * more cleanup of unnecessary code * more cleanup * update comment * clang-format * add missing virtual dtor * fix build * remove resource-type-specific methods from XdsClient API * have each resource type register itself upon instantiation * remove comment * add missing virtual dtor * clang-format --- .../client_channel/lb_policy/xds/cds.cc | 15 ++- .../lb_policy/xds/xds_cluster_impl.cc | 1 + .../lb_policy/xds/xds_cluster_resolver.cc | 11 +- .../resolver/xds/xds_resolver.cc | 29 +++-- src/core/ext/xds/xds_client.cc | 92 ++------------ src/core/ext/xds/xds_client.h | 119 +----------------- src/core/ext/xds/xds_cluster.h | 38 ++++++ src/core/ext/xds/xds_endpoint.h | 38 ++++++ src/core/ext/xds/xds_listener.h | 38 ++++++ src/core/ext/xds/xds_resource_type.cc | 10 +- src/core/ext/xds/xds_resource_type.h | 7 +- src/core/ext/xds/xds_route_config.h | 39 ++++++ src/core/ext/xds/xds_server_config_fetcher.cc | 28 +++-- test/cpp/end2end/xds/xds_end2end_test.cc | 1 + 14 files changed, 222 insertions(+), 244 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index bb7742fe786..efe4465c5b0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -25,6 +25,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/xds/xds_certificate_provider.h" #include "src/core/ext/xds/xds_client.h" +#include "src/core/ext/xds/xds_cluster.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/orphanable.h" @@ -66,7 +67,7 @@ class CdsLb : public LoadBalancingPolicy { private: // Watcher for getting cluster data from XdsClient. - class ClusterWatcher : public XdsClient::ClusterWatcherInterface { + class ClusterWatcher : public XdsClusterResourceType::WatcherInterface { public: ClusterWatcher(RefCountedPtr parent, std::string name) : parent_(std::move(parent)), name_(std::move(name)) {} @@ -147,7 +148,7 @@ class CdsLb : public LoadBalancingPolicy { const std::string& cluster_name, const XdsClusterResource& cluster_data); void CancelClusterDataWatch(absl::string_view cluster_name, - XdsClient::ClusterWatcherInterface* watcher, + ClusterWatcher* watcher, bool delay_unsubscription = false); void MaybeDestroyChildPolicyLocked(); @@ -301,7 +302,8 @@ void CdsLb::UpdateLocked(UpdateArgs args) { } auto watcher = MakeRefCounted(Ref(), config_->cluster()); watchers_[config_->cluster()].watcher = watcher.get(); - xds_client_->WatchClusterData(config_->cluster(), std::move(watcher)); + XdsClusterResourceType::StartWatch(xds_client_.get(), config_->cluster(), + std::move(watcher)); } } @@ -326,7 +328,8 @@ bool CdsLb::GenerateDiscoveryMechanismForCluster( name.c_str()); } state.watcher = watcher.get(); - xds_client_->WatchClusterData(name, std::move(watcher)); + XdsClusterResourceType::StartWatch(xds_client_.get(), name, + std::move(watcher)); return false; } // Don't have the update we need yet. @@ -610,7 +613,7 @@ grpc_error_handle CdsLb::UpdateXdsCertificateProvider( } void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name, - XdsClient::ClusterWatcherInterface* watcher, + ClusterWatcher* watcher, bool delay_unsubscription) { if (xds_certificate_provider_ != nullptr) { std::string name(cluster_name); @@ -620,7 +623,7 @@ void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name, nullptr); xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {}); } - xds_client_->CancelClusterDataWatch(cluster_name, watcher, + XdsClusterResourceType::CancelWatch(xds_client_.get(), cluster_name, watcher, delay_unsubscription); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index 6f1f3c6fa0e..5064c05b905 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -30,6 +30,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client_stats.h" +#include "src/core/ext/xds/xds_endpoint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index 81408e96019..e7d7e769d2b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -39,6 +39,7 @@ #include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client_stats.h" +#include "src/core/ext/xds/xds_endpoint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/orphanable.h" @@ -165,7 +166,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { bool disable_reresolution() override { return true; } private: - class EndpointWatcher : public XdsClient::EndpointWatcherInterface { + class EndpointWatcher : public XdsEndpointResourceType::WatcherInterface { public: explicit EndpointWatcher( RefCountedPtr discovery_mechanism) @@ -414,8 +415,8 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::Start() { auto watcher = MakeRefCounted( Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism")); watcher_ = watcher.get(); - parent()->xds_client_->WatchEndpointData(GetEdsResourceName(), - std::move(watcher)); + XdsEndpointResourceType::StartWatch(parent()->xds_client_.get(), + GetEdsResourceName(), std::move(watcher)); } void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() { @@ -425,8 +426,8 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() { ":%p cancelling xds watch for %s", parent(), index(), this, std::string(GetEdsResourceName()).c_str()); } - parent()->xds_client_->CancelEndpointDataWatch(GetEdsResourceName(), - watcher_); + XdsEndpointResourceType::CancelWatch(parent()->xds_client_.get(), + GetEdsResourceName(), watcher_); Unref(); } diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index 42407fa08d8..892d478cd27 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -31,6 +31,8 @@ #include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_http_filters.h" +#include "src/core/ext/xds/xds_listener.h" +#include "src/core/ext/xds/xds_route_config.h" #include "src/core/ext/xds/xds_routing.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/closure.h" @@ -80,7 +82,7 @@ class XdsResolver : public Resolver { } private: - class ListenerWatcher : public XdsClient::ListenerWatcherInterface { + class ListenerWatcher : public XdsListenerResourceType::WatcherInterface { public: explicit ListenerWatcher(RefCountedPtr resolver) : resolver_(std::move(resolver)) {} @@ -118,7 +120,8 @@ class XdsResolver : public Resolver { RefCountedPtr resolver_; }; - class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface { + class RouteConfigWatcher + : public XdsRouteConfigResourceType::WatcherInterface { public: explicit RouteConfigWatcher(RefCountedPtr resolver) : resolver_(std::move(resolver)) {} @@ -291,14 +294,14 @@ class XdsResolver : public Resolver { RefCountedPtr xds_client_; - XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr; + ListenerWatcher* listener_watcher_ = nullptr; // This will not contain the RouteConfiguration, even if it comes with the // LDS response; instead, the relevant VirtualHost from the // RouteConfiguration will be saved in current_virtual_host_. XdsListenerResource current_listener_; std::string route_config_name_; - XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr; + RouteConfigWatcher* route_config_watcher_ = nullptr; XdsRouteConfigResource::VirtualHost current_virtual_host_; ClusterState::ClusterStateMap cluster_state_map_; @@ -694,7 +697,8 @@ void XdsResolver::StartLocked() { interested_parties_); auto watcher = MakeRefCounted(Ref()); listener_watcher_ = watcher.get(); - xds_client_->WatchListenerData(server_name_, std::move(watcher)); + XdsListenerResourceType::StartWatch(xds_client_.get(), server_name_, + std::move(watcher)); } void XdsResolver::ShutdownLocked() { @@ -703,12 +707,14 @@ void XdsResolver::ShutdownLocked() { } if (xds_client_ != nullptr) { if (listener_watcher_ != nullptr) { - xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_, + XdsListenerResourceType::CancelWatch(xds_client_.get(), server_name_, + listener_watcher_, /*delay_unsubscription=*/false); } if (route_config_watcher_ != nullptr) { - xds_client_->CancelRouteConfigDataWatch( - server_name_, route_config_watcher_, /*delay_unsubscription=*/false); + XdsRouteConfigResourceType::CancelWatch( + xds_client_.get(), route_config_name_, route_config_watcher_, + /*delay_unsubscription=*/false); } grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), interested_parties_); @@ -726,8 +732,8 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) { if (listener.http_connection_manager.route_config_name != route_config_name_) { if (route_config_watcher_ != nullptr) { - xds_client_->CancelRouteConfigDataWatch( - route_config_name_, route_config_watcher_, + XdsRouteConfigResourceType::CancelWatch( + xds_client_.get(), route_config_name_, route_config_watcher_, /*delay_unsubscription=*/ !listener.http_connection_manager.route_config_name.empty()); route_config_watcher_ = nullptr; @@ -738,7 +744,8 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) { current_virtual_host_.routes.clear(); auto watcher = MakeRefCounted(Ref()); route_config_watcher_ = watcher.get(); - xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher)); + XdsRouteConfigResourceType::StartWatch( + xds_client_.get(), route_config_name_, std::move(watcher)); } } current_listener_ = std::move(listener); diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 95cd22fc27d..4f9512c723f 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -37,7 +37,10 @@ #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_client_stats.h" +#include "src/core/ext/xds/xds_cluster.h" +#include "src/core/ext/xds/xds_endpoint.h" #include "src/core/ext/xds/xds_http_filters.h" +#include "src/core/ext/xds/xds_listener.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" @@ -76,11 +79,6 @@ const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr; XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr; char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; -const char* kLdsTypeUrl = "envoy.config.listener.v3.Listener"; -const char* kRdsTypeUrl = "envoy.config.route.v3.RouteConfiguration"; -const char* kCdsTypeUrl = "envoy.config.cluster.v3.Cluster"; -const char* kEdsTypeUrl = "envoy.config.endpoint.v3.ClusterLoadAssignment"; - } // namespace class XdsClient::Notifier { @@ -1924,66 +1922,6 @@ void XdsClient::CancelResourceWatch(const XdsResourceType* type, } } -void XdsClient::WatchListenerData( - absl::string_view listener_name, - RefCountedPtr watcher) { - WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kLdsTypeUrl), - listener_name, std::move(watcher)); -} - -void XdsClient::CancelListenerDataWatch(absl::string_view listener_name, - ListenerWatcherInterface* watcher, - bool delay_unsubscription) { - CancelResourceWatch( - XdsResourceTypeRegistry::GetOrCreate()->GetType(kLdsTypeUrl), - listener_name, watcher, delay_unsubscription); -} - -void XdsClient::WatchRouteConfigData( - absl::string_view route_config_name, - RefCountedPtr watcher) { - WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kRdsTypeUrl), - route_config_name, std::move(watcher)); -} - -void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name, - RouteConfigWatcherInterface* watcher, - bool delay_unsubscription) { - CancelResourceWatch( - XdsResourceTypeRegistry::GetOrCreate()->GetType(kRdsTypeUrl), - route_config_name, watcher, delay_unsubscription); -} - -void XdsClient::WatchClusterData( - absl::string_view cluster_name, - RefCountedPtr watcher) { - WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl), - cluster_name, std::move(watcher)); -} - -void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name, - ClusterWatcherInterface* watcher, - bool delay_unsubscription) { - CancelResourceWatch( - XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl), - cluster_name, watcher, delay_unsubscription); -} - -void XdsClient::WatchEndpointData( - absl::string_view eds_service_name, - RefCountedPtr watcher) { - WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kEdsTypeUrl), - eds_service_name, std::move(watcher)); -} - -void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name, - EndpointWatcherInterface* watcher, - bool delay_unsubscription) { - CancelResourceWatch( - XdsResourceTypeRegistry::GetOrCreate()->GetType(kEdsTypeUrl), - eds_service_name, watcher, delay_unsubscription); -} - absl::StatusOr XdsClient::ParseXdsResourceName( absl::string_view name, const XdsResourceType* type) { // Old-style names use the empty string for authority. @@ -2050,9 +1988,8 @@ RefCountedPtr XdsClient::AddClusterDropStats( it->first.second /*eds_service_name*/); load_report_state.drop_stats = cluster_drop_stats.get(); } - auto resource_name = ParseXdsResourceName( - cluster_name, - XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl)); + auto resource_name = + ParseXdsResourceName(cluster_name, XdsClusterResourceType::Get()); GPR_ASSERT(resource_name.ok()); auto a = authority_state_map_.find(resource_name->authority); if (a != authority_state_map_.end()) { @@ -2114,9 +2051,8 @@ RefCountedPtr XdsClient::AddClusterLocalityStats( std::move(locality)); locality_state.locality_stats = cluster_locality_stats.get(); } - auto resource_name = ParseXdsResourceName( - cluster_name, - XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl)); + auto resource_name = + ParseXdsResourceName(cluster_name, XdsClusterResourceType::Get()); GPR_ASSERT(resource_name.ok()); auto a = authority_state_map_.find(resource_name->authority); if (a != authority_state_map_.end()) { @@ -2285,23 +2221,9 @@ std::string XdsClient::DumpClientConfigBinary() { // accessors for global state // -namespace { - -void InitResourceTypeRegistry() { - auto* registry = XdsResourceTypeRegistry::GetOrCreate(); - registry->RegisterType(absl::make_unique()); - registry->RegisterType(absl::make_unique()); - registry->RegisterType(absl::make_unique()); - registry->RegisterType(absl::make_unique()); -} - -} // namespace - void XdsClientGlobalInit() { g_mu = new Mutex; XdsHttpFilterRegistry::Init(); - static gpr_once once = GPR_ONCE_INIT; - gpr_once_init(&once, InitResourceTypeRegistry); } // TODO(roth): Find a better way to clear the fallback config that does diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 143f0927588..661cdb06649 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -28,10 +28,7 @@ #include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client_stats.h" -#include "src/core/ext/xds/xds_cluster.h" -#include "src/core/ext/xds/xds_endpoint.h" -#include "src/core/ext/xds/xds_listener.h" -#include "src/core/ext/xds/xds_route_config.h" +#include "src/core/ext/xds/xds_resource_type.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/memory.h" @@ -39,6 +36,7 @@ #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/work_serializer.h" namespace grpc_core { @@ -59,67 +57,6 @@ class XdsClient : public DualRefCounted { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; }; - // TODO(roth): Consider removing these resource-type-specific APIs in - // favor of some mechanism for automatic type-deduction for the generic - // API. - - // Listener data watcher interface. Implemented by callers. - class ListenerWatcherInterface : public ResourceWatcherInterface { - public: - virtual void OnListenerChanged(XdsListenerResource listener) = 0; - - private: - void OnResourceChanged( - const XdsResourceType::ResourceData* resource) override { - OnListenerChanged( - static_cast(resource) - ->resource); - } - }; - - // RouteConfiguration data watcher interface. Implemented by callers. - class RouteConfigWatcherInterface : public ResourceWatcherInterface { - public: - virtual void OnRouteConfigChanged(XdsRouteConfigResource route_config) = 0; - - private: - void OnResourceChanged( - const XdsResourceType::ResourceData* resource) override { - OnRouteConfigChanged( - static_cast( - resource) - ->resource); - } - }; - - // Cluster data watcher interface. Implemented by callers. - class ClusterWatcherInterface : public ResourceWatcherInterface { - public: - virtual void OnClusterChanged(XdsClusterResource cluster_data) = 0; - - private: - void OnResourceChanged( - const XdsResourceType::ResourceData* resource) override { - OnClusterChanged( - static_cast(resource) - ->resource); - } - }; - - // Endpoint data watcher interface. Implemented by callers. - class EndpointWatcherInterface : public ResourceWatcherInterface { - public: - virtual void OnEndpointChanged(XdsEndpointResource update) = 0; - - private: - void OnResourceChanged( - const XdsResourceType::ResourceData* resource) override { - OnEndpointChanged( - static_cast(resource) - ->resource); - } - }; - // Factory function to get or create the global XdsClient instance. // If *error is not GRPC_ERROR_NONE upon return, then there was // an error initializing the client. @@ -159,58 +96,6 @@ class XdsClient : public DualRefCounted { ResourceWatcherInterface* watcher, bool delay_unsubscription = false); - // Start and cancel listener data watch for a listener. - // The XdsClient takes ownership of the watcher, but the caller may - // keep a raw pointer to the watcher, which may be used only for - // cancellation. (Because the caller does not own the watcher, the - // pointer must not be used for any other purpose.) - // If the caller is going to start a new watch after cancelling the - // old one, it should set delay_unsubscription to true. - void WatchListenerData(absl::string_view listener_name, - RefCountedPtr watcher); - void CancelListenerDataWatch(absl::string_view listener_name, - ListenerWatcherInterface* watcher, - bool delay_unsubscription = false); - - // Start and cancel route config data watch for a listener. - // The XdsClient takes ownership of the watcher, but the caller may - // keep a raw pointer to the watcher, which may be used only for - // cancellation. (Because the caller does not own the watcher, the - // pointer must not be used for any other purpose.) - // If the caller is going to start a new watch after cancelling the - // old one, it should set delay_unsubscription to true. - void WatchRouteConfigData(absl::string_view route_config_name, - RefCountedPtr watcher); - void CancelRouteConfigDataWatch(absl::string_view route_config_name, - RouteConfigWatcherInterface* watcher, - bool delay_unsubscription = false); - - // Start and cancel cluster data watch for a cluster. - // The XdsClient takes ownership of the watcher, but the caller may - // keep a raw pointer to the watcher, which may be used only for - // cancellation. (Because the caller does not own the watcher, the - // pointer must not be used for any other purpose.) - // If the caller is going to start a new watch after cancelling the - // old one, it should set delay_unsubscription to true. - void WatchClusterData(absl::string_view cluster_name, - RefCountedPtr watcher); - void CancelClusterDataWatch(absl::string_view cluster_name, - ClusterWatcherInterface* watcher, - bool delay_unsubscription = false); - - // Start and cancel endpoint data watch for a cluster. - // The XdsClient takes ownership of the watcher, but the caller may - // keep a raw pointer to the watcher, which may be used only for - // cancellation. (Because the caller does not own the watcher, the - // pointer must not be used for any other purpose.) - // If the caller is going to start a new watch after cancelling the - // old one, it should set delay_unsubscription to true. - void WatchEndpointData(absl::string_view eds_service_name, - RefCountedPtr watcher); - void CancelEndpointDataWatch(absl::string_view eds_service_name, - EndpointWatcherInterface* watcher, - bool delay_unsubscription = false); - // Adds and removes drop stats for cluster_name and eds_service_name. RefCountedPtr AddClusterDropStats( absl::string_view lrs_server, absl::string_view cluster_name, diff --git a/src/core/ext/xds/xds_cluster.h b/src/core/ext/xds/xds_cluster.h index 3071340b13e..7ad51594bd4 100644 --- a/src/core/ext/xds/xds_cluster.h +++ b/src/core/ext/xds/xds_cluster.h @@ -27,6 +27,7 @@ #include "envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h" #include "envoy/extensions/transport_sockets/tls/v3/tls.upbdefs.h" +#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_resource_type.h" @@ -87,6 +88,25 @@ class XdsClusterResourceType : public XdsResourceType { XdsClusterResource resource; }; + class WatcherInterface : public XdsClient::ResourceWatcherInterface { + public: + virtual void OnClusterChanged(XdsClusterResource cluster_data) = 0; + + private: + void OnResourceChanged( + const XdsResourceType::ResourceData* resource) override { + OnClusterChanged( + static_cast(resource) + ->resource); + } + }; + + static const XdsClusterResourceType* Get() { + static const XdsClusterResourceType* g_instance = + new XdsClusterResourceType(); + return g_instance; + } + absl::string_view type_url() const override { return "envoy.config.cluster.v3.Cluster"; } @@ -94,6 +114,19 @@ class XdsClusterResourceType : public XdsResourceType { return "envoy.api.v2.Cluster"; } + static void StartWatch(XdsClient* xds_client, absl::string_view resource_name, + RefCountedPtr watcher) { + xds_client->WatchResource(Get(), resource_name, std::move(watcher)); + } + + static void CancelWatch(XdsClient* xds_client, + absl::string_view resource_name, + WatcherInterface* watcher, + bool delay_unsubscription = false) { + xds_client->CancelResourceWatch(Get(), resource_name, watcher, + delay_unsubscription); + } + absl::StatusOr Decode(const XdsEncodingContext& context, absl::string_view serialized_resource, bool is_v2) const override; @@ -120,6 +153,11 @@ class XdsClusterResourceType : public XdsResourceType { envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_getmsgdef( symtab); } + + private: + XdsClusterResourceType() { + XdsResourceTypeRegistry::GetOrCreate()->RegisterType(this); + } }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_endpoint.h b/src/core/ext/xds/xds_endpoint.h index 9c230acdc05..4b38306c28e 100644 --- a/src/core/ext/xds/xds_endpoint.h +++ b/src/core/ext/xds/xds_endpoint.h @@ -27,6 +27,7 @@ #include "envoy/config/endpoint/v3/endpoint.upbdefs.h" #include "src/core/ext/filters/client_channel/server_address.h" +#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_resource_type.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -116,6 +117,25 @@ class XdsEndpointResourceType : public XdsResourceType { XdsEndpointResource resource; }; + class WatcherInterface : public XdsClient::ResourceWatcherInterface { + public: + virtual void OnEndpointChanged(XdsEndpointResource update) = 0; + + private: + void OnResourceChanged( + const XdsResourceType::ResourceData* resource) override { + OnEndpointChanged( + static_cast(resource) + ->resource); + } + }; + + static const XdsEndpointResourceType* Get() { + static const XdsEndpointResourceType* g_instance = + new XdsEndpointResourceType(); + return g_instance; + } + absl::string_view type_url() const override { return "envoy.config.endpoint.v3.ClusterLoadAssignment"; } @@ -123,6 +143,19 @@ class XdsEndpointResourceType : public XdsResourceType { return "envoy.api.v2.ClusterLoadAssignment"; } + static void StartWatch(XdsClient* xds_client, absl::string_view resource_name, + RefCountedPtr watcher) { + xds_client->WatchResource(Get(), resource_name, std::move(watcher)); + } + + static void CancelWatch(XdsClient* xds_client, + absl::string_view resource_name, + WatcherInterface* watcher, + bool delay_unsubscription = false) { + xds_client->CancelResourceWatch(Get(), resource_name, watcher, + delay_unsubscription); + } + absl::StatusOr Decode(const XdsEncodingContext& context, absl::string_view serialized_resource, bool is_v2) const override; @@ -144,6 +177,11 @@ class XdsEndpointResourceType : public XdsResourceType { void InitUpbSymtab(upb_symtab* symtab) const override { envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(symtab); } + + private: + XdsEndpointResourceType() { + XdsResourceTypeRegistry::GetOrCreate()->RegisterType(this); + } }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_listener.h b/src/core/ext/xds/xds_listener.h index ee211c77726..d4376590eb5 100644 --- a/src/core/ext/xds/xds_listener.h +++ b/src/core/ext/xds/xds_listener.h @@ -30,6 +30,7 @@ #include "envoy/config/listener/v3/listener.upbdefs.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h" +#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_http_filters.h" #include "src/core/ext/xds/xds_route_config.h" @@ -195,6 +196,25 @@ class XdsListenerResourceType : public XdsResourceType { XdsListenerResource resource; }; + class WatcherInterface : public XdsClient::ResourceWatcherInterface { + public: + virtual void OnListenerChanged(XdsListenerResource listener) = 0; + + private: + void OnResourceChanged( + const XdsResourceType::ResourceData* resource) override { + OnListenerChanged( + static_cast(resource) + ->resource); + } + }; + + static const XdsListenerResourceType* Get() { + static const XdsListenerResourceType* g_instance = + new XdsListenerResourceType(); + return g_instance; + } + absl::string_view type_url() const override { return "envoy.config.listener.v3.Listener"; } @@ -202,6 +222,19 @@ class XdsListenerResourceType : public XdsResourceType { return "envoy.api.v2.Listener"; } + static void StartWatch(XdsClient* xds_client, absl::string_view resource_name, + RefCountedPtr watcher) { + xds_client->WatchResource(Get(), resource_name, std::move(watcher)); + } + + static void CancelWatch(XdsClient* xds_client, + absl::string_view resource_name, + WatcherInterface* watcher, + bool delay_unsubscription = false) { + xds_client->CancelResourceWatch(Get(), resource_name, watcher, + delay_unsubscription); + } + absl::StatusOr Decode(const XdsEncodingContext& context, absl::string_view serialized_resource, bool is_v2) const override; @@ -228,6 +261,11 @@ class XdsListenerResourceType : public XdsResourceType { symtab); XdsHttpFilterRegistry::PopulateSymtab(symtab); } + + private: + XdsListenerResourceType() { + XdsResourceTypeRegistry::GetOrCreate()->RegisterType(this); + } }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_resource_type.cc b/src/core/ext/xds/xds_resource_type.cc index d7497101835..c2083e0518e 100644 --- a/src/core/ext/xds/xds_resource_type.cc +++ b/src/core/ext/xds/xds_resource_type.cc @@ -45,26 +45,26 @@ XdsResourceTypeRegistry* XdsResourceTypeRegistry::GetOrCreate() { const XdsResourceType* XdsResourceTypeRegistry::GetType( absl::string_view resource_type) { auto it = resource_types_.find(resource_type); - if (it != resource_types_.end()) return it->second.get(); + if (it != resource_types_.end()) return it->second; auto it2 = v2_resource_types_.find(resource_type); if (it2 != v2_resource_types_.end()) return it2->second; return nullptr; } void XdsResourceTypeRegistry::RegisterType( - std::unique_ptr resource_type) { + const XdsResourceType* resource_type) { GPR_ASSERT(resource_types_.find(resource_type->type_url()) == resource_types_.end()); GPR_ASSERT(v2_resource_types_.find(resource_type->v2_type_url()) == v2_resource_types_.end()); - v2_resource_types_.emplace(resource_type->v2_type_url(), resource_type.get()); - resource_types_.emplace(resource_type->type_url(), std::move(resource_type)); + v2_resource_types_.emplace(resource_type->v2_type_url(), resource_type); + resource_types_.emplace(resource_type->type_url(), resource_type); } void XdsResourceTypeRegistry::ForEach( std::function func) { for (const auto& p : resource_types_) { - func(p.second.get()); + func(p.second); } } diff --git a/src/core/ext/xds/xds_resource_type.h b/src/core/ext/xds/xds_resource_type.h index cdeb0cca8c6..3f8fe0fbcf6 100644 --- a/src/core/ext/xds/xds_resource_type.h +++ b/src/core/ext/xds/xds_resource_type.h @@ -105,16 +105,15 @@ class XdsResourceTypeRegistry { // Registers a resource type. // All types must be registered before they can be used in the XdsClient. - void RegisterType(std::unique_ptr resource_type); + void RegisterType(const XdsResourceType* resource_type); // Calls func for each resource type. void ForEach(std::function func); private: - std::map> + std::map resource_types_; - std::map + std::map v2_resource_types_; }; diff --git a/src/core/ext/xds/xds_route_config.h b/src/core/ext/xds/xds_route_config.h index 54f610253b7..1069637d5d2 100644 --- a/src/core/ext/xds/xds_route_config.h +++ b/src/core/ext/xds/xds_route_config.h @@ -29,6 +29,7 @@ #include "envoy/config/route/v3/route.upbdefs.h" #include "re2/re2.h" +#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_http_filters.h" #include "src/core/ext/xds/xds_resource_type.h" @@ -195,6 +196,26 @@ class XdsRouteConfigResourceType : public XdsResourceType { XdsRouteConfigResource resource; }; + class WatcherInterface : public XdsClient::ResourceWatcherInterface { + public: + virtual void OnRouteConfigChanged(XdsRouteConfigResource route_config) = 0; + + private: + void OnResourceChanged( + const XdsResourceType::ResourceData* resource) override { + OnRouteConfigChanged( + static_cast( + resource) + ->resource); + } + }; + + static const XdsRouteConfigResourceType* Get() { + static const XdsRouteConfigResourceType* g_instance = + new XdsRouteConfigResourceType(); + return g_instance; + } + absl::string_view type_url() const override { return "envoy.config.route.v3.RouteConfiguration"; } @@ -202,6 +223,19 @@ class XdsRouteConfigResourceType : public XdsResourceType { return "envoy.api.v2.RouteConfiguration"; } + static void StartWatch(XdsClient* xds_client, absl::string_view resource_name, + RefCountedPtr watcher) { + xds_client->WatchResource(Get(), resource_name, std::move(watcher)); + } + + static void CancelWatch(XdsClient* xds_client, + absl::string_view resource_name, + WatcherInterface* watcher, + bool delay_unsubscription = false) { + xds_client->CancelResourceWatch(Get(), resource_name, watcher, + delay_unsubscription); + } + absl::StatusOr Decode(const XdsEncodingContext& context, absl::string_view serialized_resource, bool /*is_v2*/) const override; @@ -223,6 +257,11 @@ class XdsRouteConfigResourceType : public XdsResourceType { void InitUpbSymtab(upb_symtab* symtab) const override { envoy_config_route_v3_RouteConfiguration_getmsgdef(symtab); } + + private: + XdsRouteConfigResourceType() { + XdsResourceTypeRegistry::GetOrCreate()->RegisterType(this); + } }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_server_config_fetcher.cc b/src/core/ext/xds/xds_server_config_fetcher.cc index 9992abd5da9..6d0c3d0e6be 100644 --- a/src/core/ext/xds/xds_server_config_fetcher.cc +++ b/src/core/ext/xds/xds_server_config_fetcher.cc @@ -26,6 +26,8 @@ #include "src/core/ext/xds/xds_certificate_provider.h" #include "src/core/ext/xds/xds_channel_stack_modifier.h" #include "src/core/ext/xds/xds_client.h" +#include "src/core/ext/xds/xds_listener.h" +#include "src/core/ext/xds/xds_route_config.h" #include "src/core/ext/xds/xds_routing.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" @@ -86,7 +88,7 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { // update received was a fatal error (resource does not exist), the server // listener is made to stop listening. class XdsServerConfigFetcher::ListenerWatcher - : public XdsClient::ListenerWatcherInterface { + : public XdsListenerResourceType::WatcherInterface { public: ListenerWatcher(RefCountedPtr xds_client, std::unique_ptr @@ -217,7 +219,7 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager // with the latest updates and new connections do not need to wait for the RDS // resources to be fetched. class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: - RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface { + RouteConfigWatcher : public XdsRouteConfigResourceType::WatcherInterface { public: RouteConfigWatcher( std::string resource_name, @@ -381,7 +383,7 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: // DynamicXdsServerConfigSelectorProvider class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: DynamicXdsServerConfigSelectorProvider::RouteConfigWatcher - : public XdsClient::RouteConfigWatcherInterface { + : public XdsRouteConfigResourceType::WatcherInterface { public: explicit RouteConfigWatcher( RefCountedPtr parent) @@ -418,7 +420,8 @@ void XdsServerConfigFetcher::StartWatch( xds_client_, std::move(watcher), serving_status_notifier_, listening_address); auto* listener_watcher_ptr = listener_watcher.get(); - xds_client_->WatchListenerData( + XdsListenerResourceType::StartWatch( + xds_client_.get(), absl::StrReplaceAll( xds_client_->bootstrap().server_listener_resource_name_template(), {{"%s", listening_address}}), @@ -433,7 +436,8 @@ void XdsServerConfigFetcher::CancelWatch( auto it = listener_watchers_.find(watcher); if (it != listener_watchers_.end()) { // Cancel the watch on the listener before erasing - xds_client_->CancelListenerDataWatch( + XdsListenerResourceType::CancelWatch( + xds_client_.get(), absl::StrReplaceAll( xds_client_->bootstrap().server_listener_resource_name_template(), {{"%s", it->second->listening_address()}}), @@ -618,8 +622,8 @@ void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: MakeRefCounted(resource_name, WeakRef()); rds_map_.emplace(resource_name, RdsUpdateState{route_config_watcher.get(), absl::nullopt}); - xds_client_->WatchRouteConfigData(resource_name, - std::move(route_config_watcher)); + XdsRouteConfigResourceType::StartWatch(xds_client_.get(), resource_name, + std::move(route_config_watcher)); } if (rds_resources_yet_to_fetch_ != 0) { listener_watcher_ = std::move(listener_watcher); @@ -638,7 +642,8 @@ void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: MutexLock lock(&mu_); // Cancel the RDS watches to clear up the weak refs for (const auto& entry : rds_map_) { - xds_client_->CancelRouteConfigDataWatch(entry.first, entry.second.watcher, + XdsRouteConfigResourceType::CancelWatch(xds_client_.get(), entry.first, + entry.second.watcher, false /* delay_unsubscription */); } // Also give up the ref on ListenerWatcher since it won't be needed anymore @@ -1159,8 +1164,8 @@ XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: GPR_ASSERT(!resource_name_.empty()); auto route_config_watcher = MakeRefCounted(Ref()); route_config_watcher_ = route_config_watcher.get(); - xds_client_->WatchRouteConfigData(resource_name_, - std::move(route_config_watcher)); + XdsRouteConfigResourceType::StartWatch(xds_client_.get(), resource_name_, + std::move(route_config_watcher)); } absl::StatusOr> @@ -1184,7 +1189,8 @@ XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: DynamicXdsServerConfigSelectorProvider::CancelWatch() { - xds_client_->CancelRouteConfigDataWatch(resource_name_, route_config_watcher_, + XdsRouteConfigResourceType::CancelWatch(xds_client_.get(), resource_name_, + route_config_watcher_, false /* delay_unsubscription */); MutexLock lock(&mu_); watcher_.reset(); diff --git a/test/cpp/end2end/xds/xds_end2end_test.cc b/test/cpp/end2end/xds/xds_end2end_test.cc index 2acd619d09d..84adbe3d5c2 100644 --- a/test/cpp/end2end/xds/xds_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_end2end_test.cc @@ -64,6 +64,7 @@ #include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_client.h" +#include "src/core/ext/xds/xds_listener.h" #include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/env.h"