XdsClient: convert xDS bootstrap code to use new JSON parsing API (#30431)

* Declarative JSON parser

* Automated change: Fix sanity tests

* fix

* shrinking stuff a little

* static vtables

* separate fns

* simpler?

* make maps work

* windows fixes

* Automated change: Fix sanity tests

* simplify code

* Automated change: Fix sanity tests

* vtable-test

* dont always create vec/map impls for every type

* comments

* make error consistent

* move method private

* progress

* durations!

* Automated change: Fix sanity tests

* fix

* fix

* fix

* Automated change: Fix sanity tests

* post-load

* Automated change: Fix sanity tests

* document JsonPostLoad() and add static_assert

* don't copy field names, to avoid length limitations

* use absl::Status

* accept either string or number for numeric values

* add test for direct data member of another struct type

* remove unused method

* add support for retaining part of the JSON wirthout processing

* update test for changes in Json::Parse() API

* add absl::optional support

* Automated change: Fix sanity tests

* add new parsing code

* use absl::optional<> parsing

* switch to new parsing code

* fix tests

* clang-format

* fix tests, improve error messages, and add overload to parse to existing object

* remove overload of LoadFromJson()

* make XdsBootstrap movable

* work around googletest bug

* Automated change: Fix sanity tests

* change special case for Json to instead use Json::Object

* use Json::Object instead of just Json

* remove copy ctor/assignment, spell out move ctor/assignment

* fix regex portability issue

* Automated change: Fix sanity tests

* fix build

* improve error structure, add missing types, and improve tests

* clang-format

* Automated change: Fix sanity tests

* fix build

* fix build

* attempt to work around gcc6 bug

* add LoadJsonObjectField(), add LoadFromJson() overload that takes an ErrorList parameter, and add tests for parsing bare top-level types

* fix msan

* Automated change: Fix sanity tests

* fix error message

* Automated change: Fix sanity tests

* add mechanism to conditionally disable individual fields

* clean up channel creds parsing

* use conditional enabling mechanism for federation-specific fields

* fix build

* Automated change: Fix sanity tests

* fix build

* Automated change: Fix sanity tests

* avoid unnecessary copies

* Automated change: Fix sanity tests

* make XdsBootstrap a clean interface

* add missing build dep

* fix xds_lb_policy_registry_test

* iwyu

* add missing build deps

Co-authored-by: Craig Tiller <craig.tiller@gmail.com>
Co-authored-by: ctiller <ctiller@users.noreply.github.com>
Co-authored-by: Craig Tiller <ctiller@google.com>
Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/30909/head
Mark D. Roth 2 years ago committed by GitHub
parent 1f1f923a72
commit 04ddf3d0b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      BUILD
  2. 4
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  3. 31
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  4. 21
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  5. 5
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  6. 56
      src/core/ext/xds/certificate_provider_store.cc
  7. 6
      src/core/ext/xds/certificate_provider_store.h
  8. 28
      src/core/ext/xds/xds_api.cc
  9. 48
      src/core/ext/xds/xds_bootstrap.cc
  10. 80
      src/core/ext/xds/xds_bootstrap.h
  11. 648
      src/core/ext/xds/xds_bootstrap_grpc.cc
  12. 127
      src/core/ext/xds/xds_bootstrap_grpc.h
  13. 105
      src/core/ext/xds/xds_client.cc
  14. 16
      src/core/ext/xds/xds_client.h
  15. 18
      src/core/ext/xds/xds_client_grpc.cc
  16. 4
      src/core/ext/xds/xds_client_grpc.h
  17. 8
      src/core/ext/xds/xds_client_stats.cc
  18. 23
      src/core/ext/xds/xds_cluster.cc
  19. 4
      src/core/ext/xds/xds_cluster.h
  20. 11
      src/core/ext/xds/xds_transport_grpc.cc
  21. 423
      test/core/xds/xds_bootstrap_test.cc
  22. 4
      test/core/xds/xds_lb_policy_registry_test.cc

@ -4511,6 +4511,7 @@ grpc_cc_library(
"grpc_transport_chttp2_client_connector",
"iomgr_timer",
"json",
"json_object_loader",
"json_util",
"lb_policy_registry",
"match",
@ -4754,6 +4755,7 @@ grpc_cc_library(
"grpc_trace",
"grpc_xds_client",
"json",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
@ -4795,6 +4797,7 @@ grpc_cc_library(
"grpc_trace",
"grpc_xds_client",
"json",
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",

@ -445,8 +445,8 @@ absl::StatusOr<bool> CdsLb::GenerateDiscoveryMechanismForCluster(
break;
}
if (state.update->lrs_load_reporting_server.has_value()) {
mechanism["lrsLoadReportingServer"] = GrpcXdsBootstrap::XdsServerToJson(
*state.update->lrs_load_reporting_server);
mechanism["lrsLoadReportingServer"] =
state.update->lrs_load_reporting_server->ToJson();
}
discovery_mechanisms->emplace_back(std::move(mechanism));
return true;

@ -43,7 +43,6 @@
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_grpc.h"
@ -58,9 +57,9 @@
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
@ -149,7 +148,7 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
XdsClusterImplLbConfig(
RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
std::string cluster_name, std::string eds_service_name,
absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server,
absl::optional<GrpcXdsBootstrap::GrpcXdsServer> lrs_load_reporting_server,
uint32_t max_concurrent_requests,
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config)
: child_policy_(std::move(child_policy)),
@ -166,8 +165,8 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
}
const std::string& cluster_name() const { return cluster_name_; }
const std::string& eds_service_name() const { return eds_service_name_; }
const absl::optional<XdsBootstrap::XdsServer>& lrs_load_reporting_server()
const {
const absl::optional<GrpcXdsBootstrap::GrpcXdsServer>&
lrs_load_reporting_server() const {
return lrs_load_reporting_server_;
};
uint32_t max_concurrent_requests() const { return max_concurrent_requests_; }
@ -179,7 +178,7 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
std::string cluster_name_;
std::string eds_service_name_;
absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server_;
absl::optional<GrpcXdsBootstrap::GrpcXdsServer> lrs_load_reporting_server_;
uint32_t max_concurrent_requests_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
};
@ -502,7 +501,8 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
"[xds_cluster_impl_lb %p] Failed to get cluster drop stats for "
"LRS server %s, cluster %s, EDS service name %s, load "
"reporting for drops will not be done.",
this, config_->lrs_load_reporting_server()->server_uri.c_str(),
this,
config_->lrs_load_reporting_server()->server_uri().c_str(),
config_->cluster_name().c_str(),
config_->eds_service_name().c_str());
}
@ -641,7 +641,8 @@ RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
"not be generated (not wrapping subchannel)",
this,
xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
->server_uri.c_str(),
->server_uri()
.c_str(),
xds_cluster_impl_policy_->config_->cluster_name().c_str(),
xds_cluster_impl_policy_->config_->eds_service_name().c_str());
}
@ -757,21 +758,21 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
}
}
// LRS load reporting server name.
absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server;
absl::optional<GrpcXdsBootstrap::GrpcXdsServer> lrs_load_reporting_server;
it = json.object_value().find("lrsLoadReportingServer");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::OBJECT) {
errors.emplace_back(
"field:lrsLoadReportingServer error:type should be object");
} else {
grpc_error_handle parser_error;
lrs_load_reporting_server = GrpcXdsBootstrap::XdsServerParse(
it->second.object_value(), &parser_error);
if (!GRPC_ERROR_IS_NONE(parser_error)) {
auto xds_server =
LoadFromJson<GrpcXdsBootstrap::GrpcXdsServer>(it->second);
if (!xds_server.ok()) {
errors.emplace_back(
absl::StrCat("error parsing lrs_load_reporting_server: ",
grpc_error_std_string(parser_error)));
GRPC_ERROR_UNREF(parser_error);
xds_server.status().ToString()));
} else {
lrs_load_reporting_server = std::move(*xds_server);
}
}
}

@ -46,7 +46,6 @@
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_grpc.h"
@ -64,6 +63,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
@ -92,7 +92,7 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
public:
struct DiscoveryMechanism {
std::string cluster_name;
absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server;
absl::optional<GrpcXdsBootstrap::GrpcXdsServer> lrs_load_reporting_server;
uint32_t max_concurrent_requests;
enum DiscoveryMechanismType {
EDS,
@ -926,8 +926,7 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
}
if (discovery_config.lrs_load_reporting_server.has_value()) {
xds_cluster_impl_config["lrsLoadReportingServer"] =
GrpcXdsBootstrap::XdsServerToJson(
*discovery_config.lrs_load_reporting_server);
discovery_config.lrs_load_reporting_server->ToJson();
}
Json locality_picking_policy;
if (XdsOutlierDetectionEnabled()) {
@ -1193,13 +1192,15 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:lrsLoadReportingServer error:type should be object"));
} else {
grpc_error_handle parse_error;
discovery_mechanism->lrs_load_reporting_server.emplace(
GrpcXdsBootstrap::XdsServerParse(it->second, &parse_error));
if (!GRPC_ERROR_IS_NONE(parse_error)) {
auto xds_server =
LoadFromJson<GrpcXdsBootstrap::GrpcXdsServer>(it->second);
if (!xds_server.ok()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("errors parsing lrs_load_reporting_server")));
error_list.push_back(parse_error);
absl::StrCat("error parsing lrs_load_reporting_server: ",
xds_server.status().ToString())));
} else {
discovery_mechanism->lrs_load_reporting_server.emplace(
std::move(*xds_server));
}
}
}

@ -814,7 +814,8 @@ void XdsResolver::StartLocked() {
if (!uri_.authority().empty()) {
// target_uri.authority is set case
const auto* authority_config =
xds_client_->bootstrap().LookupAuthority(uri_.authority());
static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
xds_client_->bootstrap().LookupAuthority(uri_.authority()));
if (authority_config == nullptr) {
absl::Status status = absl::UnavailableError(
absl::StrCat("Invalid target URI -- authority not found for ",
@ -827,7 +828,7 @@ void XdsResolver::StartLocked() {
return;
}
std::string name_template =
authority_config->client_listener_resource_name_template;
authority_config->client_listener_resource_name_template();
if (name_template.empty()) {
name_template = absl::StrCat(
"xdstp://", URI::PercentEncodeAuthority(uri_.authority()),

@ -20,12 +20,68 @@
#include "src/core/ext/xds/certificate_provider_store.h"
#include "absl/strings/str_cat.h"
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/security/certificate_provider/certificate_provider_registry.h"
namespace grpc_core {
//
// CertificateProviderStore::PluginDefinition
//
const JsonLoaderInterface*
CertificateProviderStore::PluginDefinition::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<PluginDefinition>()
.Field("plugin_name", &PluginDefinition::plugin_name)
.Finish();
return loader;
}
void CertificateProviderStore::PluginDefinition::JsonPostLoad(
const Json& json, const JsonArgs&, ErrorList* errors) {
// Check that plugin is supported.
CertificateProviderFactory* factory = nullptr;
if (!plugin_name.empty()) {
ScopedField field(errors, ".plugin_name");
factory = CertificateProviderRegistry::LookupCertificateProviderFactory(
plugin_name);
if (factory == nullptr) {
errors->AddError(absl::StrCat("Unrecognized plugin name: ", plugin_name));
return; // No point checking config.
}
}
// Parse the config field.
{
ScopedField field(errors, ".config");
auto it = json.object_value().find("config");
// The config field is optional; if not present, we use an empty JSON
// object.
Json::Object config_json;
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::OBJECT) {
errors->AddError("is not an object");
return; // No point parsing config.
} else {
config_json = it->second.object_value();
}
}
if (factory == nullptr) return;
// Use plugin to validate and parse config.
grpc_error_handle parse_error = GRPC_ERROR_NONE;
config =
factory->CreateCertificateProviderConfig(config_json, &parse_error);
if (!GRPC_ERROR_IS_NONE(parse_error)) {
errors->AddError(grpc_error_std_string(parse_error));
GRPC_ERROR_UNREF(parse_error);
}
}
}
//
// CertificateProviderStore::CertificateProviderWrapper
//

@ -36,6 +36,9 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/security/certificate_provider/certificate_provider_factory.h"
#include "src/core/lib/security/credentials/tls/grpc_tls_certificate_distributor.h"
#include "src/core/lib/security/credentials/tls/grpc_tls_certificate_provider.h"
@ -49,6 +52,9 @@ class CertificateProviderStore
struct PluginDefinition {
std::string plugin_name;
RefCountedPtr<CertificateProviderFactory::Config> config;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&, ErrorList* errors);
};
// Maps plugin instance (opaque) name to plugin defition.

@ -201,34 +201,34 @@ void PopulateNode(const XdsApiContext& context, const XdsBootstrap::Node* node,
const std::string& user_agent_version,
envoy_config_core_v3_Node* node_msg) {
if (node != nullptr) {
if (!node->id.empty()) {
if (!node->id().empty()) {
envoy_config_core_v3_Node_set_id(node_msg,
StdStringToUpbString(node->id));
StdStringToUpbString(node->id()));
}
if (!node->cluster.empty()) {
if (!node->cluster().empty()) {
envoy_config_core_v3_Node_set_cluster(
node_msg, StdStringToUpbString(node->cluster));
node_msg, StdStringToUpbString(node->cluster()));
}
if (!node->metadata.object_value().empty()) {
if (!node->metadata().empty()) {
google_protobuf_Struct* metadata =
envoy_config_core_v3_Node_mutable_metadata(node_msg, context.arena);
PopulateMetadata(context, metadata, node->metadata.object_value());
PopulateMetadata(context, metadata, node->metadata());
}
if (!node->locality_region.empty() || !node->locality_zone.empty() ||
!node->locality_sub_zone.empty()) {
if (!node->locality_region().empty() || !node->locality_zone().empty() ||
!node->locality_sub_zone().empty()) {
envoy_config_core_v3_Locality* locality =
envoy_config_core_v3_Node_mutable_locality(node_msg, context.arena);
if (!node->locality_region.empty()) {
if (!node->locality_region().empty()) {
envoy_config_core_v3_Locality_set_region(
locality, StdStringToUpbString(node->locality_region));
locality, StdStringToUpbString(node->locality_region()));
}
if (!node->locality_zone.empty()) {
if (!node->locality_zone().empty()) {
envoy_config_core_v3_Locality_set_zone(
locality, StdStringToUpbString(node->locality_zone));
locality, StdStringToUpbString(node->locality_zone()));
}
if (!node->locality_sub_zone.empty()) {
if (!node->locality_sub_zone().empty()) {
envoy_config_core_v3_Locality_set_sub_zone(
locality, StdStringToUpbString(node->locality_sub_zone));
locality, StdStringToUpbString(node->locality_sub_zone()));
}
}
}

@ -18,12 +18,6 @@
#include "src/core/ext/xds/xds_bootstrap.h"
#include <set>
#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
#include <grpc/support/alloc.h>
#include "src/core/lib/gpr/env.h"
@ -41,46 +35,4 @@ bool XdsFederationEnabled() {
return parse_succeeded && parsed_value;
}
//
// XdsBootstrap::XdsServer
//
constexpr absl::string_view XdsBootstrap::XdsServer::kServerFeatureXdsV3;
constexpr absl::string_view
XdsBootstrap::XdsServer::kServerFeatureIgnoreResourceDeletion;
bool XdsBootstrap::XdsServer::ShouldUseV3() const {
return server_features.find(std::string(kServerFeatureXdsV3)) !=
server_features.end();
}
bool XdsBootstrap::XdsServer::IgnoreResourceDeletion() const {
return server_features.find(std::string(
kServerFeatureIgnoreResourceDeletion)) != server_features.end();
}
//
// XdsBootstrap
//
const XdsBootstrap::Authority* XdsBootstrap::LookupAuthority(
const std::string& name) const {
auto it = authorities().find(name);
if (it != authorities().end()) {
return &it->second;
}
return nullptr;
}
bool XdsBootstrap::XdsServerExists(
const XdsBootstrap::XdsServer& server) const {
if (server == this->server()) return true;
for (auto& authority : authorities()) {
for (auto& xds_server : authority.second.xds_servers) {
if (server == xds_server) return true;
}
}
return false;
}
} // namespace grpc_core

@ -19,12 +19,7 @@
#include <grpc/support/port_platform.h>
#include <map>
#include <set>
#include <string>
#include <vector>
#include "absl/strings/string_view.h"
#include "src/core/lib/json/json.h"
@ -34,49 +29,34 @@ bool XdsFederationEnabled();
class XdsBootstrap {
public:
struct Node {
std::string id;
std::string cluster;
std::string locality_region;
std::string locality_zone;
std::string locality_sub_zone;
Json metadata;
class Node {
public:
virtual ~Node() = default;
virtual const std::string& id() const = 0;
virtual const std::string& cluster() const = 0;
virtual const std::string& locality_region() const = 0;
virtual const std::string& locality_zone() const = 0;
virtual const std::string& locality_sub_zone() const = 0;
virtual const Json::Object& metadata() const = 0;
};
struct XdsServer {
static constexpr absl::string_view kServerFeatureXdsV3 = "xds_v3";
static constexpr absl::string_view kServerFeatureIgnoreResourceDeletion =
"ignore_resource_deletion";
std::string server_uri;
std::string channel_creds_type;
Json channel_creds_config;
std::set<std::string> server_features;
bool operator==(const XdsServer& other) const {
return (server_uri == other.server_uri &&
channel_creds_type == other.channel_creds_type &&
channel_creds_config == other.channel_creds_config &&
server_features == other.server_features);
}
bool operator<(const XdsServer& other) const {
if (server_uri < other.server_uri) return true;
if (channel_creds_type < other.channel_creds_type) return true;
if (channel_creds_config.Dump() < other.channel_creds_config.Dump()) {
return true;
}
if (server_features < other.server_features) return true;
return false;
}
bool ShouldUseV3() const;
bool IgnoreResourceDeletion() const;
class XdsServer {
public:
virtual ~XdsServer() = default;
virtual const std::string& server_uri() const = 0;
virtual bool ShouldUseV3() const = 0;
virtual bool IgnoreResourceDeletion() const = 0;
virtual bool operator==(const XdsServer& other) const = 0;
};
struct Authority {
std::string client_listener_resource_name_template;
std::vector<XdsServer> xds_servers;
class Authority {
public:
virtual ~Authority() = default;
virtual const XdsServer* server() const = 0;
};
virtual ~XdsBootstrap() = default;
@ -86,16 +66,18 @@ class XdsBootstrap {
// TODO(roth): We currently support only one server. Fix this when we
// add support for fallback for the xds channel.
virtual const XdsServer& server() const = 0;
// Returns the node information, or null if not present in the bootstrap
// config.
virtual const Node* node() const = 0;
virtual const std::map<std::string, Authority>& authorities() const = 0;
// Returns a pointer to the specified authority, or null if it does
// not exist in this bootstrap config.
const Authority* LookupAuthority(const std::string& name) const;
virtual const Authority* LookupAuthority(const std::string& name) const = 0;
// A utility method to check that an xDS server exists in this
// bootstrap config.
bool XdsServerExists(const XdsServer& server) const;
// If the server exists in the bootstrap config, returns a pointer to
// the XdsServer instance in the config. Otherwise, returns null.
virtual const XdsServer* FindXdsServer(const XdsServer& server) const = 0;
};
} // namespace grpc_core

@ -38,409 +38,271 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_util.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/security/certificate_provider/certificate_provider_factory.h"
#include "src/core/lib/security/certificate_provider/certificate_provider_registry.h"
#include "src/core/lib/security/credentials/channel_creds_registry.h"
namespace grpc_core {
namespace {
//
// GrpcXdsBootstrap::GrpcNode::Locality
//
grpc_error_handle ParseChannelCreds(const Json::Object& json, size_t idx,
XdsBootstrap::XdsServer* server) {
std::vector<grpc_error_handle> error_list;
std::string type;
ParseJsonObjectField(json, "type", &type, &error_list);
const Json::Object* config_ptr = nullptr;
ParseJsonObjectField(json, "config", &config_ptr, &error_list,
/*required=*/false);
// Select the first channel creds type that we support.
if (server->channel_creds_type.empty() &&
CoreConfiguration::Get().channel_creds_registry().IsSupported(type)) {
Json config;
if (config_ptr != nullptr) config = *config_ptr;
if (!CoreConfiguration::Get().channel_creds_registry().IsValidConfig(
type, config)) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"invalid config for channel creds type \"", type, "\"")));
}
server->channel_creds_type = std::move(type);
server->channel_creds_config = std::move(config);
}
return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
absl::StrCat("errors parsing index ", idx), &error_list);
const JsonLoaderInterface* GrpcXdsBootstrap::GrpcNode::Locality::JsonLoader(
const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<Locality>()
.OptionalField("region", &Locality::region)
.OptionalField("zone", &Locality::zone)
.OptionalField("sub_zone", &Locality::sub_zone)
.Finish();
return loader;
}
grpc_error_handle ParseChannelCredsArray(const Json::Array& json,
XdsBootstrap::XdsServer* server) {
std::vector<grpc_error_handle> error_list;
for (size_t i = 0; i < json.size(); ++i) {
const Json& child = json.at(i);
if (child.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("array element ", i, " is not an object")));
} else {
grpc_error_handle parse_error =
ParseChannelCreds(child.object_value(), i, server);
if (!GRPC_ERROR_IS_NONE(parse_error)) error_list.push_back(parse_error);
}
}
if (server->channel_creds_type.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"no known creds type found in \"channel_creds\""));
}
return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing \"channel_creds\" array",
&error_list);
//
// GrpcXdsBootstrap::GrpcNode
//
const JsonLoaderInterface* GrpcXdsBootstrap::GrpcNode::JsonLoader(
const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<GrpcNode>()
.OptionalField("id", &GrpcNode::id_)
.OptionalField("cluster", &GrpcNode::cluster_)
.OptionalField("locality", &GrpcNode::locality_)
.OptionalField("metadata", &GrpcNode::metadata_)
.Finish();
return loader;
}
} // namespace
//
// GrpcXdsBootstrap::GrpcXdsServer::ChannelCreds
//
const JsonLoaderInterface*
GrpcXdsBootstrap::GrpcXdsServer::ChannelCreds::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<ChannelCreds>()
.Field("type", &ChannelCreds::type)
.OptionalField("config", &ChannelCreds::config)
.Finish();
return loader;
}
//
// GrpcXdsBootstrap
// GrpcXdsBootstrap::GrpcXdsServer
//
std::unique_ptr<GrpcXdsBootstrap> GrpcXdsBootstrap::Create(
absl::string_view json_string, grpc_error_handle* error) {
auto json = Json::Parse(json_string);
if (!json.ok()) {
*error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"Failed to parse bootstrap JSON string: ", json.status().ToString()));
return nullptr;
}
return absl::make_unique<GrpcXdsBootstrap>(std::move(*json), error);
namespace {
constexpr absl::string_view kServerFeatureXdsV3 = "xds_v3";
constexpr absl::string_view kServerFeatureIgnoreResourceDeletion =
"ignore_resource_deletion";
} // namespace
bool GrpcXdsBootstrap::GrpcXdsServer::ShouldUseV3() const {
return server_features_.find(std::string(kServerFeatureXdsV3)) !=
server_features_.end();
}
GrpcXdsBootstrap::GrpcXdsBootstrap(Json json, grpc_error_handle* error) {
if (json.type() != Json::Type::OBJECT) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"malformed JSON in bootstrap file");
return;
}
std::vector<grpc_error_handle> error_list;
auto it = json.mutable_object()->find("xds_servers");
if (it == json.mutable_object()->end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"xds_servers\" field not present"));
} else if (it->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"xds_servers\" field is not an array"));
} else {
grpc_error_handle parse_error = ParseXdsServerList(&it->second, &servers_);
if (!GRPC_ERROR_IS_NONE(parse_error)) error_list.push_back(parse_error);
}
it = json.mutable_object()->find("node");
if (it != json.mutable_object()->end()) {
if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"node\" field is not an object"));
} else {
grpc_error_handle parse_error = ParseNode(&it->second);
if (!GRPC_ERROR_IS_NONE(parse_error)) error_list.push_back(parse_error);
}
}
if (XdsFederationEnabled()) {
it = json.mutable_object()->find("authorities");
if (it != json.mutable_object()->end()) {
if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"authorities\" field is not an object"));
} else {
grpc_error_handle parse_error = ParseAuthorities(&it->second);
if (!GRPC_ERROR_IS_NONE(parse_error)) error_list.push_back(parse_error);
}
}
it = json.mutable_object()->find(
"client_default_listener_resource_name_template");
if (it != json.mutable_object()->end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"client_default_listener_resource_name_template\" field is not a "
"string"));
} else {
client_default_listener_resource_name_template_ =
std::move(*it->second.mutable_string_value());
}
}
}
it = json.mutable_object()->find("server_listener_resource_name_template");
if (it != json.mutable_object()->end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"server_listener_resource_name_template\" field is not a string"));
} else {
server_listener_resource_name_template_ =
std::move(*it->second.mutable_string_value());
}
}
it = json.mutable_object()->find("certificate_providers");
if (it != json.mutable_object()->end()) {
if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"certificate_providers\" field is not an object"));
} else {
grpc_error_handle parse_error = ParseCertificateProviders(&it->second);
if (!GRPC_ERROR_IS_NONE(parse_error)) error_list.push_back(parse_error);
}
}
*error = GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing xds bootstrap file",
&error_list);
bool GrpcXdsBootstrap::GrpcXdsServer::IgnoreResourceDeletion() const {
return server_features_.find(std::string(
kServerFeatureIgnoreResourceDeletion)) != server_features_.end();
}
grpc_error_handle GrpcXdsBootstrap::ParseXdsServerList(
Json* json, std::vector<XdsServer>* servers) {
std::vector<grpc_error_handle> error_list;
for (size_t i = 0; i < json->mutable_array()->size(); ++i) {
Json& child = json->mutable_array()->at(i);
if (child.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("array element ", i, " is not an object")));
} else {
grpc_error_handle parse_error;
servers->emplace_back(XdsServerParse(child, &parse_error));
if (!GRPC_ERROR_IS_NONE(parse_error)) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("errors parsing index ", i)));
error_list.push_back(parse_error);
}
}
}
return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing \"xds_servers\" array",
&error_list);
bool GrpcXdsBootstrap::GrpcXdsServer::operator==(const XdsServer& other) const {
const auto& o = static_cast<const GrpcXdsServer&>(other);
return (server_uri_ == o.server_uri_ &&
channel_creds_.type == o.channel_creds_.type &&
channel_creds_.config == o.channel_creds_.config &&
server_features_ == o.server_features_);
}
grpc_error_handle GrpcXdsBootstrap::ParseAuthorities(Json* json) {
std::vector<grpc_error_handle> error_list;
for (auto& p : *(json->mutable_object())) {
if (p.second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
"field:authorities element error: element is not a object"));
continue;
}
grpc_error_handle parse_error = ParseAuthority(&p.second, p.first);
if (!GRPC_ERROR_IS_NONE(parse_error)) error_list.push_back(parse_error);
}
return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing \"authorities\"",
&error_list);
const JsonLoaderInterface* GrpcXdsBootstrap::GrpcXdsServer::JsonLoader(
const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<GrpcXdsServer>()
.Field("server_uri", &GrpcXdsServer::server_uri_)
.Finish();
return loader;
}
grpc_error_handle GrpcXdsBootstrap::ParseAuthority(Json* json,
const std::string& name) {
std::vector<grpc_error_handle> error_list;
Authority authority;
auto it =
json->mutable_object()->find("client_listener_resource_name_template");
if (it != json->mutable_object()->end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"client_listener_resource_name_template\" field is not a string"));
} else {
std::string expected_prefix = absl::StrCat("xdstp://", name, "/");
if (!absl::StartsWith(it->second.string_value(), expected_prefix)) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("\"client_listener_resource_name_template\" field "
"must begin with \"",
expected_prefix, "\"")));
} else {
authority.client_listener_resource_name_template =
std::move(*it->second.mutable_string_value());
void GrpcXdsBootstrap::GrpcXdsServer::JsonPostLoad(const Json& json,
const JsonArgs& args,
ErrorList* errors) {
// Parse "channel_creds".
auto channel_creds_list = LoadJsonObjectField<std::vector<ChannelCreds>>(
json.object_value(), args, "channel_creds", errors);
if (channel_creds_list.has_value()) {
ScopedField field(errors, ".channel_creds");
for (size_t i = 0; i < channel_creds_list->size(); ++i) {
ScopedField field(errors, absl::StrCat("[", i, "]"));
auto& creds = (*channel_creds_list)[i];
// Select the first channel creds type that we support.
if (channel_creds_.type.empty() &&
CoreConfiguration::Get().channel_creds_registry().IsSupported(
creds.type)) {
if (!CoreConfiguration::Get().channel_creds_registry().IsValidConfig(
creds.type, creds.config)) {
errors->AddError(absl::StrCat(
"invalid config for channel creds type \"", creds.type, "\""));
continue;
}
channel_creds_.type = std::move(creds.type);
channel_creds_.config = std::move(creds.config);
}
}
}
it = json->mutable_object()->find("xds_servers");
if (it != json->mutable_object()->end()) {
if (it->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"xds_servers\" field is not an array"));
} else {
grpc_error_handle parse_error =
ParseXdsServerList(&it->second, &authority.xds_servers);
if (!GRPC_ERROR_IS_NONE(parse_error)) error_list.push_back(parse_error);
if (channel_creds_.type.empty()) {
errors->AddError("no known creds type found");
}
}
if (error_list.empty()) {
authorities_[name] = std::move(authority);
// Parse "server_features".
{
ScopedField field(errors, ".server_features");
auto it = json.object_value().find("server_features");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::ARRAY) {
errors->AddError("is not an array");
} else {
const Json::Array& array = it->second.array_value();
for (const Json& feature_json : array) {
if (feature_json.type() == Json::Type::STRING &&
(feature_json.string_value() == kServerFeatureXdsV3 ||
feature_json.string_value() ==
kServerFeatureIgnoreResourceDeletion)) {
server_features_.insert(feature_json.string_value());
}
}
}
}
}
return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
absl::StrCat("errors parsing authority ", name), &error_list);
}
grpc_error_handle GrpcXdsBootstrap::ParseNode(Json* json) {
std::vector<grpc_error_handle> error_list;
node_ = absl::make_unique<Node>();
auto it = json->mutable_object()->find("id");
if (it != json->mutable_object()->end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("\"id\" field is not a string"));
} else {
node_->id = std::move(*it->second.mutable_string_value());
}
Json GrpcXdsBootstrap::GrpcXdsServer::ToJson() const {
Json::Object channel_creds_json{{"type", channel_creds_.type}};
if (!channel_creds_.config.empty()) {
channel_creds_json["config"] = channel_creds_.config;
}
it = json->mutable_object()->find("cluster");
if (it != json->mutable_object()->end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"cluster\" field is not a string"));
} else {
node_->cluster = std::move(*it->second.mutable_string_value());
}
}
it = json->mutable_object()->find("locality");
if (it != json->mutable_object()->end()) {
if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"locality\" field is not an object"));
} else {
grpc_error_handle parse_error = ParseLocality(&it->second);
if (!GRPC_ERROR_IS_NONE(parse_error)) error_list.push_back(parse_error);
}
}
it = json->mutable_object()->find("metadata");
if (it != json->mutable_object()->end()) {
if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"metadata\" field is not an object"));
} else {
node_->metadata = std::move(it->second);
Json::Object json{
{"server_uri", server_uri_},
{"channel_creds", Json::Array{std::move(channel_creds_json)}},
};
if (!server_features_.empty()) {
Json::Array server_features_json;
for (auto& feature : server_features_) {
server_features_json.emplace_back(feature);
}
json["server_features"] = std::move(server_features_json);
}
return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing \"node\" object",
&error_list);
return json;
}
grpc_error_handle GrpcXdsBootstrap::ParseLocality(Json* json) {
std::vector<grpc_error_handle> error_list;
auto it = json->mutable_object()->find("region");
if (it != json->mutable_object()->end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"region\" field is not a string"));
} else {
node_->locality_region = std::move(*it->second.mutable_string_value());
}
}
it = json->mutable_object()->find("zone");
if (it != json->mutable_object()->end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"zone\" field is not a string"));
} else {
node_->locality_zone = std::move(*it->second.mutable_string_value());
}
//
// GrpcXdsBootstrap::GrpcAuthority
//
const JsonLoaderInterface* GrpcXdsBootstrap::GrpcAuthority::JsonLoader(
const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<GrpcAuthority>()
.OptionalField(
"client_listener_resource_name_template",
&GrpcAuthority::client_listener_resource_name_template_)
.OptionalField("xds_servers", &GrpcAuthority::servers_)
.Finish();
return loader;
}
//
// GrpcXdsBootstrap
//
absl::StatusOr<std::unique_ptr<GrpcXdsBootstrap>> GrpcXdsBootstrap::Create(
absl::string_view json_string) {
auto json = Json::Parse(json_string);
if (!json.ok()) {
return absl::InvalidArgumentError(absl::StrCat(
"Failed to parse bootstrap JSON string: ", json.status().ToString()));
}
it = json->mutable_object()->find("sub_zone");
if (it != json->mutable_object()->end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"sub_zone\" field is not a string"));
} else {
node_->locality_sub_zone = std::move(*it->second.mutable_string_value());
// Validate JSON.
class XdsJsonArgs : public JsonArgs {
public:
bool IsEnabled(absl::string_view key) const override {
if (key == "federation") return XdsFederationEnabled();
return true;
}
}
return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing \"locality\" object",
&error_list);
};
auto bootstrap = LoadFromJson<GrpcXdsBootstrap>(*json, XdsJsonArgs());
if (!bootstrap.ok()) return bootstrap.status();
return absl::make_unique<GrpcXdsBootstrap>(std::move(*bootstrap));
}
grpc_error_handle GrpcXdsBootstrap::ParseCertificateProviders(Json* json) {
std::vector<grpc_error_handle> error_list;
for (auto& certificate_provider : *(json->mutable_object())) {
if (certificate_provider.second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"element \"", certificate_provider.first, "\" is not an object")));
} else {
grpc_error_handle parse_error = ParseCertificateProvider(
certificate_provider.first, &certificate_provider.second);
if (!GRPC_ERROR_IS_NONE(parse_error)) error_list.push_back(parse_error);
}
}
return GRPC_ERROR_CREATE_FROM_VECTOR(
"errors parsing \"certificate_providers\" object", &error_list);
const JsonLoaderInterface* GrpcXdsBootstrap::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<GrpcXdsBootstrap>()
.Field("xds_servers", &GrpcXdsBootstrap::servers_)
.OptionalField("node", &GrpcXdsBootstrap::node_)
.OptionalField("certificate_providers",
&GrpcXdsBootstrap::certificate_providers_)
.OptionalField(
"server_listener_resource_name_template",
&GrpcXdsBootstrap::server_listener_resource_name_template_)
.OptionalField("authorities", &GrpcXdsBootstrap::authorities_,
"federation")
.OptionalField("client_default_listener_resource_name_template",
&GrpcXdsBootstrap::
client_default_listener_resource_name_template_,
"federation")
.Finish();
return loader;
}
grpc_error_handle GrpcXdsBootstrap::ParseCertificateProvider(
const std::string& instance_name, Json* certificate_provider_json) {
std::vector<grpc_error_handle> error_list;
auto it = certificate_provider_json->mutable_object()->find("plugin_name");
if (it == certificate_provider_json->mutable_object()->end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"plugin_name\" field not present"));
} else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"plugin_name\" field is not a string"));
} else {
std::string plugin_name = std::move(*(it->second.mutable_string_value()));
// Find config JSON.
absl::optional<Json> config_json;
it = certificate_provider_json->mutable_object()->find("config");
if (it != certificate_provider_json->mutable_object()->end()) {
if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"config\" field is not an object"));
} else {
config_json = it->second;
}
} else {
// "config" is an optional field, so default to an empty JSON object.
config_json = Json::Object();
}
// Try to instantiate the provider.
CertificateProviderFactory* factory =
CertificateProviderRegistry::LookupCertificateProviderFactory(
plugin_name);
if (factory == nullptr) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("Unrecognized plugin name: ", plugin_name)));
} else if (config_json.has_value()) {
grpc_error_handle parse_error = GRPC_ERROR_NONE;
RefCountedPtr<CertificateProviderFactory::Config> config =
factory->CreateCertificateProviderConfig(*config_json, &parse_error);
if (!GRPC_ERROR_IS_NONE(parse_error)) {
error_list.push_back(parse_error);
} else {
certificate_providers_.insert(
{instance_name, {std::move(plugin_name), std::move(config)}});
void GrpcXdsBootstrap::JsonPostLoad(const Json& /*json*/,
const JsonArgs& /*args*/,
ErrorList* errors) {
// Verify that each authority has the right prefix in the
// client_listener_resource_name_template field.
{
ScopedField field(errors, ".authorities");
for (const auto& p : authorities_) {
const std::string& name = p.first;
const GrpcAuthority& authority =
static_cast<const GrpcAuthority&>(p.second);
ScopedField field(
errors, absl::StrCat("[\"", name,
"\"].client_listener_resource_name_template"));
std::string expected_prefix = absl::StrCat("xdstp://", name, "/");
if (!authority.client_listener_resource_name_template().empty() &&
!absl::StartsWith(authority.client_listener_resource_name_template(),
expected_prefix)) {
errors->AddError(
absl::StrCat("field must begin with \"", expected_prefix, "\""));
}
}
}
return GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
absl::StrCat("errors parsing element \"", instance_name, "\""),
&error_list);
}
std::string GrpcXdsBootstrap::ToString() const {
std::vector<std::string> parts;
if (node_ != nullptr) {
parts.push_back(absl::StrFormat(
"node={\n"
" id=\"%s\",\n"
" cluster=\"%s\",\n"
" locality={\n"
" region=\"%s\",\n"
" zone=\"%s\",\n"
" sub_zone=\"%s\"\n"
" },\n"
" metadata=%s,\n"
"},\n",
node_->id, node_->cluster, node_->locality_region, node_->locality_zone,
node_->locality_sub_zone, node_->metadata.Dump()));
if (node_.has_value()) {
parts.push_back(
absl::StrFormat("node={\n"
" id=\"%s\",\n"
" cluster=\"%s\",\n"
" locality={\n"
" region=\"%s\",\n"
" zone=\"%s\",\n"
" sub_zone=\"%s\"\n"
" },\n"
" metadata=%s,\n"
"},\n",
node_->id(), node_->cluster(), node_->locality_region(),
node_->locality_zone(), node_->locality_sub_zone(),
Json{node_->metadata()}.Dump()));
}
parts.push_back(
absl::StrFormat("servers=[\n"
" {\n"
" uri=\"%s\",\n"
" creds_type=%s,\n",
server().server_uri, server().channel_creds_type));
if (server().channel_creds_config.type() != Json::Type::JSON_NULL) {
parts.push_back(absl::StrFormat(" creds_config=%s,",
server().channel_creds_config.Dump()));
}
if (!server().server_features.empty()) {
parts.push_back(absl::StrCat(" server_features=[",
absl::StrJoin(server().server_features, ", "),
"],\n"));
}
parts.push_back(" }\n],\n");
absl::StrFormat("servers=[\n%s\n],\n", servers_[0].ToJson().Dump()));
if (!client_default_listener_resource_name_template_.empty()) {
parts.push_back(absl::StrFormat(
"client_default_listener_resource_name_template=\"%s\",\n",
@ -456,14 +318,14 @@ std::string GrpcXdsBootstrap::ToString() const {
parts.push_back(absl::StrFormat(" %s={\n", entry.first));
parts.push_back(
absl::StrFormat(" client_listener_resource_name_template=\"%s\",\n",
entry.second.client_listener_resource_name_template));
parts.push_back(
absl::StrFormat(" servers=[\n"
" {\n"
" uri=\"%s\",\n"
" creds_type=%s,\n",
entry.second.xds_servers[0].server_uri,
entry.second.xds_servers[0].channel_creds_type));
entry.second.client_listener_resource_name_template()));
if (entry.second.server() != nullptr) {
parts.push_back(absl::StrFormat(
" servers=[\n%s\n],\n",
static_cast<const GrpcXdsServer*>(entry.second.server())
->ToJson()
.Dump()));
}
parts.push_back(" },\n");
}
parts.push_back("}\n");
@ -481,56 +343,28 @@ std::string GrpcXdsBootstrap::ToString() const {
return absl::StrJoin(parts, "");
}
XdsBootstrap::XdsServer GrpcXdsBootstrap::XdsServerParse(
const Json& json, grpc_error_handle* error) {
std::vector<grpc_error_handle> error_list;
XdsServer server;
ParseJsonObjectField(json.object_value(), "server_uri", &server.server_uri,
&error_list);
const Json::Array* creds_array = nullptr;
ParseJsonObjectField(json.object_value(), "channel_creds", &creds_array,
&error_list);
if (creds_array != nullptr) {
grpc_error_handle parse_error =
ParseChannelCredsArray(*creds_array, &server);
if (!GRPC_ERROR_IS_NONE(parse_error)) error_list.push_back(parse_error);
const XdsBootstrap::Authority* GrpcXdsBootstrap::LookupAuthority(
const std::string& name) const {
auto it = authorities_.find(name);
if (it != authorities_.end()) {
return &it->second;
}
const Json::Array* server_features_array = nullptr;
ParseJsonObjectField(json.object_value(), "server_features",
&server_features_array, &error_list, /*required=*/false);
if (server_features_array != nullptr) {
for (const Json& feature_json : *server_features_array) {
if (feature_json.type() == Json::Type::STRING &&
(feature_json.string_value() ==
XdsBootstrap::XdsServer::kServerFeatureXdsV3 ||
feature_json.string_value() ==
XdsBootstrap::XdsServer::kServerFeatureIgnoreResourceDeletion)) {
server.server_features.insert(feature_json.string_value());
}
}
}
*error = GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
"errors parsing xds server", &error_list);
return server;
return nullptr;
}
Json::Object GrpcXdsBootstrap::XdsServerToJson(const XdsServer& server) {
Json::Object channel_creds_json{{"type", server.channel_creds_type}};
if (server.channel_creds_config.type() != Json::Type::JSON_NULL) {
channel_creds_json["config"] = server.channel_creds_config;
}
Json::Object json{
{"server_uri", server.server_uri},
{"channel_creds", Json::Array{std::move(channel_creds_json)}},
};
if (!server.server_features.empty()) {
Json::Array server_features_json;
for (auto& feature : server.server_features) {
server_features_json.emplace_back(feature);
const XdsBootstrap::XdsServer* GrpcXdsBootstrap::FindXdsServer(
const XdsBootstrap::XdsServer& server) const {
if (static_cast<const GrpcXdsServer&>(server) == servers_[0]) {
return &servers_[0];
}
for (auto& p : authorities_) {
const auto* authority_server =
static_cast<const GrpcXdsServer*>(p.second.server());
if (authority_server != nullptr && *authority_server == server) {
return authority_server;
}
json["server_features"] = std::move(server_features_json);
}
return json;
return nullptr;
}
} // namespace grpc_core

@ -21,36 +21,121 @@
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/ext/xds/certificate_provider_store.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
namespace grpc_core {
class GrpcXdsBootstrap : public XdsBootstrap {
public:
class GrpcNode : public Node {
public:
const std::string& id() const override { return id_; }
const std::string& cluster() const override { return cluster_; }
const std::string& locality_region() const override {
return locality_.region;
}
const std::string& locality_zone() const override { return locality_.zone; }
const std::string& locality_sub_zone() const override {
return locality_.sub_zone;
}
const Json::Object& metadata() const override { return metadata_; }
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
private:
struct Locality {
std::string region;
std::string zone;
std::string sub_zone;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
};
std::string id_;
std::string cluster_;
Locality locality_;
Json::Object metadata_;
};
class GrpcXdsServer : public XdsServer {
public:
const std::string& server_uri() const override { return server_uri_; }
bool ShouldUseV3() const override;
bool IgnoreResourceDeletion() const override;
bool operator==(const XdsServer& other) const override;
const std::string& channel_creds_type() const {
return channel_creds_.type;
}
const Json::Object& channel_creds_config() const {
return channel_creds_.config;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs& args,
ErrorList* errors);
Json ToJson() const;
private:
struct ChannelCreds {
std::string type;
Json::Object config;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
};
std::string server_uri_;
ChannelCreds channel_creds_;
std::set<std::string> server_features_;
};
class GrpcAuthority : public Authority {
public:
const XdsServer* server() const override {
return servers_.empty() ? nullptr : &servers_[0];
}
const std::string& client_listener_resource_name_template() const {
return client_listener_resource_name_template_;
}
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
private:
std::vector<GrpcXdsServer> servers_;
std::string client_listener_resource_name_template_;
};
// Creates bootstrap object from json_string.
// If *error is not GRPC_ERROR_NONE after returning, then there was an
// error parsing the contents.
static std::unique_ptr<GrpcXdsBootstrap> Create(absl::string_view json_string,
grpc_error_handle* error);
static absl::StatusOr<std::unique_ptr<GrpcXdsBootstrap>> Create(
absl::string_view json_string);
// Do not instantiate directly -- use Create() above instead.
GrpcXdsBootstrap(Json json, grpc_error_handle* error);
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs& args, ErrorList* errors);
std::string ToString() const override;
const XdsServer& server() const override { return servers_[0]; }
const Node* node() const override { return node_.get(); }
const std::map<std::string, Authority>& authorities() const override {
return authorities_;
const Node* node() const override {
return node_.has_value() ? &*node_ : nullptr;
}
const Authority* LookupAuthority(const std::string& name) const override;
const XdsServer* FindXdsServer(const XdsServer& server) const override;
const std::string& client_default_listener_resource_name_template() const {
return client_default_listener_resource_name_template_;
@ -63,25 +148,17 @@ class GrpcXdsBootstrap : public XdsBootstrap {
return certificate_providers_;
}
static XdsServer XdsServerParse(const Json& json, grpc_error_handle* error);
static Json::Object XdsServerToJson(const XdsServer& server);
// Exposed for testing purposes only.
const std::map<std::string, GrpcAuthority>& authorities() const {
return authorities_;
}
private:
grpc_error_handle ParseXdsServerList(Json* json,
std::vector<XdsServer>* servers);
grpc_error_handle ParseAuthorities(Json* json);
grpc_error_handle ParseAuthority(Json* json, const std::string& name);
grpc_error_handle ParseNode(Json* json);
grpc_error_handle ParseLocality(Json* json);
grpc_error_handle ParseCertificateProviders(Json* json);
grpc_error_handle ParseCertificateProvider(const std::string& instance_name,
Json* certificate_provider_json);
std::vector<XdsServer> servers_;
std::unique_ptr<Node> node_;
std::vector<GrpcXdsServer> servers_;
absl::optional<GrpcNode> node_;
std::string client_default_listener_resource_name_template_;
std::string server_listener_resource_name_template_;
std::map<std::string, Authority> authorities_;
std::map<std::string, GrpcAuthority> authorities_;
CertificateProviderStore::PluginDefinitionMap certificate_providers_;
};

@ -233,7 +233,7 @@ class XdsClient::ChannelState::AdsCallState
"[xds_client %p] xds server %s: timeout obtaining resource "
"{type=%s name=%s} from xds server",
ads_calld_->xds_client(),
ads_calld_->chand()->server_.server_uri.c_str(),
ads_calld_->chand()->server_.server_uri().c_str(),
std::string(type_->type_url()).c_str(),
XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.key)
@ -426,7 +426,7 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
server_(server) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
xds_client_.get(), server.server_uri.c_str());
xds_client_.get(), server.server_uri().c_str());
}
absl::Status status;
transport_ = xds_client_->transport_factory_->Create(
@ -443,7 +443,7 @@ XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
XdsClient::ChannelState::~ChannelState() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s",
xds_client(), this, server_.server_uri.c_str());
xds_client(), this, server_.server_uri().c_str());
}
xds_client_.reset(DEBUG_LOCATION, "ChannelState");
}
@ -458,7 +458,7 @@ void XdsClient::ChannelState::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS {
// At this time, all strong refs are removed, remove from channel map to
// prevent subsequent subscription from trying to use this ChannelState as
// it is shutting down.
xds_client_->xds_server_channel_map_.erase(server_);
xds_client_->xds_server_channel_map_.erase(&server_);
ads_calld_.reset();
lrs_calld_.reset();
}
@ -486,7 +486,7 @@ void XdsClient::ChannelState::MaybeStartLrsCall() {
}
void XdsClient::ChannelState::StopLrsCallLocked() {
xds_client_->xds_load_report_server_map_.erase(server_);
xds_client_->xds_load_report_server_map_.erase(&server_);
lrs_calld_.reset();
}
@ -537,7 +537,7 @@ void XdsClient::ChannelState::OnConnectivityStateChangeLocked(
gpr_log(GPR_INFO,
"[xds_client %p] xds channel for server %s in "
"state TRANSIENT_FAILURE: %s",
xds_client(), server_.server_uri.c_str(),
xds_client(), server_.server_uri().c_str(),
status.ToString().c_str());
xds_client_->NotifyOnErrorLocked(absl::UnavailableError(
absl::StrCat("xds channel in TRANSIENT_FAILURE, connectivity error: ",
@ -592,7 +592,7 @@ void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: start new call from retryable "
"call %p",
chand()->xds_client(), chand()->server_.server_uri.c_str(), this);
chand()->xds_client(), chand()->server_.server_uri().c_str(), this);
}
calld_ = MakeOrphanable<T>(
this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
@ -608,7 +608,7 @@ void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: call attempt failed; "
"retry timer will fire in %" PRId64 "ms.",
chand()->xds_client(), chand()->server_.server_uri.c_str(),
chand()->xds_client(), chand()->server_.server_uri().c_str(),
timeout.millis());
}
timer_handle_ = GetDefaultEventEngine()->RunAfter(
@ -630,7 +630,8 @@ void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer() {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: retry timer fired (retryable "
"call: %p)",
chand()->xds_client(), chand()->server_.server_uri.c_str(), this);
chand()->xds_client(), chand()->server_.server_uri().c_str(),
this);
}
StartNewCallLocked();
}
@ -648,7 +649,7 @@ absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser::
"[xds_client %p] xds server %s: received ADS response: type_url=%s, "
"version=%s, nonce=%s, num_resources=%" PRIuPTR,
ads_call_state_->xds_client(),
ads_call_state_->chand()->server_.server_uri.c_str(),
ads_call_state_->chand()->server_.server_uri().c_str(),
fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(),
fields.num_resources);
}
@ -771,7 +772,8 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
"[xds_client %p] xds server %s: server returned new version of "
"resource for which we previously ignored a deletion: type %s "
"name %s",
xds_client(), ads_call_state_->chand()->server_.server_uri.c_str(),
xds_client(),
ads_call_state_->chand()->server_.server_uri().c_str(),
std::string(type_url).c_str(), std::string(resource_name).c_str());
resource_state.ignored_deletion = false;
}
@ -858,7 +860,7 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: starting ADS call "
"(calld: %p, call: %p)",
xds_client(), chand()->server_.server_uri.c_str(), this,
xds_client(), chand()->server_.server_uri().c_str(), this,
call_.get());
}
// If this is a reconnect, add any necessary subscriptions from what's
@ -908,7 +910,7 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: sending ADS request: type=%s "
"version=%s nonce=%s error=%s",
xds_client(), chand()->server_.server_uri.c_str(),
xds_client(), chand()->server_.server_uri().c_str(),
std::string(type->type_url()).c_str(),
chand()->resource_type_version_map_[type].c_str(),
state.nonce.c_str(), state.status.ToString().c_str());
@ -981,7 +983,7 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage(
gpr_log(GPR_ERROR,
"[xds_client %p] xds server %s: error parsing ADS response (%s) "
"-- ignoring",
xds_client(), chand()->server_.server_uri.c_str(),
xds_client(), chand()->server_.server_uri().c_str(),
status.ToString().c_str());
} else {
seen_response_ = true;
@ -998,7 +1000,7 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage(
"[xds_client %p] xds server %s: ADS response invalid for "
"resource "
"type %s version %s, will NACK: nonce=%s status=%s",
xds_client(), chand()->server_.server_uri.c_str(),
xds_client(), chand()->server_.server_uri().c_str(),
result.type_url.c_str(), result.version.c_str(),
state.nonce.c_str(), state.status.ToString().c_str());
}
@ -1033,7 +1035,7 @@ void XdsClient::ChannelState::AdsCallState::OnRecvMessage(
gpr_log(GPR_ERROR,
"[xds_client %p] xds server %s: ignoring deletion "
"for resource type %s name %s",
xds_client(), chand()->server_.server_uri.c_str(),
xds_client(), chand()->server_.server_uri().c_str(),
result.type_url.c_str(),
XdsClient::ConstructFullXdsResourceName(
authority, result.type_url.c_str(), resource_key)
@ -1075,8 +1077,8 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: ADS call status received "
"(chand=%p, ads_calld=%p, call=%p): %s",
xds_client(), chand()->server_.server_uri.c_str(), chand(), this,
call_.get(), status.ToString().c_str());
xds_client(), chand()->server_.server_uri().c_str(), chand(),
this, call_.get(), status.ToString().c_str());
}
// Ignore status from a stale call.
if (IsCurrentCallOnChannel()) {
@ -1085,7 +1087,7 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
// Send error to all watchers.
xds_client()->NotifyOnErrorLocked(absl::UnavailableError(absl::StrFormat(
"xDS call failed: xDS server: %s, ADS call status: %s",
chand()->server_.server_uri, status.ToString().c_str())));
chand()->server_.server_uri(), status.ToString().c_str())));
}
}
xds_client()->work_serializer_.DrainQueue();
@ -1177,7 +1179,7 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
if (old_val && last_report_counters_were_zero_) {
auto it = xds_client()->xds_load_report_server_map_.find(
parent_->chand()->server_);
&parent_->chand()->server_);
if (it == xds_client()->xds_load_report_server_map_.end() ||
it->second.load_report_map.empty()) {
it->second.channel_state->StopLrsCallLocked();
@ -1203,8 +1205,8 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked() {
// we just ignore the completion and wait for the timer to fire.
if (timer_handle_.has_value()) return;
// If there are no more registered stats to report, cancel the call.
auto it =
xds_client()->xds_load_report_server_map_.find(parent_->chand()->server_);
auto it = xds_client()->xds_load_report_server_map_.find(
&parent_->chand()->server_);
if (it == xds_client()->xds_load_report_server_map_.end()) return;
if (it->second.load_report_map.empty()) {
if (it->second.channel_state != nullptr) {
@ -1247,7 +1249,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState(
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: starting LRS call (calld=%p, "
"call=%p)",
xds_client(), chand()->server_.server_uri.c_str(), this,
xds_client(), chand()->server_.server_uri().c_str(), this,
call_.get());
}
// Send the initial request.
@ -1311,7 +1313,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage(
if (!status.ok()) {
gpr_log(GPR_ERROR,
"[xds_client %p] xds server %s: LRS response parsing failed: %s",
xds_client(), chand()->server_.server_uri.c_str(),
xds_client(), chand()->server_.server_uri().c_str(),
status.ToString().c_str());
return;
}
@ -1322,7 +1324,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage(
"[xds_client %p] xds server %s: LRS response received, %" PRIuPTR
" cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
"ms",
xds_client(), chand()->server_.server_uri.c_str(),
xds_client(), chand()->server_.server_uri().c_str(),
new_cluster_names.size(), send_all_clusters,
new_load_reporting_interval.millis());
size_t i = 0;
@ -1339,7 +1341,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage(
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: increased load_report_interval "
"to minimum value %dms",
xds_client(), chand()->server_.server_uri.c_str(),
xds_client(), chand()->server_.server_uri().c_str(),
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
}
}
@ -1351,7 +1353,7 @@ void XdsClient::ChannelState::LrsCallState::OnRecvMessage(
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: incoming LRS response identical "
"to current, ignoring.",
xds_client(), chand()->server_.server_uri.c_str());
xds_client(), chand()->server_.server_uri().c_str());
}
return;
}
@ -1372,7 +1374,7 @@ void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: LRS call status received "
"(chand=%p, calld=%p, call=%p): %s",
xds_client(), chand()->server_.server_uri.c_str(), chand(), this,
xds_client(), chand()->server_.server_uri().c_str(), chand(), this,
call_.get(), status.ToString().c_str());
}
// Ignore status from a stale call.
@ -1407,6 +1409,7 @@ XdsClient::XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
}
GPR_ASSERT(bootstrap_ != nullptr);
}
XdsClient::~XdsClient() {
@ -1435,14 +1438,14 @@ void XdsClient::Orphan() {
RefCountedPtr<XdsClient::ChannelState> XdsClient::GetOrCreateChannelStateLocked(
const XdsBootstrap::XdsServer& server, const char* reason) {
auto it = xds_server_channel_map_.find(server);
auto it = xds_server_channel_map_.find(&server);
if (it != xds_server_channel_map_.end()) {
return it->second->Ref(DEBUG_LOCATION, reason);
}
// Channel not found, so create a new one.
auto channel_state = MakeRefCounted<ChannelState>(
WeakRef(DEBUG_LOCATION, "ChannelState"), server);
xds_server_channel_map_[server] = channel_state.get();
xds_server_channel_map_[&server] = channel_state.get();
return channel_state;
}
@ -1482,9 +1485,7 @@ void XdsClient::WatchResource(const XdsResourceType* type,
"\" not present in bootstrap config")));
return;
}
if (!authority->xds_servers.empty()) {
xds_server = &authority->xds_servers[0];
}
xds_server = authority->server();
}
if (xds_server == nullptr) xds_server = &bootstrap_->server();
{
@ -1634,7 +1635,8 @@ std::string XdsClient::ConstructFullXdsResourceName(
RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name) {
if (!bootstrap_->XdsServerExists(xds_server)) return nullptr;
const auto* server = bootstrap_->FindXdsServer(xds_server);
if (server == nullptr) return nullptr;
auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name));
RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
@ -1646,11 +1648,10 @@ RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
// XdsBootstrap::XdsServer and strings
// in the load_report_map_ key, so that they have the same lifetime.
auto server_it =
xds_load_report_server_map_.emplace(xds_server, LoadReportServer())
.first;
xds_load_report_server_map_.emplace(server, LoadReportServer()).first;
if (server_it->second.channel_state == nullptr) {
server_it->second.channel_state = GetOrCreateChannelStateLocked(
xds_server, "load report map (drop stats)");
*server, "load report map (drop stats)");
}
auto load_report_it = server_it->second.load_report_map
.emplace(std::move(key), LoadReportState())
@ -1665,7 +1666,7 @@ RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
load_report_state.drop_stats->GetSnapshotAndReset();
}
cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
Ref(DEBUG_LOCATION, "DropStats"), server_it->first,
Ref(DEBUG_LOCATION, "DropStats"), *server,
load_report_it->first.first /*cluster_name*/,
load_report_it->first.second /*eds_service_name*/);
load_report_state.drop_stats = cluster_drop_stats.get();
@ -1680,8 +1681,10 @@ void XdsClient::RemoveClusterDropStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats) {
const auto* server = bootstrap_->FindXdsServer(xds_server);
if (server == nullptr) return;
MutexLock lock(&mu_);
auto server_it = xds_load_report_server_map_.find(xds_server);
auto server_it = xds_load_report_server_map_.find(server);
if (server_it == xds_load_report_server_map_.end()) return;
auto load_report_it = server_it->second.load_report_map.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
@ -1700,7 +1703,8 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality) {
if (!bootstrap_->XdsServerExists(xds_server)) return nullptr;
const auto* server = bootstrap_->FindXdsServer(xds_server);
if (server == nullptr) return nullptr;
auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name));
RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
@ -1712,11 +1716,10 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
// XdsBootstrap::XdsServer and strings
// in the load_report_map_ key, so that they have the same lifetime.
auto server_it =
xds_load_report_server_map_.emplace(xds_server, LoadReportServer())
.first;
xds_load_report_server_map_.emplace(server, LoadReportServer()).first;
if (server_it->second.channel_state == nullptr) {
server_it->second.channel_state = GetOrCreateChannelStateLocked(
xds_server, "load report map (locality stats)");
*server, "load report map (locality stats)");
}
auto load_report_it = server_it->second.load_report_map
.emplace(std::move(key), LoadReportState())
@ -1733,7 +1736,7 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
locality_state.locality_stats->GetSnapshotAndReset();
}
cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first,
Ref(DEBUG_LOCATION, "LocalityStats"), *server,
load_report_it->first.first /*cluster_name*/,
load_report_it->first.second /*eds_service_name*/,
std::move(locality));
@ -1750,8 +1753,10 @@ void XdsClient::RemoveClusterLocalityStats(
absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats) {
const auto* server = bootstrap_->FindXdsServer(xds_server);
if (server == nullptr) return;
MutexLock lock(&mu_);
auto server_it = xds_load_report_server_map_.find(xds_server);
auto server_it = xds_load_report_server_map_.find(server);
if (server_it == xds_load_report_server_map_.end()) return;
auto load_report_it = server_it->second.load_report_map.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
@ -1780,8 +1785,8 @@ void XdsClient::NotifyOnErrorLocked(absl::Status status) {
const auto* node = bootstrap_->node();
if (node != nullptr) {
status = absl::Status(
status.code(), absl::StrCat(status.message(),
" (node ID:", bootstrap_->node()->id, ")"));
status.code(),
absl::StrCat(status.message(), " (node ID:", node->id(), ")"));
}
std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
for (const auto& a : authority_state_map_) { // authority
@ -1811,8 +1816,8 @@ void XdsClient::NotifyWatchersOnErrorLocked(
const auto* node = bootstrap_->node();
if (node != nullptr) {
status = absl::Status(
status.code(), absl::StrCat(status.message(),
" (node ID:", bootstrap_->node()->id, ")"));
status.code(),
absl::StrCat(status.message(), " (node ID:", node->id(), ")"));
}
work_serializer_.Schedule(
[watchers, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
@ -1842,7 +1847,7 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
}
XdsApi::ClusterLoadReportMap snapshot_map;
auto server_it = xds_load_report_server_map_.find(xds_server);
auto server_it = xds_load_report_server_map_.find(&xds_server);
if (server_it == xds_load_report_server_map_.end()) return snapshot_map;
auto& load_report_map = server_it->second.load_report_map;
for (auto load_report_it = load_report_map.begin();

@ -76,9 +76,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
~XdsClient() override;
const XdsBootstrap& bootstrap() const {
// bootstrap_ is guaranteed to be non-null since XdsClient::GetOrCreate()
// would return a null object if bootstrap_ was null.
return *bootstrap_;
return *bootstrap_; // ctor asserts that it is non-null
}
XdsTransportFactory* transport_factory() const {
@ -207,7 +205,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
// The owning xds client.
WeakRefCountedPtr<XdsClient> xds_client_;
const XdsBootstrap::XdsServer& server_;
const XdsBootstrap::XdsServer& server_; // Owned by bootstrap.
OrphanablePtr<XdsTransportFactory::XdsTransport> transport_;
@ -311,14 +309,16 @@ class XdsClient : public DualRefCounted<XdsClient> {
v2_resource_types_ ABSL_GUARDED_BY(mu_);
upb::SymbolTable symtab_ ABSL_GUARDED_BY(mu_);
// Map of existing xDS server channels.
std::map<XdsBootstrap::XdsServer, ChannelState*> xds_server_channel_map_
ABSL_GUARDED_BY(mu_);
// Map of existing xDS server channels.
// Key is owned by the bootstrap config.
std::map<const XdsBootstrap::XdsServer*, ChannelState*>
xds_server_channel_map_ ABSL_GUARDED_BY(mu_);
std::map<std::string /*authority*/, AuthorityState> authority_state_map_
ABSL_GUARDED_BY(mu_);
std::map<XdsBootstrap::XdsServer, LoadReportServer>
// Key is owned by the bootstrap config.
std::map<const XdsBootstrap::XdsServer*, LoadReportServer>
xds_load_report_server_map_ ABSL_GUARDED_BY(mu_);
// Stores started watchers whose resource name was not parsed successfully,

@ -141,13 +141,11 @@ absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
absl::optional<absl::string_view> bootstrap_config = args.GetString(
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
if (bootstrap_config.has_value()) {
grpc_error_handle error = GRPC_ERROR_NONE;
std::unique_ptr<GrpcXdsBootstrap> bootstrap =
GrpcXdsBootstrap::Create(*bootstrap_config, &error);
if (!GRPC_ERROR_IS_NONE(error)) return grpc_error_to_absl_status(error);
auto bootstrap = GrpcXdsBootstrap::Create(*bootstrap_config);
if (!bootstrap.ok()) return bootstrap.status();
grpc_channel_args* xds_channel_args = args.GetPointer<grpc_channel_args>(
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
return MakeRefCounted<GrpcXdsClient>(std::move(bootstrap),
return MakeRefCounted<GrpcXdsClient>(std::move(*bootstrap),
ChannelArgs::FromC(xds_channel_args));
}
// Otherwise, use the global instance.
@ -164,18 +162,16 @@ absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
bootstrap_contents->c_str());
}
// Parse bootstrap.
grpc_error_handle error = GRPC_ERROR_NONE;
std::unique_ptr<GrpcXdsBootstrap> bootstrap =
GrpcXdsBootstrap::Create(*bootstrap_contents, &error);
if (!GRPC_ERROR_IS_NONE(error)) return grpc_error_to_absl_status(error);
auto bootstrap = GrpcXdsBootstrap::Create(*bootstrap_contents);
if (!bootstrap.ok()) return bootstrap.status();
// Instantiate XdsClient.
auto xds_client = MakeRefCounted<GrpcXdsClient>(
std::move(bootstrap), ChannelArgs::FromC(g_channel_args));
std::move(*bootstrap), ChannelArgs::FromC(g_channel_args));
g_xds_client = xds_client.get();
return xds_client;
}
GrpcXdsClient::GrpcXdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
GrpcXdsClient::GrpcXdsClient(std::unique_ptr<GrpcXdsBootstrap> bootstrap,
const ChannelArgs& args)
: XdsClient(
std::move(bootstrap), MakeOrphanable<GrpcXdsTransportFactory>(args),

@ -27,7 +27,7 @@
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/ext/xds/certificate_provider_store.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h"
@ -44,7 +44,7 @@ class GrpcXdsClient : public XdsClient {
const ChannelArgs& args, const char* reason);
// Do not instantiate directly -- use GetOrCreate() instead.
GrpcXdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
GrpcXdsClient(std::unique_ptr<GrpcXdsBootstrap> bootstrap,
const ChannelArgs& args);
~GrpcXdsClient() override;

@ -53,7 +53,7 @@ XdsClusterDropStats::XdsClusterDropStats(
eds_service_name_(eds_service_name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] created drop stats %p for {%s, %s, %s}",
xds_client_.get(), this, lrs_server_.server_uri.c_str(),
xds_client_.get(), this, lrs_server_.server_uri().c_str(),
std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str());
}
@ -63,7 +63,7 @@ XdsClusterDropStats::~XdsClusterDropStats() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] destroying drop stats %p for {%s, %s, %s}",
xds_client_.get(), this, lrs_server_.server_uri.c_str(),
xds_client_.get(), this, lrs_server_.server_uri().c_str(),
std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str());
}
@ -108,7 +108,7 @@ XdsClusterLocalityStats::XdsClusterLocalityStats(
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] created locality stats %p for {%s, %s, %s, %s}",
xds_client_.get(), this, lrs_server_.server_uri.c_str(),
xds_client_.get(), this, lrs_server_.server_uri().c_str(),
std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str(),
name_->AsHumanReadableString().c_str());
@ -119,7 +119,7 @@ XdsClusterLocalityStats::~XdsClusterLocalityStats() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] destroying locality stats %p for {%s, %s, %s, %s}",
xds_client_.get(), this, lrs_server_.server_uri.c_str(),
xds_client_.get(), this, lrs_server_.server_uri().c_str(),
std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str(),
name_->AsHumanReadableString().c_str());

@ -25,7 +25,6 @@
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "envoy/config/cluster/v3/circuit_breaker.upb.h"
#include "envoy/config/cluster/v3/cluster.upb.h"
@ -65,27 +64,26 @@ std::string XdsClusterResource::ToString() const {
case EDS:
contents.push_back("cluster_type=EDS");
if (!eds_service_name.empty()) {
contents.push_back(
absl::StrFormat("eds_service_name=%s", eds_service_name));
contents.push_back(absl::StrCat("eds_service_name=", eds_service_name));
}
break;
case LOGICAL_DNS:
contents.push_back("cluster_type=LOGICAL_DNS");
contents.push_back(absl::StrFormat("dns_hostname=%s", dns_hostname));
contents.push_back(absl::StrCat("dns_hostname=", dns_hostname));
break;
case AGGREGATE:
contents.push_back("cluster_type=AGGREGATE");
contents.push_back(
absl::StrFormat("prioritized_cluster_names=[%s]",
absl::StrJoin(prioritized_cluster_names, ", ")));
absl::StrCat("prioritized_cluster_names=[",
absl::StrJoin(prioritized_cluster_names, ", "), "]"));
}
if (!common_tls_context.Empty()) {
contents.push_back(absl::StrFormat("common_tls_context=%s",
common_tls_context.ToString()));
contents.push_back(
absl::StrCat("common_tls_context=", common_tls_context.ToString()));
}
if (lrs_load_reporting_server.has_value()) {
contents.push_back(absl::StrFormat("lrs_load_reporting_server_name=%s",
lrs_load_reporting_server->server_uri));
contents.push_back(absl::StrCat("lrs_load_reporting_server_name=",
lrs_load_reporting_server->server_uri()));
}
contents.push_back(absl::StrCat("lb_policy=", lb_policy));
if (lb_policy == "RING_HASH") {
@ -93,7 +91,7 @@ std::string XdsClusterResource::ToString() const {
contents.push_back(absl::StrCat("max_ring_size=", max_ring_size));
}
contents.push_back(
absl::StrFormat("max_concurrent_requests=%d", max_concurrent_requests));
absl::StrCat("max_concurrent_requests=", max_concurrent_requests));
return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
}
@ -353,7 +351,8 @@ absl::StatusOr<XdsClusterResource> CdsResourceParse(
if (!envoy_config_core_v3_ConfigSource_has_self(lrs_server)) {
errors.emplace_back("LRS ConfigSource is not self.");
}
cds_update.lrs_load_reporting_server.emplace(context.server);
cds_update.lrs_load_reporting_server.emplace(
static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(context.server));
}
// The Cluster resource encodes the circuit breaking parameters in a list of
// Thresholds messages, where each message specifies the parameters for a

@ -35,7 +35,7 @@
#include "upb/def.h"
#include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_common_types.h"
#include "src/core/ext/xds/xds_resource_type.h"
#include "src/core/ext/xds/xds_resource_type_impl.h"
@ -61,7 +61,7 @@ struct XdsClusterResource {
// The LRS server to use for load reporting.
// If not set, load reporting will be disabled.
absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server;
absl::optional<GrpcXdsBootstrap::GrpcXdsServer> lrs_load_reporting_server;
// The LB policy to use (e.g., "ROUND_ROBIN" or "RING_HASH").
std::string lb_policy;

@ -34,6 +34,7 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
@ -247,11 +248,11 @@ class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
namespace {
grpc_channel* CreateXdsChannel(const ChannelArgs& args,
const XdsBootstrap::XdsServer& server) {
const GrpcXdsBootstrap::GrpcXdsServer& server) {
RefCountedPtr<grpc_channel_credentials> channel_creds =
CoreConfiguration::Get().channel_creds_registry().CreateChannelCreds(
server.channel_creds_type, server.channel_creds_config);
return grpc_channel_create(server.server_uri.c_str(), channel_creds.get(),
server.channel_creds_type(), server.channel_creds_config());
return grpc_channel_create(server.server_uri().c_str(), channel_creds.get(),
args.ToC().get());
}
@ -268,7 +269,9 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
std::function<void(absl::Status)> on_connectivity_failure,
absl::Status* status)
: factory_(factory) {
channel_ = CreateXdsChannel(factory->args_, server);
channel_ = CreateXdsChannel(
factory->args_,
static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(server));
GPR_ASSERT(channel_ != nullptr);
if (IsLameChannel(channel_)) {
*status = absl::UnavailableError("xds client has a lame channel");

@ -112,46 +112,49 @@ TEST(XdsBootstrapTest, Basic) {
" \"server_listener_resource_name_template\": \"example/resource\","
" \"ignore\": {}"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
EXPECT_EQ(bootstrap.server().server_uri, "fake:///lb");
EXPECT_EQ(bootstrap.server().channel_creds_type, "fake");
EXPECT_EQ(bootstrap.server().channel_creds_config.type(),
Json::Type::JSON_NULL);
EXPECT_EQ(bootstrap.authorities().size(), 2);
const XdsBootstrap::Authority* authority1 =
bootstrap.LookupAuthority("xds.example.com");
ASSERT_NE(authority1, nullptr);
EXPECT_EQ(authority1->client_listener_resource_name_template,
auto bootstrap_or = GrpcXdsBootstrap::Create(json_str);
ASSERT_TRUE(bootstrap_or.ok()) << bootstrap_or.status();
auto bootstrap = std::move(*bootstrap_or);
auto* server =
&static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(bootstrap->server());
EXPECT_EQ(server->server_uri(), "fake:///lb");
EXPECT_EQ(server->channel_creds_type(), "fake");
EXPECT_TRUE(server->channel_creds_config().empty())
<< Json{server->channel_creds_config()}.Dump();
EXPECT_EQ(bootstrap->authorities().size(), 2);
auto* authority = static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
bootstrap->LookupAuthority("xds.example.com"));
ASSERT_NE(authority, nullptr);
EXPECT_EQ(authority->client_listener_resource_name_template(),
"xdstp://xds.example.com/envoy.config.listener.v3.Listener/grpc/"
"server/%s");
EXPECT_EQ(authority1->xds_servers.size(), 1);
EXPECT_EQ(authority1->xds_servers[0].server_uri, "fake:///xds_server");
EXPECT_EQ(authority1->xds_servers[0].channel_creds_type, "fake");
EXPECT_EQ(authority1->xds_servers[0].channel_creds_config.type(),
Json::Type::JSON_NULL);
const XdsBootstrap::Authority* authority2 =
bootstrap.LookupAuthority("xds.example2.com");
ASSERT_NE(authority2, nullptr);
EXPECT_EQ(authority2->client_listener_resource_name_template,
server =
static_cast<const GrpcXdsBootstrap::GrpcXdsServer*>(authority->server());
ASSERT_NE(server, nullptr);
EXPECT_EQ(server->server_uri(), "fake:///xds_server");
EXPECT_EQ(server->channel_creds_type(), "fake");
EXPECT_TRUE(server->channel_creds_config().empty())
<< Json{server->channel_creds_config()}.Dump();
authority = static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
bootstrap->LookupAuthority("xds.example2.com"));
ASSERT_NE(authority, nullptr);
EXPECT_EQ(authority->client_listener_resource_name_template(),
"xdstp://xds.example2.com/envoy.config.listener.v3.Listener/grpc/"
"server/%s");
EXPECT_EQ(authority2->xds_servers.size(), 1);
EXPECT_EQ(authority2->xds_servers[0].server_uri, "fake:///xds_server2");
EXPECT_EQ(authority2->xds_servers[0].channel_creds_type, "fake");
EXPECT_EQ(authority2->xds_servers[0].channel_creds_config.type(),
Json::Type::JSON_NULL);
ASSERT_NE(bootstrap.node(), nullptr);
EXPECT_EQ(bootstrap.node()->id, "foo");
EXPECT_EQ(bootstrap.node()->cluster, "bar");
EXPECT_EQ(bootstrap.node()->locality_region, "milky_way");
EXPECT_EQ(bootstrap.node()->locality_zone, "sol_system");
EXPECT_EQ(bootstrap.node()->locality_sub_zone, "earth");
ASSERT_EQ(bootstrap.node()->metadata.type(), Json::Type::OBJECT);
EXPECT_THAT(bootstrap.node()->metadata.object_value(),
server =
static_cast<const GrpcXdsBootstrap::GrpcXdsServer*>(authority->server());
ASSERT_NE(server, nullptr);
EXPECT_EQ(server->server_uri(), "fake:///xds_server2");
EXPECT_EQ(server->channel_creds_type(), "fake");
EXPECT_TRUE(server->channel_creds_config().empty())
<< Json{server->channel_creds_config()}.Dump();
ASSERT_NE(bootstrap->node(), nullptr);
EXPECT_EQ(bootstrap->node()->id(), "foo");
EXPECT_EQ(bootstrap->node()->cluster(), "bar");
EXPECT_EQ(bootstrap->node()->locality_region(), "milky_way");
EXPECT_EQ(bootstrap->node()->locality_zone(), "sol_system");
EXPECT_EQ(bootstrap->node()->locality_sub_zone(), "earth");
EXPECT_THAT(bootstrap->node()->metadata(),
::testing::ElementsAre(
::testing::Pair(
::testing::Eq("bar"),
@ -163,7 +166,7 @@ TEST(XdsBootstrapTest, Basic) {
::testing::AllOf(
::testing::Property(&Json::type, Json::Type::NUMBER),
::testing::Property(&Json::string_value, "1")))));
EXPECT_EQ(bootstrap.server_listener_resource_name_template(),
EXPECT_EQ(bootstrap->server_listener_resource_name_template(),
"example/resource");
gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION");
}
@ -178,14 +181,14 @@ TEST(XdsBootstrapTest, ValidWithoutNode) {
" }"
" ]"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
EXPECT_EQ(bootstrap.server().server_uri, "fake:///lb");
EXPECT_EQ(bootstrap.server().channel_creds_type, "fake");
EXPECT_EQ(bootstrap.node(), nullptr);
auto bootstrap_or = GrpcXdsBootstrap::Create(json_str);
ASSERT_TRUE(bootstrap_or.ok()) << bootstrap_or.status();
auto bootstrap = std::move(*bootstrap_or);
auto* server =
&static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(bootstrap->server());
EXPECT_EQ(server->server_uri(), "fake:///lb");
EXPECT_EQ(server->channel_creds_type(), "fake");
EXPECT_EQ(bootstrap->node(), nullptr);
}
TEST(XdsBootstrapTest, InsecureCreds) {
@ -198,14 +201,14 @@ TEST(XdsBootstrapTest, InsecureCreds) {
" }"
" ]"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
EXPECT_EQ(bootstrap.server().server_uri, "fake:///lb");
EXPECT_EQ(bootstrap.server().channel_creds_type, "insecure");
EXPECT_EQ(bootstrap.node(), nullptr);
auto bootstrap_or = GrpcXdsBootstrap::Create(json_str);
ASSERT_TRUE(bootstrap_or.ok()) << bootstrap_or.status();
auto bootstrap = std::move(*bootstrap_or);
auto* server =
&static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(bootstrap->server());
EXPECT_EQ(server->server_uri(), "fake:///lb");
EXPECT_EQ(server->channel_creds_type(), "insecure");
EXPECT_EQ(bootstrap->node(), nullptr);
}
TEST(XdsBootstrapTest, GoogleDefaultCreds) {
@ -234,14 +237,14 @@ TEST(XdsBootstrapTest, GoogleDefaultCreds) {
" }"
" ]"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
EXPECT_EQ(bootstrap.server().server_uri, "fake:///lb");
EXPECT_EQ(bootstrap.server().channel_creds_type, "google_default");
EXPECT_EQ(bootstrap.node(), nullptr);
auto bootstrap_or = GrpcXdsBootstrap::Create(json_str);
ASSERT_TRUE(bootstrap_or.ok()) << bootstrap_or.status();
auto bootstrap = std::move(*bootstrap_or);
auto* server =
&static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(bootstrap->server());
EXPECT_EQ(server->server_uri(), "fake:///lb");
EXPECT_EQ(server->channel_creds_type(), "google_default");
EXPECT_EQ(bootstrap->node(), nullptr);
}
TEST(XdsBootstrapTest, MissingChannelCreds) {
@ -253,14 +256,11 @@ TEST(XdsBootstrapTest, MissingChannelCreds) {
" }"
" ]"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(
grpc_error_std_string(error),
::testing::ContainsRegex("field:channel_creds error:does not exist."));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(bootstrap.status().message(),
"errors validating JSON: ["
"field:xds_servers[0].channel_creds error:field not present]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, NoKnownChannelCreds) {
@ -273,24 +273,20 @@ TEST(XdsBootstrapTest, NoKnownChannelCreds) {
" }"
" ]"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"no known creds type found in \"channel_creds\""));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(bootstrap.status().message(),
"errors validating JSON: ["
"field:xds_servers[0].channel_creds "
"error:no known creds type found]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, MissingXdsServers) {
auto json = Json::Parse("{}");
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex("\"xds_servers\" field not present"));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create("{}");
EXPECT_EQ(
bootstrap.status().message(),
"errors validating JSON: [field:xds_servers error:field not present]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, TopFieldsWrongTypes) {
@ -301,37 +297,28 @@ TEST(XdsBootstrapTest, TopFieldsWrongTypes) {
" \"server_listener_resource_name_template\":1,"
" \"certificate_providers\":1"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex("\"xds_servers\" field is not an array.*"
"\"node\" field is not an object.*"
"\"server_listener_resource_name_"
"template\" field is not a string.*"));
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"\"certificate_providers\" field is not an object"));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(
bootstrap.status().message(),
"errors validating JSON: ["
"field:certificate_providers error:is not an object; "
"field:node error:is not an object; "
"field:server_listener_resource_name_template error:is not a string; "
"field:xds_servers error:is not an array]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, XdsServerMissingServerUri) {
TEST(XdsBootstrapTest, XdsServerMissingFields) {
const char* json_str =
"{"
" \"xds_servers\":[{}]"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(
grpc_error_std_string(error),
::testing::ContainsRegex("errors parsing \"xds_servers\" array.*"
"errors parsing index 0.*"
"errors parsing xds server.*"
"field:server_uri error:does not exist."));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(bootstrap.status().message(),
"errors validating JSON: ["
"field:xds_servers[0].channel_creds error:field not present; "
"field:xds_servers[0].server_uri error:field not present]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, XdsServerUriAndCredsWrongTypes) {
@ -344,18 +331,12 @@ TEST(XdsBootstrapTest, XdsServerUriAndCredsWrongTypes) {
" }"
" ]"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"errors parsing \"xds_servers\" array.*"
"errors parsing index 0.*"
"errors parsing xds server.*"
"field:server_uri error:type should be STRING.*"
"field:channel_creds error:type should be ARRAY"));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(bootstrap.status().message(),
"errors validating JSON: ["
"field:xds_servers[0].channel_creds error:is not an array; "
"field:xds_servers[0].server_uri error:is not a string]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, ChannelCredsFieldsWrongTypes) {
@ -373,20 +354,13 @@ TEST(XdsBootstrapTest, ChannelCredsFieldsWrongTypes) {
" }"
" ]"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(
grpc_error_std_string(error),
::testing::ContainsRegex("errors parsing \"xds_servers\" array.*"
"errors parsing index 0.*"
"errors parsing xds server.*"
"errors parsing \"channel_creds\" array.*"
"errors parsing index 0.*"
"field:type error:type should be STRING.*"
"field:config error:type should be OBJECT"));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(
bootstrap.status().message(),
"errors validating JSON: ["
"field:xds_servers[0].channel_creds[0].config error:is not an object; "
"field:xds_servers[0].channel_creds[0].type error:is not a string]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, NodeFieldsWrongTypes) {
@ -399,17 +373,15 @@ TEST(XdsBootstrapTest, NodeFieldsWrongTypes) {
" \"metadata\":0"
" }"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex("errors parsing \"node\" object.*"
"\"id\" field is not a string.*"
"\"cluster\" field is not a string.*"
"\"locality\" field is not an object.*"
"\"metadata\" field is not an object"));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(bootstrap.status().message(),
"errors validating JSON: ["
"field:node.cluster error:is not a string; "
"field:node.id error:is not a string; "
"field:node.locality error:is not an object; "
"field:node.metadata error:is not an object; "
"field:xds_servers error:field not present]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, LocalityFieldsWrongType) {
@ -423,17 +395,14 @@ TEST(XdsBootstrapTest, LocalityFieldsWrongType) {
" }"
" }"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex("errors parsing \"node\" object.*"
"errors parsing \"locality\" object.*"
"\"region\" field is not a string.*"
"\"zone\" field is not a string.*"
"\"sub_zone\" field is not a string"));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(bootstrap.status().message(),
"errors validating JSON: ["
"field:node.locality.region error:is not a string; "
"field:node.locality.sub_zone error:is not a string; "
"field:node.locality.zone error:is not a string; "
"field:xds_servers error:field not present]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, CertificateProvidersElementWrongType) {
@ -449,15 +418,11 @@ TEST(XdsBootstrapTest, CertificateProvidersElementWrongType) {
" \"plugin\":1"
" }"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"errors parsing \"certificate_providers\" object.*"
"element \"plugin\" is not an object"));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(bootstrap.status().message(),
"errors validating JSON: ["
"field:certificate_providers[\"plugin\"] error:is not an object]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, CertificateProvidersPluginNameWrongType) {
@ -475,16 +440,12 @@ TEST(XdsBootstrapTest, CertificateProvidersPluginNameWrongType) {
" }"
" }"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"errors parsing \"certificate_providers\" object.*"
"errors parsing element \"plugin\".*"
"\"plugin_name\" field is not a string"));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(bootstrap.status().message(),
"errors validating JSON: ["
"field:certificate_providers[\"plugin\"].plugin_name error:"
"is not a string]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, CertificateProvidersUnrecognizedPluginName) {
@ -502,16 +463,12 @@ TEST(XdsBootstrapTest, CertificateProvidersUnrecognizedPluginName) {
" }"
" }"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"errors parsing \"certificate_providers\" object.*"
"errors parsing element \"plugin\".*"
"Unrecognized plugin name: unknown"));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(bootstrap.status().message(),
"errors validating JSON: ["
"field:certificate_providers[\"plugin\"].plugin_name error:"
"Unrecognized plugin name: unknown]")
<< bootstrap.status();
}
TEST(XdsBootstrapTest, AuthorityXdsServerInvalidResourceTemplate) {
@ -543,16 +500,13 @@ TEST(XdsBootstrapTest, AuthorityXdsServerInvalidResourceTemplate) {
" }"
" }"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"errors parsing \"authorities\".*"
"errors parsing authority xds.example.com.*"
"field must begin with \"xdstp://xds.example.com/\""));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(bootstrap.status().message(),
"errors validating JSON: ["
"field:authorities[\"xds.example.com\"]"
".client_listener_resource_name_template error:"
"field must begin with \"xdstp://xds.example.com/\"]")
<< bootstrap.status();
gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION");
}
@ -575,19 +529,15 @@ TEST(XdsBootstrapTest, AuthorityXdsServerMissingServerUri) {
" }"
" }"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(
grpc_error_std_string(error),
::testing::ContainsRegex("errors parsing \"authorities\".*"
"errors parsing authority xds.example.com.*"
"errors parsing \"xds_servers\" array.*"
"errors parsing index 0.*"
"errors parsing xds server.*"
"field:server_uri error:does not exist."));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_EQ(
bootstrap.status().message(),
"errors validating JSON: ["
"field:authorities[\"xds.example.com\"].xds_servers[0].channel_creds "
"error:field not present; "
"field:authorities[\"xds.example.com\"].xds_servers[0].server_uri "
"error:field not present]")
<< bootstrap.status();
gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION");
}
@ -658,16 +608,16 @@ TEST(XdsBootstrapTest, CertificateProvidersFakePluginParsingError) {
" }"
" }"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
EXPECT_THAT(grpc_error_std_string(error),
::testing::ContainsRegex(
"errors parsing \"certificate_providers\" object.*"
"errors parsing element \"fake_plugin\".*"
"field:config field:value not of type number"));
GRPC_ERROR_UNREF(error);
auto bootstrap = GrpcXdsBootstrap::Create(json_str);
EXPECT_THAT(
// Explicit conversion to std::string to work around
// https://github.com/google/googletest/issues/3949.
std::string(bootstrap.status().message()),
::testing::MatchesRegex(
"errors validating JSON: \\["
"field:certificate_providers\\[\"fake_plugin\"\\].config "
"error:UNKNOWN:field:config field:value not of type number.*\\]"))
<< bootstrap.status();
}
TEST(XdsBootstrapTest, CertificateProvidersFakePluginParsingSuccess) {
@ -688,13 +638,11 @@ TEST(XdsBootstrapTest, CertificateProvidersFakePluginParsingSuccess) {
" }"
" }"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
ASSERT_TRUE(GRPC_ERROR_IS_NONE(error)) << grpc_error_std_string(error);
auto bootstrap_or = GrpcXdsBootstrap::Create(json_str);
ASSERT_TRUE(bootstrap_or.ok()) << bootstrap_or.status();
auto bootstrap = std::move(*bootstrap_or);
const CertificateProviderStore::PluginDefinition& fake_plugin =
bootstrap.certificate_providers().at("fake_plugin");
bootstrap->certificate_providers().at("fake_plugin");
ASSERT_EQ(fake_plugin.plugin_name, "fake");
ASSERT_STREQ(fake_plugin.config->name(), "fake");
ASSERT_EQ(static_cast<RefCountedPtr<FakeCertificateProviderFactory::Config>>(
@ -718,13 +666,11 @@ TEST(XdsBootstrapTest, CertificateProvidersFakePluginEmptyConfig) {
" }"
" }"
"}";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
GrpcXdsBootstrap bootstrap(std::move(*json), &error);
ASSERT_TRUE(GRPC_ERROR_IS_NONE(error)) << grpc_error_std_string(error);
auto bootstrap_or = GrpcXdsBootstrap::Create(json_str);
ASSERT_TRUE(bootstrap_or.ok()) << bootstrap_or.status();
auto bootstrap = std::move(*bootstrap_or);
const CertificateProviderStore::PluginDefinition& fake_plugin =
bootstrap.certificate_providers().at("fake_plugin");
bootstrap->certificate_providers().at("fake_plugin");
ASSERT_EQ(fake_plugin.plugin_name, "fake");
ASSERT_STREQ(fake_plugin.config->name(), "fake");
ASSERT_EQ(static_cast<RefCountedPtr<FakeCertificateProviderFactory::Config>>(
@ -748,14 +694,13 @@ TEST(XdsBootstrapTest, XdsServerToJsonAndParse) {
" }";
auto json = Json::Parse(json_str);
ASSERT_TRUE(json.ok()) << json.status();
grpc_error_handle error = GRPC_ERROR_NONE;
XdsBootstrap::XdsServer xds_server =
GrpcXdsBootstrap::XdsServerParse(*json, &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
Json::Object output = GrpcXdsBootstrap::XdsServerToJson(xds_server);
XdsBootstrap::XdsServer output_xds_server =
GrpcXdsBootstrap::XdsServerParse(output, &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_std_string(error);
auto xds_server = LoadFromJson<GrpcXdsBootstrap::GrpcXdsServer>(*json);
ASSERT_TRUE(xds_server.ok()) << xds_server.status();
Json output = xds_server->ToJson();
auto output_xds_server =
LoadFromJson<GrpcXdsBootstrap::GrpcXdsServer>(output);
ASSERT_TRUE(output_xds_server.ok()) << output_xds_server.status();
EXPECT_EQ(*xds_server, *output_xds_server);
gpr_unsetenv("GRPC_EXPERIMENTAL_XDS_FEDERATION");
}

@ -28,6 +28,7 @@
#include <grpc/grpc.h>
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
@ -58,7 +59,8 @@ absl::StatusOr<Json::Array> ConvertXdsPolicy(LoadBalancingPolicyProto policy) {
std::string serialized_policy = policy.SerializeAsString();
upb::Arena arena;
upb::SymbolTable symtab;
XdsResourceType::DecodeContext context = {nullptr, XdsBootstrap::XdsServer(),
XdsResourceType::DecodeContext context = {nullptr,
GrpcXdsBootstrap::GrpcXdsServer(),
nullptr, symtab.ptr(), arena.ptr()};
auto* upb_policy = envoy_config_cluster_v3_LoadBalancingPolicy_parse(
serialized_policy.data(), serialized_policy.size(), arena.ptr());

Loading…
Cancel
Save