Merge pull request #24419 from markdroth/xds_cluster_impl_lb

Move circuit breaking, EDS drops, and load reporting to xds_cluster_impl policy.
pull/24443/head
Mark D. Roth 5 years ago committed by GitHub
commit e05c5da2c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      BUILD
  2. 2
      BUILD.gn
  3. 2
      CMakeLists.txt
  4. 4
      Makefile
  5. 2
      build_autogenerated.yaml
  6. 2
      config.m4
  7. 2
      config.w32
  8. 2
      doc/environment_variables.md
  9. 2
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 2
      grpc.gyp
  12. 2
      package.xml
  13. 266
      src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
  14. 70
      src/core/ext/filters/client_channel/lb_policy/xds/xds.h
  15. 382
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  16. 8
      src/core/plugin_registry/grpc_plugin_registry.cc
  17. 2
      src/python/grpcio/grpc_core_dependencies.py
  18. 2
      tools/doxygen/Doxyfile.c++.internal
  19. 2
      tools/doxygen/Doxyfile.core.internal
  20. 2
      tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
  21. 2
      tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
  22. 2
      tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
  23. 2
      tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
  24. 2
      tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh

24
BUILD

@ -323,7 +323,7 @@ grpc_cc_library(
"//conditions:default": [ "//conditions:default": [
"grpc_lb_policy_cds", "grpc_lb_policy_cds",
"grpc_lb_policy_eds", "grpc_lb_policy_eds",
"grpc_lb_policy_eds_drop", "grpc_lb_policy_xds_cluster_impl",
"grpc_lb_policy_xds_cluster_manager", "grpc_lb_policy_xds_cluster_manager",
"grpc_resolver_xds", "grpc_resolver_xds",
"grpc_xds_credentials", "grpc_xds_credentials",
@ -1364,14 +1364,24 @@ grpc_cc_library(
], ],
) )
grpc_cc_library(
name = "grpc_lb_xds_common",
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/xds/xds.h",
],
language = "c++",
deps = [
"grpc_base",
"grpc_client_channel",
"grpc_xds_client",
],
)
grpc_cc_library( grpc_cc_library(
name = "grpc_lb_policy_eds", name = "grpc_lb_policy_eds",
srcs = [ srcs = [
"src/core/ext/filters/client_channel/lb_policy/xds/eds.cc", "src/core/ext/filters/client_channel/lb_policy/xds/eds.cc",
], ],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/xds/xds.h",
],
external_deps = [ external_deps = [
"absl/strings", "absl/strings",
], ],
@ -1380,14 +1390,15 @@ grpc_cc_library(
"grpc_base", "grpc_base",
"grpc_client_channel", "grpc_client_channel",
"grpc_lb_address_filtering", "grpc_lb_address_filtering",
"grpc_lb_xds_common",
"grpc_xds_client", "grpc_xds_client",
], ],
) )
grpc_cc_library( grpc_cc_library(
name = "grpc_lb_policy_eds_drop", name = "grpc_lb_policy_xds_cluster_impl",
srcs = [ srcs = [
"src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc", "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc",
], ],
external_deps = [ external_deps = [
"absl/strings", "absl/strings",
@ -1396,6 +1407,7 @@ grpc_cc_library(
deps = [ deps = [
"grpc_base", "grpc_base",
"grpc_client_channel", "grpc_client_channel",
"grpc_lb_xds_common",
"grpc_xds_client", "grpc_xds_client",
], ],
) )

@ -250,8 +250,8 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc", "src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/cds.cc", "src/core/ext/filters/client_channel/lb_policy/xds/cds.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/eds.cc", "src/core/ext/filters/client_channel/lb_policy/xds/eds.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/xds.h", "src/core/ext/filters/client_channel/lb_policy/xds/xds.h",
"src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc", "src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc",
"src/core/ext/filters/client_channel/lb_policy_factory.h", "src/core/ext/filters/client_channel/lb_policy_factory.h",
"src/core/ext/filters/client_channel/lb_policy_registry.cc", "src/core/ext/filters/client_channel/lb_policy_registry.cc",

@ -1445,7 +1445,7 @@ add_library(grpc
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
src/core/ext/filters/client_channel/lb_policy_registry.cc src/core/ext/filters/client_channel/lb_policy_registry.cc
src/core/ext/filters/client_channel/local_subchannel_pool.cc src/core/ext/filters/client_channel/local_subchannel_pool.cc

@ -1846,7 +1846,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \
@ -4599,7 +4599,7 @@ ifneq ($(OPENSSL_DEP),)
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/eds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc: $(OPENSSL_DEP)
src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc: $(OPENSSL_DEP) src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc: $(OPENSSL_DEP)

@ -876,7 +876,7 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc - src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
- src/core/ext/filters/client_channel/lb_policy/xds/cds.cc - src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/eds.cc - src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
- src/core/ext/filters/client_channel/lb_policy_registry.cc - src/core/ext/filters/client_channel/lb_policy_registry.cc
- src/core/ext/filters/client_channel/local_subchannel_pool.cc - src/core/ext/filters/client_channel/local_subchannel_pool.cc

@ -67,7 +67,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \ src/core/ext/filters/client_channel/local_subchannel_pool.cc \

@ -34,7 +34,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\lb_policy\\weighted_target\\weighted_target.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\weighted_target\\weighted_target.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds_drop.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_impl.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_cluster_manager.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy_registry.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy_registry.cc " +
"src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " + "src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " +

@ -68,7 +68,6 @@ some configuration as environment variables that can be set.
- inproc - traces the in-process transport - inproc - traces the in-process transport
- http_keepalive - traces gRPC keepalive pings - http_keepalive - traces gRPC keepalive pings
- flowctl - traces http2 flow control - flowctl - traces http2 flow control
- lrs_lb - traces lrs LB policy
- op_failure - traces error information when failure is pushed onto a - op_failure - traces error information when failure is pushed onto a
completion queue completion queue
- pick_first - traces the pick first load balancing policy - pick_first - traces the pick first load balancing policy
@ -91,6 +90,7 @@ some configuration as environment variables that can be set.
- weighted_target_lb - traces weighted_target LB policy - weighted_target_lb - traces weighted_target LB policy
- xds_client - traces xds client - xds_client - traces xds client
- xds_cluster_manager_lb - traces cluster manager LB policy - xds_cluster_manager_lb - traces cluster manager LB policy
- xds_cluster_impl_lb - traces cluster impl LB policy
- xds_resolver - traces xds resolver - xds_resolver - traces xds resolver
The following tracers will only run in binaries built in DEBUG mode. This is The following tracers will only run in binaries built in DEBUG mode. This is

@ -235,8 +235,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds.h',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_factory.h', 'src/core/ext/filters/client_channel/lb_policy_factory.h',
'src/core/ext/filters/client_channel/lb_policy_registry.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.cc',

@ -153,8 +153,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/cds.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/cds.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_factory.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy_factory.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.cc )

@ -472,7 +472,7 @@
'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc',

@ -133,8 +133,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/cds.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/cds.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/eds.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/eds.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_factory.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_registry.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_registry.cc" role="src" />

@ -36,11 +36,9 @@
#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/uri/uri_parser.h" #include "src/core/lib/uri/uri_parser.h"
@ -51,23 +49,12 @@ namespace grpc_core {
TraceFlag grpc_lb_eds_trace(false, "eds_lb"); TraceFlag grpc_lb_eds_trace(false, "eds_lb");
const char* kXdsLocalityNameAttributeKey = "xds_locality_name";
namespace { namespace {
constexpr char kEds[] = "eds_experimental"; constexpr char kEds[] = "eds_experimental";
const char* kXdsLocalityNameAttributeKey = "xds_locality_name";
// TODO (donnadionne): Check to see if circuit breaking is enabled, this will be
// removed once circuit breaking feature is fully integrated and enabled by
// default.
bool XdsCircuitBreakingEnabled() {
char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
gpr_free(value);
return parse_succeeded && parsed_value;
}
// Config for EDS LB policy. // Config for EDS LB policy.
class EdsLbConfig : public LoadBalancingPolicy::Config { class EdsLbConfig : public LoadBalancingPolicy::Config {
public: public:
@ -120,49 +107,6 @@ class EdsLb : public LoadBalancingPolicy {
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
private: private:
class XdsLocalityAttribute : public ServerAddress::AttributeInterface {
public:
explicit XdsLocalityAttribute(RefCountedPtr<XdsLocalityName> locality_name)
: locality_name_(std::move(locality_name)) {}
RefCountedPtr<XdsLocalityName> locality_name() const {
return locality_name_;
}
std::unique_ptr<AttributeInterface> Copy() const override {
return absl::make_unique<XdsLocalityAttribute>(locality_name_->Ref());
}
int Cmp(const AttributeInterface* other) const override {
const auto* other_locality_attr =
static_cast<const XdsLocalityAttribute*>(other);
return locality_name_->Compare(*other_locality_attr->locality_name_);
}
std::string ToString() const override {
return locality_name_->AsHumanReadableString();
}
private:
RefCountedPtr<XdsLocalityName> locality_name_;
};
class StatsSubchannelWrapper : public DelegatingSubchannel {
public:
StatsSubchannelWrapper(
RefCountedPtr<SubchannelInterface> wrapped_subchannel,
RefCountedPtr<XdsClusterLocalityStats> locality_stats)
: DelegatingSubchannel(std::move(wrapped_subchannel)),
locality_stats_(std::move(locality_stats)) {}
XdsClusterLocalityStats* locality_stats() const {
return locality_stats_.get();
}
private:
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
};
class EndpointWatcher : public XdsClient::EndpointWatcherInterface { class EndpointWatcher : public XdsClient::EndpointWatcherInterface {
public: public:
explicit EndpointWatcher(RefCountedPtr<EdsLb> parent) explicit EndpointWatcher(RefCountedPtr<EdsLb> parent)
@ -195,33 +139,6 @@ class EdsLb : public LoadBalancingPolicy {
RefCountedPtr<EdsLb> parent_; RefCountedPtr<EdsLb> parent_;
}; };
// A simple wrapper for ref-counting a picker from the child policy.
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
public:
explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
private:
std::unique_ptr<SubchannelPicker> picker_;
};
// A picker that handles drops.
class EdsPicker : public SubchannelPicker {
public:
explicit EdsPicker(RefCountedPtr<EdsLb> eds_policy);
PickResult Pick(PickArgs args) override;
private:
RefCountedPtr<EdsLb> eds_policy_;
RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
RefCountedPtr<XdsClusterDropStats> drop_stats_;
RefCountedPtr<ChildPickerWrapper> child_picker_;
bool xds_circuit_breaking_enabled_;
uint32_t max_concurrent_requests_;
};
class Helper : public ChannelControlHelper { class Helper : public ChannelControlHelper {
public: public:
explicit Helper(RefCountedPtr<EdsLb> eds_policy) explicit Helper(RefCountedPtr<EdsLb> eds_policy)
@ -261,7 +178,6 @@ class EdsLb : public LoadBalancingPolicy {
RefCountedPtr<Config> CreateChildPolicyConfigLocked(); RefCountedPtr<Config> CreateChildPolicyConfigLocked();
grpc_channel_args* CreateChildPolicyArgsLocked( grpc_channel_args* CreateChildPolicyArgsLocked(
const grpc_channel_args* args_in); const grpc_channel_args* args_in);
void MaybeUpdateEdsPickerLocked();
// Caller must ensure that config_ is set before calling. // Caller must ensure that config_ is set before calling.
const absl::string_view GetEdsResourceName() const { const absl::string_view GetEdsResourceName() const {
@ -302,109 +218,10 @@ class EdsLb : public LoadBalancingPolicy {
std::vector<size_t /*child_number*/> priority_child_numbers_; std::vector<size_t /*child_number*/> priority_child_numbers_;
RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_; RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
RefCountedPtr<XdsClusterDropStats> drop_stats_;
// Current concurrent number of requests;
Atomic<uint32_t> concurrent_requests_{0};
OrphanablePtr<LoadBalancingPolicy> child_policy_; OrphanablePtr<LoadBalancingPolicy> child_policy_;
// The latest state and picker returned from the child policy.
grpc_connectivity_state child_state_;
absl::Status child_status_;
RefCountedPtr<ChildPickerWrapper> child_picker_;
}; };
//
// EdsLb::EdsPicker
//
EdsLb::EdsPicker::EdsPicker(RefCountedPtr<EdsLb> eds_policy)
: eds_policy_(std::move(eds_policy)),
drop_stats_(eds_policy_->drop_stats_),
child_picker_(eds_policy_->child_picker_),
xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()),
max_concurrent_requests_(
eds_policy_->config_->max_concurrent_requests()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
gpr_log(GPR_INFO, "[edslb %p] constructed new drop picker %p",
eds_policy_.get(), this);
}
}
EdsLb::PickResult EdsLb::EdsPicker::Pick(PickArgs args) {
uint32_t current = eds_policy_->concurrent_requests_.FetchAdd(1);
if (xds_circuit_breaking_enabled_) {
// Check and see if we exceeded the max concurrent requests count.
if (current >= max_concurrent_requests_) {
eds_policy_->concurrent_requests_.FetchSub(1);
if (drop_stats_ != nullptr) {
drop_stats_->AddUncategorizedDrops();
}
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
}
}
// If we're not dropping the call, we should always have a child picker.
if (child_picker_ == nullptr) { // Should never happen.
PickResult result;
result.type = PickResult::PICK_FAILED;
result.error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"eds drop picker not given any child picker"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
eds_policy_->concurrent_requests_.FetchSub(1);
return result;
}
// Not dropping, so delegate to child's picker.
PickResult result = child_picker_->Pick(args);
if (result.type == result.PICK_COMPLETE && result.subchannel != nullptr) {
XdsClusterLocalityStats* locality_stats = nullptr;
if (drop_stats_ != nullptr) { // If load reporting is enabled.
auto* subchannel_wrapper =
static_cast<StatsSubchannelWrapper*>(result.subchannel.get());
// Handle load reporting.
locality_stats = subchannel_wrapper->locality_stats()->Ref().release();
// Record a call started.
locality_stats->AddCallStarted();
// Unwrap subchannel to pass back up the stack.
result.subchannel = subchannel_wrapper->wrapped_subchannel();
}
// Intercept the recv_trailing_metadata op to record call completion.
EdsLb* eds_policy = static_cast<EdsLb*>(
eds_policy_->Ref(DEBUG_LOCATION, "DropPickPicker+call").release());
auto original_recv_trailing_metadata_ready =
result.recv_trailing_metadata_ready;
result.recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
// work serializer or in the data plane mutex.
[locality_stats, original_recv_trailing_metadata_ready, eds_policy](
grpc_error* error, MetadataInterface* metadata,
CallState* call_state) {
// Record call completion for load reporting.
if (locality_stats != nullptr) {
const bool call_failed = error != GRPC_ERROR_NONE;
locality_stats->AddCallFinished(call_failed);
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
}
// Decrement number of calls in flight.
eds_policy->concurrent_requests_.FetchSub(1);
eds_policy->Unref(DEBUG_LOCATION, "DropPickPicker+call");
// Invoke the original recv_trailing_metadata_ready callback, if any.
if (original_recv_trailing_metadata_ready != nullptr) {
original_recv_trailing_metadata_ready(error, metadata, call_state);
}
};
} else {
// TODO(roth): We should ideally also record call failures here in the case
// where a pick fails. This is challenging, because we don't know which
// picks are for wait_for_ready RPCs or how many times we'll return a
// failure for the same wait_for_ready RPC.
eds_policy_->concurrent_requests_.FetchSub(1);
}
return result;
}
// //
// EdsLb::Helper // EdsLb::Helper
// //
@ -412,27 +229,6 @@ EdsLb::PickResult EdsLb::EdsPicker::Pick(PickArgs args) {
RefCountedPtr<SubchannelInterface> EdsLb::Helper::CreateSubchannel( RefCountedPtr<SubchannelInterface> EdsLb::Helper::CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) { ServerAddress address, const grpc_channel_args& args) {
if (eds_policy_->shutting_down_) return nullptr; if (eds_policy_->shutting_down_) return nullptr;
// If load reporting is enabled, wrap the subchannel such that it
// includes the locality stats object, which will be used by the EdsPicker.
if (eds_policy_->config_->lrs_load_reporting_server_name().has_value()) {
RefCountedPtr<XdsLocalityName> locality_name;
auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey);
if (attribute != nullptr) {
const auto* locality_attr =
static_cast<const XdsLocalityAttribute*>(attribute);
locality_name = locality_attr->locality_name();
}
RefCountedPtr<XdsClusterLocalityStats> locality_stats =
eds_policy_->xds_client_->AddClusterLocalityStats(
*eds_policy_->config_->lrs_load_reporting_server_name(),
eds_policy_->config_->cluster_name(),
eds_policy_->config_->eds_service_name(), std::move(locality_name));
return MakeRefCounted<StatsSubchannelWrapper>(
eds_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args),
std::move(locality_stats));
}
// Load reporting not enabled, so don't wrap the subchannel.
return eds_policy_->channel_control_helper()->CreateSubchannel( return eds_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args); std::move(address), args);
} }
@ -444,19 +240,12 @@ void EdsLb::Helper::UpdateState(grpc_connectivity_state state,
return; return;
} }
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO, "[edslb %p] child policy updated state=%s (%s) picker=%p",
"[edslb %p] child policy updated state=%s (%s) "
"picker=%p",
eds_policy_.get(), ConnectivityStateName(state), eds_policy_.get(), ConnectivityStateName(state),
status.ToString().c_str(), picker.get()); status.ToString().c_str(), picker.get());
} }
// Save the state and picker. eds_policy_->channel_control_helper()->UpdateState(state, status,
eds_policy_->child_state_ = state; std::move(picker));
eds_policy_->child_status_ = status;
eds_policy_->child_picker_ =
MakeRefCounted<ChildPickerWrapper>(std::move(picker));
// Wrap the picker in a EdsPicker and pass it up.
eds_policy_->MaybeUpdateEdsPickerLocked();
} }
void EdsLb::Helper::AddTraceEvent(TraceSeverity severity, void EdsLb::Helper::AddTraceEvent(TraceSeverity severity,
@ -561,11 +350,7 @@ void EdsLb::ShutdownLocked() {
gpr_log(GPR_INFO, "[edslb %p] shutting down", this); gpr_log(GPR_INFO, "[edslb %p] shutting down", this);
} }
shutting_down_ = true; shutting_down_ = true;
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
child_picker_.reset();
MaybeDestroyChildPolicyLocked(); MaybeDestroyChildPolicyLocked();
drop_stats_.reset();
// Cancel watcher. // Cancel watcher.
if (endpoint_watcher_ != nullptr) { if (endpoint_watcher_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
@ -613,28 +398,7 @@ void EdsLb::UpdateLocked(UpdateArgs args) {
grpc_channel_args_destroy(args_); grpc_channel_args_destroy(args_);
args_ = args.args; args_ = args.args;
args.args = nullptr; args.args = nullptr;
const bool lrs_server_changed =
is_initial_update || config_->lrs_load_reporting_server_name() !=
old_config->lrs_load_reporting_server_name();
const bool max_concurrent_requests_changed =
is_initial_update || config_->max_concurrent_requests() !=
old_config->max_concurrent_requests();
// Update drop stats for load reporting if needed.
if (lrs_server_changed) {
drop_stats_.reset();
if (config_->lrs_load_reporting_server_name().has_value()) {
const auto key = GetLrsClusterKey();
drop_stats_ = xds_client_->AddClusterDropStats(
config_->lrs_load_reporting_server_name().value(),
key.first /*cluster_name*/, key.second /*eds_service_name*/);
}
}
if (lrs_server_changed || max_concurrent_requests_changed) {
MaybeUpdateEdsPickerLocked();
}
// Update child policy if needed. // Update child policy if needed.
// Note that this comes after updating drop_stats_, since we want that
// to be used by any new picker we create here.
if (child_policy_ != nullptr) UpdateChildPolicyLocked(); if (child_policy_ != nullptr) UpdateChildPolicyLocked();
// Create endpoint watcher if needed. // Create endpoint watcher if needed.
if (is_initial_update) { if (is_initial_update) {
@ -665,7 +429,7 @@ void EdsLb::OnEndpointChanged(XdsApi::EdsUpdate update) {
// Update the drop config. // Update the drop config.
drop_config_ = std::move(update.drop_config); drop_config_ = std::move(update.drop_config);
// If priority list is empty, add a single priority, just so that we // If priority list is empty, add a single priority, just so that we
// have a child in which to create the eds_drop policy. // have a child in which to create the xds_cluster_impl policy.
if (update.priorities.empty()) update.priorities.emplace_back(); if (update.priorities.empty()) update.priorities.emplace_back();
// Update child policy. // Update child policy.
UpdatePriorityList(std::move(update.priorities)); UpdatePriorityList(std::move(update.priorities));
@ -836,20 +600,21 @@ EdsLb::CreateChildPolicyConfigLocked() {
{"requests_per_million", category.parts_per_million}, {"requests_per_million", category.parts_per_million},
}); });
} }
Json::Object eds_drop_config = { Json::Object xds_cluster_impl_config = {
{"clusterName", std::string(lrs_key.first)}, {"clusterName", std::string(lrs_key.first)},
{"childPolicy", std::move(locality_picking_config)}, {"childPolicy", std::move(locality_picking_config)},
{"dropCategories", std::move(drop_categories)}, {"dropCategories", std::move(drop_categories)},
{"maxConcurrentRequests", config_->max_concurrent_requests()},
}; };
if (!lrs_key.second.empty()) { if (!lrs_key.second.empty()) {
eds_drop_config["edsServiceName"] = std::string(lrs_key.second); xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second);
} }
if (config_->lrs_load_reporting_server_name().has_value()) { if (config_->lrs_load_reporting_server_name().has_value()) {
eds_drop_config["lrsLoadReportingServerName"] = xds_cluster_impl_config["lrsLoadReportingServerName"] =
config_->lrs_load_reporting_server_name().value(); config_->lrs_load_reporting_server_name().value();
} }
Json locality_picking_policy = Json::Array{Json::Object{ Json locality_picking_policy = Json::Array{Json::Object{
{"eds_drop_experimental", std::move(eds_drop_config)}, {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)},
}}; }};
// Add priority entry. // Add priority entry.
const size_t child_number = priority_child_numbers_[priority]; const size_t child_number = priority_child_numbers_[priority];
@ -957,15 +722,6 @@ OrphanablePtr<LoadBalancingPolicy> EdsLb::CreateChildPolicyLocked(
return lb_policy; return lb_policy;
} }
void EdsLb::MaybeUpdateEdsPickerLocked() {
// Update only if we have a child picker.
if (child_picker_ != nullptr) {
channel_control_helper()->UpdateState(
child_state_, child_status_,
absl::make_unique<EdsPicker>(Ref(DEBUG_LOCATION, "EdsPicker")));
}
}
// //
// factory // factory
// //

@ -1,26 +1,28 @@
/* //
* // Copyright 2018 gRPC authors.
* Copyright 2018 gRPC authors. //
* // Licensed under the Apache License, Version 2.0 (the "License");
* Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.
* you may not use this file except in compliance with the License. // You may obtain a copy of the License at
* You may obtain a copy of the License at //
* // http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0 //
* // Unless required by applicable law or agreed to in writing, software
* Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS,
* distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and
* See the License for the specific language governing permissions and // limitations under the License.
* limitations under the License. //
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
/** Channel arg indicating if a target corresponding to the address is a backend /** Channel arg indicating if a target corresponding to the address is a backend
* received from a balancer. The type of this arg is an integer and the value is * received from a balancer. The type of this arg is an integer and the value is
* treated as a bool. */ * treated as a bool. */
@ -29,4 +31,38 @@
#define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER \ #define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER \
"grpc.address_is_backend_from_xds_load_balancer" "grpc.address_is_backend_from_xds_load_balancer"
namespace grpc_core {
// Defined in the EDS policy.
extern const char* kXdsLocalityNameAttributeKey;
class XdsLocalityAttribute : public ServerAddress::AttributeInterface {
public:
explicit XdsLocalityAttribute(RefCountedPtr<XdsLocalityName> locality_name)
: locality_name_(std::move(locality_name)) {}
RefCountedPtr<XdsLocalityName> locality_name() const {
return locality_name_;
}
std::unique_ptr<AttributeInterface> Copy() const override {
return absl::make_unique<XdsLocalityAttribute>(locality_name_->Ref());
}
int Cmp(const AttributeInterface* other) const override {
const auto* other_locality_attr =
static_cast<const XdsLocalityAttribute*>(other);
return locality_name_->Compare(*other_locality_attr->locality_name_);
}
std::string ToString() const override {
return locality_name_->AsHumanReadableString();
}
private:
RefCountedPtr<XdsLocalityName> locality_name_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H */ #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H */

@ -22,11 +22,13 @@
#include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -34,27 +36,41 @@
namespace grpc_core { namespace grpc_core {
TraceFlag grpc_eds_drop_lb_trace(false, "eds_drop_lb"); TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
namespace { namespace {
constexpr char kEdsDrop[] = "eds_drop_experimental"; constexpr char kXdsClusterImpl[] = "xds_cluster_impl_experimental";
// TODO (donnadionne): Check to see if circuit breaking is enabled, this will be
// removed once circuit breaking feature is fully integrated and enabled by
// default.
bool XdsCircuitBreakingEnabled() {
char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
gpr_free(value);
return parse_succeeded && parsed_value;
}
// Config for EDS drop LB policy. // Config for xDS Cluster Impl LB policy.
class EdsDropLbConfig : public LoadBalancingPolicy::Config { class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
public: public:
EdsDropLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy, XdsClusterImplLbConfig(
std::string cluster_name, std::string eds_service_name, RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
absl::optional<std::string> lrs_load_reporting_server_name, std::string cluster_name, std::string eds_service_name,
RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config) absl::optional<std::string> lrs_load_reporting_server_name,
uint32_t max_concurrent_requests,
RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config)
: child_policy_(std::move(child_policy)), : child_policy_(std::move(child_policy)),
cluster_name_(std::move(cluster_name)), cluster_name_(std::move(cluster_name)),
eds_service_name_(std::move(eds_service_name)), eds_service_name_(std::move(eds_service_name)),
lrs_load_reporting_server_name_( lrs_load_reporting_server_name_(
std::move(lrs_load_reporting_server_name)), std::move(lrs_load_reporting_server_name)),
max_concurrent_requests_(max_concurrent_requests),
drop_config_(std::move(drop_config)) {} drop_config_(std::move(drop_config)) {}
const char* name() const override { return kEdsDrop; } const char* name() const override { return kXdsClusterImpl; }
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const { RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_; return child_policy_;
@ -64,6 +80,9 @@ class EdsDropLbConfig : public LoadBalancingPolicy::Config {
const absl::optional<std::string>& lrs_load_reporting_server_name() const { const absl::optional<std::string>& lrs_load_reporting_server_name() const {
return lrs_load_reporting_server_name_; return lrs_load_reporting_server_name_;
}; };
const uint32_t max_concurrent_requests() const {
return max_concurrent_requests_;
}
RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config() const { RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config() const {
return drop_config_; return drop_config_;
} }
@ -73,21 +92,38 @@ class EdsDropLbConfig : public LoadBalancingPolicy::Config {
std::string cluster_name_; std::string cluster_name_;
std::string eds_service_name_; std::string eds_service_name_;
absl::optional<std::string> lrs_load_reporting_server_name_; absl::optional<std::string> lrs_load_reporting_server_name_;
uint32_t max_concurrent_requests_;
RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_; RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
}; };
// EDS Drop LB policy. // xDS Cluster Impl LB policy.
class EdsDropLb : public LoadBalancingPolicy { class XdsClusterImplLb : public LoadBalancingPolicy {
public: public:
EdsDropLb(RefCountedPtr<XdsClient> xds_client, Args args); XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client, Args args);
const char* name() const override { return kEdsDrop; } const char* name() const override { return kXdsClusterImpl; }
void UpdateLocked(UpdateArgs args) override; void UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void ResetBackoffLocked() override; void ResetBackoffLocked() override;
private: private:
class StatsSubchannelWrapper : public DelegatingSubchannel {
public:
StatsSubchannelWrapper(
RefCountedPtr<SubchannelInterface> wrapped_subchannel,
RefCountedPtr<XdsClusterLocalityStats> locality_stats)
: DelegatingSubchannel(std::move(wrapped_subchannel)),
locality_stats_(std::move(locality_stats)) {}
XdsClusterLocalityStats* locality_stats() const {
return locality_stats_.get();
}
private:
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
};
// A simple wrapper for ref-counting a picker from the child policy. // A simple wrapper for ref-counting a picker from the child policy.
class RefCountedPicker : public RefCounted<RefCountedPicker> { class RefCountedPicker : public RefCounted<RefCountedPicker> {
public: public:
@ -100,16 +136,17 @@ class EdsDropLb : public LoadBalancingPolicy {
}; };
// A picker that wraps the picker from the child to perform drops. // A picker that wraps the picker from the child to perform drops.
class DropPicker : public SubchannelPicker { class Picker : public SubchannelPicker {
public: public:
DropPicker(EdsDropLb* eds_drop_lb, RefCountedPtr<RefCountedPicker> picker) Picker(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_lb,
: drop_config_(eds_drop_lb->config_->drop_config()), RefCountedPtr<RefCountedPicker> picker);
drop_stats_(eds_drop_lb->drop_stats_),
picker_(std::move(picker)) {}
PickResult Pick(PickArgs args); PickResult Pick(PickArgs args);
private: private:
RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_lb_;
bool xds_circuit_breaking_enabled_;
uint32_t max_concurrent_requests_;
RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_; RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
RefCountedPtr<XdsClusterDropStats> drop_stats_; RefCountedPtr<XdsClusterDropStats> drop_stats_;
RefCountedPtr<RefCountedPicker> picker_; RefCountedPtr<RefCountedPicker> picker_;
@ -117,10 +154,10 @@ class EdsDropLb : public LoadBalancingPolicy {
class Helper : public ChannelControlHelper { class Helper : public ChannelControlHelper {
public: public:
explicit Helper(RefCountedPtr<EdsDropLb> eds_drop_policy) explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
: eds_drop_policy_(std::move(eds_drop_policy)) {} : xds_cluster_impl_policy_(std::move(xds_cluster_impl_policy)) {}
~Helper() { eds_drop_policy_.reset(DEBUG_LOCATION, "Helper"); } ~Helper() { xds_cluster_impl_policy_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) override; ServerAddress address, const grpc_channel_args& args) override;
@ -131,10 +168,10 @@ class EdsDropLb : public LoadBalancingPolicy {
absl::string_view message) override; absl::string_view message) override;
private: private:
RefCountedPtr<EdsDropLb> eds_drop_policy_; RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy_;
}; };
~EdsDropLb(); ~XdsClusterImplLb();
void ShutdownLocked() override; void ShutdownLocked() override;
@ -146,7 +183,10 @@ class EdsDropLb : public LoadBalancingPolicy {
void MaybeUpdatePickerLocked(); void MaybeUpdatePickerLocked();
// Current config from the resolver. // Current config from the resolver.
RefCountedPtr<EdsDropLbConfig> config_; RefCountedPtr<XdsClusterImplLbConfig> config_;
// Current concurrent number of requests;
Atomic<uint32_t> concurrent_requests_{0};
// Internal state. // Internal state.
bool shutting_down_ = false; bool shutting_down_ = false;
@ -166,12 +206,28 @@ class EdsDropLb : public LoadBalancingPolicy {
}; };
// //
// EdsDropLb::DropPicker // XdsClusterImplLb::Picker
// //
LoadBalancingPolicy::PickResult EdsDropLb::DropPicker::Pick( XdsClusterImplLb::Picker::Picker(
RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_lb,
RefCountedPtr<RefCountedPicker> picker)
: xds_cluster_impl_lb_(std::move(xds_cluster_impl_lb)),
xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()),
max_concurrent_requests_(
xds_cluster_impl_lb_->config_->max_concurrent_requests()),
drop_config_(xds_cluster_impl_lb_->config_->drop_config()),
drop_stats_(xds_cluster_impl_lb_->drop_stats_),
picker_(std::move(picker)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p",
xds_cluster_impl_lb_.get(), this);
}
}
LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
LoadBalancingPolicy::PickArgs args) { LoadBalancingPolicy::PickArgs args) {
// Handle drop. // Handle EDS drops.
const std::string* drop_category; const std::string* drop_category;
if (drop_config_->ShouldDrop(&drop_category)) { if (drop_config_->ShouldDrop(&drop_category)) {
if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category); if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
@ -179,41 +235,103 @@ LoadBalancingPolicy::PickResult EdsDropLb::DropPicker::Pick(
result.type = PickResult::PICK_COMPLETE; result.type = PickResult::PICK_COMPLETE;
return result; return result;
} }
// Handle circuit breaking.
uint32_t current = xds_cluster_impl_lb_->concurrent_requests_.FetchAdd(1);
if (xds_circuit_breaking_enabled_) {
// Check and see if we exceeded the max concurrent requests count.
if (current >= max_concurrent_requests_) {
xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1);
if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
}
}
// If we're not dropping the call, we should always have a child picker. // If we're not dropping the call, we should always have a child picker.
if (picker_ == nullptr) { // Should never happen. if (picker_ == nullptr) { // Should never happen.
PickResult result; PickResult result;
result.type = PickResult::PICK_FAILED; result.type = PickResult::PICK_FAILED;
result.error = result.error = grpc_error_set_int(
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"eds_drop picker not given any child picker"), "xds_cluster_impl picker not given any child picker"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1);
return result; return result;
} }
// Not dropping, so delegate to child picker. // Not dropping, so delegate to child picker.
return picker_->Pick(args); PickResult result = picker_->Pick(args);
if (result.type == result.PICK_COMPLETE && result.subchannel != nullptr) {
XdsClusterLocalityStats* locality_stats = nullptr;
if (drop_stats_ != nullptr) { // If load reporting is enabled.
auto* subchannel_wrapper =
static_cast<StatsSubchannelWrapper*>(result.subchannel.get());
// Handle load reporting.
locality_stats = subchannel_wrapper->locality_stats()->Ref().release();
// Record a call started.
locality_stats->AddCallStarted();
// Unwrap subchannel to pass back up the stack.
result.subchannel = subchannel_wrapper->wrapped_subchannel();
}
// Intercept the recv_trailing_metadata op to record call completion.
XdsClusterImplLb* xds_cluster_impl_lb = static_cast<XdsClusterImplLb*>(
xds_cluster_impl_lb_->Ref(DEBUG_LOCATION, "DropPickPicker+call")
.release());
auto original_recv_trailing_metadata_ready =
result.recv_trailing_metadata_ready;
result.recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
// work serializer or in the data plane mutex.
[locality_stats, original_recv_trailing_metadata_ready,
xds_cluster_impl_lb](grpc_error* error, MetadataInterface* metadata,
CallState* call_state) {
// Record call completion for load reporting.
if (locality_stats != nullptr) {
const bool call_failed = error != GRPC_ERROR_NONE;
locality_stats->AddCallFinished(call_failed);
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
}
// Decrement number of calls in flight.
xds_cluster_impl_lb->concurrent_requests_.FetchSub(1);
xds_cluster_impl_lb->Unref(DEBUG_LOCATION, "DropPickPicker+call");
// Invoke the original recv_trailing_metadata_ready callback, if any.
if (original_recv_trailing_metadata_ready != nullptr) {
original_recv_trailing_metadata_ready(error, metadata, call_state);
}
};
} else {
// TODO(roth): We should ideally also record call failures here in the case
// where a pick fails. This is challenging, because we don't know which
// picks are for wait_for_ready RPCs or how many times we'll return a
// failure for the same wait_for_ready RPC.
xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1);
}
return result;
} }
// //
// EdsDropLb // XdsClusterImplLb
// //
EdsDropLb::EdsDropLb(RefCountedPtr<XdsClient> xds_client, Args args) XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,
Args args)
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[eds_drop_lb %p] created -- using xds client %p", this, gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] created -- using xds client %p",
xds_client_.get()); this, xds_client_.get());
} }
} }
EdsDropLb::~EdsDropLb() { XdsClusterImplLb::~XdsClusterImplLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[eds_drop_lb %p] destroying eds_drop LB policy", this); gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] destroying xds_cluster_impl LB policy",
this);
} }
} }
void EdsDropLb::ShutdownLocked() { void XdsClusterImplLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[eds_drop_lb %p] shutting down", this); gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] shutting down", this);
} }
shutting_down_ = true; shutting_down_ = true;
// Remove the child policy's interested_parties pollset_set from the // Remove the child policy's interested_parties pollset_set from the
@ -230,35 +348,43 @@ void EdsDropLb::ShutdownLocked() {
xds_client_.reset(); xds_client_.reset();
} }
void EdsDropLb::ExitIdleLocked() { void XdsClusterImplLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
} }
void EdsDropLb::ResetBackoffLocked() { void XdsClusterImplLb::ResetBackoffLocked() {
// The XdsClient will have its backoff reset by the xds resolver, so we // The XdsClient will have its backoff reset by the xds resolver, so we
// don't need to do it here. // don't need to do it here.
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
} }
void EdsDropLb::UpdateLocked(UpdateArgs args) { void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[eds_drop_lb %p] Received update", this); gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this);
} }
// Update config. // Update config.
const bool is_initial_update = config_ == nullptr;
auto old_config = std::move(config_); auto old_config = std::move(config_);
config_ = std::move(args.config); config_ = std::move(args.config);
// Update load reporting if needed. // On initial update, create drop stats.
if (old_config == nullptr || if (is_initial_update) {
config_->lrs_load_reporting_server_name() !=
old_config->lrs_load_reporting_server_name() ||
config_->cluster_name() != old_config->cluster_name() ||
config_->eds_service_name() != old_config->eds_service_name()) {
drop_stats_.reset();
if (config_->lrs_load_reporting_server_name().has_value()) { if (config_->lrs_load_reporting_server_name().has_value()) {
drop_stats_ = xds_client_->AddClusterDropStats( drop_stats_ = xds_client_->AddClusterDropStats(
config_->lrs_load_reporting_server_name().value(), config_->lrs_load_reporting_server_name().value(),
config_->cluster_name(), config_->eds_service_name()); config_->cluster_name(), config_->eds_service_name());
} }
} else {
// Cluster name, EDS service name, and LRS server name should never
// change, because the EDS policy above us should be swapped out if
// that happens.
GPR_ASSERT(config_->cluster_name() == old_config->cluster_name());
GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name());
GPR_ASSERT(config_->lrs_load_reporting_server_name() ==
old_config->lrs_load_reporting_server_name());
}
// Update picker if max_concurrent_requests has changed.
if (is_initial_update || config_->max_concurrent_requests() !=
old_config->max_concurrent_requests()) {
MaybeUpdatePickerLocked(); MaybeUpdatePickerLocked();
} }
// Update child policy. // Update child policy.
@ -266,14 +392,16 @@ void EdsDropLb::UpdateLocked(UpdateArgs args) {
args.args = nullptr; args.args = nullptr;
} }
void EdsDropLb::MaybeUpdatePickerLocked() { void XdsClusterImplLb::MaybeUpdatePickerLocked() {
// If we're dropping all calls, report READY, regardless of what (or // If we're dropping all calls, report READY, regardless of what (or
// whether) the child has reported. // whether) the child has reported.
if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) { if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
auto drop_picker = absl::make_unique<DropPicker>(this, picker_); auto drop_picker =
if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { absl::make_unique<Picker>(Ref(DEBUG_LOCATION, "Picker"), picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[eds_drop_lb %p] updating connectivity (drop all): state=READY " "[xds_cluster_impl_lb %p] updating connectivity (drop all): "
"state=READY "
"picker=%p", "picker=%p",
this, drop_picker.get()); this, drop_picker.get());
} }
@ -283,10 +411,12 @@ void EdsDropLb::MaybeUpdatePickerLocked() {
} }
// Otherwise, update only if we have a child picker. // Otherwise, update only if we have a child picker.
if (picker_ != nullptr) { if (picker_ != nullptr) {
auto drop_picker = absl::make_unique<DropPicker>(this, picker_); auto drop_picker =
if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { absl::make_unique<Picker>(Ref(DEBUG_LOCATION, "Picker"), picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[eds_drop_lb %p] updating connectivity: state=%s status=(%s) " "[xds_cluster_impl_lb %p] updating connectivity: state=%s "
"status=(%s) "
"picker=%p", "picker=%p",
this, ConnectivityStateName(state_), status_.ToString().c_str(), this, ConnectivityStateName(state_), status_.ToString().c_str(),
drop_picker.get()); drop_picker.get());
@ -296,7 +426,7 @@ void EdsDropLb::MaybeUpdatePickerLocked() {
} }
} }
OrphanablePtr<LoadBalancingPolicy> EdsDropLb::CreateChildPolicyLocked( OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
const grpc_channel_args* args) { const grpc_channel_args* args) {
LoadBalancingPolicy::Args lb_policy_args; LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer(); lb_policy_args.work_serializer = work_serializer();
@ -305,9 +435,10 @@ OrphanablePtr<LoadBalancingPolicy> EdsDropLb::CreateChildPolicyLocked(
absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper")); absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy = OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_eds_drop_lb_trace); &grpc_xds_cluster_impl_lb_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[eds_drop_lb %p] Created new child policy handler %p", gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] Created new child policy handler %p",
this, lb_policy.get()); this, lb_policy.get());
} }
// Add our interested_parties pollset_set to that of the newly created // Add our interested_parties pollset_set to that of the newly created
@ -318,8 +449,8 @@ OrphanablePtr<LoadBalancingPolicy> EdsDropLb::CreateChildPolicyLocked(
return lb_policy; return lb_policy;
} }
void EdsDropLb::UpdateChildPolicyLocked(ServerAddressList addresses, void XdsClusterImplLb::UpdateChildPolicyLocked(ServerAddressList addresses,
const grpc_channel_args* args) { const grpc_channel_args* args) {
// Create policy if needed. // Create policy if needed.
if (child_policy_ == nullptr) { if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args); child_policy_ = CreateChildPolicyLocked(args);
@ -330,76 +461,105 @@ void EdsDropLb::UpdateChildPolicyLocked(ServerAddressList addresses,
update_args.config = config_->child_policy(); update_args.config = config_->child_policy();
update_args.args = args; update_args.args = args;
// Update the policy. // Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[eds_drop_lb %p] Updating child policy handler %p", this, gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] Updating child policy handler %p", this,
child_policy_.get()); child_policy_.get());
} }
child_policy_->UpdateLocked(std::move(update_args)); child_policy_->UpdateLocked(std::move(update_args));
} }
// //
// EdsDropLb::Helper // XdsClusterImplLb::Helper
// //
RefCountedPtr<SubchannelInterface> EdsDropLb::Helper::CreateSubchannel( RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) { ServerAddress address, const grpc_channel_args& args) {
if (eds_drop_policy_->shutting_down_) return nullptr; if (xds_cluster_impl_policy_->shutting_down_) return nullptr;
return eds_drop_policy_->channel_control_helper()->CreateSubchannel( // If load reporting is enabled, wrap the subchannel such that it
// includes the locality stats object, which will be used by the EdsPicker.
if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server_name()
.has_value()) {
RefCountedPtr<XdsLocalityName> locality_name;
auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey);
if (attribute != nullptr) {
const auto* locality_attr =
static_cast<const XdsLocalityAttribute*>(attribute);
locality_name = locality_attr->locality_name();
}
RefCountedPtr<XdsClusterLocalityStats> locality_stats =
xds_cluster_impl_policy_->xds_client_->AddClusterLocalityStats(
*xds_cluster_impl_policy_->config_
->lrs_load_reporting_server_name(),
xds_cluster_impl_policy_->config_->cluster_name(),
xds_cluster_impl_policy_->config_->eds_service_name(),
std::move(locality_name));
return MakeRefCounted<StatsSubchannelWrapper>(
xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args),
std::move(locality_stats));
}
// Load reporting not enabled, so don't wrap the subchannel.
return xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args); std::move(address), args);
} }
void EdsDropLb::Helper::UpdateState(grpc_connectivity_state state, void XdsClusterImplLb::Helper::UpdateState(
const absl::Status& status, grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) { std::unique_ptr<SubchannelPicker> picker) {
if (eds_drop_policy_->shutting_down_) return; if (xds_cluster_impl_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[eds_drop_lb %p] child connectivity state update: state=%s (%s) " "[xds_cluster_impl_lb %p] child connectivity state update: "
"state=%s (%s) "
"picker=%p", "picker=%p",
eds_drop_policy_.get(), ConnectivityStateName(state), xds_cluster_impl_policy_.get(), ConnectivityStateName(state),
status.ToString().c_str(), picker.get()); status.ToString().c_str(), picker.get());
} }
// Save the state and picker. // Save the state and picker.
eds_drop_policy_->state_ = state; xds_cluster_impl_policy_->state_ = state;
eds_drop_policy_->status_ = status; xds_cluster_impl_policy_->status_ = status;
eds_drop_policy_->picker_ = xds_cluster_impl_policy_->picker_ =
MakeRefCounted<RefCountedPicker>(std::move(picker)); MakeRefCounted<RefCountedPicker>(std::move(picker));
// Wrap the picker and return it to the channel. // Wrap the picker and return it to the channel.
eds_drop_policy_->MaybeUpdatePickerLocked(); xds_cluster_impl_policy_->MaybeUpdatePickerLocked();
} }
void EdsDropLb::Helper::RequestReresolution() { void XdsClusterImplLb::Helper::RequestReresolution() {
if (eds_drop_policy_->shutting_down_) return; if (xds_cluster_impl_policy_->shutting_down_) return;
eds_drop_policy_->channel_control_helper()->RequestReresolution(); xds_cluster_impl_policy_->channel_control_helper()->RequestReresolution();
} }
void EdsDropLb::Helper::AddTraceEvent(TraceSeverity severity, void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) { absl::string_view message) {
if (eds_drop_policy_->shutting_down_) return; if (xds_cluster_impl_policy_->shutting_down_) return;
eds_drop_policy_->channel_control_helper()->AddTraceEvent(severity, message); xds_cluster_impl_policy_->channel_control_helper()->AddTraceEvent(severity,
message);
} }
// //
// factory // factory
// //
class EdsDropLbFactory : public LoadBalancingPolicyFactory { class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
public: public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override { LoadBalancingPolicy::Args args) const override {
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error); RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, gpr_log(
"cannot get XdsClient to instantiate eds_drop LB policy: %s", GPR_ERROR,
grpc_error_string(error)); "cannot get XdsClient to instantiate xds_cluster_impl LB policy: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return nullptr; return nullptr;
} }
return MakeOrphanable<EdsDropLb>(std::move(xds_client), std::move(args)); return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),
std::move(args));
} }
const char* name() const override { return kEdsDrop; } const char* name() const override { return kXdsClusterImpl; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& json, grpc_error** error) const override { const Json& json, grpc_error** error) const override {
@ -408,7 +568,7 @@ class EdsDropLbFactory : public LoadBalancingPolicyFactory {
// This policy was configured in the deprecated loadBalancingPolicy // This policy was configured in the deprecated loadBalancingPolicy
// field or in the client API. // field or in the client API.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:eds_drop policy requires " "field:loadBalancingPolicy error:xds_cluster_impl policy requires "
"configuration. Please use loadBalancingConfig field of service " "configuration. Please use loadBalancingConfig field of service "
"config instead."); "config instead.");
return nullptr; return nullptr;
@ -466,6 +626,18 @@ class EdsDropLbFactory : public LoadBalancingPolicyFactory {
lrs_load_reporting_server_name = it->second.string_value(); lrs_load_reporting_server_name = it->second.string_value();
} }
} }
// Max concurrent requests.
uint32_t max_concurrent_requests = 1024;
it = json.object_value().find("maxConcurrentRequests");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::NUMBER) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_concurrent_requests error:must be of type number"));
} else {
max_concurrent_requests =
gpr_parse_nonnegative_int(it->second.string_value().c_str());
}
}
// Drop config. // Drop config.
auto drop_config = MakeRefCounted<XdsApi::EdsUpdate::DropConfig>(); auto drop_config = MakeRefCounted<XdsApi::EdsUpdate::DropConfig>();
it = json.object_value().find("dropCategories"); it = json.object_value().find("dropCategories");
@ -482,13 +654,13 @@ class EdsDropLbFactory : public LoadBalancingPolicyFactory {
} }
if (!error_list.empty()) { if (!error_list.empty()) {
*error = GRPC_ERROR_CREATE_FROM_VECTOR( *error = GRPC_ERROR_CREATE_FROM_VECTOR(
"eds_drop_experimental LB policy config", &error_list); "xds_cluster_impl_experimental LB policy config", &error_list);
return nullptr; return nullptr;
} }
return MakeRefCounted<EdsDropLbConfig>( return MakeRefCounted<XdsClusterImplLbConfig>(
std::move(child_policy), std::move(cluster_name), std::move(child_policy), std::move(cluster_name),
std::move(eds_service_name), std::move(lrs_load_reporting_server_name), std::move(eds_service_name), std::move(lrs_load_reporting_server_name),
std::move(drop_config)); max_concurrent_requests, std::move(drop_config));
} }
private: private:
@ -562,10 +734,10 @@ class EdsDropLbFactory : public LoadBalancingPolicyFactory {
// Plugin registration // Plugin registration
// //
void grpc_lb_policy_eds_drop_init() { void grpc_lb_policy_xds_cluster_impl_init() {
grpc_core::LoadBalancingPolicyRegistry::Builder:: grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory( RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::EdsDropLbFactory>()); absl::make_unique<grpc_core::XdsClusterImplLbFactory>());
} }
void grpc_lb_policy_eds_drop_shutdown() {} void grpc_lb_policy_xds_cluster_impl_shutdown() {}

@ -72,8 +72,8 @@ void grpc_lb_policy_cds_init(void);
void grpc_lb_policy_cds_shutdown(void); void grpc_lb_policy_cds_shutdown(void);
void grpc_lb_policy_eds_init(void); void grpc_lb_policy_eds_init(void);
void grpc_lb_policy_eds_shutdown(void); void grpc_lb_policy_eds_shutdown(void);
void grpc_lb_policy_eds_drop_init(void); void grpc_lb_policy_xds_cluster_impl_init(void);
void grpc_lb_policy_eds_drop_shutdown(void); void grpc_lb_policy_xds_cluster_impl_shutdown(void);
void grpc_lb_policy_xds_cluster_manager_init(void); void grpc_lb_policy_xds_cluster_manager_init(void);
void grpc_lb_policy_xds_cluster_manager_shutdown(void); void grpc_lb_policy_xds_cluster_manager_shutdown(void);
void grpc_resolver_xds_init(void); void grpc_resolver_xds_init(void);
@ -130,8 +130,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_cds_shutdown); grpc_lb_policy_cds_shutdown);
grpc_register_plugin(grpc_lb_policy_eds_init, grpc_register_plugin(grpc_lb_policy_eds_init,
grpc_lb_policy_eds_shutdown); grpc_lb_policy_eds_shutdown);
grpc_register_plugin(grpc_lb_policy_eds_drop_init, grpc_register_plugin(grpc_lb_policy_xds_cluster_impl_init,
grpc_lb_policy_eds_drop_shutdown); grpc_lb_policy_xds_cluster_impl_shutdown);
grpc_register_plugin(grpc_lb_policy_xds_cluster_manager_init, grpc_register_plugin(grpc_lb_policy_xds_cluster_manager_init,
grpc_lb_policy_xds_cluster_manager_shutdown); grpc_lb_policy_xds_cluster_manager_shutdown);
grpc_register_plugin(grpc_resolver_xds_init, grpc_register_plugin(grpc_resolver_xds_init,

@ -43,7 +43,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc', 'src/core/ext/filters/client_channel/local_subchannel_pool.cc',

@ -1086,8 +1086,8 @@ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds.h \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \ src/core/ext/filters/client_channel/lb_policy_factory.h \
src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \

@ -914,8 +914,8 @@ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ src/core/ext/filters/client_channel/lb_policy/xds/xds.h \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \ src/core/ext/filters/client_channel/lb_policy_factory.h \
src/core/ext/filters/client_channel/lb_policy_registry.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \

@ -62,7 +62,7 @@ bazel build //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client
# Test cases "path_matching" and "header_matching" are not included in "all", # Test cases "path_matching" and "header_matching" are not included in "all",
# because not all interop clients in all languages support these new tests. # because not all interop clients in all languages support these new tests.
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \ tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \ --test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \ --project_id=grpc-testing \

@ -65,7 +65,7 @@ bazel build test/cpp/interop:xds_interop_client
# #
# TODO: remove "path_matching" and "header_matching" from --test_case after # TODO: remove "path_matching" and "header_matching" from --test_case after
# they are added into "all". # they are added into "all".
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \ tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \ --test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \ --project_id=grpc-testing \

@ -65,7 +65,7 @@ python tools/run_tests/run_tests.py -l csharp -c opt --build_only
# #
# TODO(jtattermusch): remove "path_matching" and "header_matching" from # TODO(jtattermusch): remove "path_matching" and "header_matching" from
# --test_case after they are added into "all". # --test_case after they are added into "all".
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \ tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \ --test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \ --project_id=grpc-testing \

@ -70,7 +70,7 @@ export CC=/usr/bin/gcc
composer install && \ composer install && \
./bin/generate_proto_php.sh) ./bin/generate_proto_php.sh)
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \ tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \ --test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \ --project_id=grpc-testing \

@ -60,7 +60,7 @@ touch "$TOOLS_DIR"/src/proto/grpc/health/v1/__init__.py
(cd src/ruby && bundle && rake compile) (cd src/ruby && bundle && rake compile)
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,weighted_target_lb,lrs_lb "$PYTHON" \ GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,eds_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \ tools/run_tests/run_xds_tests.py \
--test_case="all,path_matching,header_matching" \ --test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \ --project_id=grpc-testing \

Loading…
Cancel
Save