mirror of https://github.com/grpc/grpc.git
commit
1af67d4da5
42 changed files with 2240 additions and 1642 deletions
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,374 @@ |
||||
//
|
||||
// Copyright 2019 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H |
||||
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H |
||||
|
||||
#include <atomic> |
||||
#include <map> |
||||
#include <memory> |
||||
#include <set> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/string_view.h" |
||||
#include "upb/reflection/def.hpp" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/dual_ref_counted.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/per_cpu.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/gprpp/work_serializer.h" |
||||
#include "src/core/lib/uri/uri_parser.h" |
||||
#include "src/core/xds/xds_client/xds_api.h" |
||||
#include "src/core/xds/xds_client/xds_backend_metric_propagation.h" |
||||
#include "src/core/xds/xds_client/xds_bootstrap.h" |
||||
#include "src/core/xds/xds_client/xds_locality.h" |
||||
#include "src/core/xds/xds_client/xds_metrics.h" |
||||
#include "src/core/xds/xds_client/xds_resource_type.h" |
||||
#include "src/core/xds/xds_client/xds_transport.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
bool XdsOrcaLrsPropagationChangesEnabled(); |
||||
|
||||
class LrsClient : public DualRefCounted<LrsClient> { |
||||
public: |
||||
// Drop stats for an xds cluster.
|
||||
class ClusterDropStats final : public RefCounted<ClusterDropStats> { |
||||
public: |
||||
// The total number of requests dropped for any reason is the sum of
|
||||
// uncategorized_drops, and dropped_requests map.
|
||||
using CategorizedDropsMap = std::map<std::string /* category */, uint64_t>; |
||||
struct Snapshot { |
||||
uint64_t uncategorized_drops = 0; |
||||
// The number of requests dropped for the specific drop categories
|
||||
// outlined in the drop_overloads field in the EDS response.
|
||||
CategorizedDropsMap categorized_drops; |
||||
|
||||
Snapshot& operator+=(const Snapshot& other) { |
||||
uncategorized_drops += other.uncategorized_drops; |
||||
for (const auto& p : other.categorized_drops) { |
||||
categorized_drops[p.first] += p.second; |
||||
} |
||||
return *this; |
||||
} |
||||
|
||||
bool IsZero() const { |
||||
if (uncategorized_drops != 0) return false; |
||||
for (const auto& p : categorized_drops) { |
||||
if (p.second != 0) return false; |
||||
} |
||||
return true; |
||||
} |
||||
}; |
||||
|
||||
ClusterDropStats(RefCountedPtr<LrsClient> lrs_client, |
||||
absl::string_view lrs_server, |
||||
absl::string_view cluster_name, |
||||
absl::string_view eds_service_name); |
||||
~ClusterDropStats() override; |
||||
|
||||
// Returns a snapshot of this instance and resets all the counters.
|
||||
Snapshot GetSnapshotAndReset(); |
||||
|
||||
void AddUncategorizedDrops(); |
||||
void AddCallDropped(const std::string& category); |
||||
|
||||
private: |
||||
RefCountedPtr<LrsClient> lrs_client_; |
||||
absl::string_view lrs_server_; |
||||
absl::string_view cluster_name_; |
||||
absl::string_view eds_service_name_; |
||||
std::atomic<uint64_t> uncategorized_drops_{0}; |
||||
// Protects categorized_drops_. A mutex is necessary because the length of
|
||||
// dropped_requests can be accessed by both the picker (from data plane
|
||||
// mutex) and the load reporting thread (from the control plane combiner).
|
||||
Mutex mu_; |
||||
CategorizedDropsMap categorized_drops_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
// Locality stats for an xds cluster.
|
||||
class ClusterLocalityStats final : public RefCounted<ClusterLocalityStats> { |
||||
public: |
||||
struct BackendMetric { |
||||
uint64_t num_requests_finished_with_metric = 0; |
||||
double total_metric_value = 0; |
||||
|
||||
BackendMetric(BackendMetric&& other) |
||||
: num_requests_finished_with_metric( |
||||
std::exchange(other.num_requests_finished_with_metric, 0)), |
||||
total_metric_value(std::exchange(other.total_metric_value, 0)) {} |
||||
|
||||
BackendMetric& operator+=(const BackendMetric& other) { |
||||
num_requests_finished_with_metric += |
||||
other.num_requests_finished_with_metric; |
||||
total_metric_value += other.total_metric_value; |
||||
return *this; |
||||
} |
||||
|
||||
bool IsZero() const { |
||||
return num_requests_finished_with_metric == 0 && |
||||
total_metric_value == 0; |
||||
} |
||||
}; |
||||
|
||||
struct Snapshot { |
||||
uint64_t total_successful_requests = 0; |
||||
uint64_t total_requests_in_progress = 0; |
||||
uint64_t total_error_requests = 0; |
||||
uint64_t total_issued_requests = 0; |
||||
BackendMetric cpu_utilization; |
||||
BackendMetric mem_utilization; |
||||
BackendMetric application_utilization; |
||||
std::map<std::string, BackendMetric> backend_metrics; |
||||
|
||||
Snapshot& operator+=(const Snapshot& other) { |
||||
total_successful_requests += other.total_successful_requests; |
||||
total_requests_in_progress += other.total_requests_in_progress; |
||||
total_error_requests += other.total_error_requests; |
||||
total_issued_requests += other.total_issued_requests; |
||||
cpu_utilization += other.cpu_utilization; |
||||
mem_utilization += other.mem_utilization; |
||||
application_utilization += other.application_utilization; |
||||
for (const auto& p : other.backend_metrics) { |
||||
backend_metrics[p.first] += p.second; |
||||
} |
||||
return *this; |
||||
} |
||||
|
||||
bool IsZero() const { |
||||
if (total_successful_requests != 0 || total_requests_in_progress != 0 || |
||||
total_error_requests != 0 || total_issued_requests != 0 || |
||||
!cpu_utilization.IsZero() || !mem_utilization.IsZero() || |
||||
!application_utilization.IsZero()) { |
||||
return false; |
||||
} |
||||
for (const auto& p : backend_metrics) { |
||||
if (!p.second.IsZero()) return false; |
||||
} |
||||
return true; |
||||
} |
||||
}; |
||||
|
||||
ClusterLocalityStats(RefCountedPtr<LrsClient> lrs_client, |
||||
absl::string_view lrs_server, |
||||
absl::string_view cluster_name, |
||||
absl::string_view eds_service_name, |
||||
RefCountedPtr<XdsLocalityName> name); |
||||
~ClusterLocalityStats() override; |
||||
|
||||
// Returns a snapshot of this instance and resets all the counters.
|
||||
Snapshot GetSnapshotAndReset(); |
||||
|
||||
void AddCallStarted(); |
||||
void AddCallFinished(const BackendMetricPropagation& propagation, |
||||
const BackendMetricData* backend_metrics, |
||||
bool fail = false); |
||||
|
||||
XdsLocalityName* locality_name() const { return name_.get(); } |
||||
|
||||
private: |
||||
struct Stats { |
||||
std::atomic<uint64_t> total_successful_requests{0}; |
||||
std::atomic<uint64_t> total_requests_in_progress{0}; |
||||
std::atomic<uint64_t> total_error_requests{0}; |
||||
std::atomic<uint64_t> total_issued_requests{0}; |
||||
|
||||
Mutex backend_metrics_mu; |
||||
BackendMetric cpu_utilization ABSL_GUARDED_BY(backend_metrics_mu); |
||||
BackendMetric mem_utilization ABSL_GUARDED_BY(backend_metrics_mu); |
||||
BackendMetric application_utilization ABSL_GUARDED_BY(backend_metrics_mu); |
||||
std::map<std::string, BackendMetric> backend_metrics |
||||
ABSL_GUARDED_BY(backend_metrics_mu); |
||||
}; |
||||
|
||||
RefCountedPtr<LrsClient> lrs_client_; |
||||
absl::string_view lrs_server_; |
||||
absl::string_view cluster_name_; |
||||
absl::string_view eds_service_name_; |
||||
RefCountedPtr<XdsLocalityName> name_; |
||||
PerCpu<Stats> stats_{PerCpuOptions().SetMaxShards(32).SetCpusPerShard(4)}; |
||||
}; |
||||
|
||||
LrsClient( |
||||
std::shared_ptr<XdsBootstrap> bootstrap, std::string user_agent_name, |
||||
std::string user_agent_version, |
||||
RefCountedPtr<XdsTransportFactory> transport_factory, |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine); |
||||
~LrsClient() override; |
||||
|
||||
// Adds drop stats for cluster_name and eds_service_name.
|
||||
RefCountedPtr<ClusterDropStats> AddClusterDropStats( |
||||
std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server, |
||||
absl::string_view cluster_name, absl::string_view eds_service_name); |
||||
|
||||
// Adds locality stats for cluster_name and eds_service_name for the
|
||||
// specified locality.
|
||||
RefCountedPtr<ClusterLocalityStats> AddClusterLocalityStats( |
||||
std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server, |
||||
absl::string_view cluster_name, absl::string_view eds_service_name, |
||||
RefCountedPtr<XdsLocalityName> locality); |
||||
|
||||
// Resets connection backoff state.
|
||||
void ResetBackoff(); |
||||
|
||||
XdsTransportFactory* transport_factory() const { |
||||
return transport_factory_.get(); |
||||
} |
||||
|
||||
grpc_event_engine::experimental::EventEngine* engine() { |
||||
return engine_.get(); |
||||
} |
||||
|
||||
private: |
||||
// Contains a channel to the LRS server and all the data related to the
|
||||
// channel.
|
||||
class LrsChannel final : public DualRefCounted<LrsChannel> { |
||||
public: |
||||
template <typename T> |
||||
class RetryableCall; |
||||
|
||||
class LrsCall; |
||||
|
||||
LrsChannel(WeakRefCountedPtr<LrsClient> lrs_client, |
||||
std::shared_ptr<const XdsBootstrap::XdsServer> server); |
||||
~LrsChannel() override; |
||||
|
||||
LrsClient* lrs_client() const { return lrs_client_.get(); } |
||||
|
||||
void ResetBackoff(); |
||||
|
||||
void MaybeStartLrsCall(); |
||||
|
||||
absl::string_view server_uri() const { return server_->server_uri(); } |
||||
|
||||
private: |
||||
void Orphaned() override; |
||||
|
||||
void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_); |
||||
|
||||
// The owning LrsClient.
|
||||
WeakRefCountedPtr<LrsClient> lrs_client_; |
||||
|
||||
std::shared_ptr<const XdsBootstrap::XdsServer> server_; |
||||
|
||||
RefCountedPtr<XdsTransportFactory::XdsTransport> transport_; |
||||
|
||||
// The retryable LRS call.
|
||||
OrphanablePtr<RetryableCall<LrsCall>> lrs_call_; |
||||
}; |
||||
|
||||
struct LoadReportState { |
||||
struct LocalityState { |
||||
ClusterLocalityStats* locality_stats = nullptr; |
||||
ClusterLocalityStats::Snapshot deleted_locality_stats; |
||||
}; |
||||
|
||||
ClusterDropStats* drop_stats = nullptr; |
||||
ClusterDropStats::Snapshot deleted_drop_stats; |
||||
std::map<RefCountedPtr<XdsLocalityName>, LocalityState, |
||||
XdsLocalityName::Less> |
||||
locality_stats; |
||||
Timestamp last_report_time = Timestamp::Now(); |
||||
}; |
||||
|
||||
// Load report data.
|
||||
using LoadReportMap = std::map< |
||||
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, |
||||
LoadReportState>; |
||||
|
||||
struct LoadReportServer { |
||||
RefCountedPtr<LrsChannel> lrs_channel; |
||||
LoadReportMap load_report_map; |
||||
}; |
||||
|
||||
struct ClusterLoadReport { |
||||
ClusterDropStats::Snapshot dropped_requests; |
||||
std::map<RefCountedPtr<XdsLocalityName>, ClusterLocalityStats::Snapshot, |
||||
XdsLocalityName::Less> |
||||
locality_stats; |
||||
Duration load_report_interval; |
||||
}; |
||||
using ClusterLoadReportMap = std::map< |
||||
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, |
||||
ClusterLoadReport>; |
||||
|
||||
void Orphaned() override; |
||||
|
||||
ClusterLoadReportMap BuildLoadReportSnapshotLocked( |
||||
const XdsBootstrap::XdsServer& lrs_server, bool send_all_clusters, |
||||
const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
||||
|
||||
RefCountedPtr<LrsChannel> GetOrCreateLrsChannelLocked( |
||||
std::shared_ptr<const XdsBootstrap::XdsServer> server, const char* reason) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
||||
|
||||
static bool LoadReportCountersAreZero(const ClusterLoadReportMap& snapshot); |
||||
|
||||
void RemoveClusterDropStats(absl::string_view lrs_server, |
||||
absl::string_view cluster_name, |
||||
absl::string_view eds_service_name, |
||||
ClusterDropStats* cluster_drop_stats); |
||||
|
||||
void RemoveClusterLocalityStats( |
||||
absl::string_view lrs_server, absl::string_view cluster_name, |
||||
absl::string_view eds_service_name, |
||||
const RefCountedPtr<XdsLocalityName>& locality, |
||||
ClusterLocalityStats* cluster_locality_stats); |
||||
|
||||
// Creates an initial LRS request.
|
||||
std::string CreateLrsInitialRequest() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); |
||||
|
||||
// Creates an LRS request sending a client-side load report.
|
||||
std::string CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); |
||||
|
||||
// Parses the LRS response and populates send_all_clusters,
|
||||
// cluster_names, and load_reporting_interval.
|
||||
absl::Status ParseLrsResponse(absl::string_view encoded_response, |
||||
bool* send_all_clusters, |
||||
std::set<std::string>* cluster_names, |
||||
Duration* load_reporting_interval) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); |
||||
|
||||
std::shared_ptr<XdsBootstrap> bootstrap_; |
||||
const std::string user_agent_name_; |
||||
const std::string user_agent_version_; |
||||
RefCountedPtr<XdsTransportFactory> transport_factory_; |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_; |
||||
|
||||
Mutex mu_; |
||||
upb::DefPool def_pool_ ABSL_GUARDED_BY(mu_); |
||||
// Map of existing LRS channels.
|
||||
std::map<std::string /*XdsServer key*/, LrsChannel*> lrs_channel_map_ |
||||
ABSL_GUARDED_BY(mu_); |
||||
std::map<std::string /*XdsServer key*/, LoadReportServer, std::less<>> |
||||
load_report_map_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H
|
@ -0,0 +1,41 @@ |
||||
//
|
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_BACKEND_METRIC_PROPAGATION_H |
||||
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_BACKEND_METRIC_PROPAGATION_H |
||||
|
||||
#include <string> |
||||
|
||||
#include "absl/container/flat_hash_set.h" |
||||
|
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
struct BackendMetricPropagation |
||||
: public RefCountedPtr<BackendMetricPropagation> { |
||||
static constexpr uint8_t kCpuUtilization = 1; |
||||
static constexpr uint8_t kMemUtilization = 2; |
||||
static constexpr uint8_t kApplicationUtilization = 4; |
||||
static constexpr uint8_t kNamedMetricsAll = 8; |
||||
|
||||
uint8_t propagation_bits = 0; |
||||
absl::flat_hash_set<std::string> named_metric_keys; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_BACKEND_METRIC_PROPAGATION_H
|
@ -1,206 +0,0 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#include "src/core/xds/xds_client/xds_client_stats.h" |
||||
|
||||
#include "absl/log/log.h" |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/xds/xds_client/xds_client.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
uint64_t GetAndResetCounter(std::atomic<uint64_t>* from) { |
||||
return from->exchange(0, std::memory_order_relaxed); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
//
|
||||
// XdsClusterDropStats
|
||||
//
|
||||
|
||||
XdsClusterDropStats::XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client, |
||||
absl::string_view lrs_server, |
||||
absl::string_view cluster_name, |
||||
absl::string_view eds_service_name) |
||||
: RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) |
||||
? "XdsClusterDropStats" |
||||
: nullptr), |
||||
xds_client_(std::move(xds_client)), |
||||
lrs_server_(lrs_server), |
||||
cluster_name_(cluster_name), |
||||
eds_service_name_(eds_service_name) { |
||||
GRPC_TRACE_LOG(xds_client, INFO) |
||||
<< "[xds_client " << xds_client_.get() << "] created drop stats " << this |
||||
<< " for {" << lrs_server_ << ", " << cluster_name_ << ", " |
||||
<< eds_service_name_ << "}"; |
||||
} |
||||
|
||||
XdsClusterDropStats::~XdsClusterDropStats() { |
||||
GRPC_TRACE_LOG(xds_client, INFO) |
||||
<< "[xds_client " << xds_client_.get() << "] destroying drop stats " |
||||
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", " |
||||
<< eds_service_name_ << "}"; |
||||
xds_client_->RemoveClusterDropStats(lrs_server_, cluster_name_, |
||||
eds_service_name_, this); |
||||
xds_client_.reset(DEBUG_LOCATION, "DropStats"); |
||||
} |
||||
|
||||
XdsClusterDropStats::Snapshot XdsClusterDropStats::GetSnapshotAndReset() { |
||||
Snapshot snapshot; |
||||
snapshot.uncategorized_drops = GetAndResetCounter(&uncategorized_drops_); |
||||
MutexLock lock(&mu_); |
||||
snapshot.categorized_drops = std::move(categorized_drops_); |
||||
return snapshot; |
||||
} |
||||
|
||||
void XdsClusterDropStats::AddUncategorizedDrops() { |
||||
uncategorized_drops_.fetch_add(1); |
||||
} |
||||
|
||||
void XdsClusterDropStats::AddCallDropped(const std::string& category) { |
||||
MutexLock lock(&mu_); |
||||
++categorized_drops_[category]; |
||||
} |
||||
|
||||
//
|
||||
// XdsClusterLocalityStats
|
||||
//
|
||||
|
||||
// TODO(roth): Remove this once the feature passes interop tests.
|
||||
bool XdsOrcaLrsPropagationChangesEnabled() { |
||||
auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION"); |
||||
if (!value.has_value()) return false; |
||||
bool parsed_value; |
||||
bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value); |
||||
return parse_succeeded && parsed_value; |
||||
} |
||||
|
||||
XdsClusterLocalityStats::XdsClusterLocalityStats( |
||||
RefCountedPtr<XdsClient> xds_client, absl::string_view lrs_server, |
||||
absl::string_view cluster_name, absl::string_view eds_service_name, |
||||
RefCountedPtr<XdsLocalityName> name) |
||||
: RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) |
||||
? "XdsClusterLocalityStats" |
||||
: nullptr), |
||||
xds_client_(std::move(xds_client)), |
||||
lrs_server_(lrs_server), |
||||
cluster_name_(cluster_name), |
||||
eds_service_name_(eds_service_name), |
||||
name_(std::move(name)) { |
||||
GRPC_TRACE_LOG(xds_client, INFO) |
||||
<< "[xds_client " << xds_client_.get() << "] created locality stats " |
||||
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", " |
||||
<< eds_service_name_ << ", " |
||||
<< (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str()) |
||||
<< "}"; |
||||
} |
||||
|
||||
XdsClusterLocalityStats::~XdsClusterLocalityStats() { |
||||
GRPC_TRACE_LOG(xds_client, INFO) |
||||
<< "[xds_client " << xds_client_.get() << "] destroying locality stats " |
||||
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", " |
||||
<< eds_service_name_ << ", " |
||||
<< (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str()) |
||||
<< "}"; |
||||
xds_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_, |
||||
eds_service_name_, name_, this); |
||||
xds_client_.reset(DEBUG_LOCATION, "LocalityStats"); |
||||
} |
||||
|
||||
XdsClusterLocalityStats::Snapshot |
||||
XdsClusterLocalityStats::GetSnapshotAndReset() { |
||||
Snapshot snapshot; |
||||
for (auto& percpu_stats : stats_) { |
||||
Snapshot percpu_snapshot = { |
||||
GetAndResetCounter(&percpu_stats.total_successful_requests), |
||||
// Don't reset total_requests_in_progress because it's
|
||||
// not related to a single reporting interval.
|
||||
percpu_stats.total_requests_in_progress.load(std::memory_order_relaxed), |
||||
GetAndResetCounter(&percpu_stats.total_error_requests), |
||||
GetAndResetCounter(&percpu_stats.total_issued_requests), |
||||
{}, {}, {}, {}}; |
||||
{ |
||||
MutexLock lock(&percpu_stats.backend_metrics_mu); |
||||
percpu_snapshot.cpu_utilization = std::move(percpu_stats.cpu_utilization); |
||||
permem_snapshot.mem_utilization = std::move(permem_stats.mem_utilization); |
||||
perapplication_snapshot.application_utilization = |
||||
std::move(perapplication_stats.application_utilization); |
||||
percpu_snapshot.backend_metrics = std::move(percpu_stats.backend_metrics); |
||||
} |
||||
snapshot += percpu_snapshot; |
||||
} |
||||
return snapshot; |
||||
} |
||||
|
||||
void XdsClusterLocalityStats::AddCallStarted() { |
||||
Stats& stats = stats_.this_cpu(); |
||||
stats.total_issued_requests.fetch_add(1, std::memory_order_relaxed); |
||||
stats.total_requests_in_progress.fetch_add(1, std::memory_order_relaxed); |
||||
} |
||||
|
||||
void XdsClusterLocalityStats::AddCallFinished( |
||||
const BackendMetricPropagation& propagation, |
||||
const BackendMetricData* backend_metrics, bool fail) { |
||||
Stats& stats = stats_.this_cpu(); |
||||
std::atomic<uint64_t>& to_increment = |
||||
fail ? stats.total_error_requests : stats.total_successful_requests; |
||||
to_increment.fetch_add(1, std::memory_order_relaxed); |
||||
stats.total_requests_in_progress.fetch_add(-1, std::memory_order_acq_rel); |
||||
if (backend_metrics == nullptr) return; |
||||
MutexLock lock(&stats.backend_metrics_mu); |
||||
if (!XdsOrcaLrsPropagationChangesEnabled()) { |
||||
for (const auto& m : backend_metrics->named_metrics) { |
||||
stats.backend_metrics[std::string(m.first)] += BackendMetric{1, m.second}; |
||||
} |
||||
return; |
||||
} |
||||
if (propagation.propagation_bits & |
||||
BackendMetricPropagation::kCpuUtilization) { |
||||
stats.cpu_utilization += BackendMetric{1, backend_metrics->cpu_utilization}; |
||||
} |
||||
if (propagation.propagation_bits & |
||||
BackendMetricPropagation::kMemUtilization) { |
||||
stats.mem_utilization += BackendMetric{1, backend_metrics->mem_utilization}; |
||||
} |
||||
if (propagation.propagation_bits & |
||||
BackendMetricPropagation::kApplicationUtilization) { |
||||
stats.application_utilization += |
||||
BackendMetric{1, backend_metrics->application_utilization}; |
||||
} |
||||
if (propagation.propagation_bits & |
||||
BackendMetricPropagation::kNamedMetricsAll || |
||||
!propagation.named_metric_keys.empty()) { |
||||
for (const auto& m : backend_metrics->named_metrics) { |
||||
if (propagation.propagation_bits &
|
||||
BackendMetricPropagation::kNamedMetricsAll || |
||||
propagation.named_metric_keys.contains(m.first)) { |
||||
stats.backend_metrics[absl::StrCat("named_metrics.", m.first)] += |
||||
BackendMetric{1, m.second}; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -1,285 +0,0 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H |
||||
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H |
||||
|
||||
#include <atomic> |
||||
#include <cstdint> |
||||
#include <map> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/gprpp/per_cpu.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/resolver/endpoint_addresses.h" |
||||
#include "src/core/telemetry/call_tracer.h" |
||||
#include "src/core/util/useful.h" |
||||
#include "src/core/xds/xds_client/xds_bootstrap.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Forward declaration to avoid circular dependency.
|
||||
class XdsClient; |
||||
|
||||
// Locality name.
|
||||
class XdsLocalityName final : public RefCounted<XdsLocalityName> { |
||||
public: |
||||
struct Less { |
||||
bool operator()(const XdsLocalityName* lhs, |
||||
const XdsLocalityName* rhs) const { |
||||
if (lhs == nullptr || rhs == nullptr) return QsortCompare(lhs, rhs); |
||||
return lhs->Compare(*rhs) < 0; |
||||
} |
||||
|
||||
bool operator()(const RefCountedPtr<XdsLocalityName>& lhs, |
||||
const RefCountedPtr<XdsLocalityName>& rhs) const { |
||||
return (*this)(lhs.get(), rhs.get()); |
||||
} |
||||
}; |
||||
|
||||
XdsLocalityName(std::string region, std::string zone, std::string sub_zone) |
||||
: region_(std::move(region)), |
||||
zone_(std::move(zone)), |
||||
sub_zone_(std::move(sub_zone)), |
||||
human_readable_string_( |
||||
absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", |
||||
region_, zone_, sub_zone_)) {} |
||||
|
||||
bool operator==(const XdsLocalityName& other) const { |
||||
return region_ == other.region_ && zone_ == other.zone_ && |
||||
sub_zone_ == other.sub_zone_; |
||||
} |
||||
|
||||
bool operator!=(const XdsLocalityName& other) const { |
||||
return !(*this == other); |
||||
} |
||||
|
||||
int Compare(const XdsLocalityName& other) const { |
||||
int cmp_result = region_.compare(other.region_); |
||||
if (cmp_result != 0) return cmp_result; |
||||
cmp_result = zone_.compare(other.zone_); |
||||
if (cmp_result != 0) return cmp_result; |
||||
return sub_zone_.compare(other.sub_zone_); |
||||
} |
||||
|
||||
const std::string& region() const { return region_; } |
||||
const std::string& zone() const { return zone_; } |
||||
const std::string& sub_zone() const { return sub_zone_; } |
||||
|
||||
const RefCountedStringValue& human_readable_string() const { |
||||
return human_readable_string_; |
||||
} |
||||
|
||||
// Channel args traits.
|
||||
static absl::string_view ChannelArgName() { |
||||
return GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_locality_name"; |
||||
} |
||||
static int ChannelArgsCompare(const XdsLocalityName* a, |
||||
const XdsLocalityName* b) { |
||||
return a->Compare(*b); |
||||
} |
||||
|
||||
private: |
||||
std::string region_; |
||||
std::string zone_; |
||||
std::string sub_zone_; |
||||
RefCountedStringValue human_readable_string_; |
||||
}; |
||||
|
||||
// Drop stats for an xds cluster.
|
||||
class XdsClusterDropStats final : public RefCounted<XdsClusterDropStats> { |
||||
public: |
||||
// The total number of requests dropped for any reason is the sum of
|
||||
// uncategorized_drops, and dropped_requests map.
|
||||
using CategorizedDropsMap = std::map<std::string /* category */, uint64_t>; |
||||
struct Snapshot { |
||||
uint64_t uncategorized_drops = 0; |
||||
// The number of requests dropped for the specific drop categories
|
||||
// outlined in the drop_overloads field in the EDS response.
|
||||
CategorizedDropsMap categorized_drops; |
||||
|
||||
Snapshot& operator+=(const Snapshot& other) { |
||||
uncategorized_drops += other.uncategorized_drops; |
||||
for (const auto& p : other.categorized_drops) { |
||||
categorized_drops[p.first] += p.second; |
||||
} |
||||
return *this; |
||||
} |
||||
|
||||
bool IsZero() const { |
||||
if (uncategorized_drops != 0) return false; |
||||
for (const auto& p : categorized_drops) { |
||||
if (p.second != 0) return false; |
||||
} |
||||
return true; |
||||
} |
||||
}; |
||||
|
||||
XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client, |
||||
absl::string_view lrs_server, |
||||
absl::string_view cluster_name, |
||||
absl::string_view eds_service_name); |
||||
~XdsClusterDropStats() override; |
||||
|
||||
// Returns a snapshot of this instance and resets all the counters.
|
||||
Snapshot GetSnapshotAndReset(); |
||||
|
||||
void AddUncategorizedDrops(); |
||||
void AddCallDropped(const std::string& category); |
||||
|
||||
private: |
||||
RefCountedPtr<XdsClient> xds_client_; |
||||
absl::string_view lrs_server_; |
||||
absl::string_view cluster_name_; |
||||
absl::string_view eds_service_name_; |
||||
std::atomic<uint64_t> uncategorized_drops_{0}; |
||||
// Protects categorized_drops_. A mutex is necessary because the length of
|
||||
// dropped_requests can be accessed by both the picker (from data plane
|
||||
// mutex) and the load reporting thread (from the control plane combiner).
|
||||
Mutex mu_; |
||||
CategorizedDropsMap categorized_drops_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
bool XdsOrcaLrsPropagationChangesEnabled(); |
||||
|
||||
// Locality stats for an xds cluster.
|
||||
class XdsClusterLocalityStats final |
||||
: public RefCounted<XdsClusterLocalityStats> { |
||||
public: |
||||
struct BackendMetric { |
||||
uint64_t num_requests_finished_with_metric = 0; |
||||
double total_metric_value = 0; |
||||
|
||||
BackendMetric(BackendMetric&& other) |
||||
: num_requests_finished_with_metric( |
||||
std::exchange(other.num_requests_finished_with_metric, 0)), |
||||
total_metric_value(std::exchange(other.total_metric_value, 0)) {} |
||||
|
||||
BackendMetric& operator+=(const BackendMetric& other) { |
||||
num_requests_finished_with_metric += |
||||
other.num_requests_finished_with_metric; |
||||
total_metric_value += other.total_metric_value; |
||||
return *this; |
||||
} |
||||
|
||||
bool IsZero() const { |
||||
return num_requests_finished_with_metric == 0 && total_metric_value == 0; |
||||
} |
||||
}; |
||||
|
||||
struct Snapshot { |
||||
uint64_t total_successful_requests = 0; |
||||
uint64_t total_requests_in_progress = 0; |
||||
uint64_t total_error_requests = 0; |
||||
uint64_t total_issued_requests = 0; |
||||
BackendMetric cpu_utilization; |
||||
BackendMetric mem_utilization; |
||||
BackendMetric application_utilization; |
||||
std::map<std::string, BackendMetric> backend_metrics; |
||||
|
||||
Snapshot& operator+=(const Snapshot& other) { |
||||
total_successful_requests += other.total_successful_requests; |
||||
total_requests_in_progress += other.total_requests_in_progress; |
||||
total_error_requests += other.total_error_requests; |
||||
total_issued_requests += other.total_issued_requests; |
||||
cpu_utilization += other.cpu_utilization; |
||||
mem_utilization += other.mem_utilization; |
||||
application_utilization += other.application_utilization; |
||||
for (const auto& p : other.backend_metrics) { |
||||
backend_metrics[p.first] += p.second; |
||||
} |
||||
return *this; |
||||
} |
||||
|
||||
bool IsZero() const { |
||||
if (total_successful_requests != 0 || total_requests_in_progress != 0 || |
||||
total_error_requests != 0 || total_issued_requests != 0 || |
||||
!cpu_utilization.IsZero() || !mem_utilization.IsZero() || |
||||
!application_utilization.IsZero()) { |
||||
return false; |
||||
} |
||||
for (const auto& p : backend_metrics) { |
||||
if (!p.second.IsZero()) return false; |
||||
} |
||||
return true; |
||||
} |
||||
}; |
||||
|
||||
struct BackendMetricPropagation |
||||
: public RefCountedPtr<BackendMetricPropagation> { |
||||
static constexpr uint8_t kCpuUtilization = 1; |
||||
static constexpr uint8_t kMemUtilization = 2; |
||||
static constexpr uint8_t kApplicationUtilization = 4; |
||||
static constexpr uint8_t kNamedMetricsAll = 8; |
||||
|
||||
uint8_t propagation_bits = 0; |
||||
absl::flat_hash_set<std::string> named_metric_keys; |
||||
}; |
||||
|
||||
XdsClusterLocalityStats(RefCountedPtr<XdsClient> xds_client, |
||||
absl::string_view lrs_server, |
||||
absl::string_view cluster_name, |
||||
absl::string_view eds_service_name, |
||||
RefCountedPtr<XdsLocalityName> name); |
||||
~XdsClusterLocalityStats() override; |
||||
|
||||
// Returns a snapshot of this instance and resets all the counters.
|
||||
Snapshot GetSnapshotAndReset(); |
||||
|
||||
void AddCallStarted(); |
||||
void AddCallFinished(const BackendMetricPropagation& propagation, |
||||
const BackendMetricData* backend_metrics, |
||||
bool fail = false); |
||||
|
||||
XdsLocalityName* locality_name() const { return name_.get(); } |
||||
|
||||
private: |
||||
struct Stats { |
||||
std::atomic<uint64_t> total_successful_requests{0}; |
||||
std::atomic<uint64_t> total_requests_in_progress{0}; |
||||
std::atomic<uint64_t> total_error_requests{0}; |
||||
std::atomic<uint64_t> total_issued_requests{0}; |
||||
|
||||
Mutex backend_metrics_mu; |
||||
BackendMetric cpu_utilization ABSL_GUARDED_BY(backend_metrics_mu); |
||||
BackendMetric mem_utilization ABSL_GUARDED_BY(backend_metrics_mu); |
||||
BackendMetric application_utilization ABSL_GUARDED_BY(backend_metrics_mu); |
||||
std::map<std::string, BackendMetric> backend_metrics |
||||
ABSL_GUARDED_BY(backend_metrics_mu); |
||||
}; |
||||
|
||||
RefCountedPtr<XdsClient> xds_client_; |
||||
absl::string_view lrs_server_; |
||||
absl::string_view cluster_name_; |
||||
absl::string_view eds_service_name_; |
||||
RefCountedPtr<XdsLocalityName> name_; |
||||
PerCpu<Stats> stats_{PerCpuOptions().SetMaxShards(32).SetCpusPerShard(4)}; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_STATS_H
|
@ -0,0 +1,103 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_LOCALITY_H |
||||
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_LOCALITY_H |
||||
|
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/ref_counted_string.h" |
||||
#include "src/core/resolver/endpoint_addresses.h" |
||||
#include "src/core/util/useful.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// An xDS locality name.
|
||||
class XdsLocalityName final : public RefCounted<XdsLocalityName> { |
||||
public: |
||||
struct Less { |
||||
bool operator()(const XdsLocalityName* lhs, |
||||
const XdsLocalityName* rhs) const { |
||||
if (lhs == nullptr || rhs == nullptr) return QsortCompare(lhs, rhs); |
||||
return lhs->Compare(*rhs) < 0; |
||||
} |
||||
|
||||
bool operator()(const RefCountedPtr<XdsLocalityName>& lhs, |
||||
const RefCountedPtr<XdsLocalityName>& rhs) const { |
||||
return (*this)(lhs.get(), rhs.get()); |
||||
} |
||||
}; |
||||
|
||||
XdsLocalityName(std::string region, std::string zone, std::string sub_zone) |
||||
: region_(std::move(region)), |
||||
zone_(std::move(zone)), |
||||
sub_zone_(std::move(sub_zone)), |
||||
human_readable_string_( |
||||
absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", |
||||
region_, zone_, sub_zone_)) {} |
||||
|
||||
bool operator==(const XdsLocalityName& other) const { |
||||
return region_ == other.region_ && zone_ == other.zone_ && |
||||
sub_zone_ == other.sub_zone_; |
||||
} |
||||
|
||||
bool operator!=(const XdsLocalityName& other) const { |
||||
return !(*this == other); |
||||
} |
||||
|
||||
int Compare(const XdsLocalityName& other) const { |
||||
int cmp_result = region_.compare(other.region_); |
||||
if (cmp_result != 0) return cmp_result; |
||||
cmp_result = zone_.compare(other.zone_); |
||||
if (cmp_result != 0) return cmp_result; |
||||
return sub_zone_.compare(other.sub_zone_); |
||||
} |
||||
|
||||
const std::string& region() const { return region_; } |
||||
const std::string& zone() const { return zone_; } |
||||
const std::string& sub_zone() const { return sub_zone_; } |
||||
|
||||
const RefCountedStringValue& human_readable_string() const { |
||||
return human_readable_string_; |
||||
} |
||||
|
||||
// Channel args traits.
|
||||
static absl::string_view ChannelArgName() { |
||||
return GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_locality_name"; |
||||
} |
||||
static int ChannelArgsCompare(const XdsLocalityName* a, |
||||
const XdsLocalityName* b) { |
||||
return a->Compare(*b); |
||||
} |
||||
|
||||
private: |
||||
std::string region_; |
||||
std::string zone_; |
||||
std::string sub_zone_; |
||||
RefCountedStringValue human_readable_string_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_LOCALITY_H
|
Loading…
Reference in new issue