Share XdsClient between channels.

pull/24239/head
Mark D. Roth 4 years ago
parent b830c86d09
commit 1ba51dcb1a
  1. 1
      BUILD
  2. 1
      BUILD.gn
  3. 8
      build_autogenerated.yaml
  4. 2
      gRPC-C++.podspec
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 1
      package.xml
  8. 15
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  9. 147
      src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
  10. 21
      src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
  11. 15
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  12. 76
      src/core/ext/xds/xds_client.cc
  13. 61
      src/core/ext/xds/xds_client.h
  14. 6
      src/core/plugin_registry/grpc_plugin_registry.cc
  15. 67
      test/cpp/end2end/xds_end2end_test.cc
  16. 1
      tools/doxygen/Doxyfile.c++.internal
  17. 1
      tools/doxygen/Doxyfile.core.internal

@ -980,6 +980,7 @@ grpc_cc_library(
language = "c++",
public_hdrs = GRPC_PUBLIC_HDRS,
deps = [
"dual_ref_counted",
"eventmanager_libuv",
"gpr_base",
"grpc_codegen",

@ -606,6 +606,7 @@ config("grpc_config") {
"src/core/lib/debug/trace.h",
"src/core/lib/gprpp/atomic.h",
"src/core/lib/gprpp/debug_location.h",
"src/core/lib/gprpp/dual_ref_counted.h",
"src/core/lib/gprpp/orphanable.h",
"src/core/lib/gprpp/ref_counted.h",
"src/core/lib/gprpp/ref_counted_ptr.h",

@ -570,6 +570,7 @@ libs:
- src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
@ -1470,6 +1471,7 @@ libs:
- src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
@ -5632,8 +5634,7 @@ targets:
gtest: true
build: test
language: c++
headers:
- src/core/lib/gprpp/dual_ref_counted.h
headers: []
src:
- test/core/gprpp/dual_ref_counted_test.cc
deps:
@ -6764,8 +6765,7 @@ targets:
gtest: true
build: test
language: c++
headers:
- src/core/lib/gprpp/dual_ref_counted.h
headers: []
src:
- test/core/gprpp/ref_counted_ptr_test.cc
deps:

@ -407,6 +407,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/arena.h',
'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/dual_ref_counted.h',
'src/core/lib/gprpp/fork.h',
'src/core/lib/gprpp/global_config.h',
'src/core/lib/gprpp/global_config_custom.h',
@ -913,6 +914,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/arena.h',
'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/dual_ref_counted.h',
'src/core/lib/gprpp/fork.h',
'src/core/lib/gprpp/global_config.h',
'src/core/lib/gprpp/global_config_custom.h',

@ -640,6 +640,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/arena.h',
'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/dual_ref_counted.h',
'src/core/lib/gprpp/fork.cc',
'src/core/lib/gprpp/fork.h',
'src/core/lib/gprpp/global_config.h',
@ -1340,6 +1341,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/arena.h',
'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/dual_ref_counted.h',
'src/core/lib/gprpp/fork.h',
'src/core/lib/gprpp/global_config.h',
'src/core/lib/gprpp/global_config_custom.h',

@ -558,6 +558,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gprpp/arena.h )
s.files += %w( src/core/lib/gprpp/atomic.h )
s.files += %w( src/core/lib/gprpp/debug_location.h )
s.files += %w( src/core/lib/gprpp/dual_ref_counted.h )
s.files += %w( src/core/lib/gprpp/fork.cc )
s.files += %w( src/core/lib/gprpp/fork.h )
s.files += %w( src/core/lib/gprpp/global_config.h )

@ -538,6 +538,7 @@
<file baseinstalldir="/" name="src/core/lib/gprpp/arena.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/atomic.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/debug_location.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/dual_ref_counted.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/fork.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/fork.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/global_config.h" role="src" />

@ -234,8 +234,8 @@ void CdsLb::Helper::AddTraceEvent(TraceSeverity severity,
CdsLb::CdsLb(RefCountedPtr<XdsClient> xds_client, Args args)
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p from channel",
this, xds_client_.get());
gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p", this,
xds_client_.get());
}
}
@ -430,12 +430,13 @@ class CdsLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
RefCountedPtr<XdsClient> xds_client =
XdsClient::GetFromChannelArgs(*args.args);
if (xds_client == nullptr) {
grpc_error* error = GRPC_ERROR_NONE;
RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"XdsClient not present in channel args -- cannot instantiate "
"cds LB policy");
"cannot get XdsClient to instantiate cds LB policy: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
return nullptr;
}
return MakeOrphanable<CdsLb>(std::move(xds_client), std::move(args));

@ -91,7 +91,7 @@ class EdsLbConfig : public LoadBalancingPolicy::Config {
// EDS LB policy.
class EdsLb : public LoadBalancingPolicy {
public:
explicit EdsLb(Args args);
EdsLb(RefCountedPtr<XdsClient> xds_client, Args args);
const char* name() const override { return kEds; }
@ -198,7 +198,7 @@ class EdsLb : public LoadBalancingPolicy {
// Caller must ensure that config_ is set before calling.
const absl::string_view GetEdsResourceName() const {
if (xds_client_from_channel_ == nullptr) return server_name_;
if (!is_xds_uri_) return server_name_;
if (!config_->eds_service_name().empty()) {
return config_->eds_service_name();
}
@ -209,17 +209,13 @@ class EdsLb : public LoadBalancingPolicy {
// for LRS load reporting.
// Caller must ensure that config_ is set before calling.
std::pair<absl::string_view, absl::string_view> GetLrsClusterKey() const {
if (xds_client_from_channel_ == nullptr) return {server_name_, nullptr};
if (!is_xds_uri_) return {server_name_, nullptr};
return {config_->cluster_name(), config_->eds_service_name()};
}
XdsClient* xds_client() const {
return xds_client_from_channel_ != nullptr ? xds_client_from_channel_.get()
: xds_client_.get();
}
// Server name from target URI.
std::string server_name_;
bool is_xds_uri_;
// Current channel args and config from the resolver.
const grpc_channel_args* args_ = nullptr;
@ -229,11 +225,7 @@ class EdsLb : public LoadBalancingPolicy {
bool shutting_down_ = false;
// The xds client and endpoint watcher.
// If we get the XdsClient from the channel, we store it in
// xds_client_from_channel_; if we create it ourselves, we store it in
// xds_client_.
RefCountedPtr<XdsClient> xds_client_from_channel_;
OrphanablePtr<XdsClient> xds_client_;
RefCountedPtr<XdsClient> xds_client_;
// A pointer to the endpoint watcher, to be used when cancelling the watch.
// Note that this is not owned, so this pointer must never be derefernced.
EndpointWatcher* endpoint_watcher_ = nullptr;
@ -380,25 +372,38 @@ void EdsLb::EndpointWatcher::Notifier::RunInWorkSerializer(grpc_error* error) {
// EdsLb public methods
//
EdsLb::EdsLb(Args args)
: LoadBalancingPolicy(std::move(args)),
xds_client_from_channel_(XdsClient::GetFromChannelArgs(*args.args)) {
EdsLb::EdsLb(RefCountedPtr<XdsClient> xds_client, Args args)
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
gpr_log(GPR_INFO, "[edslb %p] created -- xds client from channel: %p", this,
xds_client_from_channel_.get());
gpr_log(GPR_INFO, "[edslb %p] created -- using xds client %p", this,
xds_client_.get());
}
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
const char* server_uri =
grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(server_uri != nullptr);
grpc_uri* uri = grpc_uri_parse(server_uri, true);
GPR_ASSERT(uri->path[0] != '\0');
server_name_ = uri->path[0] == '/' ? uri->path + 1 : uri->path;
is_xds_uri_ = strcmp(uri->scheme, "xds") == 0;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
gpr_log(GPR_INFO, "[edslb %p] server name from channel: %s", this,
server_name_.c_str());
gpr_log(GPR_INFO, "[edslb %p] server name from channel (is_xds_uri=%d): %s",
this, is_xds_uri_, server_name_.c_str());
}
grpc_uri_destroy(uri);
// EDS-only flow.
if (!is_xds_uri_) {
// Setup channelz linkage.
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
args.args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (parent_channelz_node != nullptr) {
xds_client_->AddChannelzLinkage(parent_channelz_node);
}
// Couple polling.
grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
interested_parties());
}
}
EdsLb::~EdsLb() {
@ -417,32 +422,29 @@ void EdsLb::ShutdownLocked() {
child_picker_.reset();
MaybeDestroyChildPolicyLocked();
drop_stats_.reset();
// Cancel the endpoint watch here instead of in our dtor if we are using the
// xds resolver, because the watcher holds a ref to us and we might not be
// destroying the XdsClient, leading to a situation where this LB policy is
// never destroyed.
if (xds_client_from_channel_ != nullptr) {
if (config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
gpr_log(GPR_INFO, "[edslb %p] cancelling xds watch for %s", this,
std::string(GetEdsResourceName()).c_str());
}
xds_client()->CancelEndpointDataWatch(GetEdsResourceName(),
endpoint_watcher_);
// Cancel watcher.
if (endpoint_watcher_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
gpr_log(GPR_INFO, "[edslb %p] cancelling xds watch for %s", this,
std::string(GetEdsResourceName()).c_str());
}
xds_client_from_channel_.reset(DEBUG_LOCATION, "EdsLb");
xds_client_->CancelEndpointDataWatch(GetEdsResourceName(),
endpoint_watcher_);
}
if (xds_client_ != nullptr) {
if (!is_xds_uri_) {
// Remove channelz linkage.
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (parent_channelz_node != nullptr) {
xds_client_->RemoveChannelzLinkage(parent_channelz_node);
}
// Decouple polling.
grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
interested_parties());
xds_client_.reset();
}
xds_client_.reset(DEBUG_LOCATION, "EdsLb");
// Destroy channel args.
grpc_channel_args_destroy(args_);
args_ = nullptr;
}
@ -467,35 +469,13 @@ void EdsLb::UpdateLocked(UpdateArgs args) {
grpc_channel_args_destroy(args_);
args_ = args.args;
args.args = nullptr;
if (is_initial_update) {
// Initialize XdsClient.
if (xds_client_from_channel_ == nullptr) {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(&error);
// TODO(roth): If we decide that we care about EDS-only mode, add
// proper error handling here.
GPR_ASSERT(error == GRPC_ERROR_NONE);
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (parent_channelz_node != nullptr) {
xds_client_->AddChannelzLinkage(parent_channelz_node);
}
grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
interested_parties());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
gpr_log(GPR_INFO, "[edslb %p] Created xds client %p", this,
xds_client_.get());
}
}
}
// Update drop stats for load reporting if needed.
if (is_initial_update || config_->lrs_load_reporting_server_name() !=
old_config->lrs_load_reporting_server_name()) {
drop_stats_.reset();
if (config_->lrs_load_reporting_server_name().has_value()) {
const auto key = GetLrsClusterKey();
drop_stats_ = xds_client()->AddClusterDropStats(
drop_stats_ = xds_client_->AddClusterDropStats(
config_->lrs_load_reporting_server_name().value(),
key.first /*cluster_name*/, key.second /*eds_service_name*/);
}
@ -514,15 +494,14 @@ void EdsLb::UpdateLocked(UpdateArgs args) {
auto watcher = absl::make_unique<EndpointWatcher>(
Ref(DEBUG_LOCATION, "EndpointWatcher"));
endpoint_watcher_ = watcher.get();
xds_client()->WatchEndpointData(GetEdsResourceName(), std::move(watcher));
xds_client_->WatchEndpointData(GetEdsResourceName(), std::move(watcher));
}
}
void EdsLb::ResetBackoffLocked() {
// When the XdsClient is instantiated in the resolver instead of in this
// LB policy, this is done via the resolver, so we don't need to do it
// for xds_client_from_channel_ here.
if (xds_client_ != nullptr) xds_client_->ResetBackoff();
// LB policy, this is done via the resolver, so we don't need to do it here.
if (!is_xds_uri_ && xds_client_ != nullptr) xds_client_->ResetBackoff();
if (child_policy_ != nullptr) {
child_policy_->ResetBackoffLocked();
}
@ -789,9 +768,11 @@ void EdsLb::UpdateChildPolicyLocked() {
grpc_channel_args* EdsLb::CreateChildPolicyArgsLocked(
const grpc_channel_args* args) {
absl::InlinedVector<grpc_arg, 3> args_to_add = {
grpc_arg args_to_add[] = {
// A channel arg indicating if the target is a backend inferred from an
// xds load balancer.
// TODO(roth): This isn't needed with the new fallback design.
// Remove as part of implementing the new fallback functionality.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER),
1),
@ -800,18 +781,8 @@ grpc_channel_args* EdsLb::CreateChildPolicyArgsLocked(
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1),
};
absl::InlinedVector<const char*, 1> args_to_remove;
if (xds_client_from_channel_ == nullptr) {
args_to_add.emplace_back(xds_client_->MakeChannelArg());
} else if (!config_->lrs_load_reporting_server_name().has_value()) {
// Remove XdsClient from channel args, so that its presence doesn't
// prevent us from sharing subchannels between channels.
// If load reporting is enabled, this happens in the LRS policy instead.
args_to_remove.push_back(GRPC_ARG_XDS_CLIENT);
}
return grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove.data(), args_to_remove.size(), args_to_add.data(),
args_to_add.size());
return grpc_channel_args_copy_and_add(args, args_to_add,
GPR_ARRAY_SIZE(args_to_add));
}
OrphanablePtr<LoadBalancingPolicy> EdsLb::CreateChildPolicyLocked(
@ -863,7 +834,17 @@ class EdsLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<EdsChildHandler>(std::move(args), &grpc_lb_eds_trace);
grpc_error* error = GRPC_ERROR_NONE;
RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"cannot get XdsClient to instantiate eds LB policy: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
return nullptr;
}
return MakeOrphanable<EdsChildHandler>(std::move(xds_client),
std::move(args));
}
const char* name() const override { return kEds; }
@ -974,8 +955,9 @@ class EdsLbFactory : public LoadBalancingPolicyFactory {
private:
class EdsChildHandler : public ChildPolicyHandler {
public:
EdsChildHandler(Args args, TraceFlag* tracer)
: ChildPolicyHandler(std::move(args), tracer) {}
EdsChildHandler(RefCountedPtr<XdsClient> xds_client, Args args)
: ChildPolicyHandler(std::move(args), &grpc_lb_eds_trace),
xds_client_(std::move(xds_client)) {}
bool ConfigChangeRequiresNewPolicyInstance(
LoadBalancingPolicy::Config* old_config,
@ -991,8 +973,11 @@ class EdsLbFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const char* name, LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<EdsLb>(std::move(args));
return MakeOrphanable<EdsLb>(xds_client_, std::move(args));
}
private:
RefCountedPtr<XdsClient> xds_client_;
};
};

@ -196,8 +196,8 @@ LoadBalancingPolicy::PickResult LrsLb::LoadReportingPicker::Pick(
LrsLb::LrsLb(RefCountedPtr<XdsClient> xds_client, Args args)
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
gpr_log(GPR_INFO, "[lrs_lb %p] created -- using xds client %p from channel",
this, xds_client_.get());
gpr_log(GPR_INFO, "[lrs_lb %p] created -- using xds client %p", this,
xds_client_.get());
}
}
@ -255,11 +255,9 @@ void LrsLb::UpdateLocked(UpdateArgs args) {
config_->eds_service_name(), config_->locality_name());
MaybeUpdatePickerLocked();
}
// Remove XdsClient from channel args, so that its presence doesn't
// prevent us from sharing subchannels between channels.
grpc_channel_args* new_args = XdsClient::RemoveFromChannelArgs(*args.args);
// Update child policy.
UpdateChildPolicyLocked(std::move(args.addresses), new_args);
UpdateChildPolicyLocked(std::move(args.addresses), args.args);
args.args = nullptr;
}
void LrsLb::MaybeUpdatePickerLocked() {
@ -368,12 +366,13 @@ class LrsLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
RefCountedPtr<XdsClient> xds_client =
XdsClient::GetFromChannelArgs(*args.args);
if (xds_client == nullptr) {
grpc_error* error = GRPC_ERROR_NONE;
RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"XdsClient not present in channel args -- cannot instantiate "
"lrs LB policy");
"cannot get XdsClient to instantiate lrs LB policy: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
return nullptr;
}
return MakeOrphanable<LrsLb>(std::move(xds_client), std::move(args));

@ -183,7 +183,7 @@ class XdsResolver : public Resolver {
std::string server_name_;
const grpc_channel_args* args_;
grpc_pollset_set* interested_parties_;
OrphanablePtr<XdsClient> xds_client_;
RefCountedPtr<XdsClient> xds_client_;
XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
std::string route_config_name_;
XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
@ -513,7 +513,7 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
void XdsResolver::StartLocked() {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(&error);
xds_client_ = XdsClient::GetOrCreate(&error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"Failed to create xds client -- channel will remain in "
@ -607,9 +607,8 @@ void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
void XdsResolver::OnError(grpc_error* error) {
gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
this, grpc_error_string(error));
grpc_arg xds_client_arg = xds_client_->MakeChannelArg();
Result result;
result.args = grpc_channel_args_copy_and_add(args_, &xds_client_arg, 1);
result.args = grpc_channel_args_copy(args_);
result.service_config_error = error;
result_handler()->ReturnResult(std::move(result));
}
@ -674,12 +673,8 @@ void XdsResolver::GenerateResult() {
gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
result.service_config->json_string().c_str());
}
grpc_arg new_args[] = {
xds_client_->MakeChannelArg(),
config_selector->MakeChannelArg(),
};
result.args =
grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args));
grpc_arg new_arg = config_selector->MakeChannelArg();
result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
result_handler()->ReturnResult(std::move(result));
}

@ -70,16 +70,12 @@ namespace grpc_core {
TraceFlag grpc_xds_client_trace(false, "xds_client");
namespace {
const grpc_channel_args* g_channel_args = nullptr;
} // namespace
namespace internal {
void SetXdsChannelArgsForTest(grpc_channel_args* args) {
g_channel_args = args;
}
Mutex* g_mu = nullptr;
const grpc_channel_args* g_channel_args = nullptr;
XdsClient* g_xds_client = nullptr;
} // namespace internal
} // namespace
//
// Internal class declarations
@ -435,7 +431,7 @@ class XdsClient::ChannelState::StateWatcher
// XdsClient::ChannelState
//
XdsClient::ChannelState::ChannelState(RefCountedPtr<XdsClient> xds_client,
XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
grpc_channel* channel)
: InternallyRefCounted<ChannelState>(&grpc_xds_client_trace),
xds_client_(std::move(xds_client)),
@ -1739,7 +1735,7 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
} // namespace
XdsClient::XdsClient(grpc_error** error)
: InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
: DualRefCounted<XdsClient>(&grpc_xds_client_trace),
request_timeout_(GetRequestTimeout()),
interested_parties_(grpc_pollset_set_create()),
bootstrap_(
@ -1765,7 +1761,7 @@ XdsClient::XdsClient(grpc_error** error)
}
// Create ChannelState object.
chand_ = MakeOrphanable<ChannelState>(
Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
}
XdsClient::~XdsClient() {
@ -1797,6 +1793,10 @@ void XdsClient::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
}
{
MutexLock lock(g_mu);
if (g_xds_client == this) g_xds_client = nullptr;
}
{
MutexLock lock(&mu_);
shutting_down_ = true;
@ -1813,7 +1813,6 @@ void XdsClient::Orphan() {
endpoint_map_.clear();
}
}
Unref(DEBUG_LOCATION, "XdsClient::Orphan()");
}
void XdsClient::WatchListenerData(
@ -2168,41 +2167,40 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
return snapshot_map;
}
void* XdsClient::ChannelArgCopy(void* p) {
XdsClient* xds_client = static_cast<XdsClient*>(p);
xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
return p;
}
//
// accessors for global state
//
void XdsClient::ChannelArgDestroy(void* p) {
XdsClient* xds_client = static_cast<XdsClient*>(p);
xds_client->Unref(DEBUG_LOCATION, "channel arg");
void XdsClientGlobalInit() { g_mu = new Mutex; }
void XdsClientGlobalShutdown() {
delete g_mu;
g_mu = nullptr;
}
int XdsClient::ChannelArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
MutexLock lock(g_mu);
if (g_xds_client != nullptr) {
auto xds_client = g_xds_client->RefIfNonZero();
if (xds_client != nullptr) return xds_client;
}
auto xds_client = MakeRefCounted<XdsClient>(error);
g_xds_client = xds_client.get();
return xds_client;
}
const grpc_arg_pointer_vtable XdsClient::kXdsClientVtable = {
XdsClient::ChannelArgCopy, XdsClient::ChannelArgDestroy,
XdsClient::ChannelArgCmp};
namespace internal {
grpc_arg XdsClient::MakeChannelArg() const {
return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT),
const_cast<XdsClient*>(this),
&XdsClient::kXdsClientVtable);
void SetXdsChannelArgsForTest(grpc_channel_args* args) {
MutexLock lock(g_mu);
g_channel_args = args;
}
RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
const grpc_channel_args& args) {
XdsClient* xds_client =
grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
if (xds_client == nullptr) return nullptr;
return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs");
void UnsetGlobalXdsClientForTest() {
MutexLock lock(g_mu);
g_xds_client = nullptr;
}
grpc_channel_args* XdsClient::RemoveFromChannelArgs(
const grpc_channel_args& args) {
const char* arg_name = GRPC_ARG_XDS_CLIENT;
return grpc_channel_args_copy_and_remove(&args, &arg_name, 1);
}
} // namespace internal
} // namespace grpc_core

@ -29,6 +29,7 @@
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
@ -40,17 +41,14 @@ namespace grpc_core {
extern TraceFlag xds_client_trace;
class XdsClient : public InternallyRefCounted<XdsClient> {
class XdsClient : public DualRefCounted<XdsClient> {
public:
// Listener data watcher interface. Implemented by callers.
class ListenerWatcherInterface {
public:
virtual ~ListenerWatcherInterface() = default;
virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0;
virtual void OnError(grpc_error* error) = 0;
virtual void OnResourceDoesNotExist() = 0;
};
@ -58,11 +56,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
class RouteConfigWatcherInterface {
public:
virtual ~RouteConfigWatcherInterface() = default;
virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0;
virtual void OnError(grpc_error* error) = 0;
virtual void OnResourceDoesNotExist() = 0;
};
@ -70,11 +65,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
class ClusterWatcherInterface {
public:
virtual ~ClusterWatcherInterface() = default;
virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
virtual void OnError(grpc_error* error) = 0;
virtual void OnResourceDoesNotExist() = 0;
};
@ -82,16 +74,17 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
class EndpointWatcherInterface {
public:
virtual ~EndpointWatcherInterface() = default;
virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
virtual void OnError(grpc_error* error) = 0;
virtual void OnResourceDoesNotExist() = 0;
};
// If *error is not GRPC_ERROR_NONE after construction, then there was
// Factory function to get or create the global XdsClient instance.
// If *error is not GRPC_ERROR_NONE upon return, then there was
// an error initializing the client.
static RefCountedPtr<XdsClient> GetOrCreate(grpc_error** error);
// Callers should not instantiate directly. Use GetOrCreate() instead.
explicit XdsClient(grpc_error** error);
~XdsClient();
@ -188,24 +181,14 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// Resets connection backoff state.
void ResetBackoff();
// Helpers for encoding the XdsClient object in channel args.
grpc_arg MakeChannelArg() const;
static RefCountedPtr<XdsClient> GetFromChannelArgs(
const grpc_channel_args& args);
static grpc_channel_args* RemoveFromChannelArgs(
const grpc_channel_args& args);
private:
// Contains a channel to the xds server and all the data related to the
// channel. Holds a ref to the xds client object.
// TODO(roth): This is separate from the XdsClient object because it was
// originally designed to be able to swap itself out in case the
// balancer name changed. Now that the balancer name is going to be
// coming from the bootstrap file, we don't really need this level of
// indirection unless we decide to support watching the bootstrap file
// for changes. At some point, if we decide that we're never going to
// need to do that, then we can eliminate this class and move its
// contents directly into the XdsClient class.
//
// Currently, there is only one ChannelState object per XdsClient
// object, and it has essentially the same lifetime. But in the
// future, when we add federation support, a single XdsClient may have
// multiple underlying channels to talk to different xDS servers.
class ChannelState : public InternallyRefCounted<ChannelState> {
public:
template <typename T>
@ -214,7 +197,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
class AdsCallState;
class LrsCallState;
ChannelState(RefCountedPtr<XdsClient> xds_client, grpc_channel* channel);
ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
grpc_channel* channel);
~ChannelState();
void Orphan() override;
@ -240,7 +224,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
class StateWatcher;
// The owning xds client.
RefCountedPtr<XdsClient> xds_client_;
WeakRefCountedPtr<XdsClient> xds_client_;
// The channel and its status.
grpc_channel* channel_;
@ -283,6 +267,10 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
absl::optional<XdsApi::EdsUpdate> update;
};
// TODO(roth): Change this to store exactly one instance of
// XdsClusterDropStats and exactly one instance of
// XdsClusterLocalityStats per locality. We can return multiple refs
// to the same object instead of registering multiple objects.
struct LoadReportState {
struct LocalityState {
std::set<XdsClusterLocalityStats*> locality_stats;
@ -303,13 +291,6 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
bool send_all_clusters, const std::set<std::string>& clusters);
// Channel arg vtable functions.
static void* ChannelArgCopy(void* p);
static void ChannelArgDestroy(void* p);
static int ChannelArgCmp(void* p, void* q);
static const grpc_arg_pointer_vtable kXdsClientVtable;
const grpc_millis request_timeout_;
grpc_pollset_set* interested_parties_;
std::unique_ptr<XdsBootstrap> bootstrap_;
@ -319,7 +300,6 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// The channel for communicating with the xds server.
OrphanablePtr<ChannelState> chand_;
RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
// One entry for each watched LDS resource.
std::map<std::string /*listener_name*/, ListenerState> listener_map_;
@ -341,9 +321,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
};
namespace internal {
void SetXdsChannelArgsForTest(grpc_channel_args* args);
void UnsetGlobalXdsClientForTest();
} // namespace internal
} // namespace grpc_core

@ -62,6 +62,10 @@ void grpc_workaround_cronet_compression_filter_init(void);
void grpc_workaround_cronet_compression_filter_shutdown(void);
#ifndef GRPC_NO_XDS
namespace grpc_core {
void XdsClientGlobalInit();
void XdsClientGlobalShutdown();
} // namespace grpc_core
void grpc_certificate_provider_registry_init(void);
void grpc_certificate_provider_registry_shutdown(void);
void grpc_lb_policy_cds_init(void);
@ -118,6 +122,8 @@ void grpc_register_built_in_plugins(void) {
grpc_register_plugin(grpc_workaround_cronet_compression_filter_init,
grpc_workaround_cronet_compression_filter_shutdown);
#ifndef GRPC_NO_XDS
grpc_register_plugin(grpc_core::XdsClientGlobalInit,
grpc_core::XdsClientGlobalShutdown);
grpc_register_plugin(grpc_certificate_provider_registry_init,
grpc_certificate_provider_registry_shutdown);
grpc_register_plugin(grpc_lb_policy_cds_init,

@ -673,6 +673,11 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
}
}
std::set<std::string> clients() {
grpc_core::MutexLock lock(&clients_mu_);
return clients_;
}
private:
// A queue of resource type/name pairs that have changed since the client
// subscribed to them.
@ -719,6 +724,7 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
Status StreamAggregatedResources(ServerContext* context,
Stream* stream) override {
gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this);
parent_->AddClient(context->peer());
if (is_v2_) {
parent_->seen_v2_client_ = true;
} else {
@ -936,6 +942,7 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
}
}
gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources done", this);
parent_->RemoveClient(context->peer());
return Status::OK;
}
@ -1088,6 +1095,16 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
}
}
void AddClient(const std::string& client) {
grpc_core::MutexLock lock(&clients_mu_);
clients_.insert(client);
}
void RemoveClient(const std::string& client) {
grpc_core::MutexLock lock(&clients_mu_);
clients_.erase(client);
}
RpcService<::envoy::service::discovery::v2::AggregatedDiscoveryService,
::envoy::api::v2::DiscoveryRequest,
::envoy::api::v2::DiscoveryResponse>
@ -1116,6 +1133,9 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
// yet been destroyed by UnsetResource()).
// - There is at least one subscription for the resource.
ResourceMap resource_map_;
grpc_core::Mutex clients_mu_;
std::set<std::string> clients_;
};
class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
@ -1196,7 +1216,7 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
Status StreamLoadStats(ServerContext* /*context*/,
Stream* stream) override {
gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this);
GPR_ASSERT(parent_->client_load_reporting_interval_seconds_ > 0);
EXPECT_GT(parent_->client_load_reporting_interval_seconds_, 0);
// Take a reference of the LrsServiceImpl object, reference will go
// out of scope after this method exits.
std::shared_ptr<LrsServiceImpl> lrs_service_impl =
@ -1377,6 +1397,14 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
void TearDown() override {
ShutdownAllBackends();
for (auto& balancer : balancers_) balancer->Shutdown();
// Make sure each test creates a new XdsClient instance rather than
// reusing the one from the previous test. This avoids spurious failures
// caused when a load reporting test runs after a non-load reporting test
// and the XdsClient is still talking to the old LRS server, which fails
// because it's not expecting the client to connect. It also
// ensures that each test can independently set the global channel
// args for the xDS channel.
grpc_core::internal::UnsetGlobalXdsClientForTest();
}
void StartAllBackends() {
@ -1392,6 +1420,14 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
void ResetStub(int failover_timeout = 0) {
channel_ = CreateChannel(failover_timeout);
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
stub1_ = grpc::testing::EchoTest1Service::NewStub(channel_);
stub2_ = grpc::testing::EchoTest2Service::NewStub(channel_);
}
std::shared_ptr<Channel> CreateChannel(
int failover_timeout = 0, const char* server_name = kServerName) {
ChannelArguments args;
if (failover_timeout > 0) {
args.SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, failover_timeout);
@ -1403,7 +1439,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
response_generator_.get());
}
std::string uri = absl::StrCat(
GetParam().use_xds_resolver() ? "xds" : "fake", ":///", kServerName);
GetParam().use_xds_resolver() ? "xds" : "fake", ":///", server_name);
// TODO(dgq): templatize tests to run everything using both secure and
// insecure channel credentials.
grpc_channel_credentials* channel_creds =
@ -1415,10 +1451,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
channel_creds, call_creds, nullptr)));
call_creds->Unref();
channel_creds->Unref();
channel_ = ::grpc::CreateCustomChannel(uri, creds, args);
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
stub1_ = grpc::testing::EchoTest1Service::NewStub(channel_);
stub2_ = grpc::testing::EchoTest2Service::NewStub(channel_);
return ::grpc::CreateCustomChannel(uri, creds, args);
}
enum RpcService {
@ -2226,6 +2259,28 @@ TEST_P(XdsResolverOnlyTest, DefaultRouteSpecifiesSlashPrefix) {
WaitForAllBackends();
}
TEST_P(XdsResolverOnlyTest, MultipleChannelsShareXdsClient) {
const char* kNewServerName = "new-server.example.com";
Listener listener = balancers_[0]->ads_service()->default_listener();
listener.set_name(kNewServerName);
balancers_[0]->ads_service()->SetLdsResource(listener);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts()},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args));
WaitForAllBackends();
// Create second channel and tell it to connect to kNewServerName.
auto channel2 = CreateChannel(/*failover_timeout=*/0, kNewServerName);
channel2->GetState(/*try_to_connect=*/true);
ASSERT_TRUE(
channel2->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100)));
// Make sure there's only one client connected.
EXPECT_EQ(1UL, balancers_[0]->ads_service()->clients().size());
}
class XdsResolverLoadReportingOnlyTest : public XdsEnd2endTest {
public:
XdsResolverLoadReportingOnlyTest() : XdsEnd2endTest(4, 1, 3) {}

@ -1494,6 +1494,7 @@ src/core/lib/gprpp/arena.cc \
src/core/lib/gprpp/arena.h \
src/core/lib/gprpp/atomic.h \
src/core/lib/gprpp/debug_location.h \
src/core/lib/gprpp/dual_ref_counted.h \
src/core/lib/gprpp/fork.cc \
src/core/lib/gprpp/fork.h \
src/core/lib/gprpp/global_config.h \

@ -1333,6 +1333,7 @@ src/core/lib/gprpp/arena.cc \
src/core/lib/gprpp/arena.h \
src/core/lib/gprpp/atomic.h \
src/core/lib/gprpp/debug_location.h \
src/core/lib/gprpp/dual_ref_counted.h \
src/core/lib/gprpp/fork.cc \
src/core/lib/gprpp/fork.h \
src/core/lib/gprpp/global_config.h \

Loading…
Cancel
Save