diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index cffba662576..83091dc6a70 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -366,9 +366,9 @@ bool CdsLb::GenerateDiscoveryMechanismForCluster( GPR_ASSERT(0); break; } - if (state.update->lrs_load_reporting_server_name.has_value()) { - mechanism["lrsLoadReportingServerName"] = - state.update->lrs_load_reporting_server_name.value(); + if (state.update->lrs_load_reporting_server.has_value()) { + mechanism["lrsLoadReportingServer"] = + state.update->lrs_load_reporting_server->ToJson(); } discovery_mechanisms->emplace_back(std::move(mechanism)); return true; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index 5064c05b905..02ded9134a6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -119,14 +119,13 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config { XdsClusterImplLbConfig( RefCountedPtr child_policy, std::string cluster_name, std::string eds_service_name, - absl::optional lrs_load_reporting_server_name, + absl::optional lrs_load_reporting_server, uint32_t max_concurrent_requests, RefCountedPtr drop_config) : child_policy_(std::move(child_policy)), cluster_name_(std::move(cluster_name)), eds_service_name_(std::move(eds_service_name)), - lrs_load_reporting_server_name_( - std::move(lrs_load_reporting_server_name)), + lrs_load_reporting_server_(std::move(lrs_load_reporting_server)), max_concurrent_requests_(max_concurrent_requests), drop_config_(std::move(drop_config)) {} @@ -137,8 +136,9 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config { } const std::string& cluster_name() const { return cluster_name_; } const std::string& eds_service_name() const { return eds_service_name_; } - const absl::optional& lrs_load_reporting_server_name() const { - return lrs_load_reporting_server_name_; + const absl::optional& lrs_load_reporting_server() + const { + return lrs_load_reporting_server_; }; uint32_t max_concurrent_requests() const { return max_concurrent_requests_; } RefCountedPtr drop_config() const { @@ -149,7 +149,7 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config { RefCountedPtr child_policy_; std::string cluster_name_; std::string eds_service_name_; - absl::optional lrs_load_reporting_server_name_; + absl::optional lrs_load_reporting_server_; uint32_t max_concurrent_requests_; RefCountedPtr drop_config_; }; @@ -462,10 +462,19 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) { config_ = std::move(args.config); // On initial update, create drop stats. if (is_initial_update) { - if (config_->lrs_load_reporting_server_name().has_value()) { + if (config_->lrs_load_reporting_server().has_value()) { drop_stats_ = xds_client_->AddClusterDropStats( - config_->lrs_load_reporting_server_name().value(), - config_->cluster_name(), config_->eds_service_name()); + config_->lrs_load_reporting_server().value(), config_->cluster_name(), + config_->eds_service_name()); + if (drop_stats_ == nullptr) { + gpr_log(GPR_ERROR, + "[xds_cluster_impl_lb %p] Failed to get cluster drop stats for " + "LRS server %s, cluster %s, EDS service name %s, load " + "reporting for drops will not be done.", + this, config_->lrs_load_reporting_server()->server_uri.c_str(), + config_->cluster_name().c_str(), + config_->eds_service_name().c_str()); + } } call_counter_ = g_call_counter_map->GetOrCreate( config_->cluster_name(), config_->eds_service_name()); @@ -475,8 +484,8 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) { // swapped out if that happens. GPR_ASSERT(config_->cluster_name() == old_config->cluster_name()); GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name()); - GPR_ASSERT(config_->lrs_load_reporting_server_name() == - old_config->lrs_load_reporting_server_name()); + GPR_ASSERT(config_->lrs_load_reporting_server() == + old_config->lrs_load_reporting_server()); } // Update picker if max_concurrent_requests has changed. if (is_initial_update || config_->max_concurrent_requests() != @@ -575,7 +584,7 @@ RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel( if (xds_cluster_impl_policy_->shutting_down_) return nullptr; // If load reporting is enabled, wrap the subchannel such that it // includes the locality stats object, which will be used by the EdsPicker. - if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server_name() + if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server() .has_value()) { RefCountedPtr locality_name; auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey); @@ -586,15 +595,26 @@ RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel( } RefCountedPtr locality_stats = xds_cluster_impl_policy_->xds_client_->AddClusterLocalityStats( - *xds_cluster_impl_policy_->config_ - ->lrs_load_reporting_server_name(), + xds_cluster_impl_policy_->config_->lrs_load_reporting_server() + .value(), xds_cluster_impl_policy_->config_->cluster_name(), xds_cluster_impl_policy_->config_->eds_service_name(), std::move(locality_name)); - return MakeRefCounted( - xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel( - std::move(address), args), - std::move(locality_stats)); + if (locality_stats != nullptr) { + return MakeRefCounted( + xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel( + std::move(address), args), + std::move(locality_stats)); + } + gpr_log(GPR_ERROR, + "[xds_cluster_impl_lb %p] Failed to get locality stats object for " + "LRS server %s, cluster %s, EDS service name %s; load reports will " + "not be generated (not wrapping subchannel)", + this, + xds_cluster_impl_policy_->config_->lrs_load_reporting_server() + ->server_uri.c_str(), + xds_cluster_impl_policy_->config_->cluster_name().c_str(), + xds_cluster_impl_policy_->config_->eds_service_name().c_str()); } // Load reporting not enabled, so don't wrap the subchannel. return xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel( @@ -715,14 +735,21 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory { } } // LRS load reporting server name. - absl::optional lrs_load_reporting_server_name; - it = json.object_value().find("lrsLoadReportingServerName"); + absl::optional lrs_load_reporting_server; + it = json.object_value().find("lrsLoadReportingServer"); if (it != json.object_value().end()) { - if (it->second.type() != Json::Type::STRING) { + if (it->second.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:lrsLoadReportingServerName error:type should be string")); + "field:lrsLoadReportingServer error:type should be object")); } else { - lrs_load_reporting_server_name = it->second.string_value(); + grpc_error_handle parser_error; + lrs_load_reporting_server = XdsBootstrap::XdsServer::Parse( + it->second.object_value(), &parser_error); + if (parser_error != GRPC_ERROR_NONE) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING( + absl::StrCat("errors parsing lrs_load_reporting_server"))); + error_list.push_back(parser_error); + } } } // Max concurrent requests. @@ -758,7 +785,7 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory { } return MakeRefCounted( std::move(child_policy), std::move(cluster_name), - std::move(eds_service_name), std::move(lrs_load_reporting_server_name), + std::move(eds_service_name), std::move(lrs_load_reporting_server), max_concurrent_requests, std::move(drop_config)); } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index d73a7441e36..c0d806ee233 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -34,6 +34,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client_stats.h" @@ -65,7 +66,7 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config { public: struct DiscoveryMechanism { std::string cluster_name; - absl::optional lrs_load_reporting_server_name; + absl::optional lrs_load_reporting_server; uint32_t max_concurrent_requests; enum DiscoveryMechanismType { EDS, @@ -77,8 +78,7 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config { bool operator==(const DiscoveryMechanism& other) const { return (cluster_name == other.cluster_name && - lrs_load_reporting_server_name == - other.lrs_load_reporting_server_name && + lrs_load_reporting_server == other.lrs_load_reporting_server && max_concurrent_requests == other.max_concurrent_requests && type == other.type && eds_service_name == other.eds_service_name && @@ -887,10 +887,10 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() { xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second); } if (config_->discovery_mechanisms()[discovery_index] - .lrs_load_reporting_server_name.has_value()) { - xds_cluster_impl_config["lrsLoadReportingServerName"] = + .lrs_load_reporting_server.has_value()) { + xds_cluster_impl_config["lrsLoadReportingServer"] = config_->discovery_mechanisms()[discovery_index] - .lrs_load_reporting_server_name.value(); + .lrs_load_reporting_server->ToJson(); } Json locality_picking_policy = Json::Array{Json::Object{ {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)}, @@ -1151,14 +1151,20 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { discovery_mechanism->cluster_name = it->second.string_value(); } // LRS load reporting server name. - it = json.object_value().find("lrsLoadReportingServerName"); + it = json.object_value().find("lrsLoadReportingServer"); if (it != json.object_value().end()) { - if (it->second.type() != Json::Type::STRING) { + if (it->second.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:lrsLoadReportingServerName error:type should be string")); + "field:lrsLoadReportingServer error:type should be object")); } else { - discovery_mechanism->lrs_load_reporting_server_name.emplace( - it->second.string_value()); + grpc_error_handle parse_error; + discovery_mechanism->lrs_load_reporting_server.emplace( + XdsBootstrap::XdsServer::Parse(it->second, &parse_error)); + if (parse_error != GRPC_ERROR_NONE) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING( + absl::StrCat("errors parsing lrs_load_reporting_server"))); + error_list.push_back(parse_error); + } } } // Max concurrent requests. diff --git a/src/core/ext/xds/upb_utils.h b/src/core/ext/xds/upb_utils.h index c6a90fbc64d..ea879b102f6 100644 --- a/src/core/ext/xds/upb_utils.h +++ b/src/core/ext/xds/upb_utils.h @@ -27,6 +27,7 @@ #include "upb/upb.hpp" #include "src/core/ext/xds/certificate_provider_store.h" +#include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/lib/debug/trace.h" namespace grpc_core { @@ -38,6 +39,7 @@ class XdsClient; // passing through XdsApi code, maybe via the AdsResponseParser. struct XdsEncodingContext { XdsClient* client; // Used only for logging. Unsafe for dereferencing. + const XdsBootstrap::XdsServer& server; TraceFlag* tracer; upb_symtab* symtab; upb_arena* arena; diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index d9cc5b8828a..6ea9c6cc139 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -277,6 +277,7 @@ grpc_slice XdsApi::CreateAdsRequest( bool populate_node) { upb::Arena arena; const XdsEncodingContext context = {client_, + server, tracer_, symtab_->ptr(), arena.ptr(), @@ -356,6 +357,7 @@ absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server, AdsResponseParserInterface* parser) { upb::Arena arena; const XdsEncodingContext context = {client_, + server, tracer_, symtab_->ptr(), arena.ptr(), @@ -431,6 +433,7 @@ grpc_slice XdsApi::CreateLrsInitialRequest( const XdsBootstrap::XdsServer& server) { upb::Arena arena; const XdsEncodingContext context = {client_, + server, tracer_, symtab_->ptr(), arena.ptr(), @@ -505,9 +508,16 @@ void LocalityStatsPopulate( grpc_slice XdsApi::CreateLrsRequest( ClusterLoadReportMap cluster_load_report_map) { upb::Arena arena; - const XdsEncodingContext context = { - client_, tracer_, symtab_->ptr(), - arena.ptr(), false, certificate_provider_definition_map_}; + // The xDS server info is not actually needed here, so we seed it with an + // empty value. + XdsBootstrap::XdsServer empty_server; + const XdsEncodingContext context = {client_, + empty_server, + tracer_, + symtab_->ptr(), + arena.ptr(), + false, + certificate_provider_definition_map_}; // Create a request. envoy_service_load_stats_v3_LoadStatsRequest* request = envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr()); @@ -629,9 +639,16 @@ std::string XdsApi::AssembleClientConfig( // Fill-in the node information auto* node = envoy_service_status_v3_ClientConfig_mutable_node(client_config, arena.ptr()); - const XdsEncodingContext context = { - client_, tracer_, symtab_->ptr(), - arena.ptr(), true, certificate_provider_definition_map_}; + // The xDS server info is not actually needed here, so we seed it with an + // empty value. + XdsBootstrap::XdsServer empty_server; + const XdsEncodingContext context = {client_, + empty_server, + tracer_, + symtab_->ptr(), + arena.ptr(), + true, + certificate_provider_definition_map_}; PopulateNode(context, node_, build_version_, user_agent_name_, user_agent_version_, node); // Dump each resource. diff --git a/src/core/ext/xds/xds_bootstrap.cc b/src/core/ext/xds/xds_bootstrap.cc index 81d86fb5fff..e62ce7fb8d4 100644 --- a/src/core/ext/xds/xds_bootstrap.cc +++ b/src/core/ext/xds/xds_bootstrap.cc @@ -137,6 +137,25 @@ XdsBootstrap::XdsServer XdsBootstrap::XdsServer::Parse( return server; } +Json::Object XdsBootstrap::XdsServer::ToJson() const { + Json::Object channel_creds_json{{"type", channel_creds_type}}; + if (channel_creds_config.type() != Json::Type::JSON_NULL) { + channel_creds_json["config"] = channel_creds_config; + } + Json::Object json{ + {"server_uri", server_uri}, + {"channel_creds", Json::Array{std::move(channel_creds_json)}}, + }; + if (!server_features.empty()) { + Json::Array server_features_json; + for (auto& feature : server_features) { + server_features_json.emplace_back(feature); + } + json["server_features"] = std::move(server_features_json); + } + return json; +} + bool XdsBootstrap::XdsServer::ShouldUseV3() const { return server_features.find("xds_v3") != server_features.end(); } @@ -244,6 +263,17 @@ const XdsBootstrap::Authority* XdsBootstrap::LookupAuthority( return nullptr; } +bool XdsBootstrap::XdsServerExists( + const XdsBootstrap::XdsServer& server) const { + if (server == servers_[0]) return true; + for (auto& authority : authorities_) { + for (auto& xds_server : authority.second.xds_servers) { + if (server == xds_server) return true; + } + } + return false; +} + grpc_error_handle XdsBootstrap::ParseXdsServerList( Json* json, absl::InlinedVector* servers) { std::vector error_list; diff --git a/src/core/ext/xds/xds_bootstrap.h b/src/core/ext/xds/xds_bootstrap.h index 10511e5e9ff..181b3320be4 100644 --- a/src/core/ext/xds/xds_bootstrap.h +++ b/src/core/ext/xds/xds_bootstrap.h @@ -58,6 +58,13 @@ class XdsBootstrap { static XdsServer Parse(const Json& json, grpc_error_handle* error); + bool operator==(const XdsServer& other) const { + return (server_uri == other.server_uri && + channel_creds_type == other.channel_creds_type && + channel_creds_config == other.channel_creds_config && + server_features == other.server_features); + } + bool operator<(const XdsServer& other) const { if (server_uri < other.server_uri) return true; if (channel_creds_type < other.channel_creds_type) return true; @@ -68,6 +75,8 @@ class XdsBootstrap { return false; } + Json::Object ToJson() const; + bool ShouldUseV3() const; }; @@ -105,6 +114,8 @@ class XdsBootstrap { const { return certificate_providers_; } + // A util method to check that an xds server exists in this bootstrap file. + bool XdsServerExists(const XdsServer& server) const; private: grpc_error_handle ParseXdsServerList( diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 5660dfbb5ad..64812f63b48 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -581,7 +581,10 @@ void XdsClient::ChannelState::MaybeStartLrsCall() { WeakRef(DEBUG_LOCATION, "ChannelState+lrs"))); } -void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); } +void XdsClient::ChannelState::StopLrsCallLocked() { + xds_client_->xds_load_report_server_map_.erase(server_); + lrs_calld_.reset(); +} void XdsClient::ChannelState::StartConnectivityWatchLocked() { ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_); @@ -1385,15 +1388,19 @@ bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) { bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { // Construct snapshot from all reported stats. XdsApi::ClusterLoadReportMap snapshot = - xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_, + xds_client()->BuildLoadReportSnapshotLocked(parent_->chand()->server_, + parent_->send_all_clusters_, parent_->cluster_names_); // Skip client load report if the counters were all zero in the last // report and they are still zero in this one. const bool old_val = last_report_counters_were_zero_; last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot); if (old_val && last_report_counters_were_zero_) { - if (xds_client()->load_report_map_.empty()) { - parent_->chand()->StopLrsCall(); + auto it = xds_client()->xds_load_report_server_map_.find( + parent_->chand()->server_); + if (it == xds_client()->xds_load_report_server_map_.end() || + it->second.load_report_map.empty()) { + it->second.channel_state->StopLrsCallLocked(); return true; } ScheduleNextReportLocked(); @@ -1439,8 +1446,11 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked( grpc_byte_buffer_destroy(parent_->send_message_payload_); parent_->send_message_payload_ = nullptr; // If there are no more registered stats to report, cancel the call. - if (xds_client()->load_report_map_.empty()) { - parent_->chand()->StopLrsCall(); + auto it = + xds_client()->xds_load_report_server_map_.find(parent_->chand()->server_); + if (it == xds_client()->xds_load_report_server_map_.end() || + it->second.load_report_map.empty()) { + it->second.channel_state->StopLrsCallLocked(); GRPC_ERROR_UNREF(error); return true; } @@ -1755,7 +1765,6 @@ void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked( } // Ignore status from a stale call. if (IsCurrentCallOnChannel()) { - GPR_ASSERT(!xds_client()->shutting_down_); // Try to restart the call. parent_->OnCallFinishedLocked(); } @@ -1899,8 +1908,6 @@ void XdsClient::WatchResource(const XdsResourceType* type, { MutexLock lock(&mu_); MaybeRegisterResourceTypeLocked(type); - // TODO(donnadionne): If we get a request for an authority that is not - // configured in the bootstrap file, reject it. AuthorityState& authority_state = authority_state_map_[resource_name->authority]; ResourceState& resource_state = @@ -2036,20 +2043,26 @@ std::string XdsClient::ConstructFullXdsResourceName( } RefCountedPtr XdsClient::AddClusterDropStats( - absl::string_view lrs_server, absl::string_view cluster_name, + const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name) { - // TODO(roth): When we add support for direct federation, use the - // server name specified in lrs_server. + if (!bootstrap_->XdsServerExists(xds_server)) return nullptr; auto key = std::make_pair(std::string(cluster_name), std::string(eds_service_name)); MutexLock lock(&mu_); - // We jump through some hoops here to make sure that the absl::string_views - // stored in the XdsClusterDropStats object point to the strings + // We jump through some hoops here to make sure that the const + // XdsBootstrap::XdsServer& and absl::string_views + // stored in the XdsClusterDropStats object point to the + // XdsBootstrap::XdsServer and strings // in the load_report_map_ key, so that they have the same lifetime. - auto it = load_report_map_ - .emplace(std::make_pair(std::move(key), LoadReportState())) - .first; - LoadReportState& load_report_state = it->second; + auto server_it = + xds_load_report_server_map_.emplace(xds_server, LoadReportServer()).first; + if (server_it->second.channel_state == nullptr) { + server_it->second.channel_state = GetOrCreateChannelStateLocked(xds_server); + } + auto load_report_it = server_it->second.load_report_map + .emplace(std::move(key), LoadReportState()) + .first; + LoadReportState& load_report_state = load_report_it->second; RefCountedPtr cluster_drop_stats; if (load_report_state.drop_stats != nullptr) { cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero(); @@ -2060,32 +2073,26 @@ RefCountedPtr XdsClient::AddClusterDropStats( load_report_state.drop_stats->GetSnapshotAndReset(); } cluster_drop_stats = MakeRefCounted( - Ref(DEBUG_LOCATION, "DropStats"), lrs_server, - it->first.first /*cluster_name*/, - it->first.second /*eds_service_name*/); + Ref(DEBUG_LOCATION, "DropStats"), server_it->first, + load_report_it->first.first /*cluster_name*/, + load_report_it->first.second /*eds_service_name*/); load_report_state.drop_stats = cluster_drop_stats.get(); } - auto resource_name = - ParseXdsResourceName(cluster_name, XdsClusterResourceType::Get()); - GPR_ASSERT(resource_name.ok()); - auto a = authority_state_map_.find(resource_name->authority); - if (a != authority_state_map_.end()) { - a->second.channel_state->MaybeStartLrsCall(); - } + server_it->second.channel_state->MaybeStartLrsCall(); return cluster_drop_stats; } void XdsClient::RemoveClusterDropStats( - absl::string_view /*lrs_server*/, absl::string_view cluster_name, + const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, XdsClusterDropStats* cluster_drop_stats) { MutexLock lock(&mu_); - // TODO(roth): When we add support for direct federation, use the - // server name specified in lrs_server. - auto it = load_report_map_.find( + auto server_it = xds_load_report_server_map_.find(xds_server); + if (server_it == xds_load_report_server_map_.end()) return; + auto load_report_it = server_it->second.load_report_map.find( std::make_pair(std::string(cluster_name), std::string(eds_service_name))); - if (it == load_report_map_.end()) return; - LoadReportState& load_report_state = it->second; + if (load_report_it == server_it->second.load_report_map.end()) return; + LoadReportState& load_report_state = load_report_it->second; if (load_report_state.drop_stats == cluster_drop_stats) { // Record final snapshot in deleted_drop_stats, which will be // added to the next load report. @@ -2096,21 +2103,27 @@ void XdsClient::RemoveClusterDropStats( } RefCountedPtr XdsClient::AddClusterLocalityStats( - absl::string_view lrs_server, absl::string_view cluster_name, + const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, RefCountedPtr locality) { - // TODO(roth): When we add support for direct federation, use the - // server name specified in lrs_server. + if (!bootstrap_->XdsServerExists(xds_server)) return nullptr; auto key = std::make_pair(std::string(cluster_name), std::string(eds_service_name)); MutexLock lock(&mu_); - // We jump through some hoops here to make sure that the absl::string_views - // stored in the XdsClusterLocalityStats object point to the strings + // We jump through some hoops here to make sure that the const + // XdsBootstrap::XdsServer& and absl::string_views + // stored in the XdsClusterDropStats object point to the + // XdsBootstrap::XdsServer and strings // in the load_report_map_ key, so that they have the same lifetime. - auto it = load_report_map_ - .emplace(std::make_pair(std::move(key), LoadReportState())) - .first; - LoadReportState& load_report_state = it->second; + auto server_it = + xds_load_report_server_map_.emplace(xds_server, LoadReportServer()).first; + if (server_it->second.channel_state == nullptr) { + server_it->second.channel_state = GetOrCreateChannelStateLocked(xds_server); + } + auto load_report_it = server_it->second.load_report_map + .emplace(std::move(key), LoadReportState()) + .first; + LoadReportState& load_report_state = load_report_it->second; LoadReportState::LocalityState& locality_state = load_report_state.locality_stats[locality]; RefCountedPtr cluster_locality_stats; @@ -2123,33 +2136,27 @@ RefCountedPtr XdsClient::AddClusterLocalityStats( locality_state.locality_stats->GetSnapshotAndReset(); } cluster_locality_stats = MakeRefCounted( - Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server, - it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/, - std::move(locality)); + Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first, + load_report_it->first.first /*cluster_name*/, + load_report_it->first.second /*eds_service_name*/, std::move(locality)); locality_state.locality_stats = cluster_locality_stats.get(); } - auto resource_name = - ParseXdsResourceName(cluster_name, XdsClusterResourceType::Get()); - GPR_ASSERT(resource_name.ok()); - auto a = authority_state_map_.find(resource_name->authority); - if (a != authority_state_map_.end()) { - a->second.channel_state->MaybeStartLrsCall(); - } + server_it->second.channel_state->MaybeStartLrsCall(); return cluster_locality_stats; } void XdsClient::RemoveClusterLocalityStats( - absl::string_view /*lrs_server*/, absl::string_view cluster_name, + const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, const RefCountedPtr& locality, XdsClusterLocalityStats* cluster_locality_stats) { MutexLock lock(&mu_); - // TODO(roth): When we add support for direct federation, use the - // server name specified in lrs_server. - auto it = load_report_map_.find( + auto server_it = xds_load_report_server_map_.find(xds_server); + if (server_it == xds_load_report_server_map_.end()) return; + auto load_report_it = server_it->second.load_report_map.find( std::make_pair(std::string(cluster_name), std::string(eds_service_name))); - if (it == load_report_map_.end()) return; - LoadReportState& load_report_state = it->second; + if (load_report_it == server_it->second.load_report_map.end()) return; + LoadReportState& load_report_state = load_report_it->second; auto locality_it = load_report_state.locality_stats.find(locality); if (locality_it == load_report_state.locality_stats.end()) return; LoadReportState::LocalityState& locality_state = locality_it->second; @@ -2193,13 +2200,17 @@ void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) { } XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( - bool send_all_clusters, const std::set& clusters) { + const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters, + const std::set& clusters) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] start building load report", this); } XdsApi::ClusterLoadReportMap snapshot_map; - for (auto load_report_it = load_report_map_.begin(); - load_report_it != load_report_map_.end();) { + auto server_it = xds_load_report_server_map_.find(xds_server); + if (server_it == xds_load_report_server_map_.end()) return snapshot_map; + auto& load_report_map = server_it->second.load_report_map; + for (auto load_report_it = load_report_map.begin(); + load_report_it != load_report_map.end();) { // Cluster key is cluster and EDS service name. const auto& cluster_key = load_report_it->first; LoadReportState& load_report = load_report_it->second; @@ -2265,7 +2276,7 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( // deleted stats objects, remove the entry. if (load_report.locality_stats.empty() && load_report.drop_stats == nullptr) { - load_report_it = load_report_map_.erase(load_report_it); + load_report_it = load_report_map.erase(load_report_it); } else { ++load_report_it; } diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 6134b115465..d0e69250329 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -114,9 +114,9 @@ class XdsClient : public DualRefCounted { // Adds and removes drop stats for cluster_name and eds_service_name. RefCountedPtr AddClusterDropStats( - absl::string_view lrs_server, absl::string_view cluster_name, + const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name); - void RemoveClusterDropStats(absl::string_view /*lrs_server*/, + void RemoveClusterDropStats(const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, XdsClusterDropStats* cluster_drop_stats); @@ -124,11 +124,11 @@ class XdsClient : public DualRefCounted { // Adds and removes locality stats for cluster_name and eds_service_name // for the specified locality. RefCountedPtr AddClusterLocalityStats( - absl::string_view lrs_server, absl::string_view cluster_name, + const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, RefCountedPtr locality); void RemoveClusterLocalityStats( - absl::string_view /*lrs_server*/, absl::string_view cluster_name, + const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name, absl::string_view eds_service_name, const RefCountedPtr& locality, XdsClusterLocalityStats* cluster_locality_stats); @@ -189,7 +189,7 @@ class XdsClient : public DualRefCounted { LrsCallState* lrs_calld() const; void MaybeStartLrsCall(); - void StopLrsCall(); + void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool HasAdsCall() const; bool HasActiveAdsCall() const; @@ -255,6 +255,16 @@ class XdsClient : public DualRefCounted { grpc_millis last_report_time = ExecCtx::Get()->Now(); }; + // Load report data. + using LoadReportMap = std::map< + std::pair, + LoadReportState>; + + struct LoadReportServer { + RefCountedPtr channel_state; + LoadReportMap load_report_map; + }; + class Notifier; // Sends an error notification to all watchers. @@ -275,8 +285,8 @@ class XdsClient : public DualRefCounted { const XdsResourceKey& key); XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( - bool send_all_clusters, const std::set& clusters) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters, + const std::set& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); RefCountedPtr GetOrCreateChannelStateLocked( const XdsBootstrap::XdsServer& server) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); @@ -305,11 +315,8 @@ class XdsClient : public DualRefCounted { std::map authority_state_map_ ABSL_GUARDED_BY(mu_); - // Load report data. - std::map< - std::pair, - LoadReportState> - load_report_map_ ABSL_GUARDED_BY(mu_); + std::map + xds_load_report_server_map_ ABSL_GUARDED_BY(mu_); // Stores started watchers whose resource name was not parsed successfully, // waiting to be cancelled or reset in Orphan(). diff --git a/src/core/ext/xds/xds_client_stats.cc b/src/core/ext/xds/xds_client_stats.cc index 437541dc1a1..3cd26527bca 100644 --- a/src/core/ext/xds/xds_client_stats.cc +++ b/src/core/ext/xds/xds_client_stats.cc @@ -41,20 +41,20 @@ uint64_t GetAndResetCounter(std::atomic* from) { // XdsClusterDropStats // -XdsClusterDropStats::XdsClusterDropStats(RefCountedPtr xds_client, - absl::string_view lrs_server_name, - absl::string_view cluster_name, - absl::string_view eds_service_name) +XdsClusterDropStats::XdsClusterDropStats( + RefCountedPtr xds_client, + const XdsBootstrap::XdsServer& lrs_server, absl::string_view cluster_name, + absl::string_view eds_service_name) : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClusterDropStats" : nullptr), xds_client_(std::move(xds_client)), - lrs_server_name_(lrs_server_name), + lrs_server_(lrs_server), cluster_name_(cluster_name), eds_service_name_(eds_service_name) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] created drop stats %p for {%s, %s, %s}", - xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + xds_client_.get(), this, lrs_server_.server_uri.c_str(), std::string(cluster_name_).c_str(), std::string(eds_service_name_).c_str()); } @@ -64,11 +64,11 @@ XdsClusterDropStats::~XdsClusterDropStats() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] destroying drop stats %p for {%s, %s, %s}", - xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + xds_client_.get(), this, lrs_server_.server_uri.c_str(), std::string(cluster_name_).c_str(), std::string(eds_service_name_).c_str()); } - xds_client_->RemoveClusterDropStats(lrs_server_name_, cluster_name_, + xds_client_->RemoveClusterDropStats(lrs_server_, cluster_name_, eds_service_name_, this); xds_client_.reset(DEBUG_LOCATION, "DropStats"); } @@ -95,21 +95,21 @@ void XdsClusterDropStats::AddCallDropped(const std::string& category) { // XdsClusterLocalityStats::XdsClusterLocalityStats( - RefCountedPtr xds_client, absl::string_view lrs_server_name, - absl::string_view cluster_name, absl::string_view eds_service_name, - RefCountedPtr name) + RefCountedPtr xds_client, + const XdsBootstrap::XdsServer& lrs_server, absl::string_view cluster_name, + absl::string_view eds_service_name, RefCountedPtr name) : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClusterLocalityStats" : nullptr), xds_client_(std::move(xds_client)), - lrs_server_name_(lrs_server_name), + lrs_server_(lrs_server), cluster_name_(cluster_name), eds_service_name_(eds_service_name), name_(std::move(name)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] created locality stats %p for {%s, %s, %s, %s}", - xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + xds_client_.get(), this, lrs_server_.server_uri.c_str(), std::string(cluster_name_).c_str(), std::string(eds_service_name_).c_str(), name_->AsHumanReadableString().c_str()); @@ -120,12 +120,12 @@ XdsClusterLocalityStats::~XdsClusterLocalityStats() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] destroying locality stats %p for {%s, %s, %s, %s}", - xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + xds_client_.get(), this, lrs_server_.server_uri.c_str(), std::string(cluster_name_).c_str(), std::string(eds_service_name_).c_str(), name_->AsHumanReadableString().c_str()); } - xds_client_->RemoveClusterLocalityStats(lrs_server_name_, cluster_name_, + xds_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_, eds_service_name_, name_, this); xds_client_.reset(DEBUG_LOCATION, "LocalityStats"); } diff --git a/src/core/ext/xds/xds_client_stats.h b/src/core/ext/xds/xds_client_stats.h index 7c123910da1..9ed5cf4a8c6 100644 --- a/src/core/ext/xds/xds_client_stats.h +++ b/src/core/ext/xds/xds_client_stats.h @@ -29,6 +29,7 @@ #include "absl/strings/str_format.h" #include "absl/strings/string_view.h" +#include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -128,7 +129,7 @@ class XdsClusterDropStats : public RefCounted { }; XdsClusterDropStats(RefCountedPtr xds_client, - absl::string_view lrs_server_name, + const XdsBootstrap::XdsServer& lrs_server, absl::string_view cluster_name, absl::string_view eds_service_name); ~XdsClusterDropStats() override; @@ -141,7 +142,7 @@ class XdsClusterDropStats : public RefCounted { private: RefCountedPtr xds_client_; - absl::string_view lrs_server_name_; + const XdsBootstrap::XdsServer& lrs_server_; absl::string_view cluster_name_; absl::string_view eds_service_name_; std::atomic uncategorized_drops_{0}; @@ -202,7 +203,7 @@ class XdsClusterLocalityStats : public RefCounted { }; XdsClusterLocalityStats(RefCountedPtr xds_client, - absl::string_view lrs_server_name, + const XdsBootstrap::XdsServer& lrs_server_, absl::string_view cluster_name, absl::string_view eds_service_name, RefCountedPtr name); @@ -216,7 +217,7 @@ class XdsClusterLocalityStats : public RefCounted { private: RefCountedPtr xds_client_; - absl::string_view lrs_server_name_; + const XdsBootstrap::XdsServer& lrs_server_; absl::string_view cluster_name_; absl::string_view eds_service_name_; RefCountedPtr name_; diff --git a/src/core/ext/xds/xds_cluster.cc b/src/core/ext/xds/xds_cluster.cc index b2718aef139..7d12275c54f 100644 --- a/src/core/ext/xds/xds_cluster.cc +++ b/src/core/ext/xds/xds_cluster.cc @@ -70,9 +70,9 @@ std::string XdsClusterResource::ToString() const { contents.push_back(absl::StrFormat("common_tls_context=%s", common_tls_context.ToString())); } - if (lrs_load_reporting_server_name.has_value()) { + if (lrs_load_reporting_server.has_value()) { contents.push_back(absl::StrFormat("lrs_load_reporting_server_name=%s", - lrs_load_reporting_server_name.value())); + lrs_load_reporting_server->server_uri)); } contents.push_back(absl::StrCat("lb_policy=", lb_policy)); if (lb_policy == "RING_HASH") { @@ -369,7 +369,7 @@ grpc_error_handle CdsResourceParse( errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( ": LRS ConfigSource is not self.")); } - cds_update->lrs_load_reporting_server_name.emplace(""); + cds_update->lrs_load_reporting_server.emplace(context.server); } // The Cluster resource encodes the circuit breaking parameters in a list of // Thresholds messages, where each message specifies the parameters for a diff --git a/src/core/ext/xds/xds_cluster.h b/src/core/ext/xds/xds_cluster.h index a78475324bb..35cb56ae484 100644 --- a/src/core/ext/xds/xds_cluster.h +++ b/src/core/ext/xds/xds_cluster.h @@ -52,9 +52,7 @@ struct XdsClusterResource { // The LRS server to use for load reporting. // If not set, load reporting will be disabled. - // If set to the empty string, will use the same server we obtained the CDS - // data from. - absl::optional lrs_load_reporting_server_name; + absl::optional lrs_load_reporting_server; // The LB policy to use (e.g., "ROUND_ROBIN" or "RING_HASH"). std::string lb_policy; @@ -71,8 +69,7 @@ struct XdsClusterResource { dns_hostname == other.dns_hostname && prioritized_cluster_names == other.prioritized_cluster_names && common_tls_context == other.common_tls_context && - lrs_load_reporting_server_name == - other.lrs_load_reporting_server_name && + lrs_load_reporting_server == other.lrs_load_reporting_server && lb_policy == other.lb_policy && min_ring_size == other.min_ring_size && max_ring_size == other.max_ring_size && diff --git a/test/core/xds/xds_bootstrap_test.cc b/test/core/xds/xds_bootstrap_test.cc index 67301ef7176..f18efd560c3 100644 --- a/test/core/xds/xds_bootstrap_test.cc +++ b/test/core/xds/xds_bootstrap_test.cc @@ -738,6 +738,32 @@ TEST(XdsBootstrapTest, CertificateProvidersFakePluginEmptyConfig) { 0); } +TEST(XdsBootstrapTest, XdsServerToJsonAndParse) { + gpr_setenv("GRPC_EXPERIMENTAL_XDS_FEDERATION", "true"); + const char* json_str = + " {" + " \"server_uri\": \"fake:///lb\"," + " \"channel_creds\": [" + " {" + " \"type\": \"fake\"," + " \"ignore\": 0" + " }" + " ]," + " \"ignore\": 0" + " }"; + grpc_error_handle error = GRPC_ERROR_NONE; + Json json = Json::Parse(json_str, &error); + ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error); + XdsBootstrap::XdsServer xds_server = + XdsBootstrap::XdsServer::Parse(json, &error); + ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error); + Json::Object output = xds_server.ToJson(); + XdsBootstrap::XdsServer output_xds_server = + XdsBootstrap::XdsServer::Parse(output, &error); + ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error); + gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"); +} + } // namespace } // namespace testing } // namespace grpc_core diff --git a/test/cpp/end2end/xds/xds_end2end_test.cc b/test/cpp/end2end/xds/xds_end2end_test.cc index fc323ec6d78..8e6b428d164 100644 --- a/test/cpp/end2end/xds/xds_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_end2end_test.cc @@ -2580,7 +2580,7 @@ TEST_P(GlobalXdsClientTest, InvalidListenerStillExistsIfPreviouslyCached) { class XdsFederationTest : public XdsEnd2endTest { protected: - XdsFederationTest() : XdsEnd2endTest(2, 100, 0, true) { + XdsFederationTest() : XdsEnd2endTest(2, 3, 0, true) { authority_balancer_ = CreateAndStartBalancer(); } @@ -2876,6 +2876,121 @@ TEST_P(XdsFederationTest, FederationServer) { gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"); } +using XdsFederationLoadReportingTest = XdsFederationTest; + +// Channel is created with URI "xds://xds.example.com/server.example.com". +// Bootstrap entry for that authority specifies a client listener name template. +// Sending traffic to both default balancer and authority balancer and checking +// load reporting with each one. +TEST_P(XdsFederationLoadReportingTest, FederationMultipleLoadReportingTest) { + gpr_setenv("GRPC_EXPERIMENTAL_XDS_FEDERATION", "true"); + const char* kAuthority = "xds.example.com"; + const char* kNewServerName = "whee%/server.example.com"; + const char* kNewListenerTemplate = + "xdstp://xds.example.com/envoy.config.listener.v3.Listener/" + "client/%s?psm_project_id=1234"; + const char* kNewListenerName = + "xdstp://xds.example.com/envoy.config.listener.v3.Listener/" + "client/whee%25/server.example.com?psm_project_id=1234"; + const char* kNewRouteConfigName = + "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/" + "new_route_config_name"; + const char* kNewEdsServiceName = + "xdstp://xds.example.com/envoy.config.endpoint.v3.ClusterLoadAssignment/" + "edsservice_name"; + const char* kNewClusterName = + "xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/" + "cluster_name"; + const size_t kNumRpcsToDefaultBalancer = 5; + const size_t kNumRpcsToAuthorityBalancer = 10; + BootstrapBuilder builder = BootstrapBuilder(); + builder.AddAuthority(kAuthority, + absl::StrCat("localhost:", authority_balancer_->port()), + kNewListenerTemplate); + CreateClientsAndServers(builder); + StartAllBackends(); + // Eds for 2 balancers to ensure RPCs sent using current stub go to backend 0 + // and RPCs sent using the new stub go to backend 1. + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}}); + authority_balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args, kNewEdsServiceName)); + authority_balancer_->lrs_service()->set_cluster_names({kNewClusterName}); + // New cluster + Cluster new_cluster = default_cluster_; + new_cluster.set_name(kNewClusterName); + new_cluster.mutable_lrs_server()->mutable_self(); + new_cluster.mutable_eds_cluster_config()->set_service_name( + kNewEdsServiceName); + authority_balancer_->ads_service()->SetCdsResource(new_cluster); + // New Route + RouteConfiguration new_route_config = default_route_config_; + new_route_config.set_name(kNewRouteConfigName); + new_route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->set_cluster(kNewClusterName); + // New Listener + Listener listener = default_listener_; + listener.set_name(kNewListenerName); + SetListenerAndRouteConfiguration(authority_balancer_.get(), listener, + new_route_config); + // Ensure update has reached and send 10 RPCs to the current stub. + CheckRpcSendOk(kNumRpcsToDefaultBalancer); + // Create second channel to new target uri and send 1 RPC . + auto channel2 = + CreateChannel(/*failover_timeout=*/0, kNewServerName, kAuthority); + channel2->GetState(/*try_to_connect=*/true); + ASSERT_TRUE( + channel2->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100))); + auto stub2 = grpc::testing::EchoTestService::NewStub(channel2); + for (size_t i = 0; i < kNumRpcsToAuthorityBalancer; ++i) { + ClientContext context; + EchoRequest request; + request.set_message(kRequestMessage); + EchoResponse response; + grpc::Status status = stub2->Echo(&context, request, &response); + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + } + // Each backend should have received the expected number of RPCs, + // and the load report also reflect the correct numbers. + EXPECT_EQ(kNumRpcsToAuthorityBalancer, + backends_[1]->backend_service()->request_count()); + EXPECT_EQ(kNumRpcsToDefaultBalancer, + backends_[0]->backend_service()->request_count()); + // Load report for authority LRS. + std::vector authority_load_report = + authority_balancer_->lrs_service()->WaitForLoadReport(); + ASSERT_EQ(authority_load_report.size(), 1UL); + ClientStats& authority_client_stats = authority_load_report.front(); + EXPECT_EQ(kNumRpcsToAuthorityBalancer, + authority_client_stats.total_successful_requests()); + EXPECT_EQ(0U, authority_client_stats.total_requests_in_progress()); + EXPECT_EQ(kNumRpcsToAuthorityBalancer, + authority_client_stats.total_issued_requests()); + EXPECT_EQ(0U, authority_client_stats.total_error_requests()); + EXPECT_EQ(0U, authority_client_stats.total_dropped_requests()); + EXPECT_EQ(1U, authority_balancer_->lrs_service()->request_count()); + EXPECT_EQ(1U, authority_balancer_->lrs_service()->response_count()); + // Load report for default LRS. + std::vector default_load_report = + balancer_->lrs_service()->WaitForLoadReport(); + ASSERT_EQ(default_load_report.size(), 1UL); + ClientStats& default_client_stats = default_load_report.front(); + EXPECT_EQ(kNumRpcsToDefaultBalancer, + default_client_stats.total_successful_requests()); + EXPECT_EQ(0U, default_client_stats.total_requests_in_progress()); + EXPECT_EQ(kNumRpcsToDefaultBalancer, + default_client_stats.total_issued_requests()); + EXPECT_EQ(0U, default_client_stats.total_error_requests()); + EXPECT_EQ(0U, default_client_stats.total_dropped_requests()); + EXPECT_EQ(1U, balancer_->lrs_service()->request_count()); + EXPECT_EQ(1U, balancer_->lrs_service()->response_count()); + gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"); +} + class SecureNamingTest : public XdsEnd2endTest { public: SecureNamingTest() @@ -13575,14 +13690,22 @@ INSTANTIATE_TEST_SUITE_P( XdsTest, XdsFederationTest, ::testing::Values( TestType().set_bootstrap_source(TestType::kBootstrapFromEnvVar), - TestType() - .set_bootstrap_source(TestType::kBootstrapFromEnvVar) - .set_enable_load_reporting(), TestType() .set_bootstrap_source(TestType::kBootstrapFromEnvVar) .set_enable_rds_testing()), &TestTypeName); +INSTANTIATE_TEST_SUITE_P( + XdsTest, XdsFederationLoadReportingTest, + ::testing::Values(TestType() + .set_bootstrap_source(TestType::kBootstrapFromEnvVar) + .set_enable_load_reporting(), + TestType() + .set_bootstrap_source(TestType::kBootstrapFromEnvVar) + .set_enable_load_reporting() + .set_enable_rds_testing()), + &TestTypeName); + INSTANTIATE_TEST_SUITE_P( XdsTest, LocalityMapTest, ::testing::Values(TestType(), TestType().set_enable_load_reporting()),