Merge pull request #24201 from markdroth/xds_client_no_parent_channel_args

Stop propagating parent channel args into xDS channel.
pull/24244/head
Mark D. Roth 4 years ago committed by GitHub
commit 5e30156069
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      include/grpc/impl/codegen/grpc_types.h
  2. 14
      src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
  3. 14
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  4. 9
      src/core/ext/xds/xds_channel_args.h
  5. 133
      src/core/ext/xds/xds_client.cc
  6. 25
      src/core/ext/xds/xds_client.h
  7. 16
      src/core/lib/security/security_connector/fake/fake_security_connector.cc
  8. 241
      test/cpp/end2end/xds_end2end_test.cc

@ -355,11 +355,6 @@ typedef struct {
over to the next priority. Default value is 10 seconds. */ over to the next priority. Default value is 10 seconds. */
#define GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS \ #define GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS \
"grpc.priority_failover_timeout_ms" "grpc.priority_failover_timeout_ms"
/* Timeout in milliseconds to wait for a resource to be returned from
* the xds server before assuming that it does not exist.
* The default is 15 seconds. */
#define GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS \
"grpc.xds_resource_does_not_exist_timeout_ms"
/** If non-zero, grpc server's cronet compression workaround will be enabled */ /** If non-zero, grpc server's cronet compression workaround will be enabled */
#define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \ #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \
"grpc.workaround.cronet_compression" "grpc.workaround.cronet_compression"

@ -433,6 +433,12 @@ void EdsLb::ShutdownLocked() {
xds_client_from_channel_.reset(DEBUG_LOCATION, "EdsLb"); xds_client_from_channel_.reset(DEBUG_LOCATION, "EdsLb");
} }
if (xds_client_ != nullptr) { if (xds_client_ != nullptr) {
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (parent_channelz_node != nullptr) {
xds_client_->RemoveChannelzLinkage(parent_channelz_node);
}
grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
interested_parties()); interested_parties());
xds_client_.reset(); xds_client_.reset();
@ -465,10 +471,16 @@ void EdsLb::UpdateLocked(UpdateArgs args) {
// Initialize XdsClient. // Initialize XdsClient.
if (xds_client_from_channel_ == nullptr) { if (xds_client_from_channel_ == nullptr) {
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(*args_, &error); xds_client_ = MakeOrphanable<XdsClient>(&error);
// TODO(roth): If we decide that we care about EDS-only mode, add // TODO(roth): If we decide that we care about EDS-only mode, add
// proper error handling here. // proper error handling here.
GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(error == GRPC_ERROR_NONE);
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (parent_channelz_node != nullptr) {
xds_client_->AddChannelzLinkage(parent_channelz_node);
}
grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
interested_parties()); interested_parties());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {

@ -513,7 +513,7 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
void XdsResolver::StartLocked() { void XdsResolver::StartLocked() {
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(*args_, &error); xds_client_ = MakeOrphanable<XdsClient>(&error);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"Failed to create xds client -- channel will remain in " "Failed to create xds client -- channel will remain in "
@ -524,6 +524,12 @@ void XdsResolver::StartLocked() {
} }
grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
interested_parties_); interested_parties_);
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (parent_channelz_node != nullptr) {
xds_client_->AddChannelzLinkage(parent_channelz_node);
}
auto watcher = absl::make_unique<ListenerWatcher>(Ref()); auto watcher = absl::make_unique<ListenerWatcher>(Ref());
listener_watcher_ = watcher.get(); listener_watcher_ = watcher.get();
xds_client_->WatchListenerData(server_name_, std::move(watcher)); xds_client_->WatchListenerData(server_name_, std::move(watcher));
@ -542,6 +548,12 @@ void XdsResolver::ShutdownLocked() {
xds_client_->CancelRouteConfigDataWatch( xds_client_->CancelRouteConfigDataWatch(
server_name_, route_config_watcher_, /*delay_unsubscription=*/false); server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
} }
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (parent_channelz_node != nullptr) {
xds_client_->RemoveChannelzLinkage(parent_channelz_node);
}
grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
interested_parties_); interested_parties_);
xds_client_.reset(); xds_client_.reset();

@ -17,10 +17,13 @@
#ifndef GRPC_CORE_EXT_XDS_XDS_CHANNEL_ARGS_H #ifndef GRPC_CORE_EXT_XDS_XDS_CHANNEL_ARGS_H
#define GRPC_CORE_EXT_XDS_XDS_CHANNEL_ARGS_H #define GRPC_CORE_EXT_XDS_XDS_CHANNEL_ARGS_H
// Boolean channel arg indicating whether the target is an xds server.
#define GRPC_ARG_ADDRESS_IS_XDS_SERVER "grpc.address_is_xds_server"
// Pointer channel arg containing a ref to the XdsClient object. // Pointer channel arg containing a ref to the XdsClient object.
#define GRPC_ARG_XDS_CLIENT "grpc.xds_client" #define GRPC_ARG_XDS_CLIENT "grpc.xds_client"
// Timeout in milliseconds to wait for a resource to be returned from
// the xds server before assuming that it does not exist.
// The default is 15 seconds.
#define GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS \
"grpc.xds_resource_does_not_exist_timeout_ms"
#endif /* GRPC_CORE_EXT_XDS_XDS_CHANNEL_ARGS_H */ #endif /* GRPC_CORE_EXT_XDS_XDS_CHANNEL_ARGS_H */

@ -69,6 +69,18 @@ namespace grpc_core {
TraceFlag grpc_xds_client_trace(false, "xds_client"); TraceFlag grpc_xds_client_trace(false, "xds_client");
namespace {
const grpc_channel_args* g_channel_args = nullptr;
} // namespace
namespace internal {
void SetXdsChannelArgsForTest(grpc_channel_args* args) {
g_channel_args = args;
}
} // namespace internal
// //
// Internal class declarations // Internal class declarations
// //
@ -423,59 +435,6 @@ class XdsClient::ChannelState::StateWatcher
// XdsClient::ChannelState // XdsClient::ChannelState
// //
namespace {
// Returns the channel args for the xds channel.
grpc_channel_args* BuildXdsChannelArgs(const grpc_channel_args& args) {
static const char* args_to_remove[] = {
// LB policy name, since we want to use the default (pick_first) in
// the LB channel.
GRPC_ARG_LB_POLICY_NAME,
// The service config that contains the LB config. We don't want to
// recursively use xds in the LB channel.
GRPC_ARG_SERVICE_CONFIG,
// The channel arg for the server URI, since that will be different for
// the xds channel than for the parent channel. The client channel
// factory will re-add this arg with the right value.
GRPC_ARG_SERVER_URI,
// The xds channel should use the authority indicated by the target
// authority table (see \a ModifyXdsChannelArgs),
// as opposed to the authority from the parent channel.
GRPC_ARG_DEFAULT_AUTHORITY,
// Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the xds channel should be
// treated as a stand-alone channel and not inherit this argument from the
// args of the parent channel.
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
// Don't want to pass down channelz node from parent; the balancer
// channel will get its own.
GRPC_ARG_CHANNELZ_CHANNEL_NODE,
// Keepalive interval. We are explicitly setting our own value below.
GRPC_ARG_KEEPALIVE_TIME_MS,
};
// Channel args to add.
absl::InlinedVector<grpc_arg, 3> args_to_add = {
// Keepalive interval.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
5 * 60 * GPR_MS_PER_SEC),
// Tell channelz this is an internal channel.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
// A channel arg indicating that the target is an xds server.
// TODO(roth): Once we figure out our fallback and credentials story,
// decide whether this is actually needed. Note that it's currently
// used by the fake security connector as well.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_SERVER), 1),
};
// Construct channel args.
return grpc_channel_args_copy_and_add_and_remove(
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
args_to_add.size());
}
} // namespace
XdsClient::ChannelState::ChannelState(RefCountedPtr<XdsClient> xds_client, XdsClient::ChannelState::ChannelState(RefCountedPtr<XdsClient> xds_client,
grpc_channel* channel) grpc_channel* channel)
: InternallyRefCounted<ChannelState>(&grpc_xds_client_trace), : InternallyRefCounted<ChannelState>(&grpc_xds_client_trace),
@ -1730,15 +1689,25 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
namespace { namespace {
grpc_millis GetRequestTimeout(const grpc_channel_args& args) { grpc_millis GetRequestTimeout() {
return grpc_channel_args_find_integer( return grpc_channel_args_find_integer(
&args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS, g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
{15000, 0, INT_MAX}); {15000, 0, INT_MAX});
} }
grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap, grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
const grpc_channel_args& args,
grpc_error** error) { grpc_error** error) {
// Build channel args.
absl::InlinedVector<grpc_arg, 2> args_to_add = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
5 * 60 * GPR_MS_PER_SEC),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
};
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
g_channel_args, args_to_add.data(), args_to_add.size());
// Find credentials and create channel.
RefCountedPtr<grpc_channel_credentials> creds; RefCountedPtr<grpc_channel_credentials> creds;
for (const auto& channel_creds : bootstrap.server().channel_creds) { for (const auto& channel_creds : bootstrap.server().channel_creds) {
if (channel_creds.type == "google_default") { if (channel_creds.type == "google_default") {
@ -1746,8 +1715,10 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
break; break;
} }
if (channel_creds.type == "insecure") { if (channel_creds.type == "insecure") {
return grpc_insecure_channel_create(bootstrap.server().server_uri.c_str(), grpc_channel* channel = grpc_insecure_channel_create(
&args, nullptr); bootstrap.server().server_uri.c_str(), new_args, nullptr);
grpc_channel_args_destroy(new_args);
return channel;
} }
if (channel_creds.type == "fake") { if (channel_creds.type == "fake") {
creds.reset(grpc_fake_transport_security_credentials_create()); creds.reset(grpc_fake_transport_security_credentials_create());
@ -1759,9 +1730,6 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
"no supported credential types found"); "no supported credential types found");
return nullptr; return nullptr;
} }
const char* arg_to_remove = GRPC_ARG_CHANNEL_CREDENTIALS;
grpc_channel_args* new_args =
grpc_channel_args_copy_and_remove(&args, &arg_to_remove, 1);
grpc_channel* channel = grpc_secure_channel_create( grpc_channel* channel = grpc_secure_channel_create(
creds.get(), bootstrap.server().server_uri.c_str(), new_args, nullptr); creds.get(), bootstrap.server().server_uri.c_str(), new_args, nullptr);
grpc_channel_args_destroy(new_args); grpc_channel_args_destroy(new_args);
@ -1770,9 +1738,9 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
} // namespace } // namespace
XdsClient::XdsClient(const grpc_channel_args& channel_args, grpc_error** error) XdsClient::XdsClient(grpc_error** error)
: InternallyRefCounted<XdsClient>(&grpc_xds_client_trace), : InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
request_timeout_(GetRequestTimeout(channel_args)), request_timeout_(GetRequestTimeout()),
interested_parties_(grpc_pollset_set_create()), interested_parties_(grpc_pollset_set_create()),
bootstrap_( bootstrap_(
XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)), XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
@ -1789,24 +1757,12 @@ XdsClient::XdsClient(const grpc_channel_args& channel_args, grpc_error** error)
gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this, gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this,
bootstrap_->server().server_uri.c_str()); bootstrap_->server().server_uri.c_str());
} }
grpc_channel_args* new_args = BuildXdsChannelArgs(channel_args); grpc_channel* channel = CreateXdsChannel(*bootstrap_, error);
grpc_channel* channel = CreateXdsChannel(*bootstrap_, *new_args, error);
grpc_channel_args_destroy(new_args);
if (*error != GRPC_ERROR_NONE) { if (*error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "[xds_client %p] failed to create xds channel: %s", this, gpr_log(GPR_ERROR, "[xds_client %p] failed to create xds channel: %s", this,
grpc_error_string(*error)); grpc_error_string(*error));
return; return;
} }
// Add channelz linkage.
channelz::ChannelNode* xds_channelz_node =
grpc_channel_get_channelz_node(channel);
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
&channel_args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (xds_channelz_node != nullptr && parent_channelz_node != nullptr) {
parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
parent_channelz_node_ = parent_channelz_node->Ref();
}
// Create ChannelState object. // Create ChannelState object.
chand_ = MakeOrphanable<ChannelState>( chand_ = MakeOrphanable<ChannelState>(
Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel); Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
@ -1819,6 +1775,24 @@ XdsClient::~XdsClient() {
grpc_pollset_set_destroy(interested_parties_); grpc_pollset_set_destroy(interested_parties_);
} }
void XdsClient::AddChannelzLinkage(
channelz::ChannelNode* parent_channelz_node) {
channelz::ChannelNode* xds_channelz_node =
grpc_channel_get_channelz_node(chand_->channel());
if (xds_channelz_node != nullptr) {
parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
}
}
void XdsClient::RemoveChannelzLinkage(
channelz::ChannelNode* parent_channelz_node) {
channelz::ChannelNode* xds_channelz_node =
grpc_channel_get_channelz_node(chand_->channel());
if (xds_channelz_node != nullptr) {
parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
}
}
void XdsClient::Orphan() { void XdsClient::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this); gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
@ -1826,13 +1800,6 @@ void XdsClient::Orphan() {
{ {
MutexLock lock(&mu_); MutexLock lock(&mu_);
shutting_down_ = true; shutting_down_ = true;
// Remove channelz linkage.
if (parent_channelz_node_ != nullptr) {
channelz::ChannelNode* xds_channelz_node =
grpc_channel_get_channelz_node(chand_->channel());
GPR_ASSERT(xds_channelz_node != nullptr);
parent_channelz_node_->RemoveChildChannel(xds_channelz_node->uuid());
}
// Orphan ChannelState object. // Orphan ChannelState object.
chand_.reset(); chand_.reset();
// We do not clear cluster_map_ and endpoint_map_ if the xds client was // We do not clear cluster_map_ and endpoint_map_ if the xds client was

@ -92,11 +92,23 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// If *error is not GRPC_ERROR_NONE after construction, then there was // If *error is not GRPC_ERROR_NONE after construction, then there was
// an error initializing the client. // an error initializing the client.
XdsClient(const grpc_channel_args& channel_args, grpc_error** error); explicit XdsClient(grpc_error** error);
~XdsClient(); ~XdsClient();
grpc_pollset_set* interested_parties() const { return interested_parties_; } grpc_pollset_set* interested_parties() const { return interested_parties_; }
// TODO(roth): When we add federation, there will be multiple channels
// inside the XdsClient, and the set of channels may change over time,
// but not every channel may use every one of the child channels, so
// this API will need to change. At minumum, we will need to hold a
// ref to the parent channelz node so that we can update its list of
// children as the set of xDS channels changes. However, we may also
// want to make this a bit more selective such that only those
// channels on which a given parent channel is actually requesting
// resources will actually be marked as its children.
void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
void Orphan() override; void Orphan() override;
// Start and cancel listener data watch for a listener. // Start and cancel listener data watch for a listener.
@ -299,13 +311,12 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
static const grpc_arg_pointer_vtable kXdsClientVtable; static const grpc_arg_pointer_vtable kXdsClientVtable;
const grpc_millis request_timeout_; const grpc_millis request_timeout_;
Mutex mu_;
grpc_pollset_set* interested_parties_; grpc_pollset_set* interested_parties_;
std::unique_ptr<XdsBootstrap> bootstrap_; std::unique_ptr<XdsBootstrap> bootstrap_;
XdsApi api_; XdsApi api_;
Mutex mu_;
// The channel for communicating with the xds server. // The channel for communicating with the xds server.
OrphanablePtr<ChannelState> chand_; OrphanablePtr<ChannelState> chand_;
RefCountedPtr<channelz::ChannelNode> parent_channelz_node_; RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
@ -329,6 +340,12 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
bool shutting_down_ = false; bool shutting_down_ = false;
}; };
namespace internal {
void SetXdsChannelArgsForTest(grpc_channel_args* args);
} // namespace internal
} // namespace grpc_core } // namespace grpc_core
#endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */ #endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */

@ -56,11 +56,9 @@ class grpc_fake_channel_security_connector final
target_(gpr_strdup(target)), target_(gpr_strdup(target)),
expected_targets_( expected_targets_(
gpr_strdup(grpc_fake_transport_get_expected_targets(args))), gpr_strdup(grpc_fake_transport_get_expected_targets(args))),
is_lb_channel_( is_lb_channel_(grpc_channel_args_find(
grpc_channel_args_find(args, GRPC_ARG_ADDRESS_IS_XDS_SERVER) != args, GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER) !=
nullptr || nullptr) {
grpc_channel_args_find(
args, GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER) != nullptr) {
const grpc_arg* target_name_override_arg = const grpc_arg* target_name_override_arg =
grpc_channel_args_find(args, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG); grpc_channel_args_find(args, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG);
if (target_name_override_arg != nullptr) { if (target_name_override_arg != nullptr) {
@ -147,9 +145,7 @@ class grpc_fake_channel_security_connector final
char* target_name_override() const { return target_name_override_; } char* target_name_override() const { return target_name_override_; }
private: private:
bool fake_check_target(const char* target_type, const char* target, bool fake_check_target(const char* target, const char* set_str) const {
const char* set_str) const {
GPR_ASSERT(target_type != nullptr);
GPR_ASSERT(target != nullptr); GPR_ASSERT(target != nullptr);
char** set = nullptr; char** set = nullptr;
size_t set_size = 0; size_t set_size = 0;
@ -185,14 +181,14 @@ class grpc_fake_channel_security_connector final
expected_targets_); expected_targets_);
goto done; goto done;
} }
if (!fake_check_target("LB", target_, lbs_and_backends[1])) { if (!fake_check_target(target_, lbs_and_backends[1])) {
gpr_log(GPR_ERROR, "LB target '%s' not found in expected set '%s'", gpr_log(GPR_ERROR, "LB target '%s' not found in expected set '%s'",
target_, lbs_and_backends[1]); target_, lbs_and_backends[1]);
goto done; goto done;
} }
success = true; success = true;
} else { } else {
if (!fake_check_target("Backend", target_, lbs_and_backends[0])) { if (!fake_check_target(target_, lbs_and_backends[0])) {
gpr_log(GPR_ERROR, "Backend target '%s' not found in expected set '%s'", gpr_log(GPR_ERROR, "Backend target '%s' not found in expected set '%s'",
target_, lbs_and_backends[0]); target_, lbs_and_backends[0]);
goto done; goto done;

@ -46,6 +46,9 @@
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/tmpfile.h" #include "src/core/lib/gpr/tmpfile.h"
#include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/map.h"
@ -144,7 +147,7 @@ constexpr char kBootstrapFileV3[] =
"{\n" "{\n"
" \"xds_servers\": [\n" " \"xds_servers\": [\n"
" {\n" " {\n"
" \"server_uri\": \"fake:///lb\",\n" " \"server_uri\": \"fake:///xds_server\",\n"
" \"channel_creds\": [\n" " \"channel_creds\": [\n"
" {\n" " {\n"
" \"type\": \"fake\"\n" " \"type\": \"fake\"\n"
@ -171,7 +174,7 @@ constexpr char kBootstrapFileV2[] =
"{\n" "{\n"
" \"xds_servers\": [\n" " \"xds_servers\": [\n"
" {\n" " {\n"
" \"server_uri\": \"fake:///lb\",\n" " \"server_uri\": \"fake:///xds_server\",\n"
" \"channel_creds\": [\n" " \"channel_creds\": [\n"
" {\n" " {\n"
" \"type\": \"fake\"\n" " \"type\": \"fake\"\n"
@ -193,25 +196,8 @@ constexpr char kBootstrapFileV2[] =
" }\n" " }\n"
"}\n"; "}\n";
constexpr char kBootstrapFileBad[] =
"{\n"
" \"xds_servers\": [\n"
" {\n"
" \"server_uri\": \"fake:///wrong_lb\",\n"
" \"channel_creds\": [\n"
" {\n"
" \"type\": \"fake\"\n"
" }\n"
" ]\n"
" }\n"
" ],\n"
" \"node\": {\n"
" }\n"
"}\n";
char* g_bootstrap_file_v3; char* g_bootstrap_file_v3;
char* g_bootstrap_file_v2; char* g_bootstrap_file_v2;
char* g_bootstrap_file_bad;
void WriteBootstrapFiles() { void WriteBootstrapFiles() {
char* bootstrap_file; char* bootstrap_file;
@ -223,10 +209,6 @@ void WriteBootstrapFiles() {
fputs(kBootstrapFileV2, out); fputs(kBootstrapFileV2, out);
fclose(out); fclose(out);
g_bootstrap_file_v2 = bootstrap_file; g_bootstrap_file_v2 = bootstrap_file;
out = gpr_tmpfile("xds_bootstrap_bad", &bootstrap_file);
fputs(kBootstrapFileBad, out);
fclose(out);
g_bootstrap_file_bad = bootstrap_file;
} }
// Helper class to minimize the number of unique ports we use for this test. // Helper class to minimize the number of unique ports we use for this test.
@ -842,6 +824,12 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
resource->set_type_url(request.type_url()); resource->set_type_url(request.type_url());
} }
} }
} else {
gpr_log(GPR_INFO,
"ADS[%p]: client does not need update for "
"type=%s name=%s version=%d",
this, request.type_url().c_str(),
resource_name.c_str(), resource_state.version);
} }
} }
// Process unsubscriptions for any resource no longer // Process unsubscriptions for any resource no longer
@ -1349,8 +1337,20 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
g_port_saver->Reset(); g_port_saver->Reset();
response_generator_ = response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>(); grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
// Inject xDS channel response generator.
lb_channel_response_generator_ = lb_channel_response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>(); grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
xds_channel_args_to_add_.emplace_back(
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
lb_channel_response_generator_.get()));
if (xds_resource_does_not_exist_timeout_ms_ > 0) {
xds_channel_args_to_add_.emplace_back(grpc_channel_arg_integer_create(
GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
xds_resource_does_not_exist_timeout_ms_));
}
xds_channel_args_.num_args = xds_channel_args_to_add_.size();
xds_channel_args_.args = xds_channel_args_to_add_.data();
grpc_core::internal::SetXdsChannelArgsForTest(&xds_channel_args_);
// Start the backends. // Start the backends.
for (size_t i = 0; i < num_backends_; ++i) { for (size_t i = 0; i < num_backends_; ++i) {
backends_.emplace_back(new BackendServerThread); backends_.emplace_back(new BackendServerThread);
@ -1391,31 +1391,16 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); } void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
void ResetStub(int failover_timeout = 0, void ResetStub(int failover_timeout = 0) {
const std::string& expected_targets = "",
int xds_resource_does_not_exist_timeout = 0) {
ChannelArguments args; ChannelArguments args;
if (failover_timeout > 0) { if (failover_timeout > 0) {
args.SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, failover_timeout); args.SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, failover_timeout);
} }
if (xds_resource_does_not_exist_timeout > 0) {
args.SetInt(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
xds_resource_does_not_exist_timeout);
}
// If the parent channel is using the fake resolver, we inject the // If the parent channel is using the fake resolver, we inject the
// response generator for the parent here, and then SetNextResolution() // response generator here.
// will inject the xds channel's response generator via the parent's if (!GetParam().use_xds_resolver()) {
// response generator.
//
// In contrast, if we are using the xds resolver, then the parent
// channel never uses a response generator, and we inject the xds
// channel's response generator here.
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
GetParam().use_xds_resolver() response_generator_.get());
? lb_channel_response_generator_.get()
: response_generator_.get());
if (!expected_targets.empty()) {
args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets);
} }
std::string uri = absl::StrCat( std::string uri = absl::StrCat(
GetParam().use_xds_resolver() ? "xds" : "fake", ":///", kServerName); GetParam().use_xds_resolver() ? "xds" : "fake", ":///", kServerName);
@ -1603,9 +1588,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
return addresses; return addresses;
} }
void SetNextResolution(const std::vector<int>& ports, void SetNextResolution(const std::vector<int>& ports) {
grpc_core::FakeResolverResponseGenerator*
lb_channel_response_generator = nullptr) {
if (GetParam().use_xds_resolver()) return; // Not used with xds resolver. if (GetParam().use_xds_resolver()) return; // Not used with xds resolver.
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result; grpc_core::Resolver::Result result;
@ -1619,30 +1602,22 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
grpc_core::ServiceConfig::Create(service_config_json, &error); grpc_core::ServiceConfig::Create(service_config_json, &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_string(error); ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_string(error);
ASSERT_NE(result.service_config.get(), nullptr); ASSERT_NE(result.service_config.get(), nullptr);
grpc_arg arg = grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
lb_channel_response_generator == nullptr
? lb_channel_response_generator_.get()
: lb_channel_response_generator);
result.args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
response_generator_->SetResponse(std::move(result)); response_generator_->SetResponse(std::move(result));
} }
void SetNextResolutionForLbChannelAllBalancers( void SetNextResolutionForLbChannelAllBalancers(
const char* service_config_json = nullptr, const char* service_config_json = nullptr,
grpc_core::FakeResolverResponseGenerator* lb_channel_response_generator = const char* expected_targets = nullptr) {
nullptr) {
std::vector<int> ports; std::vector<int> ports;
for (size_t i = 0; i < balancers_.size(); ++i) { for (size_t i = 0; i < balancers_.size(); ++i) {
ports.emplace_back(balancers_[i]->port()); ports.emplace_back(balancers_[i]->port());
} }
SetNextResolutionForLbChannel(ports, service_config_json, SetNextResolutionForLbChannel(ports, service_config_json, expected_targets);
lb_channel_response_generator);
} }
void SetNextResolutionForLbChannel( void SetNextResolutionForLbChannel(const std::vector<int>& ports,
const std::vector<int>& ports, const char* service_config_json = nullptr, const char* service_config_json = nullptr,
grpc_core::FakeResolverResponseGenerator* lb_channel_response_generator = const char* expected_targets = nullptr) {
nullptr) {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result; grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(ports); result.addresses = CreateAddressListFromPortList(ports);
@ -1653,10 +1628,14 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
ASSERT_NE(result.service_config.get(), nullptr); ASSERT_NE(result.service_config.get(), nullptr);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_string(error); ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_string(error);
} }
if (lb_channel_response_generator == nullptr) { if (expected_targets != nullptr) {
lb_channel_response_generator = lb_channel_response_generator_.get(); grpc_arg expected_targets_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS),
const_cast<char*>(expected_targets));
result.args =
grpc_channel_args_copy_and_add(nullptr, &expected_targets_arg, 1);
} }
lb_channel_response_generator->SetResponse(std::move(result)); lb_channel_response_generator_->SetResponse(std::move(result));
} }
void SetNextReresolutionResponse(const std::vector<int>& ports) { void SetNextReresolutionResponse(const std::vector<int>& ports) {
@ -1725,9 +1704,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
} }
} }
void CheckRpcSendFailure(const size_t times = 1, bool server_fail = false) { void CheckRpcSendFailure(const size_t times = 1,
const RpcOptions& rpc_options = RpcOptions()) {
for (size_t i = 0; i < times; ++i) { for (size_t i = 0; i < times; ++i) {
const Status status = SendRpc(RpcOptions().set_server_fail(server_fail)); const Status status = SendRpc(rpc_options);
EXPECT_FALSE(status.ok()); EXPECT_FALSE(status.ok());
} }
} }
@ -1912,6 +1892,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
response_generator_; response_generator_;
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator> grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
lb_channel_response_generator_; lb_channel_response_generator_;
int xds_resource_does_not_exist_timeout_ms_ = 0;
absl::InlinedVector<grpc_arg, 2> xds_channel_args_to_add_;
grpc_channel_args xds_channel_args_;
}; };
class BasicTest : public XdsEnd2endTest { class BasicTest : public XdsEnd2endTest {
@ -2384,43 +2367,30 @@ using SecureNamingTest = BasicTest;
// Tests that secure naming check passes if target name is expected. // Tests that secure naming check passes if target name is expected.
TEST_P(SecureNamingTest, TargetNameIsExpected) { TEST_P(SecureNamingTest, TargetNameIsExpected) {
// TODO(juanlishen): Use separate fake creds for the balancer channel.
ResetStub(0, absl::StrCat(kServerName, ";lb"));
SetNextResolution({}); SetNextResolution({});
SetNextResolutionForLbChannel({balancers_[0]->port()}); SetNextResolutionForLbChannel({balancers_[0]->port()}, nullptr, "xds_server");
const size_t kNumRpcsPerAddress = 100;
AdsServiceImpl::EdsResourceArgs args({ AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts()}, {"locality0", GetBackendPorts()},
}); });
balancers_[0]->ads_service()->SetEdsResource( balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args, DefaultEdsServiceName())); AdsServiceImpl::BuildEdsResource(args, DefaultEdsServiceName()));
// Make sure that trying to connect works without a call. CheckRpcSendOk();
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
WaitForAllBackends();
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress,
backends_[i]->backend_service()->request_count());
}
} }
// Tests that secure naming check fails if target name is unexpected. // Tests that secure naming check fails if target name is unexpected.
TEST_P(SecureNamingTest, TargetNameIsUnexpected) { TEST_P(SecureNamingTest, TargetNameIsUnexpected) {
gpr_setenv("GRPC_XDS_BOOTSTRAP", g_bootstrap_file_bad);
::testing::FLAGS_gtest_death_test_style = "threadsafe"; ::testing::FLAGS_gtest_death_test_style = "threadsafe";
SetNextResolution({});
SetNextResolutionForLbChannel({balancers_[0]->port()}, nullptr,
"incorrect_server_name");
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts()},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args, DefaultEdsServiceName()));
// Make sure that we blow up (via abort() from the security connector) when // Make sure that we blow up (via abort() from the security connector) when
// the name from the balancer doesn't match expectations. // the name from the balancer doesn't match expectations.
ASSERT_DEATH_IF_SUPPORTED( ASSERT_DEATH_IF_SUPPORTED({ CheckRpcSendOk(); }, "");
{
ResetStub(0, absl::StrCat(kServerName, ";lb"));
SetNextResolution({});
SetNextResolutionForLbChannel({balancers_[0]->port()});
channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1));
},
"");
} }
using LdsTest = BasicTest; using LdsTest = BasicTest;
@ -2964,19 +2934,6 @@ TEST_P(LdsRdsTest, RouteHeaderMatchInvalidRange) {
"cannot be smaller than start."); "cannot be smaller than start.");
} }
// Tests that LDS client times out when no response received.
TEST_P(LdsRdsTest, Timeout) {
ResetStub(0, "", 500);
if (GetParam().enable_rds_testing()) {
balancers_[0]->ads_service()->SetResourceIgnore(kRdsTypeUrl);
} else {
balancers_[0]->ads_service()->SetResourceIgnore(kLdsTypeUrl);
}
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
// Tests that LDS client should choose the default route (with no matching // Tests that LDS client should choose the default route (with no matching
// specified) after unable to find a match with previous routes. // specified) after unable to find a match with previous routes.
TEST_P(LdsRdsTest, XdsRoutingPathMatching) { TEST_P(LdsRdsTest, XdsRoutingPathMatching) {
@ -4270,25 +4227,8 @@ TEST_P(CdsTest, WrongLrsServer) {
EXPECT_EQ(response_state.error_message, "LRS ConfigSource is not self."); EXPECT_EQ(response_state.error_message, "LRS ConfigSource is not self.");
} }
// Tests that CDS client times out when no response received.
TEST_P(CdsTest, Timeout) {
ResetStub(0, "", 500);
balancers_[0]->ads_service()->SetResourceIgnore(kCdsTypeUrl);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
using EdsTest = BasicTest; using EdsTest = BasicTest;
TEST_P(EdsTest, Timeout) {
ResetStub(0, "", 500);
balancers_[0]->ads_service()->SetResourceIgnore(kEdsTypeUrl);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
// Tests that EDS client should send a NACK if the EDS update contains // Tests that EDS client should send a NACK if the EDS update contains
// sparse priorities. // sparse priorities.
TEST_P(EdsTest, NacksSparsePriorityList) { TEST_P(EdsTest, NacksSparsePriorityList) {
@ -4326,6 +4266,44 @@ TEST_P(EdsTest, EdsServiceNameDefaultsToClusterName) {
CheckRpcSendOk(); CheckRpcSendOk();
} }
class TimeoutTest : public BasicTest {
protected:
void SetUp() override {
xds_resource_does_not_exist_timeout_ms_ = 500;
BasicTest::SetUp();
}
};
// Tests that LDS client times out when no response received.
TEST_P(TimeoutTest, Lds) {
balancers_[0]->ads_service()->SetResourceIgnore(kLdsTypeUrl);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
TEST_P(TimeoutTest, Rds) {
balancers_[0]->ads_service()->SetResourceIgnore(kRdsTypeUrl);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
// Tests that CDS client times out when no response received.
TEST_P(TimeoutTest, Cds) {
balancers_[0]->ads_service()->SetResourceIgnore(kCdsTypeUrl);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
TEST_P(TimeoutTest, Eds) {
balancers_[0]->ads_service()->SetResourceIgnore(kEdsTypeUrl);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
}
using LocalityMapTest = BasicTest; using LocalityMapTest = BasicTest;
// Tests that the localities in a locality map are picked according to their // Tests that the localities in a locality map are picked according to their
@ -4564,7 +4542,7 @@ class FailoverTest : public BasicTest {
public: public:
void SetUp() override { void SetUp() override {
BasicTest::SetUp(); BasicTest::SetUp();
ResetStub(500, ""); ResetStub(500);
} }
}; };
@ -5276,7 +5254,7 @@ TEST_P(ClientLoadReportingTest, Vanilla) {
// Send kNumRpcsPerAddress RPCs per server. // Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
CheckRpcSendFailure(kNumFailuresPerAddress * num_backends_, CheckRpcSendFailure(kNumFailuresPerAddress * num_backends_,
/*server_fail=*/true); RpcOptions().set_server_fail(true));
// Check that each backend got the right number of requests. // Check that each backend got the right number of requests.
for (size_t i = 0; i < backends_.size(); ++i) { for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress + kNumFailuresPerAddress, EXPECT_EQ(kNumRpcsPerAddress + kNumFailuresPerAddress,
@ -5323,7 +5301,7 @@ TEST_P(ClientLoadReportingTest, SendAllClusters) {
// Send kNumRpcsPerAddress RPCs per server. // Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
CheckRpcSendFailure(kNumFailuresPerAddress * num_backends_, CheckRpcSendFailure(kNumFailuresPerAddress * num_backends_,
/*server_fail=*/true); RpcOptions().set_server_fail(true));
// Check that each backend got the right number of requests. // Check that each backend got the right number of requests.
for (size_t i = 0; i < backends_.size(); ++i) { for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress + kNumFailuresPerAddress, EXPECT_EQ(kNumRpcsPerAddress + kNumFailuresPerAddress,
@ -5536,6 +5514,12 @@ std::string TestTypeName(const ::testing::TestParamInfo<TestType>& info) {
return info.param.AsString(); return info.param.AsString();
} }
// TestType params:
// - use_xds_resolver
// - enable_load_reporting
// - enable_rds_testing = false
// - use_v2 = false
INSTANTIATE_TEST_SUITE_P(XdsTest, BasicTest, INSTANTIATE_TEST_SUITE_P(XdsTest, BasicTest,
::testing::Values(TestType(false, true), ::testing::Values(TestType(false, true),
TestType(false, false), TestType(false, false),
@ -5543,11 +5527,12 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, BasicTest,
TestType(true, true)), TestType(true, true)),
&TestTypeName); &TestTypeName);
// Run with both fake resolver and xds resolver.
// Don't run with load reporting or v2 or RDS, since they are irrelevant to
// the tests.
INSTANTIATE_TEST_SUITE_P(XdsTest, SecureNamingTest, INSTANTIATE_TEST_SUITE_P(XdsTest, SecureNamingTest,
::testing::Values(TestType(false, true), ::testing::Values(TestType(false, false),
TestType(false, false), TestType(true, false)),
TestType(true, false),
TestType(true, true)),
&TestTypeName); &TestTypeName);
// LDS depends on XdsResolver. // LDS depends on XdsResolver.
@ -5579,6 +5564,14 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, EdsTest,
TestType(true, true)), TestType(true, true)),
&TestTypeName); &TestTypeName);
// Test initial resource timeouts for each resource type.
// Do this only for XdsResolver with RDS enabled, so that we can test
// all resource types.
// Run with V3 only, since the functionality is no different in V2.
INSTANTIATE_TEST_SUITE_P(XdsTest, TimeoutTest,
::testing::Values(TestType(true, false, true)),
&TestTypeName);
// XdsResolverOnlyTest depends on XdsResolver. // XdsResolverOnlyTest depends on XdsResolver.
INSTANTIATE_TEST_SUITE_P(XdsTest, XdsResolverOnlyTest, INSTANTIATE_TEST_SUITE_P(XdsTest, XdsResolverOnlyTest,
::testing::Values(TestType(true, false), ::testing::Values(TestType(true, false),

Loading…
Cancel
Save