From a0b812c30f0cf2f93fa026de8b9448246ff353a1 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Wed, 15 Jan 2020 19:36:22 -0800 Subject: [PATCH] Add CDS to xds client --- CMakeLists.txt | 7 + Makefile | 21 +- build.yaml | 1 + .../ext/filters/client_channel/lb_policy.cc | 7 +- .../client_channel/lb_policy/xds/cds.cc | 17 +- .../client_channel/lb_policy/xds/xds.cc | 87 ++- .../ext/filters/client_channel/xds/xds_api.cc | 544 ++++++++++----- .../ext/filters/client_channel/xds/xds_api.h | 110 ++- .../filters/client_channel/xds/xds_client.cc | 629 +++++++++++++----- .../filters/client_channel/xds/xds_client.h | 54 +- .../client_channel/xds/xds_client_stats.cc | 13 +- .../client_channel/xds/xds_client_stats.h | 42 +- src/proto/grpc/testing/xds/BUILD | 7 + src/proto/grpc/testing/xds/cds_for_test.proto | 157 +++++ test/cpp/end2end/BUILD | 1 + test/cpp/end2end/xds_end2end_test.cc | 254 +++++-- 16 files changed, 1444 insertions(+), 507 deletions(-) create mode 100644 src/proto/grpc/testing/xds/cds_for_test.proto diff --git a/CMakeLists.txt b/CMakeLists.txt index 5f0f984a1ae..8bd129c868e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -388,6 +388,9 @@ protobuf_generate_grpc_cpp( protobuf_generate_grpc_cpp( src/proto/grpc/testing/xds/ads_for_test.proto ) +protobuf_generate_grpc_cpp( + src/proto/grpc/testing/xds/cds_for_test.proto +) protobuf_generate_grpc_cpp( src/proto/grpc/testing/xds/eds_for_test.proto ) @@ -16032,6 +16035,10 @@ add_executable(xds_end2end_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/cds_for_test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/cds_for_test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/cds_for_test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/cds_for_test.grpc.pb.h 
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/eds_for_test.pb.h diff --git a/Makefile b/Makefile index 127609ea456..7786e524654 100644 --- a/Makefile +++ b/Makefile @@ -3029,6 +3029,22 @@ $(GENDIR)/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.cc: src/proto/grpc/tes $(Q) $(PROTOC) -Ithird_party/protobuf/src -I. --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(PROTOC_PLUGINS_DIR)/grpc_cpp_plugin$(EXECUTABLE_SUFFIX) $< endif +ifeq ($(NO_PROTOC),true) +$(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.pb.cc: protoc_dep_error +$(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.grpc.pb.cc: protoc_dep_error +else + +$(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.pb.cc: src/proto/grpc/testing/xds/cds_for_test.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS) + $(E) "[PROTOC] Generating protobuf CC file from $<" + $(Q) mkdir -p `dirname $@` + $(Q) $(PROTOC) -Ithird_party/protobuf/src -I. --cpp_out=$(GENDIR) $< + +$(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.grpc.pb.cc: src/proto/grpc/testing/xds/cds_for_test.proto $(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.pb.cc $(PROTOBUF_DEP) $(PROTOC_PLUGINS) + $(E) "[GRPC] Generating gRPC's protobuf service CC file from $<" + $(Q) mkdir -p `dirname $@` + $(Q) $(PROTOC) -Ithird_party/protobuf/src -I. 
--grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(PROTOC_PLUGINS_DIR)/grpc_cpp_plugin$(EXECUTABLE_SUFFIX) $< +endif + ifeq ($(NO_PROTOC),true) $(GENDIR)/src/proto/grpc/testing/xds/eds_for_test.pb.cc: protoc_dep_error $(GENDIR)/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.cc: protoc_dep_error @@ -20614,6 +20630,7 @@ endif XDS_END2END_TEST_SRC = \ $(GENDIR)/src/proto/grpc/testing/xds/ads_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.cc \ + $(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.grpc.pb.cc \ $(GENDIR)/src/proto/grpc/testing/xds/eds_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.cc \ $(GENDIR)/src/proto/grpc/testing/xds/lrs_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.cc \ test/cpp/end2end/xds_end2end_test.cc \ @@ -20649,6 +20666,8 @@ endif $(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/xds/ads_for_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a +$(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/xds/cds_for_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/xds/eds_for_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/xds/lrs_for_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a @@ -20662,7 +20681,7 @@ ifneq ($(NO_DEPS),true) -include $(XDS_END2END_TEST_OBJS:.o=.dep) endif endif 
-$(OBJDIR)/$(CONFIG)/test/cpp/end2end/xds_end2end_test.o: $(GENDIR)/src/proto/grpc/testing/xds/ads_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/eds_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/lrs_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/xds_end2end_test.o: $(GENDIR)/src/proto/grpc/testing/xds/ads_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/eds_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/lrs_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.cc PUBLIC_HEADERS_MUST_BE_C89_SRC = \ diff --git a/build.yaml b/build.yaml index ce2b2c82616..9159412aed1 100644 --- a/build.yaml +++ b/build.yaml @@ -6091,6 +6091,7 @@ targets: language: c++ src: - src/proto/grpc/testing/xds/ads_for_test.proto + - src/proto/grpc/testing/xds/cds_for_test.proto - src/proto/grpc/testing/xds/eds_for_test.proto - src/proto/grpc/testing/xds/lrs_for_test.proto - test/cpp/end2end/xds_end2end_test.cc diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index c7d756c9ace..a7848d73654 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -44,7 +44,7 @@ LoadBalancingPolicy::~LoadBalancingPolicy() { void LoadBalancingPolicy::Orphan() { ShutdownLocked(); - Unref(); + Unref(DEBUG_LOCATION, "Orphan"); } // @@ -104,7 +104,8 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick( // ExitIdleLocked(). 
if (!exit_idle_called_) { exit_idle_called_ = true; - parent_->Ref().release(); // ref held by closure. + // Ref held by closure. + parent_->Ref(DEBUG_LOCATION, "QueuePicker::CallExitIdle").release(); parent_->combiner()->Run( GRPC_CLOSURE_CREATE(&CallExitIdle, parent_.get(), nullptr), GRPC_ERROR_NONE); @@ -118,7 +119,7 @@ void LoadBalancingPolicy::QueuePicker::CallExitIdle(void* arg, grpc_error* /*error*/) { LoadBalancingPolicy* parent = static_cast(arg); parent->ExitIdleLocked(); - parent->Unref(); + parent->Unref(DEBUG_LOCATION, "QueuePicker::CallExitIdle"); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index ae8c5f05571..341d2903891 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -39,13 +39,13 @@ constexpr char kCds[] = "cds_experimental"; // Parsed config for this LB policy. class ParsedCdsConfig : public LoadBalancingPolicy::Config { public: - explicit ParsedCdsConfig(grpc_core::UniquePtr cluster) + explicit ParsedCdsConfig(std::string cluster) : cluster_(std::move(cluster)) {} - const char* cluster() const { return cluster_.get(); } + const char* cluster() const { return cluster_.c_str(); } const char* name() const override { return kCds; } private: - grpc_core::UniquePtr cluster_; + std::string cluster_; }; // CDS LB policy. @@ -119,9 +119,9 @@ void CdsLb::ClusterWatcher::OnClusterChanged(CdsUpdate cluster_data) { } // Construct config for child policy. 
char* lrs_str = nullptr; - if (cluster_data.lrs_load_reporting_server_name != nullptr) { + if (cluster_data.lrs_load_reporting_server_name.has_value()) { gpr_asprintf(&lrs_str, " \"lrsLoadReportingServerName\": \"%s\",\n", - cluster_data.lrs_load_reporting_server_name.get()); + cluster_data.lrs_load_reporting_server_name.value().c_str()); } char* json_str; gpr_asprintf(&json_str, @@ -132,9 +132,9 @@ void CdsLb::ClusterWatcher::OnClusterChanged(CdsUpdate cluster_data) { " }\n" "}]", (lrs_str == nullptr ? "" : lrs_str), - (cluster_data.eds_service_name == nullptr + (cluster_data.eds_service_name.empty() ? parent_->config_->cluster() - : cluster_data.eds_service_name.get())); + : cluster_data.eds_service_name.c_str())); gpr_free(lrs_str); grpc_core::UniquePtr json_str_deleter(json_str); if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { @@ -342,8 +342,7 @@ class CdsFactory : public LoadBalancingPolicyFactory { "required field 'cluster' not present")); } if (error_list.empty()) { - return MakeRefCounted( - grpc_core::UniquePtr(gpr_strdup(cluster))); + return MakeRefCounted(cluster); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR("Cds Parser", &error_list); return nullptr; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 7b9025b3486..59f59fad4c2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -78,8 +78,8 @@ class ParsedXdsConfig : public LoadBalancingPolicy::Config { public: ParsedXdsConfig(RefCountedPtr child_policy, RefCountedPtr fallback_policy, - grpc_core::UniquePtr eds_service_name, - grpc_core::UniquePtr lrs_load_reporting_server_name) + std::string eds_service_name, + Optional lrs_load_reporting_server_name) : child_policy_(std::move(child_policy)), fallback_policy_(std::move(fallback_policy)), eds_service_name_(std::move(eds_service_name)), @@ -96,17 +96,19 @@ class ParsedXdsConfig : public 
LoadBalancingPolicy::Config { return fallback_policy_; } - const char* eds_service_name() const { return eds_service_name_.get(); }; + const char* eds_service_name() const { + return eds_service_name_.empty() ? nullptr : eds_service_name_.c_str(); + }; - const char* lrs_load_reporting_server_name() const { - return lrs_load_reporting_server_name_.get(); + const Optional& lrs_load_reporting_server_name() const { + return lrs_load_reporting_server_name_; }; private: RefCountedPtr child_policy_; RefCountedPtr fallback_policy_; - grpc_core::UniquePtr eds_service_name_; - grpc_core::UniquePtr lrs_load_reporting_server_name_; + std::string eds_service_name_; + Optional lrs_load_reporting_server_name_; }; class XdsLb : public LoadBalancingPolicy { @@ -160,6 +162,8 @@ class XdsLb : public LoadBalancingPolicy { pickers_(std::move(pickers)), drop_config_(xds_policy_->drop_config_) {} + ~LocalityPicker() { xds_policy_.reset(DEBUG_LOCATION, "LocalityPicker"); } + PickResult Pick(PickArgs args) override; private: @@ -285,6 +289,8 @@ class XdsLb : public LoadBalancingPolicy { LocalityMap(RefCountedPtr xds_policy, uint32_t priority); + ~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); } + void UpdateLocked( const XdsPriorityListUpdate::LocalityMap& locality_map_update); void ResetBackoffLocked(); @@ -397,7 +403,7 @@ class XdsLb : public LoadBalancingPolicy { if (config_ != nullptr && config_->eds_service_name() != nullptr) { return config_->eds_service_name(); } - return server_name_.get(); + return server_name_.c_str(); } XdsClient* xds_client() const { @@ -406,7 +412,7 @@ class XdsLb : public LoadBalancingPolicy { } // Server name from target URI. - grpc_core::UniquePtr server_name_; + std::string server_name_; // Current channel args and config from the resolver. 
const grpc_channel_args* args_ = nullptr; @@ -495,7 +501,7 @@ LoadBalancingPolicy::PickResult XdsLb::EndpointPickerWrapper::Pick( XdsLb::PickResult XdsLb::LocalityPicker::Pick(PickArgs args) { // Handle drop. - const grpc_core::UniquePtr* drop_category; + const std::string* drop_category; if (drop_config_->ShouldDrop(&drop_category)) { xds_policy_->client_stats_.AddCallDropped(*drop_category); PickResult result; @@ -612,6 +618,8 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface { explicit EndpointWatcher(RefCountedPtr xds_policy) : xds_policy_(std::move(xds_policy)) {} + ~EndpointWatcher() { xds_policy_.reset(DEBUG_LOCATION, "EndpointWatcher"); } + void OnEndpointChanged(EdsUpdate update) override { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] Received EDS update from xds client", @@ -706,11 +714,10 @@ XdsLb::XdsLb(Args args) GPR_ASSERT(server_uri != nullptr); grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); - server_name_.reset( - gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path)); + server_name_ = uri->path[0] == '/' ? uri->path + 1 : uri->path; if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] server name from channel: %s", this, - server_name_.get()); + server_name_.c_str()); } grpc_uri_destroy(uri); } @@ -743,9 +750,12 @@ void XdsLb::ShutdownLocked() { // watcher holds a ref to us. xds_client()->CancelEndpointDataWatch(StringView(eds_service_name()), endpoint_watcher_); - if (config_->lrs_load_reporting_server_name() != nullptr) { + if (config_->lrs_load_reporting_server_name().has_value()) { + // TODO(roth): We should pass the cluster name (in addition to the + // eds_service_name) when adding the client stats. To do so, we need to + // first find a way to plumb the cluster name down into this LB policy. 
xds_client()->RemoveClientStats( - StringView(config_->lrs_load_reporting_server_name()), + StringView(config_->lrs_load_reporting_server_name().value().c_str()), StringView(eds_service_name()), &client_stats_); } xds_client_from_channel_.reset(); @@ -820,7 +830,8 @@ void XdsLb::UpdateLocked(UpdateArgs args) { xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name), endpoint_watcher_); } - auto watcher = MakeUnique(Ref()); + auto watcher = + MakeUnique(Ref(DEBUG_LOCATION, "EndpointWatcher")); endpoint_watcher_ = watcher.get(); xds_client()->WatchEndpointData(StringView(eds_service_name()), std::move(watcher)); @@ -831,21 +842,25 @@ void XdsLb::UpdateLocked(UpdateArgs args) { // all of the pickers whenever load reporting is enabled or disabled // here. if (is_initial_update || - (config_->lrs_load_reporting_server_name() == nullptr) != - (old_config->lrs_load_reporting_server_name() == nullptr) || - (config_->lrs_load_reporting_server_name() != nullptr && - old_config->lrs_load_reporting_server_name() != nullptr && - strcmp(config_->lrs_load_reporting_server_name(), - old_config->lrs_load_reporting_server_name()) != 0)) { + (config_->lrs_load_reporting_server_name().has_value()) != + (old_config->lrs_load_reporting_server_name().has_value()) || + (config_->lrs_load_reporting_server_name().has_value() && + old_config->lrs_load_reporting_server_name().has_value() && + config_->lrs_load_reporting_server_name().value() != + old_config->lrs_load_reporting_server_name().value())) { if (old_config != nullptr && - old_config->lrs_load_reporting_server_name() != nullptr) { + old_config->lrs_load_reporting_server_name().has_value()) { xds_client()->RemoveClientStats( - StringView(old_config->lrs_load_reporting_server_name()), + StringView( + old_config->lrs_load_reporting_server_name().value().c_str()), StringView(old_eds_service_name), &client_stats_); } - if (config_->lrs_load_reporting_server_name() != nullptr) { + if 
(config_->lrs_load_reporting_server_name().has_value()) { + // TODO(roth): We should pass the cluster name (in addition to the + // eds_service_name) when adding the client stats. To do so, we need to + // first find a way to plumb the cluster name down into this LB policy. xds_client()->AddClientStats( - StringView(config_->lrs_load_reporting_server_name()), + StringView(config_->lrs_load_reporting_server_name().value().c_str()), StringView(eds_service_name()), &client_stats_); } } @@ -1083,7 +1098,7 @@ void XdsLb::PriorityList::MaybeCreateLocalityMapLocked(uint32_t priority) { // Exhausted priorities in the update. if (!priority_list_update().Contains(priority)) return; auto new_locality_map = new LocalityMap( - xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+LocalityMap"), priority); + xds_policy_->Ref(DEBUG_LOCATION, "LocalityMap"), priority); priorities_.emplace_back(OrphanablePtr(new_locality_map)); new_locality_map->UpdateLocked(*priority_list_update().Find(priority)); } @@ -1152,7 +1167,6 @@ XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr xds_policy, gpr_log(GPR_INFO, "[xdslb %p] Creating priority %" PRIu32, xds_policy_.get(), priority_); } - GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this, grpc_schedule_on_exec_ctx); // Start the failover timer. 
@@ -1239,9 +1253,10 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() { picker_list.push_back(std::make_pair(end, locality->picker_wrapper())); } xds_policy()->channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, grpc_core::MakeUnique( - xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"), - std::move(picker_list))); + GRPC_CHANNEL_READY, + grpc_core::MakeUnique( + xds_policy_->Ref(DEBUG_LOCATION, "LocalityPicker"), + std::move(picker_list))); } OrphanablePtr @@ -1869,11 +1884,15 @@ class XdsFactory : public LoadBalancingPolicyFactory { } } if (error_list.empty()) { + Optional optional_lrs_load_reporting_server_name; + if (lrs_load_reporting_server_name != nullptr) { + optional_lrs_load_reporting_server_name.set( + std::string(lrs_load_reporting_server_name)); + } return MakeRefCounted( std::move(child_policy), std::move(fallback_policy), - grpc_core::UniquePtr(gpr_strdup(eds_service_name)), - grpc_core::UniquePtr( - gpr_strdup(lrs_load_reporting_server_name))); + eds_service_name == nullptr ? 
"" : eds_service_name, + std::move(optional_lrs_load_reporting_server_name)); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR("Xds Parser", &error_list); return nullptr; diff --git a/src/core/ext/filters/client_channel/xds/xds_api.cc b/src/core/ext/filters/client_channel/xds/xds_api.cc index 12b61a2f937..43e9e6957dc 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.cc +++ b/src/core/ext/filters/client_channel/xds/xds_api.cc @@ -25,11 +25,14 @@ #include #include "src/core/ext/filters/client_channel/xds/xds_api.h" +#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "envoy/api/v2/cds.upb.h" #include "envoy/api/v2/core/address.upb.h" #include "envoy/api/v2/core/base.upb.h" +#include "envoy/api/v2/core/config_source.upb.h" #include "envoy/api/v2/core/health_check.upb.h" #include "envoy/api/v2/discovery.upb.h" #include "envoy/api/v2/eds.upb.h" @@ -40,19 +43,12 @@ #include "google/protobuf/any.upb.h" #include "google/protobuf/duration.upb.h" #include "google/protobuf/struct.upb.h" -#include "google/protobuf/timestamp.upb.h" #include "google/protobuf/wrappers.upb.h" +#include "google/rpc/status.upb.h" #include "upb/upb.h" namespace grpc_core { -namespace { - -constexpr char kEdsTypeUrl[] = - "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"; - -} // namespace - bool XdsPriorityListUpdate::operator==( const XdsPriorityListUpdate& other) const { if (priorities_.size() != other.priorities_.size()) return false; @@ -88,8 +84,7 @@ bool XdsPriorityListUpdate::Contains( return false; } -bool XdsDropConfig::ShouldDrop( - const grpc_core::UniquePtr** category_name) const { +bool XdsDropConfig::ShouldDrop(const std::string** category_name) const { for (size_t i = 0; i < drop_category_list_.size(); ++i) { const auto& drop_category = drop_category_list_[i]; // Generate a random number in [0, 1000000). 
@@ -199,20 +194,141 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap::Node* node, } // namespace -grpc_slice XdsEdsRequestCreateAndEncode(const char* server_name, - const XdsBootstrap::Node* node, - const char* build_version) { +grpc_slice XdsUnsupportedTypeNackRequestCreateAndEncode( + const std::string& type_url, const std::string& nonce, grpc_error* error) { upb::Arena arena; // Create a request. envoy_api_v2_DiscoveryRequest* request = envoy_api_v2_DiscoveryRequest_new(arena.ptr()); - envoy_api_v2_core_Node* node_msg = - envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr()); - PopulateNode(arena.ptr(), node, build_version, node_msg); - envoy_api_v2_DiscoveryRequest_add_resource_names( - request, upb_strview_makez(server_name), arena.ptr()); + // Set type_url. + envoy_api_v2_DiscoveryRequest_set_type_url( + request, upb_strview_makez(type_url.c_str())); + // Set nonce. + envoy_api_v2_DiscoveryRequest_set_response_nonce( + request, upb_strview_makez(nonce.c_str())); + // Set error_detail. + grpc_slice error_description_slice; + GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, + &error_description_slice)); + upb_strview error_description_strview = + upb_strview_make(reinterpret_cast( + GPR_SLICE_START_PTR(error_description_slice)), + GPR_SLICE_LENGTH(error_description_slice)); + google_rpc_Status* error_detail = + envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, arena.ptr()); + google_rpc_Status_set_message(error_detail, error_description_strview); + GRPC_ERROR_UNREF(error); + // Encode the request. 
+ size_t output_length; + char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(), + &output_length); + return grpc_slice_from_copied_buffer(output, output_length); +} + +grpc_slice XdsCdsRequestCreateAndEncode( + const std::set& cluster_names, const XdsBootstrap::Node* node, + const char* build_version, const std::string& version, + const std::string& nonce, grpc_error* error) { + upb::Arena arena; + // Create a request. + envoy_api_v2_DiscoveryRequest* request = + envoy_api_v2_DiscoveryRequest_new(arena.ptr()); + // Set version_info. + if (!version.empty()) { + envoy_api_v2_DiscoveryRequest_set_version_info( + request, upb_strview_makez(version.c_str())); + } + // Populate node. + if (build_version != nullptr) { + envoy_api_v2_core_Node* node_msg = + envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr()); + PopulateNode(arena.ptr(), node, build_version, node_msg); + } + // Add resource_names. + for (const auto& cluster_name : cluster_names) { + envoy_api_v2_DiscoveryRequest_add_resource_names( + request, upb_strview_make(cluster_name.data(), cluster_name.size()), + arena.ptr()); + } + // Set type_url. + envoy_api_v2_DiscoveryRequest_set_type_url(request, + upb_strview_makez(kCdsTypeUrl)); + // Set nonce. + if (!nonce.empty()) { + envoy_api_v2_DiscoveryRequest_set_response_nonce( + request, upb_strview_makez(nonce.c_str())); + } + // Set error_detail if it's a NACK. 
+ if (error != GRPC_ERROR_NONE) { + grpc_slice error_description_slice; + GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, + &error_description_slice)); + upb_strview error_description_strview = + upb_strview_make(reinterpret_cast( + GPR_SLICE_START_PTR(error_description_slice)), + GPR_SLICE_LENGTH(error_description_slice)); + google_rpc_Status* error_detail = + envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, + arena.ptr()); + google_rpc_Status_set_message(error_detail, error_description_strview); + GRPC_ERROR_UNREF(error); + } + // Encode the request. + size_t output_length; + char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(), + &output_length); + return grpc_slice_from_copied_buffer(output, output_length); +} + +grpc_slice XdsEdsRequestCreateAndEncode( + const std::set& eds_service_names, + const XdsBootstrap::Node* node, const char* build_version, + const std::string& version, const std::string& nonce, grpc_error* error) { + upb::Arena arena; + // Create a request. + envoy_api_v2_DiscoveryRequest* request = + envoy_api_v2_DiscoveryRequest_new(arena.ptr()); + // Set version_info. + if (!version.empty()) { + envoy_api_v2_DiscoveryRequest_set_version_info( + request, upb_strview_makez(version.c_str())); + } + // Populate node. + if (build_version != nullptr) { + envoy_api_v2_core_Node* node_msg = + envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr()); + PopulateNode(arena.ptr(), node, build_version, node_msg); + } + // Add resource_names. + for (const auto& eds_service_name : eds_service_names) { + envoy_api_v2_DiscoveryRequest_add_resource_names( + request, + upb_strview_make(eds_service_name.data(), eds_service_name.size()), + arena.ptr()); + } + // Set type_url. envoy_api_v2_DiscoveryRequest_set_type_url(request, upb_strview_makez(kEdsTypeUrl)); + // Set nonce. 
+ if (!nonce.empty()) { + envoy_api_v2_DiscoveryRequest_set_response_nonce( + request, upb_strview_makez(nonce.c_str())); + } + // Set error_detail if it's a NACK. + if (error != GRPC_ERROR_NONE) { + grpc_slice error_description_slice; + GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, + &error_description_slice)); + upb_strview error_description_strview = + upb_strview_make(reinterpret_cast( + GPR_SLICE_START_PTR(error_description_slice)), + GPR_SLICE_LENGTH(error_description_slice)); + google_rpc_Status* error_detail = + envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, + arena.ptr()); + google_rpc_Status_set_message(error_detail, error_description_strview); + GRPC_ERROR_UNREF(error); + } // Encode the request. size_t output_length; char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(), @@ -220,6 +336,76 @@ grpc_slice XdsEdsRequestCreateAndEncode(const char* server_name, return grpc_slice_from_copied_buffer(output, output_length); } +grpc_error* CdsResponseParse(const envoy_api_v2_DiscoveryResponse* response, + CdsUpdateMap* cds_update_map, upb_arena* arena) { + // Get the resources from the response. + size_t size; + const google_protobuf_Any* const* resources = + envoy_api_v2_DiscoveryResponse_resources(response, &size); + if (size < 1) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "CDS response contains 0 resource."); + } + // Parse all the resources in the CDS response. + for (size_t i = 0; i < size; ++i) { + CdsUpdate cds_update; + // Check the type_url of the resource. + const upb_strview type_url = google_protobuf_Any_type_url(resources[i]); + if (!upb_strview_eql(type_url, upb_strview_makez(kCdsTypeUrl))) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not CDS."); + } + // Decode the cluster. 
+ const upb_strview encoded_cluster = google_protobuf_Any_value(resources[i]); + const envoy_api_v2_Cluster* cluster = envoy_api_v2_Cluster_parse( + encoded_cluster.data, encoded_cluster.size, arena); + if (cluster == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode cluster."); + } + // Check the cluster_discovery_type. + if (!envoy_api_v2_Cluster_has_type(cluster)) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found."); + } + if (envoy_api_v2_Cluster_type(cluster) != envoy_api_v2_Cluster_EDS) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType is not EDS."); + } + // Check the EDS config source. + const envoy_api_v2_Cluster_EdsClusterConfig* eds_cluster_config = + envoy_api_v2_Cluster_eds_cluster_config(cluster); + const envoy_api_v2_core_ConfigSource* eds_config = + envoy_api_v2_Cluster_EdsClusterConfig_eds_config(eds_cluster_config); + if (!envoy_api_v2_core_ConfigSource_has_ads(eds_config)) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("ConfigSource is not ADS."); + } + // Record EDS service_name (if any). + upb_strview service_name = + envoy_api_v2_Cluster_EdsClusterConfig_service_name(eds_cluster_config); + if (service_name.size != 0) { + cds_update.eds_service_name = + std::string(service_name.data, service_name.size); + } + // Check the LB policy. + if (envoy_api_v2_Cluster_lb_policy(cluster) != + envoy_api_v2_Cluster_ROUND_ROBIN) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "LB policy is not ROUND_ROBIN."); + } + // Record LRS server name (if any). 
+ const envoy_api_v2_core_ConfigSource* lrs_server = + envoy_api_v2_Cluster_lrs_server(cluster); + if (lrs_server != nullptr) { + if (!envoy_api_v2_core_ConfigSource_has_self(lrs_server)) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "ConfigSource is not self."); + } + cds_update.lrs_load_reporting_server_name.set(""); + } + upb_strview cluster_name = envoy_api_v2_Cluster_name(cluster); + cds_update_map->emplace(std::string(cluster_name.data, cluster_name.size), + std::move(cds_update)); + } + return GRPC_ERROR_NONE; +} + namespace { grpc_error* ServerAddressParseAndAppend( @@ -257,17 +443,6 @@ grpc_error* ServerAddressParseAndAppend( return GRPC_ERROR_NONE; } -namespace { - -grpc_core::UniquePtr StringCopy(const upb_strview& strview) { - char* str = static_cast(gpr_malloc(strview.size + 1)); - memcpy(str, strview.data, strview.size); - str[strview.size] = '\0'; - return grpc_core::UniquePtr(str); -} - -} // namespace - grpc_error* LocalityParse( const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints, XdsPriorityListUpdate::LocalityMap::Locality* output_locality) { @@ -284,10 +459,12 @@ grpc_error* LocalityParse( // Parse locality name. const envoy_api_v2_core_Locality* locality = envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_lb_endpoints); + upb_strview region = envoy_api_v2_core_Locality_region(locality); + upb_strview zone = envoy_api_v2_core_Locality_zone(locality); + upb_strview sub_zone = envoy_api_v2_core_Locality_sub_zone(locality); output_locality->name = MakeRefCounted( - StringCopy(envoy_api_v2_core_Locality_region(locality)), - StringCopy(envoy_api_v2_core_Locality_zone(locality)), - StringCopy(envoy_api_v2_core_Locality_sub_zone(locality))); + std::string(region.data, region.size), std::string(zone.data, zone.size), + std::string(sub_zone.data, sub_zone.size)); // Parse the addresses. 
size_t size; const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints = @@ -338,30 +515,15 @@ grpc_error* DropParseAndAppend( // Cap numerator to 1000000. numerator = GPR_MIN(numerator, 1000000); if (numerator == 1000000) *drop_all = true; - drop_config->AddCategory(StringCopy(category), numerator); + drop_config->AddCategory(std::string(category.data, category.size), + numerator); return GRPC_ERROR_NONE; } -} // namespace - -grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response, - EdsUpdate* update) { - upb::Arena arena; - // Decode the response. - const envoy_api_v2_DiscoveryResponse* response = - envoy_api_v2_DiscoveryResponse_parse( - reinterpret_cast(GRPC_SLICE_START_PTR(encoded_response)), - GRPC_SLICE_LENGTH(encoded_response), arena.ptr()); - // Parse the response. - if (response == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found."); - } - // Check the type_url of the response. - upb_strview type_url = envoy_api_v2_DiscoveryResponse_type_url(response); - upb_strview expected_type_url = upb_strview_makez(kEdsTypeUrl); - if (!upb_strview_eql(type_url, expected_type_url)) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS."); - } +grpc_error* EdsResponsedParse( + const envoy_api_v2_DiscoveryResponse* response, + const std::set& expected_eds_service_names, + EdsUpdateMap* eds_update_map, upb_arena* arena) { // Get the resources from the response. size_t size; const google_protobuf_Any* const* resources = @@ -370,48 +532,115 @@ grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "EDS response contains 0 resource."); } - // Check the type_url of the resource. - type_url = google_protobuf_Any_type_url(resources[0]); - if (!upb_strview_eql(type_url, expected_type_url)) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS."); - } - // Get the cluster_load_assignment. 
- upb_strview encoded_cluster_load_assignment = - google_protobuf_Any_value(resources[0]); - envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment = - envoy_api_v2_ClusterLoadAssignment_parse( - encoded_cluster_load_assignment.data, - encoded_cluster_load_assignment.size, arena.ptr()); - // Get the endpoints. - const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints = - envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment, - &size); for (size_t i = 0; i < size; ++i) { - XdsPriorityListUpdate::LocalityMap::Locality locality; - grpc_error* error = LocalityParse(endpoints[i], &locality); - if (error != GRPC_ERROR_NONE) return error; - // Filter out locality with weight 0. - if (locality.lb_weight == 0) continue; - update->priority_list_update.Add(locality); - } - // Get the drop config. - update->drop_config = MakeRefCounted(); - const envoy_api_v2_ClusterLoadAssignment_Policy* policy = - envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment); - if (policy != nullptr) { - const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const* - drop_overload = - envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(policy, - &size); - for (size_t i = 0; i < size; ++i) { - grpc_error* error = DropParseAndAppend( - drop_overload[i], update->drop_config.get(), &update->drop_all); + EdsUpdate eds_update; + // Check the type_url of the resource. + upb_strview type_url = google_protobuf_Any_type_url(resources[i]); + if (!upb_strview_eql(type_url, upb_strview_makez(kEdsTypeUrl))) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS."); + } + // Get the cluster_load_assignment. 
+ upb_strview encoded_cluster_load_assignment = + google_protobuf_Any_value(resources[i]); + envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment = + envoy_api_v2_ClusterLoadAssignment_parse( + encoded_cluster_load_assignment.data, + encoded_cluster_load_assignment.size, arena); + if (cluster_load_assignment == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Can't parse cluster_load_assignment."); + } + // Check the cluster name (which actually means eds_service_name). Ignore + // unexpected names. + upb_strview cluster_name = envoy_api_v2_ClusterLoadAssignment_cluster_name( + cluster_load_assignment); + StringView cluster_name_strview(cluster_name.data, cluster_name.size); + if (expected_eds_service_names.find(cluster_name_strview) == + expected_eds_service_names.end()) { + continue; + } + // Get the endpoints. + size_t locality_size; + const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints = + envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment, + &locality_size); + for (size_t j = 0; j < locality_size; ++j) { + XdsPriorityListUpdate::LocalityMap::Locality locality; + grpc_error* error = LocalityParse(endpoints[j], &locality); if (error != GRPC_ERROR_NONE) return error; + // Filter out locality with weight 0. + if (locality.lb_weight == 0) continue; + eds_update.priority_list_update.Add(locality); + } + // Get the drop config. 
+ eds_update.drop_config = MakeRefCounted(); + const envoy_api_v2_ClusterLoadAssignment_Policy* policy = + envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment); + if (policy != nullptr) { + size_t drop_size; + const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const* + drop_overload = + envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads( + policy, &drop_size); + for (size_t j = 0; j < drop_size; ++j) { + grpc_error* error = + DropParseAndAppend(drop_overload[j], eds_update.drop_config.get(), + &eds_update.drop_all); + if (error != GRPC_ERROR_NONE) return error; + } + } + // Validate the update content. + if (eds_update.priority_list_update.empty() && !eds_update.drop_all) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "EDS response doesn't contain any valid " + "locality but doesn't require to drop all calls."); } + eds_update_map->emplace(std::string(cluster_name.data, cluster_name.size), + std::move(eds_update)); } return GRPC_ERROR_NONE; } +} // namespace + +grpc_error* XdsAdsResponseDecodeAndParse( + const grpc_slice& encoded_response, + const std::set& expected_eds_service_names, + CdsUpdateMap* cds_update_map, EdsUpdateMap* eds_update_map, + std::string* version, std::string* nonce, std::string* type_url) { + upb::Arena arena; + // Decode the response. + const envoy_api_v2_DiscoveryResponse* response = + envoy_api_v2_DiscoveryResponse_parse( + reinterpret_cast(GRPC_SLICE_START_PTR(encoded_response)), + GRPC_SLICE_LENGTH(encoded_response), arena.ptr()); + // If decoding fails, output an empty type_url and return. + if (response == nullptr) { + *type_url = ""; + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Can't decode the whole response."); + } + // Record the type_url, the version_info, and the nonce of the response. 
+ upb_strview type_url_strview = + envoy_api_v2_DiscoveryResponse_type_url(response); + *type_url = std::string(type_url_strview.data, type_url_strview.size); + upb_strview version_info = + envoy_api_v2_DiscoveryResponse_version_info(response); + *version = std::string(version_info.data, version_info.size); + upb_strview nonce_strview = envoy_api_v2_DiscoveryResponse_nonce(response); + *nonce = std::string(nonce_strview.data, nonce_strview.size); + // Parse the response according to the resource type. + if (*type_url == kCdsTypeUrl) { + return CdsResponseParse(response, cds_update_map, arena.ptr()); + } else if (*type_url == kEdsTypeUrl) { + return EdsResponsedParse(response, expected_eds_service_names, + eds_update_map, arena.ptr()); + } else { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Unsupported ADS resource type."); + } +} + namespace { grpc_slice LrsRequestEncode( @@ -425,7 +654,7 @@ grpc_slice LrsRequestEncode( } // namespace -grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name, +grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name, const XdsBootstrap::Node* node, const char* build_version) { upb::Arena arena; @@ -444,7 +673,7 @@ grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name, request, arena.ptr()); // Set the cluster name. envoy_api_v2_endpoint_ClusterStats_set_cluster_name( - cluster_stats, upb_strview_makez(server_name)); + cluster_stats, upb_strview_makez(server_name.c_str())); return LrsRequestEncode(request, arena.ptr()); } @@ -452,17 +681,17 @@ namespace { void LocalityStatsPopulate( envoy_api_v2_endpoint_UpstreamLocalityStats* output, - std::pair, - XdsClientStats::LocalityStats::Snapshot>& input, + const std::pair, + XdsClientStats::LocalityStats::Snapshot>& input, upb_arena* arena) { // Set sub_zone. 
envoy_api_v2_core_Locality* locality = envoy_api_v2_endpoint_UpstreamLocalityStats_mutable_locality(output, arena); envoy_api_v2_core_Locality_set_sub_zone( - locality, upb_strview_makez(input.first->sub_zone())); + locality, upb_strview_makez(input.first->sub_zone().c_str())); // Set total counts. - XdsClientStats::LocalityStats::Snapshot& snapshot = input.second; + const XdsClientStats::LocalityStats::Snapshot& snapshot = input.second; envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_successful_requests( output, snapshot.total_successful_requests); envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_requests_in_progress( @@ -473,7 +702,7 @@ void LocalityStatsPopulate( output, snapshot.total_issued_requests); // Add load metric stats. for (auto& p : snapshot.load_metric_stats) { - const char* metric_name = p.first.get(); + const char* metric_name = p.first.c_str(); const XdsClientStats::LocalityStats::LoadMetric::Snapshot& metric_value = p.second; envoy_api_v2_endpoint_EndpointLoadMetricStats* load_metric = @@ -490,62 +719,80 @@ void LocalityStatsPopulate( } // namespace -grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name, - XdsClientStats* client_stats) { +grpc_slice XdsLrsRequestCreateAndEncode( + std::map> client_stats_map) { upb::Arena arena; - XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset(); - // Prune unused locality stats. - client_stats->PruneLocalityStats(); + // Get the snapshots. + std::map> + snapshot_map; + for (auto& p : client_stats_map) { + const StringView& cluster_name = p.first; + for (auto* client_stats : p.second) { + XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset(); + // Prune unused locality stats. + client_stats->PruneLocalityStats(); + if (snapshot.IsAllZero()) continue; + snapshot_map[cluster_name].emplace_back(std::move(snapshot)); + } + } // When all the counts are zero, return empty slice. 
- if (snapshot.IsAllZero()) return grpc_empty_slice(); + if (snapshot_map.empty()) return grpc_empty_slice(); // Create a request. envoy_service_load_stats_v2_LoadStatsRequest* request = envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr()); - // Add cluster stats. There is only one because we only use one server name in - // one channel. - envoy_api_v2_endpoint_ClusterStats* cluster_stats = - envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats( - request, arena.ptr()); - // Set the cluster name. - envoy_api_v2_endpoint_ClusterStats_set_cluster_name( - cluster_stats, upb_strview_makez(server_name)); - // Add locality stats. - for (auto& p : snapshot.upstream_locality_stats) { - envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats = - envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats( - cluster_stats, arena.ptr()); - LocalityStatsPopulate(locality_stats, p, arena.ptr()); - } - // Add dropped requests. - for (auto& p : snapshot.dropped_requests) { - const char* category = p.first.get(); - const uint64_t count = p.second; - envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests = - envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(cluster_stats, - arena.ptr()); - envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category( - dropped_requests, upb_strview_makez(category)); - envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count( - dropped_requests, count); - } - // Set total dropped requests. - envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests( - cluster_stats, snapshot.total_dropped_requests); - // Set real load report interval. 
- gpr_timespec timespec = - grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN); - google_protobuf_Duration* load_report_interval = - envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval( - cluster_stats, arena.ptr()); - google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec); - google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec); + for (auto& p : snapshot_map) { + const StringView& cluster_name = p.first; + const auto& snapshot_list = p.second; + for (size_t i = 0; i < snapshot_list.size(); ++i) { + const auto& snapshot = snapshot_list[i]; + // Add cluster stats. + envoy_api_v2_endpoint_ClusterStats* cluster_stats = + envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats( + request, arena.ptr()); + // Set the cluster name. + envoy_api_v2_endpoint_ClusterStats_set_cluster_name( + cluster_stats, + upb_strview_make(cluster_name.data(), cluster_name.size())); + // Add locality stats. + for (auto& p : snapshot.upstream_locality_stats) { + envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats = + envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats( + cluster_stats, arena.ptr()); + LocalityStatsPopulate(locality_stats, p, arena.ptr()); + } + // Add dropped requests. + for (auto& p : snapshot.dropped_requests) { + const char* category = p.first.c_str(); + const uint64_t count = p.second; + envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests = + envoy_api_v2_endpoint_ClusterStats_add_dropped_requests( + cluster_stats, arena.ptr()); + envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category( + dropped_requests, upb_strview_makez(category)); + envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count( + dropped_requests, count); + } + // Set total dropped requests. + envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests( + cluster_stats, snapshot.total_dropped_requests); + // Set real load report interval. 
+ gpr_timespec timespec = + grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN); + google_protobuf_Duration* load_report_interval = + envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval( + cluster_stats, arena.ptr()); + google_protobuf_Duration_set_seconds(load_report_interval, + timespec.tv_sec); + google_protobuf_Duration_set_nanos(load_report_interval, + timespec.tv_nsec); + } + } return LrsRequestEncode(request, arena.ptr()); } -grpc_error* XdsLrsResponseDecodeAndParse( - const grpc_slice& encoded_response, - grpc_core::UniquePtr* cluster_name, - grpc_millis* load_reporting_interval) { +grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response, + std::set* cluster_names, + grpc_millis* load_reporting_interval) { upb::Arena arena; // Decode the response. const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response = @@ -554,19 +801,16 @@ grpc_error* XdsLrsResponseDecodeAndParse( GRPC_SLICE_LENGTH(encoded_response), arena.ptr()); // Parse the response. if (decoded_response == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found."); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode response."); } - // Check the cluster size in the response. + // Store the cluster names. size_t size; const upb_strview* clusters = envoy_service_load_stats_v2_LoadStatsResponse_clusters(decoded_response, &size); - if (size != 1) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "The number of clusters (server names) is not 1."); + for (size_t i = 0; i < size; ++i) { + cluster_names->emplace(clusters[i].data, clusters[i].size); } - // Get the cluster name for reporting loads. - *cluster_name = StringCopy(clusters[0]); // Get the load report interval. 
const google_protobuf_Duration* load_reporting_interval_duration = envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval( diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index 51693cd6e4c..4a2da768d45 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -23,14 +23,47 @@ #include +#include + #include #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/xds/xds_bootstrap.h" #include "src/core/ext/filters/client_channel/xds/xds_client_stats.h" +#include "src/core/lib/gprpp/optional.h" namespace grpc_core { +constexpr char kCdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster"; +constexpr char kEdsTypeUrl[] = + "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"; + +// The version state for each specific ADS resource type. +struct VersionState { + // The version of the latest response that is accepted and used. + std::string version_info; + // The nonce of the latest response. + std::string nonce; + // The error message to be included in a NACK with the nonce. Consumed when a + // nonce is NACK'ed for the first time. + grpc_error* error = GRPC_ERROR_NONE; + + ~VersionState() { GRPC_ERROR_UNREF(error); } +}; + +struct CdsUpdate { + // The name to use in the EDS request. + // If empty, the cluster name will be used. + std::string eds_service_name; + // The LRS server to use for load reporting. + // If not set, load reporting will be disabled. + // If set to the empty string, will use the same server we obtained the CDS + // data from. 
+ Optional lrs_load_reporting_server_name; +}; + +using CdsUpdateMap = std::map; + class XdsPriorityListUpdate { public: struct LocalityMap { @@ -97,24 +130,22 @@ class XdsDropConfig : public RefCounted { public: struct DropCategory { bool operator==(const DropCategory& other) const { - return strcmp(name.get(), other.name.get()) == 0 && - parts_per_million == other.parts_per_million; + return name == other.name && parts_per_million == other.parts_per_million; } - grpc_core::UniquePtr name; + std::string name; const uint32_t parts_per_million; }; using DropCategoryList = InlinedVector; - void AddCategory(grpc_core::UniquePtr name, - uint32_t parts_per_million) { + void AddCategory(std::string name, uint32_t parts_per_million) { drop_category_list_.emplace_back( DropCategory{std::move(name), parts_per_million}); } // The only method invoked from the data plane combiner. - bool ShouldDrop(const grpc_core::UniquePtr** category_name) const; + bool ShouldDrop(const std::string** category_name) const; const DropCategoryList& drop_category_list() const { return drop_category_list_; @@ -137,44 +168,53 @@ struct EdsUpdate { bool drop_all = false; }; -struct CdsUpdate { - // The name to use in the EDS request. - // If null, the cluster name will be used. - grpc_core::UniquePtr eds_service_name; - // The LRS server to use for load reporting. - // If null, load reporting will be disabled. - // If set to the empty string, will use the same server we obtained - // the CDS data from. - grpc_core::UniquePtr lrs_load_reporting_server_name; -}; - -// Creates an EDS request querying \a service_name. -grpc_slice XdsEdsRequestCreateAndEncode(const char* server_name, - const XdsBootstrap::Node* node, - const char* build_version); - -// Parses the EDS response and returns the args to update locality map. If there -// is any error, the output update is invalid. 
-grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response, - EdsUpdate* update); +using EdsUpdateMap = std::map; + +// Creates a request to nack an unsupported resource type. +// Takes ownership of \a error. +grpc_slice XdsUnsupportedTypeNackRequestCreateAndEncode( + const std::string& type_url, const std::string& nonce, grpc_error* error); + +// Creates a CDS request querying \a cluster_names. +// Takes ownership of \a error. +grpc_slice XdsCdsRequestCreateAndEncode( + const std::set& cluster_names, const XdsBootstrap::Node* node, + const char* build_version, const std::string& version, + const std::string& nonce, grpc_error* error); + +// Creates an EDS request querying \a eds_service_names. +// Takes ownership of \a error. +grpc_slice XdsEdsRequestCreateAndEncode( + const std::set& eds_service_names, + const XdsBootstrap::Node* node, const char* build_version, + const std::string& version, const std::string& nonce, grpc_error* error); + +// Parses the ADS response and outputs the validated update for either CDS or +// EDS. If the response can't be parsed at the top level, \a type_url will point +// to an empty string; otherwise, it will point to the received data. +grpc_error* XdsAdsResponseDecodeAndParse( + const grpc_slice& encoded_response, + const std::set& expected_eds_service_names, + CdsUpdateMap* cds_update_map, EdsUpdateMap* eds_update_map, + std::string* version, std::string* nonce, std::string* type_url); // Creates an LRS request querying \a server_name. -grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name, +grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name, const XdsBootstrap::Node* node, const char* build_version); // Creates an LRS request sending client-side load reports. If all the counters -// in \a client_stats are zero, returns empty slice. -grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name, - XdsClientStats* client_stats); +// are zero, returns empty slice. 
+grpc_slice XdsLrsRequestCreateAndEncode( + std::map> + client_stats_map); -// Parses the LRS response and returns \a cluster_name and \a +// Parses the LRS response and returns \a // load_reporting_interval for client-side load reporting. If there is any // error, the output config is invalid. -grpc_error* XdsLrsResponseDecodeAndParse( - const grpc_slice& encoded_response, - grpc_core::UniquePtr* cluster_name, - grpc_millis* load_reporting_interval); +grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response, + std::set* cluster_names, + grpc_millis* load_reporting_interval); } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 3f4ed94cb52..ccdad9ee93d 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -125,10 +125,35 @@ class XdsClient::ChannelState::AdsCallState XdsClient* xds_client() const { return chand()->xds_client(); } bool seen_response() const { return seen_response_; } + // If \a type_url is an unsupported type, \a nonce_for_unsupported_type and + // \a error_for_unsupported_type will be used in the request; otherwise, the + // nonce and error stored in each ADS call state will be used. Takes ownership + // of \a error_for_unsupported_type. + void SendMessageLocked(const std::string& type_url, + const std::string& nonce_for_unsupported_type, + grpc_error* error_for_unsupported_type, + bool is_first_message); + private: + struct BufferedRequest { + std::string nonce; + grpc_error* error; + + // Takes ownership of \a error. 
+ BufferedRequest(std::string nonce, grpc_error* error) + : nonce(std::move(nonce)), error(error) {} + + ~BufferedRequest() { GRPC_ERROR_UNREF(error); } + }; + + void AcceptCdsUpdate(CdsUpdateMap cds_update_map, std::string new_version); + void AcceptEdsUpdate(EdsUpdateMap eds_update_map, std::string new_version); + + static void OnRequestSent(void* arg, grpc_error* error); + static void OnRequestSentLocked(void* arg, grpc_error* error); static void OnResponseReceived(void* arg, grpc_error* error); - static void OnStatusReceived(void* arg, grpc_error* error); static void OnResponseReceivedLocked(void* arg, grpc_error* error); + static void OnStatusReceived(void* arg, grpc_error* error); static void OnStatusReceivedLocked(void* arg, grpc_error* error); bool IsCurrentCallOnChannel() const; @@ -145,6 +170,7 @@ class XdsClient::ChannelState::AdsCallState // send_message grpc_byte_buffer* send_message_payload_ = nullptr; + grpc_closure on_request_sent_; // recv_message grpc_byte_buffer* recv_message_payload_ = nullptr; @@ -155,6 +181,14 @@ class XdsClient::ChannelState::AdsCallState grpc_status_code status_code_; grpc_slice status_details_; grpc_closure on_status_received_; + + // Version state. + VersionState cds_version_; + VersionState eds_version_; + + // Buffered requests. + std::map> + buffered_request_map_; }; // Contains an LRS call to the xds server. @@ -168,6 +202,7 @@ class XdsClient::ChannelState::LrsCallState void Orphan() override; void MaybeStartReportingLocked(); + bool ShouldSendLoadReports(const StringView& cluster_name) const; RetryableCall* parent() { return parent_.get(); } ChannelState* chand() const { return parent_->chand(); } @@ -244,7 +279,7 @@ class XdsClient::ChannelState::LrsCallState grpc_closure on_status_received_; // Load reporting state. - grpc_core::UniquePtr cluster_name_; + std::set cluster_names_; // Asked for by the LRS server. 
grpc_millis load_reporting_interval_ = 0; OrphanablePtr reporter_; }; @@ -376,14 +411,6 @@ bool XdsClient::ChannelState::HasActiveAdsCall() const { return ads_calld_->calld() != nullptr; } -void XdsClient::ChannelState::MaybeStartAdsCall() { - if (ads_calld_ != nullptr) return; - ads_calld_.reset( - new RetryableCall(Ref(DEBUG_LOCATION, "ChannelState+ads"))); -} - -void XdsClient::ChannelState::StopAdsCall() { ads_calld_.reset(); } - void XdsClient::ChannelState::MaybeStartLrsCall() { if (lrs_calld_ != nullptr) return; lrs_calld_.reset( @@ -409,6 +436,33 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() { grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_); } +void XdsClient::ChannelState::OnResourceNamesChanged( + const std::string& type_url) { + if (ads_calld_ == nullptr) { + // Start the ADS call if this is the first request. + ads_calld_.reset(new RetryableCall( + Ref(DEBUG_LOCATION, "ChannelState+ads"))); + // Note: AdsCallState's ctor will automatically send necessary messages, so + // we can return here. + return; + } + // If the ADS call is in backoff state, we don't need to do anything now + // because when the call is restarted it will resend all necessary requests. + if (ads_calld() == nullptr) return; + // Send the message if the ADS call is active. + ads_calld()->SendMessageLocked(type_url, "", nullptr, false); +} + +void XdsClient::ChannelState::OnWatcherRemoved() { + // Keep the ADS call if there are watcher(s). + for (const auto& p : xds_client()->cluster_map_) { + const ClusterState& cluster_state = p.second; + if (!cluster_state.watchers.empty()) return; + } + if (!xds_client()->endpoint_map_.empty()) return; + ads_calld_.reset(); +} + // // XdsClient::ChannelState::RetryableCall<> // @@ -522,8 +576,7 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( // activity in xds_client()->interested_parties_, which is comprised of // the polling entities from client_channel. 
GPR_ASSERT(xds_client() != nullptr); - GPR_ASSERT(xds_client()->server_name_ != nullptr); - GPR_ASSERT(*xds_client()->server_name_.get() != '\0'); + GPR_ASSERT(!xds_client()->server_name_.empty()); // Create a call with the specified method name. call_ = grpc_channel_create_pollset_set_call( chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, @@ -531,14 +584,7 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES, nullptr, GRPC_MILLIS_INF_FUTURE, nullptr); GPR_ASSERT(call_ != nullptr); - // Init the request payload. - grpc_slice request_payload_slice = XdsEdsRequestCreateAndEncode( - xds_client()->server_name_.get(), xds_client()->bootstrap_->node(), - xds_client()->build_version_.get()); - send_message_payload_ = - grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_slice_unref_internal(request_payload_slice); - // Init other data associated with the call. + // Init data associated with the call. grpc_metadata_array_init(&initial_metadata_recv_); grpc_metadata_array_init(&trailing_metadata_recv_); // Start the call. @@ -559,16 +605,20 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( op->flags = 0; op->reserved = nullptr; op++; - // Op: send request message. - GPR_ASSERT(send_message_payload_ != nullptr); - op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message.send_message = send_message_payload_; - op->flags = 0; - op->reserved = nullptr; - op++; call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), nullptr); GPR_ASSERT(GRPC_CALL_OK == call_error); + // Op: send request message. 
+ GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, + grpc_schedule_on_exec_ctx); + bool initial_message = true; + if (!xds_client()->cluster_map_.empty()) { + SendMessageLocked(kCdsTypeUrl, "", nullptr, initial_message); + initial_message = false; + } + if (!xds_client()->endpoint_map_.empty()) { + SendMessageLocked(kEdsTypeUrl, "", nullptr, initial_message); + } // Op: recv initial metadata. op = ops; op->op = GRPC_OP_RECV_INITIAL_METADATA; @@ -629,86 +679,126 @@ void XdsClient::ChannelState::AdsCallState::Orphan() { // corresponding unref happens in on_status_received_ instead of here. } -void XdsClient::ChannelState::AdsCallState::OnResponseReceived( - void* arg, grpc_error* error) { - AdsCallState* ads_calld = static_cast(arg); - ads_calld->xds_client()->combiner_->Run( - GRPC_CLOSURE_INIT(&ads_calld->on_response_received_, - OnResponseReceivedLocked, ads_calld, nullptr), - GRPC_ERROR_REF(error)); -} - -void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( - void* arg, grpc_error* /*error*/) { - AdsCallState* ads_calld = static_cast(arg); - XdsClient* xds_client = ads_calld->xds_client(); - // Empty payload means the call was cancelled. - if (!ads_calld->IsCurrentCallOnChannel() || - ads_calld->recv_message_payload_ == nullptr) { - ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); +void XdsClient::ChannelState::AdsCallState::SendMessageLocked( + const std::string& type_url, const std::string& nonce_for_unsupported_type, + grpc_error* error_for_unsupported_type, bool is_first_message) { + // Buffer message sending if an existing message is in flight. + if (send_message_payload_ != nullptr) { + buffered_request_map_[type_url].reset(new BufferedRequest( + nonce_for_unsupported_type, error_for_unsupported_type)); return; } - // Read the response. 
- grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, ads_calld->recv_message_payload_); - grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); - grpc_byte_buffer_reader_destroy(&bbr); - grpc_byte_buffer_destroy(ads_calld->recv_message_payload_); - ads_calld->recv_message_payload_ = nullptr; - // TODO(juanlishen): When we convert this to use the xds protocol, the - // balancer will send us a fallback timeout such that we should go into - // fallback mode if we have lost contact with the balancer after a certain - // period of time. We will need to save the timeout value here, and then - // when the balancer call ends, we will need to start a timer for the - // specified period of time, and if the timer fires, we go into fallback - // mode. We will also need to cancel the timer when we receive a serverlist - // from the balancer. - // This anonymous lambda is a hack to avoid the usage of goto. - [&]() { - // Parse the response. - EdsUpdate update; - grpc_error* parse_error = - XdsEdsResponseDecodeAndParse(response_slice, &update); - if (parse_error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, - "[xds_client %p] ADS response parsing failed. error=%s", - xds_client, grpc_error_string(parse_error)); - GRPC_ERROR_UNREF(parse_error); - return; + grpc_slice request_payload_slice; + const XdsBootstrap::Node* node = + is_first_message ? xds_client()->bootstrap_->node() : nullptr; + const char* build_version = + is_first_message ? 
xds_client()->build_version_.get() : nullptr; + if (type_url == kCdsTypeUrl) { + request_payload_slice = XdsCdsRequestCreateAndEncode( + xds_client()->WatchedClusterNames(), node, build_version, + cds_version_.version_info, cds_version_.nonce, cds_version_.error); + cds_version_.error = GRPC_ERROR_NONE; + GRPC_ERROR_UNREF(error_for_unsupported_type); + } else if (type_url == kEdsTypeUrl) { + request_payload_slice = XdsEdsRequestCreateAndEncode( + xds_client()->EdsServiceNames(), node, build_version, + eds_version_.version_info, eds_version_.nonce, eds_version_.error); + eds_version_.error = GRPC_ERROR_NONE; + GRPC_ERROR_UNREF(error_for_unsupported_type); + } else { + request_payload_slice = XdsUnsupportedTypeNackRequestCreateAndEncode( + type_url, nonce_for_unsupported_type, error_for_unsupported_type); + } + // Create message payload. + send_message_payload_ = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(request_payload_slice); + // Send the message. 
+ grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = send_message_payload_; + Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release(); + GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, + grpc_schedule_on_exec_ctx); + grpc_call_error call_error = + grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_); + if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { + gpr_log(GPR_ERROR, + "[xds_client %p] calld=%p call_error=%d sending ADS message", + xds_client(), this, call_error); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } +} + +void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( + CdsUpdateMap cds_update_map, std::string new_version) { + for (auto& p : cds_update_map) { + const char* cluster_name = p.first.c_str(); + CdsUpdate& cds_update = p.second; + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] CDS update (cluster=%s) received: " + "eds_service_name=%s, " + "lrs_load_reporting_server_name=%s", + xds_client(), cluster_name, cds_update.eds_service_name.c_str(), + cds_update.lrs_load_reporting_server_name.has_value() + ? cds_update.lrs_load_reporting_server_name.value().c_str() + : "(N/A)"); } - if (update.priority_list_update.empty() && !update.drop_all) { - char* response_slice_str = - grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); - gpr_log(GPR_ERROR, - "[xds_client %p] ADS response '%s' doesn't contain any valid " - "locality but doesn't require to drop all calls. Ignoring.", - xds_client, response_slice_str); - gpr_free(response_slice_str); - return; + ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name]; + // Ignore identical update. 
+ if (cluster_state.update.has_value() && + cds_update.eds_service_name == + cluster_state.update.value().eds_service_name && + cds_update.lrs_load_reporting_server_name.value() == + cluster_state.update.value() + .lrs_load_reporting_server_name.value()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] CDS update identical to current, ignoring.", + xds_client()); + } + continue; + } + // Update the cluster state. + cluster_state.update.set(std::move(cds_update)); + // Notify all watchers. + for (const auto& p : cluster_state.watchers) { + p.first->OnClusterChanged(cluster_state.update.value()); } - ads_calld->seen_response_ = true; + } + cds_version_.version_info = std::move(new_version); +} + +void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate( + EdsUpdateMap eds_update_map, std::string new_version) { + for (auto& p : eds_update_map) { + const char* eds_service_name = p.first.c_str(); + EdsUpdate& eds_update = p.second; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, - "[xds_client %p] ADS response with %" PRIuPTR + "[xds_client %p] EDS response with %" PRIuPTR " priorities and %" PRIuPTR " drop categories received (drop_all=%d)", - xds_client, update.priority_list_update.size(), - update.drop_config->drop_category_list().size(), update.drop_all); - for (size_t priority = 0; priority < update.priority_list_update.size(); - ++priority) { - const auto* locality_map_update = - update.priority_list_update.Find(static_cast(priority)); + xds_client(), eds_update.priority_list_update.size(), + eds_update.drop_config->drop_category_list().size(), + eds_update.drop_all); + for (size_t priority = 0; + priority < eds_update.priority_list_update.size(); ++priority) { + const auto* locality_map_update = eds_update.priority_list_update.Find( + static_cast(priority)); gpr_log(GPR_INFO, "[xds_client %p] Priority %" PRIuPTR " contains %" PRIuPTR " localities", - xds_client, priority, 
locality_map_update->size()); + xds_client(), priority, locality_map_update->size()); size_t locality_count = 0; for (const auto& p : locality_map_update->localities) { const auto& locality = p.second; gpr_log(GPR_INFO, "[xds_client %p] Priority %" PRIuPTR ", locality %" PRIuPTR " %s contains %" PRIuPTR " server addresses", - xds_client, priority, locality_count, + xds_client(), priority, locality_count, locality.name->AsHumanReadableString(), locality.serverlist.size()); for (size_t i = 0; i < locality.serverlist.size(); ++i) { @@ -718,59 +808,184 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( gpr_log(GPR_INFO, "[xds_client %p] Priority %" PRIuPTR ", locality %" PRIuPTR " %s, server address %" PRIuPTR ": %s", - xds_client, priority, locality_count, + xds_client(), priority, locality_count, locality.name->AsHumanReadableString(), i, ipport); gpr_free(ipport); } ++locality_count; } } - for (size_t i = 0; i < update.drop_config->drop_category_list().size(); - ++i) { + for (size_t i = 0; + i < eds_update.drop_config->drop_category_list().size(); ++i) { const XdsDropConfig::DropCategory& drop_category = - update.drop_config->drop_category_list()[i]; + eds_update.drop_config->drop_category_list()[i]; gpr_log(GPR_INFO, "[xds_client %p] Drop category %s has drop rate %d per million", - xds_client, drop_category.name.get(), + xds_client(), drop_category.name.c_str(), drop_category.parts_per_million); } } - // Start load reporting if needed. - auto& lrs_call = ads_calld->chand()->lrs_calld_; - if (lrs_call != nullptr) { - LrsCallState* lrs_calld = lrs_call->calld(); - if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); - } + EndpointState& endpoint_state = + xds_client()->endpoint_map_[eds_service_name]; // Ignore identical update. 
- const EdsUpdate& prev_update = xds_client->cluster_state_.eds_update; + const EdsUpdate& prev_update = endpoint_state.update; const bool priority_list_changed = - prev_update.priority_list_update != update.priority_list_update; + prev_update.priority_list_update != eds_update.priority_list_update; const bool drop_config_changed = prev_update.drop_config == nullptr || - *prev_update.drop_config != *update.drop_config; + *prev_update.drop_config != *eds_update.drop_config; if (!priority_list_changed && !drop_config_changed) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] EDS update identical to current, ignoring.", - xds_client); + xds_client()); } - return; + continue; } // Update the cluster state. - ClusterState& cluster_state = xds_client->cluster_state_; - cluster_state.eds_update = std::move(update); + endpoint_state.update = std::move(eds_update); // Notify all watchers. - for (const auto& p : cluster_state.endpoint_watchers) { - p.first->OnEndpointChanged(cluster_state.eds_update); + for (const auto& p : endpoint_state.watchers) { + p.first->OnEndpointChanged(endpoint_state.update); } - }(); + } + eds_version_.version_info = std::move(new_version); +} + +void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg, + grpc_error* error) { + AdsCallState* ads_calld = static_cast(arg); + ads_calld->xds_client()->combiner_->Run( + GRPC_CLOSURE_INIT(&ads_calld->on_request_sent_, OnRequestSentLocked, + ads_calld, nullptr), + GRPC_ERROR_REF(error)); +} + +void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked( + void* arg, grpc_error* error) { + AdsCallState* self = static_cast(arg); + if (self->IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) { + // Clean up the sent message. + grpc_byte_buffer_destroy(self->send_message_payload_); + self->send_message_payload_ = nullptr; + // Continue to send another pending message if any. 
+ // TODO(roth): The current code to handle buffered messages has the + // advantage of sending only the most recent list of resource names for each + // resource type (no matter how many times that resource type has been + // requested to send while the current message sending is still pending). + // But its disadvantage is that we send the requests in fixed order of + // resource types. We need to fix this if we are seeing some resource + // type(s) starved due to frequent requests of other resource type(s). + for (auto& p : self->buffered_request_map_) { + const std::string& type_url = p.first; + std::unique_ptr& buffered_request = p.second; + if (buffered_request != nullptr) { + self->SendMessageLocked(type_url, buffered_request->nonce, + buffered_request->error, false); + buffered_request->error = GRPC_ERROR_NONE; + buffered_request.reset(); + break; + } + } + } + self->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked"); +} + +void XdsClient::ChannelState::AdsCallState::OnResponseReceived( + void* arg, grpc_error* error) { + AdsCallState* ads_calld = static_cast(arg); + ads_calld->xds_client()->combiner_->Run( + GRPC_CLOSURE_INIT(&ads_calld->on_response_received_, + OnResponseReceivedLocked, ads_calld, nullptr), + GRPC_ERROR_REF(error)); +} + +void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( + void* arg, grpc_error* /*error*/) { + AdsCallState* ads_calld = static_cast(arg); + XdsClient* xds_client = ads_calld->xds_client(); + // Empty payload means the call was cancelled. + if (!ads_calld->IsCurrentCallOnChannel() || + ads_calld->recv_message_payload_ == nullptr) { + ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); + return; + } + // Read the response. 
+ grpc_byte_buffer_reader bbr; + grpc_byte_buffer_reader_init(&bbr, ads_calld->recv_message_payload_); + grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc_byte_buffer_reader_destroy(&bbr); + grpc_byte_buffer_destroy(ads_calld->recv_message_payload_); + ads_calld->recv_message_payload_ = nullptr; + // TODO(juanlishen): When we convert this to use the xds protocol, the + // balancer will send us a fallback timeout such that we should go into + // fallback mode if we have lost contact with the balancer after a certain + // period of time. We will need to save the timeout value here, and then + // when the balancer call ends, we will need to start a timer for the + // specified period of time, and if the timer fires, we go into fallback + // mode. We will also need to cancel the timer when we receive a serverlist + // from the balancer. + // Parse the response. + CdsUpdateMap cds_update_map; + EdsUpdateMap eds_update_map; + std::string version; + std::string nonce; + std::string type_url; + // Note that XdsAdsResponseDecodeAndParse() also validate the response. + grpc_error* parse_error = XdsAdsResponseDecodeAndParse( + response_slice, xds_client->EdsServiceNames(), &cds_update_map, + &eds_update_map, &version, &nonce, &type_url); grpc_slice_unref_internal(response_slice); + if (type_url.empty()) { + // Ignore unparsable response. + gpr_log(GPR_ERROR, "[xds_client %p] No type_url found. error=%s", + xds_client, grpc_error_string(parse_error)); + GRPC_ERROR_UNREF(parse_error); + } else { + // Update nonce and error. + if (type_url == kCdsTypeUrl) { + ads_calld->cds_version_.nonce = nonce; + GRPC_ERROR_UNREF(ads_calld->cds_version_.error); + ads_calld->cds_version_.error = GRPC_ERROR_REF(parse_error); + } else if (type_url == kEdsTypeUrl) { + ads_calld->eds_version_.nonce = nonce; + GRPC_ERROR_UNREF(ads_calld->eds_version_.error); + ads_calld->eds_version_.error = GRPC_ERROR_REF(parse_error); + } + // NACK or ACK the response. 
+ if (parse_error != GRPC_ERROR_NONE) { + // NACK unacceptable update. + gpr_log( + GPR_ERROR, + "[xds_client %p] ADS response can't be accepted, NACKing. error=%s", + xds_client, grpc_error_string(parse_error)); + ads_calld->SendMessageLocked(type_url, nonce, parse_error, false); + } else { + ads_calld->seen_response_ = true; + // Accept the (CDS or EDS) response. + if (type_url == kCdsTypeUrl) { + ads_calld->AcceptCdsUpdate(std::move(cds_update_map), + std::move(version)); + } else if (type_url == kEdsTypeUrl) { + ads_calld->AcceptEdsUpdate(std::move(eds_update_map), + std::move(version)); + } + // ACK the update. + ads_calld->SendMessageLocked(type_url, "", nullptr, false); + // Start load reporting if needed. + auto& lrs_call = ads_calld->chand()->lrs_calld_; + if (lrs_call != nullptr) { + LrsCallState* lrs_calld = lrs_call->calld(); + if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); + } + } + } if (xds_client->shutting_down_) { ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked+xds_shutdown"); return; } - // Keep listening for serverlist updates. + // Keep listening for updates. grpc_op op; memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; @@ -869,15 +1084,8 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked( void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { // Create a request that contains the load report. - // TODO(roth): Currently, it is not possible to have multiple client - // stats objects for a given cluster. However, in the future, we may - // run into cases where this happens (e.g., due to graceful LB policy - // switching). If/when this becomes a problem, replace this assertion - // with code to merge data from multiple client stats objects. 
- GPR_ASSERT(xds_client()->cluster_state_.client_stats.size() == 1); - auto* client_stats = *xds_client()->cluster_state_.client_stats.begin(); grpc_slice request_payload_slice = - XdsLrsRequestCreateAndEncode(parent_->cluster_name_.get(), client_stats); + XdsLrsRequestCreateAndEncode(xds_client()->ClientStatsMap()); // Skip client load report if the counters were all zero in the last // report and they are still zero in this one. const bool old_val = last_report_counters_were_zero_; @@ -945,8 +1153,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( // activity in xds_client()->interested_parties_, which is comprised of // the polling entities from client_channel. GPR_ASSERT(xds_client() != nullptr); - GPR_ASSERT(xds_client()->server_name_ != nullptr); - GPR_ASSERT(*xds_client()->server_name_.get() != '\0'); + GPR_ASSERT(!xds_client()->server_name_.empty()); call_ = grpc_channel_create_pollset_set_call( chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, xds_client()->interested_parties_, @@ -955,7 +1162,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( GPR_ASSERT(call_ != nullptr); // Init the request payload. grpc_slice request_payload_slice = XdsLrsRequestCreateAndEncode( - xds_client()->server_name_.get(), xds_client()->bootstrap_->node(), + xds_client()->server_name_, xds_client()->bootstrap_->node(), xds_client()->build_version_.get()); send_message_payload_ = grpc_raw_byte_buffer_create(&request_payload_slice, 1); @@ -1069,13 +1276,22 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() { AdsCallState* ads_calld = chand()->ads_calld_->calld(); if (ads_calld == nullptr || !ads_calld->seen_response()) return; // Start reporting. 
- for (auto* client_stats : chand()->xds_client_->cluster_state_.client_stats) { - client_stats->MaybeInitLastReportTime(); + for (auto& p : chand()->xds_client_->endpoint_map_) { + for (auto* client_stats : p.second.client_stats) { + client_stats->MaybeInitLastReportTime(); + } } reporter_ = MakeOrphanable( Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_); } +bool XdsClient::ChannelState::LrsCallState::ShouldSendLoadReports( + const StringView& cluster_name) const { + // Only send load reports for the clusters that are asked for by the LRS + // server. + return cluster_names_.find(std::string(cluster_name)) != cluster_names_.end(); +} + void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent( void* arg, grpc_error* error) { LrsCallState* lrs_calld = static_cast(arg); @@ -1124,10 +1340,10 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( // This anonymous lambda is a hack to avoid the usage of goto. [&]() { // Parse the response. - grpc_core::UniquePtr new_cluster_name; + std::set new_cluster_names; grpc_millis new_load_reporting_interval; grpc_error* parse_error = XdsLrsResponseDecodeAndParse( - response_slice, &new_cluster_name, &new_load_reporting_interval); + response_slice, &new_cluster_names, &new_load_reporting_interval); if (parse_error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "[xds_client %p] LRS response parsing failed. 
error=%s", @@ -1138,9 +1354,15 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( lrs_calld->seen_response_ = true; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, - "[xds_client %p] LRS response received, cluster_name=%s, " - "load_report_interval=%" PRId64 "ms", - xds_client, new_cluster_name.get(), new_load_reporting_interval); + "[xds_client %p] LRS response received, %" PRIuPTR + " cluster names, load_report_interval=%" PRId64 "ms", + xds_client, new_cluster_names.size(), + new_load_reporting_interval); + size_t i = 0; + for (const auto& name : new_cluster_names) { + gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s", + xds_client, i++, name.c_str()); + } } if (new_load_reporting_interval < GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) { @@ -1154,8 +1376,8 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( } } // Ignore identical update. - if (lrs_calld->load_reporting_interval_ == new_load_reporting_interval && - strcmp(lrs_calld->cluster_name_.get(), new_cluster_name.get()) == 0) { + if (lrs_calld->cluster_names_ == new_cluster_names && + lrs_calld->load_reporting_interval_ == new_load_reporting_interval) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] Incoming LRS response identical to current, " @@ -1167,7 +1389,7 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( // Stop current load reporting (if any) to adopt the new config. lrs_calld->reporter_.reset(); // Record the new config. - lrs_calld->cluster_name_ = std::move(new_cluster_name); + lrs_calld->cluster_names_ = std::move(new_cluster_names); lrs_calld->load_reporting_interval_ = new_load_reporting_interval; // Try starting sending load report. 
lrs_calld->MaybeStartReportingLocked(); @@ -1253,11 +1475,12 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties, StringView server_name, std::unique_ptr watcher, const grpc_channel_args& channel_args, grpc_error** error) - : build_version_(GenerateBuildVersionString()), + : InternallyRefCounted(&grpc_xds_client_trace), + build_version_(GenerateBuildVersionString()), combiner_(GRPC_COMBINER_REF(combiner, "xds_client")), interested_parties_(interested_parties), bootstrap_(XdsBootstrap::ReadFromFile(error)), - server_name_(StringViewToCString(server_name)), + server_name_(server_name), service_config_watcher_(std::move(watcher)) { if (*error != GRPC_ERROR_NONE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -1286,77 +1509,95 @@ XdsClient::~XdsClient() { GRPC_COMBINER_UNREF(combiner_, "xds_client"); } void XdsClient::Orphan() { shutting_down_ = true; chand_.reset(); + cluster_map_.clear(); + endpoint_map_.clear(); Unref(DEBUG_LOCATION, "XdsClient::Orphan()"); } void XdsClient::WatchClusterData( - StringView cluster, std::unique_ptr watcher) { + StringView cluster_name, std::unique_ptr watcher) { + const bool new_name = cluster_map_.find(cluster_name) == cluster_map_.end(); + ClusterState& cluster_state = cluster_map_[cluster_name]; ClusterWatcherInterface* w = watcher.get(); - cluster_state_.cluster_watchers[w] = std::move(watcher); - // TODO(juanlishen): Start CDS call if not already started and return - // real data via watcher. - CdsUpdate update; - update.eds_service_name = StringViewToCString(cluster); - update.lrs_load_reporting_server_name.reset(gpr_strdup("")); - w->OnClusterChanged(std::move(update)); + cluster_state.watchers[w] = std::move(watcher); + // If we've already received an CDS update, notify the new watcher + // immediately. 
+ if (cluster_state.update.has_value()) { + w->OnClusterChanged(cluster_state.update.value()); + } + if (new_name) chand_->OnResourceNamesChanged(kCdsTypeUrl); } -void XdsClient::CancelClusterDataWatch(StringView /*cluster*/, +void XdsClient::CancelClusterDataWatch(StringView cluster_name, ClusterWatcherInterface* watcher) { - auto it = cluster_state_.cluster_watchers.find(watcher); - if (it != cluster_state_.cluster_watchers.end()) { - cluster_state_.cluster_watchers.erase(it); - } - if (chand_ != nullptr && cluster_state_.cluster_watchers.empty()) { - // TODO(juanlishen): Stop CDS call. + if (shutting_down_) return; + ClusterState& cluster_state = cluster_map_[cluster_name]; + auto it = cluster_state.watchers.find(watcher); + if (it != cluster_state.watchers.end()) { + cluster_state.watchers.erase(it); + if (cluster_state.watchers.empty()) { + cluster_map_.erase(cluster_name); + chand_->OnResourceNamesChanged(kCdsTypeUrl); + } } + chand_->OnWatcherRemoved(); } void XdsClient::WatchEndpointData( - StringView /*cluster*/, std::unique_ptr watcher) { + StringView eds_service_name, + std::unique_ptr watcher) { + const bool new_name = + endpoint_map_.find(eds_service_name) == endpoint_map_.end(); + EndpointState& endpoint_state = endpoint_map_[eds_service_name]; EndpointWatcherInterface* w = watcher.get(); - cluster_state_.endpoint_watchers[w] = std::move(watcher); + endpoint_state.watchers[w] = std::move(watcher); // If we've already received an EDS update, notify the new watcher // immediately. 
- if (!cluster_state_.eds_update.priority_list_update.empty()) { - w->OnEndpointChanged(cluster_state_.eds_update); + if (!endpoint_state.update.priority_list_update.empty()) { + w->OnEndpointChanged(endpoint_state.update); } - chand_->MaybeStartAdsCall(); + if (new_name) chand_->OnResourceNamesChanged(kEdsTypeUrl); } -void XdsClient::CancelEndpointDataWatch(StringView /*cluster*/, +void XdsClient::CancelEndpointDataWatch(StringView eds_service_name, EndpointWatcherInterface* watcher) { - auto it = cluster_state_.endpoint_watchers.find(watcher); - if (it != cluster_state_.endpoint_watchers.end()) { - cluster_state_.endpoint_watchers.erase(it); - } - if (chand_ != nullptr && cluster_state_.endpoint_watchers.empty()) { - chand_->StopAdsCall(); + if (shutting_down_) return; + EndpointState& endpoint_state = endpoint_map_[eds_service_name]; + auto it = endpoint_state.watchers.find(watcher); + if (it != endpoint_state.watchers.end()) { + endpoint_state.watchers.erase(it); + if (endpoint_state.watchers.empty()) { + endpoint_map_.erase(eds_service_name); + chand_->OnResourceNamesChanged(kEdsTypeUrl); + } } + chand_->OnWatcherRemoved(); } void XdsClient::AddClientStats(StringView /*lrs_server*/, - StringView /*cluster*/, + StringView cluster_name, XdsClientStats* client_stats) { + EndpointState& endpoint_state = endpoint_map_[cluster_name]; // TODO(roth): When we add support for direct federation, use the // server name specified in lrs_server. - cluster_state_.client_stats.insert(client_stats); + endpoint_state.client_stats.insert(client_stats); chand_->MaybeStartLrsCall(); } void XdsClient::RemoveClientStats(StringView /*lrs_server*/, - StringView /*cluster*/, + StringView cluster_name, XdsClientStats* client_stats) { + EndpointState& endpoint_state = endpoint_map_[cluster_name]; // TODO(roth): When we add support for direct federation, use the // server name specified in lrs_server. 
// TODO(roth): In principle, we should try to send a final load report // containing whatever final stats have been accumulated since the // last load report. - auto it = cluster_state_.client_stats.find(client_stats); - if (it != cluster_state_.client_stats.end()) { - cluster_state_.client_stats.erase(it); + auto it = endpoint_state.client_stats.find(client_stats); + if (it != endpoint_state.client_stats.end()) { + endpoint_state.client_stats.erase(it); } - if (chand_ != nullptr && cluster_state_.client_stats.empty()) { + if (chand_ != nullptr && endpoint_state.client_stats.empty()) { chand_->StopLrsCall(); } } @@ -1367,15 +1608,55 @@ void XdsClient::ResetBackoff() { } } +std::set XdsClient::WatchedClusterNames() const { + std::set cluster_names; + for (const auto& p : cluster_map_) { + const StringView& cluster_name = p.first; + const ClusterState& cluster_state = p.second; + // Don't request for the clusters that are cached before watched. + if (cluster_state.watchers.empty()) continue; + cluster_names.emplace(cluster_name); + } + return cluster_names; +} + +std::set XdsClient::EdsServiceNames() const { + std::set eds_service_names; + for (const auto& p : endpoint_map_) { + const StringView& eds_service_name = p.first; + eds_service_names.emplace(eds_service_name); + } + return eds_service_names; +} + +std::map> XdsClient::ClientStatsMap() + const { + std::map> client_stats_map; + for (const auto& p : endpoint_map_) { + const StringView& cluster_name = p.first; + const auto& client_stats = p.second.client_stats; + if (chand_->lrs_calld()->ShouldSendLoadReports(cluster_name)) { + client_stats_map.emplace(cluster_name, client_stats); + } + } + return client_stats_map; +} + void XdsClient::NotifyOnError(grpc_error* error) { if (service_config_watcher_ != nullptr) { service_config_watcher_->OnError(GRPC_ERROR_REF(error)); } - for (const auto& p : cluster_state_.cluster_watchers) { - p.first->OnError(GRPC_ERROR_REF(error)); + for (const auto& p : cluster_map_) { + 
const ClusterState& cluster_state = p.second; + for (const auto& p : cluster_state.watchers) { + p.first->OnError(GRPC_ERROR_REF(error)); + } } - for (const auto& p : cluster_state_.endpoint_watchers) { - p.first->OnError(GRPC_ERROR_REF(error)); + for (const auto& p : endpoint_map_) { + const EndpointState& endpoint_state = p.second; + for (const auto& p : endpoint_state.watchers) { + p.first->OnError(GRPC_ERROR_REF(error)); + } } GRPC_ERROR_UNREF(error); } @@ -1393,7 +1674,7 @@ void XdsClient::NotifyOnServiceConfig(void* arg, grpc_error* error) { " } }\n" " ]\n" "}", - self->server_name_.get()); + self->server_name_.c_str()); RefCountedPtr service_config = ServiceConfig::Create(json, &error); gpr_free(json); diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index 8bb0dc443c1..619b545ceb3 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -27,6 +27,7 @@ #include "src/core/ext/filters/client_channel/xds/xds_client_stats.h" #include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/optional.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -85,9 +86,9 @@ class XdsClient : public InternallyRefCounted { // keep a raw pointer to the watcher, which may be used only for // cancellation. (Because the caller does not own the watcher, the // pointer must not be used for any other purpose.) - void WatchClusterData(StringView cluster, + void WatchClusterData(StringView cluster_name, std::unique_ptr watcher); - void CancelClusterDataWatch(StringView cluster, + void CancelClusterDataWatch(StringView cluster_name, ClusterWatcherInterface* watcher); // Start and cancel endpoint data watch for a cluster. 
@@ -95,15 +96,15 @@ class XdsClient : public InternallyRefCounted { // keep a raw pointer to the watcher, which may be used only for // cancellation. (Because the caller does not own the watcher, the // pointer must not be used for any other purpose.) - void WatchEndpointData(StringView cluster, + void WatchEndpointData(StringView eds_service_name, std::unique_ptr watcher); - void CancelEndpointDataWatch(StringView cluster, + void CancelEndpointDataWatch(StringView eds_service_name, EndpointWatcherInterface* watcher); - // Adds and removes client stats for cluster. - void AddClientStats(StringView lrs_server, StringView cluster, + // Adds and removes client stats for \a cluster_name. + void AddClientStats(StringView /*lrs_server*/, StringView cluster_name, XdsClientStats* client_stats); - void RemoveClientStats(StringView lrs_server, StringView cluster, + void RemoveClientStats(StringView /*lrs_server*/, StringView cluster_name, XdsClientStats* client_stats); // Resets connection backoff state. @@ -115,6 +116,8 @@ class XdsClient : public InternallyRefCounted { const grpc_channel_args& args); private: + static const grpc_arg_pointer_vtable kXdsClientVtable; + // Contains a channel to the xds server and all the data related to the // channel. Holds a ref to the xds client object. 
// TODO(roth): This is separate from the XdsClient object because it was @@ -144,9 +147,6 @@ class XdsClient : public InternallyRefCounted { AdsCallState* ads_calld() const; LrsCallState* lrs_calld() const; - void MaybeStartAdsCall(); - void StopAdsCall(); - void MaybeStartLrsCall(); void StopLrsCall(); @@ -155,6 +155,9 @@ class XdsClient : public InternallyRefCounted { void StartConnectivityWatchLocked(); void CancelConnectivityWatchLocked(); + void OnResourceNamesChanged(const std::string& type_url); + void OnWatcherRemoved(); + private: class StateWatcher; @@ -173,13 +176,18 @@ class XdsClient : public InternallyRefCounted { struct ClusterState { std::map> - cluster_watchers; + watchers; + // The latest data seen from CDS. + Optional update; + }; + + struct EndpointState { std::map> - endpoint_watchers; + watchers; std::set client_stats; // The latest data seen from EDS. - EdsUpdate eds_update; + EdsUpdate update; }; // Sends an error notification to all watchers. @@ -189,12 +197,22 @@ class XdsClient : public InternallyRefCounted { // normal method instead of a closure callback. static void NotifyOnServiceConfig(void* arg, grpc_error* error); + std::set WatchedClusterNames() const; + + std::set EdsServiceNames() const; + + std::map> ClientStatsMap() const; + // Channel arg vtable functions. static void* ChannelArgCopy(void* p); static void ChannelArgDestroy(void* p); static int ChannelArgCmp(void* p, void* q); - static const grpc_arg_pointer_vtable kXdsClientVtable; + // All the received clusters are cached, no matter they are watched or not. + std::map cluster_map_; + // Only the watched EDS service names are stored. 
+ std::map + endpoint_map_; grpc_core::UniquePtr build_version_; @@ -203,7 +221,7 @@ class XdsClient : public InternallyRefCounted { std::unique_ptr bootstrap_; - grpc_core::UniquePtr server_name_; + std::string server_name_; std::unique_ptr service_config_watcher_; // TODO(juanlishen): Once we implement LDS support, this will no // longer be needed. @@ -212,12 +230,6 @@ class XdsClient : public InternallyRefCounted { // The channel for communicating with the xds server. OrphanablePtr chand_; - // TODO(juanlishen): As part of adding CDS support, replace - // cluster_state_ with a map keyed by cluster name, so that we can - // support multiple clusters for both CDS and EDS. - ClusterState cluster_state_; - // Map clusters_; - bool shutting_down_ = false; }; diff --git a/src/core/ext/filters/client_channel/xds/xds_client_stats.cc b/src/core/ext/filters/client_channel/xds/xds_client_stats.cc index e04755235d7..03587d3e7f2 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client_stats.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client_stats.cc @@ -87,11 +87,10 @@ XdsClientStats::LocalityStats::GetSnapshotAndReset() { { MutexLock lock(&load_metric_stats_mu_); for (auto& p : load_metric_stats_) { - const char* metric_name = p.first.get(); + const std::string& metric_name = p.first; LoadMetric& metric_value = p.second; - snapshot.load_metric_stats.emplace( - grpc_core::UniquePtr(gpr_strdup(metric_name)), - metric_value.GetSnapshotAndReset()); + snapshot.load_metric_stats.emplace(metric_name, + metric_value.GetSnapshotAndReset()); } } return snapshot; @@ -178,14 +177,12 @@ void XdsClientStats::PruneLocalityStats() { } } -void XdsClientStats::AddCallDropped( - const grpc_core::UniquePtr& category) { +void XdsClientStats::AddCallDropped(const std::string& category) { total_dropped_requests_.FetchAdd(1, MemoryOrder::RELAXED); MutexLock lock(&dropped_requests_mu_); auto iter = dropped_requests_.find(category); if (iter == dropped_requests_.end()) { - 
dropped_requests_.emplace( - grpc_core::UniquePtr(gpr_strdup(category.get())), 1); + dropped_requests_.emplace(category, 1); } else { ++iter->second; } diff --git a/src/core/ext/filters/client_channel/xds/xds_client_stats.h b/src/core/ext/filters/client_channel/xds/xds_client_stats.h index 15b246bd3e0..b895a3205fd 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client_stats.h +++ b/src/core/ext/filters/client_channel/xds/xds_client_stats.h @@ -38,46 +38,43 @@ class XdsLocalityName : public RefCounted { struct Less { bool operator()(const RefCountedPtr& lhs, const RefCountedPtr& rhs) const { - int cmp_result = strcmp(lhs->region_.get(), rhs->region_.get()); + int cmp_result = lhs->region_.compare(rhs->region_); if (cmp_result != 0) return cmp_result < 0; - cmp_result = strcmp(lhs->zone_.get(), rhs->zone_.get()); + cmp_result = lhs->zone_.compare(rhs->zone_); if (cmp_result != 0) return cmp_result < 0; - return strcmp(lhs->sub_zone_.get(), rhs->sub_zone_.get()) < 0; + return lhs->sub_zone_.compare(rhs->sub_zone_) < 0; } }; - XdsLocalityName(grpc_core::UniquePtr region, - grpc_core::UniquePtr zone, - grpc_core::UniquePtr subzone) + XdsLocalityName(std::string region, std::string zone, std::string subzone) : region_(std::move(region)), zone_(std::move(zone)), sub_zone_(std::move(subzone)) {} bool operator==(const XdsLocalityName& other) const { - return strcmp(region_.get(), other.region_.get()) == 0 && - strcmp(zone_.get(), other.zone_.get()) == 0 && - strcmp(sub_zone_.get(), other.sub_zone_.get()) == 0; + return region_ == other.region_ && zone_ == other.zone_ && + sub_zone_ == other.sub_zone_; } - const char* region() const { return region_.get(); } - const char* zone() const { return zone_.get(); } - const char* sub_zone() const { return sub_zone_.get(); } + const std::string& region() const { return region_; } + const std::string& zone() const { return zone_; } + const std::string& sub_zone() const { return sub_zone_; } const char* AsHumanReadableString() 
{ if (human_readable_string_ == nullptr) { char* tmp; gpr_asprintf(&tmp, "{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", - region_.get(), zone_.get(), sub_zone_.get()); + region_.c_str(), zone_.c_str(), sub_zone_.c_str()); human_readable_string_.reset(tmp); } return human_readable_string_.get(); } private: - grpc_core::UniquePtr region_; - grpc_core::UniquePtr zone_; - grpc_core::UniquePtr sub_zone_; - grpc_core::UniquePtr human_readable_string_; + std::string region_; + std::string zone_; + std::string sub_zone_; + UniquePtr human_readable_string_; }; // The stats classes (i.e., XdsClientStats, LocalityStats, and LoadMetric) can @@ -112,10 +109,8 @@ class XdsClientStats { double total_metric_value_{0}; }; - using LoadMetricMap = - std::map, LoadMetric, StringLess>; - using LoadMetricSnapshotMap = - std::map, LoadMetric::Snapshot, StringLess>; + using LoadMetricMap = std::map; + using LoadMetricSnapshotMap = std::map; struct Snapshot { // TODO(juanlishen): Change this to const method when const_iterator is @@ -187,8 +182,7 @@ class XdsClientStats { using LocalityStatsSnapshotMap = std::map, LocalityStats::Snapshot, XdsLocalityName::Less>; - using DroppedRequestsMap = - std::map, uint64_t, StringLess>; + using DroppedRequestsMap = std::map; using DroppedRequestsSnapshotMap = DroppedRequestsMap; struct Snapshot { @@ -211,7 +205,7 @@ class XdsClientStats { RefCountedPtr FindLocalityStats( const RefCountedPtr& locality_name); void PruneLocalityStats(); - void AddCallDropped(const grpc_core::UniquePtr& category); + void AddCallDropped(const std::string& category); private: // The stats for each locality. 
diff --git a/src/proto/grpc/testing/xds/BUILD b/src/proto/grpc/testing/xds/BUILD index b9fb013512b..0bd14afd072 100644 --- a/src/proto/grpc/testing/xds/BUILD +++ b/src/proto/grpc/testing/xds/BUILD @@ -30,6 +30,13 @@ grpc_proto_library( deps = ["eds_for_test_proto"], ) +grpc_proto_library( + name = "cds_for_test_proto", + srcs = [ + "cds_for_test.proto", + ], +) + grpc_proto_library( name = "eds_for_test_proto", srcs = [ diff --git a/src/proto/grpc/testing/xds/cds_for_test.proto b/src/proto/grpc/testing/xds/cds_for_test.proto new file mode 100644 index 00000000000..a4ee4b3b7fe --- /dev/null +++ b/src/proto/grpc/testing/xds/cds_for_test.proto @@ -0,0 +1,157 @@ +// Copyright 2019 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file contains the xds protocol and its dependency. It can't be used by +// the gRPC library; otherwise there can be duplicate definition problems if +// users depend on both gRPC and Envoy. It can only be used by gRPC tests. +// +// TODO(juanlishen): This file is a hack to avoid a problem we're +// currently having where we can't depend on a proto file in an external +// repo due to bazel limitations. Once that's fixed, this should be +// removed. Until this, it should be used in the gRPC tests only, or else it +// will cause a conflict due to the same proto messages being defined in +// multiple files in the same binary. 
+ +syntax = "proto3"; + +package envoy.api.v2; + +// Aggregated Discovery Service (ADS) options. This is currently empty, but when +// set in :ref:`ConfigSource ` can be used to +// specify that ADS is to be used. +message AggregatedConfigSource { +} + +message SelfConfigSource { +} + +message ConfigSource { + oneof config_source_specifier { + // When set, ADS will be used to fetch resources. The ADS API configuration + // source in the bootstrap configuration is used. + AggregatedConfigSource ads = 3; + + // [#not-implemented-hide:] + // When set, the client will access the resources from the same server it got the + // ConfigSource from, although not necessarily from the same stream. This is similar to the + // :ref:`ads` field, except that the client may use a + // different stream to the same server. As a result, this field can be used for things + // like LRS that cannot be sent on an ADS stream. It can also be used to link from (e.g.) + // LDS to RDS on the same server without requiring the management server to know its name + // or required credentials. + // [#next-major-version: In xDS v3, consider replacing the ads field with this one, since + // this field can implicitly mean to use the same stream in the case where the ConfigSource + // is provided via ADS and the specified data can also be obtained via ADS.] + SelfConfigSource self = 5; + } +} + +message Cluster { + // Refer to :ref:`service discovery type ` + // for an explanation on each type. + enum DiscoveryType { + // Refer to the :ref:`static discovery type` + // for an explanation. + STATIC = 0; + + // Refer to the :ref:`strict DNS discovery + // type` + // for an explanation. + STRICT_DNS = 1; + + // Refer to the :ref:`logical DNS discovery + // type` + // for an explanation. + LOGICAL_DNS = 2; + + // Refer to the :ref:`service discovery type` + // for an explanation. + EDS = 3; + + // Refer to the :ref:`original destination discovery + // type` + // for an explanation. 
+ ORIGINAL_DST = 4; + } + + string name = 1; + + oneof cluster_discovery_type { + // The :ref:`service discovery type ` + // to use for resolving the cluster. + DiscoveryType type = 2; + } + + // Only valid when discovery type is EDS. + message EdsClusterConfig { + // Configuration for the source of EDS updates for this Cluster. + ConfigSource eds_config = 1; + + // Optional alternative to cluster name to present to EDS. This does not + // have the same restrictions as cluster name, i.e. it may be arbitrary + // length. + string service_name = 2; + } + + // Refer to :ref:`load balancer type ` architecture + // overview section for information on each type. + enum LbPolicy { + // Refer to the :ref:`round robin load balancing + // policy` + // for an explanation. + ROUND_ROBIN = 0; + + // Refer to the :ref:`least request load balancing + // policy` + // for an explanation. + LEAST_REQUEST = 1; + + // Refer to the :ref:`ring hash load balancing + // policy` + // for an explanation. + RING_HASH = 2; + + // Refer to the :ref:`random load balancing + // policy` + // for an explanation. + RANDOM = 3; + + // Refer to the :ref:`original destination load balancing + // policy` + // for an explanation. + // + // .. attention:: + // + // **This load balancing policy is deprecated**. Use CLUSTER_PROVIDED instead. + // + ORIGINAL_DST_LB = 4; + + // Refer to the :ref:`Maglev load balancing policy` + // for an explanation. + MAGLEV = 5; + + // This load balancer type must be specified if the configured cluster provides a cluster + // specific load balancer. Consult the configured cluster's documentation for whether to set + // this option or not. + CLUSTER_PROVIDED = 6; + } + // The :ref:`load balancer type ` to use + // when picking a host in the cluster. + LbPolicy lb_policy = 6; + + // Configuration to use for EDS updates for the Cluster. 
+ EdsClusterConfig eds_cluster_config = 3; + + ConfigSource lrs_server = 42; +} diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 2bc9854fa74..3b490e08848 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -516,6 +516,7 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", "//src/proto/grpc/testing/xds:ads_for_test_proto", + "//src/proto/grpc/testing/xds:cds_for_test_proto", "//src/proto/grpc/testing/xds:eds_for_test_proto", "//src/proto/grpc/testing/xds:lrs_for_test_proto", "//test/core/util:grpc_test_util", diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 03b2044fab4..847dec9ce66 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -54,6 +54,7 @@ #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h" +#include "src/proto/grpc/testing/xds/cds_for_test.grpc.pb.h" #include "src/proto/grpc/testing/xds/eds_for_test.grpc.pb.h" #include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h" @@ -83,6 +84,7 @@ namespace { using std::chrono::system_clock; +using ::envoy::api::v2::Cluster; using ::envoy::api::v2::ClusterLoadAssignment; using ::envoy::api::v2::DiscoveryRequest; using ::envoy::api::v2::DiscoveryResponse; @@ -94,6 +96,7 @@ using ::envoy::service::load_stats::v2::LoadStatsRequest; using ::envoy::service::load_stats::v2::LoadStatsResponse; using ::envoy::service::load_stats::v2::UpstreamLocalityStats; +constexpr char kCdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster"; constexpr char kEdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"; constexpr char kDefaultLocalityRegion[] = "xds_default_locality_region"; @@ -336,9 +339,16 @@ class ClientStats { std::map dropped_requests_; }; -// Only the EDS functionality is implemented. +// TODO(roth): Change this service to a real fake. 
class AdsServiceImpl : public AdsService { public: + enum ResponseState { + NOT_SENT, + SENT, + ACKED, + NACKED, + }; + struct ResponseArgs { struct Locality { Locality(const grpc::string& sub_zone, std::vector ports, @@ -371,6 +381,70 @@ class AdsServiceImpl : public AdsService { using Stream = ServerReaderWriter; using ResponseDelayPair = std::pair; + AdsServiceImpl(bool enable_load_reporting) { + default_cluster_.set_name("application_target_name"); + default_cluster_.set_type(envoy::api::v2::Cluster::EDS); + default_cluster_.mutable_eds_cluster_config() + ->mutable_eds_config() + ->mutable_ads(); + default_cluster_.set_lb_policy(envoy::api::v2::Cluster::ROUND_ROBIN); + if (enable_load_reporting) { + default_cluster_.mutable_lrs_server()->mutable_self(); + } + cds_response_data_ = { + {"application_target_name", default_cluster_}, + }; + } + + void HandleCdsRequest(DiscoveryRequest* request, Stream* stream) { + gpr_log(GPR_INFO, "ADS[%p]: received CDS request '%s'", this, + request->DebugString().c_str()); + const std::string version_str = "version_1"; + const std::string nonce_str = "nonce_1"; + grpc_core::MutexLock lock(&ads_mu_); + if (cds_response_state_ == NOT_SENT) { + DiscoveryResponse response; + response.set_type_url(kCdsTypeUrl); + response.set_version_info(version_str); + response.set_nonce(nonce_str); + for (const auto& cluster_name : request->resource_names()) { + auto iter = cds_response_data_.find(cluster_name); + if (iter == cds_response_data_.end()) continue; + response.add_resources()->PackFrom(iter->second); + } + stream->Write(response); + cds_response_state_ = SENT; + } else if (cds_response_state_ == SENT) { + GPR_ASSERT(!request->response_nonce().empty()); + cds_response_state_ = + request->version_info() == version_str ? 
ACKED : NACKED; + } + } + + void HandleEdsRequest(DiscoveryRequest* request, Stream* stream) { + gpr_log(GPR_INFO, "ADS[%p]: received EDS request '%s'", this, + request->DebugString().c_str()); + IncreaseRequestCount(); + std::vector responses_and_delays; + { + grpc_core::MutexLock lock(&ads_mu_); + responses_and_delays = eds_responses_and_delays_; + } + // Send response. + for (const auto& p : responses_and_delays) { + const DiscoveryResponse& response = p.first; + const int delay_ms = p.second; + gpr_log(GPR_INFO, "ADS[%p]: sleeping for %d ms...", this, delay_ms); + if (delay_ms > 0) { + gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms)); + } + gpr_log(GPR_INFO, "ADS[%p]: Woke up! Sending response '%s'", this, + response.DebugString().c_str()); + IncreaseResponseCount(); + stream->Write(response); + } + } + Status StreamAggregatedResources(ServerContext* context, Stream* stream) override { gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this); @@ -382,21 +456,21 @@ class AdsServiceImpl : public AdsService { // Balancer shouldn't receive the call credentials metadata. EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey), context->client_metadata().end()); - // Read request. + // Keep servicing requests until the EDS response has been sent back. DiscoveryRequest request; - if (!stream->Read(&request)) return; - IncreaseRequestCount(); - gpr_log(GPR_INFO, "ADS[%p]: received initial message '%s'", this, - request.DebugString().c_str()); - // Send response. - std::vector responses_and_delays; - { - grpc_core::MutexLock lock(&ads_mu_); - responses_and_delays = responses_and_delays_; - } - for (const auto& response_and_delay : responses_and_delays) { - SendResponse(stream, response_and_delay.first, - response_and_delay.second); + // TODO(roth): For each supported type, we currently only handle one + // request without replying to any new requests (for ACK/NACK or new + // resource names). 
It's not causing a big problem now but should be + // fixed. + bool eds_sent = false; + while (!eds_sent || cds_response_state_ == SENT) { + if (!stream->Read(&request)) return; + if (request.type_url() == kCdsTypeUrl) { + HandleCdsRequest(&request, stream); + } else if (request.type_url() == kEdsTypeUrl) { + HandleEdsRequest(&request, stream); + eds_sent = true; + } } // Wait until notified done. grpc_core::MutexLock lock(&ads_mu_); @@ -406,29 +480,42 @@ class AdsServiceImpl : public AdsService { return Status::OK; } - void add_response(const DiscoveryResponse& response, int send_after_ms) { + Cluster GetDefaultCluster() const { return default_cluster_; } + + void SetCdsResponse( + std::map cds_response_data) { + cds_response_data_ = std::move(cds_response_data); + } + + ResponseState cds_response_state() { + grpc_core::MutexLock lock(&ads_mu_); + return cds_response_state_; + } + + void AddEdsResponse(const DiscoveryResponse& response, int send_after_ms) { grpc_core::MutexLock lock(&ads_mu_); - responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); + eds_responses_and_delays_.push_back( + std::make_pair(response, send_after_ms)); } void Start() { grpc_core::MutexLock lock(&ads_mu_); ads_done_ = false; - responses_and_delays_.clear(); + eds_responses_and_delays_.clear(); } void Shutdown() { { grpc_core::MutexLock lock(&ads_mu_); NotifyDoneWithAdsCallLocked(); - responses_and_delays_.clear(); + eds_responses_and_delays_.clear(); } gpr_log(GPR_INFO, "ADS[%p]: shut down", this); } static DiscoveryResponse BuildResponse(const ResponseArgs& args) { ClusterLoadAssignment assignment; - assignment.set_cluster_name("service name"); + assignment.set_cluster_name("application_target_name"); for (const auto& locality : args.locality_list) { auto* endpoints = assignment.add_endpoints(); endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight); @@ -482,23 +569,16 @@ class AdsServiceImpl : public AdsService { } private: - void 
SendResponse(Stream* stream, const DiscoveryResponse& response, - int delay_ms) { - gpr_log(GPR_INFO, "ADS[%p]: sleeping for %d ms...", this, delay_ms); - if (delay_ms > 0) { - gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms)); - } - gpr_log(GPR_INFO, "ADS[%p]: Woke up! Sending response '%s'", this, - response.DebugString().c_str()); - IncreaseResponseCount(); - stream->Write(response); - } - grpc_core::CondVar ads_cond_; // Protect the members below. grpc_core::Mutex ads_mu_; bool ads_done_ = false; - std::vector responses_and_delays_; + // CDS response data. + Cluster default_cluster_; + std::map cds_response_data_; + ResponseState cds_response_state_ = NOT_SENT; + // EDS response data. + std::vector eds_responses_and_delays_; }; class LrsServiceImpl : public LrsService { @@ -621,7 +701,7 @@ class TestType { class XdsEnd2endTest : public ::testing::TestWithParam { protected: XdsEnd2endTest(size_t num_backends, size_t num_balancers, - int client_load_reporting_interval_seconds) + int client_load_reporting_interval_seconds = 100) : server_host_("localhost"), num_backends_(num_backends), num_balancers_(num_balancers), @@ -656,7 +736,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam { // Start the load balancers. for (size_t i = 0; i < num_balancers_; ++i) { balancers_.emplace_back( - new BalancerServerThread(client_load_reporting_interval_seconds_)); + new BalancerServerThread(GetParam().enable_load_reporting() + ? client_load_reporting_interval_seconds_ + : 0)); balancers_.back()->Start(server_host_); } ResetStub(); @@ -692,7 +774,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { // If the parent channel is using the fake resolver, we inject the // response generator for the parent here, and then SetNextResolution() // will inject the xds channel's response generator via the parent's - // reponse generator. + // response generator. 
// // In contrast, if we are using the xds resolver, then the parent // channel never uses a response generator, and we inject the xds @@ -868,7 +950,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { void ScheduleResponseForBalancer(size_t i, const DiscoveryResponse& response, int delay_ms) { - balancers_[i]->ads_service()->add_response(response, delay_ms); + balancers_[i]->ads_service()->AddEdsResponse(response, delay_ms); } Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000, @@ -984,7 +1066,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam { class BalancerServerThread : public ServerThread { public: explicit BalancerServerThread(int client_load_reporting_interval = 0) - : lrs_service_(client_load_reporting_interval) {} + : ads_service_(client_load_reporting_interval > 0), + lrs_service_(client_load_reporting_interval) {} AdsServiceImpl* ads_service() { return &ads_service_; } LrsServiceImpl* lrs_service() { return &lrs_service_; } @@ -1046,7 +1129,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { class BasicTest : public XdsEnd2endTest { public: - BasicTest() : XdsEnd2endTest(4, 1, 0) {} + BasicTest() : XdsEnd2endTest(4, 1) {} }; // Tests that the balancer sends the correct response to the client, and the @@ -1252,6 +1335,73 @@ TEST_P(SecureNamingTest, TargetNameIsUnexpected) { ""); } +using CdsTest = BasicTest; + +// Tests that CDS client should send an ACK upon correct CDS response. +TEST_P(CdsTest, Vanilla) { + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + SendRpc(); + EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(), + AdsServiceImpl::ACKED); +} + +// Tests that CDS client should send a NACK if the cluster type in CDS response +// is other than EDS. 
+TEST_P(CdsTest, WrongClusterType) { + auto cluster = balancers_[0]->ads_service()->GetDefaultCluster(); + cluster.set_type(envoy::api::v2::Cluster::STATIC); + balancers_[0]->ads_service()->SetCdsResponse( + {{"application_target_name", std::move(cluster)}}); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + SendRpc(); + EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(), + AdsServiceImpl::NACKED); +} + +// Tests that CDS client should send a NACK if the eds_config in CDS response is +// other than ADS. +TEST_P(CdsTest, WrongEdsConfig) { + auto cluster = balancers_[0]->ads_service()->GetDefaultCluster(); + cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self(); + balancers_[0]->ads_service()->SetCdsResponse( + {{"application_target_name", std::move(cluster)}}); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + SendRpc(); + EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(), + AdsServiceImpl::NACKED); +} + +// Tests that CDS client should send a NACK if the lb_policy in CDS response is +// other than ROUND_ROBIN. +TEST_P(CdsTest, WrongLbPolicy) { + auto cluster = balancers_[0]->ads_service()->GetDefaultCluster(); + cluster.set_lb_policy(envoy::api::v2::Cluster::LEAST_REQUEST); + balancers_[0]->ads_service()->SetCdsResponse( + {{"application_target_name", std::move(cluster)}}); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + SendRpc(); + EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(), + AdsServiceImpl::NACKED); +} + +// Tests that CDS client should send a NACK if the lrs_server in CDS response is +// other than SELF. 
+TEST_P(CdsTest, WrongLrsServer) { + auto cluster = balancers_[0]->ads_service()->GetDefaultCluster(); + cluster.mutable_lrs_server()->mutable_ads(); + balancers_[0]->ads_service()->SetCdsResponse( + {{"application_target_name", std::move(cluster)}}); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + SendRpc(); + EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(), + AdsServiceImpl::NACKED); +} + using LocalityMapTest = BasicTest; // Tests that the localities in a locality map are picked according to their @@ -1444,7 +1594,7 @@ TEST_P(FailoverTest, ChooseHighestPriority) { ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0); WaitForBackend(3, false); for (size_t i = 0; i < 3; ++i) { - EXPECT_EQ(0, backends_[i]->backend_service()->request_count()); + EXPECT_EQ(0U, backends_[i]->backend_service()->request_count()); } // The ADS service got a single request, and sent a single response. EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count()); @@ -1468,7 +1618,7 @@ TEST_P(FailoverTest, Failover) { WaitForBackend(1, false); for (size_t i = 0; i < 4; ++i) { if (i == 1) continue; - EXPECT_EQ(0, backends_[i]->backend_service()->request_count()); + EXPECT_EQ(0U, backends_[i]->backend_service()->request_count()); } // The ADS service got a single request, and sent a single response. 
EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count()); @@ -1493,7 +1643,7 @@ TEST_P(FailoverTest, SwitchBackToHigherPriority) { WaitForBackend(1, false); for (size_t i = 0; i < 4; ++i) { if (i == 1) continue; - EXPECT_EQ(0, backends_[i]->backend_service()->request_count()); + EXPECT_EQ(0U, backends_[i]->backend_service()->request_count()); } StartBackend(0); WaitForBackend(0); @@ -1532,7 +1682,7 @@ TEST_P(FailoverTest, UpdateInitialUnavailable) { WaitForBackend(2, false); for (size_t i = 0; i < 4; ++i) { if (i == 2) continue; - EXPECT_EQ(0, backends_[i]->backend_service()->request_count()); + EXPECT_EQ(0U, backends_[i]->backend_service()->request_count()); } // The ADS service got a single request, and sent a single response. EXPECT_EQ(1U, balancers_[0]->ads_service()->request_count()); @@ -1561,7 +1711,7 @@ TEST_P(FailoverTest, UpdatePriority) { ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 1000); WaitForBackend(3, false); for (size_t i = 0; i < 3; ++i) { - EXPECT_EQ(0, backends_[i]->backend_service()->request_count()); + EXPECT_EQ(0U, backends_[i]->backend_service()->request_count()); } WaitForBackend(1); CheckRpcSendOk(kNumRpcs); @@ -2057,7 +2207,7 @@ TEST_P(FallbackTest, FallbackModeIsExitedAfterChildRready) { class BalancerUpdateTest : public XdsEnd2endTest { public: - BalancerUpdateTest() : XdsEnd2endTest(4, 3, 0) {} + BalancerUpdateTest() : XdsEnd2endTest(4, 3) {} }; // Tests that the old LB call is still used after the balancer address update as @@ -2433,36 +2583,44 @@ grpc::string TestTypeName(const ::testing::TestParamInfo& info) { return info.param.AsString(); } -// TODO(juanlishen): Load reporting disabled is currently tested only with DNS -// resolver. Once we implement CDS, test it via the xds resolver too. 
- INSTANTIATE_TEST_SUITE_P(XdsTest, BasicTest, ::testing::Values(TestType(false, true), TestType(false, false), + TestType(true, false), TestType(true, true)), &TestTypeName); INSTANTIATE_TEST_SUITE_P(XdsTest, SecureNamingTest, ::testing::Values(TestType(false, true), TestType(false, false), + TestType(true, false), + TestType(true, true)), + &TestTypeName); + +// CDS depends on XdsResolver. +INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest, + ::testing::Values(TestType(true, false), TestType(true, true)), &TestTypeName); INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest, ::testing::Values(TestType(false, true), TestType(false, false), + TestType(true, false), TestType(true, true)), &TestTypeName); INSTANTIATE_TEST_SUITE_P(XdsTest, FailoverTest, ::testing::Values(TestType(false, true), TestType(false, false), + TestType(true, false), TestType(true, true)), &TestTypeName); INSTANTIATE_TEST_SUITE_P(XdsTest, DropTest, ::testing::Values(TestType(false, true), TestType(false, false), + TestType(true, false), TestType(true, true)), &TestTypeName);