XdsClient: refcount tracing improvements (#30277)

* XdsClient: improve ref-count trace logging

* better message for GetOrCreate()

* add more tracing

* Automated change: Fix sanity tests

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/30325/head
Mark D. Roth 3 years ago committed by GitHub
parent 908e46d137
commit 4341a810f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 3
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  3. 5
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  4. 11
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  5. 29
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  6. 14
      src/core/ext/xds/xds_client.cc
  7. 3
      src/core/ext/xds/xds_client.h
  8. 87
      src/core/ext/xds/xds_client_grpc.cc
  9. 8
      src/core/ext/xds/xds_client_grpc.h
  10. 70
      src/core/ext/xds/xds_server_config_fetcher.cc
  11. 19
      src/core/lib/channel/channel_args.h

@ -3166,6 +3166,7 @@ grpc_cc_library(
deps = [ deps = [
"avl", "avl",
"channel_stack_type", "channel_stack_type",
"debug_location",
"dual_ref_counted", "dual_ref_counted",
"gpr_base", "gpr_base",
"grpc_codegen", "grpc_codegen",
@ -4027,6 +4028,7 @@ grpc_cc_library(
"channel_args_preconditioning", "channel_args_preconditioning",
"channel_fwd", "channel_fwd",
"config", "config",
"debug_location",
"exec_ctx", "exec_ctx",
"gpr_base", "gpr_base",
"grpc_base", "grpc_base",

@ -714,7 +714,8 @@ class CdsLbFactory : public LoadBalancingPolicyFactory {
public: public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override { LoadBalancingPolicy::Args args) const override {
auto xds_client = args.args.GetObjectRef<GrpcXdsClient>(); auto xds_client =
args.args.GetObjectRef<GrpcXdsClient>(DEBUG_LOCATION, "CdsLb");
if (xds_client == nullptr) { if (xds_client == nullptr) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"XdsClient not present in channel args -- cannot instantiate " "XdsClient not present in channel args -- cannot instantiate "

@ -466,7 +466,7 @@ void XdsClusterImplLb::ShutdownLocked() {
// the child. // the child.
picker_.reset(); picker_.reset();
drop_stats_.reset(); drop_stats_.reset();
xds_client_.reset(); xds_client_.reset(DEBUG_LOCATION, "XdsClusterImpl");
} }
void XdsClusterImplLb::ExitIdleLocked() { void XdsClusterImplLb::ExitIdleLocked() {
@ -691,7 +691,8 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
public: public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override { LoadBalancingPolicy::Args args) const override {
auto xds_client = args.args.GetObjectRef<GrpcXdsClient>(); auto xds_client = args.args.GetObjectRef<GrpcXdsClient>(DEBUG_LOCATION,
"XdsClusterImplLb");
if (xds_client == nullptr) { if (xds_client == nullptr) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"XdsClient not present in channel args -- cannot instantiate " "XdsClient not present in channel args -- cannot instantiate "

@ -1059,7 +1059,8 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
public: public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override { LoadBalancingPolicy::Args args) const override {
auto xds_client = args.args.GetObjectRef<GrpcXdsClient>(); auto xds_client = args.args.GetObjectRef<GrpcXdsClient>(
DEBUG_LOCATION, "XdsClusterResolverLbFactory");
if (xds_client == nullptr) { if (xds_client == nullptr) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"XdsClient not present in channel args -- cannot instantiate " "XdsClient not present in channel args -- cannot instantiate "
@ -1281,6 +1282,10 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
&grpc_lb_xds_cluster_resolver_trace), &grpc_lb_xds_cluster_resolver_trace),
xds_client_(std::move(xds_client)) {} xds_client_(std::move(xds_client)) {}
~XdsClusterResolverChildHandler() override {
xds_client_.reset(DEBUG_LOCATION, "XdsClusterResolverChildHandler");
}
bool ConfigChangeRequiresNewPolicyInstance( bool ConfigChangeRequiresNewPolicyInstance(
LoadBalancingPolicy::Config* old_config, LoadBalancingPolicy::Config* old_config,
LoadBalancingPolicy::Config* new_config) const override { LoadBalancingPolicy::Config* new_config) const override {
@ -1296,7 +1301,9 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const char* /*name*/, LoadBalancingPolicy::Args args) const override { const char* /*name*/, LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args)); return MakeOrphanable<XdsClusterResolverLb>(
xds_client_->Ref(DEBUG_LOCATION, "XdsClusterResolverLb"),
std::move(args));
} }
private: private:

@ -787,24 +787,22 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
void XdsResolver::StartLocked() { void XdsResolver::StartLocked() {
grpc_error_handle error = GRPC_ERROR_NONE; grpc_error_handle error = GRPC_ERROR_NONE;
xds_client_ = GrpcXdsClient::GetOrCreate(args_, &error); auto xds_client = GrpcXdsClient::GetOrCreate(args_, "xds resolver");
if (!GRPC_ERROR_IS_NONE(error)) { if (!xds_client.ok()) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"Failed to create xds client -- channel will remain in " "Failed to create xds client -- channel will remain in "
"TRANSIENT_FAILURE: %s", "TRANSIENT_FAILURE: %s",
grpc_error_std_string(error).c_str()); xds_client.status().ToString().c_str());
std::string error_message; absl::Status status = absl::UnavailableError(absl::StrCat(
grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &error_message); "Failed to create XdsClient: ", xds_client.status().message()));
absl::Status status = absl::UnavailableError(
absl::StrCat("Failed to create XdsClient: ", error_message));
Result result; Result result;
result.addresses = status; result.addresses = status;
result.service_config = std::move(status); result.service_config = std::move(status);
result.args = args_; result.args = args_;
result_handler_->ReportResult(std::move(result)); result_handler_->ReportResult(std::move(result));
GRPC_ERROR_UNREF(error);
return; return;
} }
xds_client_ = std::move(*xds_client);
std::string resource_name_fragment(absl::StripPrefix(uri_.path(), "/")); std::string resource_name_fragment(absl::StripPrefix(uri_.path(), "/"));
if (!uri_.authority().empty()) { if (!uri_.authority().empty()) {
// target_uri.authority is set case // target_uri.authority is set case
@ -876,7 +874,7 @@ void XdsResolver::ShutdownLocked() {
grpc_pollset_set_del_pollset_set( grpc_pollset_set_del_pollset_set(
static_cast<GrpcXdsClient*>(xds_client_.get())->interested_parties(), static_cast<GrpcXdsClient*>(xds_client_.get())->interested_parties(),
interested_parties_); interested_parties_);
xds_client_.reset(); xds_client_.reset(DEBUG_LOCATION, "xds resolver");
} }
} }
@ -974,7 +972,11 @@ void XdsResolver::OnError(absl::string_view context, absl::Status status) {
Result result; Result result;
result.addresses = status; result.addresses = status;
result.service_config = std::move(status); result.service_config = std::move(status);
result.args = args_.SetObject(xds_client_); // Need to explicitly convert to the right RefCountedPtr<> type for
// use with ChannelArgs::SetObject().
RefCountedPtr<GrpcXdsClient> xds_client =
xds_client_->Ref(DEBUG_LOCATION, "xds resolver result");
result.args = args_.SetObject(std::move(xds_client));
result_handler_->ReportResult(std::move(result)); result_handler_->ReportResult(std::move(result));
} }
@ -1066,7 +1068,12 @@ void XdsResolver::GenerateResult() {
? std::string((*result.service_config)->json_string()).c_str() ? std::string((*result.service_config)->json_string()).c_str()
: result.service_config.status().ToString().c_str()); : result.service_config.status().ToString().c_str());
} }
result.args = args_.SetObject(xds_client_).SetObject(config_selector); // Need to explicitly convert to the right RefCountedPtr<> type for
// use with ChannelArgs::SetObject().
RefCountedPtr<GrpcXdsClient> xds_client =
xds_client_->Ref(DEBUG_LOCATION, "xds resolver result");
result.args =
args_.SetObject(std::move(xds_client)).SetObject(config_selector);
result_handler_->ReportResult(std::move(result)); result_handler_->ReportResult(std::move(result));
} }

@ -1426,10 +1426,10 @@ void XdsClient::Orphan() {
} }
RefCountedPtr<XdsClient::ChannelState> XdsClient::GetOrCreateChannelStateLocked( RefCountedPtr<XdsClient::ChannelState> XdsClient::GetOrCreateChannelStateLocked(
const XdsBootstrap::XdsServer& server) { const XdsBootstrap::XdsServer& server, const char* reason) {
auto it = xds_server_channel_map_.find(server); auto it = xds_server_channel_map_.find(server);
if (it != xds_server_channel_map_.end()) { if (it != xds_server_channel_map_.end()) {
return it->second->Ref(DEBUG_LOCATION, "Authority"); return it->second->Ref(DEBUG_LOCATION, reason);
} }
// Channel not found, so create a new one. // Channel not found, so create a new one.
auto channel_state = MakeRefCounted<ChannelState>( auto channel_state = MakeRefCounted<ChannelState>(
@ -1507,7 +1507,7 @@ void XdsClient::WatchResource(const XdsResourceType* type,
// needed. // needed.
if (authority_state.channel_state == nullptr) { if (authority_state.channel_state == nullptr) {
authority_state.channel_state = authority_state.channel_state =
GetOrCreateChannelStateLocked(*xds_server); GetOrCreateChannelStateLocked(*xds_server, "start watch");
} }
authority_state.channel_state->SubscribeLocked(type, *resource_name); authority_state.channel_state->SubscribeLocked(type, *resource_name);
} }
@ -1641,8 +1641,8 @@ RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
xds_load_report_server_map_.emplace(xds_server, LoadReportServer()) xds_load_report_server_map_.emplace(xds_server, LoadReportServer())
.first; .first;
if (server_it->second.channel_state == nullptr) { if (server_it->second.channel_state == nullptr) {
server_it->second.channel_state = server_it->second.channel_state = GetOrCreateChannelStateLocked(
GetOrCreateChannelStateLocked(xds_server); xds_server, "load report map (drop stats)");
} }
auto load_report_it = server_it->second.load_report_map auto load_report_it = server_it->second.load_report_map
.emplace(std::move(key), LoadReportState()) .emplace(std::move(key), LoadReportState())
@ -1707,8 +1707,8 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
xds_load_report_server_map_.emplace(xds_server, LoadReportServer()) xds_load_report_server_map_.emplace(xds_server, LoadReportServer())
.first; .first;
if (server_it->second.channel_state == nullptr) { if (server_it->second.channel_state == nullptr) {
server_it->second.channel_state = server_it->second.channel_state = GetOrCreateChannelStateLocked(
GetOrCreateChannelStateLocked(xds_server); xds_server, "load report map (locality stats)");
} }
auto load_report_it = server_it->second.load_report_map auto load_report_it = server_it->second.load_report_map
.emplace(std::move(key), LoadReportState()) .emplace(std::move(key), LoadReportState())

@ -297,7 +297,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
RefCountedPtr<ChannelState> GetOrCreateChannelStateLocked( RefCountedPtr<ChannelState> GetOrCreateChannelStateLocked(
const XdsBootstrap::XdsServer& server) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); const XdsBootstrap::XdsServer& server, const char* reason)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
std::unique_ptr<XdsBootstrap> bootstrap_; std::unique_ptr<XdsBootstrap> bootstrap_;
OrphanablePtr<XdsTransportFactory> transport_factory_; OrphanablePtr<XdsTransportFactory> transport_factory_;

@ -24,6 +24,7 @@
#include <utility> #include <utility>
#include "absl/base/thread_annotations.h" #include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
@ -42,15 +43,18 @@
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/env.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/load_file.h" #include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_refcount.h" #include "src/core/lib/slice/slice_refcount.h"
#include "src/core/lib/transport/error_utils.h"
namespace grpc_core { namespace grpc_core {
@ -58,7 +62,7 @@ namespace {
Mutex* g_mu = nullptr; Mutex* g_mu = nullptr;
const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr; const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr; GrpcXdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr;
char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
} // namespace } // namespace
@ -82,8 +86,7 @@ void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS {
namespace { namespace {
std::string GetBootstrapContents(const char* fallback_config, absl::StatusOr<std::string> GetBootstrapContents(const char* fallback_config) {
grpc_error_handle* error) {
// First, try GRPC_XDS_BOOTSTRAP env var. // First, try GRPC_XDS_BOOTSTRAP env var.
UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP")); UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP"));
if (path != nullptr) { if (path != nullptr) {
@ -94,9 +97,9 @@ std::string GetBootstrapContents(const char* fallback_config,
path.get()); path.get());
} }
grpc_slice contents; grpc_slice contents;
*error = grpc_error_handle error =
grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents); grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents);
if (!GRPC_ERROR_IS_NONE(*error)) return ""; if (!GRPC_ERROR_IS_NONE(error)) return grpc_error_to_absl_status(error);
std::string contents_str(StringViewFromSlice(contents)); std::string contents_str(StringViewFromSlice(contents));
grpc_slice_unref_internal(contents); grpc_slice_unref_internal(contents);
return contents_str; return contents_str;
@ -119,56 +122,51 @@ std::string GetBootstrapContents(const char* fallback_config,
return fallback_config; return fallback_config;
} }
// No bootstrap config found. // No bootstrap config found.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( return absl::FailedPreconditionError(
"Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG " "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
"not defined"); "not defined");
return "";
} }
} // namespace } // namespace
RefCountedPtr<XdsClient> GrpcXdsClient::GetOrCreate(const ChannelArgs& args, absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
grpc_error_handle* error) { const ChannelArgs& args, const char* reason) {
RefCountedPtr<XdsClient> xds_client;
// If getting bootstrap from channel args, create a local XdsClient // If getting bootstrap from channel args, create a local XdsClient
// instance for the channel or server instead of using the global instance. // instance for the channel or server instead of using the global instance.
absl::optional<absl::string_view> bootstrap_config = args.GetString( absl::optional<absl::string_view> bootstrap_config = args.GetString(
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG); GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
if (bootstrap_config.has_value()) { if (bootstrap_config.has_value()) {
grpc_error_handle error = GRPC_ERROR_NONE;
std::unique_ptr<XdsBootstrap> bootstrap = std::unique_ptr<XdsBootstrap> bootstrap =
XdsBootstrap::Create(*bootstrap_config, error); XdsBootstrap::Create(*bootstrap_config, &error);
if (GRPC_ERROR_IS_NONE(*error)) { if (!GRPC_ERROR_IS_NONE(error)) return grpc_error_to_absl_status(error);
grpc_channel_args* xds_channel_args = args.GetPointer<grpc_channel_args>( grpc_channel_args* xds_channel_args = args.GetPointer<grpc_channel_args>(
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS); GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
return MakeRefCounted<GrpcXdsClient>( return MakeRefCounted<GrpcXdsClient>(std::move(bootstrap),
std::move(bootstrap), ChannelArgs::FromC(xds_channel_args)); ChannelArgs::FromC(xds_channel_args));
}
return nullptr;
} }
// Otherwise, use the global instance. // Otherwise, use the global instance.
{ MutexLock lock(g_mu);
MutexLock lock(g_mu); if (g_xds_client != nullptr) {
if (g_xds_client != nullptr) { auto xds_client = g_xds_client->RefIfNonZero(DEBUG_LOCATION, reason);
auto xds_client = g_xds_client->RefIfNonZero(); if (xds_client != nullptr) return xds_client;
if (xds_client != nullptr) return xds_client;
}
// Find bootstrap contents.
std::string bootstrap_contents =
GetBootstrapContents(g_fallback_bootstrap_config, error);
if (!GRPC_ERROR_IS_NONE(*error)) return nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
bootstrap_contents.c_str());
}
// Parse bootstrap.
std::unique_ptr<XdsBootstrap> bootstrap =
XdsBootstrap::Create(bootstrap_contents, error);
if (!GRPC_ERROR_IS_NONE(*error)) return nullptr;
// Instantiate XdsClient.
xds_client = MakeRefCounted<GrpcXdsClient>(
std::move(bootstrap), ChannelArgs::FromC(g_channel_args));
g_xds_client = xds_client.get();
} }
// Find bootstrap contents.
auto bootstrap_contents = GetBootstrapContents(g_fallback_bootstrap_config);
if (!bootstrap_contents.ok()) return bootstrap_contents.status();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
bootstrap_contents->c_str());
}
// Parse bootstrap.
grpc_error_handle error = GRPC_ERROR_NONE;
std::unique_ptr<XdsBootstrap> bootstrap =
XdsBootstrap::Create(*bootstrap_contents, &error);
if (!GRPC_ERROR_IS_NONE(error)) return grpc_error_to_absl_status(error);
// Instantiate XdsClient.
auto xds_client = MakeRefCounted<GrpcXdsClient>(
std::move(bootstrap), ChannelArgs::FromC(g_channel_args));
g_xds_client = xds_client.get();
return xds_client; return xds_client;
} }
@ -218,12 +216,11 @@ grpc_slice grpc_dump_xds_configs(void) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_error_handle error = GRPC_ERROR_NONE; grpc_error_handle error = GRPC_ERROR_NONE;
auto xds_client = auto xds_client = grpc_core::GrpcXdsClient::GetOrCreate(
grpc_core::GrpcXdsClient::GetOrCreate(grpc_core::ChannelArgs(), &error); grpc_core::ChannelArgs(), "grpc_dump_xds_configs()");
if (!GRPC_ERROR_IS_NONE(error)) { if (!xds_client.ok()) {
// If we aren't using xDS, just return an empty string. // If we aren't using xDS, just return an empty string.
GRPC_ERROR_UNREF(error);
return grpc_empty_slice(); return grpc_empty_slice();
} }
return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary()); return grpc_slice_from_cpp_string((*xds_client)->DumpClientConfigBinary());
} }

@ -21,6 +21,7 @@
#include <memory> #include <memory>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/grpc_types.h>
@ -30,7 +31,6 @@
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/iomgr_fwd.h"
namespace grpc_core { namespace grpc_core {
@ -38,10 +38,8 @@ namespace grpc_core {
class GrpcXdsClient : public XdsClient { class GrpcXdsClient : public XdsClient {
public: public:
// Factory function to get or create the global XdsClient instance. // Factory function to get or create the global XdsClient instance.
// If *error is not GRPC_ERROR_NONE upon return, then there was static absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GetOrCreate(
// an error initializing the client. const ChannelArgs& args, const char* reason);
static RefCountedPtr<XdsClient> GetOrCreate(const ChannelArgs& args,
grpc_error_handle* error);
// Do not instantiate directly -- use GetOrCreate() instead. // Do not instantiate directly -- use GetOrCreate() instead.
GrpcXdsClient(std::unique_ptr<XdsBootstrap> bootstrap, GrpcXdsClient(std::unique_ptr<XdsBootstrap> bootstrap,

@ -54,7 +54,6 @@
#include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_certificate_provider.h" #include "src/core/ext/xds/xds_certificate_provider.h"
#include "src/core/ext/xds/xds_channel_stack_modifier.h" #include "src/core/ext/xds/xds_channel_stack_modifier.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_grpc.h" #include "src/core/ext/xds/xds_client_grpc.h"
#include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_common_types.h"
#include "src/core/ext/xds/xds_http_filters.h" #include "src/core/ext/xds/xds_http_filters.h"
@ -69,6 +68,7 @@
#include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
@ -102,9 +102,13 @@ TraceFlag grpc_xds_server_config_fetcher_trace(false,
// listeners from the xDS control plane. // listeners from the xDS control plane.
class XdsServerConfigFetcher : public grpc_server_config_fetcher { class XdsServerConfigFetcher : public grpc_server_config_fetcher {
public: public:
XdsServerConfigFetcher(RefCountedPtr<XdsClient> xds_client, XdsServerConfigFetcher(RefCountedPtr<GrpcXdsClient> xds_client,
grpc_server_xds_status_notifier notifier); grpc_server_xds_status_notifier notifier);
~XdsServerConfigFetcher() override {
xds_client_.reset(DEBUG_LOCATION, "XdsServerConfigFetcher");
}
void StartWatch(std::string listening_address, void StartWatch(std::string listening_address,
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
watcher) override; watcher) override;
@ -114,13 +118,13 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
// Return the interested parties from the xds client so that it can be polled. // Return the interested parties from the xds client so that it can be polled.
grpc_pollset_set* interested_parties() override { grpc_pollset_set* interested_parties() override {
return static_cast<GrpcXdsClient*>(xds_client_.get())->interested_parties(); return xds_client_->interested_parties();
} }
private: private:
class ListenerWatcher; class ListenerWatcher;
const RefCountedPtr<XdsClient> xds_client_; RefCountedPtr<GrpcXdsClient> xds_client_;
const grpc_server_xds_status_notifier serving_status_notifier_; const grpc_server_xds_status_notifier serving_status_notifier_;
Mutex mu_; Mutex mu_;
std::map<grpc_server_config_fetcher::WatcherInterface*, ListenerWatcher*> std::map<grpc_server_config_fetcher::WatcherInterface*, ListenerWatcher*>
@ -140,12 +144,16 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
class XdsServerConfigFetcher::ListenerWatcher class XdsServerConfigFetcher::ListenerWatcher
: public XdsListenerResourceType::WatcherInterface { : public XdsListenerResourceType::WatcherInterface {
public: public:
ListenerWatcher(RefCountedPtr<XdsClient> xds_client, ListenerWatcher(RefCountedPtr<GrpcXdsClient> xds_client,
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
server_config_watcher, server_config_watcher,
grpc_server_xds_status_notifier serving_status_notifier, grpc_server_xds_status_notifier serving_status_notifier,
std::string listening_address); std::string listening_address);
~ListenerWatcher() override {
xds_client_.reset(DEBUG_LOCATION, "ListenerWatcher");
}
void OnResourceChanged(XdsListenerResource listener) override; void OnResourceChanged(XdsListenerResource listener) override;
void OnError(absl::Status status) override; void OnError(absl::Status status) override;
@ -172,7 +180,7 @@ class XdsServerConfigFetcher::ListenerWatcher
FilterChainMatchManager* filter_chain_match_manager) FilterChainMatchManager* filter_chain_match_manager)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
const RefCountedPtr<XdsClient> xds_client_; RefCountedPtr<GrpcXdsClient> xds_client_;
const std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> const std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
server_config_watcher_; server_config_watcher_;
const grpc_server_xds_status_notifier serving_status_notifier_; const grpc_server_xds_status_notifier serving_status_notifier_;
@ -192,11 +200,15 @@ class XdsServerConfigFetcher::ListenerWatcher
class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager
: public grpc_server_config_fetcher::ConnectionManager { : public grpc_server_config_fetcher::ConnectionManager {
public: public:
FilterChainMatchManager(RefCountedPtr<XdsClient> xds_client, FilterChainMatchManager(RefCountedPtr<GrpcXdsClient> xds_client,
XdsListenerResource::FilterChainMap filter_chain_map, XdsListenerResource::FilterChainMap filter_chain_map,
absl::optional<XdsListenerResource::FilterChainData> absl::optional<XdsListenerResource::FilterChainData>
default_filter_chain); default_filter_chain);
~FilterChainMatchManager() override {
xds_client_.reset(DEBUG_LOCATION, "FilterChainMatchManager");
}
absl::StatusOr<ChannelArgs> UpdateChannelArgsForConnection( absl::StatusOr<ChannelArgs> UpdateChannelArgsForConnection(
const ChannelArgs& args, grpc_endpoint* tcp) override; const ChannelArgs& args, grpc_endpoint* tcp) override;
@ -246,7 +258,7 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager
void OnError(const std::string& resource_name, absl::Status status); void OnError(const std::string& resource_name, absl::Status status);
void OnResourceDoesNotExist(const std::string& resource_name); void OnResourceDoesNotExist(const std::string& resource_name);
RefCountedPtr<XdsClient> xds_client_; RefCountedPtr<GrpcXdsClient> xds_client_;
// This ref is only kept around till the FilterChainMatchManager becomes // This ref is only kept around till the FilterChainMatchManager becomes
// ready. // ready.
RefCountedPtr<ListenerWatcher> listener_watcher_; RefCountedPtr<ListenerWatcher> listener_watcher_;
@ -402,11 +414,15 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
: public ServerConfigSelectorProvider { : public ServerConfigSelectorProvider {
public: public:
DynamicXdsServerConfigSelectorProvider( DynamicXdsServerConfigSelectorProvider(
RefCountedPtr<XdsClient> xds_client, std::string resource_name, RefCountedPtr<GrpcXdsClient> xds_client, std::string resource_name,
absl::StatusOr<XdsRouteConfigResource> initial_resource, absl::StatusOr<XdsRouteConfigResource> initial_resource,
std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter> std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
http_filters); http_filters);
~DynamicXdsServerConfigSelectorProvider() override {
xds_client_.reset(DEBUG_LOCATION, "DynamicXdsServerConfigSelectorProvider");
}
void Orphan() override; void Orphan() override;
absl::StatusOr<RefCountedPtr<ServerConfigSelector>> Watch( absl::StatusOr<RefCountedPtr<ServerConfigSelector>> Watch(
@ -421,7 +437,7 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
void OnError(absl::Status status); void OnError(absl::Status status);
void OnResourceDoesNotExist(); void OnResourceDoesNotExist();
RefCountedPtr<XdsClient> xds_client_; RefCountedPtr<GrpcXdsClient> xds_client_;
std::string resource_name_; std::string resource_name_;
std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter> std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
http_filters_; http_filters_;
@ -459,7 +475,7 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
// //
XdsServerConfigFetcher::XdsServerConfigFetcher( XdsServerConfigFetcher::XdsServerConfigFetcher(
RefCountedPtr<XdsClient> xds_client, RefCountedPtr<GrpcXdsClient> xds_client,
grpc_server_xds_status_notifier notifier) grpc_server_xds_status_notifier notifier)
: xds_client_(std::move(xds_client)), serving_status_notifier_(notifier) { : xds_client_(std::move(xds_client)), serving_status_notifier_(notifier) {
GPR_ASSERT(xds_client_ != nullptr); GPR_ASSERT(xds_client_ != nullptr);
@ -481,8 +497,8 @@ void XdsServerConfigFetcher::StartWatch(
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> watcher) { std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> watcher) {
grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get(); grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get();
auto listener_watcher = MakeRefCounted<ListenerWatcher>( auto listener_watcher = MakeRefCounted<ListenerWatcher>(
xds_client_, std::move(watcher), serving_status_notifier_, xds_client_->Ref(DEBUG_LOCATION, "ListenerWatcher"), std::move(watcher),
listening_address); serving_status_notifier_, listening_address);
auto* listener_watcher_ptr = listener_watcher.get(); auto* listener_watcher_ptr = listener_watcher.get();
XdsListenerResourceType::StartWatch( XdsListenerResourceType::StartWatch(
xds_client_.get(), xds_client_.get(),
@ -515,7 +531,7 @@ void XdsServerConfigFetcher::CancelWatch(
// //
XdsServerConfigFetcher::ListenerWatcher::ListenerWatcher( XdsServerConfigFetcher::ListenerWatcher::ListenerWatcher(
RefCountedPtr<XdsClient> xds_client, RefCountedPtr<GrpcXdsClient> xds_client,
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
server_config_watcher, server_config_watcher,
grpc_server_xds_status_notifier serving_status_notifier, grpc_server_xds_status_notifier serving_status_notifier,
@ -539,7 +555,8 @@ void XdsServerConfigFetcher::ListenerWatcher::OnResourceChanged(
return; return;
} }
auto new_filter_chain_match_manager = MakeRefCounted<FilterChainMatchManager>( auto new_filter_chain_match_manager = MakeRefCounted<FilterChainMatchManager>(
xds_client_, std::move(listener.filter_chain_map), xds_client_->Ref(DEBUG_LOCATION, "FilterChainMatchManager"),
std::move(listener.filter_chain_map),
std::move(listener.default_filter_chain)); std::move(listener.default_filter_chain));
MutexLock lock(&mu_); MutexLock lock(&mu_);
if (filter_chain_match_manager_ == nullptr || if (filter_chain_match_manager_ == nullptr ||
@ -642,7 +659,7 @@ void XdsServerConfigFetcher::ListenerWatcher::
XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
FilterChainMatchManager( FilterChainMatchManager(
RefCountedPtr<XdsClient> xds_client, RefCountedPtr<GrpcXdsClient> xds_client,
XdsListenerResource::FilterChainMap filter_chain_map, XdsListenerResource::FilterChainMap filter_chain_map,
absl::optional<XdsListenerResource::FilterChainData> absl::optional<XdsListenerResource::FilterChainData>
default_filter_chain) default_filter_chain)
@ -1083,7 +1100,8 @@ absl::StatusOr<ChannelArgs> XdsServerConfigFetcher::ListenerWatcher::
} }
server_config_selector_provider = server_config_selector_provider =
MakeRefCounted<DynamicXdsServerConfigSelectorProvider>( MakeRefCounted<DynamicXdsServerConfigSelectorProvider>(
xds_client_, xds_client_->Ref(DEBUG_LOCATION,
"DynamicXdsServerConfigSelectorProvider"),
filter_chain->http_connection_manager.route_config_name, filter_chain->http_connection_manager.route_config_name,
std::move(initial_resource), std::move(initial_resource),
filter_chain->http_connection_manager.http_filters); filter_chain->http_connection_manager.http_filters);
@ -1226,7 +1244,7 @@ ServerConfigSelector::CallConfig XdsServerConfigFetcher::ListenerWatcher::
XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager:: XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
DynamicXdsServerConfigSelectorProvider:: DynamicXdsServerConfigSelectorProvider::
DynamicXdsServerConfigSelectorProvider( DynamicXdsServerConfigSelectorProvider(
RefCountedPtr<XdsClient> xds_client, std::string resource_name, RefCountedPtr<GrpcXdsClient> xds_client, std::string resource_name,
absl::StatusOr<XdsRouteConfigResource> initial_resource, absl::StatusOr<XdsRouteConfigResource> initial_resource,
std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter> std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
http_filters) http_filters)
@ -1330,16 +1348,15 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
"grpc_server_config_fetcher_xds_create(notifier={on_serving_status_" "grpc_server_config_fetcher_xds_create(notifier={on_serving_status_"
"update=%p, user_data=%p}, args=%p)", "update=%p, user_data=%p}, args=%p)",
3, (notifier.on_serving_status_update, notifier.user_data, args)); 3, (notifier.on_serving_status_update, notifier.user_data, args));
grpc_error_handle error = GRPC_ERROR_NONE; auto xds_client = grpc_core::GrpcXdsClient::GetOrCreate(
grpc_core::RefCountedPtr<grpc_core::XdsClient> xds_client = channel_args, "XdsServerConfigFetcher");
grpc_core::GrpcXdsClient::GetOrCreate(channel_args, &error); if (!xds_client.ok()) {
if (!GRPC_ERROR_IS_NONE(error)) {
gpr_log(GPR_ERROR, "Failed to create xds client: %s", gpr_log(GPR_ERROR, "Failed to create xds client: %s",
grpc_error_std_string(error).c_str()); xds_client.status().ToString().c_str());
GRPC_ERROR_UNREF(error);
return nullptr; return nullptr;
} }
if (xds_client->bootstrap() if ((*xds_client)
->bootstrap()
.server_listener_resource_name_template() .server_listener_resource_name_template()
.empty()) { .empty()) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
@ -1347,5 +1364,6 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
"file."); "file.");
return nullptr; return nullptr;
} }
return new grpc_core::XdsServerConfigFetcher(std::move(xds_client), notifier); return new grpc_core::XdsServerConfigFetcher(std::move(*xds_client),
notifier);
} }

@ -39,6 +39,7 @@
#include "src/core/lib/avl/avl.h" #include "src/core/lib/avl/avl.h"
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -86,11 +87,16 @@ struct ChannelArgTypeTraits<
static const grpc_arg_pointer_vtable tbl = { static const grpc_arg_pointer_vtable tbl = {
// copy // copy
[](void* p) -> void* { [](void* p) -> void* {
return p == nullptr ? nullptr : static_cast<T*>(p)->Ref().release(); return p == nullptr ? nullptr
: static_cast<T*>(p)
->Ref(DEBUG_LOCATION, "ChannelArgs copy")
.release();
}, },
// destroy // destroy
[](void* p) { [](void* p) {
if (p != nullptr) static_cast<T*>(p)->Unref(); if (p != nullptr) {
static_cast<T*>(p)->Unref(DEBUG_LOCATION, "ChannelArgs destroy");
}
}, },
// compare // compare
[](void* p1, void* p2) { [](void* p1, void* p2) {
@ -257,7 +263,14 @@ class ChannelArgs {
RefCountedPtr<T> GetObjectRef() const { RefCountedPtr<T> GetObjectRef() const {
auto* p = GetObject<T>(); auto* p = GetObject<T>();
if (p == nullptr) return nullptr; if (p == nullptr) return nullptr;
return p->Ref(); return p->Ref(DEBUG_LOCATION, "ChannelArgs GetObjectRef()");
}
template <typename T>
RefCountedPtr<T> GetObjectRef(const DebugLocation& location,
const char* reason) const {
auto* p = GetObject<T>();
if (p == nullptr) return nullptr;
return p->Ref(location, reason);
} }
bool operator!=(const ChannelArgs& other) const; bool operator!=(const ChannelArgs& other) const;

Loading…
Cancel
Save