Gracefully switch xds policy instances when cluster name changes, and other LRS-related fixes.

pull/22371/head
Mark D. Roth 5 years ago
parent dd165a3d79
commit 5927f9b625
  1. src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc (51 lines changed)
  2. src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h (17 lines changed)
  3. src/core/ext/filters/client_channel/lb_policy/xds/xds.cc (96 lines changed)
  4. src/core/ext/filters/client_channel/xds/xds_api.h (2 lines changed)
  5. src/core/ext/filters/client_channel/xds/xds_client.cc (116 lines changed)
  6. src/core/ext/filters/client_channel/xds/xds_client.h (11 lines changed)
  7. test/cpp/end2end/xds_end2end_test.cc (350 lines changed)

@@ -16,6 +16,8 @@
#include <grpc/support/port_platform.h>
#include <cstring>
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "absl/strings/str_cat.h"
@@ -138,8 +140,6 @@ void ChildPolicyHandler::ShutdownLocked() {
}
void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
// The name of the policy that this update wants us to use.
const char* child_policy_name = args.config->name();
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
@@ -166,10 +166,10 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// a. If going from the current config to the new config does not
// require a new policy, then we update the existing child policy.
// b. If going from the current config to the new config does require a
// new policy, we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
@@ -180,10 +180,11 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy_->name() does not equal
// child_policy_name, then we create a new policy. The new
// a. If going from the current config to the new config does not
// require a new policy, then we update the existing pending
// child policy.
// b. If going from the current config to the new config does require a
// new child policy, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
@@ -191,12 +192,10 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
const bool create_policy =
// case 1
child_policy_ == nullptr ||
// case 2b
(pending_child_policy_ == nullptr &&
strcmp(child_policy_->name(), child_policy_name) != 0) ||
// case 3b
(pending_child_policy_ != nullptr &&
strcmp(pending_child_policy_->name(), child_policy_name) != 0);
// cases 2b and 3b
ConfigChangeRequiresNewPolicyInstance(current_config_.get(),
args.config.get());
current_config_ = args.config;
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
@@ -205,11 +204,11 @@ void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO,
"[child_policy_handler %p] creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
child_policy_ == nullptr ? "" : "pending ", args.config->name());
}
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
lb_policy = CreateChildPolicy(child_policy_name, *args.args);
lb_policy = CreateChildPolicy(args.config->name(), *args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
@@ -257,8 +256,7 @@ OrphanablePtr<LoadBalancingPolicy> ChildPolicyHandler::CreateChildPolicy(
std::unique_ptr<ChannelControlHelper>(helper);
lb_policy_args.args = &args;
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
child_policy_name, std::move(lb_policy_args));
CreateLoadBalancingPolicy(child_policy_name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", child_policy_name);
return nullptr;
@@ -277,4 +275,17 @@ OrphanablePtr<LoadBalancingPolicy> ChildPolicyHandler::CreateChildPolicy(
return lb_policy;
}
bool ChildPolicyHandler::ConfigChangeRequiresNewPolicyInstance(
LoadBalancingPolicy::Config* old_config,
LoadBalancingPolicy::Config* new_config) const {
return strcmp(old_config->name(), new_config->name()) != 0;
}
OrphanablePtr<LoadBalancingPolicy>
ChildPolicyHandler::CreateLoadBalancingPolicy(
const char* name, LoadBalancingPolicy::Args args) const {
return LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(args));
}
} // namespace grpc_core
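Note on this refactoring: the create-vs-update decision no longer hard-codes a strcmp of policy names; the handler caches the previous config in current_config_ and defers to a virtual hook, which XdsChildHandler (in xds.cc below) overrides to also compare EDS service names. A minimal standalone sketch of the resulting control flow, using stand-in types rather than the grpc_core classes:

// Illustrative sketch only -- stand-in types, not part of the commit.
#include <cstring>
#include <memory>

struct Config { const char* name; };

class Handler {
 public:
  void Update(std::shared_ptr<Config> new_config) {
    const bool create_policy =
        // case 1: no child policy exists yet (current_config_ is null too).
        current_config_ == nullptr ||
        // cases 2b/3b: the hook decides whether this config change
        // requires instantiating a replacement child policy.
        ConfigChangeRequiresNewPolicyInstance(current_config_.get(),
                                              new_config.get());
    current_config_ = std::move(new_config);
    if (create_policy) {
      // Create a pending child; it is swapped in when it reports READY.
    } else {
      // Update the existing (or pending) child in place.
    }
  }

 protected:
  // Base behavior matches the old inline check: a new instance is needed
  // iff the policy name changed. Subclasses can inspect the config deeper.
  virtual bool ConfigChangeRequiresNewPolicyInstance(
      const Config* old_config, const Config* new_config) const {
    return strcmp(old_config->name, new_config->name) != 0;
  }

 private:
  std::shared_ptr<Config> current_config_;
};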

@@ -42,6 +42,18 @@ class ChildPolicyHandler : public LoadBalancingPolicy {
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
// Returns true if transitioning from the old config to the new config
// requires instantiating a new policy object.
virtual bool ConfigChangeRequiresNewPolicyInstance(
LoadBalancingPolicy::Config* old_config,
LoadBalancingPolicy::Config* new_config) const;
// Instantiates a new policy of the specified name.
// May be overridden by subclasses to avoid recursion when an LB
// policy factory returns a ChildPolicyHandler.
virtual OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const char* name, LoadBalancingPolicy::Args args) const;
private:
class Helper;
@@ -55,6 +67,11 @@ class ChildPolicyHandler : public LoadBalancingPolicy {
bool shutting_down_ = false;
// The most recent config passed to UpdateLocked().
// If pending_child_policy_ is non-null, this is the config passed to
// pending_child_policy_; otherwise, it's the config passed to child_policy_.
RefCountedPtr<LoadBalancingPolicy::Config> current_config_;
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;

@@ -725,7 +725,6 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
}
const bool is_initial_update = args_ == nullptr;
// Update config.
const char* old_eds_service_name = eds_service_name();
auto old_config = std::move(config_);
config_ = std::move(args.config);
// Update fallback address list.
@@ -773,30 +772,8 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
eds_service_name(), eds_service_name());
}
}
// Update priority list.
// Note that this comes after updating drop_stats_, since we want that
// to be used by any new picker we create here.
// No need to do this on the initial update, since there won't be any
// priorities to update yet.
if (!is_initial_update) {
const bool update_locality_stats =
config_->lrs_load_reporting_server_name() !=
old_config->lrs_load_reporting_server_name() ||
strcmp(old_eds_service_name, eds_service_name()) != 0;
UpdatePrioritiesLocked(update_locality_stats);
}
// Update endpoint watcher if needed.
if (is_initial_update ||
strcmp(old_eds_service_name, eds_service_name()) != 0) {
if (!is_initial_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] cancelling watch for %s", this,
old_eds_service_name);
}
xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name),
endpoint_watcher_,
/*delay_unsubscription=*/true);
}
// On the initial update, create the endpoint watcher.
if (is_initial_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] starting watch for %s", this,
eds_service_name());
@@ -806,6 +783,16 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
endpoint_watcher_ = watcher.get();
xds_client()->WatchEndpointData(StringView(eds_service_name()),
std::move(watcher));
} else {
// Update priority list.
// Note that this comes after updating drop_stats_, since we want that
// to be used by any new picker we create here.
// No need to do this on the initial update, since there won't be any
// priorities to update yet.
const bool update_locality_stats =
config_->lrs_load_reporting_server_name() !=
old_config->lrs_load_reporting_server_name();
UpdatePrioritiesLocked(update_locality_stats);
}
}
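Note: the old_eds_service_name comparison drops out of the update_locality_stats computation because a change of EDS service name now produces a brand-new XdsLb instance via the handler refactoring above; within one instance, only the LRS server name can change between updates. Schematically (stand-in sketch, not the actual method):

// Illustrative sketch only -- not part of the commit.
#include <string>

void OnConfigUpdate(bool is_initial_update, const std::string& old_lrs_server,
                    const std::string& new_lrs_server) {
  if (is_initial_update) {
    // Start the endpoint watch exactly once per policy instance; the EDS
    // service name can no longer change underneath a running instance.
  } else {
    // Locality stats need to be recreated only when the LRS server moves.
    const bool update_locality_stats = old_lrs_server != new_lrs_server;
    (void)update_locality_stats;  // would be passed to UpdatePrioritiesLocked()
  }
}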
@@ -1000,7 +987,16 @@ OrphanablePtr<XdsLb::LocalityMap::Locality> XdsLb::ExtractLocalityLocked(
if (priority == exclude_priority) continue;
LocalityMap* locality_map = priorities_[priority].get();
auto locality = locality_map->ExtractLocalityLocked(name);
if (locality != nullptr) return locality;
if (locality != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] moving locality %p %s to new priority (%" PRIu32
" -> %" PRIu32 ")",
this, locality.get(), name->AsHumanReadableString(),
exclude_priority, priority);
}
return locality;
}
}
return nullptr;
}
@@ -1160,6 +1156,10 @@ XdsLb::LocalityMap::ExtractLocalityLocked(
}
void XdsLb::LocalityMap::DeactivateLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] deactivating priority %" PRIu32, xds_policy(),
priority_);
}
// If already deactivated, don't do it again.
if (delayed_removal_timer_callback_pending_) return;
MaybeCancelFailoverTimerLocked();
@@ -1184,6 +1184,10 @@ bool XdsLb::LocalityMap::MaybeReactivateLocked() {
// Don't reactivate a priority that is not higher than the current one.
if (priority_ >= xds_policy_->current_priority_) return false;
// Reactivate this priority by cancelling deletion timer.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] reactivating priority %" PRIu32, xds_policy(),
priority_);
}
if (delayed_removal_timer_callback_pending_) {
grpc_timer_cancel(&delayed_removal_timer_);
}
@@ -1440,6 +1444,10 @@ void XdsLb::LocalityMap::Locality::UpdateLocked(uint32_t locality_weight,
// Update locality weight.
weight_ = locality_weight;
if (delayed_removal_timer_callback_pending_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: reactivating", xds_policy(),
this, name_->AsHumanReadableString());
}
grpc_timer_cancel(&delayed_removal_timer_);
}
// Update locality stats.
@@ -1497,6 +1505,10 @@ void XdsLb::LocalityMap::Locality::Orphan() {
void XdsLb::LocalityMap::Locality::DeactivateLocked() {
// If already deactivated, don't do that again.
if (weight_ == 0) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: deactivating", xds_policy(),
this, name_->AsHumanReadableString());
}
// Set the locality weight to 0 so that future xds picker won't contain this
// locality.
weight_ = 0;
@@ -1574,7 +1586,7 @@ class XdsFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<XdsLb>(std::move(args));
return MakeOrphanable<XdsChildHandler>(std::move(args), &grpc_lb_xds_trace);
}
const char* name() const override { return kXds; }
@@ -1672,6 +1684,36 @@ class XdsFactory : public LoadBalancingPolicyFactory {
return nullptr;
}
}
private:
class XdsChildHandler : public ChildPolicyHandler {
public:
XdsChildHandler(Args args, TraceFlag* tracer)
: ChildPolicyHandler(std::move(args), tracer) {}
bool ConfigChangeRequiresNewPolicyInstance(
LoadBalancingPolicy::Config* old_config,
LoadBalancingPolicy::Config* new_config) const override {
GPR_ASSERT(old_config->name() == kXds);
GPR_ASSERT(new_config->name() == kXds);
XdsConfig* old_xds_config = static_cast<XdsConfig*>(old_config);
XdsConfig* new_xds_config = static_cast<XdsConfig*>(new_config);
const char* old_eds_service_name =
old_xds_config->eds_service_name() == nullptr
? ""
: old_xds_config->eds_service_name();
const char* new_eds_service_name =
new_xds_config->eds_service_name() == nullptr
? ""
: new_xds_config->eds_service_name();
return strcmp(old_eds_service_name, new_eds_service_name) != 0;
}
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const char* name, LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<XdsLb>(std::move(args));
}
};
};
} // namespace
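Note: eds_service_name() may return null in these configs, hence the null-to-empty normalization before strcmp in the override above. The same comparison as a hypothetical free function, shown only to make the semantics explicit:

// Illustrative sketch only -- hypothetical helper, not part of the commit.
#include <cstring>

bool EdsServiceNameChanged(const char* old_name, const char* new_name) {
  if (old_name == nullptr) old_name = "";  // treat null as unset
  if (new_name == nullptr) new_name = "";
  return strcmp(old_name, new_name) != 0;
}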

@@ -192,7 +192,7 @@ class XdsApi {
struct ClusterLoadReport {
XdsClusterDropStats::DroppedRequestsMap dropped_requests;
std::map<XdsLocalityName*, XdsClusterLocalityStats::Snapshot,
std::map<RefCountedPtr<XdsLocalityName>, XdsClusterLocalityStats::Snapshot,
XdsLocalityName::Less>
locality_stats;
grpc_millis load_report_interval;
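Note: switching the key from a raw XdsLocalityName* to RefCountedPtr<XdsLocalityName> lets the report map own its keys, which matters now that snapshots can outlive the stats objects that supplied the names (see the deleted_locality_stats handling in xds_client.cc below). A minimal analogue, with std::shared_ptr standing in for RefCountedPtr:

// Illustrative sketch only -- std types standing in for grpc_core ones.
#include <map>
#include <memory>
#include <string>

struct LocalityName {
  std::string sub_zone;
  // Orders by value, so two distinct pointers naming the same locality
  // land on the same map entry (analogous to XdsLocalityName::Less).
  struct Less {
    bool operator()(const std::shared_ptr<LocalityName>& a,
                    const std::shared_ptr<LocalityName>& b) const {
      return a->sub_zone < b->sub_zone;
    }
  };
};

// Keyed by an owning pointer: each entry keeps its key alive.
using LocalityStatsMap =
    std::map<std::shared_ptr<LocalityName>, int, LocalityName::Less>;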

@@ -302,7 +302,6 @@ class XdsClient::ChannelState::LrsCallState
void Orphan() override;
void MaybeStartReportingLocked();
bool ShouldSendLoadReports(const StringView& cluster_name) const;
RetryableCall<LrsCallState>* parent() { return parent_.get(); }
ChannelState* chand() const { return parent_->chand(); }
@@ -1414,7 +1413,7 @@ bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
// Construct snapshot from all reported stats.
XdsApi::ClusterLoadReportMap snapshot =
xds_client()->BuildLoadReportSnapshot();
xds_client()->BuildLoadReportSnapshot(parent_->cluster_names_);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
const bool old_val = last_report_counters_were_zero_;
@@ -1460,6 +1459,12 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
Reporter* self = static_cast<Reporter*>(arg);
grpc_byte_buffer_destroy(self->parent_->send_message_payload_);
self->parent_->send_message_payload_ = nullptr;
// If there are no more registered stats to report, cancel the call.
if (self->xds_client()->load_report_map_.empty()) {
self->parent_->chand()->StopLrsCall();
self->Unref(DEBUG_LOCATION, "Reporter+report_done+no_more_reporters");
return;
}
if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) {
// If this reporter is no longer the current one on the call, the reason
// might be that it was orphaned for a new one due to config update.
@@ -1613,13 +1618,6 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
}
bool XdsClient::ChannelState::LrsCallState::ShouldSendLoadReports(
const StringView& cluster_name) const {
// Only send load reports for the clusters that are asked for by the LRS
// server.
return cluster_names_.find(std::string(cluster_name)) != cluster_names_.end();
}
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
@@ -1966,19 +1964,14 @@ void XdsClient::RemoveClusterDropStats(
LoadReportState& load_report_state = load_report_it->second;
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
// TODO(roth): In principle, we should try to send a final load report
// containing whatever final stats have been accumulated since the
// last load report.
auto it = load_report_state.drop_stats.find(cluster_drop_stats);
if (it != load_report_state.drop_stats.end()) {
load_report_state.drop_stats.erase(it);
if (load_report_state.drop_stats.empty() &&
load_report_state.locality_stats.empty()) {
load_report_map_.erase(load_report_it);
if (chand_ != nullptr && load_report_map_.empty()) {
chand_->StopLrsCall();
}
// Record final drop stats in deleted_drop_stats, which will be
// added to the next load report.
for (const auto& p : cluster_drop_stats->GetSnapshotAndReset()) {
load_report_state.deleted_drop_stats[p.first] += p.second;
}
load_report_state.drop_stats.erase(it);
}
}
@@ -1999,7 +1992,7 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
locality);
it->second.locality_stats[std::move(locality)].insert(
it->second.locality_stats[std::move(locality)].locality_stats.insert(
cluster_locality_stats.get());
chand_->MaybeStartLrsCall();
return cluster_locality_stats;
@@ -2015,25 +2008,16 @@ void XdsClient::RemoveClusterLocalityStats(
LoadReportState& load_report_state = load_report_it->second;
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
// TODO(roth): In principle, we should try to send a final load report
// containing whatever final stats have been accumulated since the
// last load report.
auto locality_it = load_report_state.locality_stats.find(locality);
if (locality_it == load_report_state.locality_stats.end()) return;
auto& locality_set = locality_it->second;
auto& locality_set = locality_it->second.locality_stats;
auto it = locality_set.find(cluster_locality_stats);
if (it != locality_set.end()) {
// Record final snapshot in deleted_locality_stats, which will be
// added to the next load report.
locality_it->second.deleted_locality_stats.emplace_back(
cluster_locality_stats->GetSnapshotAndReset());
locality_set.erase(it);
if (locality_set.empty()) {
load_report_state.locality_stats.erase(locality_it);
if (load_report_state.locality_stats.empty() &&
load_report_state.drop_stats.empty()) {
load_report_map_.erase(load_report_it);
if (chand_ != nullptr && load_report_map_.empty()) {
chand_->StopLrsCall();
}
}
}
}
}
@@ -2062,32 +2046,70 @@ grpc_error* XdsClient::CreateServiceConfig(
return error;
}
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot() {
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot(
const std::set<std::string>& clusters) {
XdsApi::ClusterLoadReportMap snapshot_map;
for (auto& p : load_report_map_) {
const auto& cluster_key = p.first; // cluster and EDS service name
LoadReportState& load_report = p.second;
XdsApi::ClusterLoadReport& snapshot = snapshot_map[cluster_key];
for (auto load_report_it = load_report_map_.begin();
load_report_it != load_report_map_.end();) {
// Cluster key is cluster and EDS service name.
const auto& cluster_key = load_report_it->first;
LoadReportState& load_report = load_report_it->second;
// If the CDS response for a cluster indicates to use LRS but the
// LRS server does not say that it wants reports for this cluster,
// then we'll have stats objects here whose data we're not going to
// include in the load report. However, we still need to clear out
// the data from the stats objects, so that if the LRS server starts
// asking for the data in the future, we don't incorrectly include
// data from previous reporting intervals in that future report.
const bool record_stats =
clusters.find(cluster_key.first) != clusters.end();
XdsApi::ClusterLoadReport snapshot;
// Aggregate drop stats.
snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
for (auto& drop_stats : load_report.drop_stats) {
for (const auto& p : drop_stats->GetSnapshotAndReset()) {
snapshot.dropped_requests[p.first] += p.second;
}
}
// Aggregate locality stats.
for (auto& p : load_report.locality_stats) {
XdsLocalityName* locality_name = p.first.get();
auto& locality_stats_set = p.second;
for (auto it = load_report.locality_stats.begin();
it != load_report.locality_stats.end();) {
const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
auto& locality_state = it->second;
XdsClusterLocalityStats::Snapshot& locality_snapshot =
snapshot.locality_stats[locality_name];
for (auto& locality_stats : locality_stats_set) {
for (auto& locality_stats : locality_state.locality_stats) {
locality_snapshot += locality_stats->GetSnapshotAndReset();
}
// Add final snapshots from recently deleted locality stats objects.
for (auto& deleted_locality_stats :
locality_state.deleted_locality_stats) {
locality_snapshot += deleted_locality_stats;
}
locality_state.deleted_locality_stats.clear();
// If the only thing left in this entry was final snapshots from
// deleted locality stats objects, remove the entry.
if (locality_state.locality_stats.empty()) {
it = load_report.locality_stats.erase(it);
} else {
++it;
}
}
if (record_stats) {
// Compute load report interval.
const grpc_millis now = ExecCtx::Get()->Now();
snapshot.load_report_interval = now - load_report.last_report_time;
load_report.last_report_time = now;
// Record snapshot.
snapshot_map[cluster_key] = std::move(snapshot);
}
// If the only thing left in this entry was final snapshots from
// deleted stats objects, remove the entry.
if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) {
load_report_it = load_report_map_.erase(load_report_it);
} else {
++load_report_it;
}
// Compute load report interval.
const grpc_millis now = ExecCtx::Get()->Now();
snapshot.load_report_interval = now - load_report.last_report_time;
load_report.last_report_time = now;
}
return snapshot_map;
}
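Note: the discipline implemented here is drain-always, record-conditionally. Every stats object is reset each interval via GetSnapshotAndReset(), even for clusters the LRS server did not request, so a later report can never carry counts from earlier intervals; final snapshots from recently deleted stats objects are folded in before their now-empty entries are erased. A condensed standalone model (deleted-stats carryover and entry garbage collection elided):

// Illustrative sketch only -- condensed model, not the XdsClient API.
#include <map>
#include <set>
#include <string>

struct Stats {
  long counter = 0;
  long SnapshotAndReset() {  // drain-and-zero, like GetSnapshotAndReset()
    long v = counter;
    counter = 0;
    return v;
  }
};

std::map<std::string, long> BuildSnapshot(
    std::map<std::string, Stats>& per_cluster,
    const std::set<std::string>& clusters_wanted) {
  std::map<std::string, long> report;
  for (auto& p : per_cluster) {
    // Always drain, even if the server didn't ask about this cluster.
    const long drained = p.second.SnapshotAndReset();
    if (clusters_wanted.count(p.first) != 0) {
      report[p.first] = drained;  // record only requested clusters
    }
  }
  return report;
}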

@@ -209,8 +209,14 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
};
struct LoadReportState {
struct LocalityState {
std::set<XdsClusterLocalityStats*> locality_stats;
std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats;
};
std::set<XdsClusterDropStats*> drop_stats;
std::map<RefCountedPtr<XdsLocalityName>, std::set<XdsClusterLocalityStats*>,
XdsClusterDropStats::DroppedRequestsMap deleted_drop_stats;
std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
XdsLocalityName::Less>
locality_stats;
grpc_millis last_report_time = ExecCtx::Get()->Now();
@@ -223,7 +229,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
const std::string& cluster_name,
RefCountedPtr<ServiceConfig>* service_config) const;
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot();
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot(
const std::set<std::string>& clusters);
// Channel arg vtable functions.
static void* ChannelArgCopy(void* p);

@@ -22,7 +22,9 @@
#include <numeric>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@@ -296,8 +298,9 @@ class ClientStats {
};
// Converts from proto message class.
ClientStats(const ClusterStats& cluster_stats)
: total_dropped_requests_(cluster_stats.total_dropped_requests()) {
explicit ClientStats(const ClusterStats& cluster_stats)
: cluster_name_(cluster_stats.cluster_name()),
total_dropped_requests_(cluster_stats.total_dropped_requests()) {
for (const auto& input_locality_stats :
cluster_stats.upstream_locality_stats()) {
locality_stats_.emplace(input_locality_stats.locality().sub_zone(),
@@ -310,6 +313,11 @@
}
}
const std::string& cluster_name() const { return cluster_name_; }
const std::map<grpc::string, LocalityStats>& locality_stats() const {
return locality_stats_;
}
uint64_t total_successful_requests() const {
uint64_t sum = 0;
for (auto& p : locality_stats_) {
@@ -338,7 +346,9 @@
}
return sum;
}
uint64_t total_dropped_requests() const { return total_dropped_requests_; }
uint64_t dropped_requests(const grpc::string& category) const {
auto iter = dropped_requests_.find(category);
GPR_ASSERT(iter != dropped_requests_.end());
@@ -346,6 +356,7 @@
}
private:
std::string cluster_name_;
std::map<grpc::string, LocalityStats> locality_stats_;
uint64_t total_dropped_requests_;
std::map<grpc::string, uint64_t> dropped_requests_;
@@ -391,7 +402,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
};
using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
using ResponseDelayPair = std::pair<DiscoveryResponse, int>;
// A queue of resource type/name pairs that have changed since the client
// subscribed to them.
@@ -933,60 +943,62 @@ class LrsServiceImpl : public LrsService,
explicit LrsServiceImpl(int client_load_reporting_interval_seconds)
: client_load_reporting_interval_seconds_(
client_load_reporting_interval_seconds) {}
client_load_reporting_interval_seconds),
cluster_names_({kDefaultResourceName}) {}
Status StreamLoadStats(ServerContext* /*context*/, Stream* stream) override {
gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this);
GPR_ASSERT(client_load_reporting_interval_seconds_ > 0);
// Take a reference of the LrsServiceImpl object, reference will go
// out of scope after this method exits.
std::shared_ptr<LrsServiceImpl> lrs_service_impl = shared_from_this();
// Read request.
// Read initial request.
LoadStatsRequest request;
if (stream->Read(&request)) {
if (client_load_reporting_interval_seconds_ > 0) {
IncreaseRequestCount();
// Send response.
LoadStatsResponse response;
std::string server_name;
auto it = request.node().metadata().fields().find(
"PROXYLESS_CLIENT_HOSTNAME");
if (it != request.node().metadata().fields().end()) {
server_name = it->second.string_value();
}
GPR_ASSERT(server_name != "");
response.add_clusters(server_name);
response.mutable_load_reporting_interval()->set_seconds(
client_load_reporting_interval_seconds_);
stream->Write(response);
IncreaseResponseCount();
// Wait for report.
request.Clear();
if (stream->Read(&request)) {
gpr_log(GPR_INFO, "LRS[%p]: received client load report message '%s'",
this, request.DebugString().c_str());
GPR_ASSERT(request.cluster_stats().size() == 1);
const ClusterStats& cluster_stats = request.cluster_stats()[0];
// We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed.
grpc_core::MutexLock lock(&load_report_mu_);
GPR_ASSERT(client_stats_ == nullptr);
client_stats_.reset(new ClientStats(cluster_stats));
load_report_ready_ = true;
load_report_cond_.Signal();
IncreaseRequestCount(); // Only for initial request.
// Verify server name set in metadata.
auto it =
request.node().metadata().fields().find("PROXYLESS_CLIENT_HOSTNAME");
GPR_ASSERT(it != request.node().metadata().fields().end());
EXPECT_EQ(it->second.string_value(), kDefaultResourceName);
// Send initial response.
LoadStatsResponse response;
for (const std::string& cluster_name : cluster_names_) {
response.add_clusters(cluster_name);
}
response.mutable_load_reporting_interval()->set_seconds(
client_load_reporting_interval_seconds_);
stream->Write(response);
IncreaseResponseCount();
// Wait for report.
request.Clear();
while (stream->Read(&request)) {
gpr_log(GPR_INFO, "LRS[%p]: received client load report message: %s",
this, request.DebugString().c_str());
std::vector<ClientStats> stats;
for (const auto& cluster_stats : request.cluster_stats()) {
stats.emplace_back(cluster_stats);
}
grpc_core::MutexLock lock(&load_report_mu_);
result_queue_.emplace_back(std::move(stats));
if (load_report_cond_ != nullptr) load_report_cond_->Signal();
}
// Wait until notified done.
grpc_core::MutexLock lock(&lrs_mu_);
lrs_cv_.WaitUntil(&lrs_mu_, [this] { return lrs_done; });
lrs_cv_.WaitUntil(&lrs_mu_, [this] { return lrs_done_; });
}
gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this);
return Status::OK;
}
// Must be called before the LRS call is started.
void set_cluster_names(const std::set<std::string>& cluster_names) {
cluster_names_ = cluster_names;
}
void Start() {
lrs_done = false;
load_report_ready_ = false;
client_stats_.reset();
lrs_done_ = false;
result_queue_.clear();
}
void Shutdown() {
@@ -997,12 +1009,18 @@ class LrsServiceImpl : public LrsService,
gpr_log(GPR_INFO, "LRS[%p]: shut down", this);
}
ClientStats* WaitForLoadReport() {
std::vector<ClientStats> WaitForLoadReport() {
grpc_core::MutexLock lock(&load_report_mu_);
load_report_cond_.WaitUntil(&load_report_mu_,
[this] { return load_report_ready_; });
load_report_ready_ = false;
return client_stats_.get();
grpc_core::CondVar cv;
if (result_queue_.empty()) {
load_report_cond_ = &cv;
load_report_cond_->WaitUntil(&load_report_mu_,
[this] { return !result_queue_.empty(); });
load_report_cond_ = nullptr;
}
std::vector<ClientStats> result = std::move(result_queue_.front());
result_queue_.pop_front();
return result;
}
void NotifyDoneWithLrsCall() {
@@ -1010,26 +1028,24 @@ class LrsServiceImpl : public LrsService,
NotifyDoneWithLrsCallLocked();
}
private:
void NotifyDoneWithLrsCallLocked() {
if (!lrs_done) {
lrs_done = true;
if (!lrs_done_) {
lrs_done_ = true;
lrs_cv_.Broadcast();
}
}
private:
const int client_load_reporting_interval_seconds_;
std::set<std::string> cluster_names_;
grpc_core::CondVar lrs_cv_;
// Protect lrs_done.
grpc_core::Mutex lrs_mu_;
bool lrs_done = false;
grpc_core::Mutex lrs_mu_; // Protects lrs_done_.
bool lrs_done_ = false;
grpc_core::CondVar load_report_cond_;
// Protect the members below.
grpc_core::Mutex load_report_mu_;
std::unique_ptr<ClientStats> client_stats_;
bool load_report_ready_ = false;
grpc_core::Mutex load_report_mu_; // Protects the members below.
grpc_core::CondVar* load_report_cond_ = nullptr;
std::deque<std::vector<ClientStats>> result_queue_;
};
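Note: WaitForLoadReport() now drains a queue guarded by a mutex and an on-demand condition variable, so reports that arrive between calls are buffered instead of tripping the old GPR_ASSERT(client_stats_ == nullptr). A standard-library analogue of the pattern, with a stand-in payload type:

// Illustrative sketch only -- std threading primitives, not grpc_core's.
#include <condition_variable>
#include <deque>
#include <mutex>
#include <vector>

class ReportQueue {
 public:
  // Producer: the stream-reader thread pushes each parsed report.
  void Push(std::vector<int> report) {
    std::lock_guard<std::mutex> lock(mu_);
    queue_.push_back(std::move(report));
    cv_.notify_one();
  }
  // Consumer: the test thread pops the oldest report, blocking if empty.
  std::vector<int> Pop() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !queue_.empty(); });
    std::vector<int> result = std::move(queue_.front());
    queue_.pop_front();
    return result;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::vector<int>> queue_;
};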
class TestType {
@@ -1720,6 +1736,141 @@ TEST_P(XdsResolverOnlyTest, ClusterRemoved) {
AdsServiceImpl::ACKED);
}
class XdsResolverLoadReportingOnlyTest : public XdsEnd2endTest {
public:
XdsResolverLoadReportingOnlyTest() : XdsEnd2endTest(4, 1, 3) {}
};
// Tests load reporting when switching over from one cluster to another.
TEST_P(XdsResolverLoadReportingOnlyTest, ChangeClusters) {
const char* kNewClusterName = "new_cluster_name";
balancers_[0]->lrs_service()->set_cluster_names(
{kDefaultResourceName, kNewClusterName});
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// cluster kDefaultResourceName -> locality0 -> backends 0 and 1
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(0, 2)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
// cluster kNewClusterName -> locality1 -> backends 2 and 3
AdsServiceImpl::EdsResourceArgs args2({
{"locality1", GetBackendPorts(2, 4)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args2, kNewClusterName),
kNewClusterName);
// CDS resource for kNewClusterName.
Cluster new_cluster = balancers_[0]->ads_service()->default_cluster();
new_cluster.set_name(kNewClusterName);
balancers_[0]->ads_service()->SetCdsResource(new_cluster, kNewClusterName);
// Wait for all backends to come online.
int num_ok = 0;
int num_failure = 0;
int num_drops = 0;
std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends(0, 2);
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancers_[0]->lrs_service()->WaitForLoadReport();
EXPECT_THAT(
load_report,
::testing::ElementsAre(::testing::AllOf(
::testing::Property(&ClientStats::cluster_name, kDefaultResourceName),
::testing::Property(
&ClientStats::locality_stats,
::testing::ElementsAre(::testing::Pair(
"locality0",
::testing::AllOf(
::testing::Field(&ClientStats::LocalityStats::
total_successful_requests,
num_ok),
::testing::Field(&ClientStats::LocalityStats::
total_requests_in_progress,
0UL),
::testing::Field(
&ClientStats::LocalityStats::total_error_requests,
num_failure),
::testing::Field(
&ClientStats::LocalityStats::total_issued_requests,
num_failure + num_ok))))),
::testing::Property(&ClientStats::total_dropped_requests,
num_drops))));
// Change RDS resource to point to new cluster.
RouteConfiguration new_route_config =
balancers_[0]->ads_service()->default_route_config();
new_route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->set_cluster(kNewClusterName);
Listener listener =
balancers_[0]->ads_service()->BuildListener(new_route_config);
balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName);
// Wait for all new backends to be used.
std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends(2, 4);
// The load report received at the balancer should be correct.
load_report = balancers_[0]->lrs_service()->WaitForLoadReport();
EXPECT_THAT(
load_report,
::testing::ElementsAre(
::testing::AllOf(
::testing::Property(&ClientStats::cluster_name,
kDefaultResourceName),
::testing::Property(
&ClientStats::locality_stats,
::testing::ElementsAre(::testing::Pair(
"locality0",
::testing::AllOf(
::testing::Field(&ClientStats::LocalityStats::
total_successful_requests,
::testing::Lt(num_ok)),
::testing::Field(&ClientStats::LocalityStats::
total_requests_in_progress,
0UL),
::testing::Field(
&ClientStats::LocalityStats::total_error_requests,
::testing::Le(num_failure)),
::testing::Field(
&ClientStats::LocalityStats::
total_issued_requests,
::testing::Le(num_failure + num_ok)))))),
::testing::Property(&ClientStats::total_dropped_requests,
num_drops)),
::testing::AllOf(
::testing::Property(&ClientStats::cluster_name, kNewClusterName),
::testing::Property(
&ClientStats::locality_stats,
::testing::ElementsAre(::testing::Pair(
"locality1",
::testing::AllOf(
::testing::Field(&ClientStats::LocalityStats::
total_successful_requests,
::testing::Le(num_ok)),
::testing::Field(&ClientStats::LocalityStats::
total_requests_in_progress,
0UL),
::testing::Field(
&ClientStats::LocalityStats::total_error_requests,
::testing::Le(num_failure)),
::testing::Field(
&ClientStats::LocalityStats::
total_issued_requests,
::testing::Le(num_failure + num_ok)))))),
::testing::Property(&ClientStats::total_dropped_requests,
num_drops))));
int total_ok = 0;
int total_failure = 0;
for (const ClientStats& client_stats : load_report) {
total_ok += client_stats.total_successful_requests();
total_failure += client_stats.total_error_requests();
}
EXPECT_EQ(total_ok, num_ok);
EXPECT_EQ(total_failure, num_failure);
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->lrs_service()->request_count());
EXPECT_EQ(1U, balancers_[0]->lrs_service()->response_count());
}
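For readers less familiar with this gmock style: the assertion above composes Property (matches through a const accessor), Field (matches a data member), and ElementsAre (ordered container match). A reduced, self-contained example of the same composition with hypothetical types:

// Illustrative sketch only -- reduced example, not the test above.
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <string>
#include <vector>

struct LocalityStats { uint64_t total_successful_requests; };
struct Report {
  std::string cluster;
  LocalityStats stats;
  const std::string& cluster_name() const { return cluster; }
};

TEST(MatcherDemo, ComposedMatchers) {
  std::vector<Report> reports = {{"cluster_a", {3}}};
  EXPECT_THAT(
      reports,
      ::testing::ElementsAre(::testing::AllOf(
          // Property() invokes the accessor; Field() reads the member.
          ::testing::Property(&Report::cluster_name, "cluster_a"),
          ::testing::Field(
              &Report::stats,
              ::testing::Field(&LocalityStats::total_successful_requests,
                               3UL)))));
}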
using SecureNamingTest = BasicTest;
// Tests that secure naming check passes if target name is expected.
@@ -3227,14 +3378,50 @@ TEST_P(ClientLoadReportingTest, Vanilla) {
EXPECT_EQ(1U, balancers_[0]->lrs_service()->request_count());
EXPECT_EQ(1U, balancers_[0]->lrs_service()->response_count());
// The load report received at the balancer should be correct.
ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
std::vector<ClientStats> load_report =
balancers_[0]->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats& client_stats = load_report.front();
EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
client_stats->total_successful_requests());
EXPECT_EQ(0U, client_stats->total_requests_in_progress());
client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
client_stats->total_issued_requests());
EXPECT_EQ(0U, client_stats->total_error_requests());
EXPECT_EQ(0U, client_stats->total_dropped_requests());
client_stats.total_issued_requests());
EXPECT_EQ(0U, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
}
// Tests that we don't include stats for clusters that are not requested
// by the LRS server.
TEST_P(ClientLoadReportingTest, HonorsClustersRequestedByLrsServer) {
balancers_[0]->lrs_service()->set_cluster_names({"bogus"});
SetNextResolution({});
SetNextResolutionForLbChannel({balancers_[0]->port()});
const size_t kNumRpcsPerAddress = 100;
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts()},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
// Wait until all backends are ready.
int num_ok = 0;
int num_failure = 0;
int num_drops = 0;
std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress,
backends_[i]->backend_service()->request_count());
}
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->lrs_service()->request_count());
EXPECT_EQ(1U, balancers_[0]->lrs_service()->response_count());
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancers_[0]->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 0UL);
}
// Tests that if the balancer restarts, the client load report contains the
@@ -3257,12 +3444,15 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) {
std::tie(num_ok, num_failure, num_drops) =
WaitForAllBackends(/* start_index */ 0,
/* stop_index */ kNumBackendsFirstPass);
ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
std::vector<ClientStats> load_report =
balancers_[0]->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats client_stats = std::move(load_report.front());
EXPECT_EQ(static_cast<size_t>(num_ok),
client_stats->total_successful_requests());
EXPECT_EQ(0U, client_stats->total_requests_in_progress());
EXPECT_EQ(0U, client_stats->total_error_requests());
EXPECT_EQ(0U, client_stats->total_dropped_requests());
client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(0U, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
// Shut down the balancer.
balancers_[0]->Shutdown();
// We should continue using the last EDS response we received from the
@@ -3294,11 +3484,13 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) {
CheckRpcSendOk(kNumBackendsSecondPass);
num_started += kNumBackendsSecondPass;
// Check client stats.
client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
EXPECT_EQ(num_started, client_stats->total_successful_requests());
EXPECT_EQ(0U, client_stats->total_requests_in_progress());
EXPECT_EQ(0U, client_stats->total_error_requests());
EXPECT_EQ(0U, client_stats->total_dropped_requests());
load_report = balancers_[0]->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
client_stats = std::move(load_report.front());
EXPECT_EQ(num_started, client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(0U, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
}
class ClientLoadReportingWithDropTest : public XdsEnd2endTest {
@@ -3352,15 +3544,18 @@ TEST_P(ClientLoadReportingWithDropTest, Vanilla) {
::testing::Ge(KDropRateForLbAndThrottle * (1 - kErrorTolerance)),
::testing::Le(KDropRateForLbAndThrottle * (1 + kErrorTolerance))));
// Check client stats.
ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
EXPECT_EQ(num_drops, client_stats->total_dropped_requests());
std::vector<ClientStats> load_report =
balancers_[0]->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats& client_stats = load_report.front();
EXPECT_EQ(num_drops, client_stats.total_dropped_requests());
const size_t total_rpc = num_warmup + kNumRpcs;
EXPECT_THAT(
client_stats->dropped_requests(kLbDropType),
client_stats.dropped_requests(kLbDropType),
::testing::AllOf(
::testing::Ge(total_rpc * kDropRateForLb * (1 - kErrorTolerance)),
::testing::Le(total_rpc * kDropRateForLb * (1 + kErrorTolerance))));
EXPECT_THAT(client_stats->dropped_requests(kThrottleDropType),
EXPECT_THAT(client_stats.dropped_requests(kThrottleDropType),
::testing::AllOf(
::testing::Ge(total_rpc * (1 - kDropRateForLb) *
kDropRateForThrottle * (1 - kErrorTolerance)),
@@ -3417,6 +3612,11 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, XdsResolverOnlyTest,
TestType(true, true)),
&TestTypeName);
// XdsResolverLoadReportingOnlyTest depends on XdsResolver and load reporting.
INSTANTIATE_TEST_SUITE_P(XdsTest, XdsResolverLoadReportingOnlyTest,
::testing::Values(TestType(true, true)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest,
::testing::Values(TestType(false, true),
TestType(false, false),
