implement LrsClient

pull/37605/head
Mark D. Roth 3 months ago
parent d3f22d31b4
commit 1beea74ebb
  1. 2
      BUILD
  2. 30
      src/core/xds/grpc/xds_client_grpc.cc
  3. 6
      src/core/xds/grpc/xds_client_grpc.h
  4. 1206
      src/core/xds/xds_client/lrs_client.cc
  5. 360
      src/core/xds/xds_client/lrs_client.h
  6. 42
      src/core/xds/xds_client/xds_api.cc
  7. 5
      src/core/xds/xds_client/xds_api.h
  8. 2
      src/core/xds/xds_client/xds_client.cc
  9. 4
      src/core/xds/xds_client/xds_client.h

@ -4394,12 +4394,14 @@ grpc_cc_library(
grpc_cc_library(
name = "xds_client",
srcs = [
"//src/core:xds/xds_client/lrs_client.cc",
"//src/core:xds/xds_client/xds_api.cc",
"//src/core:xds/xds_client/xds_bootstrap.cc",
"//src/core:xds/xds_client/xds_client.cc",
"//src/core:xds/xds_client/xds_client_stats.cc",
],
hdrs = [
"//src/core:xds/xds_client/lrs_client.h",
"//src/core:xds/xds_client/xds_api.h",
"//src/core:xds/xds_client/xds_bootstrap.h",
"//src/core:xds/xds_client/xds_channel_args.h",

@ -286,21 +286,28 @@ GlobalStatsPluginRegistry::StatsPluginGroup GetStatsPluginGroupForKey(
return GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
}
std::string UserAgentName() {
return absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING,
GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING);
}
std::string UserAgentVersion() {
return absl::StrCat("C-core ", grpc_version_string(),
GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING,
GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING);
}
} // namespace
GrpcXdsClient::GrpcXdsClient(
absl::string_view key, std::unique_ptr<GrpcXdsBootstrap> bootstrap,
absl::string_view key, std::shared_ptr<GrpcXdsBootstrap> bootstrap,
const ChannelArgs& args,
RefCountedPtr<XdsTransportFactory> transport_factory)
: XdsClient(
std::move(bootstrap), std::move(transport_factory),
bootstrap, transport_factory,
grpc_event_engine::experimental::GetDefaultEventEngine(),
std::make_unique<MetricsReporter>(*this),
absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING,
GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING),
absl::StrCat("C-core ", grpc_version_string(),
GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING,
GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING),
std::make_unique<MetricsReporter>(*this), UserAgentName(),
UserAgentVersion(),
std::max(Duration::Zero(),
args.GetDurationFromIntMillis(
GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS)
@ -314,11 +321,16 @@ GrpcXdsClient::GrpcXdsClient(
[this](CallbackMetricReporter& reporter) {
ReportCallbackMetrics(reporter);
},
Duration::Seconds(5), kMetricConnected, kMetricResources)) {}
Duration::Seconds(5), kMetricConnected, kMetricResources)),
lrs_client_(MakeRefCounted<LrsClient>(
std::move(bootstrap), UserAgentName(), UserAgentVersion(),
std::move(transport_factory),
grpc_event_engine::experimental::GetDefaultEventEngine())) {}
void GrpcXdsClient::Orphaned() {
registered_metric_callback_.reset();
XdsClient::Orphaned();
lrs_client_.reset();
MutexLock lock(g_mu);
auto it = g_xds_client_map->find(key_);
if (it != g_xds_client_map->end() && it->second == this) {

@ -34,6 +34,7 @@
#include "src/core/util/useful.h"
#include "src/core/xds/grpc/certificate_provider_store.h"
#include "src/core/xds/grpc/xds_bootstrap_grpc.h"
#include "src/core/xds/xds_client/lrs_client.h"
#include "src/core/xds/xds_client/xds_client.h"
#include "src/core/xds/xds_client/xds_transport.h"
@ -61,7 +62,7 @@ class GrpcXdsClient final : public XdsClient {
// that also use certificate_provider_store(), but we should consider
// alternatives for that case as well.
GrpcXdsClient(absl::string_view key,
std::unique_ptr<GrpcXdsBootstrap> bootstrap,
std::shared_ptr<GrpcXdsBootstrap> bootstrap,
const ChannelArgs& args,
RefCountedPtr<XdsTransportFactory> transport_factory);
@ -81,6 +82,8 @@ class GrpcXdsClient final : public XdsClient {
absl::string_view key() const { return key_; }
RefCountedPtr<LrsClient> lrs_client() const { return lrs_client_; }
// Builds ClientStatusResponse containing all resources from all XdsClients
static grpc_slice DumpAllClientConfigs();
@ -94,6 +97,7 @@ class GrpcXdsClient final : public XdsClient {
OrphanablePtr<CertificateProviderStore> certificate_provider_store_;
GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_;
std::unique_ptr<RegisteredMetricCallback> registered_metric_callback_;
RefCountedPtr<LrsClient> lrs_client_;
};
namespace internal {

File diff suppressed because it is too large Load Diff

@ -0,0 +1,360 @@
//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H
#include <atomic>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "upb/reflection/def.hpp"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/uri/uri_parser.h"
#include "src/core/xds/xds_client/xds_api.h"
#include "src/core/xds/xds_client/xds_bootstrap.h"
#include "src/core/xds/xds_client/xds_client_stats.h"
#include "src/core/xds/xds_client/xds_metrics.h"
#include "src/core/xds/xds_client/xds_resource_type.h"
#include "src/core/xds/xds_client/xds_transport.h"
#include "src/core/lib/gprpp/per_cpu.h"
namespace grpc_core {
class LrsClient : public DualRefCounted<LrsClient> {
public:
// Drop stats for an xds cluster.
class ClusterDropStats final : public RefCounted<ClusterDropStats> {
public:
// The total number of requests dropped for any reason is the sum of
// uncategorized_drops, and dropped_requests map.
using CategorizedDropsMap = std::map<std::string /* category */, uint64_t>;
struct Snapshot {
uint64_t uncategorized_drops = 0;
// The number of requests dropped for the specific drop categories
// outlined in the drop_overloads field in the EDS response.
CategorizedDropsMap categorized_drops;
Snapshot& operator+=(const Snapshot& other) {
uncategorized_drops += other.uncategorized_drops;
for (const auto& p : other.categorized_drops) {
categorized_drops[p.first] += p.second;
}
return *this;
}
bool IsZero() const {
if (uncategorized_drops != 0) return false;
for (const auto& p : categorized_drops) {
if (p.second != 0) return false;
}
return true;
}
};
ClusterDropStats(RefCountedPtr<LrsClient> lrs_client,
absl::string_view lrs_server,
absl::string_view cluster_name,
absl::string_view eds_service_name);
~ClusterDropStats() override;
// Returns a snapshot of this instance and resets all the counters.
Snapshot GetSnapshotAndReset();
void AddUncategorizedDrops();
void AddCallDropped(const std::string& category);
private:
RefCountedPtr<LrsClient> lrs_client_;
absl::string_view lrs_server_;
absl::string_view cluster_name_;
absl::string_view eds_service_name_;
std::atomic<uint64_t> uncategorized_drops_{0};
// Protects categorized_drops_. A mutex is necessary because the length of
// dropped_requests can be accessed by both the picker (from data plane
// mutex) and the load reporting thread (from the control plane combiner).
Mutex mu_;
CategorizedDropsMap categorized_drops_ ABSL_GUARDED_BY(mu_);
};
// Locality stats for an xds cluster.
class ClusterLocalityStats final
: public RefCounted<ClusterLocalityStats> {
public:
struct BackendMetric {
uint64_t num_requests_finished_with_metric = 0;
double total_metric_value = 0;
BackendMetric& operator+=(const BackendMetric& other) {
num_requests_finished_with_metric +=
other.num_requests_finished_with_metric;
total_metric_value += other.total_metric_value;
return *this;
}
bool IsZero() const {
return num_requests_finished_with_metric == 0 && total_metric_value == 0;
}
};
struct Snapshot {
uint64_t total_successful_requests = 0;
uint64_t total_requests_in_progress = 0;
uint64_t total_error_requests = 0;
uint64_t total_issued_requests = 0;
std::map<std::string, BackendMetric> backend_metrics;
Snapshot& operator+=(const Snapshot& other) {
total_successful_requests += other.total_successful_requests;
total_requests_in_progress += other.total_requests_in_progress;
total_error_requests += other.total_error_requests;
total_issued_requests += other.total_issued_requests;
for (const auto& p : other.backend_metrics) {
backend_metrics[p.first] += p.second;
}
return *this;
}
bool IsZero() const {
if (total_successful_requests != 0 || total_requests_in_progress != 0 ||
total_error_requests != 0 || total_issued_requests != 0) {
return false;
}
for (const auto& p : backend_metrics) {
if (!p.second.IsZero()) return false;
}
return true;
}
};
ClusterLocalityStats(RefCountedPtr<LrsClient> lrs_client,
absl::string_view lrs_server,
absl::string_view cluster_name,
absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> name);
~ClusterLocalityStats() override;
// Returns a snapshot of this instance and resets all the counters.
Snapshot GetSnapshotAndReset();
void AddCallStarted();
void AddCallFinished(
const std::map<absl::string_view, double>* named_metrics,
bool fail = false);
XdsLocalityName* locality_name() const { return name_.get(); }
private:
struct Stats {
std::atomic<uint64_t> total_successful_requests{0};
std::atomic<uint64_t> total_requests_in_progress{0};
std::atomic<uint64_t> total_error_requests{0};
std::atomic<uint64_t> total_issued_requests{0};
// Protects backend_metrics. A mutex is necessary because the length of
// backend_metrics_ can be accessed by both the callback intercepting the
// call's recv_trailing_metadata and the load reporting thread.
Mutex backend_metrics_mu;
std::map<std::string, BackendMetric> backend_metrics
ABSL_GUARDED_BY(backend_metrics_mu);
};
RefCountedPtr<LrsClient> lrs_client_;
absl::string_view lrs_server_;
absl::string_view cluster_name_;
absl::string_view eds_service_name_;
RefCountedPtr<XdsLocalityName> name_;
PerCpu<Stats> stats_{PerCpuOptions().SetMaxShards(32).SetCpusPerShard(4)};
};
LrsClient(
std::shared_ptr<XdsBootstrap> bootstrap,
std::string user_agent_name, std::string user_agent_version,
RefCountedPtr<XdsTransportFactory> transport_factory,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine);
~LrsClient() override;
// Adds and removes drop stats for cluster_name and eds_service_name.
RefCountedPtr<ClusterDropStats> AddClusterDropStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name);
// Adds and removes locality stats for cluster_name and eds_service_name
// for the specified locality.
RefCountedPtr<ClusterLocalityStats> AddClusterLocalityStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality);
// Resets connection backoff state.
void ResetBackoff();
XdsTransportFactory* transport_factory() const {
return transport_factory_.get();
}
grpc_event_engine::experimental::EventEngine* engine() {
return engine_.get();
}
private:
// Contains a channel to the LRS server and all the data related to the
// channel.
class LrsChannel final : public DualRefCounted<LrsChannel> {
public:
template <typename T>
class RetryableCall;
class LrsCall;
LrsChannel(WeakRefCountedPtr<LrsClient> lrs_client,
const XdsBootstrap::XdsServer& server);
~LrsChannel() override;
LrsClient* lrs_client() const { return lrs_client_.get(); }
void ResetBackoff();
// FIXME: maybe call this automatically from ctor?
void MaybeStartLrsCall();
absl::string_view server_uri() const { return server_.server_uri(); }
private:
void Orphaned() override;
void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
// The owning LrsClient.
WeakRefCountedPtr<LrsClient> lrs_client_;
const XdsBootstrap::XdsServer& server_; // Owned by bootstrap.
RefCountedPtr<XdsTransportFactory::XdsTransport> transport_;
// The retryable LRS call.
OrphanablePtr<RetryableCall<LrsCall>> lrs_call_;
};
struct LoadReportState {
struct LocalityState {
ClusterLocalityStats* locality_stats = nullptr;
ClusterLocalityStats::Snapshot deleted_locality_stats;
};
ClusterDropStats* drop_stats = nullptr;
ClusterDropStats::Snapshot deleted_drop_stats;
std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
XdsLocalityName::Less>
locality_stats;
Timestamp last_report_time = Timestamp::Now();
};
// Load report data.
using LoadReportMap = std::map<
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
LoadReportState>;
struct LoadReportServer {
RefCountedPtr<LrsChannel> lrs_channel;
LoadReportMap load_report_map;
};
struct ClusterLoadReport {
ClusterDropStats::Snapshot dropped_requests;
std::map<RefCountedPtr<XdsLocalityName>, ClusterLocalityStats::Snapshot,
XdsLocalityName::Less>
locality_stats;
Duration load_report_interval;
};
using ClusterLoadReportMap = std::map<
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
ClusterLoadReport>;
void Orphaned() override;
ClusterLoadReportMap BuildLoadReportSnapshotLocked(
const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
RefCountedPtr<LrsChannel> GetOrCreateLrsChannelLocked(
const XdsBootstrap::XdsServer& server, const char* reason)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
static bool LoadReportCountersAreZero(const ClusterLoadReportMap& snapshot);
void RemoveClusterDropStats(absl::string_view xds_server,
absl::string_view cluster_name,
absl::string_view eds_service_name,
ClusterDropStats* cluster_drop_stats);
void RemoveClusterLocalityStats(
absl::string_view xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality,
ClusterLocalityStats* cluster_locality_stats);
// Creates an initial LRS request.
std::string CreateLrsInitialRequest()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
// Creates an LRS request sending a client-side load report.
std::string CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
// Parses the LRS response and populates send_all_clusters,
// cluster_names, and load_reporting_interval.
absl::Status ParseLrsResponse(absl::string_view encoded_response,
bool* send_all_clusters,
std::set<std::string>* cluster_names,
Duration* load_reporting_interval)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
std::shared_ptr<XdsBootstrap> bootstrap_;
const std::string user_agent_name_;
const std::string user_agent_version_;
RefCountedPtr<XdsTransportFactory> transport_factory_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
Mutex mu_;
upb::DefPool def_pool_ ABSL_GUARDED_BY(mu_);
// Map of existing LRS channels.
std::map<std::string /*XdsServer key*/, LrsChannel*> lrs_channel_map_
ABSL_GUARDED_BY(mu_);
std::map<std::string /*XdsServer key*/, LoadReportServer, std::less<>>
load_report_map_ ABSL_GUARDED_BY(mu_);
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H

@ -154,42 +154,50 @@ std::string SerializeDiscoveryRequest(
void XdsApi::PopulateNode(envoy_config_core_v3_Node* node_msg,
upb_Arena* arena) {
if (node_ != nullptr) {
if (!node_->id().empty()) {
PopulateXdsNode(node_, user_agent_name_, user_agent_version_, node_msg,
arena);
}
void PopulateXdsNode(const XdsBootstrap::Node* node,
absl::string_view user_agent_name,
absl::string_view user_agent_version,
envoy_config_core_v3_Node* node_msg, upb_Arena* arena) {
if (node != nullptr) {
if (!node->id().empty()) {
envoy_config_core_v3_Node_set_id(node_msg,
StdStringToUpbString(node_->id()));
StdStringToUpbString(node->id()));
}
if (!node_->cluster().empty()) {
if (!node->cluster().empty()) {
envoy_config_core_v3_Node_set_cluster(
node_msg, StdStringToUpbString(node_->cluster()));
node_msg, StdStringToUpbString(node->cluster()));
}
if (!node_->metadata().empty()) {
if (!node->metadata().empty()) {
google_protobuf_Struct* metadata =
envoy_config_core_v3_Node_mutable_metadata(node_msg, arena);
PopulateMetadata(metadata, node_->metadata(), arena);
PopulateMetadata(metadata, node->metadata(), arena);
}
if (!node_->locality_region().empty() || !node_->locality_zone().empty() ||
!node_->locality_sub_zone().empty()) {
if (!node->locality_region().empty() || !node->locality_zone().empty() ||
!node->locality_sub_zone().empty()) {
envoy_config_core_v3_Locality* locality =
envoy_config_core_v3_Node_mutable_locality(node_msg, arena);
if (!node_->locality_region().empty()) {
if (!node->locality_region().empty()) {
envoy_config_core_v3_Locality_set_region(
locality, StdStringToUpbString(node_->locality_region()));
locality, StdStringToUpbString(node->locality_region()));
}
if (!node_->locality_zone().empty()) {
if (!node->locality_zone().empty()) {
envoy_config_core_v3_Locality_set_zone(
locality, StdStringToUpbString(node_->locality_zone()));
locality, StdStringToUpbString(node->locality_zone()));
}
if (!node_->locality_sub_zone().empty()) {
if (!node->locality_sub_zone().empty()) {
envoy_config_core_v3_Locality_set_sub_zone(
locality, StdStringToUpbString(node_->locality_sub_zone()));
locality, StdStringToUpbString(node->locality_sub_zone()));
}
}
}
envoy_config_core_v3_Node_set_user_agent_name(
node_msg, StdStringToUpbString(user_agent_name_));
node_msg, StdStringToUpbString(user_agent_name));
envoy_config_core_v3_Node_set_user_agent_version(
node_msg, StdStringToUpbString(user_agent_version_));
node_msg, StdStringToUpbString(user_agent_version));
envoy_config_core_v3_Node_add_client_features(
node_msg,
upb_StringView_FromString("envoy.lb.does_not_support_overprovisioning"),

@ -184,6 +184,11 @@ class XdsApi final {
const std::string user_agent_version_;
};
void PopulateXdsNode(const XdsBootstrap::Node* node,
absl::string_view user_agent_name,
absl::string_view user_agent_version,
envoy_config_core_v3_Node* node_msg, upb_Arena* arena);
} // namespace grpc_core
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_API_H

@ -1540,7 +1540,7 @@ bool XdsClient::XdsChannel::LrsCall::IsCurrentCallOnChannel() const {
constexpr absl::string_view XdsClient::kOldStyleAuthority;
XdsClient::XdsClient(
std::unique_ptr<XdsBootstrap> bootstrap,
std::shared_ptr<XdsBootstrap> bootstrap,
RefCountedPtr<XdsTransportFactory> transport_factory,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
std::unique_ptr<XdsMetricsReporter> metrics_reporter,

@ -84,7 +84,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
};
XdsClient(
std::unique_ptr<XdsBootstrap> bootstrap,
std::shared_ptr<XdsBootstrap> bootstrap,
RefCountedPtr<XdsTransportFactory> transport_factory,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
std::unique_ptr<XdsMetricsReporter> metrics_reporter,
@ -352,7 +352,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
bool HasUncachedResources(const AuthorityState& authority_state);
std::unique_ptr<XdsBootstrap> bootstrap_;
std::shared_ptr<XdsBootstrap> bootstrap_;
RefCountedPtr<XdsTransportFactory> transport_factory_;
const Duration request_timeout_;
const bool xds_federation_enabled_;

Loading…
Cancel
Save