@ -49,6 +49,7 @@
# include "src/core/ext/xds/xds_transport.h"
# include "src/core/ext/xds/xds_transport_grpc.h"
# include "src/core/lib/channel/channel_args.h"
# include "src/core/lib/channel/metrics.h"
# include "src/core/lib/debug/trace.h"
# include "src/core/lib/event_engine/default_event_engine.h"
# include "src/core/lib/gprpp/debug_location.h"
@ -64,8 +65,6 @@
# include "src/core/lib/slice/slice_internal.h"
# include "src/core/lib/transport/error_utils.h"
namespace grpc_core {
// If gRPC is built with -DGRPC_XDS_USER_AGENT_NAME_SUFFIX="...", that string
// will be appended to the user agent name reported to the xDS server.
# ifdef GRPC_XDS_USER_AGENT_NAME_SUFFIX
@ -84,10 +83,90 @@ namespace grpc_core {
# define GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING ""
# endif
namespace grpc_core {
namespace {
// Metric labels.
constexpr absl : : string_view kMetricLabelXdsServer = " grpc.xds.server " ;
constexpr absl : : string_view kMetricLabelXdsAuthority = " grpc.xds.authority " ;
constexpr absl : : string_view kMetricLabelXdsResourceType =
" grpc.xds.resource_type " ;
constexpr absl : : string_view kMetricLabelXdsCacheState = " grpc.xds.cache_state " ;
const auto kMetricResourceUpdatesValid =
GlobalInstrumentsRegistry : : RegisterUInt64Counter (
" grpc.xds_client.resource_updates_valid " ,
" EXPERIMENTAL. A counter of resources received that were considered "
" valid. The counter will be incremented even for resources that "
" have not changed. " ,
" {resource} " ,
{ kMetricLabelTarget , kMetricLabelXdsServer ,
kMetricLabelXdsResourceType } ,
{ } , false ) ;
const auto kMetricResourceUpdatesInvalid =
GlobalInstrumentsRegistry : : RegisterUInt64Counter (
" grpc.xds_client.resource_updates_invalid " ,
" EXPERIMENTAL. A counter of resources received that were considered "
" invalid. " ,
" {resource} " ,
{ kMetricLabelTarget , kMetricLabelXdsServer ,
kMetricLabelXdsResourceType } ,
{ } , false ) ;
const auto kMetricConnected =
GlobalInstrumentsRegistry : : RegisterCallbackInt64Gauge (
" grpc.xds_client.connected " ,
" EXPERIMENTAL. Whether or not the xDS client currently has a "
" working ADS stream to the xDS server. For a given server, this "
" will be set to 0 when we have a connectivity failure or when the "
" ADS stream fails without seeing a response message, as per gRFC "
" A57. It will be set to 1 when we receive the first response on "
" an ADS stream. " ,
" {bool} " , { kMetricLabelTarget , kMetricLabelXdsServer } , { } , false ) ;
const auto kMetricResources =
GlobalInstrumentsRegistry : : RegisterCallbackInt64Gauge (
" grpc.xds_client.resources " , " EXPERIMENTAL. Number of xDS resources. " ,
" {resource} " ,
{ kMetricLabelTarget , kMetricLabelXdsAuthority ,
kMetricLabelXdsResourceType , kMetricLabelXdsCacheState } ,
{ } , false ) ;
} // namespace
//
// GrpcXdsClient::MetricsReporter
//
class GrpcXdsClient : : MetricsReporter : public XdsMetricsReporter {
public :
explicit MetricsReporter ( GrpcXdsClient & xds_client )
: xds_client_ ( xds_client ) { }
void ReportResourceUpdates ( absl : : string_view xds_server ,
absl : : string_view resource_type ,
uint64_t num_valid_resources ,
uint64_t num_invalid_resources ) override {
xds_client_ . stats_plugin_group_ . AddCounter (
kMetricResourceUpdatesValid , num_valid_resources ,
{ xds_client_ . key_ , xds_server , resource_type } , { } ) ;
xds_client_ . stats_plugin_group_ . AddCounter (
kMetricResourceUpdatesInvalid , num_invalid_resources ,
{ xds_client_ . key_ , xds_server , resource_type } , { } ) ;
}
private :
GrpcXdsClient & xds_client_ ;
} ;
//
// GrpcXdsClient
//
constexpr absl : : string_view GrpcXdsClient : : kServerKey ;
namespace {
Mutex * g_mu = new Mutex ;
@ -97,10 +176,6 @@ NoDestruct<std::map<absl::string_view, GrpcXdsClient*>> g_xds_client_map
ABSL_GUARDED_BY ( * g_mu ) ;
char * g_fallback_bootstrap_config ABSL_GUARDED_BY ( * g_mu ) = nullptr ;
} // namespace
namespace {
absl : : StatusOr < std : : string > GetBootstrapContents ( const char * fallback_config ) {
// First, try GRPC_XDS_BOOTSTRAP env var.
auto path = GetEnv ( " GRPC_XDS_BOOTSTRAP " ) ;
@ -138,19 +213,6 @@ absl::StatusOr<std::string> GetBootstrapContents(const char* fallback_config) {
" not defined " ) ;
}
std : : vector < RefCountedPtr < GrpcXdsClient > > GetAllXdsClients ( ) {
MutexLock lock ( g_mu ) ;
std : : vector < RefCountedPtr < GrpcXdsClient > > xds_clients ;
for ( const auto & key_client : * g_xds_client_map ) {
auto xds_client =
key_client . second - > RefIfNonZero ( DEBUG_LOCATION , " DumpAllClientConfigs " ) ;
if ( xds_client ! = nullptr ) {
xds_clients . emplace_back ( xds_client . TakeAsSubclass < GrpcXdsClient > ( ) ) ;
}
}
return xds_clients ;
}
} // namespace
absl : : StatusOr < RefCountedPtr < GrpcXdsClient > > GrpcXdsClient : : GetOrCreate (
@ -201,36 +263,20 @@ absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
return xds_client ;
}
// ABSL_NO_THREAD_SAFETY_ANALYSIS because we have to manually manage locks for
// individual XdsClients and compiler struggles with checking the validity
grpc_slice GrpcXdsClient : : DumpAllClientConfigs ( )
ABSL_NO_THREAD_SAFETY_ANALYSIS {
auto xds_clients = GetAllXdsClients ( ) ;
upb : : Arena arena ;
// Contains strings that should survive till serialization
std : : set < std : : string > string_pool ;
auto response = envoy_service_status_v3_ClientStatusResponse_new ( arena . ptr ( ) ) ;
// We lock each XdsClient mutex till we are done with the serialization to
// ensure that all data referenced from the UPB proto message stays alive.
for ( const auto & xds_client : xds_clients ) {
auto client_config =
envoy_service_status_v3_ClientStatusResponse_add_config ( response ,
arena . ptr ( ) ) ;
xds_client - > mu ( ) - > Lock ( ) ;
xds_client - > DumpClientConfig ( & string_pool , arena . ptr ( ) , client_config ) ;
envoy_service_status_v3_ClientConfig_set_client_scope (
client_config , StdStringToUpbString ( xds_client - > key ( ) ) ) ;
}
// Serialize the upb message to bytes
size_t output_length ;
char * output = envoy_service_status_v3_ClientStatusResponse_serialize (
response , arena . ptr ( ) , & output_length ) ;
for ( const auto & xds_client : xds_clients ) {
xds_client - > mu ( ) - > Unlock ( ) ;
namespace {
GlobalStatsPluginRegistry : : StatsPluginGroup GetStatsPluginGroupForKey (
absl : : string_view key ) {
if ( key = = GrpcXdsClient : : kServerKey ) {
return GlobalStatsPluginRegistry : : GetAllStatsPlugins ( ) ;
}
return grpc_slice_from_cpp_string ( std : : string ( output , output_length ) ) ;
// TODO(roth): How do we set the authority here?
StatsPlugin : : ChannelScope scope ( key , " " ) ;
return GlobalStatsPluginRegistry : : GetStatsPluginsForChannel ( scope ) ;
}
} // namespace
GrpcXdsClient : : GrpcXdsClient (
absl : : string_view key , std : : unique_ptr < GrpcXdsBootstrap > bootstrap ,
const ChannelArgs & args ,
@ -238,6 +284,7 @@ GrpcXdsClient::GrpcXdsClient(
: XdsClient (
std : : move ( bootstrap ) , std : : move ( transport_factory ) ,
grpc_event_engine : : experimental : : GetDefaultEventEngine ( ) ,
std : : make_unique < MetricsReporter > ( * this ) ,
absl : : StrCat ( " gRPC C-core " , GPR_PLATFORM_STRING ,
GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING ) ,
absl : : StrCat ( " C-core " , grpc_version_string ( ) ,
@ -250,15 +297,22 @@ GrpcXdsClient::GrpcXdsClient(
key_ ( key ) ,
certificate_provider_store_ ( MakeOrphanable < CertificateProviderStore > (
static_cast < const GrpcXdsBootstrap & > ( this - > bootstrap ( ) )
. certificate_providers ( ) ) ) { }
. certificate_providers ( ) ) ) ,
stats_plugin_group_ ( GetStatsPluginGroupForKey ( key_ ) ) ,
registered_metric_callback_ ( stats_plugin_group_ . RegisterCallback (
[ this ] ( CallbackMetricReporter & reporter ) {
ReportCallbackMetrics ( reporter ) ;
} ,
{ kMetricConnected , kMetricResources } ) ) { }
void GrpcXdsClient : : Orphan ( ) {
registered_metric_callback_ . reset ( ) ;
XdsClient : : Orphan ( ) ;
MutexLock lock ( g_mu ) ;
auto it = g_xds_client_map - > find ( key_ ) ;
if ( it ! = g_xds_client_map - > end ( ) & & it - > second = = this ) {
g_xds_client_map - > erase ( it ) ;
}
XdsClient : : Orphan ( ) ;
}
grpc_pollset_set * GrpcXdsClient : : interested_parties ( ) const {
@ -266,6 +320,66 @@ grpc_pollset_set* GrpcXdsClient::interested_parties() const {
- > interested_parties ( ) ;
}
namespace {
std : : vector < RefCountedPtr < GrpcXdsClient > > GetAllXdsClients ( ) {
MutexLock lock ( g_mu ) ;
std : : vector < RefCountedPtr < GrpcXdsClient > > xds_clients ;
for ( const auto & key_client : * g_xds_client_map ) {
auto xds_client =
key_client . second - > RefIfNonZero ( DEBUG_LOCATION , " DumpAllClientConfigs " ) ;
if ( xds_client ! = nullptr ) {
xds_clients . emplace_back ( xds_client . TakeAsSubclass < GrpcXdsClient > ( ) ) ;
}
}
return xds_clients ;
}
} // namespace
// ABSL_NO_THREAD_SAFETY_ANALYSIS because we have to manually manage locks for
// individual XdsClients and compiler struggles with checking the validity
grpc_slice GrpcXdsClient : : DumpAllClientConfigs ( )
ABSL_NO_THREAD_SAFETY_ANALYSIS {
auto xds_clients = GetAllXdsClients ( ) ;
upb : : Arena arena ;
// Contains strings that should survive till serialization
std : : set < std : : string > string_pool ;
auto response = envoy_service_status_v3_ClientStatusResponse_new ( arena . ptr ( ) ) ;
// We lock each XdsClient mutex till we are done with the serialization to
// ensure that all data referenced from the UPB proto message stays alive.
for ( const auto & xds_client : xds_clients ) {
auto client_config =
envoy_service_status_v3_ClientStatusResponse_add_config ( response ,
arena . ptr ( ) ) ;
xds_client - > mu ( ) - > Lock ( ) ;
xds_client - > DumpClientConfig ( & string_pool , arena . ptr ( ) , client_config ) ;
envoy_service_status_v3_ClientConfig_set_client_scope (
client_config , StdStringToUpbString ( xds_client - > key ( ) ) ) ;
}
// Serialize the upb message to bytes
size_t output_length ;
char * output = envoy_service_status_v3_ClientStatusResponse_serialize (
response , arena . ptr ( ) , & output_length ) ;
for ( const auto & xds_client : xds_clients ) {
xds_client - > mu ( ) - > Unlock ( ) ;
}
return grpc_slice_from_cpp_string ( std : : string ( output , output_length ) ) ;
}
void GrpcXdsClient : : ReportCallbackMetrics ( CallbackMetricReporter & reporter ) {
MutexLock lock ( mu ( ) ) ;
ReportResourceCounts ( [ & ] ( const ResourceCountLabels & labels , uint64_t count ) {
reporter . Report (
kMetricResources , count ,
{ key_ , labels . xds_authority , labels . resource_type , labels . cache_state } ,
{ } ) ;
} ) ;
ReportServerConnections ( [ & ] ( absl : : string_view xds_server , bool connected ) {
reporter . Report ( kMetricConnected , connected , { key_ , xds_server } , { } ) ;
} ) ;
}
namespace internal {
void SetXdsChannelArgsForTest ( grpc_channel_args * args ) {