Restructure XdsClient load reporting APIs

pull/22011/head
Mark D. Roth 5 years ago
parent 17036c1452
commit 78c648c98a
  1. src/core/ext/filters/client_channel/lb_policy/xds/xds.cc (232 lines changed)
  2. src/core/ext/filters/client_channel/xds/xds_api.cc (93 lines changed)
  3. src/core/ext/filters/client_channel/xds/xds_api.h (18 lines changed)
  4. src/core/ext/filters/client_channel/xds/xds_client.cc (169 lines changed)
  5. src/core/ext/filters/client_channel/xds/xds_client.h (40 lines changed)
  6. src/core/ext/filters/client_channel/xds/xds_client_stats.cc (181 lines changed)
  7. src/core/ext/filters/client_channel/xds/xds_client_stats.h (209 lines changed)

src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -123,31 +123,42 @@ class XdsLb : public LoadBalancingPolicy {
private:
class EndpointWatcher;
// We need this wrapper for the following reasons:
// 1. To process per-locality load reporting.
// 2. Since pickers are std::unique_ptrs we use this RefCounted wrapper
//    to control references to it by the xds picker and the locality.
class EndpointPickerWrapper : public RefCounted<EndpointPickerWrapper> {
// A simple wrapper to convert the picker returned from a locality's child
// policy as a unique_ptr<> to a RefCountedPtr<>. This allows it to be
// referenced by both the picker and the locality.
class RefCountedEndpointPicker : public RefCounted<RefCountedEndpointPicker> {
public:
EndpointPickerWrapper(
std::unique_ptr<SubchannelPicker> picker,
RefCountedPtr<XdsClientStats::LocalityStats> locality_stats)
explicit RefCountedEndpointPicker(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(std::move(args)); }
private:
std::unique_ptr<SubchannelPicker> picker_;
};
// A picker that wraps the RefCountedEndpointPicker and performs load
// reporting for the locality.
class LoadReportingPicker : public RefCounted<LoadReportingPicker> {
public:
LoadReportingPicker(RefCountedPtr<RefCountedEndpointPicker> picker,
RefCountedPtr<XdsClusterLocalityStats> locality_stats)
: picker_(std::move(picker)),
locality_stats_(std::move(locality_stats)) {
locality_stats_->RefByPicker();
}
~EndpointPickerWrapper() { locality_stats_->UnrefByPicker(); }
locality_stats_(std::move(locality_stats)) {}
PickResult Pick(PickArgs args);
RefCountedEndpointPicker* picker() const { return picker_.get(); }
XdsClusterLocalityStats* locality_stats() const {
return locality_stats_.get();
}
private:
std::unique_ptr<SubchannelPicker> picker_;
RefCountedPtr<XdsClientStats::LocalityStats> locality_stats_;
RefCountedPtr<RefCountedEndpointPicker> picker_;
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
};
// The picker will use a stateless weighting algorithm to pick the locality to
// use for each request.
// A picker that uses a stateless weighting algorithm to pick the locality
// to use for each request.
class LocalityPicker : public SubchannelPicker {
public:
// Maintains a weighted list of pickers from each locality that is in ready
@@ -155,14 +166,12 @@ class XdsLb : public LoadBalancingPolicy {
// proportional to the locality's weight. The start of the range is the
// previous value in the vector and is 0 for the first element.
using PickerList =
InlinedVector<std::pair<uint32_t, RefCountedPtr<EndpointPickerWrapper>>,
InlinedVector<std::pair<uint32_t, RefCountedPtr<LoadReportingPicker>>,
1>;
LocalityPicker(RefCountedPtr<XdsLb> xds_policy, PickerList pickers)
: xds_policy_(std::move(xds_policy)),
pickers_(std::move(pickers)),
drop_config_(xds_policy_->drop_config_) {}
~LocalityPicker() { xds_policy_.reset(DEBUG_LOCATION, "LocalityPicker"); }
LocalityPicker(XdsLb* xds_policy, PickerList pickers)
: drop_stats_(xds_policy->drop_stats_),
drop_config_(xds_policy->drop_config_),
pickers_(std::move(pickers)) {}
PickResult Pick(PickArgs args) override;
@@ -170,9 +179,9 @@ class XdsLb : public LoadBalancingPolicy {
// Calls the picker of the locality that the key falls within.
PickResult PickFromLocality(const uint32_t key, PickArgs args);
RefCountedPtr<XdsLb> xds_policy_;
PickerList pickers_;
RefCountedPtr<XdsClusterDropStats> drop_stats_;
RefCountedPtr<XdsApi::DropConfig> drop_config_;
PickerList pickers_;
};
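To make the range arithmetic concrete, here is a minimal sketch of the weighted-range lookup the PickerList comment above describes, in stand-alone form with illustrative names (PickByKey and LocalityPickerRef are not from this commit):

#include <cstdint>
#include <cstdlib>
#include <utility>
#include <vector>

struct LocalityPickerRef {};  // stand-in for RefCountedPtr<LoadReportingPicker>
using WeightedPickers = std::vector<std::pair<uint32_t, LocalityPickerRef>>;

// Each entry stores the *end* of its locality's range, so the entry's range
// is [previous end, end). A key drawn uniformly from [0, total weight)
// therefore selects each locality with probability proportional to its weight.
const LocalityPickerRef& PickByKey(const WeightedPickers& pickers,
                                   uint32_t key) {
  for (const auto& p : pickers) {
    if (key < p.first) return p.second;
  }
  return pickers.back().second;  // unreachable if key < total weight
}

// Typical usage: const uint32_t key = rand() % pickers.back().first;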
class FallbackHelper : public ChannelControlHelper {
@@ -209,18 +218,28 @@ class XdsLb : public LoadBalancingPolicy {
RefCountedPtr<XdsLocalityName> name);
~Locality();
void UpdateLocked(uint32_t locality_weight, ServerAddressList serverlist);
void UpdateLocked(uint32_t locality_weight, ServerAddressList serverlist,
bool update_locality_stats);
void ShutdownLocked();
void ResetBackoffLocked();
void DeactivateLocked();
void Orphan() override;
uint32_t weight() const { return weight_; }
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
uint32_t weight() const { return weight_; }
RefCountedPtr<EndpointPickerWrapper> picker_wrapper() const {
return picker_wrapper_;
RefCountedPtr<LoadReportingPicker> GetLoadReportingPicker() {
// Recreate load reporting picker if stats object has changed.
if (load_reporting_picker_ == nullptr ||
load_reporting_picker_->picker() != picker_wrapper_.get() ||
load_reporting_picker_->locality_stats() != stats_.get()) {
load_reporting_picker_ =
MakeRefCounted<LoadReportingPicker>(picker_wrapper_, stats_);
}
return load_reporting_picker_;
}
void set_locality_map(RefCountedPtr<LocalityMap> locality_map) {
@@ -259,6 +278,8 @@ class XdsLb : public LoadBalancingPolicy {
grpc_channel_args* CreateChildPolicyArgsLocked(
const grpc_channel_args* args);
void UpdateLocalityStats();
static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
@@ -268,9 +289,11 @@ class XdsLb : public LoadBalancingPolicy {
RefCountedPtr<LocalityMap> locality_map_;
RefCountedPtr<XdsLocalityName> name_;
RefCountedPtr<XdsClusterLocalityStats> stats_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
RefCountedPtr<EndpointPickerWrapper> picker_wrapper_;
RefCountedPtr<RefCountedEndpointPicker> picker_wrapper_;
RefCountedPtr<LoadReportingPicker> load_reporting_picker_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
uint32_t weight_;
@@ -286,7 +309,8 @@ class XdsLb : public LoadBalancingPolicy {
~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); }
void UpdateLocked(
const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update);
const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update,
bool update_locality_stats);
void ResetBackoffLocked();
void UpdateXdsPickerLocked();
OrphanablePtr<Locality> ExtractLocalityLocked(
@@ -358,7 +382,7 @@ class XdsLb : public LoadBalancingPolicy {
: xds_client_.get();
}
void UpdatePrioritiesLocked();
void UpdatePrioritiesLocked(bool update_locality_stats);
void UpdateXdsPickerLocked();
void MaybeCreateLocalityMapLocked(uint32_t priority);
void FailoverOnConnectionFailureLocked();
@@ -436,15 +460,15 @@ class XdsLb : public LoadBalancingPolicy {
// The config for dropping calls.
RefCountedPtr<XdsApi::DropConfig> drop_config_;
// The stats for client-side load reporting.
XdsClientStats client_stats_;
// Drop stats for client-side load reporting.
RefCountedPtr<XdsClusterDropStats> drop_stats_;
};
//
// XdsLb::EndpointPickerWrapper
// XdsLb::LoadReportingPicker
//
LoadBalancingPolicy::PickResult XdsLb::EndpointPickerWrapper::Pick(
LoadBalancingPolicy::PickResult XdsLb::LoadReportingPicker::Pick(
LoadBalancingPolicy::PickArgs args) {
// Forward the pick to the picker returned from the child policy.
PickResult result = picker_->Pick(args);
@@ -455,7 +479,7 @@ LoadBalancingPolicy::PickResult XdsLb::EndpointPickerWrapper::Pick(
// Record a call started.
locality_stats_->AddCallStarted();
// Intercept the recv_trailing_metadata op to record call completion.
XdsClientStats::LocalityStats* locality_stats =
XdsClusterLocalityStats* locality_stats =
locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release();
result.recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
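The hunk above ends mid-statement. Given the Ref released into the callback a few lines earlier, the interception presumably completes along the lines of the following sketch; the lambda signature is an assumption based on the LoadBalancingPolicy API of this period, not verbatim code from the commit:

result.recv_trailing_metadata_ready =
    [locality_stats](grpc_error* error, MetadataInterface* /*metadata*/,
                     CallState* /*call_state*/) {
      // Record success or failure and drop the ref taken at pick time.
      locality_stats->AddCallFinished(error != GRPC_ERROR_NONE);
      locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
    };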
@@ -477,7 +501,7 @@ XdsLb::PickResult XdsLb::LocalityPicker::Pick(PickArgs args) {
// Handle drop.
const std::string* drop_category;
if (drop_config_->ShouldDrop(&drop_category)) {
xds_policy_->client_stats_.AddCallDropped(*drop_category);
if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
@@ -622,7 +646,7 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface {
}
// Update the priority list.
xds_policy_->priority_list_update_ = std::move(update.priority_list_update);
xds_policy_->UpdatePrioritiesLocked();
xds_policy_->UpdatePrioritiesLocked(false /*update_locality_stats*/);
}
void OnError(grpc_error* error) override {
@@ -709,6 +733,7 @@ void XdsLb::ShutdownLocked() {
shutting_down_ = true;
MaybeCancelFallbackAtStartupChecks();
priorities_.clear();
drop_stats_.reset();
if (fallback_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(),
interested_parties());
@@ -726,14 +751,6 @@ void XdsLb::ShutdownLocked() {
if (xds_client_from_channel_ != nullptr) {
xds_client()->CancelEndpointDataWatch(StringView(eds_service_name()),
endpoint_watcher_);
if (config_->lrs_load_reporting_server_name().has_value()) {
// TODO(roth): We should pass the cluster name (in addition to the
// eds_service_name) when adding the client stats. To do so, we need to
// first find a way to plumb the cluster name down into this LB policy.
xds_client()->RemoveClientStats(
StringView(config_->lrs_load_reporting_server_name().value().c_str()),
StringView(eds_service_name()), &client_stats_);
}
xds_client_from_channel_.reset();
}
xds_client_.reset();
@@ -774,8 +791,6 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
grpc_channel_args_destroy(args_);
args_ = args.args;
args.args = nullptr;
// Update priority list.
UpdatePrioritiesLocked();
// Update the existing fallback policy. The fallback policy config and/or the
// fallback addresses may be new.
if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked();
@@ -802,6 +817,31 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
fallback_at_startup_checks_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
}
// Update drop stats for load reporting if needed.
if (is_initial_update || config_->lrs_load_reporting_server_name() !=
old_config->lrs_load_reporting_server_name()) {
drop_stats_.reset();
if (config_->lrs_load_reporting_server_name().has_value()) {
drop_stats_ = xds_client()->AddClusterDropStats(
config_->lrs_load_reporting_server_name().value(),
// TODO(roth): We currently hard-code the assumption that
// cluster name and EDS service name are the same. Fix this
// as part of refactoring this LB policy.
eds_service_name(), eds_service_name());
}
}
// Update priority list.
// Note that this comes after updating drop_stats_, since we want that
// to be used by any new picker we create here.
// No need to do this on the initial update, since there won't be any
// priorities to update yet.
if (!is_initial_update) {
const bool update_locality_stats =
config_->lrs_load_reporting_server_name() !=
old_config->lrs_load_reporting_server_name() ||
strcmp(old_eds_service_name, eds_service_name()) != 0;
UpdatePrioritiesLocked(update_locality_stats);
}
// Update endpoint watcher if needed.
if (is_initial_update ||
strcmp(old_eds_service_name, eds_service_name()) != 0) {
@@ -815,34 +855,6 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
xds_client()->WatchEndpointData(StringView(eds_service_name()),
std::move(watcher));
}
// Update load reporting if needed.
// TODO(roth): Ideally, we should not collect any stats if load reporting
// is disabled, which would require changing this code to recreate
// all of the pickers whenever load reporting is enabled or disabled
// here.
if (is_initial_update ||
(config_->lrs_load_reporting_server_name().has_value()) !=
(old_config->lrs_load_reporting_server_name().has_value()) ||
(config_->lrs_load_reporting_server_name().has_value() &&
old_config->lrs_load_reporting_server_name().has_value() &&
config_->lrs_load_reporting_server_name().value() !=
old_config->lrs_load_reporting_server_name().value())) {
if (old_config != nullptr &&
old_config->lrs_load_reporting_server_name().has_value()) {
xds_client()->RemoveClientStats(
StringView(
old_config->lrs_load_reporting_server_name().value().c_str()),
StringView(old_eds_service_name), &client_stats_);
}
if (config_->lrs_load_reporting_server_name().has_value()) {
// TODO(roth): We should pass the cluster name (in addition to the
// eds_service_name) when adding the client stats. To do so, we need to
// first find a way to plumb the cluster name down into this LB policy.
xds_client()->AddClientStats(
StringView(config_->lrs_load_reporting_server_name().value().c_str()),
StringView(eds_service_name()), &client_stats_);
}
}
}
//
@@ -1025,7 +1037,7 @@ void XdsLb::MaybeExitFallbackMode() {
// priority list-related methods
//
void XdsLb::UpdatePrioritiesLocked() {
void XdsLb::UpdatePrioritiesLocked(bool update_locality_stats) {
// 1. Remove from the priority list the priorities that are not in the update.
DeactivatePrioritiesLowerThan(priority_list_update_.LowestPriority());
// 2. Update all the existing priorities.
@@ -1037,7 +1049,7 @@ void XdsLb::UpdatePrioritiesLocked() {
// Propagate locality_map_update.
// TODO(juanlishen): Find a clean way to skip duplicate update for a
// priority.
locality_map->UpdateLocked(*locality_map_update);
locality_map->UpdateLocked(*locality_map_update, update_locality_stats);
}
// 3. Only create a new locality map if all the existing ones have failed.
if (priorities_.empty() ||
@@ -1049,6 +1061,11 @@
// to be created.
MaybeCreateLocalityMapLocked(new_priority);
}
// 4. If we updated locality stats and we already have at least one
// priority, update the picker to start using the new stats object(s).
if (update_locality_stats && !priorities_.empty()) {
UpdateXdsPickerLocked();
}
}
void XdsLb::UpdateXdsPickerLocked() {
@@ -1072,7 +1089,8 @@ void XdsLb::MaybeCreateLocalityMapLocked(uint32_t priority) {
auto new_locality_map =
new LocalityMap(Ref(DEBUG_LOCATION, "LocalityMap"), priority);
priorities_.emplace_back(OrphanablePtr<LocalityMap>(new_locality_map));
new_locality_map->UpdateLocked(*priority_list_update_.Find(priority));
new_locality_map->UpdateLocked(*priority_list_update_.Find(priority),
false /*update_locality_stats*/);
}
void XdsLb::FailoverOnConnectionFailureLocked() {
@@ -1156,7 +1174,8 @@ XdsLb::LocalityMap::LocalityMap(RefCountedPtr<XdsLb> xds_policy,
}
void XdsLb::LocalityMap::UpdateLocked(
const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update) {
const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update,
bool update_locality_stats) {
if (xds_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Start Updating priority %" PRIu32,
@@ -1199,7 +1218,7 @@ void XdsLb::LocalityMap::UpdateLocked(
// Keep a copy of serverlist in the update so that we can compare it
// with the future ones.
locality->UpdateLocked(locality_update.lb_weight,
locality_update.serverlist);
locality_update.serverlist, update_locality_stats);
}
}
@@ -1214,20 +1233,19 @@ void XdsLb::LocalityMap::UpdateXdsPickerLocked() {
// weights of all localities.
LocalityPicker::PickerList picker_list;
uint32_t end = 0;
for (const auto& p : localities_) {
for (auto& p : localities_) {
const auto& locality_name = p.first;
const Locality* locality = p.second.get();
Locality* locality = p.second.get();
// Skip the localities that are not in the latest locality map update.
if (!locality_map_update()->Contains(locality_name)) continue;
if (locality->connectivity_state() != GRPC_CHANNEL_READY) continue;
end += locality->weight();
picker_list.push_back(std::make_pair(end, locality->picker_wrapper()));
picker_list.push_back(
std::make_pair(end, locality->GetLoadReportingPicker()));
}
xds_policy()->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY,
absl::make_unique<LocalityPicker>(
xds_policy_->Ref(DEBUG_LOCATION, "LocalityPicker"),
std::move(picker_list)));
absl::make_unique<LocalityPicker>(xds_policy(), std::move(picker_list)));
}
OrphanablePtr<XdsLb::LocalityMap::Locality>
@@ -1451,6 +1469,8 @@ XdsLb::LocalityMap::Locality::Locality(RefCountedPtr<LocalityMap> locality_map,
gpr_log(GPR_INFO, "[xdslb %p] created Locality %p for %s", xds_policy(),
this, name_->AsHumanReadableString());
}
// Initialize locality stats if load reporting is enabled.
UpdateLocalityStats();
}
XdsLb::LocalityMap::Locality::~Locality() {
@@ -1461,6 +1481,19 @@ XdsLb::LocalityMap::Locality::~Locality() {
locality_map_.reset(DEBUG_LOCATION, "Locality");
}
void XdsLb::LocalityMap::Locality::UpdateLocalityStats() {
stats_.reset();
if (xds_policy()->config_->lrs_load_reporting_server_name().has_value()) {
stats_ = xds_policy()->xds_client()->AddClusterLocalityStats(
xds_policy()->config_->lrs_load_reporting_server_name().value(),
// TODO(roth): We currently hard-code the assumption that
// cluster name and EDS service name are the same. Fix this
// as part of refactoring this LB policy.
xds_policy()->eds_service_name(), xds_policy()->eds_service_name(),
name_);
}
}
grpc_channel_args* XdsLb::LocalityMap::Locality::CreateChildPolicyArgsLocked(
const grpc_channel_args* args_in) {
const grpc_arg args_to_add[] = {
@@ -1512,13 +1545,16 @@ XdsLb::LocalityMap::Locality::CreateChildPolicyLocked(
}
void XdsLb::LocalityMap::Locality::UpdateLocked(uint32_t locality_weight,
ServerAddressList serverlist) {
ServerAddressList serverlist,
bool update_locality_stats) {
if (xds_policy()->shutting_down_) return;
// Update locality weight.
weight_ = locality_weight;
if (delayed_removal_timer_callback_pending_) {
grpc_timer_cancel(&delayed_removal_timer_);
}
// Update locality stats.
if (update_locality_stats) UpdateLocalityStats();
// Construct update args.
UpdateArgs update_args;
update_args.addresses = std::move(serverlist);
@@ -1626,6 +1662,7 @@ void XdsLb::LocalityMap::Locality::ShutdownLocked() {
gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: shutting down locality",
xds_policy(), this, name_->AsHumanReadableString());
}
stats_.reset();
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
@@ -1639,6 +1676,7 @@ void XdsLb::LocalityMap::Locality::ShutdownLocked() {
}
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
load_reporting_picker_.reset();
picker_wrapper_.reset();
if (delayed_removal_timer_callback_pending_) {
grpc_timer_cancel(&delayed_removal_timer_);
@@ -1695,7 +1733,7 @@ void XdsLb::LocalityMap::Locality::OnDelayedRemovalTimerLocked(
}
//
// XdsLb::Locality::Helper
// XdsLb::LocalityMap::Locality::Helper
//
bool XdsLb::LocalityMap::Locality::Helper::CalledByPendingChild() const {
@@ -1741,16 +1779,10 @@ void XdsLb::LocalityMap::Locality::Helper::UpdateState(
// This request is from an outdated child, so ignore it.
return;
}
// Cache the picker and its state in the locality.
// TODO(roth): If load reporting is not configured, we should ideally
// pass a null LocalityStats ref to the EndpointPickerWrapper and have it
// not collect any stats, since they're not going to be used. This would
// require recreating all of the pickers whenever we get a config update.
locality_->picker_wrapper_ = MakeRefCounted<EndpointPickerWrapper>(
std::move(picker),
locality_->xds_policy()->client_stats_.FindLocalityStats(
locality_->name_));
// Cache the state and picker in the locality.
locality_->connectivity_state_ = state;
locality_->picker_wrapper_ =
MakeRefCounted<RefCountedEndpointPicker>(std::move(picker));
// Notify the locality map.
locality_->locality_map_->OnLocalityStateUpdateLocked();
}

src/core/ext/filters/client_channel/xds/xds_api.cc
@@ -979,19 +979,27 @@ grpc_slice XdsApi::CreateLrsInitialRequest(const std::string& server_name) {
namespace {
void LocalityStatsPopulate(
envoy_api_v2_endpoint_UpstreamLocalityStats* output,
const std::pair<const RefCountedPtr<XdsLocalityName>,
XdsClientStats::LocalityStats::Snapshot>& input,
void LocalityStatsPopulate(envoy_api_v2_endpoint_UpstreamLocalityStats* output,
const XdsLocalityName& locality_name,
const XdsClusterLocalityStats::Snapshot& snapshot,
upb_arena* arena) {
// Set sub_zone.
// Set locality.
envoy_api_v2_core_Locality* locality =
envoy_api_v2_endpoint_UpstreamLocalityStats_mutable_locality(output,
arena);
if (!locality_name.region().empty()) {
envoy_api_v2_core_Locality_set_region(
locality, upb_strview_makez(locality_name.region().c_str()));
}
if (!locality_name.zone().empty()) {
envoy_api_v2_core_Locality_set_zone(
locality, upb_strview_makez(locality_name.zone().c_str()));
}
if (!locality_name.sub_zone().empty()) {
envoy_api_v2_core_Locality_set_sub_zone(
locality, upb_strview_makez(input.first->sub_zone().c_str()));
locality, upb_strview_makez(locality_name.sub_zone().c_str()));
}
// Set total counts.
const XdsClientStats::LocalityStats::Snapshot& snapshot = input.second;
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_successful_requests(
output, snapshot.total_successful_requests);
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_requests_in_progress(
@@ -1000,16 +1008,15 @@ void LocalityStatsPopulate(
output, snapshot.total_error_requests);
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_issued_requests(
output, snapshot.total_issued_requests);
// Add load metric stats.
for (auto& p : snapshot.load_metric_stats) {
const char* metric_name = p.first.c_str();
const XdsClientStats::LocalityStats::LoadMetric::Snapshot& metric_value =
p.second;
// Add backend metrics.
for (const auto& p : snapshot.backend_metrics) {
const std::string& metric_name = p.first;
const XdsClusterLocalityStats::BackendMetric& metric_value = p.second;
envoy_api_v2_endpoint_EndpointLoadMetricStats* load_metric =
envoy_api_v2_endpoint_UpstreamLocalityStats_add_load_metric_stats(
output, arena);
envoy_api_v2_endpoint_EndpointLoadMetricStats_set_metric_name(
load_metric, upb_strview_makez(metric_name));
load_metric, upb_strview_make(metric_name.data(), metric_name.size()));
envoy_api_v2_endpoint_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
load_metric, metric_value.num_requests_finished_with_metric);
envoy_api_v2_endpoint_EndpointLoadMetricStats_set_total_metric_value(
@@ -1020,33 +1027,15 @@
} // namespace
grpc_slice XdsApi::CreateLrsRequest(
std::map<StringView, std::set<XdsClientStats*>, StringLess>
client_stats_map) {
ClusterLoadReportMap cluster_load_report_map) {
upb::Arena arena;
// Get the snapshots.
std::map<StringView, grpc_core::InlinedVector<XdsClientStats::Snapshot, 1>,
StringLess>
snapshot_map;
for (auto& p : client_stats_map) {
const StringView& cluster_name = p.first;
for (auto* client_stats : p.second) {
XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset();
// Prune unused locality stats.
client_stats->PruneLocalityStats();
if (snapshot.IsAllZero()) continue;
snapshot_map[cluster_name].emplace_back(std::move(snapshot));
}
}
// When all the counts are zero, return empty slice.
if (snapshot_map.empty()) return grpc_empty_slice();
// Create a request.
envoy_service_load_stats_v2_LoadStatsRequest* request =
envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
for (auto& p : snapshot_map) {
const StringView& cluster_name = p.first;
const auto& snapshot_list = p.second;
for (size_t i = 0; i < snapshot_list.size(); ++i) {
const auto& snapshot = snapshot_list[i];
for (auto& p : cluster_load_report_map) {
const std::string& cluster_name = p.first.first;
const std::string& eds_service_name = p.first.second;
const ClusterLoadReport& load_report = p.second;
// Add cluster stats.
envoy_api_v2_endpoint_ClusterStats* cluster_stats =
envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
@@ -1055,39 +1044,47 @@ grpc_slice XdsApi::CreateLrsRequest(
envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
cluster_stats,
upb_strview_make(cluster_name.data(), cluster_name.size()));
// Set EDS service name, if non-empty.
if (!eds_service_name.empty()) {
envoy_api_v2_endpoint_ClusterStats_set_cluster_service_name(
cluster_stats,
upb_strview_make(eds_service_name.data(), eds_service_name.size()));
}
// Add locality stats.
for (auto& p : snapshot.upstream_locality_stats) {
for (const auto& p : load_report.locality_stats) {
const XdsLocalityName& locality_name = *p.first;
const auto& snapshot = p.second;
envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats =
envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats(
cluster_stats, arena.ptr());
LocalityStatsPopulate(locality_stats, p, arena.ptr());
LocalityStatsPopulate(locality_stats, locality_name, snapshot,
arena.ptr());
}
// Add dropped requests.
for (auto& p : snapshot.dropped_requests) {
uint64_t total_dropped_requests = 0;
for (const auto& p : load_report.dropped_requests) {
const char* category = p.first.c_str();
const uint64_t count = p.second;
envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests =
envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(
cluster_stats, arena.ptr());
envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(cluster_stats,
arena.ptr());
envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category(
dropped_requests, upb_strview_makez(category));
envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count(
dropped_requests, count);
total_dropped_requests += count;
}
// Set total dropped requests.
envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests(
cluster_stats, snapshot.total_dropped_requests);
cluster_stats, total_dropped_requests);
// Set real load report interval.
gpr_timespec timespec =
grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN);
grpc_millis_to_timespec(load_report.load_report_interval, GPR_TIMESPAN);
google_protobuf_Duration* load_report_interval =
envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval(
cluster_stats, arena.ptr());
google_protobuf_Duration_set_seconds(load_report_interval,
timespec.tv_sec);
google_protobuf_Duration_set_nanos(load_report_interval,
timespec.tv_nsec);
}
google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
}
return SerializeLrsRequest(request, arena.ptr());
}

src/core/ext/filters/client_channel/xds/xds_api.h
@@ -176,6 +176,17 @@ class XdsApi {
using EdsUpdateMap = std::map<std::string /*eds_service_name*/, EdsUpdate>;
struct ClusterLoadReport {
XdsClusterDropStats::DroppedRequestsMap dropped_requests;
std::map<XdsLocalityName*, XdsClusterLocalityStats::Snapshot,
XdsLocalityName::Less>
locality_stats;
grpc_millis load_report_interval;
};
using ClusterLoadReportMap = std::map<
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
ClusterLoadReport>;
XdsApi(const XdsBootstrap::Node* node, const char* build_version)
: node_(node), build_version_(build_version) {}
@@ -228,11 +239,8 @@ class XdsApi {
// Creates an LRS request querying \a server_name.
grpc_slice CreateLrsInitialRequest(const std::string& server_name);
// Creates an LRS request sending client-side load reports. If all the
// counters are zero, returns empty slice.
grpc_slice CreateLrsRequest(std::map<StringView /*cluster_name*/,
std::set<XdsClientStats*>, StringLess>
client_stats_map);
// Creates an LRS request sending a client-side load report.
grpc_slice CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map);
// Parses the LRS response and returns \a
// load_reporting_interval for client-side load reporting. If there is any

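A minimal sketch of how the new CreateLrsRequest() signature is meant to be driven, assuming an XdsApi instance named api; the helper, the cluster/service names, and the counts are illustrative only:

// Hypothetical helper; XdsApi and grpc_slice come from the headers above.
grpc_slice BuildSingleClusterReport(XdsApi& api) {
  XdsApi::ClusterLoadReportMap load_report_map;
  XdsApi::ClusterLoadReport& report =
      load_report_map[{"cluster_a", "eds_service_a"}];
  report.dropped_requests["lb_policy_drop"] = 3;  // category -> count
  report.load_report_interval = 5000;  // grpc_millis since the last report
  return api.CreateLrsRequest(std::move(load_report_map));
}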
src/core/ext/filters/client_channel/xds/xds_client.cc
@@ -1314,19 +1314,39 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
self->SendReportLocked();
}
namespace {
bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
for (const auto& p : snapshot) {
const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
for (const auto& q : cluster_snapshot.dropped_requests) {
if (q.second > 0) return false;
}
for (const auto& q : cluster_snapshot.locality_stats) {
const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
if (!locality_snapshot.IsZero()) return false;
}
}
return true;
}
} // namespace
void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
// Create a request that contains the load report.
grpc_slice request_payload_slice =
xds_client()->api_.CreateLrsRequest(xds_client()->ClientStatsMap());
// Construct snapshot from all reported stats.
XdsApi::ClusterLoadReportMap snapshot =
xds_client()->BuildLoadReportSnapshot();
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
const bool old_val = last_report_counters_were_zero_;
last_report_counters_were_zero_ = static_cast<bool>(
grpc_slice_eq(request_payload_slice, grpc_empty_slice()));
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
if (old_val && last_report_counters_were_zero_) {
ScheduleNextReportLocked();
return;
}
// Create a request that contains the snapshot.
grpc_slice request_payload_slice =
xds_client()->api_.CreateLrsRequest(std::move(snapshot));
parent_->send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
@@ -1507,11 +1527,6 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
AdsCallState* ads_calld = chand()->ads_calld_->calld();
if (ads_calld == nullptr || !ads_calld->seen_response()) return;
// Start reporting.
for (auto& p : chand()->xds_client_->endpoint_map_) {
for (auto* client_stats : p.second.client_stats) {
client_stats->MaybeInitLastReportTime();
}
}
reporter_ = MakeOrphanable<Reporter>(
Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
}
@@ -1823,32 +1838,105 @@ void XdsClient::CancelEndpointDataWatch(StringView eds_service_name,
}
}
void XdsClient::AddClientStats(StringView /*lrs_server*/,
StringView cluster_name,
XdsClientStats* client_stats) {
EndpointState& endpoint_state = endpoint_map_[std::string(cluster_name)];
RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
StringView lrs_server, StringView cluster_name,
StringView eds_service_name) {
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
endpoint_state.client_stats.insert(client_stats);
auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name));
// We jump through some hoops here to make sure that the StringViews
// stored in the XdsClusterDropStats object point to the strings
// in the load_report_map_ key, so that they have the same lifetime.
auto it = load_report_map_
.emplace(std::make_pair(std::move(key), LoadReportState()))
.first;
auto cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/);
it->second.drop_stats.insert(cluster_drop_stats.get());
chand_->MaybeStartLrsCall();
return cluster_drop_stats;
}
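The lifetime trick in the "hoops" comment above works because keys in a node-based std::map are never moved once inserted, so a non-owning view into a key remains valid until that entry is erased. A self-contained sketch of the same idea, with absl::string_view standing in for grpc_core::StringView and hypothetical names:

#include <map>
#include <string>
#include <utility>
#include "absl/strings/string_view.h"

std::map<std::pair<std::string, std::string>, int> registry;

absl::string_view RegisterCluster(std::string cluster, std::string eds) {
  auto it = registry
                .emplace(std::make_pair(std::move(cluster), std::move(eds)), 0)
                .first;
  // Aliases the string owned by the map node; valid until the entry is erased.
  return absl::string_view(it->first.first);
}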
void XdsClient::RemoveClientStats(StringView /*lrs_server*/,
StringView cluster_name,
XdsClientStats* client_stats) {
EndpointState& endpoint_state = endpoint_map_[std::string(cluster_name)];
void XdsClient::RemoveClusterDropStats(
StringView /*lrs_server*/, StringView cluster_name,
StringView eds_service_name, XdsClusterDropStats* cluster_drop_stats) {
auto load_report_it = load_report_map_.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (load_report_it == load_report_map_.end()) return;
LoadReportState& load_report_state = load_report_it->second;
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
// TODO(roth): In principle, we should try to send a final load report
// containing whatever final stats have been accumulated since the
// last load report.
auto it = endpoint_state.client_stats.find(client_stats);
if (it != endpoint_state.client_stats.end()) {
endpoint_state.client_stats.erase(it);
auto it = load_report_state.drop_stats.find(cluster_drop_stats);
if (it != load_report_state.drop_stats.end()) {
load_report_state.drop_stats.erase(it);
if (load_report_state.drop_stats.empty() &&
load_report_state.locality_stats.empty()) {
load_report_map_.erase(load_report_it);
if (chand_ != nullptr && load_report_map_.empty()) {
chand_->StopLrsCall();
}
if (chand_ != nullptr && endpoint_state.client_stats.empty()) {
}
}
}
RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
StringView lrs_server, StringView cluster_name, StringView eds_service_name,
RefCountedPtr<XdsLocalityName> locality) {
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name));
// We jump through some hoops here to make sure that the StringViews
// stored in the XdsClusterLocalityStats object point to the strings
// in the load_report_map_ key, so that they have the same lifetime.
auto it = load_report_map_
.emplace(std::make_pair(std::move(key), LoadReportState()))
.first;
auto cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
locality);
it->second.locality_stats[std::move(locality)].insert(
cluster_locality_stats.get());
chand_->MaybeStartLrsCall();
return cluster_locality_stats;
}
void XdsClient::RemoveClusterLocalityStats(
StringView /*lrs_server*/, StringView cluster_name,
StringView eds_service_name, const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats) {
auto load_report_it = load_report_map_.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (load_report_it == load_report_map_.end()) return;
LoadReportState& load_report_state = load_report_it->second;
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
// TODO(roth): In principle, we should try to send a final load report
// containing whatever final stats have been accumulated since the
// last load report.
auto locality_it = load_report_state.locality_stats.find(locality);
if (locality_it == load_report_state.locality_stats.end()) return;
auto& locality_set = locality_it->second;
auto it = locality_set.find(cluster_locality_stats);
if (it != locality_set.end()) {
locality_set.erase(it);
if (locality_set.empty()) {
load_report_state.locality_stats.erase(locality_it);
if (load_report_state.locality_stats.empty() &&
load_report_state.drop_stats.empty()) {
load_report_map_.erase(load_report_it);
if (chand_ != nullptr && load_report_map_.empty()) {
chand_->StopLrsCall();
}
}
}
}
}
void XdsClient::ResetBackoff() {
@@ -1876,17 +1964,34 @@ grpc_error* XdsClient::CreateServiceConfig(
return error;
}
std::map<StringView, std::set<XdsClientStats*>, StringLess>
XdsClient::ClientStatsMap() const {
std::map<StringView, std::set<XdsClientStats*>, StringLess> client_stats_map;
for (const auto& p : endpoint_map_) {
const StringView cluster_name = p.first;
const auto& client_stats = p.second.client_stats;
if (chand_->lrs_calld()->ShouldSendLoadReports(cluster_name)) {
client_stats_map.emplace(cluster_name, client_stats);
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot() {
XdsApi::ClusterLoadReportMap snapshot_map;
for (auto& p : load_report_map_) {
const auto& cluster_key = p.first; // cluster and EDS service name
LoadReportState& load_report = p.second;
XdsApi::ClusterLoadReport& snapshot = snapshot_map[cluster_key];
// Aggregate drop stats.
for (auto& drop_stats : load_report.drop_stats) {
for (const auto& p : drop_stats->GetSnapshotAndReset()) {
snapshot.dropped_requests[p.first] += p.second;
}
}
// Aggregate locality stats.
for (auto& p : load_report.locality_stats) {
XdsLocalityName* locality_name = p.first.get();
auto& locality_stats_set = p.second;
XdsClusterLocalityStats::Snapshot& locality_snapshot =
snapshot.locality_stats[locality_name];
for (auto& locality_stats : locality_stats_set) {
locality_snapshot += locality_stats->GetSnapshotAndReset();
}
}
// Compute load report interval.
const grpc_millis now = ExecCtx::Get()->Now();
snapshot.load_report_interval = now - load_report.last_report_time;
load_report.last_report_time = now;
}
return client_stats_map;
return snapshot_map;
}
void XdsClient::NotifyOnError(grpc_error* error) {

src/core/ext/filters/client_channel/xds/xds_client.h
@@ -101,11 +101,25 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
void CancelEndpointDataWatch(StringView eds_service_name,
EndpointWatcherInterface* watcher);
// Adds and removes client stats for \a cluster_name.
void AddClientStats(StringView /*lrs_server*/, StringView cluster_name,
XdsClientStats* client_stats);
void RemoveClientStats(StringView /*lrs_server*/, StringView cluster_name,
XdsClientStats* client_stats);
// Adds and removes drop stats for cluster_name and eds_service_name.
RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
StringView lrs_server, StringView cluster_name,
StringView eds_service_name);
void RemoveClusterDropStats(StringView /*lrs_server*/,
StringView cluster_name,
StringView eds_service_name,
XdsClusterDropStats* cluster_drop_stats);
// Adds and removes locality stats for cluster_name and eds_service_name
// for the specified locality.
RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
StringView lrs_server, StringView cluster_name,
StringView eds_service_name, RefCountedPtr<XdsLocalityName> locality);
void RemoveClusterLocalityStats(
StringView /*lrs_server*/, StringView cluster_name,
StringView eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats);
// Resets connection backoff state.
void ResetBackoff();
@@ -182,11 +196,18 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
std::map<EndpointWatcherInterface*,
std::unique_ptr<EndpointWatcherInterface>>
watchers;
std::set<XdsClientStats*> client_stats;
// The latest data seen from EDS.
XdsApi::EdsUpdate update;
};
struct LoadReportState {
std::set<XdsClusterDropStats*> drop_stats;
std::map<RefCountedPtr<XdsLocalityName>, std::set<XdsClusterLocalityStats*>,
XdsLocalityName::Less>
locality_stats;
grpc_millis last_report_time = ExecCtx::Get()->Now();
};
// Sends an error notification to all watchers.
void NotifyOnError(grpc_error* error);
@@ -194,8 +215,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
const std::string& cluster_name,
RefCountedPtr<ServiceConfig>* service_config) const;
std::map<StringView, std::set<XdsClientStats*>, StringLess> ClientStatsMap()
const;
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot();
// Channel arg vtable functions.
static void* ChannelArgCopy(void* p);
@@ -227,6 +247,10 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
// Only the watched EDS service names are stored.
std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
std::map<
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
LoadReportState>
load_report_map_;
bool shutting_down_ = false;
};

src/core/ext/filters/client_channel/xds/xds_client_stats.cc
@@ -20,63 +20,75 @@
#include "src/core/ext/filters/client_channel/xds/xds_client_stats.h"
#include <string.h>
#include <grpc/support/atm.h>
#include <grpc/support/string_util.h>
#include <string.h>
#include "src/core/ext/filters/client_channel/xds/xds_client.h"
namespace grpc_core {
namespace {
//
// XdsClusterDropStats
//
template <typename T>
T GetAndResetCounter(Atomic<T>* from) {
return from->Exchange(0, MemoryOrder::RELAXED);
XdsClusterDropStats::XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client,
StringView lrs_server_name,
StringView cluster_name,
StringView eds_service_name)
: xds_client_(std::move(xds_client)),
lrs_server_name_(lrs_server_name),
cluster_name_(cluster_name),
eds_service_name_(eds_service_name) {}
XdsClusterDropStats::~XdsClusterDropStats() {
xds_client_->RemoveClusterDropStats(lrs_server_name_, cluster_name_,
eds_service_name_, this);
xds_client_.reset(DEBUG_LOCATION, "DropStats");
}
} // namespace
//
// XdsClientStats::LocalityStats::LoadMetric::Snapshot
//
XdsClusterDropStats::DroppedRequestsMap
XdsClusterDropStats::GetSnapshotAndReset() {
MutexLock lock(&mu_);
return std::move(dropped_requests_);
}
bool XdsClientStats::LocalityStats::LoadMetric::Snapshot::IsAllZero() const {
return total_metric_value == 0 && num_requests_finished_with_metric == 0;
void XdsClusterDropStats::AddCallDropped(const std::string& category) {
MutexLock lock(&mu_);
++dropped_requests_[category];
}
//
// XdsClientStats::LocalityStats::LoadMetric
// XdsClusterLocalityStats
//
XdsClientStats::LocalityStats::LoadMetric::Snapshot
XdsClientStats::LocalityStats::LoadMetric::GetSnapshotAndReset() {
Snapshot metric = {num_requests_finished_with_metric_, total_metric_value_};
num_requests_finished_with_metric_ = 0;
total_metric_value_ = 0;
return metric;
XdsClusterLocalityStats::XdsClusterLocalityStats(
RefCountedPtr<XdsClient> xds_client, StringView lrs_server_name,
StringView cluster_name, StringView eds_service_name,
RefCountedPtr<XdsLocalityName> name)
: xds_client_(std::move(xds_client)),
lrs_server_name_(lrs_server_name),
cluster_name_(cluster_name),
eds_service_name_(eds_service_name),
name_(std::move(name)) {}
XdsClusterLocalityStats::~XdsClusterLocalityStats() {
xds_client_->RemoveClusterLocalityStats(lrs_server_name_, cluster_name_,
eds_service_name_, name_, this);
xds_client_.reset(DEBUG_LOCATION, "LocalityStats");
}
//
// XdsClientStats::LocalityStats::Snapshot
//
namespace {
bool XdsClientStats::LocalityStats::Snapshot::IsAllZero() {
if (total_successful_requests != 0 || total_requests_in_progress != 0 ||
total_error_requests != 0 || total_issued_requests != 0) {
return false;
}
for (auto& p : load_metric_stats) {
const LoadMetric::Snapshot& metric_value = p.second;
if (!metric_value.IsAllZero()) return false;
}
return true;
uint64_t GetAndResetCounter(Atomic<uint64_t>* from) {
return from->Exchange(0, MemoryOrder::RELAXED);
}
//
// XdsClientStats::LocalityStats
//
} // namespace
XdsClientStats::LocalityStats::Snapshot
XdsClientStats::LocalityStats::GetSnapshotAndReset() {
XdsClusterLocalityStats::Snapshot
XdsClusterLocalityStats::GetSnapshotAndReset() {
Snapshot snapshot = {
GetAndResetCounter(&total_successful_requests_),
// Don't reset total_requests_in_progress because it's not
@@ -84,108 +96,21 @@ XdsClientStats::LocalityStats::GetSnapshotAndReset() {
total_requests_in_progress_.Load(MemoryOrder::RELAXED),
GetAndResetCounter(&total_error_requests_),
GetAndResetCounter(&total_issued_requests_)};
{
MutexLock lock(&load_metric_stats_mu_);
for (auto& p : load_metric_stats_) {
const std::string& metric_name = p.first;
LoadMetric& metric_value = p.second;
snapshot.load_metric_stats.emplace(metric_name,
metric_value.GetSnapshotAndReset());
}
}
MutexLock lock(&backend_metrics_mu_);
snapshot.backend_metrics = std::move(backend_metrics_);
return snapshot;
}
void XdsClientStats::LocalityStats::AddCallStarted() {
void XdsClusterLocalityStats::AddCallStarted() {
total_issued_requests_.FetchAdd(1, MemoryOrder::RELAXED);
total_requests_in_progress_.FetchAdd(1, MemoryOrder::RELAXED);
}
void XdsClientStats::LocalityStats::AddCallFinished(bool fail) {
void XdsClusterLocalityStats::AddCallFinished(bool fail) {
Atomic<uint64_t>& to_increment =
fail ? total_error_requests_ : total_successful_requests_;
to_increment.FetchAdd(1, MemoryOrder::RELAXED);
total_requests_in_progress_.FetchAdd(-1, MemoryOrder::ACQ_REL);
}
//
// XdsClientStats::Snapshot
//
bool XdsClientStats::Snapshot::IsAllZero() {
for (auto& p : upstream_locality_stats) {
if (!p.second.IsAllZero()) return false;
}
for (auto& p : dropped_requests) {
if (p.second != 0) return false;
}
return total_dropped_requests == 0;
}
//
// XdsClientStats
//
XdsClientStats::Snapshot XdsClientStats::GetSnapshotAndReset() {
grpc_millis now = ExecCtx::Get()->Now();
// Record total_dropped_requests and reporting interval in the snapshot.
Snapshot snapshot;
snapshot.total_dropped_requests =
GetAndResetCounter(&total_dropped_requests_);
snapshot.load_report_interval = now - last_report_time_;
// Update last report time.
last_report_time_ = now;
// Snapshot all the other stats.
for (auto& p : upstream_locality_stats_) {
snapshot.upstream_locality_stats.emplace(p.first,
p.second->GetSnapshotAndReset());
}
{
MutexLock lock(&dropped_requests_mu_);
// This is a workaround for the case where some compilers cannot build
// move-assignment of map with non-copyable but movable key.
// https://stackoverflow.com/questions/36475497
std::swap(snapshot.dropped_requests, dropped_requests_);
dropped_requests_.clear();
}
return snapshot;
}
void XdsClientStats::MaybeInitLastReportTime() {
if (last_report_time_ == -1) last_report_time_ = ExecCtx::Get()->Now();
}
RefCountedPtr<XdsClientStats::LocalityStats> XdsClientStats::FindLocalityStats(
const RefCountedPtr<XdsLocalityName>& locality_name) {
auto iter = upstream_locality_stats_.find(locality_name);
if (iter == upstream_locality_stats_.end()) {
iter = upstream_locality_stats_
.emplace(locality_name, MakeRefCounted<LocalityStats>())
.first;
}
return iter->second;
}
void XdsClientStats::PruneLocalityStats() {
auto iter = upstream_locality_stats_.begin();
while (iter != upstream_locality_stats_.end()) {
if (iter->second->IsSafeToDelete()) {
iter = upstream_locality_stats_.erase(iter);
} else {
++iter;
}
}
}
void XdsClientStats::AddCallDropped(const std::string& category) {
total_dropped_requests_.FetchAdd(1, MemoryOrder::RELAXED);
MutexLock lock(&dropped_requests_mu_);
auto iter = dropped_requests_.find(category);
if (iter == dropped_requests_.end()) {
dropped_requests_.emplace(category, 1);
} else {
++iter->second;
}
}
} // namespace grpc_core

src/core/ext/filters/client_channel/xds/xds_client_stats.h
@@ -33,17 +33,26 @@
namespace grpc_core {
// Forward declaration to avoid circular dependency.
class XdsClient;
// Locality name.
class XdsLocalityName : public RefCounted<XdsLocalityName> {
public:
struct Less {
bool operator()(const RefCountedPtr<XdsLocalityName>& lhs,
const RefCountedPtr<XdsLocalityName>& rhs) const {
bool operator()(const XdsLocalityName* lhs,
const XdsLocalityName* rhs) const {
int cmp_result = lhs->region_.compare(rhs->region_);
if (cmp_result != 0) return cmp_result < 0;
cmp_result = lhs->zone_.compare(rhs->zone_);
if (cmp_result != 0) return cmp_result < 0;
return lhs->sub_zone_.compare(rhs->sub_zone_) < 0;
}
bool operator()(const RefCountedPtr<XdsLocalityName>& lhs,
const RefCountedPtr<XdsLocalityName>& rhs) const {
return (*this)(lhs.get(), rhs.get());
}
};
XdsLocalityName(std::string region, std::string zone, std::string subzone)
@@ -77,148 +86,112 @@ class XdsLocalityName : public RefCounted<XdsLocalityName> {
UniquePtr<char> human_readable_string_;
};
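The second operator() overload added to Less above lets a single comparator type serve both the RefCountedPtr-keyed maps in XdsClient::LoadReportState and the raw-pointer-keyed locality_stats map in XdsApi::ClusterLoadReport. A minimal sketch of that pattern with illustrative types:

#include <map>
#include <memory>
#include <string>

struct Name { std::string value; };

struct NameLess {
  bool operator()(const Name* lhs, const Name* rhs) const {
    return lhs->value < rhs->value;
  }
  bool operator()(const std::shared_ptr<Name>& lhs,
                  const std::shared_ptr<Name>& rhs) const {
    return (*this)(lhs.get(), rhs.get());  // delegate to the raw overload
  }
};

std::map<Name*, int, NameLess> by_raw_pointer;
std::map<std::shared_ptr<Name>, int, NameLess> by_smart_pointer;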
// Each of the stats classes (i.e., XdsClientStats, LocalityStats, and
// LoadMetric) can have a snapshot taken (and its counters reset) to populate
// the load report. The snapshots are contained in the respective Snapshot
// structs, which have no synchronization. The stats classes use several
// different synchronization methods:
// 1. Most of the counters are Atomic<>s for performance.
// 2. Some of the Map<>s are protected by Mutex if we are not guaranteed that
//    the accesses to them are synchronized by the callers.
// 3. The Map<>s whose accesses are already synchronized by the callers have
//    no additional synchronization here.
// Note that the Map<>s mentioned in 2 and 3 refer to the map's tree structure
// rather than the content in each tree node.
class XdsClientStats {
// Drop stats for an xds cluster.
class XdsClusterDropStats : public RefCounted<XdsClusterDropStats> {
public:
class LocalityStats : public RefCounted<LocalityStats> {
public:
class LoadMetric {
public:
struct Snapshot {
bool IsAllZero() const;
using DroppedRequestsMap = std::map<std::string /* category */, uint64_t>;
XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client,
StringView lrs_server_name, StringView cluster_name,
StringView eds_service_name);
~XdsClusterDropStats();
// Returns a snapshot of this instance and resets all the counters.
DroppedRequestsMap GetSnapshotAndReset();
void AddCallDropped(const std::string& category);
private:
RefCountedPtr<XdsClient> xds_client_;
StringView lrs_server_name_;
StringView cluster_name_;
StringView eds_service_name_;
// Protects dropped_requests_. A mutex is necessary because the length of
// dropped_requests_ can be accessed by both the picker (from data plane
// mutex) and the load reporting thread (from the control plane combiner).
Mutex mu_;
DroppedRequestsMap dropped_requests_;
};
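GetSnapshotAndReset() above consumes the counts by moving the map out from under the lock. A self-contained sketch of the same snapshot-and-reset pattern, using std::mutex in place of grpc_core::Mutex and an explicit swap so the post-condition (an empty map) is unambiguous:

#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <utility>

class DropCounter {
 public:
  void AddCallDropped(const std::string& category) {
    std::lock_guard<std::mutex> lock(mu_);
    ++dropped_requests_[category];
  }
  // Returns the accumulated counts and leaves the member map empty.
  std::map<std::string, uint64_t> GetSnapshotAndReset() {
    std::lock_guard<std::mutex> lock(mu_);
    std::map<std::string, uint64_t> snapshot;
    snapshot.swap(dropped_requests_);
    return snapshot;
  }
 private:
  std::mutex mu_;
  std::map<std::string, uint64_t> dropped_requests_;
};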
// Locality stats for an xds cluster.
class XdsClusterLocalityStats : public RefCounted<XdsClusterLocalityStats> {
public:
struct BackendMetric {
uint64_t num_requests_finished_with_metric;
double total_metric_value;
};
// Returns a snapshot of this instance and resets all the accumulated
// counters.
Snapshot GetSnapshotAndReset();
BackendMetric& operator+=(const BackendMetric& other) {
num_requests_finished_with_metric +=
other.num_requests_finished_with_metric;
total_metric_value += other.total_metric_value;
return *this;
}
private:
uint64_t num_requests_finished_with_metric_{0};
double total_metric_value_{0};
bool IsZero() const {
return num_requests_finished_with_metric == 0 && total_metric_value == 0;
}
};
using LoadMetricMap = std::map<std::string, LoadMetric>;
using LoadMetricSnapshotMap = std::map<std::string, LoadMetric::Snapshot>;
struct Snapshot {
// TODO(juanlishen): Change this to const method when const_iterator is
// added to Map<>.
bool IsAllZero();
uint64_t total_successful_requests;
uint64_t total_requests_in_progress;
uint64_t total_error_requests;
uint64_t total_issued_requests;
LoadMetricSnapshotMap load_metric_stats;
std::map<std::string, BackendMetric> backend_metrics;
Snapshot& operator+=(const Snapshot& other) {
total_successful_requests += other.total_successful_requests;
total_requests_in_progress += other.total_requests_in_progress;
total_error_requests += other.total_error_requests;
total_issued_requests += other.total_issued_requests;
for (const auto& p : other.backend_metrics) {
backend_metrics[p.first] += p.second;
}
return *this;
}
bool IsZero() const {
if (total_successful_requests != 0 || total_requests_in_progress != 0 ||
total_error_requests != 0 || total_issued_requests != 0) {
return false;
}
for (const auto& p : backend_metrics) {
if (!p.second.IsZero()) return false;
}
return true;
}
};
// Returns a snapshot of this instance and resets all the accumulated
// counters.
Snapshot GetSnapshotAndReset();
XdsClusterLocalityStats(RefCountedPtr<XdsClient> xds_client,
StringView lrs_server_name, StringView cluster_name,
StringView eds_service_name,
RefCountedPtr<XdsLocalityName> name);
~XdsClusterLocalityStats();
// Each XdsLb::PickerWrapper holds a ref to its respective LocalityStats.
// If the refcount is 0, no new calls will be recorded against the
// LocalityStats, so the LocalityStats can be safely deleted once all the
// in-progress calls have finished.
// May only be called from the control plane combiner.
void RefByPicker() { picker_refcount_.FetchAdd(1, MemoryOrder::ACQ_REL); }
// Might be called from the control plane combiner or the data plane
// combiner.
// TODO(juanlishen): Once https://github.com/grpc/grpc/pull/19390 is merged,
// this method will also only be invoked in the control plane combiner.
// We may then be able to simplify the LocalityStats' lifetime by making it
// RefCounted<> and populating the protobuf in its dtor.
void UnrefByPicker() { picker_refcount_.FetchSub(1, MemoryOrder::ACQ_REL); }
// May only be called from the control plane combiner.
// The only place where the picker_refcount_ can be increased is
// RefByPicker(), which also can only be called from the control plane
// combiner. Also, if the picker_refcount_ is 0, total_requests_in_progress_
// can't be increased from 0. So it's safe to delete the LocalityStats right
// after this method returns true.
bool IsSafeToDelete() {
return picker_refcount_.FetchAdd(0, MemoryOrder::ACQ_REL) == 0 &&
total_requests_in_progress_.FetchAdd(0, MemoryOrder::ACQ_REL) == 0;
}
// Returns a snapshot of this instance and resets all the counters.
Snapshot GetSnapshotAndReset();
void AddCallStarted();
void AddCallFinished(bool fail = false);
private:
RefCountedPtr<XdsClient> xds_client_;
StringView lrs_server_name_;
StringView cluster_name_;
StringView eds_service_name_;
RefCountedPtr<XdsLocalityName> name_;
Atomic<uint64_t> total_successful_requests_{0};
Atomic<uint64_t> total_requests_in_progress_{0};
// Requests that were issued (not dropped) but failed.
Atomic<uint64_t> total_error_requests_{0};
Atomic<uint64_t> total_issued_requests_{0};
// Protects load_metric_stats_. A mutex is necessary because the length of
// load_metric_stats_ can be accessed by both the callback intercepting the
// call's recv_trailing_metadata (not from any combiner) and the load
// reporting thread (from the control plane combiner).
Mutex load_metric_stats_mu_;
LoadMetricMap load_metric_stats_;
// Can be accessed from either the control plane combiner or the data plane
// combiner.
Atomic<uint8_t> picker_refcount_{0};
};
// TODO(juanlishen): The value type of Map<> must be movable in the current
// implementation. To avoid making LocalityStats movable, we wrap it in
// std::unique_ptr<>. We should remove this wrapper if the value type of Map<>
// no longer has to be movable.
using LocalityStatsMap =
std::map<RefCountedPtr<XdsLocalityName>, RefCountedPtr<LocalityStats>,
XdsLocalityName::Less>;
using LocalityStatsSnapshotMap =
std::map<RefCountedPtr<XdsLocalityName>, LocalityStats::Snapshot,
XdsLocalityName::Less>;
using DroppedRequestsMap = std::map<std::string, uint64_t>;
using DroppedRequestsSnapshotMap = DroppedRequestsMap;
struct Snapshot {
// TODO(juanlishen): Change this to const method when const_iterator is
// added to Map<>.
bool IsAllZero();
LocalityStatsSnapshotMap upstream_locality_stats;
uint64_t total_dropped_requests;
DroppedRequestsSnapshotMap dropped_requests;
// The actual load report interval.
grpc_millis load_report_interval;
};
// Returns a snapshot of this instance and resets all the accumulated
// counters.
Snapshot GetSnapshotAndReset();
void MaybeInitLastReportTime();
RefCountedPtr<LocalityStats> FindLocalityStats(
const RefCountedPtr<XdsLocalityName>& locality_name);
void PruneLocalityStats();
void AddCallDropped(const std::string& category);
private:
// The stats for each locality.
LocalityStatsMap upstream_locality_stats_;
Atomic<uint64_t> total_dropped_requests_{0};
// Protects dropped_requests_. A mutex is necessary because the length of
// dropped_requests_ can be accessed by both the picker (from data plane
// combiner) and the load reporting thread (from the control plane combiner).
Mutex dropped_requests_mu_;
DroppedRequestsMap dropped_requests_;
// The timestamp of last reporting. For the LB-policy-wide first report, the
// last_report_time is the time we scheduled the first reporting timer.
grpc_millis last_report_time_ = -1;
// Protects backend_metrics_. A mutex is necessary because the length of
// backend_metrics_ can be accessed by both the callback intercepting the
// call's recv_trailing_metadata (not from the control plane combiner) and
// the load reporting thread (from the control plane combiner).
Mutex backend_metrics_mu_;
std::map<std::string, BackendMetric> backend_metrics_;
};
} // namespace grpc_core
