From a6954fe39eb7fa51820410f5b0ce3d9c5ca7139d Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 12 Oct 2020 14:14:52 -0700 Subject: [PATCH] Use a single instance of the drop and locality stats objects. --- .../client_channel/lb_policy/xds/eds_drop.cc | 2 +- src/core/ext/xds/xds_client.cc | 146 +++++++++++------- src/core/ext/xds/xds_client.h | 12 +- src/core/ext/xds/xds_client_stats.cc | 41 ++++- test/cpp/end2end/xds_end2end_test.cc | 8 - 5 files changed, 135 insertions(+), 74 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc index e5c8f4dfaad..41af02a4499 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc @@ -207,7 +207,7 @@ EdsDropLb::EdsDropLb(RefCountedPtr xds_client, Args args) EdsDropLb::~EdsDropLb() { if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) { - gpr_log(GPR_INFO, "[eds_drop_lb %p] destroying xds LB policy", this); + gpr_log(GPR_INFO, "[eds_drop_lb %p] destroying eds_drop LB policy", this); } } diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 6d448641b03..ad49714a938 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -336,7 +336,7 @@ class XdsClient::ChannelState::LrsCallState void ScheduleNextReportLocked(); static void OnNextReportTimer(void* arg, grpc_error* error); bool OnNextReportTimerLocked(grpc_error* error); - void SendReportLocked(); + bool SendReportLocked(); static void OnReportDone(void* arg, grpc_error* error); bool OnReportDoneLocked(grpc_error* error); @@ -1287,8 +1287,7 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked( GRPC_ERROR_UNREF(error); return true; } - SendReportLocked(); - return false; + return SendReportLocked(); } namespace { @@ -1307,7 +1306,7 @@ bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) { } // namespace -void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { +bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { // Construct snapshot from all reported stats. XdsApi::ClusterLoadReportMap snapshot = xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_, @@ -1317,8 +1316,12 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { const bool old_val = last_report_counters_were_zero_; last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot); if (old_val && last_report_counters_were_zero_) { + if (xds_client()->load_report_map_.empty()) { + parent_->chand()->StopLrsCall(); + return true; + } ScheduleNextReportLocked(); - return; + return false; } // Create a request that contains the snapshot. grpc_slice request_payload_slice = @@ -1339,6 +1342,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { xds_client(), this, call_error); GPR_ASSERT(GRPC_CALL_OK == call_error); } + return false; } void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone( @@ -1982,10 +1986,22 @@ RefCountedPtr XdsClient::AddClusterDropStats( auto it = load_report_map_ .emplace(std::make_pair(std::move(key), LoadReportState())) .first; - auto cluster_drop_stats = MakeRefCounted( - Ref(DEBUG_LOCATION, "DropStats"), lrs_server, - it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/); - it->second.drop_stats.insert(cluster_drop_stats.get()); + LoadReportState& load_report_state = it->second; + RefCountedPtr cluster_drop_stats; + if (load_report_state.drop_stats != nullptr) { + cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero(); + } + if (cluster_drop_stats == nullptr) { + if (load_report_state.drop_stats != nullptr) { + load_report_state.deleted_drop_stats += + load_report_state.drop_stats->GetSnapshotAndReset(); + } + cluster_drop_stats = MakeRefCounted( + Ref(DEBUG_LOCATION, "DropStats"), lrs_server, + it->first.first /*cluster_name*/, + it->first.second /*eds_service_name*/); + load_report_state.drop_stats = cluster_drop_stats.get(); + } chand_->MaybeStartLrsCall(); return cluster_drop_stats; } @@ -1995,19 +2011,18 @@ void XdsClient::RemoveClusterDropStats( absl::string_view eds_service_name, XdsClusterDropStats* cluster_drop_stats) { MutexLock lock(&mu_); - auto load_report_it = load_report_map_.find( - std::make_pair(std::string(cluster_name), std::string(eds_service_name))); - if (load_report_it == load_report_map_.end()) return; - LoadReportState& load_report_state = load_report_it->second; // TODO(roth): When we add support for direct federation, use the // server name specified in lrs_server. - auto it = load_report_state.drop_stats.find(cluster_drop_stats); - if (it != load_report_state.drop_stats.end()) { - // Record final drop stats in deleted_drop_stats, which will be + auto it = load_report_map_.find( + std::make_pair(std::string(cluster_name), std::string(eds_service_name))); + if (it == load_report_map_.end()) return; + LoadReportState& load_report_state = it->second; + if (load_report_state.drop_stats == cluster_drop_stats) { + // Record final snapshot in deleted_drop_stats, which will be // added to the next load report. - auto dropped_requests = cluster_drop_stats->GetSnapshotAndReset(); - load_report_state.deleted_drop_stats += dropped_requests; - load_report_state.drop_stats.erase(it); + load_report_state.deleted_drop_stats += + load_report_state.drop_stats->GetSnapshotAndReset(); + load_report_state.drop_stats = nullptr; } } @@ -2026,12 +2041,24 @@ RefCountedPtr XdsClient::AddClusterLocalityStats( auto it = load_report_map_ .emplace(std::make_pair(std::move(key), LoadReportState())) .first; - auto cluster_locality_stats = MakeRefCounted( - Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server, - it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/, - locality); - it->second.locality_stats[std::move(locality)].locality_stats.insert( - cluster_locality_stats.get()); + LoadReportState& load_report_state = it->second; + LoadReportState::LocalityState& locality_state = + load_report_state.locality_stats[locality]; + RefCountedPtr cluster_locality_stats; + if (locality_state.locality_stats != nullptr) { + cluster_locality_stats = locality_state.locality_stats->RefIfNonZero(); + } + if (cluster_locality_stats == nullptr) { + if (locality_state.locality_stats != nullptr) { + locality_state.deleted_locality_stats += + locality_state.locality_stats->GetSnapshotAndReset(); + } + cluster_locality_stats = MakeRefCounted( + Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server, + it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/, + std::move(locality)); + locality_state.locality_stats = cluster_locality_stats.get(); + } chand_->MaybeStartLrsCall(); return cluster_locality_stats; } @@ -2042,22 +2069,21 @@ void XdsClient::RemoveClusterLocalityStats( const RefCountedPtr& locality, XdsClusterLocalityStats* cluster_locality_stats) { MutexLock lock(&mu_); - auto load_report_it = load_report_map_.find( - std::make_pair(std::string(cluster_name), std::string(eds_service_name))); - if (load_report_it == load_report_map_.end()) return; - LoadReportState& load_report_state = load_report_it->second; // TODO(roth): When we add support for direct federation, use the // server name specified in lrs_server. + auto it = load_report_map_.find( + std::make_pair(std::string(cluster_name), std::string(eds_service_name))); + if (it == load_report_map_.end()) return; + LoadReportState& load_report_state = it->second; auto locality_it = load_report_state.locality_stats.find(locality); if (locality_it == load_report_state.locality_stats.end()) return; - auto& locality_set = locality_it->second.locality_stats; - auto it = locality_set.find(cluster_locality_stats); - if (it != locality_set.end()) { + LoadReportState::LocalityState& locality_state = locality_it->second; + if (locality_state.locality_stats == cluster_locality_stats) { // Record final snapshot in deleted_locality_stats, which will be // added to the next load report. - locality_it->second.deleted_locality_stats.emplace_back( - cluster_locality_stats->GetSnapshotAndReset()); - locality_set.erase(it); + locality_state.deleted_locality_stats += + locality_state.locality_stats->GetSnapshotAndReset(); + locality_state.locality_stats = nullptr; } } @@ -2098,6 +2124,9 @@ void XdsClient::NotifyOnErrorLocked(grpc_error* error) { XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( bool send_all_clusters, const std::set& clusters) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "[xds_client %p] start building load report", this); + } XdsApi::ClusterLoadReportMap snapshot_map; for (auto load_report_it = load_report_map_.begin(); load_report_it != load_report_map_.end();) { @@ -2116,9 +2145,15 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( XdsApi::ClusterLoadReport snapshot; // Aggregate drop stats. snapshot.dropped_requests = std::move(load_report.deleted_drop_stats); - for (auto& drop_stats : load_report.drop_stats) { - auto dropped_requests = drop_stats->GetSnapshotAndReset(); - snapshot.dropped_requests += dropped_requests; + if (load_report.drop_stats != nullptr) { + snapshot.dropped_requests += + load_report.drop_stats->GetSnapshotAndReset(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p", + this, cluster_key.first.c_str(), cluster_key.second.c_str(), + load_report.drop_stats); + } } // Aggregate locality stats. for (auto it = load_report.locality_stats.begin(); @@ -2127,34 +2162,39 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( auto& locality_state = it->second; XdsClusterLocalityStats::Snapshot& locality_snapshot = snapshot.locality_stats[locality_name]; - for (auto& locality_stats : locality_state.locality_stats) { - locality_snapshot += locality_stats->GetSnapshotAndReset(); - } - // Add final snapshots from recently deleted locality stats objects. - for (auto& deleted_locality_stats : - locality_state.deleted_locality_stats) { - locality_snapshot += deleted_locality_stats; + locality_snapshot = std::move(locality_state.deleted_locality_stats); + if (locality_state.locality_stats != nullptr) { + locality_snapshot += + locality_state.locality_stats->GetSnapshotAndReset(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] cluster=%s eds_service_name=%s " + "locality=%s locality_stats=%p", + this, cluster_key.first.c_str(), cluster_key.second.c_str(), + locality_name->AsHumanReadableString().c_str(), + locality_state.locality_stats); + } } - locality_state.deleted_locality_stats.clear(); // If the only thing left in this entry was final snapshots from // deleted locality stats objects, remove the entry. - if (locality_state.locality_stats.empty()) { + if (locality_state.locality_stats == nullptr) { it = load_report.locality_stats.erase(it); } else { ++it; } } + // Compute load report interval. + const grpc_millis now = ExecCtx::Get()->Now(); + snapshot.load_report_interval = now - load_report.last_report_time; + load_report.last_report_time = now; + // Record snapshot. if (record_stats) { - // Compute load report interval. - const grpc_millis now = ExecCtx::Get()->Now(); - snapshot.load_report_interval = now - load_report.last_report_time; - load_report.last_report_time = now; - // Record snapshot. snapshot_map[cluster_key] = std::move(snapshot); } // If the only thing left in this entry was final snapshots from // deleted stats objects, remove the entry. - if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) { + if (load_report.locality_stats.empty() && + load_report.drop_stats == nullptr) { load_report_it = load_report_map_.erase(load_report_it); } else { ++load_report_it; diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 3e74c2c38dc..9e222fbb536 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -39,7 +39,7 @@ namespace grpc_core { -extern TraceFlag xds_client_trace; +extern TraceFlag grpc_xds_client_trace; class XdsClient : public DualRefCounted { public: @@ -267,17 +267,13 @@ class XdsClient : public DualRefCounted { absl::optional update; }; - // TODO(roth): Change this to store exactly one instance of - // XdsClusterDropStats and exactly one instance of - // XdsClusterLocalityStats per locality. We can return multiple refs - // to the same object instead of registering multiple objects. struct LoadReportState { struct LocalityState { - std::set locality_stats; - std::vector deleted_locality_stats; + XdsClusterLocalityStats* locality_stats = nullptr; + XdsClusterLocalityStats::Snapshot deleted_locality_stats; }; - std::set drop_stats; + XdsClusterDropStats* drop_stats = nullptr; XdsClusterDropStats::Snapshot deleted_drop_stats; std::map, LocalityState, XdsLocalityName::Less> diff --git a/src/core/ext/xds/xds_client_stats.cc b/src/core/ext/xds/xds_client_stats.cc index ba29ec085f6..06cd95c4fc0 100644 --- a/src/core/ext/xds/xds_client_stats.cc +++ b/src/core/ext/xds/xds_client_stats.cc @@ -45,12 +45,27 @@ XdsClusterDropStats::XdsClusterDropStats(RefCountedPtr xds_client, absl::string_view lrs_server_name, absl::string_view cluster_name, absl::string_view eds_service_name) - : xds_client_(std::move(xds_client)), + : RefCounted(&grpc_xds_client_trace), + xds_client_(std::move(xds_client)), lrs_server_name_(lrs_server_name), cluster_name_(cluster_name), - eds_service_name_(eds_service_name) {} + eds_service_name_(eds_service_name) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "[xds_client %p] created drop stats %p for {%s, %s, %s}", + xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + std::string(cluster_name_).c_str(), + std::string(eds_service_name_).c_str()); + } +} XdsClusterDropStats::~XdsClusterDropStats() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] destroying drop stats %p for {%s, %s, %s}", + xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + std::string(cluster_name_).c_str(), + std::string(eds_service_name_).c_str()); + } xds_client_->RemoveClusterDropStats(lrs_server_name_, cluster_name_, eds_service_name_, this); xds_client_.reset(DEBUG_LOCATION, "DropStats"); @@ -81,13 +96,31 @@ XdsClusterLocalityStats::XdsClusterLocalityStats( RefCountedPtr xds_client, absl::string_view lrs_server_name, absl::string_view cluster_name, absl::string_view eds_service_name, RefCountedPtr name) - : xds_client_(std::move(xds_client)), + : RefCounted(&grpc_xds_client_trace), + xds_client_(std::move(xds_client)), lrs_server_name_(lrs_server_name), cluster_name_(cluster_name), eds_service_name_(eds_service_name), - name_(std::move(name)) {} + name_(std::move(name)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] created locality stats %p for {%s, %s, %s, %s}", + xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + std::string(cluster_name_).c_str(), + std::string(eds_service_name_).c_str(), + name_->AsHumanReadableString().c_str()); + } +} XdsClusterLocalityStats::~XdsClusterLocalityStats() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] destroying locality stats %p for {%s, %s, %s, %s}", + xds_client_.get(), this, std::string(lrs_server_name_).c_str(), + std::string(cluster_name_).c_str(), + std::string(eds_service_name_).c_str(), + name_->AsHumanReadableString().c_str()); + } xds_client_->RemoveClusterLocalityStats(lrs_server_name_, cluster_name_, eds_service_name_, name_, this); xds_client_.reset(DEBUG_LOCATION, "LocalityStats"); diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 744d7aac3fd..2108e3eaaa3 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -5347,14 +5347,6 @@ TEST_P(BalancerUpdateTest, DeadUpdate) { << balancers_[2]->ads_service()->eds_response_state().error_message; } -// The re-resolution tests are deferred because they rely on the fallback mode, -// which hasn't been supported. - -// TODO(juanlishen): Add TEST_P(BalancerUpdateTest, ReresolveDeadBackend). - -// TODO(juanlishen): Add TEST_P(UpdatesWithClientLoadReportingTest, -// ReresolveDeadBalancer) - class ClientLoadReportingTest : public XdsEnd2endTest { public: ClientLoadReportingTest() : XdsEnd2endTest(4, 1, 3) {}