remove old LRS code

pull/37605/head
Mark D. Roth 3 months ago
parent e09ce612cd
commit 4c9850d530
  1. 2
      BUILD
  2. 220
      src/core/xds/xds_client/xds_api.cc
  3. 24
      src/core/xds/xds_client/xds_api.h
  4. 566
      src/core/xds/xds_client/xds_client.cc
  5. 76
      src/core/xds/xds_client/xds_client.h
  6. 164
      src/core/xds/xds_client/xds_client_stats.cc
  7. 162
      src/core/xds/xds_client/xds_client_stats.h

@ -4398,7 +4398,6 @@ grpc_cc_library(
"//src/core:xds/xds_client/xds_api.cc",
"//src/core:xds/xds_client/xds_bootstrap.cc",
"//src/core:xds/xds_client/xds_client.cc",
"//src/core:xds/xds_client/xds_client_stats.cc",
],
hdrs = [
"//src/core:xds/xds_client/lrs_client.h",
@ -4465,6 +4464,7 @@ grpc_cc_library(
"//src/core:json",
"//src/core:per_cpu",
"//src/core:ref_counted",
"//src/core:ref_counted_string",
"//src/core:time",
"//src/core:upb_utils",
"//src/core:useful",

@ -352,224 +352,4 @@ absl::Status XdsApi::ParseAdsResponse(absl::string_view encoded_response,
return absl::OkStatus();
}
namespace {
void MaybeLogLrsRequest(
const XdsApiContext& context,
const envoy_service_load_stats_v3_LoadStatsRequest* request) {
if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer) && ABSL_VLOG_IS_ON(2)) {
const upb_MessageDef* msg_type =
envoy_service_load_stats_v3_LoadStatsRequest_getmsgdef(
context.def_pool);
char buf[10240];
upb_TextEncode(reinterpret_cast<const upb_Message*>(request), msg_type,
nullptr, 0, buf, sizeof(buf));
VLOG(2) << "[xds_client " << context.client
<< "] constructed LRS request: " << buf;
}
}
std::string SerializeLrsRequest(
const XdsApiContext& context,
const envoy_service_load_stats_v3_LoadStatsRequest* request) {
size_t output_length;
char* output = envoy_service_load_stats_v3_LoadStatsRequest_serialize(
request, context.arena, &output_length);
return std::string(output, output_length);
}
} // namespace
std::string XdsApi::CreateLrsInitialRequest() {
upb::Arena arena;
const XdsApiContext context = {client_, tracer_, def_pool_->ptr(),
arena.ptr()};
// Create a request.
envoy_service_load_stats_v3_LoadStatsRequest* request =
envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr());
// Populate node.
envoy_config_core_v3_Node* node_msg =
envoy_service_load_stats_v3_LoadStatsRequest_mutable_node(request,
arena.ptr());
PopulateNode(node_msg, arena.ptr());
envoy_config_core_v3_Node_add_client_features(
node_msg,
upb_StringView_FromString("envoy.lrs.supports_send_all_clusters"),
arena.ptr());
MaybeLogLrsRequest(context, request);
return SerializeLrsRequest(context, request);
}
namespace {
void LocalityStatsPopulate(
const XdsApiContext& context,
envoy_config_endpoint_v3_UpstreamLocalityStats* output,
const XdsLocalityName& locality_name,
const XdsClusterLocalityStats::Snapshot& snapshot) {
// Set locality.
envoy_config_core_v3_Locality* locality =
envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_locality(
output, context.arena);
if (!locality_name.region().empty()) {
envoy_config_core_v3_Locality_set_region(
locality, StdStringToUpbString(locality_name.region()));
}
if (!locality_name.zone().empty()) {
envoy_config_core_v3_Locality_set_zone(
locality, StdStringToUpbString(locality_name.zone()));
}
if (!locality_name.sub_zone().empty()) {
envoy_config_core_v3_Locality_set_sub_zone(
locality, StdStringToUpbString(locality_name.sub_zone()));
}
// Set total counts.
envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_successful_requests(
output, snapshot.total_successful_requests);
envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_requests_in_progress(
output, snapshot.total_requests_in_progress);
envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_error_requests(
output, snapshot.total_error_requests);
envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_issued_requests(
output, snapshot.total_issued_requests);
// Add backend metrics.
for (const auto& p : snapshot.backend_metrics) {
const std::string& metric_name = p.first;
const XdsClusterLocalityStats::BackendMetric& metric_value = p.second;
envoy_config_endpoint_v3_EndpointLoadMetricStats* load_metric =
envoy_config_endpoint_v3_UpstreamLocalityStats_add_load_metric_stats(
output, context.arena);
envoy_config_endpoint_v3_EndpointLoadMetricStats_set_metric_name(
load_metric, StdStringToUpbString(metric_name));
envoy_config_endpoint_v3_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
load_metric, metric_value.num_requests_finished_with_metric);
envoy_config_endpoint_v3_EndpointLoadMetricStats_set_total_metric_value(
load_metric, metric_value.total_metric_value);
}
}
} // namespace
std::string XdsApi::CreateLrsRequest(
ClusterLoadReportMap cluster_load_report_map) {
upb::Arena arena;
const XdsApiContext context = {client_, tracer_, def_pool_->ptr(),
arena.ptr()};
// Create a request.
envoy_service_load_stats_v3_LoadStatsRequest* request =
envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr());
for (auto& p : cluster_load_report_map) {
const std::string& cluster_name = p.first.first;
const std::string& eds_service_name = p.first.second;
const ClusterLoadReport& load_report = p.second;
// Add cluster stats.
envoy_config_endpoint_v3_ClusterStats* cluster_stats =
envoy_service_load_stats_v3_LoadStatsRequest_add_cluster_stats(
request, arena.ptr());
// Set the cluster name.
envoy_config_endpoint_v3_ClusterStats_set_cluster_name(
cluster_stats, StdStringToUpbString(cluster_name));
// Set EDS service name, if non-empty.
if (!eds_service_name.empty()) {
envoy_config_endpoint_v3_ClusterStats_set_cluster_service_name(
cluster_stats, StdStringToUpbString(eds_service_name));
}
// Add locality stats.
for (const auto& p : load_report.locality_stats) {
const XdsLocalityName& locality_name = *p.first;
const auto& snapshot = p.second;
envoy_config_endpoint_v3_UpstreamLocalityStats* locality_stats =
envoy_config_endpoint_v3_ClusterStats_add_upstream_locality_stats(
cluster_stats, arena.ptr());
LocalityStatsPopulate(context, locality_stats, locality_name, snapshot);
}
// Add dropped requests.
uint64_t total_dropped_requests = 0;
for (const auto& p : load_report.dropped_requests.categorized_drops) {
const std::string& category = p.first;
const uint64_t count = p.second;
envoy_config_endpoint_v3_ClusterStats_DroppedRequests* dropped_requests =
envoy_config_endpoint_v3_ClusterStats_add_dropped_requests(
cluster_stats, arena.ptr());
envoy_config_endpoint_v3_ClusterStats_DroppedRequests_set_category(
dropped_requests, StdStringToUpbString(category));
envoy_config_endpoint_v3_ClusterStats_DroppedRequests_set_dropped_count(
dropped_requests, count);
total_dropped_requests += count;
}
total_dropped_requests += load_report.dropped_requests.uncategorized_drops;
// Set total dropped requests.
envoy_config_endpoint_v3_ClusterStats_set_total_dropped_requests(
cluster_stats, total_dropped_requests);
// Set real load report interval.
gpr_timespec timespec = load_report.load_report_interval.as_timespec();
google_protobuf_Duration* load_report_interval =
envoy_config_endpoint_v3_ClusterStats_mutable_load_report_interval(
cluster_stats, arena.ptr());
google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
}
MaybeLogLrsRequest(context, request);
return SerializeLrsRequest(context, request);
}
namespace {
void MaybeLogLrsResponse(
const XdsApiContext& context,
const envoy_service_load_stats_v3_LoadStatsResponse* response) {
if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer) && ABSL_VLOG_IS_ON(2)) {
const upb_MessageDef* msg_type =
envoy_service_load_stats_v3_LoadStatsResponse_getmsgdef(
context.def_pool);
char buf[10240];
upb_TextEncode(reinterpret_cast<const upb_Message*>(response), msg_type,
nullptr, 0, buf, sizeof(buf));
VLOG(2) << "[xds_client " << context.client
<< "] received LRS response: " << buf;
}
}
} // namespace
absl::Status XdsApi::ParseLrsResponse(absl::string_view encoded_response,
bool* send_all_clusters,
std::set<std::string>* cluster_names,
Duration* load_reporting_interval) {
upb::Arena arena;
// Decode the response.
const envoy_service_load_stats_v3_LoadStatsResponse* decoded_response =
envoy_service_load_stats_v3_LoadStatsResponse_parse(
encoded_response.data(), encoded_response.size(), arena.ptr());
// Parse the response.
if (decoded_response == nullptr) {
return absl::UnavailableError("Can't decode response.");
}
const XdsApiContext context = {client_, tracer_, def_pool_->ptr(),
arena.ptr()};
MaybeLogLrsResponse(context, decoded_response);
// Check send_all_clusters.
if (envoy_service_load_stats_v3_LoadStatsResponse_send_all_clusters(
decoded_response)) {
*send_all_clusters = true;
} else {
// Store the cluster names.
size_t size;
const upb_StringView* clusters =
envoy_service_load_stats_v3_LoadStatsResponse_clusters(decoded_response,
&size);
for (size_t i = 0; i < size; ++i) {
cluster_names->emplace(UpbStringToStdString(clusters[i]));
}
}
// Get the load report interval.
const google_protobuf_Duration* load_reporting_interval_duration =
envoy_service_load_stats_v3_LoadStatsResponse_load_reporting_interval(
decoded_response);
*load_reporting_interval = Duration::FromSecondsAndNanoseconds(
google_protobuf_Duration_seconds(load_reporting_interval_duration),
google_protobuf_Duration_nanos(load_reporting_interval_duration));
return absl::OkStatus();
}
} // namespace grpc_core

@ -81,17 +81,6 @@ class XdsApi final {
absl::string_view message) = 0;
};
struct ClusterLoadReport {
XdsClusterDropStats::Snapshot dropped_requests;
std::map<RefCountedPtr<XdsLocalityName>, XdsClusterLocalityStats::Snapshot,
XdsLocalityName::Less>
locality_stats;
Duration load_report_interval;
};
using ClusterLoadReportMap = std::map<
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
ClusterLoadReport>;
// The metadata of the xDS resource; used by the xDS config dump.
struct ResourceMetadata {
// Resource status from the view of a xDS client, which tells the
@ -160,19 +149,6 @@ class XdsApi final {
absl::Status ParseAdsResponse(absl::string_view encoded_response,
AdsResponseParserInterface* parser);
// Creates an initial LRS request.
std::string CreateLrsInitialRequest();
// Creates an LRS request sending a client-side load report.
std::string CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map);
// Parses the LRS response and populates send_all_clusters,
// cluster_names, and load_reporting_interval.
absl::Status ParseLrsResponse(absl::string_view encoded_response,
bool* send_all_clusters,
std::set<std::string>* cluster_names,
Duration* load_reporting_interval);
void PopulateNode(envoy_config_core_v3_Node* node_msg, upb_Arena* arena);
private:

@ -350,100 +350,6 @@ class XdsClient::XdsChannel::AdsCall final
std::map<const XdsResourceType*, ResourceTypeState> state_map_;
};
// Contains an LRS call to the xds server.
class XdsClient::XdsChannel::LrsCall final
: public InternallyRefCounted<LrsCall> {
public:
// The ctor and dtor should not be used directly.
explicit LrsCall(RefCountedPtr<RetryableCall<LrsCall>> retryable_call);
void Orphan() override;
RetryableCall<LrsCall>* retryable_call() { return retryable_call_.get(); }
XdsChannel* xds_channel() const { return retryable_call_->xds_channel(); }
XdsClient* xds_client() const { return xds_channel()->xds_client(); }
bool seen_response() const { return seen_response_; }
private:
class StreamEventHandler final
: public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
public:
explicit StreamEventHandler(RefCountedPtr<LrsCall> lrs_call)
: lrs_call_(std::move(lrs_call)) {}
void OnRequestSent(bool /*ok*/) override { lrs_call_->OnRequestSent(); }
void OnRecvMessage(absl::string_view payload) override {
lrs_call_->OnRecvMessage(payload);
}
void OnStatusReceived(absl::Status status) override {
lrs_call_->OnStatusReceived(std::move(status));
}
private:
RefCountedPtr<LrsCall> lrs_call_;
};
// A repeating timer for a particular duration.
class Timer final : public InternallyRefCounted<Timer> {
public:
explicit Timer(RefCountedPtr<LrsCall> lrs_call)
: lrs_call_(std::move(lrs_call)) {}
~Timer() override { lrs_call_.reset(DEBUG_LOCATION, "LRS timer"); }
// Disable thread-safety analysis because this method is called via
// OrphanablePtr<>, but there's no way to pass the lock annotation
// through there.
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
void ScheduleNextReportLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
private:
bool IsCurrentTimerOnCall() const {
return this == lrs_call_->timer_.get();
}
XdsClient* xds_client() const { return lrs_call_->xds_client(); }
void OnNextReportTimer();
// The owning LRS call.
RefCountedPtr<LrsCall> lrs_call_;
absl::optional<EventEngine::TaskHandle> timer_handle_
ABSL_GUARDED_BY(&XdsClient::mu_);
};
void MaybeScheduleNextReportLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void SendMessageLocked(std::string payload)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void OnRequestSent();
void OnRecvMessage(absl::string_view payload);
void OnStatusReceived(absl::Status status);
bool IsCurrentCallOnChannel() const;
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<LrsCall>> retryable_call_;
OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
streaming_call_;
bool seen_response_ = false;
bool send_message_pending_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
// Load reporting state.
bool send_all_clusters_ = false;
std::set<std::string> cluster_names_; // Asked for by the LRS server.
Duration load_reporting_interval_;
bool last_report_counters_were_zero_ = false;
OrphanablePtr<Timer> timer_;
};
//
// XdsClient::XdsChannel::ConnectivityFailureWatcher
//
@ -513,7 +419,6 @@ void XdsClient::XdsChannel::Orphaned() ABSL_NO_THREAD_SAFETY_ANALYSIS {
// it is shutting down.
xds_client_->xds_channel_map_.erase(server_.Key());
ads_call_.reset();
lrs_call_.reset();
}
void XdsClient::XdsChannel::ResetBackoff() { transport_->ResetBackoff(); }
@ -522,21 +427,6 @@ XdsClient::XdsChannel::AdsCall* XdsClient::XdsChannel::ads_call() const {
return ads_call_->call();
}
XdsClient::XdsChannel::LrsCall* XdsClient::XdsChannel::lrs_call() const {
return lrs_call_->call();
}
void XdsClient::XdsChannel::MaybeStartLrsCall() {
if (lrs_call_ != nullptr) return;
lrs_call_.reset(
new RetryableCall<LrsCall>(WeakRef(DEBUG_LOCATION, "XdsChannel+lrs")));
}
void XdsClient::XdsChannel::StopLrsCallLocked() {
xds_client_->xds_load_report_server_map_.erase(server_.Key());
lrs_call_.reset();
}
void XdsClient::XdsChannel::SubscribeLocked(const XdsResourceType* type,
const XdsResourceName& name) {
if (ads_call_ == nullptr) {
@ -1298,241 +1188,6 @@ XdsClient::XdsChannel::AdsCall::ResourceNamesForRequest(
return resource_names;
}
//
// XdsClient::XdsChannel::LrsCall::Timer
//
void XdsClient::XdsChannel::LrsCall::Timer::Orphan() {
if (timer_handle_.has_value()) {
xds_client()->engine()->Cancel(*timer_handle_);
timer_handle_.reset();
}
Unref(DEBUG_LOCATION, "Orphan");
}
void XdsClient::XdsChannel::LrsCall::Timer::ScheduleNextReportLocked() {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << xds_client() << "] xds server "
<< lrs_call_->xds_channel()->server_.server_uri()
<< ": scheduling next load report in "
<< lrs_call_->load_reporting_interval_;
timer_handle_ = xds_client()->engine()->RunAfter(
lrs_call_->load_reporting_interval_,
[self = Ref(DEBUG_LOCATION, "timer")]() {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnNextReportTimer();
});
}
void XdsClient::XdsChannel::LrsCall::Timer::OnNextReportTimer() {
MutexLock lock(&xds_client()->mu_);
timer_handle_.reset();
if (IsCurrentTimerOnCall()) lrs_call_->SendReportLocked();
}
//
// XdsClient::XdsChannel::LrsCall
//
XdsClient::XdsChannel::LrsCall::LrsCall(
RefCountedPtr<RetryableCall<LrsCall>> retryable_call)
: InternallyRefCounted<LrsCall>(
GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "LrsCall" : nullptr),
retryable_call_(std::move(retryable_call)) {
// Init the LRS call. Note that the call will progress every time there's
// activity in xds_client()->interested_parties_, which is comprised of
// the polling entities from client_channel.
CHECK_NE(xds_client(), nullptr);
const char* method =
"/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
streaming_call_ = xds_channel()->transport_->CreateStreamingCall(
method, std::make_unique<StreamEventHandler>(
// Passing the initial ref here. This ref will go away when
// the StreamEventHandler is destroyed.
RefCountedPtr<LrsCall>(this)));
CHECK(streaming_call_ != nullptr);
// Start the call.
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << xds_client() << "] xds server "
<< xds_channel()->server_.server_uri()
<< ": starting LRS call (lrs_call=" << this
<< ", streaming_call=" << streaming_call_.get() << ")";
// Send the initial request.
std::string serialized_payload = xds_client()->api_.CreateLrsInitialRequest();
SendMessageLocked(std::move(serialized_payload));
// Read initial response.
streaming_call_->StartRecvMessage();
}
void XdsClient::XdsChannel::LrsCall::Orphan() {
timer_.reset();
// Note that the initial ref is held by the StreamEventHandler, which
// will be destroyed when streaming_call_ is destroyed, which may not happen
// here, since there may be other refs held to streaming_call_ by internal
// callbacks.
streaming_call_.reset();
}
void XdsClient::XdsChannel::LrsCall::MaybeScheduleNextReportLocked() {
// If there are no more registered stats to report, cancel the call.
auto it = xds_client()->xds_load_report_server_map_.find(
xds_channel()->server_.Key());
if (it == xds_client()->xds_load_report_server_map_.end() ||
it->second.load_report_map.empty()) {
it->second.xds_channel->StopLrsCallLocked();
return;
}
// Don't start if the previous send_message op hasn't completed yet.
// If this happens, we'll be called again from OnRequestSent().
if (send_message_pending_) return;
// Don't start if no LRS response has arrived.
if (!seen_response()) return;
// If there is no timer, create one.
// This happens on the initial response and whenever the interval changes.
if (timer_ == nullptr) {
timer_ = MakeOrphanable<Timer>(Ref(DEBUG_LOCATION, "LRS timer"));
}
// Schedule the next load report.
timer_->ScheduleNextReportLocked();
}
namespace {
bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
for (const auto& p : snapshot) {
const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
if (!cluster_snapshot.dropped_requests.IsZero()) return false;
for (const auto& q : cluster_snapshot.locality_stats) {
const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
if (!locality_snapshot.IsZero()) return false;
}
}
return true;
}
} // namespace
void XdsClient::XdsChannel::LrsCall::SendReportLocked() {
// Construct snapshot from all reported stats.
XdsApi::ClusterLoadReportMap snapshot =
xds_client()->BuildLoadReportSnapshotLocked(
xds_channel()->server_, send_all_clusters_, cluster_names_);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
const bool old_val = last_report_counters_were_zero_;
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
if (old_val && last_report_counters_were_zero_) {
MaybeScheduleNextReportLocked();
return;
}
// Send a request that contains the snapshot.
std::string serialized_payload =
xds_client()->api_.CreateLrsRequest(std::move(snapshot));
SendMessageLocked(std::move(serialized_payload));
}
void XdsClient::XdsChannel::LrsCall::SendMessageLocked(std::string payload) {
send_message_pending_ = true;
streaming_call_->SendMessage(std::move(payload));
}
void XdsClient::XdsChannel::LrsCall::OnRequestSent() {
MutexLock lock(&xds_client()->mu_);
send_message_pending_ = false;
if (IsCurrentCallOnChannel()) MaybeScheduleNextReportLocked();
}
void XdsClient::XdsChannel::LrsCall::OnRecvMessage(absl::string_view payload) {
MutexLock lock(&xds_client()->mu_);
// If we're no longer the current call, ignore the result.
if (!IsCurrentCallOnChannel()) return;
// Start recv after any code branch
auto cleanup = absl::MakeCleanup(
[call = streaming_call_.get()]() { call->StartRecvMessage(); });
// Parse the response.
bool send_all_clusters = false;
std::set<std::string> new_cluster_names;
Duration new_load_reporting_interval;
absl::Status status = xds_client()->api_.ParseLrsResponse(
payload, &send_all_clusters, &new_cluster_names,
&new_load_reporting_interval);
if (!status.ok()) {
LOG(ERROR) << "[xds_client " << xds_client() << "] xds server "
<< xds_channel()->server_.server_uri()
<< ": LRS response parsing failed: " << status;
return;
}
seen_response_ = true;
if (GRPC_TRACE_FLAG_ENABLED(xds_client)) {
LOG(INFO) << "[xds_client " << xds_client() << "] xds server "
<< xds_channel()->server_.server_uri()
<< ": LRS response received, " << new_cluster_names.size()
<< " cluster names, send_all_clusters=" << send_all_clusters
<< ", load_report_interval="
<< new_load_reporting_interval.millis() << "ms";
size_t i = 0;
for (const auto& name : new_cluster_names) {
LOG(INFO) << "[xds_client " << xds_client() << "] cluster_name " << i++
<< ": " << name;
}
}
if (new_load_reporting_interval <
Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) {
new_load_reporting_interval =
Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << xds_client() << "] xds server "
<< xds_channel()->server_.server_uri()
<< ": increased load_report_interval to minimum value "
<< GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS << "ms";
}
// Ignore identical update.
if (send_all_clusters == send_all_clusters_ &&
cluster_names_ == new_cluster_names &&
load_reporting_interval_ == new_load_reporting_interval) {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << xds_client() << "] xds server "
<< xds_channel()->server_.server_uri()
<< ": incoming LRS response identical to current, ignoring.";
return;
}
// If the interval has changed, we'll need to restart the timer below.
const bool restart_timer =
load_reporting_interval_ != new_load_reporting_interval;
// Record the new config.
send_all_clusters_ = send_all_clusters;
cluster_names_ = std::move(new_cluster_names);
load_reporting_interval_ = new_load_reporting_interval;
// Restart timer if needed.
if (restart_timer) {
timer_.reset();
MaybeScheduleNextReportLocked();
}
}
void XdsClient::XdsChannel::LrsCall::OnStatusReceived(absl::Status status) {
MutexLock lock(&xds_client()->mu_);
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << xds_client() << "] xds server "
<< xds_channel()->server_.server_uri()
<< ": LRS call status received (xds_channel=" << xds_channel()
<< ", lrs_call=" << this << ", streaming_call=" << streaming_call_.get()
<< "): " << status;
// Ignore status from a stale call.
if (IsCurrentCallOnChannel()) {
// Try to restart the call.
retryable_call_->OnCallFinishedLocked();
}
}
bool XdsClient::XdsChannel::LrsCall::IsCurrentCallOnChannel() const {
// If the retryable LRS call is null (which only happens when the xds
// channel is shutting down), all the LRS calls are stale.
if (xds_channel()->lrs_call_ == nullptr) return false;
return this == xds_channel()->lrs_call_->call();
}
//
// XdsClient
//
@ -1580,13 +1235,6 @@ void XdsClient::Orphaned() {
// Clear cache and any remaining watchers that may not have been cancelled.
authority_state_map_.clear();
invalid_watchers_.clear();
// We may still be sending lingering queued load report data, so don't
// just clear the load reporting map, but we do want to clear the refs
// we're holding to the XdsChannel objects, to make sure that
// everything shuts down properly.
for (auto& p : xds_load_report_server_map_) {
p.second.xds_channel.reset(DEBUG_LOCATION, "XdsClient::Orphan()");
}
}
RefCountedPtr<XdsClient::XdsChannel> XdsClient::GetOrCreateXdsChannelLocked(
@ -1860,140 +1508,6 @@ std::string XdsClient::ConstructFullXdsResourceName(
return key.id;
}
RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name) {
auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name));
RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
{
MutexLock lock(&mu_);
// We jump through some hoops here to make sure that the
// absl::string_views stored in the XdsClusterDropStats object point
// to the strings in the xds_load_report_server_map_ keys, so that
// they have the same lifetime.
auto server_it = xds_load_report_server_map_
.emplace(xds_server.Key(), LoadReportServer())
.first;
if (server_it->second.xds_channel == nullptr) {
server_it->second.xds_channel = GetOrCreateXdsChannelLocked(
xds_server, "load report map (drop stats)");
}
auto load_report_it = server_it->second.load_report_map
.emplace(std::move(key), LoadReportState())
.first;
LoadReportState& load_report_state = load_report_it->second;
if (load_report_state.drop_stats != nullptr) {
cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
}
if (cluster_drop_stats == nullptr) {
if (load_report_state.drop_stats != nullptr) {
load_report_state.deleted_drop_stats +=
load_report_state.drop_stats->GetSnapshotAndReset();
}
cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
Ref(DEBUG_LOCATION, "DropStats"), server_it->first /*xds_server*/,
load_report_it->first.first /*cluster_name*/,
load_report_it->first.second /*eds_service_name*/);
load_report_state.drop_stats = cluster_drop_stats.get();
}
server_it->second.xds_channel->MaybeStartLrsCall();
}
work_serializer_.DrainQueue();
return cluster_drop_stats;
}
void XdsClient::RemoveClusterDropStats(
absl::string_view xds_server_key, absl::string_view cluster_name,
absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats) {
MutexLock lock(&mu_);
auto server_it = xds_load_report_server_map_.find(xds_server_key);
if (server_it == xds_load_report_server_map_.end()) return;
auto load_report_it = server_it->second.load_report_map.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (load_report_it == server_it->second.load_report_map.end()) return;
LoadReportState& load_report_state = load_report_it->second;
if (load_report_state.drop_stats == cluster_drop_stats) {
// Record final snapshot in deleted_drop_stats, which will be
// added to the next load report.
load_report_state.deleted_drop_stats +=
load_report_state.drop_stats->GetSnapshotAndReset();
load_report_state.drop_stats = nullptr;
}
}
RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality) {
auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name));
RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
{
MutexLock lock(&mu_);
// We jump through some hoops here to make sure that the
// absl::string_views stored in the XdsClusterDropStats object point
// to the strings in the xds_load_report_server_map_ keys, so that
// they have the same lifetime.
auto server_it = xds_load_report_server_map_
.emplace(xds_server.Key(), LoadReportServer())
.first;
if (server_it->second.xds_channel == nullptr) {
server_it->second.xds_channel = GetOrCreateXdsChannelLocked(
xds_server, "load report map (locality stats)");
}
auto load_report_it = server_it->second.load_report_map
.emplace(std::move(key), LoadReportState())
.first;
LoadReportState& load_report_state = load_report_it->second;
LoadReportState::LocalityState& locality_state =
load_report_state.locality_stats[locality];
if (locality_state.locality_stats != nullptr) {
cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
}
if (cluster_locality_stats == nullptr) {
if (locality_state.locality_stats != nullptr) {
locality_state.deleted_locality_stats +=
locality_state.locality_stats->GetSnapshotAndReset();
}
cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first /*xds_server*/,
load_report_it->first.first /*cluster_name*/,
load_report_it->first.second /*eds_service_name*/,
std::move(locality));
locality_state.locality_stats = cluster_locality_stats.get();
}
server_it->second.xds_channel->MaybeStartLrsCall();
}
work_serializer_.DrainQueue();
return cluster_locality_stats;
}
void XdsClient::RemoveClusterLocalityStats(
absl::string_view xds_server_key, absl::string_view cluster_name,
absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats) {
MutexLock lock(&mu_);
auto server_it = xds_load_report_server_map_.find(xds_server_key);
if (server_it == xds_load_report_server_map_.end()) return;
auto load_report_it = server_it->second.load_report_map.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (load_report_it == server_it->second.load_report_map.end()) return;
LoadReportState& load_report_state = load_report_it->second;
auto locality_it = load_report_state.locality_stats.find(locality);
if (locality_it == load_report_state.locality_stats.end()) return;
LoadReportState::LocalityState& locality_state = locality_it->second;
if (locality_state.locality_stats == cluster_locality_stats) {
// Record final snapshot in deleted_locality_stats, which will be
// added to the next load report.
locality_state.deleted_locality_stats +=
locality_state.locality_stats->GetSnapshotAndReset();
locality_state.locality_stats = nullptr;
}
}
void XdsClient::ResetBackoff() {
MutexLock lock(&mu_);
for (auto& p : xds_channel_map_) {
@ -2036,86 +1550,6 @@ void XdsClient::NotifyWatchersOnResourceDoesNotExist(
DEBUG_LOCATION);
}
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
const std::set<std::string>& clusters) {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << this << "] start building load report";
XdsApi::ClusterLoadReportMap snapshot_map;
auto server_it = xds_load_report_server_map_.find(xds_server.Key());
if (server_it == xds_load_report_server_map_.end()) return snapshot_map;
auto& load_report_map = server_it->second.load_report_map;
for (auto load_report_it = load_report_map.begin();
load_report_it != load_report_map.end();) {
// Cluster key is cluster and EDS service name.
const auto& cluster_key = load_report_it->first;
LoadReportState& load_report = load_report_it->second;
// If the CDS response for a cluster indicates to use LRS but the
// LRS server does not say that it wants reports for this cluster,
// then we'll have stats objects here whose data we're not going to
// include in the load report. However, we still need to clear out
// the data from the stats objects, so that if the LRS server starts
// asking for the data in the future, we don't incorrectly include
// data from previous reporting intervals in that future report.
const bool record_stats =
send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
XdsApi::ClusterLoadReport snapshot;
// Aggregate drop stats.
snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
if (load_report.drop_stats != nullptr) {
snapshot.dropped_requests +=
load_report.drop_stats->GetSnapshotAndReset();
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << this << "] cluster=" << cluster_key.first
<< " eds_service_name=" << cluster_key.second
<< " drop_stats=" << load_report.drop_stats;
}
// Aggregate locality stats.
for (auto it = load_report.locality_stats.begin();
it != load_report.locality_stats.end();) {
const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
auto& locality_state = it->second;
XdsClusterLocalityStats::Snapshot& locality_snapshot =
snapshot.locality_stats[locality_name];
locality_snapshot = std::move(locality_state.deleted_locality_stats);
if (locality_state.locality_stats != nullptr) {
locality_snapshot +=
locality_state.locality_stats->GetSnapshotAndReset();
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << this
<< "] cluster=" << cluster_key.first.c_str()
<< " eds_service_name=" << cluster_key.second.c_str()
<< " locality=" << locality_name->human_readable_string().c_str()
<< " locality_stats=" << locality_state.locality_stats;
}
// If the only thing left in this entry was final snapshots from
// deleted locality stats objects, remove the entry.
if (locality_state.locality_stats == nullptr) {
it = load_report.locality_stats.erase(it);
} else {
++it;
}
}
// Compute load report interval.
const Timestamp now = Timestamp::Now();
snapshot.load_report_interval = now - load_report.last_report_time;
load_report.last_report_time = now;
// Record snapshot.
if (record_stats) {
snapshot_map[cluster_key] = std::move(snapshot);
}
// If the only thing left in this entry was final snapshots from
// deleted stats objects, remove the entry.
if (load_report.locality_stats.empty() &&
load_report.drop_stats == nullptr) {
load_report_it = load_report_map.erase(load_report_it);
} else {
++load_report_it;
}
}
return snapshot_map;
}
namespace {
google_protobuf_Timestamp* EncodeTimestamp(Timestamp value, upb_Arena* arena) {

@ -92,14 +92,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
Duration resource_request_timeout = Duration::Seconds(15));
~XdsClient() override;
const XdsBootstrap& bootstrap() const {
return *bootstrap_; // ctor asserts that it is non-null
}
XdsTransportFactory* transport_factory() const {
return transport_factory_.get();
}
// Start and cancel watch for a resource.
//
// The XdsClient takes ownership of the watcher, but the caller may
@ -126,30 +118,17 @@ class XdsClient : public DualRefCounted<XdsClient> {
ResourceWatcherInterface* watcher,
bool delay_unsubscription = false);
// Adds and removes drop stats for cluster_name and eds_service_name.
RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name);
void RemoveClusterDropStats(absl::string_view xds_server,
absl::string_view cluster_name,
absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats);
// Adds and removes locality stats for cluster_name and eds_service_name
// for the specified locality.
RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality);
void RemoveClusterLocalityStats(
absl::string_view xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats);
// Resets connection backoff state.
virtual void ResetBackoff();
const XdsBootstrap& bootstrap() const {
return *bootstrap_; // ctor asserts that it is non-null
}
XdsTransportFactory* transport_factory() const {
return transport_factory_.get();
}
grpc_event_engine::experimental::EventEngine* engine() {
return engine_.get();
}
@ -212,7 +191,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
class RetryableCall;
class AdsCall;
class LrsCall;
XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,
const XdsBootstrap::XdsServer& server);
@ -220,13 +198,9 @@ class XdsClient : public DualRefCounted<XdsClient> {
XdsClient* xds_client() const { return xds_client_.get(); }
AdsCall* ads_call() const;
LrsCall* lrs_call() const;
void ResetBackoff();
void MaybeStartLrsCall();
void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
// Returns non-OK if there has been an error since the last time the
// ADS stream saw a response.
const absl::Status& status() const { return status_; }
@ -272,7 +246,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
// The retryable ADS and LRS calls.
OrphanablePtr<RetryableCall<AdsCall>> ads_call_;
OrphanablePtr<RetryableCall<LrsCall>> lrs_call_;
// Stores the most recent accepted resource version for each resource type.
std::map<const XdsResourceType*, std::string /*version*/>
@ -296,30 +269,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
resource_map;
};
struct LoadReportState {
struct LocalityState {
XdsClusterLocalityStats* locality_stats = nullptr;
XdsClusterLocalityStats::Snapshot deleted_locality_stats;
};
XdsClusterDropStats* drop_stats = nullptr;
XdsClusterDropStats::Snapshot deleted_drop_stats;
std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
XdsLocalityName::Less>
locality_stats;
Timestamp last_report_time = Timestamp::Now();
};
// Load report data.
using LoadReportMap = std::map<
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
LoadReportState>;
struct LoadReportServer {
RefCountedPtr<XdsChannel> xds_channel;
LoadReportMap load_report_map;
};
// Sends an error notification to a specific set of watchers.
void NotifyWatchersOnErrorLocked(
const std::map<ResourceWatcherInterface*,
@ -338,19 +287,17 @@ class XdsClient : public DualRefCounted<XdsClient> {
const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
bool HasUncachedResources(const AuthorityState& authority_state);
absl::StatusOr<XdsResourceName> ParseXdsResourceName(
absl::string_view name, const XdsResourceType* type);
static std::string ConstructFullXdsResourceName(
absl::string_view authority, absl::string_view resource_type,
const XdsResourceKey& key);
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
RefCountedPtr<XdsChannel> GetOrCreateXdsChannelLocked(
const XdsBootstrap::XdsServer& server, const char* reason)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
bool HasUncachedResources(const AuthorityState& authority_state);
std::shared_ptr<XdsBootstrap> bootstrap_;
RefCountedPtr<XdsTransportFactory> transport_factory_;
@ -375,9 +322,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
std::map<std::string /*authority*/, AuthorityState> authority_state_map_
ABSL_GUARDED_BY(mu_);
std::map<std::string /*XdsServer key*/, LoadReportServer, std::less<>>
xds_load_report_server_map_ ABSL_GUARDED_BY(mu_);
// Stores started watchers whose resource name was not parsed successfully,
// waiting to be cancelled or reset in Orphan().
std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>

@ -1,164 +0,0 @@
//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "src/core/xds/xds_client/xds_client_stats.h"
#include "absl/log/log.h"
#include <grpc/support/port_platform.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/xds/xds_client/xds_client.h"
namespace grpc_core {
namespace {
uint64_t GetAndResetCounter(std::atomic<uint64_t>* from) {
return from->exchange(0, std::memory_order_relaxed);
}
} // namespace
//
// XdsClusterDropStats
//
XdsClusterDropStats::XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client,
absl::string_view lrs_server,
absl::string_view cluster_name,
absl::string_view eds_service_name)
: RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
? "XdsClusterDropStats"
: nullptr),
xds_client_(std::move(xds_client)),
lrs_server_(lrs_server),
cluster_name_(cluster_name),
eds_service_name_(eds_service_name) {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << xds_client_.get() << "] created drop stats " << this
<< " for {" << lrs_server_ << ", " << cluster_name_ << ", "
<< eds_service_name_ << "}";
}
XdsClusterDropStats::~XdsClusterDropStats() {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << xds_client_.get() << "] destroying drop stats "
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
<< eds_service_name_ << "}";
xds_client_->RemoveClusterDropStats(lrs_server_, cluster_name_,
eds_service_name_, this);
xds_client_.reset(DEBUG_LOCATION, "DropStats");
}
XdsClusterDropStats::Snapshot XdsClusterDropStats::GetSnapshotAndReset() {
Snapshot snapshot;
snapshot.uncategorized_drops = GetAndResetCounter(&uncategorized_drops_);
MutexLock lock(&mu_);
snapshot.categorized_drops = std::move(categorized_drops_);
return snapshot;
}
void XdsClusterDropStats::AddUncategorizedDrops() {
uncategorized_drops_.fetch_add(1);
}
void XdsClusterDropStats::AddCallDropped(const std::string& category) {
MutexLock lock(&mu_);
++categorized_drops_[category];
}
//
// XdsClusterLocalityStats
//
XdsClusterLocalityStats::XdsClusterLocalityStats(
RefCountedPtr<XdsClient> xds_client, absl::string_view lrs_server,
absl::string_view cluster_name, absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> name)
: RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
? "XdsClusterLocalityStats"
: nullptr),
xds_client_(std::move(xds_client)),
lrs_server_(lrs_server),
cluster_name_(cluster_name),
eds_service_name_(eds_service_name),
name_(std::move(name)) {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << xds_client_.get() << "] created locality stats "
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
<< eds_service_name_ << ", "
<< (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str())
<< "}";
}
XdsClusterLocalityStats::~XdsClusterLocalityStats() {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << xds_client_.get() << "] destroying locality stats "
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
<< eds_service_name_ << ", "
<< (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str())
<< "}";
xds_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_,
eds_service_name_, name_, this);
xds_client_.reset(DEBUG_LOCATION, "LocalityStats");
}
XdsClusterLocalityStats::Snapshot
XdsClusterLocalityStats::GetSnapshotAndReset() {
Snapshot snapshot;
for (auto& percpu_stats : stats_) {
Snapshot percpu_snapshot = {
GetAndResetCounter(&percpu_stats.total_successful_requests),
// Don't reset total_requests_in_progress because it's
// not related to a single reporting interval.
percpu_stats.total_requests_in_progress.load(std::memory_order_relaxed),
GetAndResetCounter(&percpu_stats.total_error_requests),
GetAndResetCounter(&percpu_stats.total_issued_requests),
{}};
{
MutexLock lock(&percpu_stats.backend_metrics_mu);
percpu_snapshot.backend_metrics = std::move(percpu_stats.backend_metrics);
}
snapshot += percpu_snapshot;
}
return snapshot;
}
void XdsClusterLocalityStats::AddCallStarted() {
Stats& stats = stats_.this_cpu();
stats.total_issued_requests.fetch_add(1, std::memory_order_relaxed);
stats.total_requests_in_progress.fetch_add(1, std::memory_order_relaxed);
}
void XdsClusterLocalityStats::AddCallFinished(
const std::map<absl::string_view, double>* named_metrics, bool fail) {
Stats& stats = stats_.this_cpu();
std::atomic<uint64_t>& to_increment =
fail ? stats.total_error_requests : stats.total_successful_requests;
to_increment.fetch_add(1, std::memory_order_relaxed);
stats.total_requests_in_progress.fetch_add(-1, std::memory_order_acq_rel);
if (named_metrics == nullptr) return;
MutexLock lock(&stats.backend_metrics_mu);
for (const auto& m : *named_metrics) {
stats.backend_metrics[std::string(m.first)] += BackendMetric{1, m.second};
}
}
} // namespace grpc_core

@ -1,3 +1,6 @@
// FIXME: rename to xds_locality.h
//
//
// Copyright 2018 gRPC authors.
@ -19,33 +22,21 @@
#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H
#include <atomic>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/per_cpu.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/ref_counted_string.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/util/useful.h"
#include "src/core/xds/xds_client/xds_bootstrap.h"
namespace grpc_core {
// Forward declaration to avoid circular dependency.
class XdsClient;
// Locality name.
// An xDS locality name.
class XdsLocalityName final : public RefCounted<XdsLocalityName> {
public:
struct Less {
@ -110,149 +101,6 @@ class XdsLocalityName final : public RefCounted<XdsLocalityName> {
RefCountedStringValue human_readable_string_;
};
// Drop stats for an xds cluster.
class XdsClusterDropStats final : public RefCounted<XdsClusterDropStats> {
public:
// The total number of requests dropped for any reason is the sum of
// uncategorized_drops, and dropped_requests map.
using CategorizedDropsMap = std::map<std::string /* category */, uint64_t>;
struct Snapshot {
uint64_t uncategorized_drops = 0;
// The number of requests dropped for the specific drop categories
// outlined in the drop_overloads field in the EDS response.
CategorizedDropsMap categorized_drops;
Snapshot& operator+=(const Snapshot& other) {
uncategorized_drops += other.uncategorized_drops;
for (const auto& p : other.categorized_drops) {
categorized_drops[p.first] += p.second;
}
return *this;
}
bool IsZero() const {
if (uncategorized_drops != 0) return false;
for (const auto& p : categorized_drops) {
if (p.second != 0) return false;
}
return true;
}
};
XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client,
absl::string_view lrs_server,
absl::string_view cluster_name,
absl::string_view eds_service_name);
~XdsClusterDropStats() override;
// Returns a snapshot of this instance and resets all the counters.
Snapshot GetSnapshotAndReset();
void AddUncategorizedDrops();
void AddCallDropped(const std::string& category);
private:
RefCountedPtr<XdsClient> xds_client_;
absl::string_view lrs_server_;
absl::string_view cluster_name_;
absl::string_view eds_service_name_;
std::atomic<uint64_t> uncategorized_drops_{0};
// Protects categorized_drops_. A mutex is necessary because the length of
// dropped_requests can be accessed by both the picker (from data plane
// mutex) and the load reporting thread (from the control plane combiner).
Mutex mu_;
CategorizedDropsMap categorized_drops_ ABSL_GUARDED_BY(mu_);
};
// Locality stats for an xds cluster.
class XdsClusterLocalityStats final
: public RefCounted<XdsClusterLocalityStats> {
public:
struct BackendMetric {
uint64_t num_requests_finished_with_metric = 0;
double total_metric_value = 0;
BackendMetric& operator+=(const BackendMetric& other) {
num_requests_finished_with_metric +=
other.num_requests_finished_with_metric;
total_metric_value += other.total_metric_value;
return *this;
}
bool IsZero() const {
return num_requests_finished_with_metric == 0 && total_metric_value == 0;
}
};
struct Snapshot {
uint64_t total_successful_requests = 0;
uint64_t total_requests_in_progress = 0;
uint64_t total_error_requests = 0;
uint64_t total_issued_requests = 0;
std::map<std::string, BackendMetric> backend_metrics;
Snapshot& operator+=(const Snapshot& other) {
total_successful_requests += other.total_successful_requests;
total_requests_in_progress += other.total_requests_in_progress;
total_error_requests += other.total_error_requests;
total_issued_requests += other.total_issued_requests;
for (const auto& p : other.backend_metrics) {
backend_metrics[p.first] += p.second;
}
return *this;
}
bool IsZero() const {
if (total_successful_requests != 0 || total_requests_in_progress != 0 ||
total_error_requests != 0 || total_issued_requests != 0) {
return false;
}
for (const auto& p : backend_metrics) {
if (!p.second.IsZero()) return false;
}
return true;
}
};
XdsClusterLocalityStats(RefCountedPtr<XdsClient> xds_client,
absl::string_view lrs_server,
absl::string_view cluster_name,
absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> name);
~XdsClusterLocalityStats() override;
// Returns a snapshot of this instance and resets all the counters.
Snapshot GetSnapshotAndReset();
void AddCallStarted();
void AddCallFinished(const std::map<absl::string_view, double>* named_metrics,
bool fail = false);
XdsLocalityName* locality_name() const { return name_.get(); }
private:
struct Stats {
std::atomic<uint64_t> total_successful_requests{0};
std::atomic<uint64_t> total_requests_in_progress{0};
std::atomic<uint64_t> total_error_requests{0};
std::atomic<uint64_t> total_issued_requests{0};
// Protects backend_metrics. A mutex is necessary because the length of
// backend_metrics_ can be accessed by both the callback intercepting the
// call's recv_trailing_metadata and the load reporting thread.
Mutex backend_metrics_mu;
std::map<std::string, BackendMetric> backend_metrics
ABSL_GUARDED_BY(backend_metrics_mu);
};
RefCountedPtr<XdsClient> xds_client_;
absl::string_view lrs_server_;
absl::string_view cluster_name_;
absl::string_view eds_service_name_;
RefCountedPtr<XdsLocalityName> name_;
PerCpu<Stats> stats_{PerCpuOptions().SetMaxShards(32).SetCpusPerShard(4)};
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H

Loading…
Cancel
Save