[xDS] split LRS client into its own API (#37605)
Closes #37605
COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37605 from markdroth:xds_client_lrs_refactor f9b86b8f06
PiperOrigin-RevId: 678281548
pull/37668/head^2
parent
94b7927def
commit
06d3e40d81
41 changed files with 2114 additions and 1573 deletions
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,358 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2019 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H |
||||||
|
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H |
||||||
|
|
||||||
|
#include <atomic> |
||||||
|
#include <map> |
||||||
|
#include <memory> |
||||||
|
#include <set> |
||||||
|
#include <string> |
||||||
|
#include <utility> |
||||||
|
|
||||||
|
#include "absl/base/thread_annotations.h" |
||||||
|
#include "absl/status/status.h" |
||||||
|
#include "absl/status/statusor.h" |
||||||
|
#include "absl/strings/string_view.h" |
||||||
|
#include "upb/reflection/def.hpp" |
||||||
|
|
||||||
|
#include <grpc/event_engine/event_engine.h> |
||||||
|
|
||||||
|
#include "src/core/lib/debug/trace.h" |
||||||
|
#include "src/core/util/dual_ref_counted.h" |
||||||
|
#include "src/core/util/orphanable.h" |
||||||
|
#include "src/core/util/per_cpu.h" |
||||||
|
#include "src/core/util/ref_counted.h" |
||||||
|
#include "src/core/util/ref_counted_ptr.h" |
||||||
|
#include "src/core/util/sync.h" |
||||||
|
#include "src/core/util/time.h" |
||||||
|
#include "src/core/util/uri.h" |
||||||
|
#include "src/core/util/work_serializer.h" |
||||||
|
#include "src/core/xds/xds_client/xds_api.h" |
||||||
|
#include "src/core/xds/xds_client/xds_bootstrap.h" |
||||||
|
#include "src/core/xds/xds_client/xds_locality.h" |
||||||
|
#include "src/core/xds/xds_client/xds_metrics.h" |
||||||
|
#include "src/core/xds/xds_client/xds_resource_type.h" |
||||||
|
#include "src/core/xds/xds_client/xds_transport.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
class LrsClient : public DualRefCounted<LrsClient> { |
||||||
|
public: |
||||||
|
// Drop stats for an xds cluster.
|
||||||
|
class ClusterDropStats final : public RefCounted<ClusterDropStats> { |
||||||
|
public: |
||||||
|
// The total number of requests dropped for any reason is the sum of
|
||||||
|
// uncategorized_drops, and dropped_requests map.
|
||||||
|
using CategorizedDropsMap = std::map<std::string /* category */, uint64_t>; |
||||||
|
struct Snapshot { |
||||||
|
uint64_t uncategorized_drops = 0; |
||||||
|
// The number of requests dropped for the specific drop categories
|
||||||
|
// outlined in the drop_overloads field in the EDS response.
|
||||||
|
CategorizedDropsMap categorized_drops; |
||||||
|
|
||||||
|
Snapshot& operator+=(const Snapshot& other) { |
||||||
|
uncategorized_drops += other.uncategorized_drops; |
||||||
|
for (const auto& p : other.categorized_drops) { |
||||||
|
categorized_drops[p.first] += p.second; |
||||||
|
} |
||||||
|
return *this; |
||||||
|
} |
||||||
|
|
||||||
|
bool IsZero() const { |
||||||
|
if (uncategorized_drops != 0) return false; |
||||||
|
for (const auto& p : categorized_drops) { |
||||||
|
if (p.second != 0) return false; |
||||||
|
} |
||||||
|
return true; |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
ClusterDropStats(RefCountedPtr<LrsClient> lrs_client, |
||||||
|
absl::string_view lrs_server, |
||||||
|
absl::string_view cluster_name, |
||||||
|
absl::string_view eds_service_name); |
||||||
|
~ClusterDropStats() override; |
||||||
|
|
||||||
|
// Returns a snapshot of this instance and resets all the counters.
|
||||||
|
Snapshot GetSnapshotAndReset(); |
||||||
|
|
||||||
|
void AddUncategorizedDrops(); |
||||||
|
void AddCallDropped(const std::string& category); |
||||||
|
|
||||||
|
private: |
||||||
|
RefCountedPtr<LrsClient> lrs_client_; |
||||||
|
absl::string_view lrs_server_; |
||||||
|
absl::string_view cluster_name_; |
||||||
|
absl::string_view eds_service_name_; |
||||||
|
std::atomic<uint64_t> uncategorized_drops_{0}; |
||||||
|
// Protects categorized_drops_. A mutex is necessary because the length of
|
||||||
|
// dropped_requests can be accessed by both the picker (from data plane
|
||||||
|
// mutex) and the load reporting thread (from the control plane combiner).
|
||||||
|
Mutex mu_; |
||||||
|
CategorizedDropsMap categorized_drops_ ABSL_GUARDED_BY(mu_); |
||||||
|
}; |
||||||
|
|
||||||
|
// Locality stats for an xds cluster.
|
||||||
|
class ClusterLocalityStats final : public RefCounted<ClusterLocalityStats> { |
||||||
|
public: |
||||||
|
struct BackendMetric { |
||||||
|
uint64_t num_requests_finished_with_metric = 0; |
||||||
|
double total_metric_value = 0; |
||||||
|
|
||||||
|
BackendMetric& operator+=(const BackendMetric& other) { |
||||||
|
num_requests_finished_with_metric += |
||||||
|
other.num_requests_finished_with_metric; |
||||||
|
total_metric_value += other.total_metric_value; |
||||||
|
return *this; |
||||||
|
} |
||||||
|
|
||||||
|
bool IsZero() const { |
||||||
|
return num_requests_finished_with_metric == 0 && |
||||||
|
total_metric_value == 0; |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
struct Snapshot { |
||||||
|
uint64_t total_successful_requests = 0; |
||||||
|
uint64_t total_requests_in_progress = 0; |
||||||
|
uint64_t total_error_requests = 0; |
||||||
|
uint64_t total_issued_requests = 0; |
||||||
|
std::map<std::string, BackendMetric> backend_metrics; |
||||||
|
|
||||||
|
Snapshot& operator+=(const Snapshot& other) { |
||||||
|
total_successful_requests += other.total_successful_requests; |
||||||
|
total_requests_in_progress += other.total_requests_in_progress; |
||||||
|
total_error_requests += other.total_error_requests; |
||||||
|
total_issued_requests += other.total_issued_requests; |
||||||
|
for (const auto& p : other.backend_metrics) { |
||||||
|
backend_metrics[p.first] += p.second; |
||||||
|
} |
||||||
|
return *this; |
||||||
|
} |
||||||
|
|
||||||
|
bool IsZero() const { |
||||||
|
if (total_successful_requests != 0 || total_requests_in_progress != 0 || |
||||||
|
total_error_requests != 0 || total_issued_requests != 0) { |
||||||
|
return false; |
||||||
|
} |
||||||
|
for (const auto& p : backend_metrics) { |
||||||
|
if (!p.second.IsZero()) return false; |
||||||
|
} |
||||||
|
return true; |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
ClusterLocalityStats(RefCountedPtr<LrsClient> lrs_client, |
||||||
|
absl::string_view lrs_server, |
||||||
|
absl::string_view cluster_name, |
||||||
|
absl::string_view eds_service_name, |
||||||
|
RefCountedPtr<XdsLocalityName> name); |
||||||
|
~ClusterLocalityStats() override; |
||||||
|
|
||||||
|
// Returns a snapshot of this instance and resets all the counters.
|
||||||
|
Snapshot GetSnapshotAndReset(); |
||||||
|
|
||||||
|
void AddCallStarted(); |
||||||
|
void AddCallFinished( |
||||||
|
const std::map<absl::string_view, double>* named_metrics, |
||||||
|
bool fail = false); |
||||||
|
|
||||||
|
XdsLocalityName* locality_name() const { return name_.get(); } |
||||||
|
|
||||||
|
private: |
||||||
|
struct Stats { |
||||||
|
std::atomic<uint64_t> total_successful_requests{0}; |
||||||
|
std::atomic<uint64_t> total_requests_in_progress{0}; |
||||||
|
std::atomic<uint64_t> total_error_requests{0}; |
||||||
|
std::atomic<uint64_t> total_issued_requests{0}; |
||||||
|
|
||||||
|
// Protects backend_metrics. A mutex is necessary because the length of
|
||||||
|
// backend_metrics_ can be accessed by both the callback intercepting the
|
||||||
|
// call's recv_trailing_metadata and the load reporting thread.
|
||||||
|
Mutex backend_metrics_mu; |
||||||
|
std::map<std::string, BackendMetric> backend_metrics |
||||||
|
ABSL_GUARDED_BY(backend_metrics_mu); |
||||||
|
}; |
||||||
|
|
||||||
|
RefCountedPtr<LrsClient> lrs_client_; |
||||||
|
absl::string_view lrs_server_; |
||||||
|
absl::string_view cluster_name_; |
||||||
|
absl::string_view eds_service_name_; |
||||||
|
RefCountedPtr<XdsLocalityName> name_; |
||||||
|
PerCpu<Stats> stats_{PerCpuOptions().SetMaxShards(32).SetCpusPerShard(4)}; |
||||||
|
}; |
||||||
|
|
||||||
|
LrsClient( |
||||||
|
std::shared_ptr<XdsBootstrap> bootstrap, std::string user_agent_name, |
||||||
|
std::string user_agent_version, |
||||||
|
RefCountedPtr<XdsTransportFactory> transport_factory, |
||||||
|
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine); |
||||||
|
~LrsClient() override; |
||||||
|
|
||||||
|
// Adds and removes drop stats for cluster_name and eds_service_name.
|
||||||
|
RefCountedPtr<ClusterDropStats> AddClusterDropStats( |
||||||
|
std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server, |
||||||
|
absl::string_view cluster_name, absl::string_view eds_service_name); |
||||||
|
|
||||||
|
// Adds and removes locality stats for cluster_name and eds_service_name
|
||||||
|
// for the specified locality.
|
||||||
|
RefCountedPtr<ClusterLocalityStats> AddClusterLocalityStats( |
||||||
|
std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server, |
||||||
|
absl::string_view cluster_name, absl::string_view eds_service_name, |
||||||
|
RefCountedPtr<XdsLocalityName> locality); |
||||||
|
|
||||||
|
// Resets connection backoff state.
|
||||||
|
void ResetBackoff(); |
||||||
|
|
||||||
|
XdsTransportFactory* transport_factory() const { |
||||||
|
return transport_factory_.get(); |
||||||
|
} |
||||||
|
|
||||||
|
grpc_event_engine::experimental::EventEngine* engine() { |
||||||
|
return engine_.get(); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
// Contains a channel to the LRS server and all the data related to the
|
||||||
|
// channel.
|
||||||
|
class LrsChannel final : public DualRefCounted<LrsChannel> { |
||||||
|
public: |
||||||
|
template <typename T> |
||||||
|
class RetryableCall; |
||||||
|
|
||||||
|
class LrsCall; |
||||||
|
|
||||||
|
LrsChannel(WeakRefCountedPtr<LrsClient> lrs_client, |
||||||
|
std::shared_ptr<const XdsBootstrap::XdsServer> server); |
||||||
|
~LrsChannel() override; |
||||||
|
|
||||||
|
LrsClient* lrs_client() const { return lrs_client_.get(); } |
||||||
|
|
||||||
|
void ResetBackoff(); |
||||||
|
|
||||||
|
void MaybeStartLrsCall(); |
||||||
|
|
||||||
|
absl::string_view server_uri() const { return server_->server_uri(); } |
||||||
|
|
||||||
|
private: |
||||||
|
void Orphaned() override; |
||||||
|
|
||||||
|
void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_); |
||||||
|
|
||||||
|
// The owning LrsClient.
|
||||||
|
WeakRefCountedPtr<LrsClient> lrs_client_; |
||||||
|
|
||||||
|
std::shared_ptr<const XdsBootstrap::XdsServer> server_; |
||||||
|
|
||||||
|
RefCountedPtr<XdsTransportFactory::XdsTransport> transport_; |
||||||
|
|
||||||
|
// The retryable LRS call.
|
||||||
|
OrphanablePtr<RetryableCall<LrsCall>> lrs_call_; |
||||||
|
}; |
||||||
|
|
||||||
|
struct LoadReportState { |
||||||
|
struct LocalityState { |
||||||
|
ClusterLocalityStats* locality_stats = nullptr; |
||||||
|
ClusterLocalityStats::Snapshot deleted_locality_stats; |
||||||
|
}; |
||||||
|
|
||||||
|
ClusterDropStats* drop_stats = nullptr; |
||||||
|
ClusterDropStats::Snapshot deleted_drop_stats; |
||||||
|
std::map<RefCountedPtr<XdsLocalityName>, LocalityState, |
||||||
|
XdsLocalityName::Less> |
||||||
|
locality_stats; |
||||||
|
Timestamp last_report_time = Timestamp::Now(); |
||||||
|
}; |
||||||
|
|
||||||
|
// Load report data.
|
||||||
|
using LoadReportMap = std::map< |
||||||
|
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, |
||||||
|
LoadReportState>; |
||||||
|
|
||||||
|
struct LoadReportServer { |
||||||
|
RefCountedPtr<LrsChannel> lrs_channel; |
||||||
|
LoadReportMap load_report_map; |
||||||
|
}; |
||||||
|
|
||||||
|
struct ClusterLoadReport { |
||||||
|
ClusterDropStats::Snapshot dropped_requests; |
||||||
|
std::map<RefCountedPtr<XdsLocalityName>, ClusterLocalityStats::Snapshot, |
||||||
|
XdsLocalityName::Less> |
||||||
|
locality_stats; |
||||||
|
Duration load_report_interval; |
||||||
|
}; |
||||||
|
using ClusterLoadReportMap = std::map< |
||||||
|
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, |
||||||
|
ClusterLoadReport>; |
||||||
|
|
||||||
|
void Orphaned() override; |
||||||
|
|
||||||
|
ClusterLoadReportMap BuildLoadReportSnapshotLocked( |
||||||
|
const XdsBootstrap::XdsServer& lrs_server, bool send_all_clusters, |
||||||
|
const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
||||||
|
|
||||||
|
RefCountedPtr<LrsChannel> GetOrCreateLrsChannelLocked( |
||||||
|
std::shared_ptr<const XdsBootstrap::XdsServer> server, const char* reason) |
||||||
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
||||||
|
|
||||||
|
static bool LoadReportCountersAreZero(const ClusterLoadReportMap& snapshot); |
||||||
|
|
||||||
|
void RemoveClusterDropStats(absl::string_view lrs_server, |
||||||
|
absl::string_view cluster_name, |
||||||
|
absl::string_view eds_service_name, |
||||||
|
ClusterDropStats* cluster_drop_stats); |
||||||
|
|
||||||
|
void RemoveClusterLocalityStats( |
||||||
|
absl::string_view lrs_server, absl::string_view cluster_name, |
||||||
|
absl::string_view eds_service_name, |
||||||
|
const RefCountedPtr<XdsLocalityName>& locality, |
||||||
|
ClusterLocalityStats* cluster_locality_stats); |
||||||
|
|
||||||
|
// Creates an initial LRS request.
|
||||||
|
std::string CreateLrsInitialRequest() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); |
||||||
|
|
||||||
|
// Creates an LRS request sending a client-side load report.
|
||||||
|
std::string CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map) |
||||||
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); |
||||||
|
|
||||||
|
// Parses the LRS response and populates send_all_clusters,
|
||||||
|
// cluster_names, and load_reporting_interval.
|
||||||
|
absl::Status ParseLrsResponse(absl::string_view encoded_response, |
||||||
|
bool* send_all_clusters, |
||||||
|
std::set<std::string>* cluster_names, |
||||||
|
Duration* load_reporting_interval) |
||||||
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); |
||||||
|
|
||||||
|
std::shared_ptr<XdsBootstrap> bootstrap_; |
||||||
|
const std::string user_agent_name_; |
||||||
|
const std::string user_agent_version_; |
||||||
|
RefCountedPtr<XdsTransportFactory> transport_factory_; |
||||||
|
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_; |
||||||
|
|
||||||
|
Mutex mu_; |
||||||
|
upb::DefPool def_pool_ ABSL_GUARDED_BY(mu_); |
||||||
|
// Map of existing LRS channels.
|
||||||
|
std::map<std::string /*XdsServer key*/, LrsChannel*> lrs_channel_map_ |
||||||
|
ABSL_GUARDED_BY(mu_); |
||||||
|
std::map<std::string /*XdsServer key*/, LoadReportServer, std::less<>> |
||||||
|
load_report_map_ ABSL_GUARDED_BY(mu_); |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H
|
@ -1,164 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2018 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#include "src/core/xds/xds_client/xds_client_stats.h" |
|
||||||
|
|
||||||
#include "absl/log/log.h" |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/lib/debug/trace.h" |
|
||||||
#include "src/core/util/debug_location.h" |
|
||||||
#include "src/core/xds/xds_client/xds_client.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
uint64_t GetAndResetCounter(std::atomic<uint64_t>* from) { |
|
||||||
return from->exchange(0, std::memory_order_relaxed); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
//
|
|
||||||
// XdsClusterDropStats
|
|
||||||
//
|
|
||||||
|
|
||||||
XdsClusterDropStats::XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client, |
|
||||||
absl::string_view lrs_server, |
|
||||||
absl::string_view cluster_name, |
|
||||||
absl::string_view eds_service_name) |
|
||||||
: RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) |
|
||||||
? "XdsClusterDropStats" |
|
||||||
: nullptr), |
|
||||||
xds_client_(std::move(xds_client)), |
|
||||||
lrs_server_(lrs_server), |
|
||||||
cluster_name_(cluster_name), |
|
||||||
eds_service_name_(eds_service_name) { |
|
||||||
GRPC_TRACE_LOG(xds_client, INFO) |
|
||||||
<< "[xds_client " << xds_client_.get() << "] created drop stats " << this |
|
||||||
<< " for {" << lrs_server_ << ", " << cluster_name_ << ", " |
|
||||||
<< eds_service_name_ << "}"; |
|
||||||
} |
|
||||||
|
|
||||||
XdsClusterDropStats::~XdsClusterDropStats() { |
|
||||||
GRPC_TRACE_LOG(xds_client, INFO) |
|
||||||
<< "[xds_client " << xds_client_.get() << "] destroying drop stats " |
|
||||||
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", " |
|
||||||
<< eds_service_name_ << "}"; |
|
||||||
xds_client_->RemoveClusterDropStats(lrs_server_, cluster_name_, |
|
||||||
eds_service_name_, this); |
|
||||||
xds_client_.reset(DEBUG_LOCATION, "DropStats"); |
|
||||||
} |
|
||||||
|
|
||||||
XdsClusterDropStats::Snapshot XdsClusterDropStats::GetSnapshotAndReset() { |
|
||||||
Snapshot snapshot; |
|
||||||
snapshot.uncategorized_drops = GetAndResetCounter(&uncategorized_drops_); |
|
||||||
MutexLock lock(&mu_); |
|
||||||
snapshot.categorized_drops = std::move(categorized_drops_); |
|
||||||
return snapshot; |
|
||||||
} |
|
||||||
|
|
||||||
void XdsClusterDropStats::AddUncategorizedDrops() { |
|
||||||
uncategorized_drops_.fetch_add(1); |
|
||||||
} |
|
||||||
|
|
||||||
void XdsClusterDropStats::AddCallDropped(const std::string& category) { |
|
||||||
MutexLock lock(&mu_); |
|
||||||
++categorized_drops_[category]; |
|
||||||
} |
|
||||||
|
|
||||||
//
|
|
||||||
// XdsClusterLocalityStats
|
|
||||||
//
|
|
||||||
|
|
||||||
XdsClusterLocalityStats::XdsClusterLocalityStats( |
|
||||||
RefCountedPtr<XdsClient> xds_client, absl::string_view lrs_server, |
|
||||||
absl::string_view cluster_name, absl::string_view eds_service_name, |
|
||||||
RefCountedPtr<XdsLocalityName> name) |
|
||||||
: RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) |
|
||||||
? "XdsClusterLocalityStats" |
|
||||||
: nullptr), |
|
||||||
xds_client_(std::move(xds_client)), |
|
||||||
lrs_server_(lrs_server), |
|
||||||
cluster_name_(cluster_name), |
|
||||||
eds_service_name_(eds_service_name), |
|
||||||
name_(std::move(name)) { |
|
||||||
GRPC_TRACE_LOG(xds_client, INFO) |
|
||||||
<< "[xds_client " << xds_client_.get() << "] created locality stats " |
|
||||||
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", " |
|
||||||
<< eds_service_name_ << ", " |
|
||||||
<< (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str()) |
|
||||||
<< "}"; |
|
||||||
} |
|
||||||
|
|
||||||
XdsClusterLocalityStats::~XdsClusterLocalityStats() { |
|
||||||
GRPC_TRACE_LOG(xds_client, INFO) |
|
||||||
<< "[xds_client " << xds_client_.get() << "] destroying locality stats " |
|
||||||
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", " |
|
||||||
<< eds_service_name_ << ", " |
|
||||||
<< (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str()) |
|
||||||
<< "}"; |
|
||||||
xds_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_, |
|
||||||
eds_service_name_, name_, this); |
|
||||||
xds_client_.reset(DEBUG_LOCATION, "LocalityStats"); |
|
||||||
} |
|
||||||
|
|
||||||
XdsClusterLocalityStats::Snapshot |
|
||||||
XdsClusterLocalityStats::GetSnapshotAndReset() { |
|
||||||
Snapshot snapshot; |
|
||||||
for (auto& percpu_stats : stats_) { |
|
||||||
Snapshot percpu_snapshot = { |
|
||||||
GetAndResetCounter(&percpu_stats.total_successful_requests), |
|
||||||
// Don't reset total_requests_in_progress because it's
|
|
||||||
// not related to a single reporting interval.
|
|
||||||
percpu_stats.total_requests_in_progress.load(std::memory_order_relaxed), |
|
||||||
GetAndResetCounter(&percpu_stats.total_error_requests), |
|
||||||
GetAndResetCounter(&percpu_stats.total_issued_requests), |
|
||||||
{}}; |
|
||||||
{ |
|
||||||
MutexLock lock(&percpu_stats.backend_metrics_mu); |
|
||||||
percpu_snapshot.backend_metrics = std::move(percpu_stats.backend_metrics); |
|
||||||
} |
|
||||||
snapshot += percpu_snapshot; |
|
||||||
} |
|
||||||
return snapshot; |
|
||||||
} |
|
||||||
|
|
||||||
void XdsClusterLocalityStats::AddCallStarted() { |
|
||||||
Stats& stats = stats_.this_cpu(); |
|
||||||
stats.total_issued_requests.fetch_add(1, std::memory_order_relaxed); |
|
||||||
stats.total_requests_in_progress.fetch_add(1, std::memory_order_relaxed); |
|
||||||
} |
|
||||||
|
|
||||||
void XdsClusterLocalityStats::AddCallFinished( |
|
||||||
const std::map<absl::string_view, double>* named_metrics, bool fail) { |
|
||||||
Stats& stats = stats_.this_cpu(); |
|
||||||
std::atomic<uint64_t>& to_increment = |
|
||||||
fail ? stats.total_error_requests : stats.total_successful_requests; |
|
||||||
to_increment.fetch_add(1, std::memory_order_relaxed); |
|
||||||
stats.total_requests_in_progress.fetch_add(-1, std::memory_order_acq_rel); |
|
||||||
if (named_metrics == nullptr) return; |
|
||||||
MutexLock lock(&stats.backend_metrics_mu); |
|
||||||
for (const auto& m : *named_metrics) { |
|
||||||
stats.backend_metrics[std::string(m.first)] += BackendMetric{1, m.second}; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
@ -1,258 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2018 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H |
|
||||||
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H |
|
||||||
|
|
||||||
#include <atomic> |
|
||||||
#include <cstdint> |
|
||||||
#include <map> |
|
||||||
#include <string> |
|
||||||
#include <utility> |
|
||||||
|
|
||||||
#include "absl/base/thread_annotations.h" |
|
||||||
#include "absl/strings/str_format.h" |
|
||||||
#include "absl/strings/string_view.h" |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/resolver/endpoint_addresses.h" |
|
||||||
#include "src/core/telemetry/call_tracer.h" |
|
||||||
#include "src/core/util/per_cpu.h" |
|
||||||
#include "src/core/util/ref_counted.h" |
|
||||||
#include "src/core/util/ref_counted_ptr.h" |
|
||||||
#include "src/core/util/sync.h" |
|
||||||
#include "src/core/util/useful.h" |
|
||||||
#include "src/core/xds/xds_client/xds_bootstrap.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
// Forward declaration to avoid circular dependency.
|
|
||||||
class XdsClient; |
|
||||||
|
|
||||||
// Locality name.
|
|
||||||
class XdsLocalityName final : public RefCounted<XdsLocalityName> { |
|
||||||
public: |
|
||||||
struct Less { |
|
||||||
bool operator()(const XdsLocalityName* lhs, |
|
||||||
const XdsLocalityName* rhs) const { |
|
||||||
if (lhs == nullptr || rhs == nullptr) return QsortCompare(lhs, rhs); |
|
||||||
return lhs->Compare(*rhs) < 0; |
|
||||||
} |
|
||||||
|
|
||||||
bool operator()(const RefCountedPtr<XdsLocalityName>& lhs, |
|
||||||
const RefCountedPtr<XdsLocalityName>& rhs) const { |
|
||||||
return (*this)(lhs.get(), rhs.get()); |
|
||||||
} |
|
||||||
}; |
|
||||||
|
|
||||||
XdsLocalityName(std::string region, std::string zone, std::string sub_zone) |
|
||||||
: region_(std::move(region)), |
|
||||||
zone_(std::move(zone)), |
|
||||||
sub_zone_(std::move(sub_zone)), |
|
||||||
human_readable_string_( |
|
||||||
absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", |
|
||||||
region_, zone_, sub_zone_)) {} |
|
||||||
|
|
||||||
bool operator==(const XdsLocalityName& other) const { |
|
||||||
return region_ == other.region_ && zone_ == other.zone_ && |
|
||||||
sub_zone_ == other.sub_zone_; |
|
||||||
} |
|
||||||
|
|
||||||
bool operator!=(const XdsLocalityName& other) const { |
|
||||||
return !(*this == other); |
|
||||||
} |
|
||||||
|
|
||||||
int Compare(const XdsLocalityName& other) const { |
|
||||||
int cmp_result = region_.compare(other.region_); |
|
||||||
if (cmp_result != 0) return cmp_result; |
|
||||||
cmp_result = zone_.compare(other.zone_); |
|
||||||
if (cmp_result != 0) return cmp_result; |
|
||||||
return sub_zone_.compare(other.sub_zone_); |
|
||||||
} |
|
||||||
|
|
||||||
const std::string& region() const { return region_; } |
|
||||||
const std::string& zone() const { return zone_; } |
|
||||||
const std::string& sub_zone() const { return sub_zone_; } |
|
||||||
|
|
||||||
const RefCountedStringValue& human_readable_string() const { |
|
||||||
return human_readable_string_; |
|
||||||
} |
|
||||||
|
|
||||||
// Channel args traits.
|
|
||||||
static absl::string_view ChannelArgName() { |
|
||||||
return GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_locality_name"; |
|
||||||
} |
|
||||||
static int ChannelArgsCompare(const XdsLocalityName* a, |
|
||||||
const XdsLocalityName* b) { |
|
||||||
return a->Compare(*b); |
|
||||||
} |
|
||||||
|
|
||||||
private: |
|
||||||
std::string region_; |
|
||||||
std::string zone_; |
|
||||||
std::string sub_zone_; |
|
||||||
RefCountedStringValue human_readable_string_; |
|
||||||
}; |
|
||||||
|
|
||||||
// Drop stats for an xds cluster.
|
|
||||||
class XdsClusterDropStats final : public RefCounted<XdsClusterDropStats> { |
|
||||||
public: |
|
||||||
// The total number of requests dropped for any reason is the sum of
|
|
||||||
// uncategorized_drops, and dropped_requests map.
|
|
||||||
using CategorizedDropsMap = std::map<std::string /* category */, uint64_t>; |
|
||||||
struct Snapshot { |
|
||||||
uint64_t uncategorized_drops = 0; |
|
||||||
// The number of requests dropped for the specific drop categories
|
|
||||||
// outlined in the drop_overloads field in the EDS response.
|
|
||||||
CategorizedDropsMap categorized_drops; |
|
||||||
|
|
||||||
Snapshot& operator+=(const Snapshot& other) { |
|
||||||
uncategorized_drops += other.uncategorized_drops; |
|
||||||
for (const auto& p : other.categorized_drops) { |
|
||||||
categorized_drops[p.first] += p.second; |
|
||||||
} |
|
||||||
return *this; |
|
||||||
} |
|
||||||
|
|
||||||
bool IsZero() const { |
|
||||||
if (uncategorized_drops != 0) return false; |
|
||||||
for (const auto& p : categorized_drops) { |
|
||||||
if (p.second != 0) return false; |
|
||||||
} |
|
||||||
return true; |
|
||||||
} |
|
||||||
}; |
|
||||||
|
|
||||||
XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client, |
|
||||||
absl::string_view lrs_server, |
|
||||||
absl::string_view cluster_name, |
|
||||||
absl::string_view eds_service_name); |
|
||||||
~XdsClusterDropStats() override; |
|
||||||
|
|
||||||
// Returns a snapshot of this instance and resets all the counters.
|
|
||||||
Snapshot GetSnapshotAndReset(); |
|
||||||
|
|
||||||
void AddUncategorizedDrops(); |
|
||||||
void AddCallDropped(const std::string& category); |
|
||||||
|
|
||||||
private: |
|
||||||
RefCountedPtr<XdsClient> xds_client_; |
|
||||||
absl::string_view lrs_server_; |
|
||||||
absl::string_view cluster_name_; |
|
||||||
absl::string_view eds_service_name_; |
|
||||||
std::atomic<uint64_t> uncategorized_drops_{0}; |
|
||||||
// Protects categorized_drops_. A mutex is necessary because the length of
|
|
||||||
// dropped_requests can be accessed by both the picker (from data plane
|
|
||||||
// mutex) and the load reporting thread (from the control plane combiner).
|
|
||||||
Mutex mu_; |
|
||||||
CategorizedDropsMap categorized_drops_ ABSL_GUARDED_BY(mu_); |
|
||||||
}; |
|
||||||
|
|
||||||
// Locality stats for an xds cluster.
|
|
||||||
class XdsClusterLocalityStats final |
|
||||||
: public RefCounted<XdsClusterLocalityStats> { |
|
||||||
public: |
|
||||||
struct BackendMetric { |
|
||||||
uint64_t num_requests_finished_with_metric = 0; |
|
||||||
double total_metric_value = 0; |
|
||||||
|
|
||||||
BackendMetric& operator+=(const BackendMetric& other) { |
|
||||||
num_requests_finished_with_metric += |
|
||||||
other.num_requests_finished_with_metric; |
|
||||||
total_metric_value += other.total_metric_value; |
|
||||||
return *this; |
|
||||||
} |
|
||||||
|
|
||||||
bool IsZero() const { |
|
||||||
return num_requests_finished_with_metric == 0 && total_metric_value == 0; |
|
||||||
} |
|
||||||
}; |
|
||||||
|
|
||||||
struct Snapshot { |
|
||||||
uint64_t total_successful_requests = 0; |
|
||||||
uint64_t total_requests_in_progress = 0; |
|
||||||
uint64_t total_error_requests = 0; |
|
||||||
uint64_t total_issued_requests = 0; |
|
||||||
std::map<std::string, BackendMetric> backend_metrics; |
|
||||||
|
|
||||||
Snapshot& operator+=(const Snapshot& other) { |
|
||||||
total_successful_requests += other.total_successful_requests; |
|
||||||
total_requests_in_progress += other.total_requests_in_progress; |
|
||||||
total_error_requests += other.total_error_requests; |
|
||||||
total_issued_requests += other.total_issued_requests; |
|
||||||
for (const auto& p : other.backend_metrics) { |
|
||||||
backend_metrics[p.first] += p.second; |
|
||||||
} |
|
||||||
return *this; |
|
||||||
} |
|
||||||
|
|
||||||
bool IsZero() const { |
|
||||||
if (total_successful_requests != 0 || total_requests_in_progress != 0 || |
|
||||||
total_error_requests != 0 || total_issued_requests != 0) { |
|
||||||
return false; |
|
||||||
} |
|
||||||
for (const auto& p : backend_metrics) { |
|
||||||
if (!p.second.IsZero()) return false; |
|
||||||
} |
|
||||||
return true; |
|
||||||
} |
|
||||||
}; |
|
||||||
|
|
||||||
XdsClusterLocalityStats(RefCountedPtr<XdsClient> xds_client, |
|
||||||
absl::string_view lrs_server, |
|
||||||
absl::string_view cluster_name, |
|
||||||
absl::string_view eds_service_name, |
|
||||||
RefCountedPtr<XdsLocalityName> name); |
|
||||||
~XdsClusterLocalityStats() override; |
|
||||||
|
|
||||||
// Returns a snapshot of this instance and resets all the counters.
|
|
||||||
Snapshot GetSnapshotAndReset(); |
|
||||||
|
|
||||||
void AddCallStarted(); |
|
||||||
void AddCallFinished(const std::map<absl::string_view, double>* named_metrics, |
|
||||||
bool fail = false); |
|
||||||
|
|
||||||
XdsLocalityName* locality_name() const { return name_.get(); } |
|
||||||
|
|
||||||
private: |
|
||||||
struct Stats { |
|
||||||
std::atomic<uint64_t> total_successful_requests{0}; |
|
||||||
std::atomic<uint64_t> total_requests_in_progress{0}; |
|
||||||
std::atomic<uint64_t> total_error_requests{0}; |
|
||||||
std::atomic<uint64_t> total_issued_requests{0}; |
|
||||||
|
|
||||||
// Protects backend_metrics. A mutex is necessary because the length of
|
|
||||||
// backend_metrics_ can be accessed by both the callback intercepting the
|
|
||||||
// call's recv_trailing_metadata and the load reporting thread.
|
|
||||||
Mutex backend_metrics_mu; |
|
||||||
std::map<std::string, BackendMetric> backend_metrics |
|
||||||
ABSL_GUARDED_BY(backend_metrics_mu); |
|
||||||
}; |
|
||||||
|
|
||||||
RefCountedPtr<XdsClient> xds_client_; |
|
||||||
absl::string_view lrs_server_; |
|
||||||
absl::string_view cluster_name_; |
|
||||||
absl::string_view eds_service_name_; |
|
||||||
RefCountedPtr<XdsLocalityName> name_; |
|
||||||
PerCpu<Stats> stats_{PerCpuOptions().SetMaxShards(32).SetCpusPerShard(4)}; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
||||||
|
|
||||||
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H
|
|
@ -0,0 +1,103 @@ |
|||||||
|
//
|
||||||
|
//
|
||||||
|
// Copyright 2018 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_LOCALITY_H |
||||||
|
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_LOCALITY_H |
||||||
|
|
||||||
|
#include <string> |
||||||
|
#include <utility> |
||||||
|
|
||||||
|
#include "absl/strings/str_format.h" |
||||||
|
#include "absl/strings/string_view.h" |
||||||
|
|
||||||
|
#include "src/core/resolver/endpoint_addresses.h" |
||||||
|
#include "src/core/util/ref_counted.h" |
||||||
|
#include "src/core/util/ref_counted_ptr.h" |
||||||
|
#include "src/core/util/ref_counted_string.h" |
||||||
|
#include "src/core/util/useful.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
// An xDS locality name.
|
||||||
|
class XdsLocalityName final : public RefCounted<XdsLocalityName> { |
||||||
|
public: |
||||||
|
struct Less { |
||||||
|
bool operator()(const XdsLocalityName* lhs, |
||||||
|
const XdsLocalityName* rhs) const { |
||||||
|
if (lhs == nullptr || rhs == nullptr) return QsortCompare(lhs, rhs); |
||||||
|
return lhs->Compare(*rhs) < 0; |
||||||
|
} |
||||||
|
|
||||||
|
bool operator()(const RefCountedPtr<XdsLocalityName>& lhs, |
||||||
|
const RefCountedPtr<XdsLocalityName>& rhs) const { |
||||||
|
return (*this)(lhs.get(), rhs.get()); |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
XdsLocalityName(std::string region, std::string zone, std::string sub_zone) |
||||||
|
: region_(std::move(region)), |
||||||
|
zone_(std::move(zone)), |
||||||
|
sub_zone_(std::move(sub_zone)), |
||||||
|
human_readable_string_( |
||||||
|
absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", |
||||||
|
region_, zone_, sub_zone_)) {} |
||||||
|
|
||||||
|
bool operator==(const XdsLocalityName& other) const { |
||||||
|
return region_ == other.region_ && zone_ == other.zone_ && |
||||||
|
sub_zone_ == other.sub_zone_; |
||||||
|
} |
||||||
|
|
||||||
|
bool operator!=(const XdsLocalityName& other) const { |
||||||
|
return !(*this == other); |
||||||
|
} |
||||||
|
|
||||||
|
int Compare(const XdsLocalityName& other) const { |
||||||
|
int cmp_result = region_.compare(other.region_); |
||||||
|
if (cmp_result != 0) return cmp_result; |
||||||
|
cmp_result = zone_.compare(other.zone_); |
||||||
|
if (cmp_result != 0) return cmp_result; |
||||||
|
return sub_zone_.compare(other.sub_zone_); |
||||||
|
} |
||||||
|
|
||||||
|
const std::string& region() const { return region_; } |
||||||
|
const std::string& zone() const { return zone_; } |
||||||
|
const std::string& sub_zone() const { return sub_zone_; } |
||||||
|
|
||||||
|
const RefCountedStringValue& human_readable_string() const { |
||||||
|
return human_readable_string_; |
||||||
|
} |
||||||
|
|
||||||
|
// Channel args traits.
|
||||||
|
static absl::string_view ChannelArgName() { |
||||||
|
return GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_locality_name"; |
||||||
|
} |
||||||
|
static int ChannelArgsCompare(const XdsLocalityName* a, |
||||||
|
const XdsLocalityName* b) { |
||||||
|
return a->Compare(*b); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
std::string region_; |
||||||
|
std::string zone_; |
||||||
|
std::string sub_zone_; |
||||||
|
RefCountedStringValue human_readable_string_; |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_LOCALITY_H
|
Loading…
Reference in new issue