add test-only channel arg to set per-channel xDS bootstrap config (#25936)

* pass XdsClient to LB policies via channel args

* add channel arg for overriding bootstrap config on a per-channel basis

* change tests to use new channel arg approach -- currently failing for server-side tests

* use the same channel args approach on the server side

* clang-format

* fix CircuitBreakingMultipleChannelsShareCallCounter test

* fix XdsEnabledServerTest test

* add TODO

* clang-format

* generate_projects

* fix clang-tidy

* fix build

* attempt to fix python
pull/26010/head^2
Mark D. Roth 4 years ago committed by GitHub
parent 16684bf11b
commit feff79abc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      include/grpc/grpc.h
  2. 11
      include/grpc/impl/codegen/grpc_types.h
  3. 3
      include/grpcpp/server_builder.h
  4. 14
      include/grpcpp/xds_server_builder.h
  5. 11
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  6. 14
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  7. 103
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  8. 46
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  9. 3
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  10. 11
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  11. 179
      src/core/ext/xds/xds_bootstrap.cc
  12. 18
      src/core/ext/xds/xds_bootstrap.h
  13. 7
      src/core/ext/xds/xds_channel_args.h
  14. 171
      src/core/ext/xds/xds_client.cc
  15. 15
      src/core/ext/xds/xds_client.h
  16. 4
      src/core/ext/xds/xds_server_config_fetcher.cc
  17. 11
      src/cpp/server/server_builder.cc
  18. 3
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  19. 2
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  20. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  21. 372
      test/cpp/end2end/xds_end2end_test.cc

@ -424,7 +424,7 @@ typedef struct grpc_server_config_fetcher grpc_server_config_fetcher;
/** EXPERIMENTAL. Creates an xDS config fetcher. */
GRPCAPI grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
grpc_server_xds_status_notifier notifier);
grpc_server_xds_status_notifier notifier, const grpc_channel_args* args);
/** EXPERIMENTAL. Destroys a config fetcher. */
GRPCAPI void grpc_server_config_fetcher_destroy(

@ -353,6 +353,17 @@ typedef struct {
/* Timeout in milliseconds to use for calls to the grpclb load balancer.
If 0 or unset, the balancer calls will have no deadline. */
#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms"
/* Specifies the xDS bootstrap config as a JSON string.
FOR TESTING PURPOSES ONLY -- DO NOT USE IN PRODUCTION.
This option allows controlling the bootstrap configuration on a
per-channel basis, which is useful in tests. However, this results
in having a separate xDS client instance per channel rather than
using the global instance, which is not the intended way to use xDS.
Currently, this will (a) add unnecessary load on the xDS server and
(b) break use of CSDS, and there may be additional side effects in
the future. */
#define GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG \
"grpc.TEST_ONLY_DO_NOT_USE_IN_PROD.xds_bootstrap_config"
/* Timeout in milliseconds to wait for the serverlist from the grpclb load
balancer before using fallback backend addresses from the resolver.
If 0, enter fallback mode immediately. Default value is 10000. */

@ -357,6 +357,9 @@ class ServerBuilder {
server_config_fetcher_ = server_config_fetcher;
}
/// Experimental API, subject to change.
virtual ChannelArguments BuildChannelArgs();
private:
friend class ::grpc::testing::ServerBuilderPluginTest;

@ -47,15 +47,17 @@ class XdsServerBuilder : public ::grpc::ServerBuilder {
notifier_ = notifier;
}
std::unique_ptr<Server> BuildAndStart() override {
private:
// Called at the beginning of BuildAndStart().
ChannelArguments BuildChannelArgs() override {
ChannelArguments args = ServerBuilder::BuildChannelArgs();
grpc_channel_args c_channel_args = args.c_channel_args();
grpc_server_config_fetcher* fetcher = grpc_server_config_fetcher_xds_create(
{OnServingStatusChange, notifier_});
if (fetcher == nullptr) return nullptr;
set_fetcher(fetcher);
return ServerBuilder::BuildAndStart();
{OnServingStatusChange, notifier_}, &c_channel_args);
if (fetcher != nullptr) set_fetcher(fetcher);
return args;
}
private:
static void OnServingStatusChange(void* user_data, const char* uri,
grpc_status_code code,
const char* error_message) {

@ -693,13 +693,12 @@ class CdsLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
grpc_error* error = GRPC_ERROR_NONE;
RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
if (error != GRPC_ERROR_NONE) {
RefCountedPtr<XdsClient> xds_client =
XdsClient::GetFromChannelArgs(*args.args);
if (xds_client == nullptr) {
gpr_log(GPR_ERROR,
"cannot get XdsClient to instantiate cds LB policy: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
"XdsClient not present in channel args -- cannot instantiate "
"cds LB policy");
return nullptr;
}
return MakeOrphanable<CdsLb>(std::move(xds_client), std::move(args));

@ -596,14 +596,12 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
grpc_error* error = GRPC_ERROR_NONE;
RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
if (error != GRPC_ERROR_NONE) {
gpr_log(
GPR_ERROR,
"cannot get XdsClient to instantiate xds_cluster_impl LB policy: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
RefCountedPtr<XdsClient> xds_client =
XdsClient::GetFromChannelArgs(*args.args);
if (xds_client == nullptr) {
gpr_log(GPR_ERROR,
"XdsClient not present in channel args -- cannot instantiate "
"xds_cluster_impl LB policy");
return nullptr;
}
return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),

@ -101,7 +101,8 @@ class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
// Xds Cluster Resolver LB policy.
class XdsClusterResolverLb : public LoadBalancingPolicy {
public:
XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, Args args);
XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, Args args,
std::string server_name, bool is_xds_uri);
const char* name() const override { return kXdsClusterResolver; }
@ -317,6 +318,9 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
grpc_channel_args* CreateChildPolicyArgsLocked(
const grpc_channel_args* args_in);
// The xds client and endpoint watcher.
RefCountedPtr<XdsClient> xds_client_;
// Server name from target URI.
std::string server_name_;
bool is_xds_uri_;
@ -328,9 +332,6 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
// Internal state.
bool shutting_down_ = false;
// The xds client and endpoint watcher.
RefCountedPtr<XdsClient> xds_client_;
// Vector of discovery mechansism entries in priority order.
std::vector<DiscoveryMechanismEntry> discovery_mechanisms_;
@ -550,26 +551,17 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
//
XdsClusterResolverLb::XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client,
Args args)
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_resolver_lb %p] created -- using xds client %p", this,
xds_client_.get());
}
// Record server name.
const char* server_uri =
grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(server_uri != nullptr);
absl::StatusOr<URI> uri = URI::Parse(server_uri);
GPR_ASSERT(uri.ok() && !uri->path().empty());
server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
is_xds_uri_ = uri->scheme() == "xds";
Args args, std::string server_name,
bool is_xds_uri)
: LoadBalancingPolicy(std::move(args)),
xds_client_(std::move(xds_client)),
server_name_(std::move(server_name)),
is_xds_uri_(is_xds_uri) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_resolver_lb %p] server name from channel "
"(is_xds_uri=%d): %s",
this, is_xds_uri_, server_name_.c_str());
"[xds_cluster_resolver_lb %p] created -- xds_client=%p, "
"server_name=%s, is_xds_uri=%d",
this, xds_client_.get(), server_name_.c_str(), is_xds_uri_);
}
// EDS-only flow.
if (!is_xds_uri_) {
@ -1054,10 +1046,14 @@ void XdsClusterResolverLb::UpdateChildPolicyLocked() {
grpc_channel_args* XdsClusterResolverLb::CreateChildPolicyArgsLocked(
const grpc_channel_args* args) {
// Inhibit client-side health checking, since the balancer does this for us.
grpc_arg new_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
return grpc_channel_args_copy_and_add(args, &new_arg, 1);
absl::InlinedVector<grpc_arg, 2> new_args = {
// Inhibit client-side health checking, since the balancer does this
// for us.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1),
};
if (!is_xds_uri_) new_args.push_back(xds_client_->MakeChannelArg());
return grpc_channel_args_copy_and_add(args, new_args.data(), new_args.size());
}
OrphanablePtr<LoadBalancingPolicy>
@ -1096,18 +1092,39 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
grpc_error* error = GRPC_ERROR_NONE;
RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"cannot get XdsClient to instantiate xds_cluster_resolver LB "
"policy: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
return nullptr;
// Find server name.
const char* server_uri =
grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(server_uri != nullptr);
absl::StatusOr<URI> uri = URI::Parse(server_uri);
GPR_ASSERT(uri.ok() && !uri->path().empty());
absl::string_view server_name = absl::StripPrefix(uri->path(), "/");
// Determine if it's an xds URI.
bool is_xds_uri = uri->scheme() == "xds";
// Get XdsClient.
RefCountedPtr<XdsClient> xds_client =
XdsClient::GetFromChannelArgs(*args.args);
if (xds_client == nullptr) {
if (!is_xds_uri) {
grpc_error* error = GRPC_ERROR_NONE;
xds_client = XdsClient::GetOrCreate(args.args, &error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"cannot get or create XdsClient to instantiate "
"xds_cluster_resolver LB policy: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
return nullptr;
}
} else {
gpr_log(GPR_ERROR,
"XdsClient not present in channel args -- cannot instantiate "
"xds_cluster_resolver LB policy");
return nullptr;
}
}
return MakeOrphanable<XdsClusterResolverChildHandler>(std::move(xds_client),
std::move(args));
return MakeOrphanable<XdsClusterResolverChildHandler>(
std::move(xds_client), std::move(args), server_name, is_xds_uri);
}
const char* name() const override { return kXdsClusterResolver; }
@ -1339,10 +1356,13 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
class XdsClusterResolverChildHandler : public ChildPolicyHandler {
public:
XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client,
Args args)
Args args, absl::string_view server_name,
bool is_xds_uri)
: ChildPolicyHandler(std::move(args),
&grpc_lb_xds_cluster_resolver_trace),
xds_client_(std::move(xds_client)) {}
xds_client_(std::move(xds_client)),
server_name_(server_name),
is_xds_uri_(is_xds_uri) {}
bool ConfigChangeRequiresNewPolicyInstance(
LoadBalancingPolicy::Config* old_config,
@ -1359,11 +1379,14 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const char* /*name*/, LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args));
return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args),
server_name_, is_xds_uri_);
}
private:
RefCountedPtr<XdsClient> xds_client_;
std::string server_name_;
bool is_xds_uri_;
};
};

@ -313,48 +313,40 @@ void FakeResolverResponseGenerator::SetFakeResolver(
namespace {
static void* response_generator_arg_copy(void* p) {
FakeResolverResponseGenerator* generator =
static_cast<FakeResolverResponseGenerator*>(p);
// TODO(roth): We currently deal with this ref manually. Once the
// new channel args code is converted to C++, find a way to track this ref
// in a cleaner way.
RefCountedPtr<FakeResolverResponseGenerator> copy = generator->Ref();
copy.release();
void* ResponseGeneratorChannelArgCopy(void* p) {
auto* generator = static_cast<FakeResolverResponseGenerator*>(p);
generator->Ref().release();
return p;
}
static void response_generator_arg_destroy(void* p) {
FakeResolverResponseGenerator* generator =
static_cast<FakeResolverResponseGenerator*>(p);
void ResponseGeneratorChannelArgDestroy(void* p) {
auto* generator = static_cast<FakeResolverResponseGenerator*>(p);
generator->Unref();
}
static int response_generator_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
static const grpc_arg_pointer_vtable response_generator_arg_vtable = {
response_generator_arg_copy, response_generator_arg_destroy,
response_generator_cmp};
int ResponseGeneratorChannelArgCmp(void* a, void* b) { return GPR_ICMP(a, b); }
} // namespace
const grpc_arg_pointer_vtable
FakeResolverResponseGenerator::kChannelArgPointerVtable = {
ResponseGeneratorChannelArgCopy, ResponseGeneratorChannelArgDestroy,
ResponseGeneratorChannelArgCmp};
grpc_arg FakeResolverResponseGenerator::MakeChannelArg(
FakeResolverResponseGenerator* generator) {
grpc_arg arg;
arg.type = GRPC_ARG_POINTER;
arg.key = const_cast<char*>(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
arg.value.pointer.p = generator;
arg.value.pointer.vtable = &response_generator_arg_vtable;
return arg;
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR), generator,
&kChannelArgPointerVtable);
}
RefCountedPtr<FakeResolverResponseGenerator>
FakeResolverResponseGenerator::GetFromArgs(const grpc_channel_args* args) {
const grpc_arg* arg =
grpc_channel_args_find(args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) return nullptr;
return static_cast<FakeResolverResponseGenerator*>(arg->value.pointer.p)
->Ref();
auto* response_generator =
grpc_channel_args_find_pointer<FakeResolverResponseGenerator>(
args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
if (response_generator == nullptr) return nullptr;
return response_generator->Ref();
}
//

@ -42,6 +42,8 @@ class FakeResolver;
class FakeResolverResponseGenerator
: public RefCounted<FakeResolverResponseGenerator> {
public:
static const grpc_arg_pointer_vtable kChannelArgPointerVtable;
FakeResolverResponseGenerator();
~FakeResolverResponseGenerator() override;
@ -69,6 +71,7 @@ class FakeResolverResponseGenerator
void SetFailureOnReresolution();
// Returns a channel arg containing \a generator.
// TODO(roth): When we have time, make this a non-static method.
static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator);
// Returns the response generator in \a args, or null if not found.

@ -28,6 +28,7 @@
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/lib/channel/channel_args.h"
@ -711,7 +712,7 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
void XdsResolver::StartLocked() {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = XdsClient::GetOrCreate(&error);
xds_client_ = XdsClient::GetOrCreate(args_, &error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"Failed to create xds client -- channel will remain in "
@ -888,8 +889,12 @@ void XdsResolver::GenerateResult() {
gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
result.service_config->json_string().c_str());
}
grpc_arg new_arg = config_selector->MakeChannelArg();
result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
grpc_arg new_args[] = {
xds_client_->MakeChannelArg(),
config_selector->MakeChannelArg(),
};
result.args =
grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args));
result_handler_->ReturnResult(std::move(result));
}

@ -30,7 +30,6 @@
#include "src/core/ext/xds/certificate_provider_registry.h"
#include "src/core/ext/xds/xds_api.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/security/credentials/credentials.h"
@ -81,133 +80,17 @@ bool XdsBootstrap::XdsServer::ShouldUseV3() const {
// XdsBootstrap
//
namespace {
std::string BootstrapString(const XdsBootstrap& bootstrap) {
std::vector<std::string> parts;
if (bootstrap.node() != nullptr) {
parts.push_back(absl::StrFormat(
"node={\n"
" id=\"%s\",\n"
" cluster=\"%s\",\n"
" locality={\n"
" region=\"%s\",\n"
" zone=\"%s\",\n"
" sub_zone=\"%s\"\n"
" },\n"
" metadata=%s,\n"
"},\n",
bootstrap.node()->id, bootstrap.node()->cluster,
bootstrap.node()->locality_region, bootstrap.node()->locality_zone,
bootstrap.node()->locality_sub_zone,
bootstrap.node()->metadata.Dump()));
}
parts.push_back(absl::StrFormat(
"servers=[\n"
" {\n"
" uri=\"%s\",\n"
" creds_type=%s,\n",
bootstrap.server().server_uri, bootstrap.server().channel_creds_type));
if (bootstrap.server().channel_creds_config.type() != Json::Type::JSON_NULL) {
parts.push_back(
absl::StrFormat(" creds_config=%s,",
bootstrap.server().channel_creds_config.Dump()));
}
if (!bootstrap.server().server_features.empty()) {
parts.push_back(absl::StrCat(
" server_features=[",
absl::StrJoin(bootstrap.server().server_features, ", "), "],\n"));
}
parts.push_back(" }\n],\n");
if (!bootstrap.server_listener_resource_name_template().empty()) {
parts.push_back(
absl::StrFormat("server_listener_resource_name_template=\"%s\",\n",
bootstrap.server_listener_resource_name_template()));
}
parts.push_back("certificate_providers={\n");
for (const auto& entry : bootstrap.certificate_providers()) {
parts.push_back(
absl::StrFormat(" %s={\n"
" plugin_name=%s\n"
" config=%s\n"
" },\n",
entry.first, entry.second.plugin_name,
entry.second.config->ToString()));
}
parts.push_back("}");
return absl::StrJoin(parts, "");
}
std::unique_ptr<XdsBootstrap> ParseJsonAndCreate(
XdsClient* client, TraceFlag* tracer, absl::string_view json_string,
absl::string_view bootstrap_source, grpc_error** error) {
std::unique_ptr<XdsBootstrap> XdsBootstrap::Create(
absl::string_view json_string, grpc_error** error) {
Json json = Json::Parse(json_string, error);
if (*error != GRPC_ERROR_NONE) {
grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
absl::StrCat("Failed to parse bootstrap from ", bootstrap_source)
.c_str(),
error, 1);
grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to parse bootstrap JSON string", error, 1);
GRPC_ERROR_UNREF(*error);
*error = error_out;
return nullptr;
}
std::unique_ptr<XdsBootstrap> result =
absl::make_unique<XdsBootstrap>(std::move(json), error);
if (*error == GRPC_ERROR_NONE && GRPC_TRACE_FLAG_ENABLED(*tracer)) {
gpr_log(GPR_INFO,
"[xds_client %p] Bootstrap config for creating xds client:\n%s",
client, BootstrapString(*result).c_str());
}
return result;
}
} // namespace
std::unique_ptr<XdsBootstrap> XdsBootstrap::Create(XdsClient* client,
TraceFlag* tracer,
const char* fallback_config,
grpc_error** error) {
// First, try GRPC_XDS_BOOTSTRAP env var.
grpc_core::UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP"));
if (path != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer)) {
gpr_log(GPR_INFO,
"[xds_client %p] Got bootstrap file location from "
"GRPC_XDS_BOOTSTRAP environment variable: %s",
client, path.get());
}
grpc_slice contents;
*error =
grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents);
if (*error != GRPC_ERROR_NONE) return nullptr;
absl::string_view contents_str_view = StringViewFromSlice(contents);
if (GRPC_TRACE_FLAG_ENABLED(*tracer)) {
gpr_log(GPR_DEBUG, "[xds_client %p] Bootstrap file contents: %s", client,
std::string(contents_str_view).c_str());
}
std::string bootstrap_source = absl::StrCat("file ", path.get());
auto result = ParseJsonAndCreate(client, tracer, contents_str_view,
bootstrap_source, error);
grpc_slice_unref_internal(contents);
return result;
}
// Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
grpc_core::UniquePtr<char> env_config(
gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG"));
if (env_config != nullptr) {
return ParseJsonAndCreate(client, tracer, env_config.get(),
"GRPC_XDS_BOOTSTRAP_CONFIG env var", error);
}
// Finally, try fallback config.
if (fallback_config != nullptr) {
return ParseJsonAndCreate(client, tracer, fallback_config,
"fallback config", error);
}
// No bootstrap config found.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
"not defined");
return nullptr;
return absl::make_unique<XdsBootstrap>(std::move(json), error);
}
XdsBootstrap::XdsBootstrap(Json json, grpc_error** error) {
@ -552,4 +435,56 @@ grpc_error* XdsBootstrap::ParseCertificateProvider(
return error;
}
std::string XdsBootstrap::ToString() const {
std::vector<std::string> parts;
if (node_ != nullptr) {
parts.push_back(absl::StrFormat(
"node={\n"
" id=\"%s\",\n"
" cluster=\"%s\",\n"
" locality={\n"
" region=\"%s\",\n"
" zone=\"%s\",\n"
" sub_zone=\"%s\"\n"
" },\n"
" metadata=%s,\n"
"},\n",
node_->id, node_->cluster, node_->locality_region, node_->locality_zone,
node_->locality_sub_zone, node_->metadata.Dump()));
}
parts.push_back(
absl::StrFormat("servers=[\n"
" {\n"
" uri=\"%s\",\n"
" creds_type=%s,\n",
server().server_uri, server().channel_creds_type));
if (server().channel_creds_config.type() != Json::Type::JSON_NULL) {
parts.push_back(absl::StrFormat(" creds_config=%s,",
server().channel_creds_config.Dump()));
}
if (!server().server_features.empty()) {
parts.push_back(absl::StrCat(" server_features=[",
absl::StrJoin(server().server_features, ", "),
"],\n"));
}
parts.push_back(" }\n],\n");
if (!server_listener_resource_name_template_.empty()) {
parts.push_back(
absl::StrFormat("server_listener_resource_name_template=\"%s\",\n",
server_listener_resource_name_template_));
}
parts.push_back("certificate_providers={\n");
for (const auto& entry : certificate_providers_) {
parts.push_back(
absl::StrFormat(" %s={\n"
" plugin_name=%s\n"
" config=%s\n"
" },\n",
entry.first, entry.second.plugin_name,
entry.second.config->ToString()));
}
parts.push_back("}");
return absl::StrJoin(parts, "");
}
} // namespace grpc_core

@ -67,23 +67,17 @@ class XdsBootstrap {
bool ShouldUseV3() const;
};
// Creates bootstrap object, obtaining the bootstrap JSON as appropriate
// for the environment:
// - If the GRPC_XDS_BOOTSTRAP env var is set, reads the file it specifies
// to obtain the bootstrap JSON.
// - Otherwise, if the GRPC_XDS_BOOTSTRAP_CONFIG env var is set, reads the
// content of that env var to obtain the bootstrap JSON.
// - Otherwise, the JSON will be read from fallback_config (if non-null).
// Creates bootstrap object from json_string.
// If *error is not GRPC_ERROR_NONE after returning, then there was an
// error (e.g., no config found or error reading the file).
static std::unique_ptr<XdsBootstrap> Create(XdsClient* client,
TraceFlag* tracer,
const char* fallback_config,
// error parsing the contents.
static std::unique_ptr<XdsBootstrap> Create(absl::string_view json_string,
grpc_error** error);
// Do not instantiate directly -- use ReadFromFile() above instead.
// Do not instantiate directly -- use Create() above instead.
XdsBootstrap(Json json, grpc_error** error);
std::string ToString() const;
// TODO(roth): We currently support only one server. Fix this when we
// add support for fallback for the xds channel.
const XdsServer& server() const { return servers_[0]; }

@ -17,8 +17,11 @@
#ifndef GRPC_CORE_EXT_XDS_XDS_CHANNEL_ARGS_H
#define GRPC_CORE_EXT_XDS_XDS_CHANNEL_ARGS_H
// Pointer channel arg containing a ref to the XdsClient object.
#define GRPC_ARG_XDS_CLIENT "grpc.xds_client"
// Specifies channel args for the xDS client.
// Used only when GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG
// is set.
#define GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS \
"grpc.xds_client_channel_args"
// Timeout in milliseconds to wait for a resource to be returned from
// the xds server before assuming that it does not exist.

@ -41,6 +41,7 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
@ -1770,37 +1771,41 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
namespace {
grpc_millis GetRequestTimeout(grpc_channel_args* args) {
grpc_millis GetRequestTimeout(const grpc_channel_args* args) {
return grpc_channel_args_find_integer(
args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
{15000, 0, INT_MAX});
}
grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) {
absl::InlinedVector<grpc_arg, 2> args_to_add = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
5 * 60 * GPR_MS_PER_SEC),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
};
return grpc_channel_args_copy_and_add(args, args_to_add.data(),
args_to_add.size());
}
} // namespace
XdsClient::XdsClient(grpc_channel_args* args, grpc_error** error)
XdsClient::XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
const grpc_channel_args* args)
: DualRefCounted<XdsClient>(
GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
: nullptr),
args_(args),
bootstrap_(std::move(bootstrap)),
args_(ModifyChannelArgs(args)),
request_timeout_(GetRequestTimeout(args)),
interested_parties_(grpc_pollset_set_create()),
bootstrap_(XdsBootstrap::Create(this, &grpc_xds_client_trace,
g_fallback_bootstrap_config, error)),
certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
bootstrap_ == nullptr
? CertificateProviderStore::PluginDefinitionMap()
: bootstrap_->certificate_providers())),
api_(this, &grpc_xds_client_trace,
bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
bootstrap_->certificate_providers())),
api_(this, &grpc_xds_client_trace, bootstrap_->node()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
}
if (*error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
this, grpc_error_string(*error));
return;
}
// Create ChannelState object.
chand_ = MakeOrphanable<ChannelState>(
WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
@ -2339,27 +2344,95 @@ void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS {
XdsHttpFilterRegistry::Shutdown();
}
RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
namespace {
std::string GetBootstrapContents(const char* fallback_config,
grpc_error** error) {
// First, try GRPC_XDS_BOOTSTRAP env var.
grpc_core::UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP"));
if (path != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
"environment variable: %s",
path.get());
}
grpc_slice contents;
*error =
grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents);
if (*error != GRPC_ERROR_NONE) return "";
std::string contents_str(StringViewFromSlice(contents));
grpc_slice_unref_internal(contents);
return contents_str;
}
// Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
grpc_core::UniquePtr<char> env_config(
gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG"));
if (env_config != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
"environment variable");
}
return env_config.get();
}
// Finally, try fallback config.
if (fallback_config != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "Got bootstrap contents from fallback config");
}
return fallback_config;
}
// No bootstrap config found.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
"not defined");
return "";
}
} // namespace
RefCountedPtr<XdsClient> XdsClient::GetOrCreate(const grpc_channel_args* args,
grpc_error** error) {
RefCountedPtr<XdsClient> xds_client;
// If getting bootstrap from channel args, create a local XdsClient
// instance for the channel or server instead of using the global instance.
const char* bootstrap_config = grpc_channel_args_find_string(
args, GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
if (bootstrap_config != nullptr) {
std::unique_ptr<XdsBootstrap> bootstrap =
XdsBootstrap::Create(bootstrap_config, error);
if (*error == GRPC_ERROR_NONE) {
grpc_channel_args* xds_channel_args =
grpc_channel_args_find_pointer<grpc_channel_args>(
args,
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
return MakeRefCounted<XdsClient>(std::move(bootstrap), xds_channel_args);
}
return nullptr;
}
// Otherwise, use the global instance.
{
MutexLock lock(g_mu);
if (g_xds_client != nullptr) {
auto xds_client = g_xds_client->RefIfNonZero();
if (xds_client != nullptr) return xds_client;
}
// Build channel args.
absl::InlinedVector<grpc_arg, 2> args_to_add = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
5 * 60 * GPR_MS_PER_SEC),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
};
grpc_channel_args* args = grpc_channel_args_copy_and_add(
g_channel_args, args_to_add.data(), args_to_add.size());
// Instantiate XdsClient.
xds_client = MakeRefCounted<XdsClient>(args, error);
// Find bootstrap contents.
std::string bootstrap_contents =
GetBootstrapContents(g_fallback_bootstrap_config, error);
if (*error != GRPC_ERROR_NONE) return nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
bootstrap_contents.c_str());
}
// Parse bootstrap.
std::unique_ptr<XdsBootstrap> bootstrap =
XdsBootstrap::Create(bootstrap_contents, error);
if (*error != GRPC_ERROR_NONE) return nullptr;
// Instantiate XdsClient.
xds_client =
MakeRefCounted<XdsClient>(std::move(bootstrap), g_channel_args);
g_xds_client = xds_client.get();
}
return xds_client;
@ -2385,6 +2458,46 @@ void SetXdsFallbackBootstrapConfig(const char* config) {
} // namespace internal
//
// embedding XdsClient in channel args
//
#define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client"
namespace {
void* XdsClientArgCopy(void* p) {
XdsClient* xds_client = static_cast<XdsClient*>(p);
xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
return p;
}
void XdsClientArgDestroy(void* p) {
XdsClient* xds_client = static_cast<XdsClient*>(p);
xds_client->Unref(DEBUG_LOCATION, "channel arg");
}
int XdsClientArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
const grpc_arg_pointer_vtable kXdsClientArgVtable = {
XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp};
} // namespace
grpc_arg XdsClient::MakeChannelArg() const {
return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT),
const_cast<XdsClient*>(this),
&kXdsClientArgVtable);
}
RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
const grpc_channel_args& args) {
XdsClient* xds_client =
grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
if (xds_client == nullptr) return nullptr;
return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs");
}
} // namespace grpc_core
// The returned bytes may contain NULL(0), so we can't use c-string.
@ -2392,7 +2505,7 @@ grpc_slice grpc_dump_xds_configs() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_error* error = GRPC_ERROR_NONE;
auto xds_client = grpc_core::XdsClient::GetOrCreate(&error);
auto xds_client = grpc_core::XdsClient::GetOrCreate(nullptr, &error);
if (error != GRPC_ERROR_NONE) {
// If we isn't using xDS, just return an empty string.
GRPC_ERROR_UNREF(error);

@ -82,10 +82,12 @@ class XdsClient : public DualRefCounted<XdsClient> {
// Factory function to get or create the global XdsClient instance.
// If *error is not GRPC_ERROR_NONE upon return, then there was
// an error initializing the client.
static RefCountedPtr<XdsClient> GetOrCreate(grpc_error** error);
static RefCountedPtr<XdsClient> GetOrCreate(const grpc_channel_args* args,
grpc_error** error);
// Callers should not instantiate directly. Use GetOrCreate() instead.
XdsClient(grpc_channel_args* args, grpc_error** error);
// Most callers should not instantiate directly. Use GetOrCreate() instead.
XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
const grpc_channel_args* args);
~XdsClient() override;
const XdsBootstrap& bootstrap() const {
@ -200,6 +202,11 @@ class XdsClient : public DualRefCounted<XdsClient> {
// implementation.
std::string DumpClientConfigBinary();
// Helpers for encoding the XdsClient object in channel args.
grpc_arg MakeChannelArg() const;
static RefCountedPtr<XdsClient> GetFromChannelArgs(
const grpc_channel_args& args);
private:
// Contains a channel to the xds server and all the data related to the
// channel. Holds a ref to the xds client object.
@ -321,10 +328,10 @@ class XdsClient : public DualRefCounted<XdsClient> {
grpc_millis update_time, const XdsApi::AdsParseResult& result)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
std::unique_ptr<XdsBootstrap> bootstrap_;
grpc_channel_args* args_;
const grpc_millis request_timeout_;
grpc_pollset_set* interested_parties_;
std::unique_ptr<XdsBootstrap> bootstrap_;
OrphanablePtr<CertificateProviderStore> certificate_provider_store_;
XdsApi api_;

@ -519,13 +519,13 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
} // namespace grpc_core
grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
grpc_server_xds_status_notifier notifier) {
grpc_server_xds_status_notifier notifier, const grpc_channel_args* args) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_config_fetcher_xds_create()", 0, ());
grpc_error* error = GRPC_ERROR_NONE;
grpc_core::RefCountedPtr<grpc_core::XdsClient> xds_client =
grpc_core::XdsClient::GetOrCreate(&error);
grpc_core::XdsClient::GetOrCreate(args, &error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Failed to create xds client: %s",
grpc_error_string(error));

@ -223,8 +223,8 @@ ServerBuilder& ServerBuilder::AddListeningPort(
return *this;
}
std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
grpc::ChannelArguments args;
ChannelArguments ServerBuilder::BuildChannelArgs() {
ChannelArguments args;
if (max_receive_message_size_ >= -1) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
@ -245,16 +245,19 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
maybe_default_compression_algorithm_.algorithm);
}
if (resource_quota_ != nullptr) {
args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_,
grpc_resource_quota_arg_vtable());
}
for (const auto& plugin : plugins_) {
plugin->UpdateServerBuilder(this);
plugin->UpdateChannelArguments(&args);
}
return args;
}
std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
ChannelArguments args = BuildChannelArgs();
// == Determine if the server has any syncrhonous methods ==
bool has_sync_methods = false;

@ -411,7 +411,8 @@ cdef extern from "grpc/grpc.h":
void* user_data;
grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
grpc_server_xds_status_notifier notifier) nogil
grpc_server_xds_status_notifier notifier,
const grpc_channel_args* args) nogil
int grpc_server_add_insecure_http2_port(

@ -30,7 +30,7 @@ cdef class Server:
notifier.user_data = NULL
if xds:
grpc_server_set_config_fetcher(self.c_server,
grpc_server_config_fetcher_xds_create(notifier))
grpc_server_config_fetcher_xds_create(notifier, channel_args.c_args()))
self.references.append(arguments)
def request_call(

@ -215,7 +215,7 @@ extern grpc_server_create_type grpc_server_create_import;
typedef void(*grpc_server_register_completion_queue_type)(grpc_server* server, grpc_completion_queue* cq, void* reserved);
extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
#define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
typedef grpc_server_config_fetcher*(*grpc_server_config_fetcher_xds_create_type)(grpc_server_xds_status_notifier notifier);
typedef grpc_server_config_fetcher*(*grpc_server_config_fetcher_xds_create_type)(grpc_server_xds_status_notifier notifier, const grpc_channel_args* args);
extern grpc_server_config_fetcher_xds_create_type grpc_server_config_fetcher_xds_create_import;
#define grpc_server_config_fetcher_xds_create grpc_server_config_fetcher_xds_create_import
typedef void(*grpc_server_config_fetcher_destroy_type)(grpc_server_config_fetcher* config_fetcher);

@ -1322,6 +1322,12 @@ class TestType {
kRouteOverride,
};
enum BootstrapSource {
kBootstrapFromChannelArg,
kBootstrapFromFile,
kBootstrapFromEnvVar,
};
TestType& set_use_fake_resolver() {
use_fake_resolver_ = true;
return *this;
@ -1352,20 +1358,24 @@ class TestType {
return *this;
}
TestType& set_filter_config_setup(const FilterConfigSetup& setup) {
TestType& set_filter_config_setup(FilterConfigSetup setup) {
filter_config_setup_ = setup;
return *this;
}
TestType& set_bootstrap_source(BootstrapSource bootstrap_source) {
bootstrap_source_ = bootstrap_source;
return *this;
}
bool use_fake_resolver() const { return use_fake_resolver_; }
bool enable_load_reporting() const { return enable_load_reporting_; }
bool enable_rds_testing() const { return enable_rds_testing_; }
bool use_v2() const { return use_v2_; }
bool use_xds_credentials() const { return use_xds_credentials_; }
bool use_csds_streaming() const { return use_csds_streaming_; }
const FilterConfigSetup& filter_config_setup() const {
return filter_config_setup_;
}
FilterConfigSetup filter_config_setup() const { return filter_config_setup_; }
BootstrapSource bootstrap_source() const { return bootstrap_source_; }
std::string AsString() const {
std::string retval = (use_fake_resolver_ ? "FakeResolver" : "XdsResolver");
@ -1377,6 +1387,11 @@ class TestType {
if (filter_config_setup_ == kRouteOverride) {
retval += "FilterPerRouteOverride";
}
if (bootstrap_source_ == kBootstrapFromFile) {
retval += "BootstrapFromFile";
} else if (bootstrap_source_ == kBootstrapFromEnvVar) {
retval += "BootstrapFromEnvVar";
}
return retval;
}
@ -1388,6 +1403,7 @@ class TestType {
bool use_xds_credentials_ = false;
bool use_csds_streaming_ = false;
FilterConfigSetup filter_config_setup_ = kHTTPConnectionManagerOriginal;
BootstrapSource bootstrap_source_ = kBootstrapFromChannelArg;
};
std::string ReadFile(const char* file_path) {
@ -1581,26 +1597,6 @@ class NoOpHttpFilter : public grpc_core::XdsHttpFilterImpl {
const bool supported_on_servers_;
};
namespace {
void* response_generator_arg_copy(void* p) {
auto* generator = static_cast<grpc_core::FakeResolverResponseGenerator*>(p);
generator->Ref().release();
return p;
}
void response_generator_arg_destroy(void* p) {
auto* generator = static_cast<grpc_core::FakeResolverResponseGenerator*>(p);
generator->Unref();
}
int response_generator_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
const grpc_arg_pointer_vtable
kLogicalDnsClusterResolverResponseGeneratorVtable = {
response_generator_arg_copy, response_generator_arg_destroy,
response_generator_cmp};
// There is slight difference between time fetched by GPR and by C++ system
// clock API. It's unclear if they are using the same syscall, but we do know
// GPR round the number at millisecond-level. This creates a 1ms difference,
@ -1610,7 +1606,23 @@ grpc_millis NowFromCycleCounter() {
return grpc_cycle_counter_to_millis_round_up(now);
}
} // namespace
// Channel arg pointer vtable for storing xDS channel args in the parent
// channel's channel args.
void* ChannelArgsArgCopy(void* p) {
auto* args = static_cast<grpc_channel_args*>(p);
return grpc_channel_args_copy(args);
}
void ChannelArgsArgDestroy(void* p) {
auto* args = static_cast<grpc_channel_args*>(p);
grpc_channel_args_destroy(args);
}
int ChannelArgsArgCmp(void* a, void* b) {
auto* args_a = static_cast<grpc_channel_args*>(a);
auto* args_b = static_cast<grpc_channel_args*>(b);
return grpc_channel_args_compare(args_a, args_b);
}
const grpc_arg_pointer_vtable kChannelArgsArgVtable = {
ChannelArgsArgCopy, ChannelArgsArgDestroy, ChannelArgsArgCmp};
class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
protected:
@ -1625,24 +1637,14 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
// balancers that it needs, so that we aren't wasting resources.
XdsEnd2endTest(size_t num_backends, size_t num_balancers,
int client_load_reporting_interval_seconds = 100,
bool use_xds_enabled_server = false,
bool bootstrap_contents_from_env_var = false)
bool use_xds_enabled_server = false)
: num_backends_(num_backends),
num_balancers_(num_balancers),
client_load_reporting_interval_seconds_(
client_load_reporting_interval_seconds),
use_xds_enabled_server_(use_xds_enabled_server),
bootstrap_contents_from_env_var_(bootstrap_contents_from_env_var) {}
use_xds_enabled_server_(use_xds_enabled_server) {}
void SetUp() override {
if (bootstrap_contents_from_env_var_) {
gpr_setenv("GRPC_XDS_BOOTSTRAP_CONFIG",
GetParam().use_v2() ? kBootstrapFileV2 : kBootstrapFileV3);
} else {
gpr_setenv("GRPC_XDS_BOOTSTRAP", GetParam().use_v2()
? g_bootstrap_file_v2
: g_bootstrap_file_v3);
}
bool localhost_resolves_to_ipv4 = false;
bool localhost_resolves_to_ipv6 = false;
grpc_core::LocalhostResolves(&localhost_resolves_to_ipv4,
@ -1679,28 +1681,29 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
}
// Start the load balancers.
for (size_t i = 0; i < num_balancers_; ++i) {
balancers_.emplace_back(
new BalancerServerThread(GetParam().enable_load_reporting()
? client_load_reporting_interval_seconds_
: 0));
balancers_.emplace_back(new BalancerServerThread(
this, GetParam().enable_load_reporting()
? client_load_reporting_interval_seconds_
: 0));
balancers_.back()->Start();
// Initialize resources.
SetListenerAndRouteConfiguration(i, default_listener_,
default_route_config_);
balancers_.back()->ads_service()->SetCdsResource(default_cluster_);
}
// Initialize XdsClient state.
response_generator_ =
// Create fake resolver response generators used by client.
if (GetParam().use_fake_resolver()) {
response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
}
logical_dns_cluster_resolver_response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
// Inject xDS channel response generator.
lb_channel_response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
// Construct channel args for XdsClient.
xds_channel_args_to_add_.emplace_back(
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
lb_channel_response_generator_.get()));
// Inject xDS logical cluster resolver response generator.
logical_dns_cluster_resolver_response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
if (xds_resource_does_not_exist_timeout_ms_ > 0) {
xds_channel_args_to_add_.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS),
@ -1708,18 +1711,36 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
}
xds_channel_args_.num_args = xds_channel_args_to_add_.size();
xds_channel_args_.args = xds_channel_args_to_add_.data();
grpc_core::internal::SetXdsChannelArgsForTest(&xds_channel_args_);
// Make sure each test creates a new XdsClient instance rather than
// reusing the one from the previous test. This avoids spurious failures
// caused when a load reporting test runs after a non-load reporting test
// and the XdsClient is still talking to the old LRS server, which fails
// because it's not expecting the client to connect. It also
// ensures that each test can independently set the global channel
// args for the xDS channel.
grpc_core::internal::UnsetGlobalXdsClientForTest();
// Initialize XdsClient state.
// TODO(roth): Consider changing this to dynamically generate the
// bootstrap config in each individual test instead of hard-coding
// the contents here. That would allow us to use an ipv4: or ipv6:
// URI for the xDS server instead of using the fake resolver.
if (GetParam().bootstrap_source() == TestType::kBootstrapFromEnvVar) {
gpr_setenv("GRPC_XDS_BOOTSTRAP_CONFIG",
GetParam().use_v2() ? kBootstrapFileV2 : kBootstrapFileV3);
} else if (GetParam().bootstrap_source() == TestType::kBootstrapFromFile) {
gpr_setenv("GRPC_XDS_BOOTSTRAP", GetParam().use_v2()
? g_bootstrap_file_v2
: g_bootstrap_file_v3);
}
if (GetParam().bootstrap_source() != TestType::kBootstrapFromChannelArg) {
// If getting bootstrap from channel arg, we'll pass these args in
// via the parent channel args in CreateChannel() instead.
grpc_core::internal::SetXdsChannelArgsForTest(&xds_channel_args_);
// Make sure each test creates a new XdsClient instance rather than
// reusing the one from the previous test. This avoids spurious failures
// caused when a load reporting test runs after a non-load reporting test
// and the XdsClient is still talking to the old LRS server, which fails
// because it's not expecting the client to connect. It also
// ensures that each test can independently set the global channel
// args for the xDS channel.
grpc_core::internal::UnsetGlobalXdsClientForTest();
}
// Start the backends.
for (size_t i = 0; i < num_backends_; ++i) {
backends_.emplace_back(new BackendServerThread(use_xds_enabled_server_));
backends_.emplace_back(
new BackendServerThread(this, use_xds_enabled_server_));
backends_.back()->Start();
}
// Create channel and stub.
@ -1762,7 +1783,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
std::shared_ptr<Channel> CreateChannel(
int failover_timeout = 0, const char* server_name = kServerName,
grpc_core::FakeResolverResponseGenerator* response_generator = nullptr) {
grpc_core::FakeResolverResponseGenerator* response_generator = nullptr,
grpc_channel_args* xds_channel_args = nullptr) {
ChannelArguments args;
if (failover_timeout > 0) {
args.SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, failover_timeout);
@ -1773,13 +1795,25 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
if (response_generator == nullptr) {
response_generator = response_generator_.get();
}
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator);
args.SetPointerWithVtable(
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator,
&grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable);
}
if (GetParam().bootstrap_source() == TestType::kBootstrapFromChannelArg) {
// We're getting the bootstrap from a channel arg, so we do the
// same thing for the response generator to use for the xDS
// channel and the xDS resource-does-not-exist timeout value.
args.SetString(GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG,
GetParam().use_v2() ? kBootstrapFileV2 : kBootstrapFileV3);
if (xds_channel_args == nullptr) xds_channel_args = &xds_channel_args_;
args.SetPointerWithVtable(
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
xds_channel_args, &kChannelArgsArgVtable);
}
args.SetPointerWithVtable(
GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR,
logical_dns_cluster_resolver_response_generator_.get(),
&kLogicalDnsClusterResolverResponseGeneratorVtable);
&grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable);
std::string uri = absl::StrCat(
GetParam().use_fake_resolver() ? "fake" : "xds", ":///", server_name);
std::shared_ptr<ChannelCredentials> channel_creds =
@ -2022,17 +2056,20 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
void SetNextResolutionForLbChannelAllBalancers(
const char* service_config_json = nullptr,
const char* expected_targets = nullptr) {
const char* expected_targets = nullptr,
grpc_core::FakeResolverResponseGenerator* response_generator = nullptr) {
std::vector<int> ports;
for (size_t i = 0; i < balancers_.size(); ++i) {
ports.emplace_back(balancers_[i]->port());
}
SetNextResolutionForLbChannel(ports, service_config_json, expected_targets);
SetNextResolutionForLbChannel(ports, service_config_json, expected_targets,
response_generator);
}
void SetNextResolutionForLbChannel(const std::vector<int>& ports,
const char* service_config_json = nullptr,
const char* expected_targets = nullptr) {
void SetNextResolutionForLbChannel(
const std::vector<int>& ports, const char* service_config_json = nullptr,
const char* expected_targets = nullptr,
grpc_core::FakeResolverResponseGenerator* response_generator = nullptr) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(ports);
@ -2050,7 +2087,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
result.args =
grpc_channel_args_copy_and_add(nullptr, &expected_targets_arg, 1);
}
lb_channel_response_generator_->SetResponse(std::move(result));
if (response_generator == nullptr) {
response_generator = lb_channel_response_generator_.get();
}
response_generator->SetResponse(std::move(result));
}
void SetNextReresolutionResponse(const std::vector<int>& ports) {
@ -2248,8 +2288,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
class ServerThread {
public:
explicit ServerThread(bool use_xds_enabled_server = false)
: port_(grpc_pick_unused_port_or_die()),
explicit ServerThread(XdsEnd2endTest* test_obj,
bool use_xds_enabled_server = false)
: test_obj_(test_obj),
port_(grpc_pick_unused_port_or_die()),
use_xds_enabled_server_(use_xds_enabled_server) {}
virtual ~ServerThread(){};
@ -2277,6 +2319,11 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
server_address << "localhost:" << port_;
if (use_xds_enabled_server_) {
experimental::XdsServerBuilder builder;
if (GetParam().bootstrap_source() ==
TestType::kBootstrapFromChannelArg) {
builder.SetOption(
absl::make_unique<XdsChannelArgsServerBuilderOption>(test_obj_));
}
builder.set_status_notifier(&notifier_);
builder.AddListeningPort(server_address.str(), Credentials());
RegisterAllServices(&builder);
@ -2312,12 +2359,36 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
XdsServingStatusNotifier* notifier() { return &notifier_; }
private:
class XdsChannelArgsServerBuilderOption
: public ::grpc::ServerBuilderOption {
public:
explicit XdsChannelArgsServerBuilderOption(XdsEnd2endTest* test_obj)
: test_obj_(test_obj) {}
void UpdateArguments(grpc::ChannelArguments* args) override {
args->SetString(
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG,
GetParam().use_v2() ? kBootstrapFileV2 : kBootstrapFileV3);
args->SetPointerWithVtable(
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS,
&test_obj_->xds_channel_args_, &kChannelArgsArgVtable);
}
void UpdatePlugins(
std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/)
override {}
private:
XdsEnd2endTest* test_obj_;
};
virtual void RegisterAllServices(ServerBuilder* builder) = 0;
virtual void StartAllServices() = 0;
virtual void ShutdownAllServices() = 0;
virtual const char* Type() = 0;
XdsEnd2endTest* test_obj_;
const int port_;
std::unique_ptr<Server> server_;
XdsServingStatusNotifier notifier_;
@ -2328,8 +2399,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
class BackendServerThread : public ServerThread {
public:
explicit BackendServerThread(bool use_xds_enabled_server)
: ServerThread(use_xds_enabled_server) {}
explicit BackendServerThread(XdsEnd2endTest* test_obj,
bool use_xds_enabled_server)
: ServerThread(test_obj, use_xds_enabled_server) {}
BackendServiceImpl<::grpc::testing::EchoTestService::Service>*
backend_service() {
@ -2403,8 +2475,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
class BalancerServerThread : public ServerThread {
public:
explicit BalancerServerThread(int client_load_reporting_interval = 0)
: ads_service_(new AdsServiceImpl()),
explicit BalancerServerThread(XdsEnd2endTest* test_obj,
int client_load_reporting_interval = 0)
: ServerThread(test_obj),
ads_service_(new AdsServiceImpl()),
lrs_service_(new LrsServiceImpl(client_load_reporting_interval)) {}
AdsServiceImpl* ads_service() { return ads_service_.get(); }
@ -2436,6 +2510,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
#ifndef DISABLED_XDS_PROTO_IN_CC
class AdminServerThread : public ServerThread {
public:
explicit AdminServerThread(XdsEnd2endTest* test_obj)
: ServerThread(test_obj) {}
private:
void RegisterAllServices(ServerBuilder* builder) override {
builder->RegisterService(&csds_service_);
@ -2453,8 +2531,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
public:
void StartRpc(grpc::testing::EchoTestService::Stub* stub,
const RpcOptions& rpc_options =
RpcOptions().set_client_cancel_after_us(1 * 1000 *
1000)) {
RpcOptions().set_timeout_ms(0).set_client_cancel_after_us(
1 * 1000 * 1000)) {
sender_thread_ = std::thread([this, stub, rpc_options]() {
EchoRequest request;
EchoResponse response;
@ -2925,14 +3003,21 @@ TEST_P(XdsResolverOnlyTest, CircuitBreakingMultipleChannelsShareCallCounter) {
// Create second channel.
auto response_generator2 =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
auto lb_response_generator2 =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
grpc_arg xds_arg = grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
lb_response_generator2.get());
grpc_channel_args xds_channel_args2 = {1, &xds_arg};
auto channel2 = CreateChannel(
/*failover_timeout=*/0, /*server_name=*/kServerName,
response_generator2.get());
response_generator2.get(), &xds_channel_args2);
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
// Set resolution results for both channels and for the xDS channel.
SetNextResolution({});
SetNextResolution({}, response_generator2.get());
SetNextResolutionForLbChannelAllBalancers();
SetNextResolutionForLbChannelAllBalancers(nullptr, nullptr,
lb_response_generator2.get());
// Send exactly max_concurrent_requests long RPCs, alternating between
// the two channels.
LongRunningRpc rpcs[kMaxConcurrentRequests];
@ -2962,7 +3047,9 @@ TEST_P(XdsResolverOnlyTest, CircuitBreakingMultipleChannelsShareCallCounter) {
backends_[0]->backend_service()->request_count());
}
TEST_P(XdsResolverOnlyTest, MultipleChannelsShareXdsClient) {
using GlobalXdsClientTest = BasicTest;
TEST_P(GlobalXdsClientTest, MultipleChannelsShareXdsClient) {
const char* kNewServerName = "new-server.example.com";
Listener listener = default_listener_;
listener.set_name(kNewServerName);
@ -2983,6 +3070,38 @@ TEST_P(XdsResolverOnlyTest, MultipleChannelsShareXdsClient) {
EXPECT_EQ(1UL, balancers_[0]->ads_service()->clients().size());
}
// Tests that the NACK for multiple bad LDS resources includes both errors.
TEST_P(GlobalXdsClientTest, MultipleBadResources) {
constexpr char kServerName2[] = "server.other.com";
auto listener = default_listener_;
listener.clear_api_listener();
balancers_[0]->ads_service()->SetLdsResource(listener);
listener.set_name(kServerName2);
balancers_[0]->ads_service()->SetLdsResource(listener);
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
// Need to create a second channel to subscribe to a second LDS resource.
auto channel2 = CreateChannel(0, kServerName2);
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
ClientContext context;
EchoRequest request;
request.set_message(kRequestMessage);
EchoResponse response;
grpc::Status status = stub2->Echo(&context, request, &response);
EXPECT_FALSE(status.ok());
const auto response_state =
balancers_[0]->ads_service()->lds_response_state();
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_THAT(
response_state.error_message,
::testing::AllOf(
::testing::HasSubstr(absl::StrCat(
kServerName, ": Listener has neither address nor ApiListener")),
::testing::HasSubstr(
absl::StrCat(kServerName2,
": Listener has neither address nor ApiListener"))));
}
class XdsResolverLoadReportingOnlyTest : public XdsEnd2endTest {
public:
XdsResolverLoadReportingOnlyTest() : XdsEnd2endTest(4, 1, 3) {}
@ -3236,38 +3355,6 @@ TEST_P(LdsTest, RdsConfigSourceDoesNotSpecifyAds) {
"HttpConnectionManager ConfigSource for RDS does not specify ADS."));
}
// Tests that the NACK for multiple bad LDS resources includes both errors.
TEST_P(LdsTest, MultipleBadResources) {
constexpr char kServerName2[] = "server.other.com";
auto listener = default_listener_;
listener.clear_api_listener();
balancers_[0]->ads_service()->SetLdsResource(listener);
listener.set_name(kServerName2);
balancers_[0]->ads_service()->SetLdsResource(listener);
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
// Need to create a second channel to subscribe to a second LDS resource.
auto channel2 = CreateChannel(0, kServerName2);
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
ClientContext context;
EchoRequest request;
request.set_message(kRequestMessage);
EchoResponse response;
grpc::Status status = stub2->Echo(&context, request, &response);
EXPECT_FALSE(status.ok());
const auto response_state =
balancers_[0]->ads_service()->lds_response_state();
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_THAT(
response_state.error_message,
::testing::AllOf(
::testing::HasSubstr(absl::StrCat(
kServerName, ": Listener has neither address nor ApiListener")),
::testing::HasSubstr(
absl::StrCat(kServerName2,
": Listener has neither address nor ApiListener"))));
}
// Tests that we ignore filters after the router filter.
TEST_P(LdsTest, IgnoresHttpFiltersAfterRouterFilter) {
SetNextResolutionForLbChannelAllBalancers();
@ -7254,7 +7341,6 @@ TEST_P(XdsEnabledServerTest, UnsupportedL4Filter) {
}
TEST_P(XdsEnabledServerTest, UnsupportedHttpFilter) {
// Set env var to enable filters parsing.
Listener listener;
listener.set_name(
absl::StrCat("grpc/server?xds.resource.listening_address=",
@ -7288,7 +7374,6 @@ TEST_P(XdsEnabledServerTest, UnsupportedHttpFilter) {
}
TEST_P(XdsEnabledServerTest, HttpFilterNotSupportedOnServer) {
// Set env var to enable filters parsing.
Listener listener;
listener.set_name(
absl::StrCat("grpc/server?xds.resource.listening_address=",
@ -7324,7 +7409,6 @@ TEST_P(XdsEnabledServerTest, HttpFilterNotSupportedOnServer) {
TEST_P(XdsEnabledServerTest,
HttpFilterNotSupportedOnServerIgnoredWhenOptional) {
// Set env var to enable filters parsing.
Listener listener;
listener.set_name(
absl::StrCat("grpc/server?xds.resource.listening_address=",
@ -10366,12 +10450,12 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionMaxFault) {
EXPECT_EQ(kNumRpcs - kMaxFault, num_ok);
}
class BootstrapContentsFromEnvVarTest : public XdsEnd2endTest {
class BootstrapSourceTest : public XdsEnd2endTest {
public:
BootstrapContentsFromEnvVarTest() : XdsEnd2endTest(4, 1, 100, false, true) {}
BootstrapSourceTest() : XdsEnd2endTest(4, 1) {}
};
TEST_P(BootstrapContentsFromEnvVarTest, Vanilla) {
TEST_P(BootstrapSourceTest, Vanilla) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::EdsResourceArgs args({
@ -10389,7 +10473,7 @@ class ClientStatusDiscoveryServiceTest : public XdsEnd2endTest {
void SetUp() override {
XdsEnd2endTest::SetUp();
admin_server_thread_ = absl::make_unique<AdminServerThread>();
admin_server_thread_ = absl::make_unique<AdminServerThread>(this);
admin_server_thread_->Start();
std::string admin_server_address = absl::StrCat(
ipv6_only_ ? "[::1]:" : "127.0.0.1:", admin_server_thread_->port());
@ -11197,8 +11281,13 @@ INSTANTIATE_TEST_SUITE_P(
::testing::Values(TestType().set_use_xds_credentials()), &TestTypeName);
// We are only testing the server here.
// Run with bootstrap from env var, so that we use a global XdsClient
// instance. Otherwise, we would need to use a separate fake resolver
// result generator on the client and server sides.
INSTANTIATE_TEST_SUITE_P(XdsTest, XdsEnabledServerTest,
::testing::Values(TestType()), &TestTypeName);
::testing::Values(TestType().set_bootstrap_source(
TestType::kBootstrapFromEnvVar)),
&TestTypeName);
// We are only testing the server here.
INSTANTIATE_TEST_SUITE_P(XdsTest, XdsServerSecurityTest,
@ -11242,6 +11331,16 @@ INSTANTIATE_TEST_SUITE_P(
::testing::Values(TestType(), TestType().set_enable_load_reporting()),
&TestTypeName);
// Runs with bootstrap from env var, so that there's a global XdsClient.
INSTANTIATE_TEST_SUITE_P(
XdsTest, GlobalXdsClientTest,
::testing::Values(
TestType().set_bootstrap_source(TestType::kBootstrapFromEnvVar),
TestType()
.set_bootstrap_source(TestType::kBootstrapFromEnvVar)
.set_enable_load_reporting()),
&TestTypeName);
// XdsResolverLoadReprtingOnlyTest depends on XdsResolver and load reporting.
INSTANTIATE_TEST_SUITE_P(
XdsTest, XdsResolverLoadReportingOnlyTest,
@ -11305,25 +11404,46 @@ INSTANTIATE_TEST_SUITE_P(
TestType::FilterConfigSetup::kRouteOverride)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, BootstrapContentsFromEnvVarTest,
::testing::Values(TestType()), &TestTypeName);
INSTANTIATE_TEST_SUITE_P(
XdsTest, BootstrapSourceTest,
::testing::Values(
TestType().set_bootstrap_source(TestType::kBootstrapFromEnvVar),
TestType().set_bootstrap_source(TestType::kBootstrapFromFile)),
&TestTypeName);
#ifndef DISABLED_XDS_PROTO_IN_CC
// Run CSDS tests with RDS enabled and disabled.
// These need to run with the bootstrap from an env var instead of from
// a channel arg, since there needs to be a global XdsClient instance.
INSTANTIATE_TEST_SUITE_P(
XdsTest, ClientStatusDiscoveryServiceTest,
::testing::Values(
TestType(), TestType().set_enable_rds_testing(),
TestType().set_use_csds_streaming(),
TestType().set_enable_rds_testing().set_use_csds_streaming()),
TestType().set_bootstrap_source(TestType::kBootstrapFromEnvVar),
TestType()
.set_bootstrap_source(TestType::kBootstrapFromEnvVar)
.set_enable_rds_testing(),
TestType()
.set_bootstrap_source(TestType::kBootstrapFromEnvVar)
.set_use_csds_streaming(),
TestType()
.set_bootstrap_source(TestType::kBootstrapFromEnvVar)
.set_enable_rds_testing()
.set_use_csds_streaming()),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(
XdsTest, CsdsShortAdsTimeoutTest,
::testing::Values(
TestType(), TestType().set_enable_rds_testing(),
TestType().set_use_csds_streaming(),
TestType().set_enable_rds_testing().set_use_csds_streaming()),
TestType().set_bootstrap_source(TestType::kBootstrapFromEnvVar),
TestType()
.set_bootstrap_source(TestType::kBootstrapFromEnvVar)
.set_enable_rds_testing(),
TestType()
.set_bootstrap_source(TestType::kBootstrapFromEnvVar)
.set_use_csds_streaming(),
TestType()
.set_bootstrap_source(TestType::kBootstrapFromEnvVar)
.set_enable_rds_testing()
.set_use_csds_streaming()),
&TestTypeName);
#endif // DISABLED_XDS_PROTO_IN_CC

Loading…
Cancel
Save