From 7ccdc85fa0b174aca7a869260bb556bb8f9252c3 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 29 Oct 2019 12:00:12 -0700 Subject: [PATCH] CDS LB policy --- BUILD | 28 ++ BUILD.gn | 1 + CMakeLists.txt | 4 +- Makefile | 4 +- build.yaml | 18 + config.m4 | 3 +- config.w32 | 3 +- gRPC-Core.podspec | 1 + grpc.gemspec | 3 +- grpc.gyp | 4 +- package.xml | 3 +- .../ext/filters/client_channel/lb_policy.h | 3 + .../client_channel/lb_policy/xds/cds.cc | 368 ++++++++++++++++++ .../ext/filters/client_channel/xds/xds_api.h | 12 +- .../filters/client_channel/xds/xds_client.cc | 44 ++- .../plugin_registry/grpc_plugin_registry.cc | 4 + .../grpc_unsecure_plugin_registry.cc | 4 + src/python/grpcio/grpc_core_dependencies.py | 3 +- test/cpp/end2end/xds_end2end_test.cc | 6 +- tools/doxygen/Doxyfile.core.internal | 1 + 20 files changed, 490 insertions(+), 27 deletions(-) create mode 100644 src/core/ext/filters/client_channel/lb_policy/xds/cds.cc diff --git a/BUILD b/BUILD index 4ba29707c19..aaa38184cfd 100644 --- a/BUILD +++ b/BUILD @@ -320,6 +320,7 @@ grpc_cc_library( "grpc_common", "grpc_lb_policy_grpclb", "grpc_lb_policy_xds", + "grpc_lb_policy_cds", "grpc_resolver_xds", ], ) @@ -337,6 +338,7 @@ grpc_cc_library( "grpc_common", "grpc_lb_policy_grpclb_secure", "grpc_lb_policy_xds_secure", + "grpc_lb_policy_cds_secure", "grpc_resolver_xds_secure", "grpc_secure", "grpc_transport_chttp2_client_secure", @@ -1336,6 +1338,32 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc_lb_policy_cds", + srcs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/cds.cc", + ], + language = "c++", + deps = [ + "grpc_base", + "grpc_client_channel", + "grpc_xds_client", + ], +) + +grpc_cc_library( + name = "grpc_lb_policy_cds_secure", + srcs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/cds.cc", + ], + language = "c++", + deps = [ + "grpc_base", + "grpc_client_channel", + "grpc_xds_client_secure", + ], +) + grpc_cc_library( name = "grpc_lb_subchannel_list", hdrs = [ diff --git a/BUILD.gn b/BUILD.gn index 2add67ab5ce..b34da1db3d9 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -244,6 +244,7 @@ config("grpc_config") { "src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc", "src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc", "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h", + "src/core/ext/filters/client_channel/lb_policy/xds/cds.cc", "src/core/ext/filters/client_channel/lb_policy/xds/xds.cc", "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", "src/core/ext/filters/client_channel/lb_policy_factory.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 4b356e36069..0ebd8815aa2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1403,7 +1403,7 @@ add_library(grpc src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc - src/core/ext/filters/client_channel/lb_policy/xds/xds.cc + src/core/ext/filters/client_channel/lb_policy/xds/cds.cc src/core/ext/filters/client_channel/xds/xds_api.cc src/core/ext/filters/client_channel/xds/xds_bootstrap.cc src/core/ext/filters/client_channel/xds/xds_channel_secure.cc @@ -1430,6 +1430,7 @@ add_library(grpc src/core/ext/upb-generated/envoy/type/http.upb.c src/core/ext/upb-generated/envoy/type/percent.upb.c src/core/ext/upb-generated/envoy/type/range.upb.c + src/core/ext/filters/client_channel/lb_policy/xds/xds.cc src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -2964,6 +2965,7 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c + src/core/ext/filters/client_channel/lb_policy/xds/cds.cc src/core/ext/filters/client_channel/lb_policy/xds/xds.cc src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc diff --git a/Makefile b/Makefile index e13f8a26e00..82f6a32b895 100644 --- a/Makefile +++ b/Makefile @@ -3856,7 +3856,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \ src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc \ - src/core/ext/filters/client_channel/lb_policy/xds/xds.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/xds/xds_api.cc \ src/core/ext/filters/client_channel/xds/xds_bootstrap.cc \ src/core/ext/filters/client_channel/xds/xds_channel_secure.cc \ @@ -3883,6 +3883,7 @@ LIBGRPC_SRC = \ src/core/ext/upb-generated/envoy/type/http.upb.c \ src/core/ext/upb-generated/envoy/type/percent.upb.c \ src/core/ext/upb-generated/envoy/type/range.upb.c \ + src/core/ext/filters/client_channel/lb_policy/xds/xds.cc \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \ @@ -5353,6 +5354,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \ src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \ + src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds.cc \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ diff --git a/build.yaml b/build.yaml index 5b3290cc353..78a5d32d76a 100644 --- a/build.yaml +++ b/build.yaml @@ -1067,6 +1067,22 @@ filegroups: plugin: grpc_http_filters uses: - grpc_base +- name: grpc_lb_policy_cds + src: + - src/core/ext/filters/client_channel/lb_policy/xds/cds.cc + plugin: grpc_lb_policy_cds + uses: + - grpc_base + - grpc_client_channel + - grpc_xds_client +- name: grpc_lb_policy_cds_secure + src: + - src/core/ext/filters/client_channel/lb_policy/xds/cds.cc + plugin: grpc_lb_policy_cds + uses: + - grpc_base + - grpc_client_channel + - grpc_xds_client_secure - name: grpc_lb_policy_grpclb headers: - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h @@ -1683,6 +1699,7 @@ libs: - grpc_transport_chttp2_client_insecure - grpc_transport_inproc - grpc_lb_policy_grpclb_secure + - grpc_lb_policy_cds_secure - grpc_lb_policy_xds_secure - grpc_lb_policy_pick_first - grpc_lb_policy_round_robin @@ -1764,6 +1781,7 @@ libs: - grpc_resolver_fake - grpc_resolver_xds - grpc_lb_policy_grpclb + - grpc_lb_policy_cds - grpc_lb_policy_xds - grpc_lb_policy_pick_first - grpc_lb_policy_round_robin diff --git a/config.m4 b/config.m4 index 6ce4556319f..a27f3dac28c 100644 --- a/config.m4 +++ b/config.m4 @@ -416,7 +416,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \ src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc \ - src/core/ext/filters/client_channel/lb_policy/xds/xds.cc \ + src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/xds/xds_api.cc \ src/core/ext/filters/client_channel/xds/xds_bootstrap.cc \ src/core/ext/filters/client_channel/xds/xds_channel_secure.cc \ @@ -443,6 +443,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/upb-generated/envoy/type/http.upb.c \ src/core/ext/upb-generated/envoy/type/percent.upb.c \ src/core/ext/upb-generated/envoy/type/range.upb.c \ + src/core/ext/filters/client_channel/lb_policy/xds/xds.cc \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \ diff --git a/config.w32 b/config.w32 index 6312ac305f6..577cee82202 100644 --- a/config.w32 +++ b/config.w32 @@ -386,7 +386,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\load_balancer_api.cc " + "src\\core\\ext\\upb-generated\\src\\proto\\grpc\\lb\\v1\\load_balancer.upb.c " + "src\\core\\ext\\filters\\client_channel\\resolver\\fake\\fake_resolver.cc " + - "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds.cc " + + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " + "src\\core\\ext\\filters\\client_channel\\xds\\xds_api.cc " + "src\\core\\ext\\filters\\client_channel\\xds\\xds_bootstrap.cc " + "src\\core\\ext\\filters\\client_channel\\xds\\xds_channel_secure.cc " + @@ -413,6 +413,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\upb-generated\\envoy\\type\\http.upb.c " + "src\\core\\ext\\upb-generated\\envoy\\type\\percent.upb.c " + "src\\core\\ext\\upb-generated\\envoy\\type\\range.upb.c " + + "src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first\\pick_first.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\dns_resolver_ares.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index e81e2b17daf..eeb6beff9f9 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -226,6 +226,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h', + 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds.h', 'src/core/ext/filters/client_channel/lb_policy_factory.h', diff --git a/grpc.gemspec b/grpc.gemspec index b707e23e8ba..d095a9a2372 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -848,7 +848,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc ) s.files += %w( src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c ) s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc ) - s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.cc ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/cds.cc ) s.files += %w( src/core/ext/filters/client_channel/xds/xds_api.cc ) s.files += %w( src/core/ext/filters/client_channel/xds/xds_bootstrap.cc ) s.files += %w( src/core/ext/filters/client_channel/xds/xds_channel_secure.cc ) @@ -875,6 +875,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/upb-generated/envoy/type/http.upb.c ) s.files += %w( src/core/ext/upb-generated/envoy/type/percent.upb.c ) s.files += %w( src/core/ext/upb-generated/envoy/type/range.upb.c ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc ) diff --git a/grpc.gyp b/grpc.gyp index 8f14b640489..422672ab37c 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -554,7 +554,7 @@ 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc', 'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', - 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/xds/xds_api.cc', 'src/core/ext/filters/client_channel/xds/xds_bootstrap.cc', 'src/core/ext/filters/client_channel/xds/xds_channel_secure.cc', @@ -581,6 +581,7 @@ 'src/core/ext/upb-generated/envoy/type/http.upb.c', 'src/core/ext/upb-generated/envoy/type/percent.upb.c', 'src/core/ext/upb-generated/envoy/type/range.upb.c', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', @@ -1449,6 +1450,7 @@ 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc', 'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c', + 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', diff --git a/package.xml b/package.xml index 5d12d25d009..1f6d4bc53b9 100644 --- a/package.xml +++ b/package.xml @@ -853,7 +853,7 @@ - + @@ -880,6 +880,7 @@ + diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 1a5ea06615c..a037a5070c2 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -320,6 +320,9 @@ class LoadBalancingPolicy : public InternallyRefCounted { UniquePtr channel_control_helper; /// Channel args. // TODO(roth): Find a better channel args representation for this API. + // TODO(roth): Clarify ownership semantics here -- currently, this + // does not take ownership of args, which is the opposite of how we + // handle them in UpdateArgs. const grpc_channel_args* args = nullptr; }; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc new file mode 100644 index 00000000000..b2969f78ffc --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -0,0 +1,368 @@ +// +// Copyright 2019 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include + +#include "src/core/ext/filters/client_channel/lb_policy.h" +#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/service_config.h" +#include "src/core/ext/filters/client_channel/xds/xds_client.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" + +namespace grpc_core { + +TraceFlag grpc_cds_lb_trace(false, "cds_lb"); + +namespace { + +constexpr char kCds[] = "cds_experimental"; + +// Parsed config for this LB policy. +class ParsedCdsConfig : public LoadBalancingPolicy::Config { + public: + explicit ParsedCdsConfig(UniquePtr cluster) + : cluster_(std::move(cluster)) {} + const char* cluster() const { return cluster_.get(); } + const char* name() const override { return kCds; } + + private: + UniquePtr cluster_; +}; + +// CDS LB policy. +class CdsLb : public LoadBalancingPolicy { + public: + explicit CdsLb(Args args); + + const char* name() const override { return kCds; } + + void UpdateLocked(UpdateArgs args) override; + void ResetBackoffLocked() override; + + private: + // Watcher for getting cluster data from XdsClient. + class ClusterWatcher : public XdsClient::ClusterWatcherInterface { + public: + explicit ClusterWatcher(RefCountedPtr parent) + : parent_(std::move(parent)) {} + void OnClusterChanged(CdsUpdate cluster_data) override; + void OnError(grpc_error* error) override; + + private: + RefCountedPtr parent_; + }; + + // Delegating helper to be passed to child policy. + class Helper : public ChannelControlHelper { + public: + explicit Helper(RefCountedPtr parent) : parent_(std::move(parent)) {} + RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) override; + void UpdateState(grpc_connectivity_state state, + UniquePtr picker) override; + void RequestReresolution() override; + void AddTraceEvent(TraceSeverity severity, StringView message) override; + + private: + RefCountedPtr parent_; + }; + + ~CdsLb(); + + void ShutdownLocked() override; + + RefCountedPtr config_; + + // Current channel args from the resolver. + const grpc_channel_args* args_ = nullptr; + + // The xds client. + RefCountedPtr xds_client_; + // A pointer to the cluster watcher, to be used when cancelling the watch. + // Note that this is not owned, so this pointer must never be derefernced. + ClusterWatcher* cluster_watcher_ = nullptr; + + // Child LB policy. + OrphanablePtr child_policy_; + + // Internal state. + bool shutting_down_ = false; +}; + +// +// CdsLb::ClusterWatcher +// + +void CdsLb::ClusterWatcher::OnClusterChanged(CdsUpdate cluster_data) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] received CDS update from xds client", + parent_.get()); + } + // Construct config for child policy. + char* lrs_str = nullptr; + if (cluster_data.lrs_load_reporting_server_name != nullptr) { + gpr_asprintf(&lrs_str, " \"lrsLoadReportingServerName\": \"%s\",\n", + cluster_data.lrs_load_reporting_server_name.get()); + } + char* json_str; + gpr_asprintf(&json_str, + "[{\n" + " \"xds_experimental\": {\n" + "%s" + " \"edsServiceName\": \"%s\"\n" + " }\n" + "}]", + (lrs_str == nullptr ? "" : lrs_str), + (cluster_data.eds_service_name == nullptr + ? parent_->config_->cluster() + : cluster_data.eds_service_name.get())); + gpr_free(lrs_str); + UniquePtr json_str_deleter(json_str); + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", + parent_.get(), json_str); + } + grpc_json* json = grpc_json_parse_string(json_str); + if (json == nullptr) { + char* msg; + gpr_asprintf(&msg, "Could not parse LB config: %s", json_str); + OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg)); + gpr_free(msg); + return; + } + grpc_error* error = GRPC_ERROR_NONE; + RefCountedPtr config = + LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); + grpc_json_destroy(json); + if (error != GRPC_ERROR_NONE) { + OnError(error); + return; + } + // Create child policy if not already present. + if (parent_->child_policy_ == nullptr) { + LoadBalancingPolicy::Args args; + args.combiner = parent_->combiner(); + args.args = parent_->args_; + args.channel_control_helper = MakeUnique(parent_->Ref()); + parent_->child_policy_ = + LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + "xds_experimental", std::move(args)); + grpc_pollset_set_add_pollset_set( + parent_->child_policy_->interested_parties(), + parent_->interested_parties()); + } + // Update child policy. + UpdateArgs args; + args.config = std::move(config); + args.args = grpc_channel_args_copy(parent_->args_); + parent_->child_policy_->UpdateLocked(std::move(args)); +} + +void CdsLb::ClusterWatcher::OnError(grpc_error* error) { + gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s", + parent_.get(), parent_->config_->cluster(), grpc_error_string(error)); + // Go into TRANSIENT_FAILURE if we have not yet created the child + // policy (i.e., we have not yet received data from xds). Otherwise, + // we keep running with the data we had previously. + if (parent_->child_policy_ == nullptr) { + parent_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, + MakeUnique(error)); + } else { + GRPC_ERROR_UNREF(error); + } +} + +// +// CdsLb::Helper +// + +RefCountedPtr CdsLb::Helper::CreateSubchannel( + const grpc_channel_args& args) { + if (parent_->shutting_down_) return nullptr; + return parent_->channel_control_helper()->CreateSubchannel(args); +} + +void CdsLb::Helper::UpdateState(grpc_connectivity_state state, + UniquePtr picker) { + if (parent_->shutting_down_) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] state updated by child: %s", this, + ConnectivityStateName(state)); + } + parent_->channel_control_helper()->UpdateState(state, std::move(picker)); +} + +void CdsLb::Helper::RequestReresolution() { + if (parent_->shutting_down_) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] Re-resolution requested from child policy.", + parent_.get()); + } + parent_->channel_control_helper()->RequestReresolution(); +} + +void CdsLb::Helper::AddTraceEvent(TraceSeverity severity, StringView message) { + if (parent_->shutting_down_) return; + parent_->channel_control_helper()->AddTraceEvent(severity, message); +} + +// +// CdsLb +// + +CdsLb::CdsLb(Args args) + : LoadBalancingPolicy(std::move(args)), + xds_client_(XdsClient::GetFromChannelArgs(*args.args)) { + if (xds_client_ != nullptr && GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] Using xds client %p from channel", this, + xds_client_.get()); + } +} + +CdsLb::~CdsLb() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this); + } + grpc_channel_args_destroy(args_); +} + +void CdsLb::ShutdownLocked() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] shutting down", this); + } + shutting_down_ = true; + if (child_policy_ != nullptr) { + grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), + interested_parties()); + child_policy_.reset(); + } + if (xds_client_ != nullptr) { + if (cluster_watcher_ != nullptr) { + xds_client_->CancelClusterDataWatch(StringView(config_->cluster()), + cluster_watcher_); + } + xds_client_.reset(); + } +} + +void CdsLb::ResetBackoffLocked() { + if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); +} + +void CdsLb::UpdateLocked(UpdateArgs args) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { + gpr_log(GPR_INFO, "[cdslb %p] received update", this); + } + // Update config. + auto old_config = std::move(config_); + config_ = std::move(args.config); + // Update args. + grpc_channel_args_destroy(args_); + args_ = args.args; + args.args = nullptr; + // If cluster name changed, cancel watcher and restart. + if (old_config == nullptr || + strcmp(old_config->cluster(), config_->cluster()) != 0) { + if (old_config != nullptr) { + xds_client_->CancelClusterDataWatch(StringView(old_config->cluster()), + cluster_watcher_); + } + auto watcher = MakeUnique(Ref()); + cluster_watcher_ = watcher.get(); + xds_client_->WatchClusterData(StringView(config_->cluster()), + std::move(watcher)); + } +} + +// +// factory +// + +class CdsFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr CreateLoadBalancingPolicy( + LoadBalancingPolicy::Args args) const override { + return MakeOrphanable(std::move(args)); + } + + const char* name() const override { return kCds; } + + RefCountedPtr ParseLoadBalancingConfig( + const grpc_json* json, grpc_error** error) const override { + GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); + if (json == nullptr) { + // xds was mentioned as a policy in the deprecated loadBalancingPolicy + // field or in the client API. + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:loadBalancingPolicy error:cds policy requires configuration. " + "Please use loadBalancingConfig field of service config instead."); + return nullptr; + } + GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0); + InlinedVector error_list; + const char* cluster = nullptr; + for (const grpc_json* field = json->child; field != nullptr; + field = field->next) { + if (field->key == nullptr) continue; + if (strcmp(field->key, "cluster") == 0) { + if (cluster != nullptr) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:cluster error:Duplicate entry")); + } + if (field->type != GRPC_JSON_STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:cluster error:type should be string")); + continue; + } + cluster = field->value; + } + } + if (cluster == nullptr) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "required field 'cluster' not present")); + } + if (error_list.empty()) { + return MakeRefCounted( + UniquePtr(gpr_strdup(cluster))); + } else { + *error = GRPC_ERROR_CREATE_FROM_VECTOR("Cds Parser", &error_list); + return nullptr; + } + } +}; + +} // namespace + +} // namespace grpc_core + +// +// Plugin registration +// + +void grpc_lb_policy_cds_init() { + grpc_core::LoadBalancingPolicyRegistry::Builder:: + RegisterLoadBalancingPolicyFactory( + grpc_core::MakeUnique()); +} + +void grpc_lb_policy_cds_shutdown() {} diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index 0410c1d8fbc..af176e23bca 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -136,8 +136,16 @@ struct EdsUpdate { bool drop_all = false; }; -// TODO(juanlishen): Add fields as part of implementing CDS support. -struct CdsUpdate {}; +struct CdsUpdate { + // The name to use in the EDS request. + // If null, the cluster name will be used. + UniquePtr eds_service_name; + // The LRS server to use for load reporting. + // If null, load reporting will be disabled. + // If set to the empty string, will use the same server we obtained + // the CDS data from. + UniquePtr lrs_load_reporting_server_name; +}; // Creates an EDS request querying \a service_name. grpc_slice XdsEdsRequestCreateAndEncode(const char* server_name, diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index a2e7f7d5922..9b2b13d28db 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -1289,14 +1289,27 @@ void XdsClient::Orphan() { Unref(DEBUG_LOCATION, "XdsClient::Orphan()"); } -void XdsClient::WatchClusterData( - StringView /*cluster*/, UniquePtr /*watcher*/) { - // TODO(juanlishen): Implement. +void XdsClient::WatchClusterData(StringView cluster, + UniquePtr watcher) { + ClusterWatcherInterface* w = watcher.get(); + cluster_state_.cluster_watchers[w] = std::move(watcher); + // TODO(juanlishen): Start CDS call if not already started and return + // real data via watcher. + CdsUpdate update; + update.eds_service_name = cluster.dup(); + update.lrs_load_reporting_server_name.reset(gpr_strdup("")); + w->OnClusterChanged(std::move(update)); } -void XdsClient::CancelClusterDataWatch(StringView /*cluster*/, - ClusterWatcherInterface* /*watcher*/) { - // TODO(juanlishen): Implement. +void XdsClient::CancelClusterDataWatch(StringView cluster, + ClusterWatcherInterface* watcher) { + auto it = cluster_state_.cluster_watchers.find(watcher); + if (it != cluster_state_.cluster_watchers.end()) { + cluster_state_.cluster_watchers.erase(it); + } + if (chand_ != nullptr && cluster_state_.cluster_watchers.empty()) { + // TODO(juanlishen): Stop CDS call. + } } void XdsClient::WatchEndpointData(StringView /*cluster*/, @@ -1371,16 +1384,19 @@ void XdsClient::NotifyOnServiceConfig(void* arg, grpc_error* error) { XdsClient* self = static_cast(arg); // TODO(roth): When we add support for WeightedClusters, select the // LB policy based on that functionality. - static const char* json = - "{\n" - " \"loadBalancingConfig\":[\n" - " { \"xds_experimental\":{\n" - " \"lrsLoadReportingServerName\": \"\"\n" - " } }\n" - " ]\n" - "}"; + char* json; + gpr_asprintf(&json, + "{\n" + " \"loadBalancingConfig\":[\n" + " { \"cds_experimental\":{\n" + " \"cluster\": \"%s\"\n" + " } }\n" + " ]\n" + "}", + self->server_name_.get()); RefCountedPtr service_config = ServiceConfig::Create(json, &error); + gpr_free(json); if (error != GRPC_ERROR_NONE) { self->service_config_watcher_->OnError(error); } else { diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index ebe3def245a..20ad526d837 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -34,6 +34,8 @@ void grpc_resolver_fake_init(void); void grpc_resolver_fake_shutdown(void); void grpc_lb_policy_grpclb_init(void); void grpc_lb_policy_grpclb_shutdown(void); +void grpc_lb_policy_cds_init(void); +void grpc_lb_policy_cds_shutdown(void); void grpc_lb_policy_xds_init(void); void grpc_lb_policy_xds_shutdown(void); void grpc_lb_policy_pick_first_init(void); @@ -74,6 +76,8 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_fake_shutdown); grpc_register_plugin(grpc_lb_policy_grpclb_init, grpc_lb_policy_grpclb_shutdown); + grpc_register_plugin(grpc_lb_policy_cds_init, + grpc_lb_policy_cds_shutdown); grpc_register_plugin(grpc_lb_policy_xds_init, grpc_lb_policy_xds_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc index 6668836f02f..bfed2e22ddd 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc @@ -42,6 +42,8 @@ void grpc_resolver_xds_init(void); void grpc_resolver_xds_shutdown(void); void grpc_lb_policy_grpclb_init(void); void grpc_lb_policy_grpclb_shutdown(void); +void grpc_lb_policy_cds_init(void); +void grpc_lb_policy_cds_shutdown(void); void grpc_lb_policy_xds_init(void); void grpc_lb_policy_xds_shutdown(void); void grpc_lb_policy_pick_first_init(void); @@ -82,6 +84,8 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_xds_shutdown); grpc_register_plugin(grpc_lb_policy_grpclb_init, grpc_lb_policy_grpclb_shutdown); + grpc_register_plugin(grpc_lb_policy_cds_init, + grpc_lb_policy_cds_shutdown); grpc_register_plugin(grpc_lb_policy_xds_init, grpc_lb_policy_xds_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index b52e7bdaed9..d750439eda6 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -385,7 +385,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc', 'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', - 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc', + 'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc', 'src/core/ext/filters/client_channel/xds/xds_api.cc', 'src/core/ext/filters/client_channel/xds/xds_bootstrap.cc', 'src/core/ext/filters/client_channel/xds/xds_channel_secure.cc', @@ -412,6 +412,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/upb-generated/envoy/type/http.upb.c', 'src/core/ext/upb-generated/envoy/type/percent.upb.c', 'src/core/ext/upb-generated/envoy/type/range.upb.c', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 01bc7b20c4f..0f501807952 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -1069,7 +1069,9 @@ TEST_P(BasicTest, Vanilla) { EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count()); EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count()); // Check LB policy name for the channel. - EXPECT_EQ("xds_experimental", channel_->GetLoadBalancingPolicyName()); + EXPECT_EQ( + (GetParam().use_xds_resolver() ? "cds_experimental" : "xds_experimental"), + channel_->GetLoadBalancingPolicyName()); } TEST_P(BasicTest, IgnoresUnhealthyEndpoints) { @@ -1098,8 +1100,6 @@ TEST_P(BasicTest, IgnoresUnhealthyEndpoints) { // The ADS service got a single request, and sent a single response. EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count()); EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count()); - // Check LB policy name for the channel. - EXPECT_EQ("xds_experimental", channel_->GetLoadBalancingPolicyName()); } // Tests that subchannel sharing works when the same backend is listed multiple diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index ad2aa1c8960..feeffaad1d5 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -913,6 +913,7 @@ src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ +src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds.cc \ src/core/ext/filters/client_channel/lb_policy/xds/xds.h \ src/core/ext/filters/client_channel/lb_policy_factory.h \