Moving XDS Routing functionalities from LB policy into XdsConfigSelector

pull/23659/head
Donna Dionne 4 years ago
parent 61879df215
commit ffb560090a
  1. 15
      BUILD
  2. 3
      BUILD.gn
  3. 2
      CMakeLists.txt
  4. 4
      Makefile
  5. 3
      build_autogenerated.yaml
  6. 2
      config.m4
  7. 2
      config.w32
  8. 1
      doc/environment_variables.md
  9. 2
      gRPC-C++.podspec
  10. 4
      gRPC-Core.podspec
  11. 3
      grpc.gemspec
  12. 2
      grpc.gyp
  13. 3
      package.xml
  14. 17
      src/core/ext/filters/client_channel/client_channel.cc
  15. 4
      src/core/ext/filters/client_channel/config_selector.cc
  16. 3
      src/core/ext/filters/client_channel/config_selector.h
  17. 727
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  18. 1141
      src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
  19. 676
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  20. 28
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h
  21. 8
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  22. 8
      src/core/plugin_registry/grpc_plugin_registry.cc
  23. 2
      src/python/grpcio/grpc_core_dependencies.py
  24. 9
      test/cpp/end2end/xds_end2end_test.cc
  25. 3
      tools/doxygen/Doxyfile.c++.internal
  26. 3
      tools/doxygen/Doxyfile.core.internal
  27. 2
      tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
  28. 2
      tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
  29. 2
      tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
  30. 2
      tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
  31. 2
      tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh

15
BUILD

@ -332,7 +332,7 @@ grpc_cc_library(
"grpc_lb_policy_cds",
"grpc_lb_policy_eds",
"grpc_lb_policy_lrs",
"grpc_lb_policy_xds_routing",
"grpc_lb_policy_xds_cluster_manager",
"grpc_resolver_xds",
],
},
@ -1380,9 +1380,9 @@ grpc_cc_library(
)
grpc_cc_library(
name = "grpc_lb_policy_xds_routing",
name = "grpc_lb_policy_xds_cluster_manager",
srcs = [
"src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc",
],
external_deps = [
"absl/strings",
@ -1391,6 +1391,7 @@ grpc_cc_library(
deps = [
"grpc_base",
"grpc_client_channel",
"grpc_resolver_xds_header",
"grpc_xds_api_header",
],
)
@ -1679,6 +1680,14 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc_resolver_xds_header",
hdrs = [
"src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h",
],
language = "c++",
)
grpc_cc_library(
name = "grpc_resolver_xds",
srcs = [

@ -251,7 +251,7 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/lb_policy/xds/eds.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/xds.h",
"src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc",
"src/core/ext/filters/client_channel/lb_policy_factory.h",
"src/core/ext/filters/client_channel/lb_policy_registry.cc",
"src/core/ext/filters/client_channel/lb_policy_registry.h",
@ -281,6 +281,7 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h",
"src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc",
"src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc",
"src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h",
"src/core/ext/filters/client_channel/resolver_factory.h",
"src/core/ext/filters/client_channel/resolver_registry.cc",
"src/core/ext/filters/client_channel/resolver_registry.h",

@ -1442,7 +1442,7 @@ add_library(grpc
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
src/core/ext/filters/client_channel/lb_policy_registry.cc
src/core/ext/filters/client_channel/local_subchannel_pool.cc
src/core/ext/filters/client_channel/proxy_mapper_registry.cc

@ -1847,7 +1847,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/proxy_mapper_registry.cc \
@ -4522,7 +4522,7 @@ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP)
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc: $(OPENSSL_DEP)
src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc: $(OPENSSL_DEP)

@ -408,6 +408,7 @@ libs:
- src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
- src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h
- src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
- src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h
- src/core/ext/filters/client_channel/resolver_factory.h
- src/core/ext/filters/client_channel/resolver_registry.h
- src/core/ext/filters/client_channel/resolver_result_parsing.h
@ -788,7 +789,7 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
- src/core/ext/filters/client_channel/lb_policy_registry.cc
- src/core/ext/filters/client_channel/local_subchannel_pool.cc
- src/core/ext/filters/client_channel/proxy_mapper_registry.cc

@ -68,7 +68,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/proxy_mapper_registry.cc \

@ -35,7 +35,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\lrs.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_routing.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy_registry.cc " +
"src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " +
"src\\core\\ext\\filters\\client_channel\\proxy_mapper_registry.cc " +

@ -90,6 +90,7 @@ some configuration as environment variables that can be set.
- tsi - traces tsi transport security
- weighted_target_lb - traces weighted_target LB policy
- xds_client - traces xds client
- xds_cluster_manager_lb - traces cluster manager LB policy
- xds_resolver - traces xds resolver
The following tracers will only run in binaries built in DEBUG mode. This is

@ -241,6 +241,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
'src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.h',
'src/core/ext/filters/client_channel/resolver_result_parsing.h',
@ -744,6 +745,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
'src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.h',
'src/core/ext/filters/client_channel/resolver_result_parsing.h',

@ -237,7 +237,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds.h',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_factory.h',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.h',
@ -267,6 +267,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.cc',
'src/core/ext/filters/client_channel/resolver_registry.h',
@ -1158,6 +1159,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
'src/core/ext/filters/client_channel/resolver/dns/dns_resolver_selection.h',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/resolver_factory.h',
'src/core/ext/filters/client_channel/resolver_registry.h',
'src/core/ext/filters/client_channel/resolver_result_parsing.h',

@ -155,7 +155,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_factory.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.h )
@ -185,6 +185,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver_factory.h )
s.files += %w( src/core/ext/filters/client_channel/resolver_registry.cc )
s.files += %w( src/core/ext/filters/client_channel/resolver_registry.h )

@ -473,7 +473,7 @@
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',
'src/core/ext/filters/client_channel/proxy_mapper_registry.cc',

@ -135,7 +135,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/eds.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_registry.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_registry.h" role="src" />
@ -165,6 +165,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver_registry.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver_registry.h" role="src" />

@ -1747,7 +1747,7 @@ void ChannelData::UpdateStateAndPickerLocked(
const char* reason,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
// Clean the control plane when entering IDLE.
if (picker_ == nullptr) {
if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
health_check_service_name_.reset();
saved_service_config_.reset();
saved_config_selector_.reset();
@ -1797,7 +1797,7 @@ void ChannelData::UpdateStateAndPickerLocked(
// Note: Original value will be destroyed after the lock is released.
picker_.swap(picker);
// Clean the data plane if the updated picker is nullptr.
if (picker_ == nullptr) {
if (picker_ == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
received_service_config_data_ = false;
// Note: We save the objects to unref until after the lock is released.
retry_throttle_data_to_unref = std::move(retry_throttle_data_);
@ -1822,6 +1822,14 @@ void ChannelData::UpdateStateAndPickerLocked(
void ChannelData::UpdateServiceConfigInDataPlaneLocked(
bool service_config_changed,
RefCountedPtr<ConfigSelector> config_selector) {
// If the service config did not change and there is no new ConfigSelector,
// retain the old one (if any).
// TODO(roth): Consider whether this is really the right way to handle
// this. We might instead want to decide this in ApplyServiceConfig()
// where we decide whether to stick with the saved service config.
if (!service_config_changed && config_selector == nullptr) {
config_selector = saved_config_selector_;
}
// Check if ConfigSelector has changed.
const bool config_selector_changed =
saved_config_selector_ != config_selector;
@ -3873,6 +3881,7 @@ class CallData::QueuedPickCanceller {
}
if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
// Remove pick from list of queued picks.
calld->MaybeInvokeConfigSelectorCommitCallback();
calld->MaybeRemoveCallFromQueuedPicksLocked(self->elem_);
// Fail pending batches on the call.
calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
@ -4162,7 +4171,9 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
connected_subchannel_ =
chand->GetConnectedSubchannelInDataPlane(result.subchannel.get());
GPR_ASSERT(connected_subchannel_ != nullptr);
if (retry_committed_) MaybeInvokeConfigSelectorCommitCallback();
if (!enable_retries_ || retry_committed_) {
MaybeInvokeConfigSelectorCommitCallback();
}
}
lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
*error = result.error;

@ -17,12 +17,8 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/lib/channel/channel_args.h"
// Channel arg key for ConfigSelector.
#define GRPC_ARG_CONFIG_SELECTOR "grpc.internal.config_selector"
namespace grpc_core {
namespace {

@ -34,6 +34,9 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/transport/metadata_batch.h"
// Channel arg key for ConfigSelector.
#define GRPC_ARG_CONFIG_SELECTOR "grpc.internal.config_selector"
namespace grpc_core {
// Internal API used to allow resolver implementations to override

@ -0,0 +1,727 @@
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include "absl/container/inlined_vector.h"
#include "absl/strings/match.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "re2/re2.h"
#include <grpc/grpc.h>
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h"
#include "src/core/ext/xds/xds_api.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/transport/error_utils.h"
#define GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
namespace grpc_core {
TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb");
namespace {
constexpr char kXdsClusterManager[] = "xds_cluster_manager_experimental";
// Config for xds_cluster_manager LB policy.
class XdsClusterManagerLbConfig : public LoadBalancingPolicy::Config {
public:
using ClusterMap =
std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>;
XdsClusterManagerLbConfig(ClusterMap cluster_map)
: cluster_map_(std::move(cluster_map)) {}
const char* name() const override { return kXdsClusterManager; }
const ClusterMap& cluster_map() const { return cluster_map_; }
private:
ClusterMap cluster_map_;
};
// xds_cluster_manager LB policy.
class XdsClusterManagerLb : public LoadBalancingPolicy {
public:
explicit XdsClusterManagerLb(Args args);
const char* name() const override { return kXdsClusterManager; }
void UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
// A simple wrapper for ref-counting a picker from the child policy.
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
public:
ChildPickerWrapper(std::string name,
std::unique_ptr<SubchannelPicker> picker)
: name_(std::move(name)), picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
const std::string& name() const { return name_; }
private:
std::string name_;
std::unique_ptr<SubchannelPicker> picker_;
};
// Picks a child using prefix or path matching and then delegates to that
// child's picker.
class ClusterPicker : public SubchannelPicker {
public:
// Maintains a map of cluster names to pickers.
using ClusterMap = std::map<absl::string_view /*cluster_name*/,
RefCountedPtr<ChildPickerWrapper>>;
// It is required that the keys of cluster_map have to live at least as long
// as the ClusterPicker instance.
ClusterPicker(ClusterMap cluster_map,
RefCountedPtr<XdsClusterManagerLbConfig> config)
: cluster_map_(std::move(cluster_map)), config_(std::move(config)) {}
PickResult Pick(PickArgs args) override;
private:
ClusterMap cluster_map_;
// Take a reference to config so that we can use
// XdsApi::RdsUpdate::RdsRoute::Matchers from it.
RefCountedPtr<XdsClusterManagerLbConfig> config_;
};
// Each ClusterChild holds a ref to its parent XdsClusterManagerLb.
class ClusterChild : public InternallyRefCounted<ClusterChild> {
public:
ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
const std::string& name);
~ClusterChild();
void Orphan() override;
void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
const ServerAddressList& addresses,
const grpc_channel_args* args);
void ExitIdleLocked();
void ResetBackoffLocked();
void DeactivateLocked();
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
return picker_wrapper_;
}
private:
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)
: xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {}
~Helper() { xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
private:
RefCountedPtr<ClusterChild> xds_cluster_manager_child_;
};
// Methods for dealing with the child policy.
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const grpc_channel_args* args);
static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
void OnDelayedRemovalTimerLocked(grpc_error* error);
// The owning LB policy.
RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy_;
// Points to the corresponding key in children map.
const std::string name_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
bool seen_failure_since_ready_ = false;
// States for delayed removal.
grpc_timer delayed_removal_timer_;
grpc_closure on_delayed_removal_timer_;
bool delayed_removal_timer_callback_pending_ = false;
bool shutdown_ = false;
};
~XdsClusterManagerLb();
void ShutdownLocked() override;
void UpdateStateLocked();
// Current config from the resolver.
RefCountedPtr<XdsClusterManagerLbConfig> config_;
// Internal state.
bool shutting_down_ = false;
// Children.
std::map<std::string, OrphanablePtr<ClusterChild>> children_;
};
//
// XdsClusterManagerLb::ClusterPicker
//
XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
PickArgs args) {
auto cluster_name =
args.call_state->ExperimentalGetCallAttribute(kXdsClusterAttribute);
auto it = cluster_map_.find(cluster_name);
if (it != cluster_map_.end()) {
return it->second->Pick(args);
}
PickResult result;
result.type = PickResult::PICK_FAILED;
result.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("xds cluster manager picker: unknown cluster \"",
cluster_name, "\"")
.c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
return result;
}
//
// XdsClusterManagerLb
//
XdsClusterManagerLb::XdsClusterManagerLb(Args args)
: LoadBalancingPolicy(std::move(args)) {}
XdsClusterManagerLb::~XdsClusterManagerLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(
GPR_INFO,
"[xds_cluster_manager_lb %p] destroying xds_cluster_manager LB policy",
this);
}
}
void XdsClusterManagerLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] shutting down", this);
}
shutting_down_ = true;
children_.clear();
}
void XdsClusterManagerLb::ExitIdleLocked() {
for (auto& p : children_) p.second->ExitIdleLocked();
}
void XdsClusterManagerLb::ResetBackoffLocked() {
for (auto& p : children_) p.second->ResetBackoffLocked();
}
void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
if (shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
}
// Update config.
config_ = std::move(args.config);
// Deactivate the children not in the new config.
for (const auto& p : children_) {
const std::string& name = p.first;
ClusterChild* child = p.second.get();
if (config_->cluster_map().find(name) == config_->cluster_map().end()) {
child->DeactivateLocked();
}
}
// Add or update the children in the new config.
for (const auto& p : config_->cluster_map()) {
const std::string& name = p.first;
const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
auto it = children_.find(name);
if (it == children_.end()) {
it = children_
.emplace(name, MakeOrphanable<ClusterChild>(
Ref(DEBUG_LOCATION, "ClusterChild"), name))
.first;
}
it->second->UpdateLocked(config, args.addresses, args.args);
}
UpdateStateLocked();
}
void XdsClusterManagerLb::UpdateStateLocked() {
// Also count the number of children in each state, to determine the
// overall state.
size_t num_ready = 0;
size_t num_connecting = 0;
size_t num_idle = 0;
size_t num_transient_failures = 0;
for (const auto& p : children_) {
const auto& child_name = p.first;
const ClusterChild* child = p.second.get();
// Skip the children that are not in the latest update.
if (config_->cluster_map().find(child_name) ==
config_->cluster_map().end()) {
continue;
}
switch (child->connectivity_state()) {
case GRPC_CHANNEL_READY: {
++num_ready;
break;
}
case GRPC_CHANNEL_CONNECTING: {
++num_connecting;
break;
}
case GRPC_CHANNEL_IDLE: {
++num_idle;
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
++num_transient_failures;
break;
}
default:
GPR_UNREACHABLE_CODE(return );
}
}
// Determine aggregated connectivity state.
grpc_connectivity_state connectivity_state;
if (num_ready > 0) {
connectivity_state = GRPC_CHANNEL_READY;
} else if (num_connecting > 0) {
connectivity_state = GRPC_CHANNEL_CONNECTING;
} else if (num_idle > 0) {
connectivity_state = GRPC_CHANNEL_IDLE;
} else {
connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] connectivity changed to %s",
this, ConnectivityStateName(connectivity_state));
}
std::unique_ptr<SubchannelPicker> picker;
absl::Status status;
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
ClusterPicker::ClusterMap cluster_map;
for (const auto& p : config_->cluster_map()) {
const std::string& cluster_name = p.first;
RefCountedPtr<ChildPickerWrapper>& child_picker =
cluster_map[cluster_name];
child_picker = children_[cluster_name]->picker_wrapper();
if (child_picker == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(
GPR_INFO,
"[xds_cluster_manager_lb %p] child %s has not yet returned a "
"picker; creating a QueuePicker.",
this, cluster_name.c_str());
}
child_picker = MakeRefCounted<ChildPickerWrapper>(
cluster_name, absl::make_unique<QueuePicker>(
Ref(DEBUG_LOCATION, "QueuePicker")));
}
}
picker =
absl::make_unique<ClusterPicker>(std::move(cluster_map), config_);
break;
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
picker =
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
break;
default:
grpc_error* error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"TRANSIENT_FAILURE from XdsClusterManagerLb"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
status = grpc_error_to_absl_status(error);
picker = absl::make_unique<TransientFailurePicker>(error);
}
channel_control_helper()->UpdateState(connectivity_state, status,
std::move(picker));
}
//
// XdsClusterManagerLb::ClusterChild
//
XdsClusterManagerLb::ClusterChild::ClusterChild(
RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
const std::string& name)
: xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
name_(name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
xds_cluster_manager_policy_.get(), this, name_.c_str());
}
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
grpc_schedule_on_exec_ctx);
}
XdsClusterManagerLb::ClusterChild::~ClusterChild() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_manager_lb %p] ClusterChild %p: destroying "
"child",
xds_cluster_manager_policy_.get(), this);
}
xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild");
}
void XdsClusterManagerLb::ClusterChild::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_manager_lb %p] ClusterChild %p %s: "
"shutting down child",
xds_cluster_manager_policy_.get(), this, name_.c_str());
}
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
grpc_pollset_set_del_pollset_set(
child_policy_->interested_parties(),
xds_cluster_manager_policy_->interested_parties());
child_policy_.reset();
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_wrapper_.reset();
if (delayed_removal_timer_callback_pending_) {
grpc_timer_cancel(&delayed_removal_timer_);
}
shutdown_ = true;
Unref();
}
OrphanablePtr<LoadBalancingPolicy>
XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
const grpc_channel_args* args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer =
xds_cluster_manager_policy_->work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_xds_cluster_manager_lb_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_manager_lb %p] ClusterChild %p %s: Created "
"new child "
"policy handler %p",
xds_cluster_manager_policy_.get(), this, name_.c_str(),
lb_policy.get());
}
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(
lb_policy->interested_parties(),
xds_cluster_manager_policy_->interested_parties());
return lb_policy;
}
void XdsClusterManagerLb::ClusterChild::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
const ServerAddressList& addresses, const grpc_channel_args* args) {
if (xds_cluster_manager_policy_->shutting_down_) return;
// Update child weight.
// Reactivate if needed.
if (delayed_removal_timer_callback_pending_) {
delayed_removal_timer_callback_pending_ = false;
grpc_timer_cancel(&delayed_removal_timer_);
}
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args);
}
// Construct update args.
UpdateArgs update_args;
update_args.config = std::move(config);
update_args.addresses = addresses;
update_args.args = grpc_channel_args_copy(args);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_manager_lb %p] ClusterChild %p %s: "
"Updating child "
"policy handler %p",
xds_cluster_manager_policy_.get(), this, name_.c_str(),
child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
}
void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {
child_policy_->ExitIdleLocked();
}
void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
child_policy_->ResetBackoffLocked();
}
void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
// If already deactivated, don't do that again.
if (delayed_removal_timer_callback_pending_ == true) return;
// Set the child weight to 0 so that future picker won't contain this child.
// Start a timer to delete the child.
Ref(DEBUG_LOCATION, "ClusterChild+timer").release();
grpc_timer_init(&delayed_removal_timer_,
ExecCtx::Get()->Now() +
GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS,
&on_delayed_removal_timer_);
delayed_removal_timer_callback_pending_ = true;
}
void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimer(
void* arg, grpc_error* error) {
ClusterChild* self = static_cast<ClusterChild*>(arg);
GRPC_ERROR_REF(error); // Ref owned by the lambda
self->xds_cluster_manager_policy_->work_serializer()->Run(
[self, error]() { self->OnDelayedRemovalTimerLocked(error); },
DEBUG_LOCATION);
}
void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked(
grpc_error* error) {
delayed_removal_timer_callback_pending_ = false;
if (error == GRPC_ERROR_NONE && !shutdown_) {
xds_cluster_manager_policy_->children_.erase(name_);
}
Unref(DEBUG_LOCATION, "ClusterChild+timer");
GRPC_ERROR_UNREF(error);
}
//
// XdsClusterManagerLb::ClusterChild::Helper
//
RefCountedPtr<SubchannelInterface>
XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel(
const grpc_channel_args& args) {
if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_)
return nullptr;
return xds_cluster_manager_child_->xds_cluster_manager_policy_
->channel_control_helper()
->CreateSubchannel(args);
}
void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(
GPR_INFO,
"[xds_cluster_manager_lb %p] child %s: received update: state=%s (%s) "
"picker=%p",
xds_cluster_manager_child_->xds_cluster_manager_policy_.get(),
xds_cluster_manager_child_->name_.c_str(), ConnectivityStateName(state),
status.ToString().c_str(), picker.get());
}
if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_)
return;
// Cache the picker in the ClusterChild.
xds_cluster_manager_child_->picker_wrapper_ =
MakeRefCounted<ChildPickerWrapper>(xds_cluster_manager_child_->name_,
std::move(picker));
// Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state
// READY, then we report the state change as-is. However, once we do see
// a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
// changes until we go back into state READY.
if (!xds_cluster_manager_child_->seen_failure_since_ready_) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
xds_cluster_manager_child_->seen_failure_since_ready_ = true;
}
} else {
if (state != GRPC_CHANNEL_READY) return;
xds_cluster_manager_child_->seen_failure_since_ready_ = false;
}
xds_cluster_manager_child_->connectivity_state_ = state;
// Notify the LB policy.
xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
}
void XdsClusterManagerLb::ClusterChild::Helper::RequestReresolution() {
if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_)
return;
xds_cluster_manager_child_->xds_cluster_manager_policy_
->channel_control_helper()
->RequestReresolution();
}
void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent(
TraceSeverity severity, absl::string_view message) {
if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_)
return;
xds_cluster_manager_child_->xds_cluster_manager_policy_
->channel_control_helper()
->AddTraceEvent(severity, message);
}
//
// factory
//
class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<XdsClusterManagerLb>(std::move(args));
}
const char* name() const override { return kXdsClusterManager; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
// xds_cluster_manager was mentioned as a policy in the deprecated
// loadBalancingPolicy field or in the client API.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:xds_cluster_manager policy requires "
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
return nullptr;
}
std::vector<grpc_error*> error_list;
XdsClusterManagerLbConfig::ClusterMap cluster_map;
std::set<std::string /*cluster_name*/> clusters_to_be_used;
auto it = json.object_value().find("children");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:children error:required field not present"));
} else if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:children error:type should be object"));
} else {
for (const auto& p : it->second.object_value()) {
const std::string& child_name = p.first;
if (child_name.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:children element error: name cannot be empty"));
continue;
}
RefCountedPtr<LoadBalancingPolicy::Config> child_config;
std::vector<grpc_error*> child_errors =
ParseChildConfig(p.second, &child_config);
if (!child_errors.empty()) {
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case.
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("field:children name:", child_name).c_str());
for (grpc_error* child_error : child_errors) {
error = grpc_error_add_child(error, child_error);
}
error_list.push_back(error);
} else {
cluster_map[child_name] = std::move(child_config);
clusters_to_be_used.insert(child_name);
}
}
}
if (cluster_map.empty()) {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid children configured"));
}
if (!error_list.empty()) {
*error = GRPC_ERROR_CREATE_FROM_VECTOR(
"xds_cluster_manager_experimental LB policy config", &error_list);
return nullptr;
}
return MakeRefCounted<XdsClusterManagerLbConfig>(std::move(cluster_map));
}
private:
static std::vector<grpc_error*> ParseChildConfig(
const Json& json,
RefCountedPtr<LoadBalancingPolicy::Config>* child_config) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
return error_list;
}
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy"));
} else {
grpc_error* parse_error = GRPC_ERROR_NONE;
*child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
it->second, &parse_error);
if (*child_config == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
}
return error_list;
}
};
} // namespace
} // namespace grpc_core
//
// Plugin registration
//
void grpc_lb_policy_xds_cluster_manager_init() {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::XdsClusterManagerLbFactory>());
}
void grpc_lb_policy_xds_cluster_manager_shutdown() {}

@ -18,7 +18,10 @@
#include <grpc/support/port_platform.h>
#include "absl/strings/match.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "re2/re2.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
@ -30,6 +33,8 @@ namespace grpc_core {
TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
const char* kXdsClusterAttribute = "xds_cluster_name";
namespace {
//
@ -42,8 +47,7 @@ class XdsResolver : public Resolver {
: Resolver(std::move(args.work_serializer),
std::move(args.result_handler)),
args_(grpc_channel_args_copy(args.args)),
interested_parties_(args.pollset_set),
config_selector_(MakeRefCounted<XdsConfigSelector>()) {
interested_parties_(args.pollset_set) {
char* path = args.uri->path;
if (path[0] == '/') ++path;
server_name_ = path;
@ -82,47 +86,57 @@ class XdsResolver : public Resolver {
RefCountedPtr<XdsResolver> resolver_;
};
class XdsConfigSelector : public ConfigSelector {
class ClusterState
: public RefCounted<ClusterState, PolymorphicRefCount, false> {
public:
CallConfig GetCallConfig(GetCallConfigArgs args) override {
return CallConfig();
}
using ClusterStateMap =
std::map<std::string, std::unique_ptr<ClusterState>>;
ClusterState(const std::string& cluster_name,
ClusterStateMap* cluster_state_map)
: it_(cluster_state_map
->emplace(cluster_name, std::unique_ptr<ClusterState>(this))
.first) {}
const std::string& cluster() const { return it_->first; }
private:
ClusterStateMap::iterator it_;
};
// Returns the weighted_clusters action name to use from
// weighted_cluster_index_map_ for a WeightedClusters route action.
std::string WeightedClustersActionName(
const std::vector<XdsApi::Route::ClusterWeight>& weighted_clusters);
class XdsConfigSelector : public ConfigSelector {
public:
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
const std::vector<XdsApi::Route>& routes);
~XdsConfigSelector();
CallConfig GetCallConfig(GetCallConfigArgs args) override;
// Updates weighted_cluster_index_map_ that will
// determine the names of the WeightedCluster actions for the current update.
void UpdateWeightedClusterIndexMap(const std::vector<XdsApi::Route>& routes);
private:
struct Route {
XdsApi::Route route;
absl::InlinedVector<std::pair<uint32_t, absl::string_view>, 2>
weighted_cluster_state;
};
using RouteTable = std::vector<Route>;
void MaybeAddCluster(const std::string& name);
// Create the service config generated by the list of routes.
grpc_error* CreateServiceConfig(const std::vector<XdsApi::Route>& routes,
RefCountedPtr<ServiceConfig>* service_config);
RefCountedPtr<XdsResolver> resolver_;
RouteTable route_table_;
std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
};
void OnListenerChanged(std::vector<XdsApi::Route> routes);
grpc_error* CreateServiceConfig(RefCountedPtr<ServiceConfig>* service_config);
void OnError(grpc_error* error);
void PropagateUpdate();
void MaybeRemoveUnusedClusters();
std::string server_name_;
const grpc_channel_args* args_;
grpc_pollset_set* interested_parties_;
OrphanablePtr<XdsClient> xds_client_;
RefCountedPtr<XdsConfigSelector> config_selector_;
// 2-level map to store WeightedCluster action names.
// Top level map is keyed by cluster names without weight like a_b_c; bottom
// level map is keyed by cluster names + weights like a10_b50_c40.
struct ClusterNamesInfo {
uint64_t next_index = 0;
std::map<std::string /*cluster names + weights*/,
uint64_t /*policy index number*/>
cluster_weights_map;
};
using WeightedClusterIndexMap =
std::map<std::string /*cluster names*/, ClusterNamesInfo>;
// Cache of action names for WeightedCluster targets in the current
// service config.
WeightedClusterIndexMap weighted_cluster_index_map_;
ClusterState::ClusterStateMap cluster_state_map_;
std::vector<XdsApi::Route> current_update_;
};
//
@ -136,36 +150,14 @@ void XdsResolver::ListenerWatcher::OnListenerChanged(
gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data",
resolver_.get());
}
Result result;
grpc_error* error =
resolver_->CreateServiceConfig(routes, &result.service_config);
if (error != GRPC_ERROR_NONE) {
OnError(error);
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s",
resolver_.get(), result.service_config->json_string().c_str());
}
grpc_arg new_args[] = {
resolver_->xds_client_->MakeChannelArg(),
resolver_->config_selector_->MakeChannelArg(),
};
result.args = grpc_channel_args_copy_and_add(resolver_->args_, new_args,
GPR_ARRAY_SIZE(new_args));
resolver_->result_handler()->ReturnResult(std::move(result));
resolver_->OnListenerChanged(std::move(routes));
}
void XdsResolver::ListenerWatcher::OnError(grpc_error* error) {
if (resolver_->xds_client_ == nullptr) return;
gpr_log(GPR_ERROR, "[xds_resolver %p] received error: %s", resolver_.get(),
grpc_error_string(error));
grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg();
Result result;
result.args =
grpc_channel_args_copy_and_add(resolver_->args_, &xds_client_arg, 1);
result.service_config_error = error;
resolver_->result_handler()->ReturnResult(std::move(result));
resolver_->OnError(error);
}
void XdsResolver::ListenerWatcher::OnResourceDoesNotExist() {
@ -183,316 +175,294 @@ void XdsResolver::ListenerWatcher::OnResourceDoesNotExist() {
}
//
// XdsResolver
// XdsResolver::XdsConfigSelector
//
void XdsResolver::StartLocked() {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(
work_serializer(), interested_parties_, server_name_,
absl::make_unique<ListenerWatcher>(Ref()), *args_, &error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"Failed to create xds client -- channel will remain in "
"TRANSIENT_FAILURE: %s",
grpc_error_string(error));
result_handler()->ReturnError(error);
XdsResolver::XdsConfigSelector::XdsConfigSelector(
RefCountedPtr<XdsResolver> resolver,
const std::vector<XdsApi::Route>& routes)
: resolver_(std::move(resolver)) {
// 1. Construct the route table
// 2 Update resolver's cluster state map
// 3. Construct cluster list to hold on to entries in the cluster state
// map.
// Reserve the necessary entries up-front to avoid reallocation as we add
// elements. This is necessary because the string_view in the entry's
// weighted_cluster_state field points to the memory in the route field, so
// moving the entry in a reallocation will cause the string_view to point to
// invalid data.
route_table_.reserve(routes.size());
for (auto& route : routes) {
route_table_.emplace_back();
auto& route_entry = route_table_.back();
route_entry.route = route;
if (route.weighted_clusters.empty()) {
MaybeAddCluster(route.cluster_name);
} else {
uint32_t end = 0;
for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
MaybeAddCluster(weighted_cluster.name);
end += weighted_cluster.weight;
route_entry.weighted_cluster_state.emplace_back(end,
weighted_cluster.name);
}
}
}
}
std::string CreateServiceConfigActionCluster(const std::string& cluster_name) {
return absl::StrFormat(
" \"cds:%s\":{\n"
" \"childPolicy\":[ {\n"
" \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" }\n"
" } ]\n"
" }",
cluster_name, cluster_name);
XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
clusters_.clear();
resolver_->MaybeRemoveUnusedClusters();
}
std::string CreateServiceConfigRoute(const std::string& action_name,
const XdsApi::Route& route) {
std::vector<std::string> headers;
for (const auto& header : route.matchers.header_matchers) {
std::string header_matcher;
switch (header.type) {
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::EXACT:
header_matcher = absl::StrFormat(" \"exact_match\": \"%s\"",
header.string_matcher);
break;
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::REGEX:
header_matcher = absl::StrFormat(" \"regex_match\": \"%s\"",
header.regex_match->pattern());
break;
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::RANGE:
header_matcher = absl::StrFormat(
" \"range_match\":{\n"
" \"start\":%d,\n"
" \"end\":%d\n"
" }",
header.range_start, header.range_end);
break;
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PRESENT:
header_matcher =
absl::StrFormat(" \"present_match\": %s",
header.present_match ? "true" : "false");
break;
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PREFIX:
header_matcher = absl::StrFormat(
" \"prefix_match\": \"%s\"", header.string_matcher);
break;
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::SUFFIX:
header_matcher = absl::StrFormat(
" \"suffix_match\": \"%s\"", header.string_matcher);
break;
default:
break;
}
std::vector<std::string> header_parts;
header_parts.push_back(
absl::StrFormat(" { \n"
" \"name\": \"%s\",\n",
header.name));
header_parts.push_back(header_matcher);
if (header.invert_match) {
header_parts.push_back(
absl::StrFormat(",\n"
" \"invert_match\": true"));
void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
if (clusters_.find(name) == clusters_.end()) {
auto it = resolver_->cluster_state_map_.find(name);
if (it == resolver_->cluster_state_map_.end()) {
auto new_cluster_state =
MakeRefCounted<ClusterState>(name, &resolver_->cluster_state_map_);
clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
} else {
clusters_[it->second->cluster()] = it->second->Ref();
}
header_parts.push_back(
absl::StrFormat("\n"
" }"));
headers.push_back(absl::StrJoin(header_parts, ""));
}
std::vector<std::string> headers_service_config;
if (!headers.empty()) {
headers_service_config.push_back("\"headers\":[\n");
headers_service_config.push_back(absl::StrJoin(headers, ","));
headers_service_config.push_back(" ],\n");
}
std::string path_match_str;
switch (route.matchers.path_matcher.type) {
}
bool PathMatch(const absl::string_view& path,
const XdsApi::Route::Matchers::PathMatcher& path_matcher) {
switch (path_matcher.type) {
case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PREFIX:
path_match_str = absl::StrFormat(
"\"prefix\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
break;
return absl::StartsWith(path, path_matcher.string_matcher);
case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PATH:
path_match_str = absl::StrFormat(
"\"path\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
break;
return path == path_matcher.string_matcher;
case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::REGEX:
path_match_str =
absl::StrFormat("\"regex\": \"%s\",\n",
route.matchers.path_matcher.regex_matcher->pattern());
break;
return RE2::FullMatch(path.data(), *path_matcher.regex_matcher);
default:
return false;
}
return absl::StrFormat(
" { \n"
" %s"
" %s"
" %s"
" \"action\": \"%s\"\n"
" }",
path_match_str, absl::StrJoin(headers_service_config, ""),
route.matchers.fraction_per_million.has_value()
? absl::StrFormat("\"match_fraction\":%d,\n",
route.matchers.fraction_per_million.value())
: "",
action_name);
}
// Create the service config for one weighted cluster.
std::string CreateServiceConfigActionWeightedCluster(
const std::string& name,
const std::vector<XdsApi::Route::ClusterWeight>& clusters) {
std::vector<std::string> config_parts;
config_parts.push_back(
absl::StrFormat(" \"weighted:%s\":{\n"
" \"childPolicy\":[ {\n"
" \"weighted_target_experimental\":{\n"
" \"targets\":{\n",
name));
std::vector<std::string> weighted_targets;
weighted_targets.reserve(clusters.size());
for (const auto& cluster_weight : clusters) {
weighted_targets.push_back(absl::StrFormat(
" \"%s\":{\n"
" \"weight\":%d,\n"
" \"childPolicy\":[ {\n"
" \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" }\n"
" } ]\n"
" }",
cluster_weight.name, cluster_weight.weight, cluster_weight.name));
absl::optional<absl::string_view> GetMetadataValue(
const std::string& target_key, grpc_metadata_batch* initial_metadata,
std::string* concatenated_value) {
// Find all values for the specified key.
GPR_DEBUG_ASSERT(initial_metadata != nullptr);
absl::InlinedVector<absl::string_view, 1> values;
for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
md = md->next) {
absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
absl::string_view value = StringViewFromSlice(GRPC_MDVALUE(md->md));
if (target_key == key) values.push_back(value);
}
config_parts.push_back(absl::StrJoin(weighted_targets, ",\n"));
config_parts.push_back(
" }\n"
" }\n"
" } ]\n"
" }");
return absl::StrJoin(config_parts, "");
// If none found, no match.
if (values.empty()) return absl::nullopt;
// If exactly one found, return it as-is.
if (values.size() == 1) return values.front();
// If more than one found, concatenate the values, using
// *concatenated_values as a temporary holding place for the
// concatenated string.
*concatenated_value = absl::StrJoin(values, ",");
return *concatenated_value;
}
struct WeightedClustersKeys {
std::string cluster_names_key;
std::string cluster_weights_key;
};
bool HeaderMatchHelper(
const XdsApi::Route::Matchers::HeaderMatcher& header_matcher,
grpc_metadata_batch* initial_metadata) {
std::string concatenated_value;
absl::optional<absl::string_view> value;
// Note: If we ever allow binary headers here, we still need to
// special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
// they are not visible to the LB policy in grpc-go.
if (absl::EndsWith(header_matcher.name, "-bin") ||
header_matcher.name == "grpc-previous-rpc-attempts") {
value = absl::nullopt;
} else if (header_matcher.name == "content-type") {
value = "application/grpc";
} else {
value = GetMetadataValue(header_matcher.name, initial_metadata,
&concatenated_value);
}
if (!value.has_value()) {
if (header_matcher.type ==
XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PRESENT) {
return !header_matcher.present_match;
} else {
// For all other header matcher types, we need the header value to
// exist to consider matches.
return false;
}
}
switch (header_matcher.type) {
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::EXACT:
return value.value() == header_matcher.string_matcher;
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::REGEX:
return RE2::FullMatch(value.value().data(), *header_matcher.regex_match);
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::RANGE:
int64_t int_value;
if (!absl::SimpleAtoi(value.value(), &int_value)) {
return false;
}
return int_value >= header_matcher.range_start &&
int_value < header_matcher.range_end;
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PREFIX:
return absl::StartsWith(value.value(), header_matcher.string_matcher);
case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::SUFFIX:
return absl::EndsWith(value.value(), header_matcher.string_matcher);
default:
return false;
}
}
// Returns the cluster names and weights key or the cluster names only key.
WeightedClustersKeys GetWeightedClustersKey(
const std::vector<XdsApi::Route::ClusterWeight>& weighted_clusters) {
std::set<std::string> cluster_names;
std::set<std::string> cluster_weights;
for (const auto& cluster_weight : weighted_clusters) {
cluster_names.emplace(absl::StrFormat("%s", cluster_weight.name));
cluster_weights.emplace(
absl::StrFormat("%s_%d", cluster_weight.name, cluster_weight.weight));
bool HeadersMatch(
const std::vector<XdsApi::Route::Matchers::HeaderMatcher>& header_matchers,
grpc_metadata_batch* initial_metadata) {
for (const auto& header_matcher : header_matchers) {
bool match = HeaderMatchHelper(header_matcher, initial_metadata);
if (header_matcher.invert_match) match = !match;
if (!match) return false;
}
return {absl::StrJoin(cluster_names, "_"),
absl::StrJoin(cluster_weights, "_")};
return true;
}
std::string XdsResolver::WeightedClustersActionName(
const std::vector<XdsApi::Route::ClusterWeight>& weighted_clusters) {
WeightedClustersKeys keys = GetWeightedClustersKey(weighted_clusters);
auto cluster_names_map_it =
weighted_cluster_index_map_.find(keys.cluster_names_key);
GPR_ASSERT(cluster_names_map_it != weighted_cluster_index_map_.end());
const auto& cluster_weights_map =
cluster_names_map_it->second.cluster_weights_map;
auto cluster_weights_map_it =
cluster_weights_map.find(keys.cluster_weights_key);
GPR_ASSERT(cluster_weights_map_it != cluster_weights_map.end());
return absl::StrFormat("%s_%d", keys.cluster_names_key,
cluster_weights_map_it->second);
bool UnderFraction(const uint32_t fraction_per_million) {
// Generate a random number in [0, 1000000).
const uint32_t random_number = rand() % 1000000;
return random_number < fraction_per_million;
}
void XdsResolver::UpdateWeightedClusterIndexMap(
const std::vector<XdsApi::Route>& routes) {
// Construct a list of unique WeightedCluster
// actions which we need to process: to find action names
std::map<std::string /* cluster_weights_key */,
std::string /* cluster_names_key */>
actions_to_process;
for (const auto& route : routes) {
if (!route.weighted_clusters.empty()) {
WeightedClustersKeys keys =
GetWeightedClustersKey(route.weighted_clusters);
auto action_it = actions_to_process.find(keys.cluster_weights_key);
if (action_it == actions_to_process.end()) {
actions_to_process[std::move(keys.cluster_weights_key)] =
std::move(keys.cluster_names_key);
}
ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
GetCallConfigArgs args) {
for (const auto& entry : route_table_) {
// Path matching.
if (!PathMatch(StringViewFromSlice(*args.path),
entry.route.matchers.path_matcher)) {
continue;
}
}
// First pass of all unique WeightedCluster actions: if the exact same
// weighted target policy (same clusters and weights) appears in the old map,
// then that old action name is taken again and should be moved to the new
// map; any other action names from the old set of actions are candidates for
// reuse.
XdsResolver::WeightedClusterIndexMap new_weighted_cluster_index_map;
for (auto action_it = actions_to_process.begin();
action_it != actions_to_process.end();) {
const std::string& cluster_names_key = action_it->second;
const std::string& cluster_weights_key = action_it->first;
auto old_cluster_names_map_it =
weighted_cluster_index_map_.find(cluster_names_key);
if (old_cluster_names_map_it != weighted_cluster_index_map_.end()) {
// Add cluster_names_key to the new map and copy next_index.
auto& new_cluster_names_info =
new_weighted_cluster_index_map[cluster_names_key];
new_cluster_names_info.next_index =
old_cluster_names_map_it->second.next_index;
// Lookup cluster_weights_key in old map.
auto& old_cluster_weights_map =
old_cluster_names_map_it->second.cluster_weights_map;
auto old_cluster_weights_map_it =
old_cluster_weights_map.find(cluster_weights_key);
if (old_cluster_weights_map_it != old_cluster_weights_map.end()) {
// same policy found, move from old map to new map.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
old_cluster_weights_map_it->second;
old_cluster_weights_map.erase(old_cluster_weights_map_it);
// This action has been added to new map, so no need to process it
// again.
action_it = actions_to_process.erase(action_it);
continue;
}
// Header Matching.
if (!HeadersMatch(entry.route.matchers.header_matchers,
args.initial_metadata)) {
continue;
}
++action_it;
}
// Second pass of all remaining unique WeightedCluster actions: if clusters
// for a new action are the same as an old unused action, reuse the name. If
// clusters differ, use a brand new name.
for (const auto& action : actions_to_process) {
const std::string& cluster_names_key = action.second;
const std::string& cluster_weights_key = action.first;
auto& new_cluster_names_info =
new_weighted_cluster_index_map[cluster_names_key];
auto& old_cluster_weights_map =
weighted_cluster_index_map_[cluster_names_key].cluster_weights_map;
auto old_cluster_weights_it = old_cluster_weights_map.begin();
if (old_cluster_weights_it != old_cluster_weights_map.end()) {
// There is something to reuse: this action uses the same set
// of clusters as a previous action and that action name is not
// already taken.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
old_cluster_weights_it->second;
// Remove the name from being able to reuse again.
old_cluster_weights_map.erase(old_cluster_weights_it);
// Match fraction check
if (entry.route.matchers.fraction_per_million.has_value() &&
!UnderFraction(entry.route.matchers.fraction_per_million.value())) {
continue;
}
// Found a route match
absl::string_view cluster_name;
if (entry.route.weighted_clusters.empty()) {
cluster_name = entry.route.cluster_name;
} else {
// There is nothing to reuse, take the next index to use and
// increment.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
new_cluster_names_info.next_index++;
const uint32_t key =
rand() %
entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
.first;
// Find the index in weighted clusters corresponding to key.
size_t mid = 0;
size_t start_index = 0;
size_t end_index = entry.weighted_cluster_state.size() - 1;
size_t index = 0;
while (end_index > start_index) {
mid = (start_index + end_index) / 2;
if (entry.weighted_cluster_state[mid].first > key) {
end_index = mid;
} else if (entry.weighted_cluster_state[mid].first < key) {
start_index = mid + 1;
} else {
index = mid + 1;
break;
}
}
if (index == 0) index = start_index;
GPR_ASSERT(entry.weighted_cluster_state[index].first > key);
cluster_name = entry.weighted_cluster_state[index].second;
}
auto it = clusters_.find(cluster_name);
GPR_ASSERT(it != clusters_.end());
XdsResolver* resolver =
static_cast<XdsResolver*>(resolver_->Ref().release());
ClusterState* cluster_state = it->second->Ref().release();
CallConfig call_config;
call_config.call_attributes[kXdsClusterAttribute] = it->first;
call_config.on_call_committed = [resolver, cluster_state]() {
cluster_state->Unref();
ExecCtx::Run(
// TODO(roth): This hop into the ExecCtx is being done to avoid
// entering the WorkSerializer while holding the client channel data
// plane mutex, since that can lead to deadlocks. However, we should
// not have to solve this problem in each individual ConfigSelector
// implementation. When we have time, we should fix the client channel
// code to avoid this by not invoking the
// CallConfig::on_call_committed callback until after it has released
// the data plane mutex.
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error* /*error*/) {
auto* resolver = static_cast<XdsResolver*>(arg);
resolver->work_serializer()->Run(
[resolver]() {
resolver->MaybeRemoveUnusedClusters();
resolver->Unref();
},
DEBUG_LOCATION);
},
resolver, nullptr),
GRPC_ERROR_NONE);
};
return call_config;
}
return CallConfig();
}
//
// XdsResolver
//
void XdsResolver::StartLocked() {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(
work_serializer(), interested_parties_, server_name_,
absl::make_unique<ListenerWatcher>(Ref()), *args_, &error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"Failed to create xds client -- channel will remain in "
"TRANSIENT_FAILURE: %s",
grpc_error_string(error));
result_handler()->ReturnError(error);
}
weighted_cluster_index_map_ = std::move(new_weighted_cluster_index_map);
}
void XdsResolver::OnListenerChanged(std::vector<XdsApi::Route> routes) {
// Save the update in the resolver.
current_update_ = std::move(routes);
// Propagate the update by creating XdsConfigSelector, CreateServiceConfig,
// and ReturnResult.
PropagateUpdate();
}
grpc_error* XdsResolver::CreateServiceConfig(
const std::vector<XdsApi::Route>& routes,
RefCountedPtr<ServiceConfig>* service_config) {
UpdateWeightedClusterIndexMap(routes);
std::vector<std::string> actions_vector;
std::vector<std::string> route_table;
std::set<std::string> actions_set;
for (const auto& route : routes) {
const std::string action_name =
route.weighted_clusters.empty()
? route.cluster_name
: WeightedClustersActionName(route.weighted_clusters);
if (actions_set.find(action_name) == actions_set.end()) {
actions_set.emplace(action_name);
actions_vector.push_back(
route.weighted_clusters.empty()
? CreateServiceConfigActionCluster(action_name)
: CreateServiceConfigActionWeightedCluster(
action_name, route.weighted_clusters));
}
route_table.push_back(CreateServiceConfigRoute(
absl::StrFormat("%s:%s",
route.weighted_clusters.empty() ? "cds" : "weighted",
action_name),
route));
std::vector<std::string> clusters;
for (const auto& cluster : cluster_state_map_) {
clusters.push_back(
absl::StrFormat(" \"%s\":{\n"
" \"childPolicy\":[ {\n"
" \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" }\n"
" } ]\n"
" }",
cluster.first, cluster.first));
}
std::vector<std::string> config_parts;
config_parts.push_back(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"xds_routing_experimental\":{\n"
" \"actions\":{\n");
config_parts.push_back(absl::StrJoin(actions_vector, ",\n"));
config_parts.push_back(
" },\n"
" \"routes\":[\n");
config_parts.push_back(absl::StrJoin(route_table, ",\n"));
" { \"xds_cluster_manager_experimental\":{\n"
" \"children\":{\n");
config_parts.push_back(absl::StrJoin(clusters, ",\n"));
config_parts.push_back(
" ]\n"
" }\n"
" } }\n"
" ]\n"
"}");
@ -502,6 +472,56 @@ grpc_error* XdsResolver::CreateServiceConfig(
return error;
}
void XdsResolver::OnError(grpc_error* error) {
grpc_arg xds_client_arg = xds_client_->MakeChannelArg();
Result result;
result.args = grpc_channel_args_copy_and_add(args_, &xds_client_arg, 1);
result.service_config_error = error;
result_handler()->ReturnResult(std::move(result));
}
void XdsResolver::PropagateUpdate() {
// First create XdsConfigSelector, which may add new entries to the cluster
// state map, and then CreateServiceConfig for LB policies.
auto config_selector =
MakeRefCounted<XdsConfigSelector>(Ref(), current_update_);
Result result;
grpc_error* error = CreateServiceConfig(&result.service_config);
if (error != GRPC_ERROR_NONE) {
OnError(error);
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
result.service_config->json_string().c_str());
}
grpc_arg new_args[] = {
xds_client_->MakeChannelArg(),
config_selector->MakeChannelArg(),
};
result.args =
grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args));
result_handler()->ReturnResult(std::move(result));
}
void XdsResolver::MaybeRemoveUnusedClusters() {
bool update_needed = false;
for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
if (it->second->RefIfNonZero()) {
it->second->Unref();
++it;
} else {
update_needed = true;
it = cluster_state_map_.erase(it);
}
}
if (update_needed && xds_client_ != nullptr) {
// Propagate the update by creating XdsConfigSelector, CreateServiceConfig,
// and ReturnResult.
PropagateUpdate();
}
}
//
// Factory
//

@ -0,0 +1,28 @@
//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_H
#include <grpc/support/port_platform.h>
namespace grpc_core {
extern const char* kXdsClusterAttribute;
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_H */

@ -322,6 +322,14 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
// later use.
RefCountedPtr<ConfigSelector> config_selector =
ConfigSelector::GetFromChannelArgs(*result.args);
// Remove the config selector from channel args so that we're not holding
// unnecessary refs that cause it to be destroyed somewhere other than in the
// WorkSerializer.
const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
grpc_channel_args* new_args =
grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
grpc_channel_args_destroy(result.args);
result.args = new_args;
// Create or update LB policy, as needed.
if (service_config_result.lb_policy_config != nullptr) {
CreateOrUpdateLbPolicyLocked(

@ -70,8 +70,8 @@ void grpc_lb_policy_eds_init(void);
void grpc_lb_policy_eds_shutdown(void);
void grpc_lb_policy_lrs_init(void);
void grpc_lb_policy_lrs_shutdown(void);
void grpc_lb_policy_xds_routing_init(void);
void grpc_lb_policy_xds_routing_shutdown(void);
void grpc_lb_policy_xds_cluster_manager_init(void);
void grpc_lb_policy_xds_cluster_manager_shutdown(void);
void grpc_resolver_xds_init(void);
void grpc_resolver_xds_shutdown(void);
#endif
@ -126,8 +126,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_eds_shutdown);
grpc_register_plugin(grpc_lb_policy_lrs_init,
grpc_lb_policy_lrs_shutdown);
grpc_register_plugin(grpc_lb_policy_xds_routing_init,
grpc_lb_policy_xds_routing_shutdown);
grpc_register_plugin(grpc_lb_policy_xds_cluster_manager_init,
grpc_lb_policy_xds_cluster_manager_shutdown);
grpc_register_plugin(grpc_resolver_xds_init,
grpc_resolver_xds_shutdown);
#endif

@ -44,7 +44,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',
'src/core/ext/filters/client_channel/proxy_mapper_registry.cc',

@ -1941,7 +1941,7 @@ TEST_P(BasicTest, Vanilla) {
backends_[i]->backend_service()->request_count());
}
// Check LB policy name for the channel.
EXPECT_EQ((GetParam().use_xds_resolver() ? "xds_routing_experimental"
EXPECT_EQ((GetParam().use_xds_resolver() ? "xds_cluster_manager_experimental"
: "eds_experimental"),
channel_->GetLoadBalancingPolicyName());
}
@ -3684,11 +3684,8 @@ TEST_P(LdsRdsTest, XdsRoutingClusterUpdateClustersWithPickingDelays) {
StartBackend(0);
sending_rpc.join();
// Make sure RPCs go to the correct backend:
// Before moving routing to XdsConfigSelector, 2 to backend 1;
// TODO(donnadionne): After moving routing to XdsConfigSelector, 1 for each
// backend.
EXPECT_EQ(0, backends_[0]->backend_service()->request_count());
EXPECT_EQ(2, backends_[1]->backend_service()->request_count());
EXPECT_EQ(1, backends_[0]->backend_service()->request_count());
EXPECT_EQ(1, backends_[1]->backend_service()->request_count());
}
TEST_P(LdsRdsTest, XdsRoutingHeadersMatching) {

@ -1102,7 +1102,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds.h \
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/lb_policy_registry.h \
@ -1132,6 +1132,7 @@ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc \
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h \
src/core/ext/filters/client_channel/resolver_factory.h \
src/core/ext/filters/client_channel/resolver_registry.cc \
src/core/ext/filters/client_channel/resolver_registry.h \

@ -916,7 +916,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds.h \
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/lb_policy_registry.h \
@ -949,6 +949,7 @@ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h \
src/core/ext/filters/client_channel/resolver/sockaddr/README.md \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h \
src/core/ext/filters/client_channel/resolver_factory.h \
src/core/ext/filters/client_channel/resolver_registry.cc \
src/core/ext/filters/client_channel/resolver_registry.h \

@ -62,7 +62,7 @@ bazel build //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client
# Test cases "path_matching" and "header_matching" are not included in "all",
# because not all interop clients in all languages support these new tests.
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \

@ -65,7 +65,7 @@ bazel build test/cpp/interop:xds_interop_client
#
# TODO: remove "path_matching" and "header_matching" from --test_case after
# they are added into "all".
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \

@ -65,7 +65,7 @@ python tools/run_tests/run_tests.py -l csharp -c opt --build_only
#
# TODO(jtattermusch): remove "path_matching" and "header_matching" from
# --test_case after they are added into "all".
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \

@ -70,7 +70,7 @@ export CC=/usr/bin/gcc
composer install && \
./bin/generate_proto_php.sh)
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \

@ -60,7 +60,7 @@ touch "$TOOLS_DIR"/src/proto/grpc/health/v1/__init__.py
(cd src/ruby && bundle && rake compile)
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_routing_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \

Loading…
Cancel
Save