diff --git a/BUILD b/BUILD
index 94185cbc575..71031f7db7a 100644
--- a/BUILD
+++ b/BUILD
@@ -332,7 +332,7 @@ grpc_cc_library(
"grpc_lb_policy_cds",
"grpc_lb_policy_eds",
"grpc_lb_policy_lrs",
- "grpc_lb_policy_xds_routing",
+ "grpc_lb_policy_xds_cluster_manager",
"grpc_resolver_xds",
],
},
@@ -1380,9 +1380,9 @@ grpc_cc_library(
)
grpc_cc_library(
- name = "grpc_lb_policy_xds_routing",
+ name = "grpc_lb_policy_xds_cluster_manager",
srcs = [
- "src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc",
+ "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc",
],
external_deps = [
"absl/strings",
@@ -1391,6 +1391,7 @@ grpc_cc_library(
deps = [
"grpc_base",
"grpc_client_channel",
+ "grpc_resolver_xds_header",
"grpc_xds_api_header",
],
)
@@ -1679,6 +1680,14 @@ grpc_cc_library(
],
)
+grpc_cc_library(
+ name = "grpc_resolver_xds_header",
+ hdrs = [
+ "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h",
+ ],
+ language = "c++",
+)
+
grpc_cc_library(
name = "grpc_resolver_xds",
srcs = [
diff --git a/BUILD.gn b/BUILD.gn
index 5f1ffba0879..36dae38a5a5 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -251,7 +251,7 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/lb_policy/xds/eds.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/xds.h",
- "src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc",
+ "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc",
"src/core/ext/filters/client_channel/lb_policy_factory.h",
"src/core/ext/filters/client_channel/lb_policy_registry.cc",
"src/core/ext/filters/client_channel/lb_policy_registry.h",
@@ -281,6 +281,7 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h",
"src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc",
"src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc",
+ "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h",
"src/core/ext/filters/client_channel/resolver_factory.h",
"src/core/ext/filters/client_channel/resolver_registry.cc",
"src/core/ext/filters/client_channel/resolver_registry.h",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1f3ea428184..0d655de3c50 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1442,7 +1442,7 @@ add_library(grpc
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
+ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
src/core/ext/filters/client_channel/lb_policy_registry.cc
src/core/ext/filters/client_channel/local_subchannel_pool.cc
src/core/ext/filters/client_channel/proxy_mapper_registry.cc
diff --git a/Makefile b/Makefile
index 17691de5dc5..5441fa42e91 100644
--- a/Makefile
+++ b/Makefile
@@ -1847,7 +1847,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
- src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
+ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/proxy_mapper_registry.cc \
@@ -4522,7 +4522,7 @@ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc: $(OPENSSL_DEP)
-src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc: $(OPENSSL_DEP)
+src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP)
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc: $(OPENSSL_DEP)
src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc: $(OPENSSL_DEP)
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 055f6f22747..244a2db3007 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -408,6 +408,7 @@ libs:
- src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
- src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h
- src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
+ - src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h
- src/core/ext/filters/client_channel/resolver_factory.h
- src/core/ext/filters/client_channel/resolver_registry.h
- src/core/ext/filters/client_channel/resolver_result_parsing.h
@@ -788,7 +789,7 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
- - src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
+ - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
- src/core/ext/filters/client_channel/lb_policy_registry.cc
- src/core/ext/filters/client_channel/local_subchannel_pool.cc
- src/core/ext/filters/client_channel/proxy_mapper_registry.cc
diff --git a/config.m4 b/config.m4
index 8381cfb0702..40c9c782c9b 100644
--- a/config.m4
+++ b/config.m4
@@ -68,7 +68,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
- src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
+ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/proxy_mapper_registry.cc \
diff --git a/config.w32 b/config.w32
index 419bc4759d5..fb5633d68ad 100644
--- a/config.w32
+++ b/config.w32
@@ -35,7 +35,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\lrs.cc " +
- "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_routing.cc " +
+ "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy_registry.cc " +
"src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " +
"src\\core\\ext\\filters\\client_channel\\proxy_mapper_registry.cc " +
diff --git a/doc/environment_variables.md b/doc/environment_variables.md
index 13b71878c6d..f93a25156dd 100644
--- a/doc/environment_variables.md
+++ b/doc/environment_variables.md
@@ -90,6 +90,7 @@ some configuration as environment variables that can be set.
- tsi - traces tsi transport security
- weighted_target_lb - traces weighted_target LB policy
- xds_client - traces xds client
+ - xds_cluster_manager_lb - traces cluster manager LB policy
- xds_resolver - traces xds resolver
The following tracers will only run in binaries built in DEBUG mode. This is
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index d7b3224a9df..d1279e6f38a 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -241,6 +241,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
'src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
+ 'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.h',
'src/core/ext/filters/client_channel/resolver_result_parsing.h',
@@ -744,6 +745,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
'src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
+ 'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.h',
'src/core/ext/filters/client_channel/resolver_result_parsing.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index bdfaface168..2a52e0a7acb 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -237,7 +237,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds.h',
- 'src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc',
+ 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_factory.h',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.h',
@@ -267,6 +267,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc',
+ 'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.cc',
'src/core/ext/filters/client_channel/resolver_registry.h',
@@ -1158,6 +1159,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
'src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
+ 'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.h',
'src/core/ext/filters/client_channel/resolver_result_parsing.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index bae5c641695..6826e2435cf 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -155,7 +155,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.h )
- s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc )
+ s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_factory.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.h )
@@ -185,6 +185,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc )
+ s.files += %w( src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver_factory.h )
s.files += %w( src/core/ext/filters/client_channel/resolver_registry.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver_registry.h )
diff --git a/grpc.gyp b/grpc.gyp
index 454df875cc7..426bab801d4 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -473,7 +473,7 @@
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
- 'src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc',
+ 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',
'src/core/ext/filters/client_channel/proxy_mapper_registry.cc',
diff --git a/package.xml b/package.xml
index bfcaa220401..3f6e0e3aef7 100644
--- a/package.xml
+++ b/package.xml
@@ -135,7 +135,7 @@
-
+
@@ -165,6 +165,7 @@
+
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 1b8f2cb1dff..f3d13330e61 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -1747,7 +1747,7 @@ void ChannelData::UpdateStateAndPickerLocked(
const char* reason,
std::unique_ptr picker) {
// Clean the control plane when entering IDLE.
- if (picker_ == nullptr) {
+ if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
health_check_service_name_.reset();
saved_service_config_.reset();
saved_config_selector_.reset();
@@ -1797,7 +1797,7 @@ void ChannelData::UpdateStateAndPickerLocked(
// Note: Original value will be destroyed after the lock is released.
picker_.swap(picker);
// Clean the data plane if the updated picker is nullptr.
- if (picker_ == nullptr) {
+ if (picker_ == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
received_service_config_data_ = false;
// Note: We save the objects to unref until after the lock is released.
retry_throttle_data_to_unref = std::move(retry_throttle_data_);
@@ -1822,6 +1822,14 @@ void ChannelData::UpdateStateAndPickerLocked(
void ChannelData::UpdateServiceConfigInDataPlaneLocked(
bool service_config_changed,
RefCountedPtr config_selector) {
+ // If the service config did not change and there is no new ConfigSelector,
+ // retain the old one (if any).
+ // TODO(roth): Consider whether this is really the right way to handle
+ // this. We might instead want to decide this in ApplyServiceConfig()
+ // where we decide whether to stick with the saved service config.
+ if (!service_config_changed && config_selector == nullptr) {
+ config_selector = saved_config_selector_;
+ }
// Check if ConfigSelector has changed.
const bool config_selector_changed =
saved_config_selector_ != config_selector;
@@ -3873,6 +3881,7 @@ class CallData::QueuedPickCanceller {
}
if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
// Remove pick from list of queued picks.
+ calld->MaybeInvokeConfigSelectorCommitCallback();
calld->MaybeRemoveCallFromQueuedPicksLocked(self->elem_);
// Fail pending batches on the call.
calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
@@ -4162,7 +4171,9 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
connected_subchannel_ =
chand->GetConnectedSubchannelInDataPlane(result.subchannel.get());
GPR_ASSERT(connected_subchannel_ != nullptr);
- if (retry_committed_) MaybeInvokeConfigSelectorCommitCallback();
+ if (!enable_retries_ || retry_committed_) {
+ MaybeInvokeConfigSelectorCommitCallback();
+ }
}
lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
*error = result.error;
diff --git a/src/core/ext/filters/client_channel/config_selector.cc b/src/core/ext/filters/client_channel/config_selector.cc
index e5d2a3f0be3..0c2a08e5eb1 100644
--- a/src/core/ext/filters/client_channel/config_selector.cc
+++ b/src/core/ext/filters/client_channel/config_selector.cc
@@ -17,12 +17,8 @@
#include
#include "src/core/ext/filters/client_channel/config_selector.h"
-
#include "src/core/lib/channel/channel_args.h"
-// Channel arg key for ConfigSelector.
-#define GRPC_ARG_CONFIG_SELECTOR "grpc.internal.config_selector"
-
namespace grpc_core {
namespace {
diff --git a/src/core/ext/filters/client_channel/config_selector.h b/src/core/ext/filters/client_channel/config_selector.h
index edc212476cd..e3a65ef330d 100644
--- a/src/core/ext/filters/client_channel/config_selector.h
+++ b/src/core/ext/filters/client_channel/config_selector.h
@@ -34,6 +34,9 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/transport/metadata_batch.h"
+// Channel arg key for ConfigSelector.
+#define GRPC_ARG_CONFIG_SELECTOR "grpc.internal.config_selector"
+
namespace grpc_core {
// Internal API used to allow resolver implementations to override
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
new file mode 100644
index 00000000000..fff6c411a08
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
@@ -0,0 +1,727 @@
+//
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include
+
+#include
+#include
+#include
+
+#include "absl/container/inlined_vector.h"
+#include "absl/strings/match.h"
+#include "absl/strings/numbers.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_join.h"
+#include "absl/strings/str_split.h"
+#include "absl/strings/string_view.h"
+#include "re2/re2.h"
+
+#include
+
+#include "src/core/ext/filters/client_channel/lb_policy.h"
+#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h"
+#include "src/core/ext/xds/xds_api.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/iomgr/work_serializer.h"
+#include "src/core/lib/transport/error_utils.h"
+
+#define GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
+
+namespace grpc_core {
+
+TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb");
+
+namespace {
+
+constexpr char kXdsClusterManager[] = "xds_cluster_manager_experimental";
+
+// Config for xds_cluster_manager LB policy.
+class XdsClusterManagerLbConfig : public LoadBalancingPolicy::Config {
+ public:
+ using ClusterMap =
+ std::map>;
+
+ XdsClusterManagerLbConfig(ClusterMap cluster_map)
+ : cluster_map_(std::move(cluster_map)) {}
+
+ const char* name() const override { return kXdsClusterManager; }
+
+ const ClusterMap& cluster_map() const { return cluster_map_; }
+
+ private:
+ ClusterMap cluster_map_;
+};
+
+// xds_cluster_manager LB policy.
+class XdsClusterManagerLb : public LoadBalancingPolicy {
+ public:
+ explicit XdsClusterManagerLb(Args args);
+
+ const char* name() const override { return kXdsClusterManager; }
+
+ void UpdateLocked(UpdateArgs args) override;
+ void ExitIdleLocked() override;
+ void ResetBackoffLocked() override;
+
+ private:
+ // A simple wrapper for ref-counting a picker from the child policy.
+ class ChildPickerWrapper : public RefCounted {
+ public:
+ ChildPickerWrapper(std::string name,
+ std::unique_ptr picker)
+ : name_(std::move(name)), picker_(std::move(picker)) {}
+ PickResult Pick(PickArgs args) { return picker_->Pick(args); }
+
+ const std::string& name() const { return name_; }
+
+ private:
+ std::string name_;
+ std::unique_ptr picker_;
+ };
+
+ // Picks a child using prefix or path matching and then delegates to that
+ // child's picker.
+ class ClusterPicker : public SubchannelPicker {
+ public:
+ // Maintains a map of cluster names to pickers.
+ using ClusterMap = std::map>;
+
+ // It is required that the keys of cluster_map have to live at least as long
+ // as the ClusterPicker instance.
+ ClusterPicker(ClusterMap cluster_map,
+ RefCountedPtr config)
+ : cluster_map_(std::move(cluster_map)), config_(std::move(config)) {}
+
+ PickResult Pick(PickArgs args) override;
+
+ private:
+ ClusterMap cluster_map_;
+ // Take a reference to config so that we can use
+ // XdsApi::RdsUpdate::RdsRoute::Matchers from it.
+ RefCountedPtr config_;
+ };
+
+ // Each ClusterChild holds a ref to its parent XdsClusterManagerLb.
+ class ClusterChild : public InternallyRefCounted {
+ public:
+ ClusterChild(RefCountedPtr xds_cluster_manager_policy,
+ const std::string& name);
+ ~ClusterChild();
+
+ void Orphan() override;
+
+ void UpdateLocked(RefCountedPtr config,
+ const ServerAddressList& addresses,
+ const grpc_channel_args* args);
+ void ExitIdleLocked();
+ void ResetBackoffLocked();
+ void DeactivateLocked();
+
+ grpc_connectivity_state connectivity_state() const {
+ return connectivity_state_;
+ }
+ RefCountedPtr picker_wrapper() const {
+ return picker_wrapper_;
+ }
+
+ private:
+ class Helper : public ChannelControlHelper {
+ public:
+ explicit Helper(RefCountedPtr xds_cluster_manager_child)
+ : xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {}
+
+ ~Helper() { xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper"); }
+
+ RefCountedPtr CreateSubchannel(
+ const grpc_channel_args& args) override;
+ void UpdateState(grpc_connectivity_state state,
+ const absl::Status& status,
+ std::unique_ptr picker) override;
+ void RequestReresolution() override;
+ void AddTraceEvent(TraceSeverity severity,
+ absl::string_view message) override;
+
+ private:
+ RefCountedPtr xds_cluster_manager_child_;
+ };
+
+ // Methods for dealing with the child policy.
+ OrphanablePtr CreateChildPolicyLocked(
+ const grpc_channel_args* args);
+
+ static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
+ void OnDelayedRemovalTimerLocked(grpc_error* error);
+
+ // The owning LB policy.
+ RefCountedPtr xds_cluster_manager_policy_;
+
+ // Points to the corresponding key in children map.
+ const std::string name_;
+
+ OrphanablePtr child_policy_;
+
+ RefCountedPtr picker_wrapper_;
+ grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
+ bool seen_failure_since_ready_ = false;
+
+ // States for delayed removal.
+ grpc_timer delayed_removal_timer_;
+ grpc_closure on_delayed_removal_timer_;
+ bool delayed_removal_timer_callback_pending_ = false;
+ bool shutdown_ = false;
+ };
+
+ ~XdsClusterManagerLb();
+
+ void ShutdownLocked() override;
+
+ void UpdateStateLocked();
+
+ // Current config from the resolver.
+ RefCountedPtr config_;
+
+ // Internal state.
+ bool shutting_down_ = false;
+
+ // Children.
+ std::map> children_;
+};
+
+//
+// XdsClusterManagerLb::ClusterPicker
+//
+
+XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
+ PickArgs args) {
+ auto cluster_name =
+ args.call_state->ExperimentalGetCallAttribute(kXdsClusterAttribute);
+ auto it = cluster_map_.find(cluster_name);
+ if (it != cluster_map_.end()) {
+ return it->second->Pick(args);
+ }
+ PickResult result;
+ result.type = PickResult::PICK_FAILED;
+ result.error = grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+ absl::StrCat("xds cluster manager picker: unknown cluster \"",
+ cluster_name, "\"")
+ .c_str()),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
+ return result;
+}
+
+//
+// XdsClusterManagerLb
+//
+
+XdsClusterManagerLb::XdsClusterManagerLb(Args args)
+ : LoadBalancingPolicy(std::move(args)) {}
+
+XdsClusterManagerLb::~XdsClusterManagerLb() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(
+ GPR_INFO,
+ "[xds_cluster_manager_lb %p] destroying xds_cluster_manager LB policy",
+ this);
+ }
+}
+
+void XdsClusterManagerLb::ShutdownLocked() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] shutting down", this);
+ }
+ shutting_down_ = true;
+ children_.clear();
+}
+
+void XdsClusterManagerLb::ExitIdleLocked() {
+ for (auto& p : children_) p.second->ExitIdleLocked();
+}
+
+void XdsClusterManagerLb::ResetBackoffLocked() {
+ for (auto& p : children_) p.second->ResetBackoffLocked();
+}
+
+void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
+ if (shutting_down_) return;
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
+ }
+ // Update config.
+ config_ = std::move(args.config);
+ // Deactivate the children not in the new config.
+ for (const auto& p : children_) {
+ const std::string& name = p.first;
+ ClusterChild* child = p.second.get();
+ if (config_->cluster_map().find(name) == config_->cluster_map().end()) {
+ child->DeactivateLocked();
+ }
+ }
+ // Add or update the children in the new config.
+ for (const auto& p : config_->cluster_map()) {
+ const std::string& name = p.first;
+ const RefCountedPtr& config = p.second;
+ auto it = children_.find(name);
+ if (it == children_.end()) {
+ it = children_
+ .emplace(name, MakeOrphanable(
+ Ref(DEBUG_LOCATION, "ClusterChild"), name))
+ .first;
+ }
+ it->second->UpdateLocked(config, args.addresses, args.args);
+ }
+ UpdateStateLocked();
+}
+
+void XdsClusterManagerLb::UpdateStateLocked() {
+ // Also count the number of children in each state, to determine the
+ // overall state.
+ size_t num_ready = 0;
+ size_t num_connecting = 0;
+ size_t num_idle = 0;
+ size_t num_transient_failures = 0;
+ for (const auto& p : children_) {
+ const auto& child_name = p.first;
+ const ClusterChild* child = p.second.get();
+ // Skip the children that are not in the latest update.
+ if (config_->cluster_map().find(child_name) ==
+ config_->cluster_map().end()) {
+ continue;
+ }
+ switch (child->connectivity_state()) {
+ case GRPC_CHANNEL_READY: {
+ ++num_ready;
+ break;
+ }
+ case GRPC_CHANNEL_CONNECTING: {
+ ++num_connecting;
+ break;
+ }
+ case GRPC_CHANNEL_IDLE: {
+ ++num_idle;
+ break;
+ }
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ ++num_transient_failures;
+ break;
+ }
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
+ }
+ // Determine aggregated connectivity state.
+ grpc_connectivity_state connectivity_state;
+ if (num_ready > 0) {
+ connectivity_state = GRPC_CHANNEL_READY;
+ } else if (num_connecting > 0) {
+ connectivity_state = GRPC_CHANNEL_CONNECTING;
+ } else if (num_idle > 0) {
+ connectivity_state = GRPC_CHANNEL_IDLE;
+ } else {
+ connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ }
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] connectivity changed to %s",
+ this, ConnectivityStateName(connectivity_state));
+ }
+ std::unique_ptr picker;
+ absl::Status status;
+ switch (connectivity_state) {
+ case GRPC_CHANNEL_READY: {
+ ClusterPicker::ClusterMap cluster_map;
+ for (const auto& p : config_->cluster_map()) {
+ const std::string& cluster_name = p.first;
+ RefCountedPtr& child_picker =
+ cluster_map[cluster_name];
+ child_picker = children_[cluster_name]->picker_wrapper();
+ if (child_picker == nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(
+ GPR_INFO,
+ "[xds_cluster_manager_lb %p] child %s has not yet returned a "
+ "picker; creating a QueuePicker.",
+ this, cluster_name.c_str());
+ }
+ child_picker = MakeRefCounted(
+ cluster_name, absl::make_unique(
+ Ref(DEBUG_LOCATION, "QueuePicker")));
+ }
+ }
+ picker =
+ absl::make_unique(std::move(cluster_map), config_);
+ break;
+ }
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE:
+ picker =
+ absl::make_unique(Ref(DEBUG_LOCATION, "QueuePicker"));
+ break;
+ default:
+ grpc_error* error = grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "TRANSIENT_FAILURE from XdsClusterManagerLb"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+ status = grpc_error_to_absl_status(error);
+ picker = absl::make_unique(error);
+ }
+ channel_control_helper()->UpdateState(connectivity_state, status,
+ std::move(picker));
+}
+
+//
+// XdsClusterManagerLb::ClusterChild
+//
+
+XdsClusterManagerLb::ClusterChild::ClusterChild(
+ RefCountedPtr xds_cluster_manager_policy,
+ const std::string& name)
+ : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
+ name_(name) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(GPR_INFO,
+ "[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
+ xds_cluster_manager_policy_.get(), this, name_.c_str());
+ }
+ GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
+ grpc_schedule_on_exec_ctx);
+}
+
+XdsClusterManagerLb::ClusterChild::~ClusterChild() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(GPR_INFO,
+ "[xds_cluster_manager_lb %p] ClusterChild %p: destroying "
+ "child",
+ xds_cluster_manager_policy_.get(), this);
+ }
+ xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild");
+}
+
+void XdsClusterManagerLb::ClusterChild::Orphan() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(GPR_INFO,
+ "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
+ "shutting down child",
+ xds_cluster_manager_policy_.get(), this, name_.c_str());
+ }
+ // Remove the child policy's interested_parties pollset_set from the
+ // xDS policy.
+ grpc_pollset_set_del_pollset_set(
+ child_policy_->interested_parties(),
+ xds_cluster_manager_policy_->interested_parties());
+ child_policy_.reset();
+ // Drop our ref to the child's picker, in case it's holding a ref to
+ // the child.
+ picker_wrapper_.reset();
+ if (delayed_removal_timer_callback_pending_) {
+ grpc_timer_cancel(&delayed_removal_timer_);
+ }
+ shutdown_ = true;
+ Unref();
+}
+
+OrphanablePtr
+XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
+ const grpc_channel_args* args) {
+ LoadBalancingPolicy::Args lb_policy_args;
+ lb_policy_args.work_serializer =
+ xds_cluster_manager_policy_->work_serializer();
+ lb_policy_args.args = args;
+ lb_policy_args.channel_control_helper =
+ absl::make_unique(this->Ref(DEBUG_LOCATION, "Helper"));
+ OrphanablePtr lb_policy =
+ MakeOrphanable(std::move(lb_policy_args),
+ &grpc_xds_cluster_manager_lb_trace);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(GPR_INFO,
+ "[xds_cluster_manager_lb %p] ClusterChild %p %s: Created "
+ "new child "
+ "policy handler %p",
+ xds_cluster_manager_policy_.get(), this, name_.c_str(),
+ lb_policy.get());
+ }
+ // Add the xDS's interested_parties pollset_set to that of the newly created
+ // child policy. This will make the child policy progress upon activity on
+ // xDS LB, which in turn is tied to the application's call.
+ grpc_pollset_set_add_pollset_set(
+ lb_policy->interested_parties(),
+ xds_cluster_manager_policy_->interested_parties());
+ return lb_policy;
+}
+
+void XdsClusterManagerLb::ClusterChild::UpdateLocked(
+ RefCountedPtr config,
+ const ServerAddressList& addresses, const grpc_channel_args* args) {
+ if (xds_cluster_manager_policy_->shutting_down_) return;
+ // Update child weight.
+ // Reactivate if needed.
+ if (delayed_removal_timer_callback_pending_) {
+ delayed_removal_timer_callback_pending_ = false;
+ grpc_timer_cancel(&delayed_removal_timer_);
+ }
+ // Create child policy if needed.
+ if (child_policy_ == nullptr) {
+ child_policy_ = CreateChildPolicyLocked(args);
+ }
+ // Construct update args.
+ UpdateArgs update_args;
+ update_args.config = std::move(config);
+ update_args.addresses = addresses;
+ update_args.args = grpc_channel_args_copy(args);
+ // Update the policy.
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(GPR_INFO,
+ "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
+ "Updating child "
+ "policy handler %p",
+ xds_cluster_manager_policy_.get(), this, name_.c_str(),
+ child_policy_.get());
+ }
+ child_policy_->UpdateLocked(std::move(update_args));
+}
+
+void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {
+ child_policy_->ExitIdleLocked();
+}
+
+void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
+ child_policy_->ResetBackoffLocked();
+}
+
+void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
+ // If already deactivated, don't do that again.
+ if (delayed_removal_timer_callback_pending_ == true) return;
+ // Set the child weight to 0 so that future picker won't contain this child.
+ // Start a timer to delete the child.
+ Ref(DEBUG_LOCATION, "ClusterChild+timer").release();
+ grpc_timer_init(&delayed_removal_timer_,
+ ExecCtx::Get()->Now() +
+ GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS,
+ &on_delayed_removal_timer_);
+ delayed_removal_timer_callback_pending_ = true;
+}
+
+void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimer(
+ void* arg, grpc_error* error) {
+ ClusterChild* self = static_cast(arg);
+ GRPC_ERROR_REF(error); // Ref owned by the lambda
+ self->xds_cluster_manager_policy_->work_serializer()->Run(
+ [self, error]() { self->OnDelayedRemovalTimerLocked(error); },
+ DEBUG_LOCATION);
+}
+
+void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked(
+ grpc_error* error) {
+ delayed_removal_timer_callback_pending_ = false;
+ if (error == GRPC_ERROR_NONE && !shutdown_) {
+ xds_cluster_manager_policy_->children_.erase(name_);
+ }
+ Unref(DEBUG_LOCATION, "ClusterChild+timer");
+ GRPC_ERROR_UNREF(error);
+}
+
+//
+// XdsClusterManagerLb::ClusterChild::Helper
+//
+
+RefCountedPtr
+XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel(
+ const grpc_channel_args& args) {
+ if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_)
+ return nullptr;
+ return xds_cluster_manager_child_->xds_cluster_manager_policy_
+ ->channel_control_helper()
+ ->CreateSubchannel(args);
+}
+
+void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
+ grpc_connectivity_state state, const absl::Status& status,
+ std::unique_ptr picker) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
+ gpr_log(
+ GPR_INFO,
+ "[xds_cluster_manager_lb %p] child %s: received update: state=%s (%s) "
+ "picker=%p",
+ xds_cluster_manager_child_->xds_cluster_manager_policy_.get(),
+ xds_cluster_manager_child_->name_.c_str(), ConnectivityStateName(state),
+ status.ToString().c_str(), picker.get());
+ }
+ if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_)
+ return;
+ // Cache the picker in the ClusterChild.
+ xds_cluster_manager_child_->picker_wrapper_ =
+ MakeRefCounted(xds_cluster_manager_child_->name_,
+ std::move(picker));
+ // Decide what state to report for aggregation purposes.
+ // If we haven't seen a failure since the last time we were in state
+ // READY, then we report the state change as-is. However, once we do see
+ // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
+ // changes until we go back into state READY.
+ if (!xds_cluster_manager_child_->seen_failure_since_ready_) {
+ if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ xds_cluster_manager_child_->seen_failure_since_ready_ = true;
+ }
+ } else {
+ if (state != GRPC_CHANNEL_READY) return;
+ xds_cluster_manager_child_->seen_failure_since_ready_ = false;
+ }
+ xds_cluster_manager_child_->connectivity_state_ = state;
+ // Notify the LB policy.
+ xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
+}
+
+void XdsClusterManagerLb::ClusterChild::Helper::RequestReresolution() {
+ if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_)
+ return;
+ xds_cluster_manager_child_->xds_cluster_manager_policy_
+ ->channel_control_helper()
+ ->RequestReresolution();
+}
+
+void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent(
+ TraceSeverity severity, absl::string_view message) {
+ if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_)
+ return;
+ xds_cluster_manager_child_->xds_cluster_manager_policy_
+ ->channel_control_helper()
+ ->AddTraceEvent(severity, message);
+}
+
+//
+// factory
+//
+
+class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory {
+ public:
+ OrphanablePtr CreateLoadBalancingPolicy(
+ LoadBalancingPolicy::Args args) const override {
+ return MakeOrphanable(std::move(args));
+ }
+
+ const char* name() const override { return kXdsClusterManager; }
+
+ RefCountedPtr ParseLoadBalancingConfig(
+ const Json& json, grpc_error** error) const override {
+ GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
+ if (json.type() == Json::Type::JSON_NULL) {
+ // xds_cluster_manager was mentioned as a policy in the deprecated
+ // loadBalancingPolicy field or in the client API.
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:loadBalancingPolicy error:xds_cluster_manager policy requires "
+ "configuration. Please use loadBalancingConfig field of service "
+ "config instead.");
+ return nullptr;
+ }
+ std::vector error_list;
+ XdsClusterManagerLbConfig::ClusterMap cluster_map;
+ std::set clusters_to_be_used;
+ auto it = json.object_value().find("children");
+ if (it == json.object_value().end()) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:children error:required field not present"));
+ } else if (it->second.type() != Json::Type::OBJECT) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:children error:type should be object"));
+ } else {
+ for (const auto& p : it->second.object_value()) {
+ const std::string& child_name = p.first;
+ if (child_name.empty()) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "field:children element error: name cannot be empty"));
+ continue;
+ }
+ RefCountedPtr child_config;
+ std::vector child_errors =
+ ParseChildConfig(p.second, &child_config);
+ if (!child_errors.empty()) {
+ // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
+ // string is not static in this case.
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+ absl::StrCat("field:children name:", child_name).c_str());
+ for (grpc_error* child_error : child_errors) {
+ error = grpc_error_add_child(error, child_error);
+ }
+ error_list.push_back(error);
+ } else {
+ cluster_map[child_name] = std::move(child_config);
+ clusters_to_be_used.insert(child_name);
+ }
+ }
+ }
+ if (cluster_map.empty()) {
+ error_list.push_back(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid children configured"));
+ }
+ if (!error_list.empty()) {
+ *error = GRPC_ERROR_CREATE_FROM_VECTOR(
+ "xds_cluster_manager_experimental LB policy config", &error_list);
+ return nullptr;
+ }
+ return MakeRefCounted(std::move(cluster_map));
+ }
+
+ private:
+ static std::vector ParseChildConfig(
+ const Json& json,
+ RefCountedPtr* child_config) {
+ std::vector error_list;
+ if (json.type() != Json::Type::OBJECT) {
+ error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "value should be of type object"));
+ return error_list;
+ }
+ auto it = json.object_value().find("childPolicy");
+ if (it == json.object_value().end()) {
+ error_list.push_back(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy"));
+ } else {
+ grpc_error* parse_error = GRPC_ERROR_NONE;
+ *child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
+ it->second, &parse_error);
+ if (*child_config == nullptr) {
+ GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
+ std::vector child_errors;
+ child_errors.push_back(parse_error);
+ error_list.push_back(
+ GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
+ }
+ }
+ return error_list;
+ }
+};
+
+} // namespace
+
+} // namespace grpc_core
+
+//
+// Plugin registration
+//
+
+void grpc_lb_policy_xds_cluster_manager_init() {
+ grpc_core::LoadBalancingPolicyRegistry::Builder::
+ RegisterLoadBalancingPolicyFactory(
+ absl::make_unique());
+}
+
+void grpc_lb_policy_xds_cluster_manager_shutdown() {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
deleted file mode 100644
index 22de026aefa..00000000000
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
+++ /dev/null
@@ -1,1141 +0,0 @@
-//
-// Copyright 2018 gRPC authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-
-#include
-
-#include
-#include
-#include
-
-#include "absl/container/inlined_vector.h"
-#include "absl/strings/match.h"
-#include "absl/strings/numbers.h"
-#include "absl/strings/str_cat.h"
-#include "absl/strings/str_join.h"
-#include "absl/strings/str_split.h"
-#include "absl/strings/string_view.h"
-#include "re2/re2.h"
-
-#include
-
-#include "src/core/ext/filters/client_channel/lb_policy.h"
-#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
-#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
-#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
-#include "src/core/ext/xds/xds_api.h"
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gprpp/orphanable.h"
-#include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/iomgr/work_serializer.h"
-#include "src/core/lib/transport/error_utils.h"
-
-#define GRPC_XDS_ROUTING_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
-
-namespace grpc_core {
-
-TraceFlag grpc_xds_routing_lb_trace(false, "xds_routing_lb");
-
-namespace {
-
-constexpr char kXdsRouting[] = "xds_routing_experimental";
-
-// Config for xds_routing LB policy.
-class XdsRoutingLbConfig : public LoadBalancingPolicy::Config {
- public:
- struct Route {
- XdsApi::Route::Matchers matchers;
- std::string action;
- };
- using RouteTable = std::vector;
- using ActionMap =
- std::map>;
-
- XdsRoutingLbConfig(ActionMap action_map, RouteTable route_table)
- : action_map_(std::move(action_map)),
- route_table_(std::move(route_table)) {}
-
- const char* name() const override { return kXdsRouting; }
-
- const ActionMap& action_map() const { return action_map_; }
-
- const RouteTable& route_table() const { return route_table_; }
-
- private:
- ActionMap action_map_;
- RouteTable route_table_;
-};
-
-// xds_routing LB policy.
-class XdsRoutingLb : public LoadBalancingPolicy {
- public:
- explicit XdsRoutingLb(Args args);
-
- const char* name() const override { return kXdsRouting; }
-
- void UpdateLocked(UpdateArgs args) override;
- void ExitIdleLocked() override;
- void ResetBackoffLocked() override;
-
- private:
- // A simple wrapper for ref-counting a picker from the child policy.
- class ChildPickerWrapper : public RefCounted {
- public:
- ChildPickerWrapper(std::string name,
- std::unique_ptr picker)
- : name_(std::move(name)), picker_(std::move(picker)) {}
- PickResult Pick(PickArgs args) { return picker_->Pick(args); }
-
- const std::string& name() const { return name_; }
-
- private:
- std::string name_;
- std::unique_ptr picker_;
- };
-
- // Picks a child using prefix or path matching and then delegates to that
- // child's picker.
- class RoutePicker : public SubchannelPicker {
- public:
- struct Route {
- const XdsApi::Route::Matchers* matchers;
- RefCountedPtr picker;
- };
-
- // Maintains an ordered xds route table as provided by RDS response.
- using RouteTable = std::vector;
-
- RoutePicker(RouteTable route_table,
- RefCountedPtr config)
- : route_table_(std::move(route_table)), config_(std::move(config)) {}
-
- PickResult Pick(PickArgs args) override;
-
- private:
- RouteTable route_table_;
- // Take a reference to config so that we can use
- // XdsApi::Route::Matchers from it.
- RefCountedPtr config_;
- };
-
- // Each XdsRoutingChild holds a ref to its parent XdsRoutingLb.
- class XdsRoutingChild : public InternallyRefCounted {
- public:
- XdsRoutingChild(RefCountedPtr xds_routing_policy,
- const std::string& name);
- ~XdsRoutingChild();
-
- void Orphan() override;
-
- void UpdateLocked(RefCountedPtr config,
- const ServerAddressList& addresses,
- const grpc_channel_args* args);
- void ExitIdleLocked();
- void ResetBackoffLocked();
- void DeactivateLocked();
-
- grpc_connectivity_state connectivity_state() const {
- return connectivity_state_;
- }
- RefCountedPtr picker_wrapper() const {
- return picker_wrapper_;
- }
-
- private:
- class Helper : public ChannelControlHelper {
- public:
- explicit Helper(RefCountedPtr xds_routing_child)
- : xds_routing_child_(std::move(xds_routing_child)) {}
-
- ~Helper() { xds_routing_child_.reset(DEBUG_LOCATION, "Helper"); }
-
- RefCountedPtr CreateSubchannel(
- const grpc_channel_args& args) override;
- void UpdateState(grpc_connectivity_state state,
- const absl::Status& status,
- std::unique_ptr picker) override;
- void RequestReresolution() override;
- void AddTraceEvent(TraceSeverity severity,
- absl::string_view message) override;
-
- private:
- RefCountedPtr xds_routing_child_;
- };
-
- // Methods for dealing with the child policy.
- OrphanablePtr CreateChildPolicyLocked(
- const grpc_channel_args* args);
-
- static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
- void OnDelayedRemovalTimerLocked(grpc_error* error);
-
- // The owning LB policy.
- RefCountedPtr xds_routing_policy_;
-
- // Points to the corresponding key in XdsRoutingLb::actions_.
- const std::string name_;
-
- OrphanablePtr child_policy_;
-
- RefCountedPtr picker_wrapper_;
- grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
- bool seen_failure_since_ready_ = false;
-
- // States for delayed removal.
- grpc_timer delayed_removal_timer_;
- grpc_closure on_delayed_removal_timer_;
- bool delayed_removal_timer_callback_pending_ = false;
- bool shutdown_ = false;
- };
-
- ~XdsRoutingLb();
-
- void ShutdownLocked() override;
-
- void UpdateStateLocked();
-
- // Current config from the resolver.
- RefCountedPtr config_;
-
- // Internal state.
- bool shutting_down_ = false;
-
- // Children.
- std::map> actions_;
-};
-
-//
-// XdsRoutingLb::RoutePicker
-//
-
-bool PathMatch(const absl::string_view& path,
- const XdsApi::Route::Matchers::PathMatcher& path_matcher) {
- switch (path_matcher.type) {
- case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PREFIX:
- return absl::StartsWith(path, path_matcher.string_matcher);
- case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PATH:
- return path == path_matcher.string_matcher;
- case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::REGEX:
- return RE2::FullMatch(path.data(), *path_matcher.regex_matcher);
- default:
- return false;
- }
-}
-
-absl::optional GetMetadataValue(
- const std::string& key,
- LoadBalancingPolicy::MetadataInterface* initial_metadata,
- std::string* concatenated_value) {
- // Find all values for the specified key.
- GPR_DEBUG_ASSERT(initial_metadata != nullptr);
- absl::InlinedVector values;
- for (const auto p : *initial_metadata) {
- if (p.first == key) values.push_back(p.second);
- }
- // If none found, no match.
- if (values.empty()) return absl::nullopt;
- // If exactly one found, return it as-is.
- if (values.size() == 1) return values.front();
- // If more than one found, concatenate the values, using
- // *concatenated_values as a temporary holding place for the
- // concatenated string.
- *concatenated_value = absl::StrJoin(values, ",");
- return *concatenated_value;
-}
-
-bool HeaderMatchHelper(
- const XdsApi::Route::Matchers::HeaderMatcher& header_matcher,
- LoadBalancingPolicy::MetadataInterface* initial_metadata) {
- std::string concatenated_value;
- absl::optional value;
- // Note: If we ever allow binary headers here, we still need to
- // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
- // they are not visible to the LB policy in grpc-go.
- if (absl::EndsWith(header_matcher.name, "-bin") ||
- header_matcher.name == "grpc-previous-rpc-attempts") {
- value = absl::nullopt;
- } else if (header_matcher.name == "content-type") {
- value = "application/grpc";
- } else {
- value = GetMetadataValue(header_matcher.name, initial_metadata,
- &concatenated_value);
- }
- if (!value.has_value()) {
- if (header_matcher.type ==
- XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PRESENT) {
- return !header_matcher.present_match;
- } else {
- // For all other header matcher types, we need the header value to
- // exist to consider matches.
- return false;
- }
- }
- switch (header_matcher.type) {
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::EXACT:
- return value.value() == header_matcher.string_matcher;
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::REGEX:
- return RE2::FullMatch(value.value().data(), *header_matcher.regex_match);
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::RANGE:
- int64_t int_value;
- if (!absl::SimpleAtoi(value.value(), &int_value)) {
- return false;
- }
- return int_value >= header_matcher.range_start &&
- int_value < header_matcher.range_end;
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PREFIX:
- return absl::StartsWith(value.value(), header_matcher.string_matcher);
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::SUFFIX:
- return absl::EndsWith(value.value(), header_matcher.string_matcher);
- default:
- return false;
- }
-}
-
-bool HeadersMatch(
- const std::vector& header_matchers,
- LoadBalancingPolicy::MetadataInterface* initial_metadata) {
- for (const auto& header_matcher : header_matchers) {
- bool match = HeaderMatchHelper(header_matcher, initial_metadata);
- if (header_matcher.invert_match) match = !match;
- if (!match) return false;
- }
- return true;
-}
-
-bool UnderFraction(const uint32_t fraction_per_million) {
- // Generate a random number in [0, 1000000).
- const uint32_t random_number = rand() % 1000000;
- return random_number < fraction_per_million;
-}
-
-XdsRoutingLb::PickResult XdsRoutingLb::RoutePicker::Pick(PickArgs args) {
- for (const Route& route : route_table_) {
- // Path matching.
- if (!PathMatch(args.path, route.matchers->path_matcher)) continue;
- // Header Matching.
- if (!HeadersMatch(route.matchers->header_matchers, args.initial_metadata)) {
- continue;
- }
- // Match fraction check
- if (route.matchers->fraction_per_million.has_value() &&
- !UnderFraction(route.matchers->fraction_per_million.value())) {
- continue;
- }
- // Found a match
- return route.picker->Pick(args);
- }
- PickResult result;
- result.type = PickResult::PICK_FAILED;
- result.error =
- grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "xds routing picker: no matching route"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
- return result;
-}
-
-//
-// XdsRoutingLb
-//
-
-XdsRoutingLb::XdsRoutingLb(Args args) : LoadBalancingPolicy(std::move(args)) {}
-
-XdsRoutingLb::~XdsRoutingLb() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO, "[xds_routing_lb %p] destroying xds_routing LB policy",
- this);
- }
-}
-
-void XdsRoutingLb::ShutdownLocked() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO, "[xds_routing_lb %p] shutting down", this);
- }
- shutting_down_ = true;
- actions_.clear();
-}
-
-void XdsRoutingLb::ExitIdleLocked() {
- for (auto& p : actions_) p.second->ExitIdleLocked();
-}
-
-void XdsRoutingLb::ResetBackoffLocked() {
- for (auto& p : actions_) p.second->ResetBackoffLocked();
-}
-
-void XdsRoutingLb::UpdateLocked(UpdateArgs args) {
- if (shutting_down_) return;
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO, "[xds_routing_lb %p] Received update", this);
- }
- // Update config.
- config_ = std::move(args.config);
- // Deactivate the actions not in the new config.
- for (const auto& p : actions_) {
- const std::string& name = p.first;
- XdsRoutingChild* child = p.second.get();
- if (config_->action_map().find(name) == config_->action_map().end()) {
- child->DeactivateLocked();
- }
- }
- // Add or update the actions in the new config.
- for (const auto& p : config_->action_map()) {
- const std::string& name = p.first;
- const RefCountedPtr& config = p.second;
- auto it = actions_.find(name);
- if (it == actions_.end()) {
- it = actions_
- .emplace(name, MakeOrphanable(
- Ref(DEBUG_LOCATION, "XdsRoutingChild"), name))
- .first;
- }
- it->second->UpdateLocked(config, args.addresses, args.args);
- }
-}
-
-void XdsRoutingLb::UpdateStateLocked() {
- // Also count the number of children in each state, to determine the
- // overall state.
- size_t num_ready = 0;
- size_t num_connecting = 0;
- size_t num_idle = 0;
- size_t num_transient_failures = 0;
- for (const auto& p : actions_) {
- const auto& child_name = p.first;
- const XdsRoutingChild* child = p.second.get();
- // Skip the actions that are not in the latest update.
- if (config_->action_map().find(child_name) == config_->action_map().end()) {
- continue;
- }
- switch (child->connectivity_state()) {
- case GRPC_CHANNEL_READY: {
- ++num_ready;
- break;
- }
- case GRPC_CHANNEL_CONNECTING: {
- ++num_connecting;
- break;
- }
- case GRPC_CHANNEL_IDLE: {
- ++num_idle;
- break;
- }
- case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- ++num_transient_failures;
- break;
- }
- default:
- GPR_UNREACHABLE_CODE(return );
- }
- }
- // Determine aggregated connectivity state.
- grpc_connectivity_state connectivity_state;
- if (num_ready > 0) {
- connectivity_state = GRPC_CHANNEL_READY;
- } else if (num_connecting > 0) {
- connectivity_state = GRPC_CHANNEL_CONNECTING;
- } else if (num_idle > 0) {
- connectivity_state = GRPC_CHANNEL_IDLE;
- } else {
- connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- }
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO, "[xds_routing_lb %p] connectivity changed to %s", this,
- ConnectivityStateName(connectivity_state));
- }
- std::unique_ptr picker;
- absl::Status status;
- switch (connectivity_state) {
- case GRPC_CHANNEL_READY: {
- RoutePicker::RouteTable route_table;
- for (const auto& config_route : config_->route_table()) {
- RoutePicker::Route route;
- route.matchers = &config_route.matchers;
- route.picker = actions_[config_route.action]->picker_wrapper();
- if (route.picker == nullptr) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO,
- "[xds_routing_lb %p] child %s has not yet returned a "
- "picker; creating a QueuePicker.",
- this, config_route.action.c_str());
- }
- route.picker = MakeRefCounted(
- config_route.action, absl::make_unique(
- Ref(DEBUG_LOCATION, "QueuePicker")));
- }
- route_table.push_back(std::move(route));
- }
- picker = absl::make_unique(std::move(route_table), config_);
- break;
- }
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_IDLE:
- picker =
- absl::make_unique(Ref(DEBUG_LOCATION, "QueuePicker"));
- break;
- default:
- grpc_error* error = grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "TRANSIENT_FAILURE from XdsRoutingLb"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
- status = grpc_error_to_absl_status(error);
- picker = absl::make_unique(error);
- }
- channel_control_helper()->UpdateState(connectivity_state, status,
- std::move(picker));
-}
-
-//
-// XdsRoutingLb::XdsRoutingChild
-//
-
-XdsRoutingLb::XdsRoutingChild::XdsRoutingChild(
- RefCountedPtr xds_routing_policy, const std::string& name)
- : xds_routing_policy_(std::move(xds_routing_policy)), name_(name) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO, "[xds_routing_lb %p] created XdsRoutingChild %p for %s",
- xds_routing_policy_.get(), this, name_.c_str());
- }
- GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
- grpc_schedule_on_exec_ctx);
-}
-
-XdsRoutingLb::XdsRoutingChild::~XdsRoutingChild() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO,
- "[xds_routing_lb %p] XdsRoutingChild %p: destroying child",
- xds_routing_policy_.get(), this);
- }
- xds_routing_policy_.reset(DEBUG_LOCATION, "XdsRoutingChild");
-}
-
-void XdsRoutingLb::XdsRoutingChild::Orphan() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO,
- "[xds_routing_lb %p] XdsRoutingChild %p %s: shutting down child",
- xds_routing_policy_.get(), this, name_.c_str());
- }
- // Remove the child policy's interested_parties pollset_set from the
- // xDS policy.
- grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
- xds_routing_policy_->interested_parties());
- child_policy_.reset();
- // Drop our ref to the child's picker, in case it's holding a ref to
- // the child.
- picker_wrapper_.reset();
- if (delayed_removal_timer_callback_pending_) {
- grpc_timer_cancel(&delayed_removal_timer_);
- }
- shutdown_ = true;
- Unref();
-}
-
-OrphanablePtr
-XdsRoutingLb::XdsRoutingChild::CreateChildPolicyLocked(
- const grpc_channel_args* args) {
- LoadBalancingPolicy::Args lb_policy_args;
- lb_policy_args.work_serializer = xds_routing_policy_->work_serializer();
- lb_policy_args.args = args;
- lb_policy_args.channel_control_helper =
- absl::make_unique(this->Ref(DEBUG_LOCATION, "Helper"));
- OrphanablePtr lb_policy =
- MakeOrphanable(std::move(lb_policy_args),
- &grpc_xds_routing_lb_trace);
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO,
- "[xds_routing_lb %p] XdsRoutingChild %p %s: Created new child "
- "policy handler %p",
- xds_routing_policy_.get(), this, name_.c_str(), lb_policy.get());
- }
- // Add the xDS's interested_parties pollset_set to that of the newly created
- // child policy. This will make the child policy progress upon activity on
- // xDS LB, which in turn is tied to the application's call.
- grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
- xds_routing_policy_->interested_parties());
- return lb_policy;
-}
-
-void XdsRoutingLb::XdsRoutingChild::UpdateLocked(
- RefCountedPtr config,
- const ServerAddressList& addresses, const grpc_channel_args* args) {
- if (xds_routing_policy_->shutting_down_) return;
- // Update child weight.
- // Reactivate if needed.
- if (delayed_removal_timer_callback_pending_) {
- delayed_removal_timer_callback_pending_ = false;
- grpc_timer_cancel(&delayed_removal_timer_);
- }
- // Create child policy if needed.
- if (child_policy_ == nullptr) {
- child_policy_ = CreateChildPolicyLocked(args);
- }
- // Construct update args.
- UpdateArgs update_args;
- update_args.config = std::move(config);
- update_args.addresses = addresses;
- update_args.args = grpc_channel_args_copy(args);
- // Update the policy.
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO,
- "[xds_routing_lb %p] XdsRoutingChild %p %s: Updating child "
- "policy handler %p",
- xds_routing_policy_.get(), this, name_.c_str(),
- child_policy_.get());
- }
- child_policy_->UpdateLocked(std::move(update_args));
-}
-
-void XdsRoutingLb::XdsRoutingChild::ExitIdleLocked() {
- child_policy_->ExitIdleLocked();
-}
-
-void XdsRoutingLb::XdsRoutingChild::ResetBackoffLocked() {
- child_policy_->ResetBackoffLocked();
-}
-
-void XdsRoutingLb::XdsRoutingChild::DeactivateLocked() {
- // If already deactivated, don't do that again.
- if (delayed_removal_timer_callback_pending_ == true) return;
- // Set the child weight to 0 so that future picker won't contain this child.
- // Start a timer to delete the child.
- Ref(DEBUG_LOCATION, "XdsRoutingChild+timer").release();
- grpc_timer_init(
- &delayed_removal_timer_,
- ExecCtx::Get()->Now() + GRPC_XDS_ROUTING_CHILD_RETENTION_INTERVAL_MS,
- &on_delayed_removal_timer_);
- delayed_removal_timer_callback_pending_ = true;
-}
-
-void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimer(void* arg,
- grpc_error* error) {
- XdsRoutingChild* self = static_cast(arg);
- GRPC_ERROR_REF(error); // Ref owned by the lambda
- self->xds_routing_policy_->work_serializer()->Run(
- [self, error]() { self->OnDelayedRemovalTimerLocked(error); },
- DEBUG_LOCATION);
-}
-
-void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimerLocked(
- grpc_error* error) {
- delayed_removal_timer_callback_pending_ = false;
- if (error == GRPC_ERROR_NONE && !shutdown_) {
- xds_routing_policy_->actions_.erase(name_);
- }
- Unref(DEBUG_LOCATION, "XdsRoutingChild+timer");
- GRPC_ERROR_UNREF(error);
-}
-
-//
-// XdsRoutingLb::XdsRoutingChild::Helper
-//
-
-RefCountedPtr
-XdsRoutingLb::XdsRoutingChild::Helper::CreateSubchannel(
- const grpc_channel_args& args) {
- if (xds_routing_child_->xds_routing_policy_->shutting_down_) return nullptr;
- return xds_routing_child_->xds_routing_policy_->channel_control_helper()
- ->CreateSubchannel(args);
-}
-
-void XdsRoutingLb::XdsRoutingChild::Helper::UpdateState(
- grpc_connectivity_state state, const absl::Status& status,
- std::unique_ptr picker) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
- gpr_log(GPR_INFO,
- "[xds_routing_lb %p] child %s: received update: state=%s (%s) "
- "picker=%p",
- xds_routing_child_->xds_routing_policy_.get(),
- xds_routing_child_->name_.c_str(), ConnectivityStateName(state),
- status.ToString().c_str(), picker.get());
- }
- if (xds_routing_child_->xds_routing_policy_->shutting_down_) return;
- // Cache the picker in the XdsRoutingChild.
- xds_routing_child_->picker_wrapper_ = MakeRefCounted(
- xds_routing_child_->name_, std::move(picker));
- // Decide what state to report for aggregation purposes.
- // If we haven't seen a failure since the last time we were in state
- // READY, then we report the state change as-is. However, once we do see
- // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
- // changes until we go back into state READY.
- if (!xds_routing_child_->seen_failure_since_ready_) {
- if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- xds_routing_child_->seen_failure_since_ready_ = true;
- }
- } else {
- if (state != GRPC_CHANNEL_READY) return;
- xds_routing_child_->seen_failure_since_ready_ = false;
- }
- xds_routing_child_->connectivity_state_ = state;
- // Notify the LB policy.
- xds_routing_child_->xds_routing_policy_->UpdateStateLocked();
-}
-
-void XdsRoutingLb::XdsRoutingChild::Helper::RequestReresolution() {
- if (xds_routing_child_->xds_routing_policy_->shutting_down_) return;
- xds_routing_child_->xds_routing_policy_->channel_control_helper()
- ->RequestReresolution();
-}
-
-void XdsRoutingLb::XdsRoutingChild::Helper::AddTraceEvent(
- TraceSeverity severity, absl::string_view message) {
- if (xds_routing_child_->xds_routing_policy_->shutting_down_) return;
- xds_routing_child_->xds_routing_policy_->channel_control_helper()
- ->AddTraceEvent(severity, message);
-}
-
-//
-// factory
-//
-
-class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
- public:
- OrphanablePtr CreateLoadBalancingPolicy(
- LoadBalancingPolicy::Args args) const override {
- return MakeOrphanable(std::move(args));
- }
-
- const char* name() const override { return kXdsRouting; }
-
- RefCountedPtr ParseLoadBalancingConfig(
- const Json& json, grpc_error** error) const override {
- GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
- if (json.type() == Json::Type::JSON_NULL) {
- // xds_routing was mentioned as a policy in the deprecated
- // loadBalancingPolicy field or in the client API.
- *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:loadBalancingPolicy error:xds_routing policy requires "
- "configuration. Please use loadBalancingConfig field of service "
- "config instead.");
- return nullptr;
- }
- std::vector error_list;
- // action map.
- XdsRoutingLbConfig::ActionMap action_map;
- std::set actions_to_be_used;
- auto it = json.object_value().find("actions");
- if (it == json.object_value().end()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:actions error:required field not present"));
- } else if (it->second.type() != Json::Type::OBJECT) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:actions error:type should be object"));
- } else {
- for (const auto& p : it->second.object_value()) {
- if (p.first.empty()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:actions element error: name cannot be empty"));
- continue;
- }
- RefCountedPtr child_config;
- std::vector child_errors =
- ParseChildConfig(p.second, &child_config);
- if (!child_errors.empty()) {
- // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
- // string is not static in this case.
- grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- absl::StrCat("field:actions name:", p.first).c_str());
- for (grpc_error* child_error : child_errors) {
- error = grpc_error_add_child(error, child_error);
- }
- error_list.push_back(error);
- } else {
- action_map[p.first] = std::move(child_config);
- actions_to_be_used.insert(p.first);
- }
- }
- }
- if (action_map.empty()) {
- error_list.push_back(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid actions configured"));
- }
- XdsRoutingLbConfig::RouteTable route_table;
- it = json.object_value().find("routes");
- if (it == json.object_value().end()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:routes error:required field not present"));
- } else if (it->second.type() != Json::Type::ARRAY) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:routes error:type should be array"));
- } else {
- const Json::Array& array = it->second.array_value();
- for (size_t i = 0; i < array.size(); ++i) {
- XdsRoutingLbConfig::Route route;
- std::vector route_errors =
- ParseRoute(array[i], action_map, &route, &actions_to_be_used);
- if (!route_errors.empty()) {
- // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
- // string is not static in this case.
- grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- absl::StrCat("field:routes element: ", i, " error").c_str());
- for (grpc_error* route_error : route_errors) {
- error = grpc_error_add_child(error, route_error);
- }
- error_list.push_back(error);
- }
- route_table.emplace_back(std::move(route));
- }
- }
- if (route_table.empty()) {
- grpc_error* error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid routes configured");
- error_list.push_back(error);
- }
- if (!actions_to_be_used.empty()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "some actions were not referenced by any route"));
- }
- if (!error_list.empty()) {
- *error = GRPC_ERROR_CREATE_FROM_VECTOR(
- "xds_routing_experimental LB policy config", &error_list);
- return nullptr;
- }
- return MakeRefCounted(std::move(action_map),
- std::move(route_table));
- }
-
- private:
- static std::vector ParseChildConfig(
- const Json& json,
- RefCountedPtr* child_config) {
- std::vector error_list;
- if (json.type() != Json::Type::OBJECT) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "value should be of type object"));
- return error_list;
- }
- auto it = json.object_value().find("childPolicy");
- if (it == json.object_value().end()) {
- error_list.push_back(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy"));
- } else {
- grpc_error* parse_error = GRPC_ERROR_NONE;
- *child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
- it->second, &parse_error);
- if (*child_config == nullptr) {
- GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
- std::vector child_errors;
- child_errors.push_back(parse_error);
- error_list.push_back(
- GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
- }
- }
- return error_list;
- }
-
- static std::vector ParseRoute(
- const Json& json, const XdsRoutingLbConfig::ActionMap& action_map,
- XdsRoutingLbConfig::Route* route,
- std::set* actions_to_be_used) {
- std::vector error_list;
- if (json.type() != Json::Type::OBJECT) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "value should be of type object"));
- return error_list;
- }
- // Parse and ensure one and only one path matcher is set: prefix, path, or
- // regex.
- bool path_matcher_seen = false;
- auto it = json.object_value().find("prefix");
- if (it != json.object_value().end()) {
- if (it->second.type() != Json::Type::STRING) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:prefix error: should be string"));
- } else {
- path_matcher_seen = true;
- route->matchers.path_matcher.type =
- XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PREFIX;
- route->matchers.path_matcher.string_matcher = it->second.string_value();
- }
- }
- it = json.object_value().find("path");
- if (it != json.object_value().end()) {
- if (path_matcher_seen) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:path error: other path matcher already specified"));
- } else {
- path_matcher_seen = true;
- if (it->second.type() != Json::Type::STRING) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:path error: should be string"));
- } else {
- route->matchers.path_matcher.type =
- XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PATH;
- route->matchers.path_matcher.string_matcher =
- it->second.string_value();
- }
- }
- }
- it = json.object_value().find("regex");
- if (it != json.object_value().end()) {
- if (path_matcher_seen) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:regex error: other path matcher already specified"));
- } else {
- path_matcher_seen = true;
- if (it->second.type() != Json::Type::STRING) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:regex error: should be string"));
- } else {
- route->matchers.path_matcher.type =
- XdsApi::Route::Matchers::PathMatcher::PathMatcherType::REGEX;
- route->matchers.path_matcher.regex_matcher =
- absl::make_unique(it->second.string_value());
- }
- }
- }
- if (!path_matcher_seen) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "one path matcher: prefix, path, or regex is required"));
- }
- // Parse Header Matcher: headers.
- it = json.object_value().find("headers");
- if (it != json.object_value().end()) {
- if (it->second.type() != Json::Type::ARRAY) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:headers error: should be array"));
- } else {
- const Json::Array& array = it->second.array_value();
- for (size_t i = 0; i < array.size(); ++i) {
- const Json& header_json = array[i];
- if (header_json.type() != Json::Type::OBJECT) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "value should be of type object"));
- } else {
- route->matchers.header_matchers.emplace_back();
- XdsApi::Route::Matchers::HeaderMatcher& header_matcher =
- route->matchers.header_matchers.back();
- auto header_it = header_json.object_value().find("name");
- if (header_it == header_json.object_value().end()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:name error:required field missing"));
- } else {
- if (header_it->second.type() != Json::Type::STRING) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:name error: should be string"));
- } else {
- header_matcher.name = header_it->second.string_value();
- }
- }
- header_it = header_json.object_value().find("invert_match");
- if (header_it != header_json.object_value().end()) {
- if (header_it->second.type() == Json::Type::JSON_TRUE) {
- header_matcher.invert_match = true;
- } else if (header_it->second.type() == Json::Type::JSON_FALSE) {
- header_matcher.invert_match = false;
- } else {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:present_match error: should be boolean"));
- }
- }
- // Parse and ensure one and only one header matcher is set per
- // header matcher.
- bool header_matcher_seen = false;
- header_it = header_json.object_value().find("exact_match");
- if (header_it != header_json.object_value().end()) {
- header_matcher_seen = true;
- if (header_it->second.type() != Json::Type::STRING) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:exact_match error: should be string"));
- } else {
- header_matcher.type = XdsApi::Route::Matchers::HeaderMatcher::
- HeaderMatcherType::EXACT;
- header_matcher.string_matcher =
- header_it->second.string_value();
- }
- }
- header_it = header_json.object_value().find("regex_match");
- if (header_it != header_json.object_value().end()) {
- if (header_matcher_seen) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:regex_match error: other header matcher already "
- "specified"));
- } else {
- header_matcher_seen = true;
- if (header_it->second.type() != Json::Type::STRING) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:regex_match error: should be string"));
- } else {
- header_matcher.type = XdsApi::Route::Matchers::HeaderMatcher::
- HeaderMatcherType::REGEX;
- header_matcher.regex_match =
- absl::make_unique(header_it->second.string_value());
- }
- }
- }
- header_it = header_json.object_value().find("range_match");
- if (header_it != header_json.object_value().end()) {
- if (header_matcher_seen) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:range_match error: other header matcher already "
- "specified"));
- } else {
- header_matcher_seen = true;
- if (header_it->second.type() != Json::Type::OBJECT) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:range_match error: should be object"));
- } else {
- auto range_it =
- header_it->second.object_value().find("start");
- if (range_it != header_it->second.object_value().end()) {
- if (range_it->second.type() != Json::Type::NUMBER) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:start error: should be of number"));
- } else {
- header_matcher.range_start = gpr_parse_nonnegative_int(
- range_it->second.string_value().c_str());
- }
- } else {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:start missing"));
- }
- range_it = header_it->second.object_value().find("end");
- if (range_it != header_it->second.object_value().end()) {
- if (range_it->second.type() != Json::Type::NUMBER) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:end error: should be of number"));
- } else {
- header_matcher.range_end = gpr_parse_nonnegative_int(
- range_it->second.string_value().c_str());
- }
- } else {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:end missing"));
- }
- if (header_matcher.range_end > header_matcher.range_start) {
- header_matcher.type = XdsApi::Route::Matchers::
- HeaderMatcher::HeaderMatcherType::RANGE;
- }
- }
- }
- }
- header_it = header_json.object_value().find("present_match");
- if (header_it != header_json.object_value().end()) {
- if (header_matcher_seen) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:present_match error: other header matcher already "
- "specified"));
- } else {
- header_matcher_seen = true;
- if (header_it->second.type() == Json::Type::JSON_TRUE) {
- header_matcher.type = XdsApi::Route::Matchers::HeaderMatcher::
- HeaderMatcherType::PRESENT;
- header_matcher.present_match = true;
- } else if (header_it->second.type() == Json::Type::JSON_FALSE) {
- header_matcher.type = XdsApi::Route::Matchers::HeaderMatcher::
- HeaderMatcherType::PRESENT;
- header_matcher.present_match = false;
- } else {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:present_match error: should be boolean"));
- }
- }
- }
- header_it = header_json.object_value().find("prefix_match");
- if (header_it != header_json.object_value().end()) {
- if (header_matcher_seen) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:prefix_match error: other header matcher already "
- "specified"));
- } else {
- header_matcher_seen = true;
- if (header_it->second.type() != Json::Type::STRING) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:prefix_match error: should be string"));
- } else {
- header_matcher.type = XdsApi::Route::Matchers::HeaderMatcher::
- HeaderMatcherType::PREFIX;
- header_matcher.string_matcher =
- header_it->second.string_value();
- }
- }
- }
- header_it = header_json.object_value().find("suffix_match");
- if (header_it != header_json.object_value().end()) {
- if (header_matcher_seen) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:suffix_match error: other header matcher already "
- "specified"));
- } else {
- header_matcher_seen = true;
- if (header_it->second.type() != Json::Type::STRING) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:suffix_match error: should be string"));
- } else {
- header_matcher.type = XdsApi::Route::Matchers::HeaderMatcher::
- HeaderMatcherType::SUFFIX;
- header_matcher.string_matcher =
- header_it->second.string_value();
- }
- }
- }
- }
- }
- }
- }
- // Parse Fraction numerator.
- it = json.object_value().find("match_fraction");
- if (it != json.object_value().end()) {
- if (it->second.type() != Json::Type::NUMBER) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:match_fraction error:must be of type number"));
- } else {
- route->matchers.fraction_per_million =
- gpr_parse_nonnegative_int(it->second.string_value().c_str());
- }
- }
- // Parse action.
- it = json.object_value().find("action");
- if (it == json.object_value().end()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:action error:required field missing"));
- } else if (it->second.type() != Json::Type::STRING) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:action error:should be of type string"));
- } else {
- route->action = it->second.string_value();
- if (route->action.empty()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:action error:cannot be empty"));
- } else {
- // Validate action exists and mark it as used.
- if (action_map.find(route->action) == action_map.end()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- absl::StrCat("field:action error:", route->action,
- " does not exist")
- .c_str()));
- }
- actions_to_be_used->erase(route->action);
- }
- }
- return error_list;
- }
-};
-
-} // namespace
-
-} // namespace grpc_core
-
-//
-// Plugin registration
-//
-
-void grpc_lb_policy_xds_routing_init() {
- grpc_core::LoadBalancingPolicyRegistry::Builder::
- RegisterLoadBalancingPolicyFactory(
- absl::make_unique());
-}
-
-void grpc_lb_policy_xds_routing_shutdown() {}
diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
index d522ce3a77a..2ce29e9d070 100644
--- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
@@ -18,7 +18,10 @@
#include
+#include "absl/strings/match.h"
#include "absl/strings/str_join.h"
+#include "absl/strings/str_split.h"
+#include "re2/re2.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
@@ -30,6 +33,8 @@ namespace grpc_core {
TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
+const char* kXdsClusterAttribute = "xds_cluster_name";
+
namespace {
//
@@ -42,8 +47,7 @@ class XdsResolver : public Resolver {
: Resolver(std::move(args.work_serializer),
std::move(args.result_handler)),
args_(grpc_channel_args_copy(args.args)),
- interested_parties_(args.pollset_set),
- config_selector_(MakeRefCounted()) {
+ interested_parties_(args.pollset_set) {
char* path = args.uri->path;
if (path[0] == '/') ++path;
server_name_ = path;
@@ -82,47 +86,57 @@ class XdsResolver : public Resolver {
RefCountedPtr resolver_;
};
- class XdsConfigSelector : public ConfigSelector {
+ class ClusterState
+ : public RefCounted {
public:
- CallConfig GetCallConfig(GetCallConfigArgs args) override {
- return CallConfig();
- }
+ using ClusterStateMap =
+ std::map>;
+
+ ClusterState(const std::string& cluster_name,
+ ClusterStateMap* cluster_state_map)
+ : it_(cluster_state_map
+ ->emplace(cluster_name, std::unique_ptr(this))
+ .first) {}
+ const std::string& cluster() const { return it_->first; }
+
+ private:
+ ClusterStateMap::iterator it_;
};
- // Returns the weighted_clusters action name to use from
- // weighted_cluster_index_map_ for a WeightedClusters route action.
- std::string WeightedClustersActionName(
- const std::vector& weighted_clusters);
+ class XdsConfigSelector : public ConfigSelector {
+ public:
+ XdsConfigSelector(RefCountedPtr resolver,
+ const std::vector& routes);
+ ~XdsConfigSelector();
+ CallConfig GetCallConfig(GetCallConfigArgs args) override;
- // Updates weighted_cluster_index_map_ that will
- // determine the names of the WeightedCluster actions for the current update.
- void UpdateWeightedClusterIndexMap(const std::vector& routes);
+ private:
+ struct Route {
+ XdsApi::Route route;
+ absl::InlinedVector, 2>
+ weighted_cluster_state;
+ };
+ using RouteTable = std::vector;
+
+ void MaybeAddCluster(const std::string& name);
- // Create the service config generated by the list of routes.
- grpc_error* CreateServiceConfig(const std::vector& routes,
- RefCountedPtr* service_config);
+ RefCountedPtr resolver_;
+ RouteTable route_table_;
+ std::map> clusters_;
+ };
+
+ void OnListenerChanged(std::vector routes);
+ grpc_error* CreateServiceConfig(RefCountedPtr* service_config);
+ void OnError(grpc_error* error);
+ void PropagateUpdate();
+ void MaybeRemoveUnusedClusters();
std::string server_name_;
const grpc_channel_args* args_;
grpc_pollset_set* interested_parties_;
OrphanablePtr xds_client_;
- RefCountedPtr config_selector_;
-
- // 2-level map to store WeightedCluster action names.
- // Top level map is keyed by cluster names without weight like a_b_c; bottom
- // level map is keyed by cluster names + weights like a10_b50_c40.
- struct ClusterNamesInfo {
- uint64_t next_index = 0;
- std::map
- cluster_weights_map;
- };
- using WeightedClusterIndexMap =
- std::map;
-
- // Cache of action names for WeightedCluster targets in the current
- // service config.
- WeightedClusterIndexMap weighted_cluster_index_map_;
+ ClusterState::ClusterStateMap cluster_state_map_;
+ std::vector current_update_;
};
//
@@ -136,36 +150,14 @@ void XdsResolver::ListenerWatcher::OnListenerChanged(
gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data",
resolver_.get());
}
- Result result;
- grpc_error* error =
- resolver_->CreateServiceConfig(routes, &result.service_config);
- if (error != GRPC_ERROR_NONE) {
- OnError(error);
- return;
- }
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
- gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s",
- resolver_.get(), result.service_config->json_string().c_str());
- }
- grpc_arg new_args[] = {
- resolver_->xds_client_->MakeChannelArg(),
- resolver_->config_selector_->MakeChannelArg(),
- };
- result.args = grpc_channel_args_copy_and_add(resolver_->args_, new_args,
- GPR_ARRAY_SIZE(new_args));
- resolver_->result_handler()->ReturnResult(std::move(result));
+ resolver_->OnListenerChanged(std::move(routes));
}
void XdsResolver::ListenerWatcher::OnError(grpc_error* error) {
if (resolver_->xds_client_ == nullptr) return;
gpr_log(GPR_ERROR, "[xds_resolver %p] received error: %s", resolver_.get(),
grpc_error_string(error));
- grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg();
- Result result;
- result.args =
- grpc_channel_args_copy_and_add(resolver_->args_, &xds_client_arg, 1);
- result.service_config_error = error;
- resolver_->result_handler()->ReturnResult(std::move(result));
+ resolver_->OnError(error);
}
void XdsResolver::ListenerWatcher::OnResourceDoesNotExist() {
@@ -183,316 +175,294 @@ void XdsResolver::ListenerWatcher::OnResourceDoesNotExist() {
}
//
-// XdsResolver
+// XdsResolver::XdsConfigSelector
//
-void XdsResolver::StartLocked() {
- grpc_error* error = GRPC_ERROR_NONE;
- xds_client_ = MakeOrphanable(
- work_serializer(), interested_parties_, server_name_,
- absl::make_unique(Ref()), *args_, &error);
- if (error != GRPC_ERROR_NONE) {
- gpr_log(GPR_ERROR,
- "Failed to create xds client -- channel will remain in "
- "TRANSIENT_FAILURE: %s",
- grpc_error_string(error));
- result_handler()->ReturnError(error);
+XdsResolver::XdsConfigSelector::XdsConfigSelector(
+ RefCountedPtr resolver,
+ const std::vector& routes)
+ : resolver_(std::move(resolver)) {
+ // 1. Construct the route table
+ // 2 Update resolver's cluster state map
+ // 3. Construct cluster list to hold on to entries in the cluster state
+ // map.
+ // Reserve the necessary entries up-front to avoid reallocation as we add
+ // elements. This is necessary because the string_view in the entry's
+ // weighted_cluster_state field points to the memory in the route field, so
+ // moving the entry in a reallocation will cause the string_view to point to
+ // invalid data.
+ route_table_.reserve(routes.size());
+ for (auto& route : routes) {
+ route_table_.emplace_back();
+ auto& route_entry = route_table_.back();
+ route_entry.route = route;
+ if (route.weighted_clusters.empty()) {
+ MaybeAddCluster(route.cluster_name);
+ } else {
+ uint32_t end = 0;
+ for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
+ MaybeAddCluster(weighted_cluster.name);
+ end += weighted_cluster.weight;
+ route_entry.weighted_cluster_state.emplace_back(end,
+ weighted_cluster.name);
+ }
+ }
}
}
-std::string CreateServiceConfigActionCluster(const std::string& cluster_name) {
- return absl::StrFormat(
- " \"cds:%s\":{\n"
- " \"childPolicy\":[ {\n"
- " \"cds_experimental\":{\n"
- " \"cluster\": \"%s\"\n"
- " }\n"
- " } ]\n"
- " }",
- cluster_name, cluster_name);
+XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
+ clusters_.clear();
+ resolver_->MaybeRemoveUnusedClusters();
}
-std::string CreateServiceConfigRoute(const std::string& action_name,
- const XdsApi::Route& route) {
- std::vector headers;
- for (const auto& header : route.matchers.header_matchers) {
- std::string header_matcher;
- switch (header.type) {
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::EXACT:
- header_matcher = absl::StrFormat(" \"exact_match\": \"%s\"",
- header.string_matcher);
- break;
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::REGEX:
- header_matcher = absl::StrFormat(" \"regex_match\": \"%s\"",
- header.regex_match->pattern());
- break;
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::RANGE:
- header_matcher = absl::StrFormat(
- " \"range_match\":{\n"
- " \"start\":%d,\n"
- " \"end\":%d\n"
- " }",
- header.range_start, header.range_end);
- break;
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PRESENT:
- header_matcher =
- absl::StrFormat(" \"present_match\": %s",
- header.present_match ? "true" : "false");
- break;
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PREFIX:
- header_matcher = absl::StrFormat(
- " \"prefix_match\": \"%s\"", header.string_matcher);
- break;
- case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::SUFFIX:
- header_matcher = absl::StrFormat(
- " \"suffix_match\": \"%s\"", header.string_matcher);
- break;
- default:
- break;
- }
- std::vector header_parts;
- header_parts.push_back(
- absl::StrFormat(" { \n"
- " \"name\": \"%s\",\n",
- header.name));
- header_parts.push_back(header_matcher);
- if (header.invert_match) {
- header_parts.push_back(
- absl::StrFormat(",\n"
- " \"invert_match\": true"));
+void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
+ if (clusters_.find(name) == clusters_.end()) {
+ auto it = resolver_->cluster_state_map_.find(name);
+ if (it == resolver_->cluster_state_map_.end()) {
+ auto new_cluster_state =
+ MakeRefCounted(name, &resolver_->cluster_state_map_);
+ clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
+ } else {
+ clusters_[it->second->cluster()] = it->second->Ref();
}
- header_parts.push_back(
- absl::StrFormat("\n"
- " }"));
- headers.push_back(absl::StrJoin(header_parts, ""));
- }
- std::vector headers_service_config;
- if (!headers.empty()) {
- headers_service_config.push_back("\"headers\":[\n");
- headers_service_config.push_back(absl::StrJoin(headers, ","));
- headers_service_config.push_back(" ],\n");
}
- std::string path_match_str;
- switch (route.matchers.path_matcher.type) {
+}
+
+bool PathMatch(const absl::string_view& path,
+ const XdsApi::Route::Matchers::PathMatcher& path_matcher) {
+ switch (path_matcher.type) {
case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PREFIX:
- path_match_str = absl::StrFormat(
- "\"prefix\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
- break;
+ return absl::StartsWith(path, path_matcher.string_matcher);
case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PATH:
- path_match_str = absl::StrFormat(
- "\"path\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
- break;
+ return path == path_matcher.string_matcher;
case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::REGEX:
- path_match_str =
- absl::StrFormat("\"regex\": \"%s\",\n",
- route.matchers.path_matcher.regex_matcher->pattern());
- break;
+ return RE2::FullMatch(path.data(), *path_matcher.regex_matcher);
+ default:
+ return false;
}
- return absl::StrFormat(
- " { \n"
- " %s"
- " %s"
- " %s"
- " \"action\": \"%s\"\n"
- " }",
- path_match_str, absl::StrJoin(headers_service_config, ""),
- route.matchers.fraction_per_million.has_value()
- ? absl::StrFormat("\"match_fraction\":%d,\n",
- route.matchers.fraction_per_million.value())
- : "",
- action_name);
}
-// Create the service config for one weighted cluster.
-std::string CreateServiceConfigActionWeightedCluster(
- const std::string& name,
- const std::vector& clusters) {
- std::vector config_parts;
- config_parts.push_back(
- absl::StrFormat(" \"weighted:%s\":{\n"
- " \"childPolicy\":[ {\n"
- " \"weighted_target_experimental\":{\n"
- " \"targets\":{\n",
- name));
- std::vector weighted_targets;
- weighted_targets.reserve(clusters.size());
- for (const auto& cluster_weight : clusters) {
- weighted_targets.push_back(absl::StrFormat(
- " \"%s\":{\n"
- " \"weight\":%d,\n"
- " \"childPolicy\":[ {\n"
- " \"cds_experimental\":{\n"
- " \"cluster\": \"%s\"\n"
- " }\n"
- " } ]\n"
- " }",
- cluster_weight.name, cluster_weight.weight, cluster_weight.name));
+absl::optional GetMetadataValue(
+ const std::string& target_key, grpc_metadata_batch* initial_metadata,
+ std::string* concatenated_value) {
+ // Find all values for the specified key.
+ GPR_DEBUG_ASSERT(initial_metadata != nullptr);
+ absl::InlinedVector values;
+ for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
+ md = md->next) {
+ absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
+ absl::string_view value = StringViewFromSlice(GRPC_MDVALUE(md->md));
+ if (target_key == key) values.push_back(value);
}
- config_parts.push_back(absl::StrJoin(weighted_targets, ",\n"));
- config_parts.push_back(
- " }\n"
- " }\n"
- " } ]\n"
- " }");
- return absl::StrJoin(config_parts, "");
+ // If none found, no match.
+ if (values.empty()) return absl::nullopt;
+ // If exactly one found, return it as-is.
+ if (values.size() == 1) return values.front();
+ // If more than one found, concatenate the values, using
+ // *concatenated_values as a temporary holding place for the
+ // concatenated string.
+ *concatenated_value = absl::StrJoin(values, ",");
+ return *concatenated_value;
}
-struct WeightedClustersKeys {
- std::string cluster_names_key;
- std::string cluster_weights_key;
-};
+bool HeaderMatchHelper(
+ const XdsApi::Route::Matchers::HeaderMatcher& header_matcher,
+ grpc_metadata_batch* initial_metadata) {
+ std::string concatenated_value;
+ absl::optional value;
+ // Note: If we ever allow binary headers here, we still need to
+ // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
+ // they are not visible to the LB policy in grpc-go.
+ if (absl::EndsWith(header_matcher.name, "-bin") ||
+ header_matcher.name == "grpc-previous-rpc-attempts") {
+ value = absl::nullopt;
+ } else if (header_matcher.name == "content-type") {
+ value = "application/grpc";
+ } else {
+ value = GetMetadataValue(header_matcher.name, initial_metadata,
+ &concatenated_value);
+ }
+ if (!value.has_value()) {
+ if (header_matcher.type ==
+ XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PRESENT) {
+ return !header_matcher.present_match;
+ } else {
+ // For all other header matcher types, we need the header value to
+ // exist to consider matches.
+ return false;
+ }
+ }
+ switch (header_matcher.type) {
+ case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::EXACT:
+ return value.value() == header_matcher.string_matcher;
+ case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::REGEX:
+ return RE2::FullMatch(value.value().data(), *header_matcher.regex_match);
+ case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::RANGE:
+ int64_t int_value;
+ if (!absl::SimpleAtoi(value.value(), &int_value)) {
+ return false;
+ }
+ return int_value >= header_matcher.range_start &&
+ int_value < header_matcher.range_end;
+ case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PREFIX:
+ return absl::StartsWith(value.value(), header_matcher.string_matcher);
+ case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::SUFFIX:
+ return absl::EndsWith(value.value(), header_matcher.string_matcher);
+ default:
+ return false;
+ }
+}
-// Returns the cluster names and weights key or the cluster names only key.
-WeightedClustersKeys GetWeightedClustersKey(
- const std::vector& weighted_clusters) {
- std::set cluster_names;
- std::set cluster_weights;
- for (const auto& cluster_weight : weighted_clusters) {
- cluster_names.emplace(absl::StrFormat("%s", cluster_weight.name));
- cluster_weights.emplace(
- absl::StrFormat("%s_%d", cluster_weight.name, cluster_weight.weight));
+bool HeadersMatch(
+ const std::vector& header_matchers,
+ grpc_metadata_batch* initial_metadata) {
+ for (const auto& header_matcher : header_matchers) {
+ bool match = HeaderMatchHelper(header_matcher, initial_metadata);
+ if (header_matcher.invert_match) match = !match;
+ if (!match) return false;
}
- return {absl::StrJoin(cluster_names, "_"),
- absl::StrJoin(cluster_weights, "_")};
+ return true;
}
-std::string XdsResolver::WeightedClustersActionName(
- const std::vector& weighted_clusters) {
- WeightedClustersKeys keys = GetWeightedClustersKey(weighted_clusters);
- auto cluster_names_map_it =
- weighted_cluster_index_map_.find(keys.cluster_names_key);
- GPR_ASSERT(cluster_names_map_it != weighted_cluster_index_map_.end());
- const auto& cluster_weights_map =
- cluster_names_map_it->second.cluster_weights_map;
- auto cluster_weights_map_it =
- cluster_weights_map.find(keys.cluster_weights_key);
- GPR_ASSERT(cluster_weights_map_it != cluster_weights_map.end());
- return absl::StrFormat("%s_%d", keys.cluster_names_key,
- cluster_weights_map_it->second);
+bool UnderFraction(const uint32_t fraction_per_million) {
+ // Generate a random number in [0, 1000000).
+ const uint32_t random_number = rand() % 1000000;
+ return random_number < fraction_per_million;
}
-void XdsResolver::UpdateWeightedClusterIndexMap(
- const std::vector& routes) {
- // Construct a list of unique WeightedCluster
- // actions which we need to process: to find action names
- std::map
- actions_to_process;
- for (const auto& route : routes) {
- if (!route.weighted_clusters.empty()) {
- WeightedClustersKeys keys =
- GetWeightedClustersKey(route.weighted_clusters);
- auto action_it = actions_to_process.find(keys.cluster_weights_key);
- if (action_it == actions_to_process.end()) {
- actions_to_process[std::move(keys.cluster_weights_key)] =
- std::move(keys.cluster_names_key);
- }
+ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
+ GetCallConfigArgs args) {
+ for (const auto& entry : route_table_) {
+ // Path matching.
+ if (!PathMatch(StringViewFromSlice(*args.path),
+ entry.route.matchers.path_matcher)) {
+ continue;
}
- }
- // First pass of all unique WeightedCluster actions: if the exact same
- // weighted target policy (same clusters and weights) appears in the old map,
- // then that old action name is taken again and should be moved to the new
- // map; any other action names from the old set of actions are candidates for
- // reuse.
- XdsResolver::WeightedClusterIndexMap new_weighted_cluster_index_map;
- for (auto action_it = actions_to_process.begin();
- action_it != actions_to_process.end();) {
- const std::string& cluster_names_key = action_it->second;
- const std::string& cluster_weights_key = action_it->first;
- auto old_cluster_names_map_it =
- weighted_cluster_index_map_.find(cluster_names_key);
- if (old_cluster_names_map_it != weighted_cluster_index_map_.end()) {
- // Add cluster_names_key to the new map and copy next_index.
- auto& new_cluster_names_info =
- new_weighted_cluster_index_map[cluster_names_key];
- new_cluster_names_info.next_index =
- old_cluster_names_map_it->second.next_index;
- // Lookup cluster_weights_key in old map.
- auto& old_cluster_weights_map =
- old_cluster_names_map_it->second.cluster_weights_map;
- auto old_cluster_weights_map_it =
- old_cluster_weights_map.find(cluster_weights_key);
- if (old_cluster_weights_map_it != old_cluster_weights_map.end()) {
- // same policy found, move from old map to new map.
- new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
- old_cluster_weights_map_it->second;
- old_cluster_weights_map.erase(old_cluster_weights_map_it);
- // This action has been added to new map, so no need to process it
- // again.
- action_it = actions_to_process.erase(action_it);
- continue;
- }
+ // Header Matching.
+ if (!HeadersMatch(entry.route.matchers.header_matchers,
+ args.initial_metadata)) {
+ continue;
}
- ++action_it;
- }
- // Second pass of all remaining unique WeightedCluster actions: if clusters
- // for a new action are the same as an old unused action, reuse the name. If
- // clusters differ, use a brand new name.
- for (const auto& action : actions_to_process) {
- const std::string& cluster_names_key = action.second;
- const std::string& cluster_weights_key = action.first;
- auto& new_cluster_names_info =
- new_weighted_cluster_index_map[cluster_names_key];
- auto& old_cluster_weights_map =
- weighted_cluster_index_map_[cluster_names_key].cluster_weights_map;
- auto old_cluster_weights_it = old_cluster_weights_map.begin();
- if (old_cluster_weights_it != old_cluster_weights_map.end()) {
- // There is something to reuse: this action uses the same set
- // of clusters as a previous action and that action name is not
- // already taken.
- new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
- old_cluster_weights_it->second;
- // Remove the name from being able to reuse again.
- old_cluster_weights_map.erase(old_cluster_weights_it);
+ // Match fraction check
+ if (entry.route.matchers.fraction_per_million.has_value() &&
+ !UnderFraction(entry.route.matchers.fraction_per_million.value())) {
+ continue;
+ }
+ // Found a route match
+ absl::string_view cluster_name;
+ if (entry.route.weighted_clusters.empty()) {
+ cluster_name = entry.route.cluster_name;
} else {
- // There is nothing to reuse, take the next index to use and
- // increment.
- new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
- new_cluster_names_info.next_index++;
+ const uint32_t key =
+ rand() %
+ entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
+ .first;
+ // Find the index in weighted clusters corresponding to key.
+ size_t mid = 0;
+ size_t start_index = 0;
+ size_t end_index = entry.weighted_cluster_state.size() - 1;
+ size_t index = 0;
+ while (end_index > start_index) {
+ mid = (start_index + end_index) / 2;
+ if (entry.weighted_cluster_state[mid].first > key) {
+ end_index = mid;
+ } else if (entry.weighted_cluster_state[mid].first < key) {
+ start_index = mid + 1;
+ } else {
+ index = mid + 1;
+ break;
+ }
+ }
+ if (index == 0) index = start_index;
+ GPR_ASSERT(entry.weighted_cluster_state[index].first > key);
+ cluster_name = entry.weighted_cluster_state[index].second;
}
+ auto it = clusters_.find(cluster_name);
+ GPR_ASSERT(it != clusters_.end());
+ XdsResolver* resolver =
+ static_cast(resolver_->Ref().release());
+ ClusterState* cluster_state = it->second->Ref().release();
+ CallConfig call_config;
+ call_config.call_attributes[kXdsClusterAttribute] = it->first;
+ call_config.on_call_committed = [resolver, cluster_state]() {
+ cluster_state->Unref();
+ ExecCtx::Run(
+ // TODO(roth): This hop into the ExecCtx is being done to avoid
+ // entering the WorkSerializer while holding the client channel data
+ // plane mutex, since that can lead to deadlocks. However, we should
+ // not have to solve this problem in each individual ConfigSelector
+ // implementation. When we have time, we should fix the client channel
+ // code to avoid this by not invoking the
+ // CallConfig::on_call_committed callback until after it has released
+ // the data plane mutex.
+ DEBUG_LOCATION,
+ GRPC_CLOSURE_CREATE(
+ [](void* arg, grpc_error* /*error*/) {
+ auto* resolver = static_cast(arg);
+ resolver->work_serializer()->Run(
+ [resolver]() {
+ resolver->MaybeRemoveUnusedClusters();
+ resolver->Unref();
+ },
+ DEBUG_LOCATION);
+ },
+ resolver, nullptr),
+ GRPC_ERROR_NONE);
+ };
+ return call_config;
+ }
+ return CallConfig();
+}
+
+//
+// XdsResolver
+//
+
+void XdsResolver::StartLocked() {
+ grpc_error* error = GRPC_ERROR_NONE;
+ xds_client_ = MakeOrphanable(
+ work_serializer(), interested_parties_, server_name_,
+ absl::make_unique(Ref()), *args_, &error);
+ if (error != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR,
+ "Failed to create xds client -- channel will remain in "
+ "TRANSIENT_FAILURE: %s",
+ grpc_error_string(error));
+ result_handler()->ReturnError(error);
}
- weighted_cluster_index_map_ = std::move(new_weighted_cluster_index_map);
+}
+
+void XdsResolver::OnListenerChanged(std::vector routes) {
+ // Save the update in the resolver.
+ current_update_ = std::move(routes);
+ // Propagate the update by creating XdsConfigSelector, CreateServiceConfig,
+ // and ReturnResult.
+ PropagateUpdate();
}
grpc_error* XdsResolver::CreateServiceConfig(
- const std::vector& routes,
RefCountedPtr* service_config) {
- UpdateWeightedClusterIndexMap(routes);
- std::vector actions_vector;
- std::vector route_table;
- std::set actions_set;
- for (const auto& route : routes) {
- const std::string action_name =
- route.weighted_clusters.empty()
- ? route.cluster_name
- : WeightedClustersActionName(route.weighted_clusters);
- if (actions_set.find(action_name) == actions_set.end()) {
- actions_set.emplace(action_name);
- actions_vector.push_back(
- route.weighted_clusters.empty()
- ? CreateServiceConfigActionCluster(action_name)
- : CreateServiceConfigActionWeightedCluster(
- action_name, route.weighted_clusters));
- }
- route_table.push_back(CreateServiceConfigRoute(
- absl::StrFormat("%s:%s",
- route.weighted_clusters.empty() ? "cds" : "weighted",
- action_name),
- route));
+ std::vector clusters;
+ for (const auto& cluster : cluster_state_map_) {
+ clusters.push_back(
+ absl::StrFormat(" \"%s\":{\n"
+ " \"childPolicy\":[ {\n"
+ " \"cds_experimental\":{\n"
+ " \"cluster\": \"%s\"\n"
+ " }\n"
+ " } ]\n"
+ " }",
+ cluster.first, cluster.first));
}
std::vector config_parts;
config_parts.push_back(
"{\n"
" \"loadBalancingConfig\":[\n"
- " { \"xds_routing_experimental\":{\n"
- " \"actions\":{\n");
- config_parts.push_back(absl::StrJoin(actions_vector, ",\n"));
- config_parts.push_back(
- " },\n"
- " \"routes\":[\n");
- config_parts.push_back(absl::StrJoin(route_table, ",\n"));
+ " { \"xds_cluster_manager_experimental\":{\n"
+ " \"children\":{\n");
+ config_parts.push_back(absl::StrJoin(clusters, ",\n"));
config_parts.push_back(
- " ]\n"
+ " }\n"
" } }\n"
" ]\n"
"}");
@@ -502,6 +472,56 @@ grpc_error* XdsResolver::CreateServiceConfig(
return error;
}
+void XdsResolver::OnError(grpc_error* error) {
+ grpc_arg xds_client_arg = xds_client_->MakeChannelArg();
+ Result result;
+ result.args = grpc_channel_args_copy_and_add(args_, &xds_client_arg, 1);
+ result.service_config_error = error;
+ result_handler()->ReturnResult(std::move(result));
+}
+
+void XdsResolver::PropagateUpdate() {
+ // First create XdsConfigSelector, which may add new entries to the cluster
+ // state map, and then CreateServiceConfig for LB policies.
+ auto config_selector =
+ MakeRefCounted(Ref(), current_update_);
+ Result result;
+ grpc_error* error = CreateServiceConfig(&result.service_config);
+ if (error != GRPC_ERROR_NONE) {
+ OnError(error);
+ return;
+ }
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
+ gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
+ result.service_config->json_string().c_str());
+ }
+ grpc_arg new_args[] = {
+ xds_client_->MakeChannelArg(),
+ config_selector->MakeChannelArg(),
+ };
+ result.args =
+ grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args));
+ result_handler()->ReturnResult(std::move(result));
+}
+
+void XdsResolver::MaybeRemoveUnusedClusters() {
+ bool update_needed = false;
+ for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
+ if (it->second->RefIfNonZero()) {
+ it->second->Unref();
+ ++it;
+ } else {
+ update_needed = true;
+ it = cluster_state_map_.erase(it);
+ }
+ }
+ if (update_needed && xds_client_ != nullptr) {
+ // Propagate the update by creating XdsConfigSelector, CreateServiceConfig,
+ // and ReturnResult.
+ PropagateUpdate();
+ }
+}
+
//
// Factory
//
diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h
new file mode 100644
index 00000000000..72a80a97e1f
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h
@@ -0,0 +1,28 @@
+//
+// Copyright 2019 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_H
+
+#include
+
+namespace grpc_core {
+
+extern const char* kXdsClusterAttribute;
+
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_H */
diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc
index ce204f030e7..e95d1a779f3 100644
--- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc
+++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc
@@ -322,6 +322,14 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
// later use.
RefCountedPtr config_selector =
ConfigSelector::GetFromChannelArgs(*result.args);
+ // Remove the config selector from channel args so that we're not holding
+ // unnecessary refs that cause it to be destroyed somewhere other than in the
+ // WorkSerializer.
+ const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
+ grpc_channel_args* new_args =
+ grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
+ grpc_channel_args_destroy(result.args);
+ result.args = new_args;
// Create or update LB policy, as needed.
if (service_config_result.lb_policy_config != nullptr) {
CreateOrUpdateLbPolicyLocked(
diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc
index 134263c19af..5d724ee8fc9 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_plugin_registry.cc
@@ -70,8 +70,8 @@ void grpc_lb_policy_eds_init(void);
void grpc_lb_policy_eds_shutdown(void);
void grpc_lb_policy_lrs_init(void);
void grpc_lb_policy_lrs_shutdown(void);
-void grpc_lb_policy_xds_routing_init(void);
-void grpc_lb_policy_xds_routing_shutdown(void);
+void grpc_lb_policy_xds_cluster_manager_init(void);
+void grpc_lb_policy_xds_cluster_manager_shutdown(void);
void grpc_resolver_xds_init(void);
void grpc_resolver_xds_shutdown(void);
#endif
@@ -126,8 +126,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_eds_shutdown);
grpc_register_plugin(grpc_lb_policy_lrs_init,
grpc_lb_policy_lrs_shutdown);
- grpc_register_plugin(grpc_lb_policy_xds_routing_init,
- grpc_lb_policy_xds_routing_shutdown);
+ grpc_register_plugin(grpc_lb_policy_xds_cluster_manager_init,
+ grpc_lb_policy_xds_cluster_manager_shutdown);
grpc_register_plugin(grpc_resolver_xds_init,
grpc_resolver_xds_shutdown);
#endif
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 8ecaf65da44..f68603d8a5e 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -44,7 +44,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
- 'src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc',
+ 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',
'src/core/ext/filters/client_channel/proxy_mapper_registry.cc',
diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc
index 02264d33d23..8d771d25981 100644
--- a/test/cpp/end2end/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds_end2end_test.cc
@@ -1941,7 +1941,7 @@ TEST_P(BasicTest, Vanilla) {
backends_[i]->backend_service()->request_count());
}
// Check LB policy name for the channel.
- EXPECT_EQ((GetParam().use_xds_resolver() ? "xds_routing_experimental"
+ EXPECT_EQ((GetParam().use_xds_resolver() ? "xds_cluster_manager_experimental"
: "eds_experimental"),
channel_->GetLoadBalancingPolicyName());
}
@@ -3684,11 +3684,8 @@ TEST_P(LdsRdsTest, XdsRoutingClusterUpdateClustersWithPickingDelays) {
StartBackend(0);
sending_rpc.join();
// Make sure RPCs go to the correct backend:
- // Before moving routing to XdsConfigSelector, 2 to backend 1;
- // TODO(donnadionne): After moving routing to XdsConfigSelector, 1 for each
- // backend.
- EXPECT_EQ(0, backends_[0]->backend_service()->request_count());
- EXPECT_EQ(2, backends_[1]->backend_service()->request_count());
+ EXPECT_EQ(1, backends_[0]->backend_service()->request_count());
+ EXPECT_EQ(1, backends_[1]->backend_service()->request_count());
}
TEST_P(LdsRdsTest, XdsRoutingHeadersMatching) {
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 342cafcc2f0..ad07812ca84 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1102,7 +1102,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds.h \
-src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
+src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/lb_policy_registry.h \
@@ -1132,6 +1132,7 @@ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc \
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
+src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h \
src/core/ext/filters/client_channel/resolver_factory.h \
src/core/ext/filters/client_channel/resolver_registry.cc \
src/core/ext/filters/client_channel/resolver_registry.h \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 03d1cced318..7eb5f8d9434 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -916,7 +916,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds.h \
-src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
+src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/lb_policy_registry.h \
@@ -949,6 +949,7 @@ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h \
src/core/ext/filters/client_channel/resolver/sockaddr/README.md \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
+src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h \
src/core/ext/filters/client_channel/resolver_factory.h \
src/core/ext/filters/client_channel/resolver_registry.cc \
src/core/ext/filters/client_channel/resolver_registry.h \
diff --git a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
index 7863f5e2571..7ab0898449f 100755
--- a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
@@ -62,7 +62,7 @@ bazel build //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client
# Test cases "path_matching" and "header_matching" are not included in "all",
# because not all interop clients in all languages support these new tests.
-GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
+GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \
diff --git a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
index 69062cc8f49..da306d1b2ae 100755
--- a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
@@ -65,7 +65,7 @@ bazel build test/cpp/interop:xds_interop_client
#
# TODO: remove "path_matching" and "header_matching" from --test_case after
# they are added into "all".
-GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
+GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \
diff --git a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
index 06f695b2b82..d043c7f7610 100755
--- a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
@@ -65,7 +65,7 @@ python tools/run_tests/run_tests.py -l csharp -c opt --build_only
#
# TODO(jtattermusch): remove "path_matching" and "header_matching" from
# --test_case after they are added into "all".
-GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
+GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \
diff --git a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
index c9c347901f8..01f15b2a53f 100755
--- a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
@@ -70,7 +70,7 @@ export CC=/usr/bin/gcc
composer install && \
./bin/generate_proto_php.sh)
-GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
+GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \
diff --git a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
index 0776a0c4c0e..9b84e4983d9 100644
--- a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
@@ -60,7 +60,7 @@ touch "$TOOLS_DIR"/src/proto/grpc/health/v1/__init__.py
(cd src/ruby && bundle && rake compile)
-GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
+GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \