LRS changes for federation (#28504)

* Passing xds server object instead of just a string name

* Adding xds server to policy

* Refactor ToJson

* Using XdsServer for load reporting

* code review comments

* fixing code review comments

* Taking care of lifetime of the XdsServer key

* code review comments

* Fixing channel_state storage and re-run tests (1 assert hit)

* Checking for server in the bootstrap file

* Adding LRS test

* adding a bootstrap file ToJson and parse test

* fixing code review comments

* fixing code review comments.

* fixing test

* break out the federation lrs test

* Fixed last bit of code review comments

* fixing error message to be more precise
pull/28565/head
donnadionne 3 years ago committed by GitHub
parent 01011ab259
commit 55db347396
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  2. 67
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  3. 28
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  4. 2
      src/core/ext/xds/upb_utils.h
  5. 29
      src/core/ext/xds/xds_api.cc
  6. 30
      src/core/ext/xds/xds_bootstrap.cc
  7. 11
      src/core/ext/xds/xds_bootstrap.h
  8. 133
      src/core/ext/xds/xds_client.cc
  9. 31
      src/core/ext/xds/xds_client.h
  10. 28
      src/core/ext/xds/xds_client_stats.cc
  11. 9
      src/core/ext/xds/xds_client_stats.h
  12. 6
      src/core/ext/xds/xds_cluster.cc
  13. 7
      src/core/ext/xds/xds_cluster.h
  14. 26
      test/core/xds/xds_bootstrap_test.cc
  15. 125
      test/cpp/end2end/xds/xds_end2end_test.cc

@ -366,9 +366,9 @@ bool CdsLb::GenerateDiscoveryMechanismForCluster(
GPR_ASSERT(0); GPR_ASSERT(0);
break; break;
} }
if (state.update->lrs_load_reporting_server_name.has_value()) { if (state.update->lrs_load_reporting_server.has_value()) {
mechanism["lrsLoadReportingServerName"] = mechanism["lrsLoadReportingServer"] =
state.update->lrs_load_reporting_server_name.value(); state.update->lrs_load_reporting_server->ToJson();
} }
discovery_mechanisms->emplace_back(std::move(mechanism)); discovery_mechanisms->emplace_back(std::move(mechanism));
return true; return true;

@ -119,14 +119,13 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
XdsClusterImplLbConfig( XdsClusterImplLbConfig(
RefCountedPtr<LoadBalancingPolicy::Config> child_policy, RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
std::string cluster_name, std::string eds_service_name, std::string cluster_name, std::string eds_service_name,
absl::optional<std::string> lrs_load_reporting_server_name, absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server,
uint32_t max_concurrent_requests, uint32_t max_concurrent_requests,
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config) RefCountedPtr<XdsEndpointResource::DropConfig> drop_config)
: child_policy_(std::move(child_policy)), : child_policy_(std::move(child_policy)),
cluster_name_(std::move(cluster_name)), cluster_name_(std::move(cluster_name)),
eds_service_name_(std::move(eds_service_name)), eds_service_name_(std::move(eds_service_name)),
lrs_load_reporting_server_name_( lrs_load_reporting_server_(std::move(lrs_load_reporting_server)),
std::move(lrs_load_reporting_server_name)),
max_concurrent_requests_(max_concurrent_requests), max_concurrent_requests_(max_concurrent_requests),
drop_config_(std::move(drop_config)) {} drop_config_(std::move(drop_config)) {}
@ -137,8 +136,9 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
} }
const std::string& cluster_name() const { return cluster_name_; } const std::string& cluster_name() const { return cluster_name_; }
const std::string& eds_service_name() const { return eds_service_name_; } const std::string& eds_service_name() const { return eds_service_name_; }
const absl::optional<std::string>& lrs_load_reporting_server_name() const { const absl::optional<XdsBootstrap::XdsServer>& lrs_load_reporting_server()
return lrs_load_reporting_server_name_; const {
return lrs_load_reporting_server_;
}; };
uint32_t max_concurrent_requests() const { return max_concurrent_requests_; } uint32_t max_concurrent_requests() const { return max_concurrent_requests_; }
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config() const { RefCountedPtr<XdsEndpointResource::DropConfig> drop_config() const {
@ -149,7 +149,7 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_; RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
std::string cluster_name_; std::string cluster_name_;
std::string eds_service_name_; std::string eds_service_name_;
absl::optional<std::string> lrs_load_reporting_server_name_; absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server_;
uint32_t max_concurrent_requests_; uint32_t max_concurrent_requests_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_; RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
}; };
@ -462,10 +462,19 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
config_ = std::move(args.config); config_ = std::move(args.config);
// On initial update, create drop stats. // On initial update, create drop stats.
if (is_initial_update) { if (is_initial_update) {
if (config_->lrs_load_reporting_server_name().has_value()) { if (config_->lrs_load_reporting_server().has_value()) {
drop_stats_ = xds_client_->AddClusterDropStats( drop_stats_ = xds_client_->AddClusterDropStats(
config_->lrs_load_reporting_server_name().value(), config_->lrs_load_reporting_server().value(), config_->cluster_name(),
config_->cluster_name(), config_->eds_service_name()); config_->eds_service_name());
if (drop_stats_ == nullptr) {
gpr_log(GPR_ERROR,
"[xds_cluster_impl_lb %p] Failed to get cluster drop stats for "
"LRS server %s, cluster %s, EDS service name %s, load "
"reporting for drops will not be done.",
this, config_->lrs_load_reporting_server()->server_uri.c_str(),
config_->cluster_name().c_str(),
config_->eds_service_name().c_str());
}
} }
call_counter_ = g_call_counter_map->GetOrCreate( call_counter_ = g_call_counter_map->GetOrCreate(
config_->cluster_name(), config_->eds_service_name()); config_->cluster_name(), config_->eds_service_name());
@ -475,8 +484,8 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
// swapped out if that happens. // swapped out if that happens.
GPR_ASSERT(config_->cluster_name() == old_config->cluster_name()); GPR_ASSERT(config_->cluster_name() == old_config->cluster_name());
GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name()); GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name());
GPR_ASSERT(config_->lrs_load_reporting_server_name() == GPR_ASSERT(config_->lrs_load_reporting_server() ==
old_config->lrs_load_reporting_server_name()); old_config->lrs_load_reporting_server());
} }
// Update picker if max_concurrent_requests has changed. // Update picker if max_concurrent_requests has changed.
if (is_initial_update || config_->max_concurrent_requests() != if (is_initial_update || config_->max_concurrent_requests() !=
@ -575,7 +584,7 @@ RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
if (xds_cluster_impl_policy_->shutting_down_) return nullptr; if (xds_cluster_impl_policy_->shutting_down_) return nullptr;
// If load reporting is enabled, wrap the subchannel such that it // If load reporting is enabled, wrap the subchannel such that it
// includes the locality stats object, which will be used by the EdsPicker. // includes the locality stats object, which will be used by the EdsPicker.
if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server_name() if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
.has_value()) { .has_value()) {
RefCountedPtr<XdsLocalityName> locality_name; RefCountedPtr<XdsLocalityName> locality_name;
auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey); auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey);
@ -586,16 +595,27 @@ RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
} }
RefCountedPtr<XdsClusterLocalityStats> locality_stats = RefCountedPtr<XdsClusterLocalityStats> locality_stats =
xds_cluster_impl_policy_->xds_client_->AddClusterLocalityStats( xds_cluster_impl_policy_->xds_client_->AddClusterLocalityStats(
*xds_cluster_impl_policy_->config_ xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
->lrs_load_reporting_server_name(), .value(),
xds_cluster_impl_policy_->config_->cluster_name(), xds_cluster_impl_policy_->config_->cluster_name(),
xds_cluster_impl_policy_->config_->eds_service_name(), xds_cluster_impl_policy_->config_->eds_service_name(),
std::move(locality_name)); std::move(locality_name));
if (locality_stats != nullptr) {
return MakeRefCounted<StatsSubchannelWrapper>( return MakeRefCounted<StatsSubchannelWrapper>(
xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel( xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args), std::move(address), args),
std::move(locality_stats)); std::move(locality_stats));
} }
gpr_log(GPR_ERROR,
"[xds_cluster_impl_lb %p] Failed to get locality stats object for "
"LRS server %s, cluster %s, EDS service name %s; load reports will "
"not be generated (not wrapping subchannel)",
this,
xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
->server_uri.c_str(),
xds_cluster_impl_policy_->config_->cluster_name().c_str(),
xds_cluster_impl_policy_->config_->eds_service_name().c_str());
}
// Load reporting not enabled, so don't wrap the subchannel. // Load reporting not enabled, so don't wrap the subchannel.
return xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel( return xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args); std::move(address), args);
@ -715,14 +735,21 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
} }
} }
// LRS load reporting server name. // LRS load reporting server name.
absl::optional<std::string> lrs_load_reporting_server_name; absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server;
it = json.object_value().find("lrsLoadReportingServerName"); it = json.object_value().find("lrsLoadReportingServer");
if (it != json.object_value().end()) { if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) { if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:lrsLoadReportingServerName error:type should be string")); "field:lrsLoadReportingServer error:type should be object"));
} else { } else {
lrs_load_reporting_server_name = it->second.string_value(); grpc_error_handle parser_error;
lrs_load_reporting_server = XdsBootstrap::XdsServer::Parse(
it->second.object_value(), &parser_error);
if (parser_error != GRPC_ERROR_NONE) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("errors parsing lrs_load_reporting_server")));
error_list.push_back(parser_error);
}
} }
} }
// Max concurrent requests. // Max concurrent requests.
@ -758,7 +785,7 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
} }
return MakeRefCounted<XdsClusterImplLbConfig>( return MakeRefCounted<XdsClusterImplLbConfig>(
std::move(child_policy), std::move(cluster_name), std::move(child_policy), std::move(cluster_name),
std::move(eds_service_name), std::move(lrs_load_reporting_server_name), std::move(eds_service_name), std::move(lrs_load_reporting_server),
max_concurrent_requests, std::move(drop_config)); max_concurrent_requests, std::move(drop_config));
} }

@ -34,6 +34,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_client_stats.h"
@ -65,7 +66,7 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
public: public:
struct DiscoveryMechanism { struct DiscoveryMechanism {
std::string cluster_name; std::string cluster_name;
absl::optional<std::string> lrs_load_reporting_server_name; absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server;
uint32_t max_concurrent_requests; uint32_t max_concurrent_requests;
enum DiscoveryMechanismType { enum DiscoveryMechanismType {
EDS, EDS,
@ -77,8 +78,7 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
bool operator==(const DiscoveryMechanism& other) const { bool operator==(const DiscoveryMechanism& other) const {
return (cluster_name == other.cluster_name && return (cluster_name == other.cluster_name &&
lrs_load_reporting_server_name == lrs_load_reporting_server == other.lrs_load_reporting_server &&
other.lrs_load_reporting_server_name &&
max_concurrent_requests == other.max_concurrent_requests && max_concurrent_requests == other.max_concurrent_requests &&
type == other.type && type == other.type &&
eds_service_name == other.eds_service_name && eds_service_name == other.eds_service_name &&
@ -887,10 +887,10 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second); xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second);
} }
if (config_->discovery_mechanisms()[discovery_index] if (config_->discovery_mechanisms()[discovery_index]
.lrs_load_reporting_server_name.has_value()) { .lrs_load_reporting_server.has_value()) {
xds_cluster_impl_config["lrsLoadReportingServerName"] = xds_cluster_impl_config["lrsLoadReportingServer"] =
config_->discovery_mechanisms()[discovery_index] config_->discovery_mechanisms()[discovery_index]
.lrs_load_reporting_server_name.value(); .lrs_load_reporting_server->ToJson();
} }
Json locality_picking_policy = Json::Array{Json::Object{ Json locality_picking_policy = Json::Array{Json::Object{
{"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)}, {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)},
@ -1151,14 +1151,20 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
discovery_mechanism->cluster_name = it->second.string_value(); discovery_mechanism->cluster_name = it->second.string_value();
} }
// LRS load reporting server name. // LRS load reporting server name.
it = json.object_value().find("lrsLoadReportingServerName"); it = json.object_value().find("lrsLoadReportingServer");
if (it != json.object_value().end()) { if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) { if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:lrsLoadReportingServerName error:type should be string")); "field:lrsLoadReportingServer error:type should be object"));
} else { } else {
discovery_mechanism->lrs_load_reporting_server_name.emplace( grpc_error_handle parse_error;
it->second.string_value()); discovery_mechanism->lrs_load_reporting_server.emplace(
XdsBootstrap::XdsServer::Parse(it->second, &parse_error));
if (parse_error != GRPC_ERROR_NONE) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("errors parsing lrs_load_reporting_server")));
error_list.push_back(parse_error);
}
} }
} }
// Max concurrent requests. // Max concurrent requests.

@ -27,6 +27,7 @@
#include "upb/upb.hpp" #include "upb/upb.hpp"
#include "src/core/ext/xds/certificate_provider_store.h" #include "src/core/ext/xds/certificate_provider_store.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
namespace grpc_core { namespace grpc_core {
@ -38,6 +39,7 @@ class XdsClient;
// passing through XdsApi code, maybe via the AdsResponseParser. // passing through XdsApi code, maybe via the AdsResponseParser.
struct XdsEncodingContext { struct XdsEncodingContext {
XdsClient* client; // Used only for logging. Unsafe for dereferencing. XdsClient* client; // Used only for logging. Unsafe for dereferencing.
const XdsBootstrap::XdsServer& server;
TraceFlag* tracer; TraceFlag* tracer;
upb_symtab* symtab; upb_symtab* symtab;
upb_arena* arena; upb_arena* arena;

@ -277,6 +277,7 @@ grpc_slice XdsApi::CreateAdsRequest(
bool populate_node) { bool populate_node) {
upb::Arena arena; upb::Arena arena;
const XdsEncodingContext context = {client_, const XdsEncodingContext context = {client_,
server,
tracer_, tracer_,
symtab_->ptr(), symtab_->ptr(),
arena.ptr(), arena.ptr(),
@ -356,6 +357,7 @@ absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server,
AdsResponseParserInterface* parser) { AdsResponseParserInterface* parser) {
upb::Arena arena; upb::Arena arena;
const XdsEncodingContext context = {client_, const XdsEncodingContext context = {client_,
server,
tracer_, tracer_,
symtab_->ptr(), symtab_->ptr(),
arena.ptr(), arena.ptr(),
@ -431,6 +433,7 @@ grpc_slice XdsApi::CreateLrsInitialRequest(
const XdsBootstrap::XdsServer& server) { const XdsBootstrap::XdsServer& server) {
upb::Arena arena; upb::Arena arena;
const XdsEncodingContext context = {client_, const XdsEncodingContext context = {client_,
server,
tracer_, tracer_,
symtab_->ptr(), symtab_->ptr(),
arena.ptr(), arena.ptr(),
@ -505,9 +508,16 @@ void LocalityStatsPopulate(
grpc_slice XdsApi::CreateLrsRequest( grpc_slice XdsApi::CreateLrsRequest(
ClusterLoadReportMap cluster_load_report_map) { ClusterLoadReportMap cluster_load_report_map) {
upb::Arena arena; upb::Arena arena;
const XdsEncodingContext context = { // The xDS server info is not actually needed here, so we seed it with an
client_, tracer_, symtab_->ptr(), // empty value.
arena.ptr(), false, certificate_provider_definition_map_}; XdsBootstrap::XdsServer empty_server;
const XdsEncodingContext context = {client_,
empty_server,
tracer_,
symtab_->ptr(),
arena.ptr(),
false,
certificate_provider_definition_map_};
// Create a request. // Create a request.
envoy_service_load_stats_v3_LoadStatsRequest* request = envoy_service_load_stats_v3_LoadStatsRequest* request =
envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr()); envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr());
@ -629,9 +639,16 @@ std::string XdsApi::AssembleClientConfig(
// Fill-in the node information // Fill-in the node information
auto* node = envoy_service_status_v3_ClientConfig_mutable_node(client_config, auto* node = envoy_service_status_v3_ClientConfig_mutable_node(client_config,
arena.ptr()); arena.ptr());
const XdsEncodingContext context = { // The xDS server info is not actually needed here, so we seed it with an
client_, tracer_, symtab_->ptr(), // empty value.
arena.ptr(), true, certificate_provider_definition_map_}; XdsBootstrap::XdsServer empty_server;
const XdsEncodingContext context = {client_,
empty_server,
tracer_,
symtab_->ptr(),
arena.ptr(),
true,
certificate_provider_definition_map_};
PopulateNode(context, node_, build_version_, user_agent_name_, PopulateNode(context, node_, build_version_, user_agent_name_,
user_agent_version_, node); user_agent_version_, node);
// Dump each resource. // Dump each resource.

@ -137,6 +137,25 @@ XdsBootstrap::XdsServer XdsBootstrap::XdsServer::Parse(
return server; return server;
} }
Json::Object XdsBootstrap::XdsServer::ToJson() const {
Json::Object channel_creds_json{{"type", channel_creds_type}};
if (channel_creds_config.type() != Json::Type::JSON_NULL) {
channel_creds_json["config"] = channel_creds_config;
}
Json::Object json{
{"server_uri", server_uri},
{"channel_creds", Json::Array{std::move(channel_creds_json)}},
};
if (!server_features.empty()) {
Json::Array server_features_json;
for (auto& feature : server_features) {
server_features_json.emplace_back(feature);
}
json["server_features"] = std::move(server_features_json);
}
return json;
}
bool XdsBootstrap::XdsServer::ShouldUseV3() const { bool XdsBootstrap::XdsServer::ShouldUseV3() const {
return server_features.find("xds_v3") != server_features.end(); return server_features.find("xds_v3") != server_features.end();
} }
@ -244,6 +263,17 @@ const XdsBootstrap::Authority* XdsBootstrap::LookupAuthority(
return nullptr; return nullptr;
} }
bool XdsBootstrap::XdsServerExists(
const XdsBootstrap::XdsServer& server) const {
if (server == servers_[0]) return true;
for (auto& authority : authorities_) {
for (auto& xds_server : authority.second.xds_servers) {
if (server == xds_server) return true;
}
}
return false;
}
grpc_error_handle XdsBootstrap::ParseXdsServerList( grpc_error_handle XdsBootstrap::ParseXdsServerList(
Json* json, absl::InlinedVector<XdsServer, 1>* servers) { Json* json, absl::InlinedVector<XdsServer, 1>* servers) {
std::vector<grpc_error_handle> error_list; std::vector<grpc_error_handle> error_list;

@ -58,6 +58,13 @@ class XdsBootstrap {
static XdsServer Parse(const Json& json, grpc_error_handle* error); static XdsServer Parse(const Json& json, grpc_error_handle* error);
bool operator==(const XdsServer& other) const {
return (server_uri == other.server_uri &&
channel_creds_type == other.channel_creds_type &&
channel_creds_config == other.channel_creds_config &&
server_features == other.server_features);
}
bool operator<(const XdsServer& other) const { bool operator<(const XdsServer& other) const {
if (server_uri < other.server_uri) return true; if (server_uri < other.server_uri) return true;
if (channel_creds_type < other.channel_creds_type) return true; if (channel_creds_type < other.channel_creds_type) return true;
@ -68,6 +75,8 @@ class XdsBootstrap {
return false; return false;
} }
Json::Object ToJson() const;
bool ShouldUseV3() const; bool ShouldUseV3() const;
}; };
@ -105,6 +114,8 @@ class XdsBootstrap {
const { const {
return certificate_providers_; return certificate_providers_;
} }
// A util method to check that an xds server exists in this bootstrap file.
bool XdsServerExists(const XdsServer& server) const;
private: private:
grpc_error_handle ParseXdsServerList( grpc_error_handle ParseXdsServerList(

@ -581,7 +581,10 @@ void XdsClient::ChannelState::MaybeStartLrsCall() {
WeakRef(DEBUG_LOCATION, "ChannelState+lrs"))); WeakRef(DEBUG_LOCATION, "ChannelState+lrs")));
} }
void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); } void XdsClient::ChannelState::StopLrsCallLocked() {
xds_client_->xds_load_report_server_map_.erase(server_);
lrs_calld_.reset();
}
void XdsClient::ChannelState::StartConnectivityWatchLocked() { void XdsClient::ChannelState::StartConnectivityWatchLocked() {
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_); ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
@ -1385,15 +1388,19 @@ bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
// Construct snapshot from all reported stats. // Construct snapshot from all reported stats.
XdsApi::ClusterLoadReportMap snapshot = XdsApi::ClusterLoadReportMap snapshot =
xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_, xds_client()->BuildLoadReportSnapshotLocked(parent_->chand()->server_,
parent_->send_all_clusters_,
parent_->cluster_names_); parent_->cluster_names_);
// Skip client load report if the counters were all zero in the last // Skip client load report if the counters were all zero in the last
// report and they are still zero in this one. // report and they are still zero in this one.
const bool old_val = last_report_counters_were_zero_; const bool old_val = last_report_counters_were_zero_;
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot); last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
if (old_val && last_report_counters_were_zero_) { if (old_val && last_report_counters_were_zero_) {
if (xds_client()->load_report_map_.empty()) { auto it = xds_client()->xds_load_report_server_map_.find(
parent_->chand()->StopLrsCall(); parent_->chand()->server_);
if (it == xds_client()->xds_load_report_server_map_.end() ||
it->second.load_report_map.empty()) {
it->second.channel_state->StopLrsCallLocked();
return true; return true;
} }
ScheduleNextReportLocked(); ScheduleNextReportLocked();
@ -1439,8 +1446,11 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
grpc_byte_buffer_destroy(parent_->send_message_payload_); grpc_byte_buffer_destroy(parent_->send_message_payload_);
parent_->send_message_payload_ = nullptr; parent_->send_message_payload_ = nullptr;
// If there are no more registered stats to report, cancel the call. // If there are no more registered stats to report, cancel the call.
if (xds_client()->load_report_map_.empty()) { auto it =
parent_->chand()->StopLrsCall(); xds_client()->xds_load_report_server_map_.find(parent_->chand()->server_);
if (it == xds_client()->xds_load_report_server_map_.end() ||
it->second.load_report_map.empty()) {
it->second.channel_state->StopLrsCallLocked();
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return true; return true;
} }
@ -1755,7 +1765,6 @@ void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
} }
// Ignore status from a stale call. // Ignore status from a stale call.
if (IsCurrentCallOnChannel()) { if (IsCurrentCallOnChannel()) {
GPR_ASSERT(!xds_client()->shutting_down_);
// Try to restart the call. // Try to restart the call.
parent_->OnCallFinishedLocked(); parent_->OnCallFinishedLocked();
} }
@ -1899,8 +1908,6 @@ void XdsClient::WatchResource(const XdsResourceType* type,
{ {
MutexLock lock(&mu_); MutexLock lock(&mu_);
MaybeRegisterResourceTypeLocked(type); MaybeRegisterResourceTypeLocked(type);
// TODO(donnadionne): If we get a request for an authority that is not
// configured in the bootstrap file, reject it.
AuthorityState& authority_state = AuthorityState& authority_state =
authority_state_map_[resource_name->authority]; authority_state_map_[resource_name->authority];
ResourceState& resource_state = ResourceState& resource_state =
@ -2036,20 +2043,26 @@ std::string XdsClient::ConstructFullXdsResourceName(
} }
RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats( RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
absl::string_view lrs_server, absl::string_view cluster_name, const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name) { absl::string_view eds_service_name) {
// TODO(roth): When we add support for direct federation, use the if (!bootstrap_->XdsServerExists(xds_server)) return nullptr;
// server name specified in lrs_server.
auto key = auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name)); std::make_pair(std::string(cluster_name), std::string(eds_service_name));
MutexLock lock(&mu_); MutexLock lock(&mu_);
// We jump through some hoops here to make sure that the absl::string_views // We jump through some hoops here to make sure that the const
// stored in the XdsClusterDropStats object point to the strings // XdsBootstrap::XdsServer& and absl::string_views
// stored in the XdsClusterDropStats object point to the
// XdsBootstrap::XdsServer and strings
// in the load_report_map_ key, so that they have the same lifetime. // in the load_report_map_ key, so that they have the same lifetime.
auto it = load_report_map_ auto server_it =
.emplace(std::make_pair(std::move(key), LoadReportState())) xds_load_report_server_map_.emplace(xds_server, LoadReportServer()).first;
if (server_it->second.channel_state == nullptr) {
server_it->second.channel_state = GetOrCreateChannelStateLocked(xds_server);
}
auto load_report_it = server_it->second.load_report_map
.emplace(std::move(key), LoadReportState())
.first; .first;
LoadReportState& load_report_state = it->second; LoadReportState& load_report_state = load_report_it->second;
RefCountedPtr<XdsClusterDropStats> cluster_drop_stats; RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
if (load_report_state.drop_stats != nullptr) { if (load_report_state.drop_stats != nullptr) {
cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero(); cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
@ -2060,32 +2073,26 @@ RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
load_report_state.drop_stats->GetSnapshotAndReset(); load_report_state.drop_stats->GetSnapshotAndReset();
} }
cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>( cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
Ref(DEBUG_LOCATION, "DropStats"), lrs_server, Ref(DEBUG_LOCATION, "DropStats"), server_it->first,
it->first.first /*cluster_name*/, load_report_it->first.first /*cluster_name*/,
it->first.second /*eds_service_name*/); load_report_it->first.second /*eds_service_name*/);
load_report_state.drop_stats = cluster_drop_stats.get(); load_report_state.drop_stats = cluster_drop_stats.get();
} }
auto resource_name = server_it->second.channel_state->MaybeStartLrsCall();
ParseXdsResourceName(cluster_name, XdsClusterResourceType::Get());
GPR_ASSERT(resource_name.ok());
auto a = authority_state_map_.find(resource_name->authority);
if (a != authority_state_map_.end()) {
a->second.channel_state->MaybeStartLrsCall();
}
return cluster_drop_stats; return cluster_drop_stats;
} }
void XdsClient::RemoveClusterDropStats( void XdsClient::RemoveClusterDropStats(
absl::string_view /*lrs_server*/, absl::string_view cluster_name, const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name, absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats) { XdsClusterDropStats* cluster_drop_stats) {
MutexLock lock(&mu_); MutexLock lock(&mu_);
// TODO(roth): When we add support for direct federation, use the auto server_it = xds_load_report_server_map_.find(xds_server);
// server name specified in lrs_server. if (server_it == xds_load_report_server_map_.end()) return;
auto it = load_report_map_.find( auto load_report_it = server_it->second.load_report_map.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name))); std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (it == load_report_map_.end()) return; if (load_report_it == server_it->second.load_report_map.end()) return;
LoadReportState& load_report_state = it->second; LoadReportState& load_report_state = load_report_it->second;
if (load_report_state.drop_stats == cluster_drop_stats) { if (load_report_state.drop_stats == cluster_drop_stats) {
// Record final snapshot in deleted_drop_stats, which will be // Record final snapshot in deleted_drop_stats, which will be
// added to the next load report. // added to the next load report.
@ -2096,21 +2103,27 @@ void XdsClient::RemoveClusterDropStats(
} }
RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats( RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
absl::string_view lrs_server, absl::string_view cluster_name, const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name, absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality) { RefCountedPtr<XdsLocalityName> locality) {
// TODO(roth): When we add support for direct federation, use the if (!bootstrap_->XdsServerExists(xds_server)) return nullptr;
// server name specified in lrs_server.
auto key = auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name)); std::make_pair(std::string(cluster_name), std::string(eds_service_name));
MutexLock lock(&mu_); MutexLock lock(&mu_);
// We jump through some hoops here to make sure that the absl::string_views // We jump through some hoops here to make sure that the const
// stored in the XdsClusterLocalityStats object point to the strings // XdsBootstrap::XdsServer& and absl::string_views
// stored in the XdsClusterDropStats object point to the
// XdsBootstrap::XdsServer and strings
// in the load_report_map_ key, so that they have the same lifetime. // in the load_report_map_ key, so that they have the same lifetime.
auto it = load_report_map_ auto server_it =
.emplace(std::make_pair(std::move(key), LoadReportState())) xds_load_report_server_map_.emplace(xds_server, LoadReportServer()).first;
if (server_it->second.channel_state == nullptr) {
server_it->second.channel_state = GetOrCreateChannelStateLocked(xds_server);
}
auto load_report_it = server_it->second.load_report_map
.emplace(std::move(key), LoadReportState())
.first; .first;
LoadReportState& load_report_state = it->second; LoadReportState& load_report_state = load_report_it->second;
LoadReportState::LocalityState& locality_state = LoadReportState::LocalityState& locality_state =
load_report_state.locality_stats[locality]; load_report_state.locality_stats[locality];
RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats; RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
@ -2123,33 +2136,27 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
locality_state.locality_stats->GetSnapshotAndReset(); locality_state.locality_stats->GetSnapshotAndReset();
} }
cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>( cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server, Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first,
it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/, load_report_it->first.first /*cluster_name*/,
std::move(locality)); load_report_it->first.second /*eds_service_name*/, std::move(locality));
locality_state.locality_stats = cluster_locality_stats.get(); locality_state.locality_stats = cluster_locality_stats.get();
} }
auto resource_name = server_it->second.channel_state->MaybeStartLrsCall();
ParseXdsResourceName(cluster_name, XdsClusterResourceType::Get());
GPR_ASSERT(resource_name.ok());
auto a = authority_state_map_.find(resource_name->authority);
if (a != authority_state_map_.end()) {
a->second.channel_state->MaybeStartLrsCall();
}
return cluster_locality_stats; return cluster_locality_stats;
} }
void XdsClient::RemoveClusterLocalityStats( void XdsClient::RemoveClusterLocalityStats(
absl::string_view /*lrs_server*/, absl::string_view cluster_name, const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name, absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality, const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats) { XdsClusterLocalityStats* cluster_locality_stats) {
MutexLock lock(&mu_); MutexLock lock(&mu_);
// TODO(roth): When we add support for direct federation, use the auto server_it = xds_load_report_server_map_.find(xds_server);
// server name specified in lrs_server. if (server_it == xds_load_report_server_map_.end()) return;
auto it = load_report_map_.find( auto load_report_it = server_it->second.load_report_map.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name))); std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (it == load_report_map_.end()) return; if (load_report_it == server_it->second.load_report_map.end()) return;
LoadReportState& load_report_state = it->second; LoadReportState& load_report_state = load_report_it->second;
auto locality_it = load_report_state.locality_stats.find(locality); auto locality_it = load_report_state.locality_stats.find(locality);
if (locality_it == load_report_state.locality_stats.end()) return; if (locality_it == load_report_state.locality_stats.end()) return;
LoadReportState::LocalityState& locality_state = locality_it->second; LoadReportState::LocalityState& locality_state = locality_it->second;
@ -2193,13 +2200,17 @@ void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) {
} }
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
bool send_all_clusters, const std::set<std::string>& clusters) { const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
const std::set<std::string>& clusters) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] start building load report", this); gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
} }
XdsApi::ClusterLoadReportMap snapshot_map; XdsApi::ClusterLoadReportMap snapshot_map;
for (auto load_report_it = load_report_map_.begin(); auto server_it = xds_load_report_server_map_.find(xds_server);
load_report_it != load_report_map_.end();) { if (server_it == xds_load_report_server_map_.end()) return snapshot_map;
auto& load_report_map = server_it->second.load_report_map;
for (auto load_report_it = load_report_map.begin();
load_report_it != load_report_map.end();) {
// Cluster key is cluster and EDS service name. // Cluster key is cluster and EDS service name.
const auto& cluster_key = load_report_it->first; const auto& cluster_key = load_report_it->first;
LoadReportState& load_report = load_report_it->second; LoadReportState& load_report = load_report_it->second;
@ -2265,7 +2276,7 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
// deleted stats objects, remove the entry. // deleted stats objects, remove the entry.
if (load_report.locality_stats.empty() && if (load_report.locality_stats.empty() &&
load_report.drop_stats == nullptr) { load_report.drop_stats == nullptr) {
load_report_it = load_report_map_.erase(load_report_it); load_report_it = load_report_map.erase(load_report_it);
} else { } else {
++load_report_it; ++load_report_it;
} }

@ -114,9 +114,9 @@ class XdsClient : public DualRefCounted<XdsClient> {
// Adds and removes drop stats for cluster_name and eds_service_name. // Adds and removes drop stats for cluster_name and eds_service_name.
RefCountedPtr<XdsClusterDropStats> AddClusterDropStats( RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
absl::string_view lrs_server, absl::string_view cluster_name, const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name); absl::string_view eds_service_name);
void RemoveClusterDropStats(absl::string_view /*lrs_server*/, void RemoveClusterDropStats(const XdsBootstrap::XdsServer& xds_server,
absl::string_view cluster_name, absl::string_view cluster_name,
absl::string_view eds_service_name, absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats); XdsClusterDropStats* cluster_drop_stats);
@ -124,11 +124,11 @@ class XdsClient : public DualRefCounted<XdsClient> {
// Adds and removes locality stats for cluster_name and eds_service_name // Adds and removes locality stats for cluster_name and eds_service_name
// for the specified locality. // for the specified locality.
RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats( RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
absl::string_view lrs_server, absl::string_view cluster_name, const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name, absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality); RefCountedPtr<XdsLocalityName> locality);
void RemoveClusterLocalityStats( void RemoveClusterLocalityStats(
absl::string_view /*lrs_server*/, absl::string_view cluster_name, const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name, absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality, const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats); XdsClusterLocalityStats* cluster_locality_stats);
@ -189,7 +189,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
LrsCallState* lrs_calld() const; LrsCallState* lrs_calld() const;
void MaybeStartLrsCall(); void MaybeStartLrsCall();
void StopLrsCall(); void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
bool HasAdsCall() const; bool HasAdsCall() const;
bool HasActiveAdsCall() const; bool HasActiveAdsCall() const;
@ -255,6 +255,16 @@ class XdsClient : public DualRefCounted<XdsClient> {
grpc_millis last_report_time = ExecCtx::Get()->Now(); grpc_millis last_report_time = ExecCtx::Get()->Now();
}; };
// Load report data.
using LoadReportMap = std::map<
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
LoadReportState>;
struct LoadReportServer {
RefCountedPtr<ChannelState> channel_state;
LoadReportMap load_report_map;
};
class Notifier; class Notifier;
// Sends an error notification to all watchers. // Sends an error notification to all watchers.
@ -275,8 +285,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
const XdsResourceKey& key); const XdsResourceKey& key);
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
bool send_all_clusters, const std::set<std::string>& clusters) const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
RefCountedPtr<ChannelState> GetOrCreateChannelStateLocked( RefCountedPtr<ChannelState> GetOrCreateChannelStateLocked(
const XdsBootstrap::XdsServer& server) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); const XdsBootstrap::XdsServer& server) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
@ -305,11 +315,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
std::map<std::string /*authority*/, AuthorityState> authority_state_map_ std::map<std::string /*authority*/, AuthorityState> authority_state_map_
ABSL_GUARDED_BY(mu_); ABSL_GUARDED_BY(mu_);
// Load report data. std::map<XdsBootstrap::XdsServer, LoadReportServer>
std::map< xds_load_report_server_map_ ABSL_GUARDED_BY(mu_);
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
LoadReportState>
load_report_map_ ABSL_GUARDED_BY(mu_);
// Stores started watchers whose resource name was not parsed successfully, // Stores started watchers whose resource name was not parsed successfully,
// waiting to be cancelled or reset in Orphan(). // waiting to be cancelled or reset in Orphan().

@ -41,20 +41,20 @@ uint64_t GetAndResetCounter(std::atomic<uint64_t>* from) {
// XdsClusterDropStats // XdsClusterDropStats
// //
XdsClusterDropStats::XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client, XdsClusterDropStats::XdsClusterDropStats(
absl::string_view lrs_server_name, RefCountedPtr<XdsClient> xds_client,
absl::string_view cluster_name, const XdsBootstrap::XdsServer& lrs_server, absl::string_view cluster_name,
absl::string_view eds_service_name) absl::string_view eds_service_name)
: RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
? "XdsClusterDropStats" ? "XdsClusterDropStats"
: nullptr), : nullptr),
xds_client_(std::move(xds_client)), xds_client_(std::move(xds_client)),
lrs_server_name_(lrs_server_name), lrs_server_(lrs_server),
cluster_name_(cluster_name), cluster_name_(cluster_name),
eds_service_name_(eds_service_name) { eds_service_name_(eds_service_name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] created drop stats %p for {%s, %s, %s}", gpr_log(GPR_INFO, "[xds_client %p] created drop stats %p for {%s, %s, %s}",
xds_client_.get(), this, std::string(lrs_server_name_).c_str(), xds_client_.get(), this, lrs_server_.server_uri.c_str(),
std::string(cluster_name_).c_str(), std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str()); std::string(eds_service_name_).c_str());
} }
@ -64,11 +64,11 @@ XdsClusterDropStats::~XdsClusterDropStats() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] destroying drop stats %p for {%s, %s, %s}", "[xds_client %p] destroying drop stats %p for {%s, %s, %s}",
xds_client_.get(), this, std::string(lrs_server_name_).c_str(), xds_client_.get(), this, lrs_server_.server_uri.c_str(),
std::string(cluster_name_).c_str(), std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str()); std::string(eds_service_name_).c_str());
} }
xds_client_->RemoveClusterDropStats(lrs_server_name_, cluster_name_, xds_client_->RemoveClusterDropStats(lrs_server_, cluster_name_,
eds_service_name_, this); eds_service_name_, this);
xds_client_.reset(DEBUG_LOCATION, "DropStats"); xds_client_.reset(DEBUG_LOCATION, "DropStats");
} }
@ -95,21 +95,21 @@ void XdsClusterDropStats::AddCallDropped(const std::string& category) {
// //
XdsClusterLocalityStats::XdsClusterLocalityStats( XdsClusterLocalityStats::XdsClusterLocalityStats(
RefCountedPtr<XdsClient> xds_client, absl::string_view lrs_server_name, RefCountedPtr<XdsClient> xds_client,
absl::string_view cluster_name, absl::string_view eds_service_name, const XdsBootstrap::XdsServer& lrs_server, absl::string_view cluster_name,
RefCountedPtr<XdsLocalityName> name) absl::string_view eds_service_name, RefCountedPtr<XdsLocalityName> name)
: RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
? "XdsClusterLocalityStats" ? "XdsClusterLocalityStats"
: nullptr), : nullptr),
xds_client_(std::move(xds_client)), xds_client_(std::move(xds_client)),
lrs_server_name_(lrs_server_name), lrs_server_(lrs_server),
cluster_name_(cluster_name), cluster_name_(cluster_name),
eds_service_name_(eds_service_name), eds_service_name_(eds_service_name),
name_(std::move(name)) { name_(std::move(name)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] created locality stats %p for {%s, %s, %s, %s}", "[xds_client %p] created locality stats %p for {%s, %s, %s, %s}",
xds_client_.get(), this, std::string(lrs_server_name_).c_str(), xds_client_.get(), this, lrs_server_.server_uri.c_str(),
std::string(cluster_name_).c_str(), std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str(), std::string(eds_service_name_).c_str(),
name_->AsHumanReadableString().c_str()); name_->AsHumanReadableString().c_str());
@ -120,12 +120,12 @@ XdsClusterLocalityStats::~XdsClusterLocalityStats() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] destroying locality stats %p for {%s, %s, %s, %s}", "[xds_client %p] destroying locality stats %p for {%s, %s, %s, %s}",
xds_client_.get(), this, std::string(lrs_server_name_).c_str(), xds_client_.get(), this, lrs_server_.server_uri.c_str(),
std::string(cluster_name_).c_str(), std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str(), std::string(eds_service_name_).c_str(),
name_->AsHumanReadableString().c_str()); name_->AsHumanReadableString().c_str());
} }
xds_client_->RemoveClusterLocalityStats(lrs_server_name_, cluster_name_, xds_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_,
eds_service_name_, name_, this); eds_service_name_, name_, this);
xds_client_.reset(DEBUG_LOCATION, "LocalityStats"); xds_client_.reset(DEBUG_LOCATION, "LocalityStats");
} }

@ -29,6 +29,7 @@
#include "absl/strings/str_format.h" #include "absl/strings/str_format.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
@ -128,7 +129,7 @@ class XdsClusterDropStats : public RefCounted<XdsClusterDropStats> {
}; };
XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client, XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client,
absl::string_view lrs_server_name, const XdsBootstrap::XdsServer& lrs_server,
absl::string_view cluster_name, absl::string_view cluster_name,
absl::string_view eds_service_name); absl::string_view eds_service_name);
~XdsClusterDropStats() override; ~XdsClusterDropStats() override;
@ -141,7 +142,7 @@ class XdsClusterDropStats : public RefCounted<XdsClusterDropStats> {
private: private:
RefCountedPtr<XdsClient> xds_client_; RefCountedPtr<XdsClient> xds_client_;
absl::string_view lrs_server_name_; const XdsBootstrap::XdsServer& lrs_server_;
absl::string_view cluster_name_; absl::string_view cluster_name_;
absl::string_view eds_service_name_; absl::string_view eds_service_name_;
std::atomic<uint64_t> uncategorized_drops_{0}; std::atomic<uint64_t> uncategorized_drops_{0};
@ -202,7 +203,7 @@ class XdsClusterLocalityStats : public RefCounted<XdsClusterLocalityStats> {
}; };
XdsClusterLocalityStats(RefCountedPtr<XdsClient> xds_client, XdsClusterLocalityStats(RefCountedPtr<XdsClient> xds_client,
absl::string_view lrs_server_name, const XdsBootstrap::XdsServer& lrs_server_,
absl::string_view cluster_name, absl::string_view cluster_name,
absl::string_view eds_service_name, absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> name); RefCountedPtr<XdsLocalityName> name);
@ -216,7 +217,7 @@ class XdsClusterLocalityStats : public RefCounted<XdsClusterLocalityStats> {
private: private:
RefCountedPtr<XdsClient> xds_client_; RefCountedPtr<XdsClient> xds_client_;
absl::string_view lrs_server_name_; const XdsBootstrap::XdsServer& lrs_server_;
absl::string_view cluster_name_; absl::string_view cluster_name_;
absl::string_view eds_service_name_; absl::string_view eds_service_name_;
RefCountedPtr<XdsLocalityName> name_; RefCountedPtr<XdsLocalityName> name_;

@ -70,9 +70,9 @@ std::string XdsClusterResource::ToString() const {
contents.push_back(absl::StrFormat("common_tls_context=%s", contents.push_back(absl::StrFormat("common_tls_context=%s",
common_tls_context.ToString())); common_tls_context.ToString()));
} }
if (lrs_load_reporting_server_name.has_value()) { if (lrs_load_reporting_server.has_value()) {
contents.push_back(absl::StrFormat("lrs_load_reporting_server_name=%s", contents.push_back(absl::StrFormat("lrs_load_reporting_server_name=%s",
lrs_load_reporting_server_name.value())); lrs_load_reporting_server->server_uri));
} }
contents.push_back(absl::StrCat("lb_policy=", lb_policy)); contents.push_back(absl::StrCat("lb_policy=", lb_policy));
if (lb_policy == "RING_HASH") { if (lb_policy == "RING_HASH") {
@ -369,7 +369,7 @@ grpc_error_handle CdsResourceParse(
errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
": LRS ConfigSource is not self.")); ": LRS ConfigSource is not self."));
} }
cds_update->lrs_load_reporting_server_name.emplace(""); cds_update->lrs_load_reporting_server.emplace(context.server);
} }
// The Cluster resource encodes the circuit breaking parameters in a list of // The Cluster resource encodes the circuit breaking parameters in a list of
// Thresholds messages, where each message specifies the parameters for a // Thresholds messages, where each message specifies the parameters for a

@ -52,9 +52,7 @@ struct XdsClusterResource {
// The LRS server to use for load reporting. // The LRS server to use for load reporting.
// If not set, load reporting will be disabled. // If not set, load reporting will be disabled.
// If set to the empty string, will use the same server we obtained the CDS absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server;
// data from.
absl::optional<std::string> lrs_load_reporting_server_name;
// The LB policy to use (e.g., "ROUND_ROBIN" or "RING_HASH"). // The LB policy to use (e.g., "ROUND_ROBIN" or "RING_HASH").
std::string lb_policy; std::string lb_policy;
@ -71,8 +69,7 @@ struct XdsClusterResource {
dns_hostname == other.dns_hostname && dns_hostname == other.dns_hostname &&
prioritized_cluster_names == other.prioritized_cluster_names && prioritized_cluster_names == other.prioritized_cluster_names &&
common_tls_context == other.common_tls_context && common_tls_context == other.common_tls_context &&
lrs_load_reporting_server_name == lrs_load_reporting_server == other.lrs_load_reporting_server &&
other.lrs_load_reporting_server_name &&
lb_policy == other.lb_policy && lb_policy == other.lb_policy &&
min_ring_size == other.min_ring_size && min_ring_size == other.min_ring_size &&
max_ring_size == other.max_ring_size && max_ring_size == other.max_ring_size &&

@ -738,6 +738,32 @@ TEST(XdsBootstrapTest, CertificateProvidersFakePluginEmptyConfig) {
0); 0);
} }
TEST(XdsBootstrapTest, XdsServerToJsonAndParse) {
gpr_setenv("GRPC_EXPERIMENTAL_XDS_FEDERATION", "true");
const char* json_str =
" {"
" \"server_uri\": \"fake:///lb\","
" \"channel_creds\": ["
" {"
" \"type\": \"fake\","
" \"ignore\": 0"
" }"
" ],"
" \"ignore\": 0"
" }";
grpc_error_handle error = GRPC_ERROR_NONE;
Json json = Json::Parse(json_str, &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
XdsBootstrap::XdsServer xds_server =
XdsBootstrap::XdsServer::Parse(json, &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
Json::Object output = xds_server.ToJson();
XdsBootstrap::XdsServer output_xds_server =
XdsBootstrap::XdsServer::Parse(output, &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION");
}
} // namespace } // namespace
} // namespace testing } // namespace testing
} // namespace grpc_core } // namespace grpc_core

@ -2580,7 +2580,7 @@ TEST_P(GlobalXdsClientTest, InvalidListenerStillExistsIfPreviouslyCached) {
class XdsFederationTest : public XdsEnd2endTest { class XdsFederationTest : public XdsEnd2endTest {
protected: protected:
XdsFederationTest() : XdsEnd2endTest(2, 100, 0, true) { XdsFederationTest() : XdsEnd2endTest(2, 3, 0, true) {
authority_balancer_ = CreateAndStartBalancer(); authority_balancer_ = CreateAndStartBalancer();
} }
@ -2876,6 +2876,121 @@ TEST_P(XdsFederationTest, FederationServer) {
gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"); gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION");
} }
using XdsFederationLoadReportingTest = XdsFederationTest;
// Channel is created with URI "xds://xds.example.com/server.example.com".
// Bootstrap entry for that authority specifies a client listener name template.
// Sending traffic to both default balancer and authority balancer and checking
// load reporting with each one.
TEST_P(XdsFederationLoadReportingTest, FederationMultipleLoadReportingTest) {
gpr_setenv("GRPC_EXPERIMENTAL_XDS_FEDERATION", "true");
const char* kAuthority = "xds.example.com";
const char* kNewServerName = "whee%/server.example.com";
const char* kNewListenerTemplate =
"xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
"client/%s?psm_project_id=1234";
const char* kNewListenerName =
"xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
"client/whee%25/server.example.com?psm_project_id=1234";
const char* kNewRouteConfigName =
"xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
"new_route_config_name";
const char* kNewEdsServiceName =
"xdstp://xds.example.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
"edsservice_name";
const char* kNewClusterName =
"xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/"
"cluster_name";
const size_t kNumRpcsToDefaultBalancer = 5;
const size_t kNumRpcsToAuthorityBalancer = 10;
BootstrapBuilder builder = BootstrapBuilder();
builder.AddAuthority(kAuthority,
absl::StrCat("localhost:", authority_balancer_->port()),
kNewListenerTemplate);
CreateClientsAndServers(builder);
StartAllBackends();
// Eds for 2 balancers to ensure RPCs sent using current stub go to backend 0
// and RPCs sent using the new stub go to backend 1.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
authority_balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, kNewEdsServiceName));
authority_balancer_->lrs_service()->set_cluster_names({kNewClusterName});
// New cluster
Cluster new_cluster = default_cluster_;
new_cluster.set_name(kNewClusterName);
new_cluster.mutable_lrs_server()->mutable_self();
new_cluster.mutable_eds_cluster_config()->set_service_name(
kNewEdsServiceName);
authority_balancer_->ads_service()->SetCdsResource(new_cluster);
// New Route
RouteConfiguration new_route_config = default_route_config_;
new_route_config.set_name(kNewRouteConfigName);
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
// New Listener
Listener listener = default_listener_;
listener.set_name(kNewListenerName);
SetListenerAndRouteConfiguration(authority_balancer_.get(), listener,
new_route_config);
// Ensure update has reached and send 10 RPCs to the current stub.
CheckRpcSendOk(kNumRpcsToDefaultBalancer);
// Create second channel to new target uri and send 1 RPC .
auto channel2 =
CreateChannel(/*failover_timeout=*/0, kNewServerName, kAuthority);
channel2->GetState(/*try_to_connect=*/true);
ASSERT_TRUE(
channel2->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100)));
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
for (size_t i = 0; i < kNumRpcsToAuthorityBalancer; ++i) {
ClientContext context;
EchoRequest request;
request.set_message(kRequestMessage);
EchoResponse response;
grpc::Status status = stub2->Echo(&context, request, &response);
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
}
// Each backend should have received the expected number of RPCs,
// and the load report also reflect the correct numbers.
EXPECT_EQ(kNumRpcsToAuthorityBalancer,
backends_[1]->backend_service()->request_count());
EXPECT_EQ(kNumRpcsToDefaultBalancer,
backends_[0]->backend_service()->request_count());
// Load report for authority LRS.
std::vector<ClientStats> authority_load_report =
authority_balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(authority_load_report.size(), 1UL);
ClientStats& authority_client_stats = authority_load_report.front();
EXPECT_EQ(kNumRpcsToAuthorityBalancer,
authority_client_stats.total_successful_requests());
EXPECT_EQ(0U, authority_client_stats.total_requests_in_progress());
EXPECT_EQ(kNumRpcsToAuthorityBalancer,
authority_client_stats.total_issued_requests());
EXPECT_EQ(0U, authority_client_stats.total_error_requests());
EXPECT_EQ(0U, authority_client_stats.total_dropped_requests());
EXPECT_EQ(1U, authority_balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, authority_balancer_->lrs_service()->response_count());
// Load report for default LRS.
std::vector<ClientStats> default_load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(default_load_report.size(), 1UL);
ClientStats& default_client_stats = default_load_report.front();
EXPECT_EQ(kNumRpcsToDefaultBalancer,
default_client_stats.total_successful_requests());
EXPECT_EQ(0U, default_client_stats.total_requests_in_progress());
EXPECT_EQ(kNumRpcsToDefaultBalancer,
default_client_stats.total_issued_requests());
EXPECT_EQ(0U, default_client_stats.total_error_requests());
EXPECT_EQ(0U, default_client_stats.total_dropped_requests());
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION");
}
class SecureNamingTest : public XdsEnd2endTest { class SecureNamingTest : public XdsEnd2endTest {
public: public:
SecureNamingTest() SecureNamingTest()
@ -13576,10 +13691,18 @@ INSTANTIATE_TEST_SUITE_P(
::testing::Values( ::testing::Values(
TestType().set_bootstrap_source(TestType::kBootstrapFromEnvVar), TestType().set_bootstrap_source(TestType::kBootstrapFromEnvVar),
TestType() TestType()
.set_bootstrap_source(TestType::kBootstrapFromEnvVar)
.set_enable_rds_testing()),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(
XdsTest, XdsFederationLoadReportingTest,
::testing::Values(TestType()
.set_bootstrap_source(TestType::kBootstrapFromEnvVar) .set_bootstrap_source(TestType::kBootstrapFromEnvVar)
.set_enable_load_reporting(), .set_enable_load_reporting(),
TestType() TestType()
.set_bootstrap_source(TestType::kBootstrapFromEnvVar) .set_bootstrap_source(TestType::kBootstrapFromEnvVar)
.set_enable_load_reporting()
.set_enable_rds_testing()), .set_enable_rds_testing()),
&TestTypeName); &TestTypeName);

Loading…
Cancel
Save