From 1ba51dcb1a01ec80afa0493d32f90ad91b832048 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 25 Sep 2020 11:09:18 -0700 Subject: [PATCH] Share XdsClient between channels. --- BUILD | 1 + BUILD.gn | 1 + build_autogenerated.yaml | 8 +- gRPC-C++.podspec | 2 + gRPC-Core.podspec | 2 + grpc.gemspec | 1 + package.xml | 1 + .../client_channel/lb_policy/xds/cds.cc | 15 +- .../client_channel/lb_policy/xds/eds.cc | 147 ++++++++---------- .../client_channel/lb_policy/xds/lrs.cc | 21 ++- .../resolver/xds/xds_resolver.cc | 15 +- src/core/ext/xds/xds_client.cc | 76 +++++---- src/core/ext/xds/xds_client.h | 61 +++----- .../plugin_registry/grpc_plugin_registry.cc | 6 + test/cpp/end2end/xds_end2end_test.cc | 67 +++++++- tools/doxygen/Doxyfile.c++.internal | 1 + tools/doxygen/Doxyfile.core.internal | 1 + 17 files changed, 227 insertions(+), 199 deletions(-) diff --git a/BUILD b/BUILD index 1e1eb64b4c8..64414ea17a7 100644 --- a/BUILD +++ b/BUILD @@ -980,6 +980,7 @@ grpc_cc_library( language = "c++", public_hdrs = GRPC_PUBLIC_HDRS, deps = [ + "dual_ref_counted", "eventmanager_libuv", "gpr_base", "grpc_codegen", diff --git a/BUILD.gn b/BUILD.gn index 5a5adb0c61c..37263d41543 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -606,6 +606,7 @@ config("grpc_config") { "src/core/lib/debug/trace.h", "src/core/lib/gprpp/atomic.h", "src/core/lib/gprpp/debug_location.h", + "src/core/lib/gprpp/dual_ref_counted.h", "src/core/lib/gprpp/orphanable.h", "src/core/lib/gprpp/ref_counted.h", "src/core/lib/gprpp/ref_counted_ptr.h", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index ada639cd9ee..e7c55d589de 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -570,6 +570,7 @@ libs: - src/core/lib/debug/trace.h - src/core/lib/gprpp/atomic.h - src/core/lib/gprpp/debug_location.h + - src/core/lib/gprpp/dual_ref_counted.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted_ptr.h @@ -1470,6 +1471,7 @@ libs: - src/core/lib/debug/trace.h - src/core/lib/gprpp/atomic.h - src/core/lib/gprpp/debug_location.h + - src/core/lib/gprpp/dual_ref_counted.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted_ptr.h @@ -5632,8 +5634,7 @@ targets: gtest: true build: test language: c++ - headers: - - src/core/lib/gprpp/dual_ref_counted.h + headers: [] src: - test/core/gprpp/dual_ref_counted_test.cc deps: @@ -6764,8 +6765,7 @@ targets: gtest: true build: test language: c++ - headers: - - src/core/lib/gprpp/dual_ref_counted.h + headers: [] src: - test/core/gprpp/ref_counted_ptr_test.cc deps: diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 54c226e2b61..fbfbfe9eb3c 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -407,6 +407,7 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/arena.h', 'src/core/lib/gprpp/atomic.h', 'src/core/lib/gprpp/debug_location.h', + 'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/fork.h', 'src/core/lib/gprpp/global_config.h', 'src/core/lib/gprpp/global_config_custom.h', @@ -913,6 +914,7 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/arena.h', 'src/core/lib/gprpp/atomic.h', 'src/core/lib/gprpp/debug_location.h', + 'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/fork.h', 'src/core/lib/gprpp/global_config.h', 'src/core/lib/gprpp/global_config_custom.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index b75fb252c55..a1c97afcca5 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -640,6 +640,7 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/arena.h', 'src/core/lib/gprpp/atomic.h', 'src/core/lib/gprpp/debug_location.h', + 'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/fork.cc', 'src/core/lib/gprpp/fork.h', 'src/core/lib/gprpp/global_config.h', @@ -1340,6 +1341,7 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/arena.h', 'src/core/lib/gprpp/atomic.h', 'src/core/lib/gprpp/debug_location.h', + 'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/fork.h', 'src/core/lib/gprpp/global_config.h', 'src/core/lib/gprpp/global_config_custom.h', diff --git a/grpc.gemspec b/grpc.gemspec index a27b1fe4c02..16b226e8a3e 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -558,6 +558,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/gprpp/arena.h ) s.files += %w( src/core/lib/gprpp/atomic.h ) s.files += %w( src/core/lib/gprpp/debug_location.h ) + s.files += %w( src/core/lib/gprpp/dual_ref_counted.h ) s.files += %w( src/core/lib/gprpp/fork.cc ) s.files += %w( src/core/lib/gprpp/fork.h ) s.files += %w( src/core/lib/gprpp/global_config.h ) diff --git a/package.xml b/package.xml index dcb8a80f9a8..01f5c4c0d15 100644 --- a/package.xml +++ b/package.xml @@ -538,6 +538,7 @@ + diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index e5762d62f11..796d2ff90dc 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -234,8 +234,8 @@ void CdsLb::Helper::AddTraceEvent(TraceSeverity severity, CdsLb::CdsLb(RefCountedPtr xds_client, Args args) : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { - gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p from channel", - this, xds_client_.get()); + gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p", this, + xds_client_.get()); } } @@ -430,12 +430,13 @@ class CdsLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { - RefCountedPtr xds_client = - XdsClient::GetFromChannelArgs(*args.args); - if (xds_client == nullptr) { + grpc_error* error = GRPC_ERROR_NONE; + RefCountedPtr xds_client = XdsClient::GetOrCreate(&error); + if (error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, - "XdsClient not present in channel args -- cannot instantiate " - "cds LB policy"); + "cannot get XdsClient to instantiate cds LB policy: %s", + grpc_error_string(error)); + GRPC_ERROR_UNREF(error); return nullptr; } return MakeOrphanable(std::move(xds_client), std::move(args)); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc index 91c025cf85e..00db1b62d6b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc @@ -91,7 +91,7 @@ class EdsLbConfig : public LoadBalancingPolicy::Config { // EDS LB policy. class EdsLb : public LoadBalancingPolicy { public: - explicit EdsLb(Args args); + EdsLb(RefCountedPtr xds_client, Args args); const char* name() const override { return kEds; } @@ -198,7 +198,7 @@ class EdsLb : public LoadBalancingPolicy { // Caller must ensure that config_ is set before calling. const absl::string_view GetEdsResourceName() const { - if (xds_client_from_channel_ == nullptr) return server_name_; + if (!is_xds_uri_) return server_name_; if (!config_->eds_service_name().empty()) { return config_->eds_service_name(); } @@ -209,17 +209,13 @@ class EdsLb : public LoadBalancingPolicy { // for LRS load reporting. // Caller must ensure that config_ is set before calling. std::pair GetLrsClusterKey() const { - if (xds_client_from_channel_ == nullptr) return {server_name_, nullptr}; + if (!is_xds_uri_) return {server_name_, nullptr}; return {config_->cluster_name(), config_->eds_service_name()}; } - XdsClient* xds_client() const { - return xds_client_from_channel_ != nullptr ? xds_client_from_channel_.get() - : xds_client_.get(); - } - // Server name from target URI. std::string server_name_; + bool is_xds_uri_; // Current channel args and config from the resolver. const grpc_channel_args* args_ = nullptr; @@ -229,11 +225,7 @@ class EdsLb : public LoadBalancingPolicy { bool shutting_down_ = false; // The xds client and endpoint watcher. - // If we get the XdsClient from the channel, we store it in - // xds_client_from_channel_; if we create it ourselves, we store it in - // xds_client_. - RefCountedPtr xds_client_from_channel_; - OrphanablePtr xds_client_; + RefCountedPtr xds_client_; // A pointer to the endpoint watcher, to be used when cancelling the watch. // Note that this is not owned, so this pointer must never be derefernced. EndpointWatcher* endpoint_watcher_ = nullptr; @@ -380,25 +372,38 @@ void EdsLb::EndpointWatcher::Notifier::RunInWorkSerializer(grpc_error* error) { // EdsLb public methods // -EdsLb::EdsLb(Args args) - : LoadBalancingPolicy(std::move(args)), - xds_client_from_channel_(XdsClient::GetFromChannelArgs(*args.args)) { +EdsLb::EdsLb(RefCountedPtr xds_client, Args args) + : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] created -- xds client from channel: %p", this, - xds_client_from_channel_.get()); + gpr_log(GPR_INFO, "[edslb %p] created -- using xds client %p", this, + xds_client_.get()); } // Record server name. - const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); - const char* server_uri = grpc_channel_arg_get_string(arg); + const char* server_uri = + grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI); GPR_ASSERT(server_uri != nullptr); grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); server_name_ = uri->path[0] == '/' ? uri->path + 1 : uri->path; + is_xds_uri_ = strcmp(uri->scheme, "xds") == 0; if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] server name from channel: %s", this, - server_name_.c_str()); + gpr_log(GPR_INFO, "[edslb %p] server name from channel (is_xds_uri=%d): %s", + this, is_xds_uri_, server_name_.c_str()); } grpc_uri_destroy(uri); + // EDS-only flow. + if (!is_xds_uri_) { + // Setup channelz linkage. + channelz::ChannelNode* parent_channelz_node = + grpc_channel_args_find_pointer( + args.args, GRPC_ARG_CHANNELZ_CHANNEL_NODE); + if (parent_channelz_node != nullptr) { + xds_client_->AddChannelzLinkage(parent_channelz_node); + } + // Couple polling. + grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), + interested_parties()); + } } EdsLb::~EdsLb() { @@ -417,32 +422,29 @@ void EdsLb::ShutdownLocked() { child_picker_.reset(); MaybeDestroyChildPolicyLocked(); drop_stats_.reset(); - // Cancel the endpoint watch here instead of in our dtor if we are using the - // xds resolver, because the watcher holds a ref to us and we might not be - // destroying the XdsClient, leading to a situation where this LB policy is - // never destroyed. - if (xds_client_from_channel_ != nullptr) { - if (config_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] cancelling xds watch for %s", this, - std::string(GetEdsResourceName()).c_str()); - } - xds_client()->CancelEndpointDataWatch(GetEdsResourceName(), - endpoint_watcher_); + // Cancel watcher. + if (endpoint_watcher_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { + gpr_log(GPR_INFO, "[edslb %p] cancelling xds watch for %s", this, + std::string(GetEdsResourceName()).c_str()); } - xds_client_from_channel_.reset(DEBUG_LOCATION, "EdsLb"); + xds_client_->CancelEndpointDataWatch(GetEdsResourceName(), + endpoint_watcher_); } - if (xds_client_ != nullptr) { + if (!is_xds_uri_) { + // Remove channelz linkage. channelz::ChannelNode* parent_channelz_node = grpc_channel_args_find_pointer( args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE); if (parent_channelz_node != nullptr) { xds_client_->RemoveChannelzLinkage(parent_channelz_node); } + // Decouple polling. grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), interested_parties()); - xds_client_.reset(); } + xds_client_.reset(DEBUG_LOCATION, "EdsLb"); + // Destroy channel args. grpc_channel_args_destroy(args_); args_ = nullptr; } @@ -467,35 +469,13 @@ void EdsLb::UpdateLocked(UpdateArgs args) { grpc_channel_args_destroy(args_); args_ = args.args; args.args = nullptr; - if (is_initial_update) { - // Initialize XdsClient. - if (xds_client_from_channel_ == nullptr) { - grpc_error* error = GRPC_ERROR_NONE; - xds_client_ = MakeOrphanable(&error); - // TODO(roth): If we decide that we care about EDS-only mode, add - // proper error handling here. - GPR_ASSERT(error == GRPC_ERROR_NONE); - channelz::ChannelNode* parent_channelz_node = - grpc_channel_args_find_pointer( - args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE); - if (parent_channelz_node != nullptr) { - xds_client_->AddChannelzLinkage(parent_channelz_node); - } - grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), - interested_parties()); - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] Created xds client %p", this, - xds_client_.get()); - } - } - } // Update drop stats for load reporting if needed. if (is_initial_update || config_->lrs_load_reporting_server_name() != old_config->lrs_load_reporting_server_name()) { drop_stats_.reset(); if (config_->lrs_load_reporting_server_name().has_value()) { const auto key = GetLrsClusterKey(); - drop_stats_ = xds_client()->AddClusterDropStats( + drop_stats_ = xds_client_->AddClusterDropStats( config_->lrs_load_reporting_server_name().value(), key.first /*cluster_name*/, key.second /*eds_service_name*/); } @@ -514,15 +494,14 @@ void EdsLb::UpdateLocked(UpdateArgs args) { auto watcher = absl::make_unique( Ref(DEBUG_LOCATION, "EndpointWatcher")); endpoint_watcher_ = watcher.get(); - xds_client()->WatchEndpointData(GetEdsResourceName(), std::move(watcher)); + xds_client_->WatchEndpointData(GetEdsResourceName(), std::move(watcher)); } } void EdsLb::ResetBackoffLocked() { // When the XdsClient is instantiated in the resolver instead of in this - // LB policy, this is done via the resolver, so we don't need to do it - // for xds_client_from_channel_ here. - if (xds_client_ != nullptr) xds_client_->ResetBackoff(); + // LB policy, this is done via the resolver, so we don't need to do it here. + if (!is_xds_uri_ && xds_client_ != nullptr) xds_client_->ResetBackoff(); if (child_policy_ != nullptr) { child_policy_->ResetBackoffLocked(); } @@ -789,9 +768,11 @@ void EdsLb::UpdateChildPolicyLocked() { grpc_channel_args* EdsLb::CreateChildPolicyArgsLocked( const grpc_channel_args* args) { - absl::InlinedVector args_to_add = { + grpc_arg args_to_add[] = { // A channel arg indicating if the target is a backend inferred from an // xds load balancer. + // TODO(roth): This isn't needed with the new fallback design. + // Remove as part of implementing the new fallback functionality. grpc_channel_arg_integer_create( const_cast(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER), 1), @@ -800,18 +781,8 @@ grpc_channel_args* EdsLb::CreateChildPolicyArgsLocked( grpc_channel_arg_integer_create( const_cast(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1), }; - absl::InlinedVector args_to_remove; - if (xds_client_from_channel_ == nullptr) { - args_to_add.emplace_back(xds_client_->MakeChannelArg()); - } else if (!config_->lrs_load_reporting_server_name().has_value()) { - // Remove XdsClient from channel args, so that its presence doesn't - // prevent us from sharing subchannels between channels. - // If load reporting is enabled, this happens in the LRS policy instead. - args_to_remove.push_back(GRPC_ARG_XDS_CLIENT); - } - return grpc_channel_args_copy_and_add_and_remove( - args, args_to_remove.data(), args_to_remove.size(), args_to_add.data(), - args_to_add.size()); + return grpc_channel_args_copy_and_add(args, args_to_add, + GPR_ARRAY_SIZE(args_to_add)); } OrphanablePtr EdsLb::CreateChildPolicyLocked( @@ -863,7 +834,17 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { - return MakeOrphanable(std::move(args), &grpc_lb_eds_trace); + grpc_error* error = GRPC_ERROR_NONE; + RefCountedPtr xds_client = XdsClient::GetOrCreate(&error); + if (error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, + "cannot get XdsClient to instantiate eds LB policy: %s", + grpc_error_string(error)); + GRPC_ERROR_UNREF(error); + return nullptr; + } + return MakeOrphanable(std::move(xds_client), + std::move(args)); } const char* name() const override { return kEds; } @@ -974,8 +955,9 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { private: class EdsChildHandler : public ChildPolicyHandler { public: - EdsChildHandler(Args args, TraceFlag* tracer) - : ChildPolicyHandler(std::move(args), tracer) {} + EdsChildHandler(RefCountedPtr xds_client, Args args) + : ChildPolicyHandler(std::move(args), &grpc_lb_eds_trace), + xds_client_(std::move(xds_client)) {} bool ConfigChangeRequiresNewPolicyInstance( LoadBalancingPolicy::Config* old_config, @@ -991,8 +973,11 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr CreateLoadBalancingPolicy( const char* name, LoadBalancingPolicy::Args args) const override { - return MakeOrphanable(std::move(args)); + return MakeOrphanable(xds_client_, std::move(args)); } + + private: + RefCountedPtr xds_client_; }; }; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc b/src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc index 297f83c4226..917cf493d09 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc @@ -196,8 +196,8 @@ LoadBalancingPolicy::PickResult LrsLb::LoadReportingPicker::Pick( LrsLb::LrsLb(RefCountedPtr xds_client, Args args) : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) { - gpr_log(GPR_INFO, "[lrs_lb %p] created -- using xds client %p from channel", - this, xds_client_.get()); + gpr_log(GPR_INFO, "[lrs_lb %p] created -- using xds client %p", this, + xds_client_.get()); } } @@ -255,11 +255,9 @@ void LrsLb::UpdateLocked(UpdateArgs args) { config_->eds_service_name(), config_->locality_name()); MaybeUpdatePickerLocked(); } - // Remove XdsClient from channel args, so that its presence doesn't - // prevent us from sharing subchannels between channels. - grpc_channel_args* new_args = XdsClient::RemoveFromChannelArgs(*args.args); // Update child policy. - UpdateChildPolicyLocked(std::move(args.addresses), new_args); + UpdateChildPolicyLocked(std::move(args.addresses), args.args); + args.args = nullptr; } void LrsLb::MaybeUpdatePickerLocked() { @@ -368,12 +366,13 @@ class LrsLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { - RefCountedPtr xds_client = - XdsClient::GetFromChannelArgs(*args.args); - if (xds_client == nullptr) { + grpc_error* error = GRPC_ERROR_NONE; + RefCountedPtr xds_client = XdsClient::GetOrCreate(&error); + if (error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, - "XdsClient not present in channel args -- cannot instantiate " - "lrs LB policy"); + "cannot get XdsClient to instantiate lrs LB policy: %s", + grpc_error_string(error)); + GRPC_ERROR_UNREF(error); return nullptr; } return MakeOrphanable(std::move(xds_client), std::move(args)); diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index 51f5db83542..6dc9a3f02a8 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -183,7 +183,7 @@ class XdsResolver : public Resolver { std::string server_name_; const grpc_channel_args* args_; grpc_pollset_set* interested_parties_; - OrphanablePtr xds_client_; + RefCountedPtr xds_client_; XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr; std::string route_config_name_; XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr; @@ -513,7 +513,7 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig( void XdsResolver::StartLocked() { grpc_error* error = GRPC_ERROR_NONE; - xds_client_ = MakeOrphanable(&error); + xds_client_ = XdsClient::GetOrCreate(&error); if (error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to create xds client -- channel will remain in " @@ -607,9 +607,8 @@ void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) { void XdsResolver::OnError(grpc_error* error) { gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s", this, grpc_error_string(error)); - grpc_arg xds_client_arg = xds_client_->MakeChannelArg(); Result result; - result.args = grpc_channel_args_copy_and_add(args_, &xds_client_arg, 1); + result.args = grpc_channel_args_copy(args_); result.service_config_error = error; result_handler()->ReturnResult(std::move(result)); } @@ -674,12 +673,8 @@ void XdsResolver::GenerateResult() { gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this, result.service_config->json_string().c_str()); } - grpc_arg new_args[] = { - xds_client_->MakeChannelArg(), - config_selector->MakeChannelArg(), - }; - result.args = - grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args)); + grpc_arg new_arg = config_selector->MakeChannelArg(); + result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1); result_handler()->ReturnResult(std::move(result)); } diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 632b9600b81..ee96b9fe5c2 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -70,16 +70,12 @@ namespace grpc_core { TraceFlag grpc_xds_client_trace(false, "xds_client"); namespace { -const grpc_channel_args* g_channel_args = nullptr; -} // namespace - -namespace internal { -void SetXdsChannelArgsForTest(grpc_channel_args* args) { - g_channel_args = args; -} +Mutex* g_mu = nullptr; +const grpc_channel_args* g_channel_args = nullptr; +XdsClient* g_xds_client = nullptr; -} // namespace internal +} // namespace // // Internal class declarations @@ -435,7 +431,7 @@ class XdsClient::ChannelState::StateWatcher // XdsClient::ChannelState // -XdsClient::ChannelState::ChannelState(RefCountedPtr xds_client, +XdsClient::ChannelState::ChannelState(WeakRefCountedPtr xds_client, grpc_channel* channel) : InternallyRefCounted(&grpc_xds_client_trace), xds_client_(std::move(xds_client)), @@ -1739,7 +1735,7 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap, } // namespace XdsClient::XdsClient(grpc_error** error) - : InternallyRefCounted(&grpc_xds_client_trace), + : DualRefCounted(&grpc_xds_client_trace), request_timeout_(GetRequestTimeout()), interested_parties_(grpc_pollset_set_create()), bootstrap_( @@ -1765,7 +1761,7 @@ XdsClient::XdsClient(grpc_error** error) } // Create ChannelState object. chand_ = MakeOrphanable( - Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel); + WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), channel); } XdsClient::~XdsClient() { @@ -1797,6 +1793,10 @@ void XdsClient::Orphan() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this); } + { + MutexLock lock(g_mu); + if (g_xds_client == this) g_xds_client = nullptr; + } { MutexLock lock(&mu_); shutting_down_ = true; @@ -1813,7 +1813,6 @@ void XdsClient::Orphan() { endpoint_map_.clear(); } } - Unref(DEBUG_LOCATION, "XdsClient::Orphan()"); } void XdsClient::WatchListenerData( @@ -2168,41 +2167,40 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( return snapshot_map; } -void* XdsClient::ChannelArgCopy(void* p) { - XdsClient* xds_client = static_cast(p); - xds_client->Ref(DEBUG_LOCATION, "channel arg").release(); - return p; -} +// +// accessors for global state +// -void XdsClient::ChannelArgDestroy(void* p) { - XdsClient* xds_client = static_cast(p); - xds_client->Unref(DEBUG_LOCATION, "channel arg"); +void XdsClientGlobalInit() { g_mu = new Mutex; } + +void XdsClientGlobalShutdown() { + delete g_mu; + g_mu = nullptr; } -int XdsClient::ChannelArgCmp(void* p, void* q) { return GPR_ICMP(p, q); } +RefCountedPtr XdsClient::GetOrCreate(grpc_error** error) { + MutexLock lock(g_mu); + if (g_xds_client != nullptr) { + auto xds_client = g_xds_client->RefIfNonZero(); + if (xds_client != nullptr) return xds_client; + } + auto xds_client = MakeRefCounted(error); + g_xds_client = xds_client.get(); + return xds_client; +} -const grpc_arg_pointer_vtable XdsClient::kXdsClientVtable = { - XdsClient::ChannelArgCopy, XdsClient::ChannelArgDestroy, - XdsClient::ChannelArgCmp}; +namespace internal { -grpc_arg XdsClient::MakeChannelArg() const { - return grpc_channel_arg_pointer_create(const_cast(GRPC_ARG_XDS_CLIENT), - const_cast(this), - &XdsClient::kXdsClientVtable); +void SetXdsChannelArgsForTest(grpc_channel_args* args) { + MutexLock lock(g_mu); + g_channel_args = args; } -RefCountedPtr XdsClient::GetFromChannelArgs( - const grpc_channel_args& args) { - XdsClient* xds_client = - grpc_channel_args_find_pointer(&args, GRPC_ARG_XDS_CLIENT); - if (xds_client == nullptr) return nullptr; - return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs"); +void UnsetGlobalXdsClientForTest() { + MutexLock lock(g_mu); + g_xds_client = nullptr; } -grpc_channel_args* XdsClient::RemoveFromChannelArgs( - const grpc_channel_args& args) { - const char* arg_name = GRPC_ARG_XDS_CLIENT; - return grpc_channel_args_copy_and_remove(&args, &arg_name, 1); -} +} // namespace internal } // namespace grpc_core diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 147af7ce012..6eeb1790e66 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -29,6 +29,7 @@ #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/lib/channel/channelz.h" +#include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/orphanable.h" @@ -40,17 +41,14 @@ namespace grpc_core { extern TraceFlag xds_client_trace; -class XdsClient : public InternallyRefCounted { +class XdsClient : public DualRefCounted { public: // Listener data watcher interface. Implemented by callers. class ListenerWatcherInterface { public: virtual ~ListenerWatcherInterface() = default; - virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0; - virtual void OnError(grpc_error* error) = 0; - virtual void OnResourceDoesNotExist() = 0; }; @@ -58,11 +56,8 @@ class XdsClient : public InternallyRefCounted { class RouteConfigWatcherInterface { public: virtual ~RouteConfigWatcherInterface() = default; - virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0; - virtual void OnError(grpc_error* error) = 0; - virtual void OnResourceDoesNotExist() = 0; }; @@ -70,11 +65,8 @@ class XdsClient : public InternallyRefCounted { class ClusterWatcherInterface { public: virtual ~ClusterWatcherInterface() = default; - virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0; - virtual void OnError(grpc_error* error) = 0; - virtual void OnResourceDoesNotExist() = 0; }; @@ -82,16 +74,17 @@ class XdsClient : public InternallyRefCounted { class EndpointWatcherInterface { public: virtual ~EndpointWatcherInterface() = default; - virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0; - virtual void OnError(grpc_error* error) = 0; - virtual void OnResourceDoesNotExist() = 0; }; - // If *error is not GRPC_ERROR_NONE after construction, then there was + // Factory function to get or create the global XdsClient instance. + // If *error is not GRPC_ERROR_NONE upon return, then there was // an error initializing the client. + static RefCountedPtr GetOrCreate(grpc_error** error); + + // Callers should not instantiate directly. Use GetOrCreate() instead. explicit XdsClient(grpc_error** error); ~XdsClient(); @@ -188,24 +181,14 @@ class XdsClient : public InternallyRefCounted { // Resets connection backoff state. void ResetBackoff(); - // Helpers for encoding the XdsClient object in channel args. - grpc_arg MakeChannelArg() const; - static RefCountedPtr GetFromChannelArgs( - const grpc_channel_args& args); - static grpc_channel_args* RemoveFromChannelArgs( - const grpc_channel_args& args); - private: // Contains a channel to the xds server and all the data related to the // channel. Holds a ref to the xds client object. - // TODO(roth): This is separate from the XdsClient object because it was - // originally designed to be able to swap itself out in case the - // balancer name changed. Now that the balancer name is going to be - // coming from the bootstrap file, we don't really need this level of - // indirection unless we decide to support watching the bootstrap file - // for changes. At some point, if we decide that we're never going to - // need to do that, then we can eliminate this class and move its - // contents directly into the XdsClient class. + // + // Currently, there is only one ChannelState object per XdsClient + // object, and it has essentially the same lifetime. But in the + // future, when we add federation support, a single XdsClient may have + // multiple underlying channels to talk to different xDS servers. class ChannelState : public InternallyRefCounted { public: template @@ -214,7 +197,8 @@ class XdsClient : public InternallyRefCounted { class AdsCallState; class LrsCallState; - ChannelState(RefCountedPtr xds_client, grpc_channel* channel); + ChannelState(WeakRefCountedPtr xds_client, + grpc_channel* channel); ~ChannelState(); void Orphan() override; @@ -240,7 +224,7 @@ class XdsClient : public InternallyRefCounted { class StateWatcher; // The owning xds client. - RefCountedPtr xds_client_; + WeakRefCountedPtr xds_client_; // The channel and its status. grpc_channel* channel_; @@ -283,6 +267,10 @@ class XdsClient : public InternallyRefCounted { absl::optional update; }; + // TODO(roth): Change this to store exactly one instance of + // XdsClusterDropStats and exactly one instance of + // XdsClusterLocalityStats per locality. We can return multiple refs + // to the same object instead of registering multiple objects. struct LoadReportState { struct LocalityState { std::set locality_stats; @@ -303,13 +291,6 @@ class XdsClient : public InternallyRefCounted { XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( bool send_all_clusters, const std::set& clusters); - // Channel arg vtable functions. - static void* ChannelArgCopy(void* p); - static void ChannelArgDestroy(void* p); - static int ChannelArgCmp(void* p, void* q); - - static const grpc_arg_pointer_vtable kXdsClientVtable; - const grpc_millis request_timeout_; grpc_pollset_set* interested_parties_; std::unique_ptr bootstrap_; @@ -319,7 +300,6 @@ class XdsClient : public InternallyRefCounted { // The channel for communicating with the xds server. OrphanablePtr chand_; - RefCountedPtr parent_channelz_node_; // One entry for each watched LDS resource. std::map listener_map_; @@ -341,9 +321,8 @@ class XdsClient : public InternallyRefCounted { }; namespace internal { - void SetXdsChannelArgsForTest(grpc_channel_args* args); - +void UnsetGlobalXdsClientForTest(); } // namespace internal } // namespace grpc_core diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index 5d724ee8fc9..b079267442f 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -62,6 +62,10 @@ void grpc_workaround_cronet_compression_filter_init(void); void grpc_workaround_cronet_compression_filter_shutdown(void); #ifndef GRPC_NO_XDS +namespace grpc_core { +void XdsClientGlobalInit(); +void XdsClientGlobalShutdown(); +} // namespace grpc_core void grpc_certificate_provider_registry_init(void); void grpc_certificate_provider_registry_shutdown(void); void grpc_lb_policy_cds_init(void); @@ -118,6 +122,8 @@ void grpc_register_built_in_plugins(void) { grpc_register_plugin(grpc_workaround_cronet_compression_filter_init, grpc_workaround_cronet_compression_filter_shutdown); #ifndef GRPC_NO_XDS + grpc_register_plugin(grpc_core::XdsClientGlobalInit, + grpc_core::XdsClientGlobalShutdown); grpc_register_plugin(grpc_certificate_provider_registry_init, grpc_certificate_provider_registry_shutdown); grpc_register_plugin(grpc_lb_policy_cds_init, diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 14aac91884c..aa6666727d8 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -673,6 +673,11 @@ class AdsServiceImpl : public std::enable_shared_from_this { } } + std::set clients() { + grpc_core::MutexLock lock(&clients_mu_); + return clients_; + } + private: // A queue of resource type/name pairs that have changed since the client // subscribed to them. @@ -719,6 +724,7 @@ class AdsServiceImpl : public std::enable_shared_from_this { Status StreamAggregatedResources(ServerContext* context, Stream* stream) override { gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this); + parent_->AddClient(context->peer()); if (is_v2_) { parent_->seen_v2_client_ = true; } else { @@ -936,6 +942,7 @@ class AdsServiceImpl : public std::enable_shared_from_this { } } gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources done", this); + parent_->RemoveClient(context->peer()); return Status::OK; } @@ -1088,6 +1095,16 @@ class AdsServiceImpl : public std::enable_shared_from_this { } } + void AddClient(const std::string& client) { + grpc_core::MutexLock lock(&clients_mu_); + clients_.insert(client); + } + + void RemoveClient(const std::string& client) { + grpc_core::MutexLock lock(&clients_mu_); + clients_.erase(client); + } + RpcService<::envoy::service::discovery::v2::AggregatedDiscoveryService, ::envoy::api::v2::DiscoveryRequest, ::envoy::api::v2::DiscoveryResponse> @@ -1116,6 +1133,9 @@ class AdsServiceImpl : public std::enable_shared_from_this { // yet been destroyed by UnsetResource()). // - There is at least one subscription for the resource. ResourceMap resource_map_; + + grpc_core::Mutex clients_mu_; + std::set clients_; }; class LrsServiceImpl : public std::enable_shared_from_this { @@ -1196,7 +1216,7 @@ class LrsServiceImpl : public std::enable_shared_from_this { Status StreamLoadStats(ServerContext* /*context*/, Stream* stream) override { gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this); - GPR_ASSERT(parent_->client_load_reporting_interval_seconds_ > 0); + EXPECT_GT(parent_->client_load_reporting_interval_seconds_, 0); // Take a reference of the LrsServiceImpl object, reference will go // out of scope after this method exits. std::shared_ptr lrs_service_impl = @@ -1377,6 +1397,14 @@ class XdsEnd2endTest : public ::testing::TestWithParam { void TearDown() override { ShutdownAllBackends(); for (auto& balancer : balancers_) balancer->Shutdown(); + // Make sure each test creates a new XdsClient instance rather than + // reusing the one from the previous test. This avoids spurious failures + // caused when a load reporting test runs after a non-load reporting test + // and the XdsClient is still talking to the old LRS server, which fails + // because it's not expecting the client to connect. It also + // ensures that each test can independently set the global channel + // args for the xDS channel. + grpc_core::internal::UnsetGlobalXdsClientForTest(); } void StartAllBackends() { @@ -1392,6 +1420,14 @@ class XdsEnd2endTest : public ::testing::TestWithParam { void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); } void ResetStub(int failover_timeout = 0) { + channel_ = CreateChannel(failover_timeout); + stub_ = grpc::testing::EchoTestService::NewStub(channel_); + stub1_ = grpc::testing::EchoTest1Service::NewStub(channel_); + stub2_ = grpc::testing::EchoTest2Service::NewStub(channel_); + } + + std::shared_ptr CreateChannel( + int failover_timeout = 0, const char* server_name = kServerName) { ChannelArguments args; if (failover_timeout > 0) { args.SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, failover_timeout); @@ -1403,7 +1439,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { response_generator_.get()); } std::string uri = absl::StrCat( - GetParam().use_xds_resolver() ? "xds" : "fake", ":///", kServerName); + GetParam().use_xds_resolver() ? "xds" : "fake", ":///", server_name); // TODO(dgq): templatize tests to run everything using both secure and // insecure channel credentials. grpc_channel_credentials* channel_creds = @@ -1415,10 +1451,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { channel_creds, call_creds, nullptr))); call_creds->Unref(); channel_creds->Unref(); - channel_ = ::grpc::CreateCustomChannel(uri, creds, args); - stub_ = grpc::testing::EchoTestService::NewStub(channel_); - stub1_ = grpc::testing::EchoTest1Service::NewStub(channel_); - stub2_ = grpc::testing::EchoTest2Service::NewStub(channel_); + return ::grpc::CreateCustomChannel(uri, creds, args); } enum RpcService { @@ -2226,6 +2259,28 @@ TEST_P(XdsResolverOnlyTest, DefaultRouteSpecifiesSlashPrefix) { WaitForAllBackends(); } +TEST_P(XdsResolverOnlyTest, MultipleChannelsShareXdsClient) { + const char* kNewServerName = "new-server.example.com"; + Listener listener = balancers_[0]->ads_service()->default_listener(); + listener.set_name(kNewServerName); + balancers_[0]->ads_service()->SetLdsResource(listener); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args)); + WaitForAllBackends(); + // Create second channel and tell it to connect to kNewServerName. + auto channel2 = CreateChannel(/*failover_timeout=*/0, kNewServerName); + channel2->GetState(/*try_to_connect=*/true); + ASSERT_TRUE( + channel2->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100))); + // Make sure there's only one client connected. + EXPECT_EQ(1UL, balancers_[0]->ads_service()->clients().size()); +} + class XdsResolverLoadReportingOnlyTest : public XdsEnd2endTest { public: XdsResolverLoadReportingOnlyTest() : XdsEnd2endTest(4, 1, 3) {} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index b29d607492e..dda80857d88 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1494,6 +1494,7 @@ src/core/lib/gprpp/arena.cc \ src/core/lib/gprpp/arena.h \ src/core/lib/gprpp/atomic.h \ src/core/lib/gprpp/debug_location.h \ +src/core/lib/gprpp/dual_ref_counted.h \ src/core/lib/gprpp/fork.cc \ src/core/lib/gprpp/fork.h \ src/core/lib/gprpp/global_config.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 7812de9c5a0..76f4f14794b 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1333,6 +1333,7 @@ src/core/lib/gprpp/arena.cc \ src/core/lib/gprpp/arena.h \ src/core/lib/gprpp/atomic.h \ src/core/lib/gprpp/debug_location.h \ +src/core/lib/gprpp/dual_ref_counted.h \ src/core/lib/gprpp/fork.cc \ src/core/lib/gprpp/fork.h \ src/core/lib/gprpp/global_config.h \