mirror of https://github.com/grpc/grpc.git
[xDS] move CDS and EDS watchers into xds resolver (#35011)
Implements gRFC A74 (https://github.com/grpc/proposal/pull/404).
Closes #35011
COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35011 from markdroth:xds_watchers_in_xds_resolver a39f71f37f
PiperOrigin-RevId: 595134549
pull/35425/head
parent
2c18d16475
commit
c7101d0867
60 changed files with 2945 additions and 3125 deletions
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,281 @@ |
||||
//
|
||||
// Copyright 2019 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_DEPENDENCY_MANAGER_H |
||||
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_DEPENDENCY_MANAGER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "absl/container/flat_hash_map.h" |
||||
#include "absl/container/flat_hash_set.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include "src/core/ext/xds/xds_client_grpc.h" |
||||
#include "src/core/ext/xds/xds_cluster.h" |
||||
#include "src/core/ext/xds/xds_endpoint.h" |
||||
#include "src/core/ext/xds/xds_listener.h" |
||||
#include "src/core/ext/xds/xds_route_config.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
#include "src/core/lib/resolver/resolver.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Watches all xDS resources and handles dependencies between them.
|
||||
// Reports updates only when all necessary resources have been obtained.
|
||||
class XdsDependencyManager : public RefCounted<XdsDependencyManager>, |
||||
public Orphanable { |
||||
public: |
||||
struct XdsConfig : public RefCounted<XdsConfig> { |
||||
// Listener resource.
|
||||
std::shared_ptr<const XdsListenerResource> listener; |
||||
// RouteConfig resource. Will be populated even if RouteConfig is
|
||||
// inlined into the Listener resource.
|
||||
std::shared_ptr<const XdsRouteConfigResource> route_config; |
||||
// Virtual host. Points into route_config. Will always be non-null.
|
||||
const XdsRouteConfigResource::VirtualHost* virtual_host; |
||||
|
||||
// Cluster map. A cluster will have a non-OK status if either
|
||||
// (a) there was an error and we did not already have a valid
|
||||
// resource or (b) the resource does not exist.
|
||||
struct ClusterConfig { |
||||
// Cluster name and resource.
|
||||
std::string cluster_name; |
||||
std::shared_ptr<const XdsClusterResource> cluster; |
||||
// Endpoint info. If there was an error, endpoints will be null
|
||||
// and resolution_note will be set. Not used for aggregate clusters.
|
||||
struct EndpointConfig { |
||||
std::shared_ptr<const XdsEndpointResource> endpoints; |
||||
std::string resolution_note; |
||||
|
||||
EndpointConfig(std::shared_ptr<const XdsEndpointResource> endpoints, |
||||
std::string resolution_note) |
||||
: endpoints(std::move(endpoints)), |
||||
resolution_note(std::move(resolution_note)) {} |
||||
bool operator==(const EndpointConfig& other) const { |
||||
return endpoints == other.endpoints && |
||||
resolution_note == other.resolution_note; |
||||
} |
||||
}; |
||||
// The list of leaf clusters for an aggregate cluster.
|
||||
struct AggregateConfig { |
||||
std::vector<absl::string_view> leaf_clusters; |
||||
|
||||
explicit AggregateConfig(std::vector<absl::string_view> leaf_clusters) |
||||
: leaf_clusters(std::move(leaf_clusters)) {} |
||||
bool operator==(const AggregateConfig& other) const { |
||||
return leaf_clusters == other.leaf_clusters; |
||||
} |
||||
}; |
||||
absl::variant<EndpointConfig, AggregateConfig> children; |
||||
|
||||
// Ctor for leaf clusters.
|
||||
ClusterConfig(std::string cluster_name, |
||||
std::shared_ptr<const XdsClusterResource> cluster, |
||||
std::shared_ptr<const XdsEndpointResource> endpoints, |
||||
std::string resolution_note); |
||||
// Ctor for aggregate clusters.
|
||||
ClusterConfig(std::string cluster_name, |
||||
std::shared_ptr<const XdsClusterResource> cluster, |
||||
std::vector<absl::string_view> leaf_clusters); |
||||
|
||||
bool operator==(const ClusterConfig& other) const { |
||||
return cluster_name == other.cluster_name && cluster == other.cluster && |
||||
children == other.children; |
||||
} |
||||
}; |
||||
absl::flat_hash_map<std::string, absl::StatusOr<ClusterConfig>> clusters; |
||||
|
||||
std::string ToString() const; |
||||
|
||||
static absl::string_view ChannelArgName() { |
||||
return GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_config"; |
||||
} |
||||
static int ChannelArgsCompare(const XdsConfig* a, const XdsConfig* b) { |
||||
return QsortCompare(a, b); |
||||
} |
||||
static constexpr bool ChannelArgUseConstPtr() { return true; } |
||||
}; |
||||
|
||||
class Watcher { |
||||
public: |
||||
virtual ~Watcher() = default; |
||||
|
||||
virtual void OnUpdate(RefCountedPtr<const XdsConfig> config) = 0; |
||||
|
||||
// These methods are invoked when there is an error or
|
||||
// does-not-exist on LDS or RDS only.
|
||||
virtual void OnError(absl::string_view context, absl::Status status) = 0; |
||||
virtual void OnResourceDoesNotExist(std::string context) = 0; |
||||
}; |
||||
|
||||
class ClusterSubscription : public DualRefCounted<ClusterSubscription> { |
||||
public: |
||||
ClusterSubscription(absl::string_view cluster_name, |
||||
RefCountedPtr<XdsDependencyManager> dependency_mgr) |
||||
: cluster_name_(cluster_name), |
||||
dependency_mgr_(std::move(dependency_mgr)) {} |
||||
|
||||
void Orphan() override; |
||||
|
||||
absl::string_view cluster_name() const { return cluster_name_; } |
||||
|
||||
private: |
||||
std::string cluster_name_; |
||||
RefCountedPtr<XdsDependencyManager> dependency_mgr_; |
||||
}; |
||||
|
||||
XdsDependencyManager(RefCountedPtr<GrpcXdsClient> xds_client, |
||||
std::shared_ptr<WorkSerializer> work_serializer, |
||||
std::unique_ptr<Watcher> watcher, |
||||
std::string data_plane_authority, |
||||
std::string listener_resource_name, ChannelArgs args, |
||||
grpc_pollset_set* interested_parties); |
||||
|
||||
void Orphan() override; |
||||
|
||||
// Gets an external cluster subscription. This allows us to include
|
||||
// clusters in the config that are referenced by something other than
|
||||
// the route config (e.g., RLS). The cluster will be included in the
|
||||
// config as long as the returned object is still referenced.
|
||||
RefCountedPtr<ClusterSubscription> GetClusterSubscription( |
||||
absl::string_view cluster_name); |
||||
|
||||
static absl::string_view ChannelArgName() { |
||||
return GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_dependency_manager"; |
||||
} |
||||
static int ChannelArgsCompare(const XdsDependencyManager* a, |
||||
const XdsDependencyManager* b) { |
||||
return QsortCompare(a, b); |
||||
} |
||||
|
||||
private: |
||||
class ListenerWatcher; |
||||
class RouteConfigWatcher; |
||||
class ClusterWatcher; |
||||
class EndpointWatcher; |
||||
|
||||
class DnsResultHandler; |
||||
|
||||
struct ClusterWatcherState { |
||||
// Pointer to watcher, to be used when cancelling.
|
||||
// Not owned, so do not dereference.
|
||||
ClusterWatcher* watcher = nullptr; |
||||
// Most recent update obtained from this watcher.
|
||||
absl::StatusOr<std::shared_ptr<const XdsClusterResource>> update = nullptr; |
||||
}; |
||||
|
||||
struct EndpointConfig { |
||||
// If there was an error, update will be null and resolution_note
|
||||
// will be non-empty.
|
||||
std::shared_ptr<const XdsEndpointResource> endpoints; |
||||
std::string resolution_note; |
||||
}; |
||||
|
||||
struct EndpointWatcherState { |
||||
// Pointer to watcher, to be used when cancelling.
|
||||
// Not owned, so do not dereference.
|
||||
EndpointWatcher* watcher = nullptr; |
||||
// Most recent update obtained from this watcher.
|
||||
EndpointConfig update; |
||||
}; |
||||
|
||||
struct DnsState { |
||||
OrphanablePtr<Resolver> resolver; |
||||
// Most recent result from the resolver.
|
||||
EndpointConfig update; |
||||
}; |
||||
|
||||
// Event handlers.
|
||||
void OnListenerUpdate(std::shared_ptr<const XdsListenerResource> listener); |
||||
void OnRouteConfigUpdate( |
||||
const std::string& name, |
||||
std::shared_ptr<const XdsRouteConfigResource> route_config); |
||||
void OnError(std::string context, absl::Status status); |
||||
void OnResourceDoesNotExist(std::string context); |
||||
|
||||
void OnClusterUpdate(const std::string& name, |
||||
std::shared_ptr<const XdsClusterResource> cluster); |
||||
void OnClusterError(const std::string& name, absl::Status status); |
||||
void OnClusterDoesNotExist(const std::string& name); |
||||
|
||||
void OnEndpointUpdate(const std::string& name, |
||||
std::shared_ptr<const XdsEndpointResource> endpoint); |
||||
void OnEndpointError(const std::string& name, absl::Status status); |
||||
void OnEndpointDoesNotExist(const std::string& name); |
||||
|
||||
void OnDnsResult(const std::string& dns_name, Resolver::Result result); |
||||
void PopulateDnsUpdate(const std::string& dns_name, Resolver::Result result, |
||||
DnsState* dns_state); |
||||
|
||||
// Starts CDS and EDS/DNS watches for the specified cluster if needed.
|
||||
// Adds an entry to cluster_config_map, which will contain the cluster
|
||||
// data if the data is available.
|
||||
// For each EDS cluster, adds the EDS resource to eds_resources_seen.
|
||||
// For each Logical DNS cluster, adds the DNS hostname to dns_names_seen.
|
||||
// For aggregate clusters, calls itself recursively. If leaf_clusters is
|
||||
// non-null, populates it with a list of leaf clusters, or an error if
|
||||
// max depth is exceeded.
|
||||
// Returns true if all resources have been obtained.
|
||||
bool PopulateClusterConfigMap( |
||||
absl::string_view name, int depth, |
||||
absl::flat_hash_map<std::string, |
||||
absl::StatusOr<XdsConfig::ClusterConfig>>* |
||||
cluster_config_map, |
||||
std::set<absl::string_view>* eds_resources_seen, |
||||
std::set<absl::string_view>* dns_names_seen, |
||||
absl::StatusOr<std::vector<absl::string_view>>* leaf_clusters = nullptr); |
||||
|
||||
// Called when an external cluster subscription is unreffed.
|
||||
void OnClusterSubscriptionUnref(absl::string_view cluster_name, |
||||
ClusterSubscription* subscription); |
||||
|
||||
// Checks whether all necessary resources have been obtained, and if
|
||||
// so reports an update to the watcher.
|
||||
void MaybeReportUpdate(); |
||||
|
||||
// Parameters passed into ctor.
|
||||
RefCountedPtr<GrpcXdsClient> xds_client_; |
||||
std::shared_ptr<WorkSerializer> work_serializer_; |
||||
std::unique_ptr<Watcher> watcher_; |
||||
const std::string data_plane_authority_; |
||||
const std::string listener_resource_name_; |
||||
ChannelArgs args_; |
||||
grpc_pollset_set* interested_parties_; |
||||
|
||||
// Listener state.
|
||||
ListenerWatcher* listener_watcher_ = nullptr; |
||||
std::shared_ptr<const XdsListenerResource> current_listener_; |
||||
std::string route_config_name_; |
||||
|
||||
// RouteConfig state.
|
||||
RouteConfigWatcher* route_config_watcher_ = nullptr; |
||||
std::shared_ptr<const XdsRouteConfigResource> current_route_config_; |
||||
const XdsRouteConfigResource::VirtualHost* current_virtual_host_ = nullptr; |
||||
absl::flat_hash_set<absl::string_view> clusters_from_route_config_; |
||||
|
||||
// Cluster state.
|
||||
absl::flat_hash_map<std::string, ClusterWatcherState> cluster_watchers_; |
||||
absl::flat_hash_map<absl::string_view, WeakRefCountedPtr<ClusterSubscription>> |
||||
cluster_subscriptions_; |
||||
|
||||
// Endpoint state.
|
||||
absl::flat_hash_map<std::string, EndpointWatcherState> endpoint_watchers_; |
||||
absl::flat_hash_map<std::string, DnsState> dns_resolvers_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_DEPENDENCY_MANAGER_H
|
@ -0,0 +1,25 @@ |
||||
//
|
||||
// Copyright 2019 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver_trace.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
TraceFlag grpc_xds_resolver_trace(false, "xds_resolver"); |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,30 @@ |
||||
//
|
||||
// Copyright 2019 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_TRACE_H |
||||
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_TRACE_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
extern TraceFlag grpc_xds_resolver_trace; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_TRACE_H
|
Loading…
Reference in new issue