diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc index 14b26404b44..1ed5a6993c6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc +++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc @@ -16,6 +16,8 @@ #include <grpc/support/port_platform.h> +#include <cstring> + #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "absl/strings/str_cat.h" @@ -138,8 +140,6 @@ void ChildPolicyHandler::ShutdownLocked() { } void ChildPolicyHandler::UpdateLocked(UpdateArgs args) { - // The name of the policy that this update wants us to use. - const char* child_policy_name = args.config->name(); // If the child policy name changes, we need to create a new child // policy. When this happens, we leave child_policy_ as-is and store // the new child policy in pending_child_policy_. Once the new child @@ -166,10 +166,10 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) { // previous update that changed the policy name, or we have already // finished swapping in the new policy; in this case, child_policy_ // is non-null but pending_child_policy_ is null). In this case: - // a. If child_policy_->name() equals child_policy_name, then we - // update the existing child policy. - // b. If child_policy_->name() does not equal child_policy_name, - // we create a new policy. The policy will be stored in + // a. If going from the current config to the new config does not + // require a new policy, then we update the existing child policy. + // b. If going from the current config to the new config does require a + // new policy, we create a new policy. The policy will be stored in // pending_child_policy_ and will later be swapped into // child_policy_ by the helper when the new child transitions // into state READY. @@ -180,10 +180,11 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) { // not yet transitioned into state READY and been swapped into // child_policy_; in this case, both child_policy_ and // pending_child_policy_ are non-null). In this case: - // a. If pending_child_policy_->name() equals child_policy_name, - // then we update the existing pending child policy. - // b. If pending_child_policy->name() does not equal - // child_policy_name, then we create a new policy. The new + // a. If going from the current config to the new config does not + // require a new policy, then we update the existing pending + // child policy. + // b. If going from the current config to the new config does require a + // new child policy, then we create a new policy. The new // policy is stored in pending_child_policy_ (replacing the one // that was there before, which will be immediately shut down) // and will later be swapped into child_policy_ by the helper @@ -191,12 +192,10 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) { const bool create_policy = // case 1 child_policy_ == nullptr || - // case 2b - (pending_child_policy_ == nullptr && - strcmp(child_policy_->name(), child_policy_name) != 0) || - // case 3b - (pending_child_policy_ != nullptr && - strcmp(pending_child_policy_->name(), child_policy_name) != 0); + // cases 2b and 3b + ConfigChangeRequiresNewPolicyInstance(current_config_.get(), + args.config.get()); + current_config_ = args.config; LoadBalancingPolicy* policy_to_update = nullptr; if (create_policy) { // Cases 1, 2b, and 3b: create a new child policy. @@ -205,11 +204,11 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[child_policy_handler %p] creating new %schild policy %s", this, - child_policy_ == nullptr ? "" : "pending ", child_policy_name); + child_policy_ == nullptr ? "" : "pending ", args.config->name()); } auto& lb_policy = child_policy_ == nullptr ? child_policy_ : pending_child_policy_; - lb_policy = CreateChildPolicy(child_policy_name, *args.args); + lb_policy = CreateChildPolicy(args.config->name(), *args.args); policy_to_update = lb_policy.get(); } else { // Cases 2a and 3a: update an existing policy. @@ -257,8 +256,7 @@ OrphanablePtr<LoadBalancingPolicy> ChildPolicyHandler::CreateChildPolicy( std::unique_ptr<ChannelControlHelper>(helper); lb_policy_args.args = &args; OrphanablePtr<LoadBalancingPolicy> lb_policy = - LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - child_policy_name, std::move(lb_policy_args)); + CreateLoadBalancingPolicy(child_policy_name, std::move(lb_policy_args)); if (GPR_UNLIKELY(lb_policy == nullptr)) { gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", child_policy_name); return nullptr; @@ -277,4 +275,17 @@ OrphanablePtr<LoadBalancingPolicy> ChildPolicyHandler::CreateChildPolicy( return lb_policy; } +bool ChildPolicyHandler::ConfigChangeRequiresNewPolicyInstance( + LoadBalancingPolicy::Config* old_config, + LoadBalancingPolicy::Config* new_config) const { + return strcmp(old_config->name(), new_config->name()) != 0; +} + +OrphanablePtr<LoadBalancingPolicy> +ChildPolicyHandler::CreateLoadBalancingPolicy( + const char* name, LoadBalancingPolicy::Args args) const { + return LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + name, std::move(args)); +} + } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h index 7a32722d367..d67f3264a87 100644 --- a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h +++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h @@ -42,6 +42,18 @@ class ChildPolicyHandler : public LoadBalancingPolicy { void ExitIdleLocked() override; void ResetBackoffLocked() override; + // Returns true if transitioning from the old config to the new config + // requires instantiating a new policy object. + virtual bool ConfigChangeRequiresNewPolicyInstance( + LoadBalancingPolicy::Config* old_config, + LoadBalancingPolicy::Config* new_config) const; + + // Instantiates a new policy of the specified name. + // May be overridden by subclasses to avoid recursion when an LB + // policy factory returns a ChildPolicyHandler. + virtual OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( + const char* name, LoadBalancingPolicy::Args args) const; + private: class Helper; @@ -55,6 +67,11 @@ class ChildPolicyHandler : public LoadBalancingPolicy { bool shutting_down_ = false; + // The most recent config passed to UpdateLocked(). + // If pending_child_policy_ is non-null, this is the config passed to + // pending_child_policy_; otherwise, it's the config passed to child_policy_. + RefCountedPtr<LoadBalancingPolicy::Config> current_config_; + // Child LB policy. OrphanablePtr<LoadBalancingPolicy> child_policy_; OrphanablePtr<LoadBalancingPolicy> pending_child_policy_; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 0c390c2a1fe..d5109c150d7 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -725,7 +725,6 @@ void XdsLb::UpdateLocked(UpdateArgs args) { } const bool is_initial_update = args_ == nullptr; // Update config. - const char* old_eds_service_name = eds_service_name(); auto old_config = std::move(config_); config_ = std::move(args.config); // Update fallback address list. @@ -773,30 +772,8 @@ void XdsLb::UpdateLocked(UpdateArgs args) { eds_service_name(), eds_service_name()); } } - // Update priority list. - // Note that this comes after updating drop_stats_, since we want that - // to be used by any new picker we create here. - // No need to do this on the initial update, since there won't be any - // priorities to update yet. - if (!is_initial_update) { - const bool update_locality_stats = - config_->lrs_load_reporting_server_name() != - old_config->lrs_load_reporting_server_name() || - strcmp(old_eds_service_name, eds_service_name()) != 0; - UpdatePrioritiesLocked(update_locality_stats); - } - // Update endpoint watcher if needed. - if (is_initial_update || - strcmp(old_eds_service_name, eds_service_name()) != 0) { - if (!is_initial_update) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, "[xdslb %p] cancelling watch for %s", this, - old_eds_service_name); - } - xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name), - endpoint_watcher_, - /*delay_unsubscription=*/true); - } + // On the initial update, create the endpoint watcher. + if (is_initial_update) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] starting watch for %s", this, eds_service_name()); @@ -806,6 +783,16 @@ void XdsLb::UpdateLocked(UpdateArgs args) { endpoint_watcher_ = watcher.get(); xds_client()->WatchEndpointData(StringView(eds_service_name()), std::move(watcher)); + } else { + // Update priority list. + // Note that this comes after updating drop_stats_, since we want that + // to be used by any new picker we create here. + // No need to do this on the initial update, since there won't be any + // priorities to update yet. + const bool update_locality_stats = + config_->lrs_load_reporting_server_name() != + old_config->lrs_load_reporting_server_name(); + UpdatePrioritiesLocked(update_locality_stats); } } @@ -1000,7 +987,16 @@ OrphanablePtr<XdsLb::LocalityMap::Locality> XdsLb::ExtractLocalityLocked( if (priority == exclude_priority) continue; LocalityMap* locality_map = priorities_[priority].get(); auto locality = locality_map->ExtractLocalityLocked(name); - if (locality != nullptr) return locality; + if (locality != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, + "[xdslb %p] moving locality %p %s to new priority (%" PRIu32 + " -> %" PRIu32 ")", + this, locality.get(), name->AsHumanReadableString(), + exclude_priority, priority); + } + return locality; + } } return nullptr; } @@ -1160,6 +1156,10 @@ XdsLb::LocalityMap::ExtractLocalityLocked( } void XdsLb::LocalityMap::DeactivateLocked() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] deactivating priority %" PRIu32, xds_policy(), + priority_); + } // If already deactivated, don't do it again. if (delayed_removal_timer_callback_pending_) return; MaybeCancelFailoverTimerLocked(); @@ -1184,6 +1184,10 @@ bool XdsLb::LocalityMap::MaybeReactivateLocked() { // Don't reactivate a priority that is not higher than the current one. if (priority_ >= xds_policy_->current_priority_) return false; // Reactivate this priority by cancelling deletion timer. + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] reactivating priority %" PRIu32, xds_policy(), + priority_); + } if (delayed_removal_timer_callback_pending_) { grpc_timer_cancel(&delayed_removal_timer_); } @@ -1440,6 +1444,10 @@ void XdsLb::LocalityMap::Locality::UpdateLocked(uint32_t locality_weight, // Update locality weight. weight_ = locality_weight; if (delayed_removal_timer_callback_pending_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: reactivating", xds_policy(), + this, name_->AsHumanReadableString()); + } grpc_timer_cancel(&delayed_removal_timer_); } // Update locality stats. @@ -1497,6 +1505,10 @@ void XdsLb::LocalityMap::Locality::Orphan() { void XdsLb::LocalityMap::Locality::DeactivateLocked() { // If already deactivated, don't do that again. if (weight_ == 0) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: deactivating", xds_policy(), + this, name_->AsHumanReadableString()); + } // Set the locality weight to 0 so that future xds picker won't contain this // locality. weight_ = 0; @@ -1574,7 +1586,7 @@ class XdsFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { - return MakeOrphanable<XdsLb>(std::move(args)); + return MakeOrphanable<XdsChildHandler>(std::move(args), &grpc_lb_xds_trace); } const char* name() const override { return kXds; } @@ -1672,6 +1684,36 @@ class XdsFactory : public LoadBalancingPolicyFactory { return nullptr; } } + + private: + class XdsChildHandler : public ChildPolicyHandler { + public: + XdsChildHandler(Args args, TraceFlag* tracer) + : ChildPolicyHandler(std::move(args), tracer) {} + + bool ConfigChangeRequiresNewPolicyInstance( + LoadBalancingPolicy::Config* old_config, + LoadBalancingPolicy::Config* new_config) const override { + GPR_ASSERT(old_config->name() == kXds); + GPR_ASSERT(new_config->name() == kXds); + XdsConfig* old_xds_config = static_cast<XdsConfig*>(old_config); + XdsConfig* new_xds_config = static_cast<XdsConfig*>(new_config); + const char* old_eds_service_name = + old_xds_config->eds_service_name() == nullptr + ? "" + : old_xds_config->eds_service_name(); + const char* new_eds_service_name = + new_xds_config->eds_service_name() == nullptr + ? "" + : new_xds_config->eds_service_name(); + return strcmp(old_eds_service_name, new_eds_service_name) != 0; + } + + OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( + const char* name, LoadBalancingPolicy::Args args) const override { + return MakeOrphanable<XdsLb>(std::move(args)); + } + }; }; } // namespace diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index 28cae1ae351..ee45f85dbaf 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -192,7 +192,7 @@ class XdsApi { struct ClusterLoadReport { XdsClusterDropStats::DroppedRequestsMap dropped_requests; - std::map<XdsLocalityName*, XdsClusterLocalityStats::Snapshot, + std::map<RefCountedPtr<XdsLocalityName>, XdsClusterLocalityStats::Snapshot, XdsLocalityName::Less> locality_stats; grpc_millis load_report_interval; diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 3357e308ad5..631c7f9a504 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -302,7 +302,6 @@ class XdsClient::ChannelState::LrsCallState void Orphan() override; void MaybeStartReportingLocked(); - bool ShouldSendLoadReports(const StringView& cluster_name) const; RetryableCall<LrsCallState>* parent() { return parent_.get(); } ChannelState* chand() const { return parent_->chand(); } @@ -1414,7 +1413,7 @@ bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) { void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { // Construct snapshot from all reported stats. XdsApi::ClusterLoadReportMap snapshot = - xds_client()->BuildLoadReportSnapshot(); + xds_client()->BuildLoadReportSnapshot(parent_->cluster_names_); // Skip client load report if the counters were all zero in the last // report and they are still zero in this one. const bool old_val = last_report_counters_were_zero_; @@ -1460,6 +1459,12 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked( Reporter* self = static_cast<Reporter*>(arg); grpc_byte_buffer_destroy(self->parent_->send_message_payload_); self->parent_->send_message_payload_ = nullptr; + // If there are no more registered stats to report, cancel the call. + if (self->xds_client()->load_report_map_.empty()) { + self->parent_->chand()->StopLrsCall(); + self->Unref(DEBUG_LOCATION, "Reporter+report_done+no_more_reporters"); + return; + } if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) { // If this reporter is no longer the current one on the call, the reason // might be that it was orphaned for a new one due to config update. @@ -1613,13 +1618,6 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() { Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_); } -bool XdsClient::ChannelState::LrsCallState::ShouldSendLoadReports( - const StringView& cluster_name) const { - // Only send load reports for the clusters that are asked for by the LRS - // server. - return cluster_names_.find(std::string(cluster_name)) != cluster_names_.end(); -} - void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent( void* arg, grpc_error* error) { LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg); @@ -1966,19 +1964,14 @@ void XdsClient::RemoveClusterDropStats( LoadReportState& load_report_state = load_report_it->second; // TODO(roth): When we add support for direct federation, use the // server name specified in lrs_server. - // TODO(roth): In principle, we should try to send a final load report - // containing whatever final stats have been accumulated since the - // last load report. auto it = load_report_state.drop_stats.find(cluster_drop_stats); if (it != load_report_state.drop_stats.end()) { - load_report_state.drop_stats.erase(it); - if (load_report_state.drop_stats.empty() && - load_report_state.locality_stats.empty()) { - load_report_map_.erase(load_report_it); - if (chand_ != nullptr && load_report_map_.empty()) { - chand_->StopLrsCall(); - } + // Record final drop stats in deleted_drop_stats, which will be + // added to the next load report. + for (const auto& p : cluster_drop_stats->GetSnapshotAndReset()) { + load_report_state.deleted_drop_stats[p.first] += p.second; } + load_report_state.drop_stats.erase(it); } } @@ -1999,7 +1992,7 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats( Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server, it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/, locality); - it->second.locality_stats[std::move(locality)].insert( + it->second.locality_stats[std::move(locality)].locality_stats.insert( cluster_locality_stats.get()); chand_->MaybeStartLrsCall(); return cluster_locality_stats; @@ -2015,25 +2008,16 @@ void XdsClient::RemoveClusterLocalityStats( LoadReportState& load_report_state = load_report_it->second; // TODO(roth): When we add support for direct federation, use the // server name specified in lrs_server. - // TODO(roth): In principle, we should try to send a final load report - // containing whatever final stats have been accumulated since the - // last load report. auto locality_it = load_report_state.locality_stats.find(locality); if (locality_it == load_report_state.locality_stats.end()) return; - auto& locality_set = locality_it->second; + auto& locality_set = locality_it->second.locality_stats; auto it = locality_set.find(cluster_locality_stats); if (it != locality_set.end()) { + // Record final snapshot in deleted_locality_stats, which will be + // added to the next load report. + locality_it->second.deleted_locality_stats.emplace_back( + cluster_locality_stats->GetSnapshotAndReset()); locality_set.erase(it); - if (locality_set.empty()) { - load_report_state.locality_stats.erase(locality_it); - if (load_report_state.locality_stats.empty() && - load_report_state.drop_stats.empty()) { - load_report_map_.erase(load_report_it); - if (chand_ != nullptr && load_report_map_.empty()) { - chand_->StopLrsCall(); - } - } - } } } @@ -2062,32 +2046,70 @@ grpc_error* XdsClient::CreateServiceConfig( return error; } -XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot() { +XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot( + const std::set<std::string>& clusters) { XdsApi::ClusterLoadReportMap snapshot_map; - for (auto& p : load_report_map_) { - const auto& cluster_key = p.first; // cluster and EDS service name - LoadReportState& load_report = p.second; - XdsApi::ClusterLoadReport& snapshot = snapshot_map[cluster_key]; + for (auto load_report_it = load_report_map_.begin(); + load_report_it != load_report_map_.end();) { + // Cluster key is cluster and EDS service name. + const auto& cluster_key = load_report_it->first; + LoadReportState& load_report = load_report_it->second; + // If the CDS response for a cluster indicates to use LRS but the + // LRS server does not say that it wants reports for this cluster, + // then we'll have stats objects here whose data we're not going to + // include in the load report. However, we still need to clear out + // the data from the stats objects, so that if the LRS server starts + // asking for the data in the future, we don't incorrectly include + // data from previous reporting intervals in that future report. + const bool record_stats = + clusters.find(cluster_key.first) != clusters.end(); + XdsApi::ClusterLoadReport snapshot; // Aggregate drop stats. + snapshot.dropped_requests = std::move(load_report.deleted_drop_stats); for (auto& drop_stats : load_report.drop_stats) { for (const auto& p : drop_stats->GetSnapshotAndReset()) { snapshot.dropped_requests[p.first] += p.second; } } // Aggregate locality stats. - for (auto& p : load_report.locality_stats) { - XdsLocalityName* locality_name = p.first.get(); - auto& locality_stats_set = p.second; + for (auto it = load_report.locality_stats.begin(); + it != load_report.locality_stats.end();) { + const RefCountedPtr<XdsLocalityName>& locality_name = it->first; + auto& locality_state = it->second; XdsClusterLocalityStats::Snapshot& locality_snapshot = snapshot.locality_stats[locality_name]; - for (auto& locality_stats : locality_stats_set) { + for (auto& locality_stats : locality_state.locality_stats) { locality_snapshot += locality_stats->GetSnapshotAndReset(); } + // Add final snapshots from recently deleted locality stats objects. + for (auto& deleted_locality_stats : + locality_state.deleted_locality_stats) { + locality_snapshot += deleted_locality_stats; + } + locality_state.deleted_locality_stats.clear(); + // If the only thing left in this entry was final snapshots from + // deleted locality stats objects, remove the entry. + if (locality_state.locality_stats.empty()) { + it = load_report.locality_stats.erase(it); + } else { + ++it; + } + } + if (record_stats) { + // Compute load report interval. + const grpc_millis now = ExecCtx::Get()->Now(); + snapshot.load_report_interval = now - load_report.last_report_time; + load_report.last_report_time = now; + // Record snapshot. + snapshot_map[cluster_key] = std::move(snapshot); + } + // If the only thing left in this entry was final snapshots from + // deleted stats objects, remove the entry. + if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) { + load_report_it = load_report_map_.erase(load_report_it); + } else { + ++load_report_it; } - // Compute load report interval. - const grpc_millis now = ExecCtx::Get()->Now(); - snapshot.load_report_interval = now - load_report.last_report_time; - load_report.last_report_time = now; } return snapshot_map; } diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index 609890dc02e..c6ef4f87b99 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -209,8 +209,14 @@ class XdsClient : public InternallyRefCounted<XdsClient> { }; struct LoadReportState { + struct LocalityState { + std::set<XdsClusterLocalityStats*> locality_stats; + std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats; + }; + std::set<XdsClusterDropStats*> drop_stats; - std::map<RefCountedPtr<XdsLocalityName>, std::set<XdsClusterLocalityStats*>, + XdsClusterDropStats::DroppedRequestsMap deleted_drop_stats; + std::map<RefCountedPtr<XdsLocalityName>, LocalityState, XdsLocalityName::Less> locality_stats; grpc_millis last_report_time = ExecCtx::Get()->Now(); @@ -223,7 +229,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> { const std::string& cluster_name, RefCountedPtr<ServiceConfig>* service_config) const; - XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot(); + XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot( + const std::set<std::string>& clusters); // Channel arg vtable functions. static void* ChannelArgCopy(void* p); diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index e747e8ceb42..a2c0ee6f382 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -22,7 +22,9 @@ #include <numeric> #include <set> #include <sstream> +#include <string> #include <thread> +#include <vector> #include <grpc/grpc.h> #include <grpc/support/alloc.h> @@ -296,8 +298,9 @@ class ClientStats { }; // Converts from proto message class. - ClientStats(const ClusterStats& cluster_stats) - : total_dropped_requests_(cluster_stats.total_dropped_requests()) { + explicit ClientStats(const ClusterStats& cluster_stats) + : cluster_name_(cluster_stats.cluster_name()), + total_dropped_requests_(cluster_stats.total_dropped_requests()) { for (const auto& input_locality_stats : cluster_stats.upstream_locality_stats()) { locality_stats_.emplace(input_locality_stats.locality().sub_zone(), @@ -310,6 +313,11 @@ class ClientStats { } } + const std::string& cluster_name() const { return cluster_name_; } + + const std::map<grpc::string, LocalityStats>& locality_stats() const { + return locality_stats_; + } uint64_t total_successful_requests() const { uint64_t sum = 0; for (auto& p : locality_stats_) { @@ -338,7 +346,9 @@ class ClientStats { } return sum; } + uint64_t total_dropped_requests() const { return total_dropped_requests_; } + uint64_t dropped_requests(const grpc::string& category) const { auto iter = dropped_requests_.find(category); GPR_ASSERT(iter != dropped_requests_.end()); @@ -346,6 +356,7 @@ class ClientStats { } private: + std::string cluster_name_; std::map<grpc::string, LocalityStats> locality_stats_; uint64_t total_dropped_requests_; std::map<grpc::string, uint64_t> dropped_requests_; @@ -391,7 +402,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, }; using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>; - using ResponseDelayPair = std::pair<DiscoveryResponse, int>; // A queue of resource type/name pairs that have changed since the client // subscribed to them. @@ -933,60 +943,62 @@ class LrsServiceImpl : public LrsService, explicit LrsServiceImpl(int client_load_reporting_interval_seconds) : client_load_reporting_interval_seconds_( - client_load_reporting_interval_seconds) {} + client_load_reporting_interval_seconds), + cluster_names_({kDefaultResourceName}) {} Status StreamLoadStats(ServerContext* /*context*/, Stream* stream) override { gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this); + GPR_ASSERT(client_load_reporting_interval_seconds_ > 0); // Take a reference of the LrsServiceImpl object, reference will go // out of scope after this method exits. std::shared_ptr<LrsServiceImpl> lrs_service_impl = shared_from_this(); - // Read request. + // Read initial request. LoadStatsRequest request; if (stream->Read(&request)) { - if (client_load_reporting_interval_seconds_ > 0) { - IncreaseRequestCount(); - // Send response. - LoadStatsResponse response; - std::string server_name; - auto it = request.node().metadata().fields().find( - "PROXYLESS_CLIENT_HOSTNAME"); - if (it != request.node().metadata().fields().end()) { - server_name = it->second.string_value(); - } - GPR_ASSERT(server_name != ""); - response.add_clusters(server_name); - response.mutable_load_reporting_interval()->set_seconds( - client_load_reporting_interval_seconds_); - stream->Write(response); - IncreaseResponseCount(); - // Wait for report. - request.Clear(); - if (stream->Read(&request)) { - gpr_log(GPR_INFO, "LRS[%p]: received client load report message '%s'", - this, request.DebugString().c_str()); - GPR_ASSERT(request.cluster_stats().size() == 1); - const ClusterStats& cluster_stats = request.cluster_stats()[0]; - // We need to acquire the lock here in order to prevent the notify_one - // below from firing before its corresponding wait is executed. - grpc_core::MutexLock lock(&load_report_mu_); - GPR_ASSERT(client_stats_ == nullptr); - client_stats_.reset(new ClientStats(cluster_stats)); - load_report_ready_ = true; - load_report_cond_.Signal(); + IncreaseRequestCount(); // Only for initial request. + // Verify server name set in metadata. + auto it = + request.node().metadata().fields().find("PROXYLESS_CLIENT_HOSTNAME"); + GPR_ASSERT(it != request.node().metadata().fields().end()); + EXPECT_EQ(it->second.string_value(), kDefaultResourceName); + // Send initial response. + LoadStatsResponse response; + for (const std::string& cluster_name : cluster_names_) { + response.add_clusters(cluster_name); + } + response.mutable_load_reporting_interval()->set_seconds( + client_load_reporting_interval_seconds_); + stream->Write(response); + IncreaseResponseCount(); + // Wait for report. + request.Clear(); + while (stream->Read(&request)) { + gpr_log(GPR_INFO, "LRS[%p]: received client load report message: %s", + this, request.DebugString().c_str()); + std::vector<ClientStats> stats; + for (const auto& cluster_stats : request.cluster_stats()) { + stats.emplace_back(cluster_stats); } + grpc_core::MutexLock lock(&load_report_mu_); + result_queue_.emplace_back(std::move(stats)); + if (load_report_cond_ != nullptr) load_report_cond_->Signal(); } // Wait until notified done. grpc_core::MutexLock lock(&lrs_mu_); - lrs_cv_.WaitUntil(&lrs_mu_, [this] { return lrs_done; }); + lrs_cv_.WaitUntil(&lrs_mu_, [this] { return lrs_done_; }); } gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this); return Status::OK; } + // Must be called before the LRS call is started. + void set_cluster_names(const std::set<std::string>& cluster_names) { + cluster_names_ = cluster_names; + } + void Start() { - lrs_done = false; - load_report_ready_ = false; - client_stats_.reset(); + lrs_done_ = false; + result_queue_.clear(); } void Shutdown() { @@ -997,12 +1009,18 @@ class LrsServiceImpl : public LrsService, gpr_log(GPR_INFO, "LRS[%p]: shut down", this); } - ClientStats* WaitForLoadReport() { + std::vector<ClientStats> WaitForLoadReport() { grpc_core::MutexLock lock(&load_report_mu_); - load_report_cond_.WaitUntil(&load_report_mu_, - [this] { return load_report_ready_; }); - load_report_ready_ = false; - return client_stats_.get(); + grpc_core::CondVar cv; + if (result_queue_.empty()) { + load_report_cond_ = &cv; + load_report_cond_->WaitUntil(&load_report_mu_, + [this] { return !result_queue_.empty(); }); + load_report_cond_ = nullptr; + } + std::vector<ClientStats> result = std::move(result_queue_.front()); + result_queue_.pop_front(); + return result; } void NotifyDoneWithLrsCall() { @@ -1010,26 +1028,24 @@ class LrsServiceImpl : public LrsService, NotifyDoneWithLrsCallLocked(); } + private: void NotifyDoneWithLrsCallLocked() { - if (!lrs_done) { - lrs_done = true; + if (!lrs_done_) { + lrs_done_ = true; lrs_cv_.Broadcast(); } } - private: const int client_load_reporting_interval_seconds_; + std::set<std::string> cluster_names_; grpc_core::CondVar lrs_cv_; - // Protect lrs_done. - grpc_core::Mutex lrs_mu_; - bool lrs_done = false; + grpc_core::Mutex lrs_mu_; // Protects lrs_done_. + bool lrs_done_ = false; - grpc_core::CondVar load_report_cond_; - // Protect the members below. - grpc_core::Mutex load_report_mu_; - std::unique_ptr<ClientStats> client_stats_; - bool load_report_ready_ = false; + grpc_core::Mutex load_report_mu_; // Protects the members below. + grpc_core::CondVar* load_report_cond_ = nullptr; + std::deque<std::vector<ClientStats>> result_queue_; }; class TestType { @@ -1720,6 +1736,141 @@ TEST_P(XdsResolverOnlyTest, ClusterRemoved) { AdsServiceImpl::ACKED); } +class XdsResolverLoadReportingOnlyTest : public XdsEnd2endTest { + public: + XdsResolverLoadReportingOnlyTest() : XdsEnd2endTest(4, 1, 3) {} +}; + +// Tests load reporting when switching over from one cluster to another. +TEST_P(XdsResolverLoadReportingOnlyTest, ChangeClusters) { + const char* kNewClusterName = "new_cluster_name"; + balancers_[0]->lrs_service()->set_cluster_names( + {kDefaultResourceName, kNewClusterName}); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + // cluster kDefaultResourceName -> locality0 -> backends 0 and 1 + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts(0, 2)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName); + // cluster kNewClusterName -> locality1 -> backends 2 and 3 + AdsServiceImpl::EdsResourceArgs args2({ + {"locality1", GetBackendPorts(2, 4)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args2, kNewClusterName), + kNewClusterName); + // CDS resource for kNewClusterName. + Cluster new_cluster = balancers_[0]->ads_service()->default_cluster(); + new_cluster.set_name(kNewClusterName); + balancers_[0]->ads_service()->SetCdsResource(new_cluster, kNewClusterName); + // Wait for all backends to come online. + int num_ok = 0; + int num_failure = 0; + int num_drops = 0; + std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends(0, 2); + // The load report received at the balancer should be correct. + std::vector<ClientStats> load_report = + balancers_[0]->lrs_service()->WaitForLoadReport(); + EXPECT_THAT( + load_report, + ::testing::ElementsAre(::testing::AllOf( + ::testing::Property(&ClientStats::cluster_name, kDefaultResourceName), + ::testing::Property( + &ClientStats::locality_stats, + ::testing::ElementsAre(::testing::Pair( + "locality0", + ::testing::AllOf( + ::testing::Field(&ClientStats::LocalityStats:: + total_successful_requests, + num_ok), + ::testing::Field(&ClientStats::LocalityStats:: + total_requests_in_progress, + 0UL), + ::testing::Field( + &ClientStats::LocalityStats::total_error_requests, + num_failure), + ::testing::Field( + &ClientStats::LocalityStats::total_issued_requests, + num_failure + num_ok))))), + ::testing::Property(&ClientStats::total_dropped_requests, + num_drops)))); + // Change RDS resource to point to new cluster. + RouteConfiguration new_route_config = + balancers_[0]->ads_service()->default_route_config(); + new_route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->set_cluster(kNewClusterName); + Listener listener = + balancers_[0]->ads_service()->BuildListener(new_route_config); + balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName); + // Wait for all new backends to be used. + std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends(2, 4); + // The load report received at the balancer should be correct. + load_report = balancers_[0]->lrs_service()->WaitForLoadReport(); + EXPECT_THAT( + load_report, + ::testing::ElementsAre( + ::testing::AllOf( + ::testing::Property(&ClientStats::cluster_name, + kDefaultResourceName), + ::testing::Property( + &ClientStats::locality_stats, + ::testing::ElementsAre(::testing::Pair( + "locality0", + ::testing::AllOf( + ::testing::Field(&ClientStats::LocalityStats:: + total_successful_requests, + ::testing::Lt(num_ok)), + ::testing::Field(&ClientStats::LocalityStats:: + total_requests_in_progress, + 0UL), + ::testing::Field( + &ClientStats::LocalityStats::total_error_requests, + ::testing::Le(num_failure)), + ::testing::Field( + &ClientStats::LocalityStats:: + total_issued_requests, + ::testing::Le(num_failure + num_ok)))))), + ::testing::Property(&ClientStats::total_dropped_requests, + num_drops)), + ::testing::AllOf( + ::testing::Property(&ClientStats::cluster_name, kNewClusterName), + ::testing::Property( + &ClientStats::locality_stats, + ::testing::ElementsAre(::testing::Pair( + "locality1", + ::testing::AllOf( + ::testing::Field(&ClientStats::LocalityStats:: + total_successful_requests, + ::testing::Le(num_ok)), + ::testing::Field(&ClientStats::LocalityStats:: + total_requests_in_progress, + 0UL), + ::testing::Field( + &ClientStats::LocalityStats::total_error_requests, + ::testing::Le(num_failure)), + ::testing::Field( + &ClientStats::LocalityStats:: + total_issued_requests, + ::testing::Le(num_failure + num_ok)))))), + ::testing::Property(&ClientStats::total_dropped_requests, + num_drops)))); + int total_ok = 0; + int total_failure = 0; + for (const ClientStats& client_stats : load_report) { + total_ok += client_stats.total_successful_requests(); + total_failure += client_stats.total_error_requests(); + } + EXPECT_EQ(total_ok, num_ok); + EXPECT_EQ(total_failure, num_failure); + // The LRS service got a single request, and sent a single response. + EXPECT_EQ(1U, balancers_[0]->lrs_service()->request_count()); + EXPECT_EQ(1U, balancers_[0]->lrs_service()->response_count()); +} + using SecureNamingTest = BasicTest; // Tests that secure naming check passes if target name is expected. @@ -3227,14 +3378,50 @@ TEST_P(ClientLoadReportingTest, Vanilla) { EXPECT_EQ(1U, balancers_[0]->lrs_service()->request_count()); EXPECT_EQ(1U, balancers_[0]->lrs_service()->response_count()); // The load report received at the balancer should be correct. - ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport(); + std::vector<ClientStats> load_report = + balancers_[0]->lrs_service()->WaitForLoadReport(); + ASSERT_EQ(load_report.size(), 1UL); + ClientStats& client_stats = load_report.front(); EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok, - client_stats->total_successful_requests()); - EXPECT_EQ(0U, client_stats->total_requests_in_progress()); + client_stats.total_successful_requests()); + EXPECT_EQ(0U, client_stats.total_requests_in_progress()); EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok, - client_stats->total_issued_requests()); - EXPECT_EQ(0U, client_stats->total_error_requests()); - EXPECT_EQ(0U, client_stats->total_dropped_requests()); + client_stats.total_issued_requests()); + EXPECT_EQ(0U, client_stats.total_error_requests()); + EXPECT_EQ(0U, client_stats.total_dropped_requests()); +} + +// Tests that we don't include stats for clusters that are not requested +// by the LRS server. +TEST_P(ClientLoadReportingTest, HonorsClustersRequestedByLrsServer) { + balancers_[0]->lrs_service()->set_cluster_names({"bogus"}); + SetNextResolution({}); + SetNextResolutionForLbChannel({balancers_[0]->port()}); + const size_t kNumRpcsPerAddress = 100; + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName); + // Wait until all backends are ready. + int num_ok = 0; + int num_failure = 0; + int num_drops = 0; + std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends(); + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); + // Each backend should have gotten 100 requests. + for (size_t i = 0; i < backends_.size(); ++i) { + EXPECT_EQ(kNumRpcsPerAddress, + backends_[i]->backend_service()->request_count()); + } + // The LRS service got a single request, and sent a single response. + EXPECT_EQ(1U, balancers_[0]->lrs_service()->request_count()); + EXPECT_EQ(1U, balancers_[0]->lrs_service()->response_count()); + // The load report received at the balancer should be correct. + std::vector<ClientStats> load_report = + balancers_[0]->lrs_service()->WaitForLoadReport(); + ASSERT_EQ(load_report.size(), 0UL); } // Tests that if the balancer restarts, the client load report contains the @@ -3257,12 +3444,15 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) { std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends(/* start_index */ 0, /* stop_index */ kNumBackendsFirstPass); - ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport(); + std::vector<ClientStats> load_report = + balancers_[0]->lrs_service()->WaitForLoadReport(); + ASSERT_EQ(load_report.size(), 1UL); + ClientStats client_stats = std::move(load_report.front()); EXPECT_EQ(static_cast<size_t>(num_ok), - client_stats->total_successful_requests()); - EXPECT_EQ(0U, client_stats->total_requests_in_progress()); - EXPECT_EQ(0U, client_stats->total_error_requests()); - EXPECT_EQ(0U, client_stats->total_dropped_requests()); + client_stats.total_successful_requests()); + EXPECT_EQ(0U, client_stats.total_requests_in_progress()); + EXPECT_EQ(0U, client_stats.total_error_requests()); + EXPECT_EQ(0U, client_stats.total_dropped_requests()); // Shut down the balancer. balancers_[0]->Shutdown(); // We should continue using the last EDS response we received from the @@ -3294,11 +3484,13 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) { CheckRpcSendOk(kNumBackendsSecondPass); num_started += kNumBackendsSecondPass; // Check client stats. - client_stats = balancers_[0]->lrs_service()->WaitForLoadReport(); - EXPECT_EQ(num_started, client_stats->total_successful_requests()); - EXPECT_EQ(0U, client_stats->total_requests_in_progress()); - EXPECT_EQ(0U, client_stats->total_error_requests()); - EXPECT_EQ(0U, client_stats->total_dropped_requests()); + load_report = balancers_[0]->lrs_service()->WaitForLoadReport(); + ASSERT_EQ(load_report.size(), 1UL); + client_stats = std::move(load_report.front()); + EXPECT_EQ(num_started, client_stats.total_successful_requests()); + EXPECT_EQ(0U, client_stats.total_requests_in_progress()); + EXPECT_EQ(0U, client_stats.total_error_requests()); + EXPECT_EQ(0U, client_stats.total_dropped_requests()); } class ClientLoadReportingWithDropTest : public XdsEnd2endTest { @@ -3352,15 +3544,18 @@ TEST_P(ClientLoadReportingWithDropTest, Vanilla) { ::testing::Ge(KDropRateForLbAndThrottle * (1 - kErrorTolerance)), ::testing::Le(KDropRateForLbAndThrottle * (1 + kErrorTolerance)))); // Check client stats. - ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport(); - EXPECT_EQ(num_drops, client_stats->total_dropped_requests()); + std::vector<ClientStats> load_report = + balancers_[0]->lrs_service()->WaitForLoadReport(); + ASSERT_EQ(load_report.size(), 1UL); + ClientStats& client_stats = load_report.front(); + EXPECT_EQ(num_drops, client_stats.total_dropped_requests()); const size_t total_rpc = num_warmup + kNumRpcs; EXPECT_THAT( - client_stats->dropped_requests(kLbDropType), + client_stats.dropped_requests(kLbDropType), ::testing::AllOf( ::testing::Ge(total_rpc * kDropRateForLb * (1 - kErrorTolerance)), ::testing::Le(total_rpc * kDropRateForLb * (1 + kErrorTolerance)))); - EXPECT_THAT(client_stats->dropped_requests(kThrottleDropType), + EXPECT_THAT(client_stats.dropped_requests(kThrottleDropType), ::testing::AllOf( ::testing::Ge(total_rpc * (1 - kDropRateForLb) * kDropRateForThrottle * (1 - kErrorTolerance)), @@ -3417,6 +3612,11 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, XdsResolverOnlyTest, TestType(true, true)), &TestTypeName); +// XdsResolverLoadReprtingOnlyTest depends on XdsResolver and load reporting. +INSTANTIATE_TEST_SUITE_P(XdsTest, XdsResolverLoadReportingOnlyTest, + ::testing::Values(TestType(true, true)), + &TestTypeName); + INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest, ::testing::Values(TestType(false, true), TestType(false, false),