Use a single instance of the drop and locality stats objects.

pull/24395/head
Mark D. Roth 4 years ago
parent cdd9e3ce59
commit a6954fe39e
  1. 2
      src/core/ext/filters/client_channel/lb_policy/xds/eds_drop.cc
  2. 146
      src/core/ext/xds/xds_client.cc
  3. 12
      src/core/ext/xds/xds_client.h
  4. 41
      src/core/ext/xds/xds_client_stats.cc
  5. 8
      test/cpp/end2end/xds_end2end_test.cc

@ -207,7 +207,7 @@ EdsDropLb::EdsDropLb(RefCountedPtr<XdsClient> xds_client, Args args)
EdsDropLb::~EdsDropLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_eds_drop_lb_trace)) {
gpr_log(GPR_INFO, "[eds_drop_lb %p] destroying xds LB policy", this);
gpr_log(GPR_INFO, "[eds_drop_lb %p] destroying eds_drop LB policy", this);
}
}

@ -336,7 +336,7 @@ class XdsClient::ChannelState::LrsCallState
void ScheduleNextReportLocked();
static void OnNextReportTimer(void* arg, grpc_error* error);
bool OnNextReportTimerLocked(grpc_error* error);
void SendReportLocked();
bool SendReportLocked();
static void OnReportDone(void* arg, grpc_error* error);
bool OnReportDoneLocked(grpc_error* error);
@ -1287,8 +1287,7 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
GRPC_ERROR_UNREF(error);
return true;
}
SendReportLocked();
return false;
return SendReportLocked();
}
namespace {
@ -1307,7 +1306,7 @@ bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
} // namespace
void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
// Construct snapshot from all reported stats.
XdsApi::ClusterLoadReportMap snapshot =
xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
@ -1317,8 +1316,12 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
const bool old_val = last_report_counters_were_zero_;
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
if (old_val && last_report_counters_were_zero_) {
if (xds_client()->load_report_map_.empty()) {
parent_->chand()->StopLrsCall();
return true;
}
ScheduleNextReportLocked();
return;
return false;
}
// Create a request that contains the snapshot.
grpc_slice request_payload_slice =
@ -1339,6 +1342,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
xds_client(), this, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
return false;
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
@ -1982,10 +1986,22 @@ RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
auto it = load_report_map_
.emplace(std::make_pair(std::move(key), LoadReportState()))
.first;
auto cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/);
it->second.drop_stats.insert(cluster_drop_stats.get());
LoadReportState& load_report_state = it->second;
RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
if (load_report_state.drop_stats != nullptr) {
cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
}
if (cluster_drop_stats == nullptr) {
if (load_report_state.drop_stats != nullptr) {
load_report_state.deleted_drop_stats +=
load_report_state.drop_stats->GetSnapshotAndReset();
}
cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
it->first.first /*cluster_name*/,
it->first.second /*eds_service_name*/);
load_report_state.drop_stats = cluster_drop_stats.get();
}
chand_->MaybeStartLrsCall();
return cluster_drop_stats;
}
@ -1995,19 +2011,18 @@ void XdsClient::RemoveClusterDropStats(
absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats) {
MutexLock lock(&mu_);
auto load_report_it = load_report_map_.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (load_report_it == load_report_map_.end()) return;
LoadReportState& load_report_state = load_report_it->second;
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
auto it = load_report_state.drop_stats.find(cluster_drop_stats);
if (it != load_report_state.drop_stats.end()) {
// Record final drop stats in deleted_drop_stats, which will be
auto it = load_report_map_.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (it == load_report_map_.end()) return;
LoadReportState& load_report_state = it->second;
if (load_report_state.drop_stats == cluster_drop_stats) {
// Record final snapshot in deleted_drop_stats, which will be
// added to the next load report.
auto dropped_requests = cluster_drop_stats->GetSnapshotAndReset();
load_report_state.deleted_drop_stats += dropped_requests;
load_report_state.drop_stats.erase(it);
load_report_state.deleted_drop_stats +=
load_report_state.drop_stats->GetSnapshotAndReset();
load_report_state.drop_stats = nullptr;
}
}
@ -2026,12 +2041,24 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
auto it = load_report_map_
.emplace(std::make_pair(std::move(key), LoadReportState()))
.first;
auto cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
locality);
it->second.locality_stats[std::move(locality)].locality_stats.insert(
cluster_locality_stats.get());
LoadReportState& load_report_state = it->second;
LoadReportState::LocalityState& locality_state =
load_report_state.locality_stats[locality];
RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
if (locality_state.locality_stats != nullptr) {
cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
}
if (cluster_locality_stats == nullptr) {
if (locality_state.locality_stats != nullptr) {
locality_state.deleted_locality_stats +=
locality_state.locality_stats->GetSnapshotAndReset();
}
cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
std::move(locality));
locality_state.locality_stats = cluster_locality_stats.get();
}
chand_->MaybeStartLrsCall();
return cluster_locality_stats;
}
@ -2042,22 +2069,21 @@ void XdsClient::RemoveClusterLocalityStats(
const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats) {
MutexLock lock(&mu_);
auto load_report_it = load_report_map_.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (load_report_it == load_report_map_.end()) return;
LoadReportState& load_report_state = load_report_it->second;
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
auto it = load_report_map_.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (it == load_report_map_.end()) return;
LoadReportState& load_report_state = it->second;
auto locality_it = load_report_state.locality_stats.find(locality);
if (locality_it == load_report_state.locality_stats.end()) return;
auto& locality_set = locality_it->second.locality_stats;
auto it = locality_set.find(cluster_locality_stats);
if (it != locality_set.end()) {
LoadReportState::LocalityState& locality_state = locality_it->second;
if (locality_state.locality_stats == cluster_locality_stats) {
// Record final snapshot in deleted_locality_stats, which will be
// added to the next load report.
locality_it->second.deleted_locality_stats.emplace_back(
cluster_locality_stats->GetSnapshotAndReset());
locality_set.erase(it);
locality_state.deleted_locality_stats +=
locality_state.locality_stats->GetSnapshotAndReset();
locality_state.locality_stats = nullptr;
}
}
@ -2098,6 +2124,9 @@ void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
bool send_all_clusters, const std::set<std::string>& clusters) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
}
XdsApi::ClusterLoadReportMap snapshot_map;
for (auto load_report_it = load_report_map_.begin();
load_report_it != load_report_map_.end();) {
@ -2116,9 +2145,15 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
XdsApi::ClusterLoadReport snapshot;
// Aggregate drop stats.
snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
for (auto& drop_stats : load_report.drop_stats) {
auto dropped_requests = drop_stats->GetSnapshotAndReset();
snapshot.dropped_requests += dropped_requests;
if (load_report.drop_stats != nullptr) {
snapshot.dropped_requests +=
load_report.drop_stats->GetSnapshotAndReset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
this, cluster_key.first.c_str(), cluster_key.second.c_str(),
load_report.drop_stats);
}
}
// Aggregate locality stats.
for (auto it = load_report.locality_stats.begin();
@ -2127,34 +2162,39 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
auto& locality_state = it->second;
XdsClusterLocalityStats::Snapshot& locality_snapshot =
snapshot.locality_stats[locality_name];
for (auto& locality_stats : locality_state.locality_stats) {
locality_snapshot += locality_stats->GetSnapshotAndReset();
}
// Add final snapshots from recently deleted locality stats objects.
for (auto& deleted_locality_stats :
locality_state.deleted_locality_stats) {
locality_snapshot += deleted_locality_stats;
locality_snapshot = std::move(locality_state.deleted_locality_stats);
if (locality_state.locality_stats != nullptr) {
locality_snapshot +=
locality_state.locality_stats->GetSnapshotAndReset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] cluster=%s eds_service_name=%s "
"locality=%s locality_stats=%p",
this, cluster_key.first.c_str(), cluster_key.second.c_str(),
locality_name->AsHumanReadableString().c_str(),
locality_state.locality_stats);
}
}
locality_state.deleted_locality_stats.clear();
// If the only thing left in this entry was final snapshots from
// deleted locality stats objects, remove the entry.
if (locality_state.locality_stats.empty()) {
if (locality_state.locality_stats == nullptr) {
it = load_report.locality_stats.erase(it);
} else {
++it;
}
}
// Compute load report interval.
const grpc_millis now = ExecCtx::Get()->Now();
snapshot.load_report_interval = now - load_report.last_report_time;
load_report.last_report_time = now;
// Record snapshot.
if (record_stats) {
// Compute load report interval.
const grpc_millis now = ExecCtx::Get()->Now();
snapshot.load_report_interval = now - load_report.last_report_time;
load_report.last_report_time = now;
// Record snapshot.
snapshot_map[cluster_key] = std::move(snapshot);
}
// If the only thing left in this entry was final snapshots from
// deleted stats objects, remove the entry.
if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) {
if (load_report.locality_stats.empty() &&
load_report.drop_stats == nullptr) {
load_report_it = load_report_map_.erase(load_report_it);
} else {
++load_report_it;

@ -39,7 +39,7 @@
namespace grpc_core {
extern TraceFlag xds_client_trace;
extern TraceFlag grpc_xds_client_trace;
class XdsClient : public DualRefCounted<XdsClient> {
public:
@ -267,17 +267,13 @@ class XdsClient : public DualRefCounted<XdsClient> {
absl::optional<XdsApi::EdsUpdate> update;
};
// TODO(roth): Change this to store exactly one instance of
// XdsClusterDropStats and exactly one instance of
// XdsClusterLocalityStats per locality. We can return multiple refs
// to the same object instead of registering multiple objects.
struct LoadReportState {
struct LocalityState {
std::set<XdsClusterLocalityStats*> locality_stats;
std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats;
XdsClusterLocalityStats* locality_stats = nullptr;
XdsClusterLocalityStats::Snapshot deleted_locality_stats;
};
std::set<XdsClusterDropStats*> drop_stats;
XdsClusterDropStats* drop_stats = nullptr;
XdsClusterDropStats::Snapshot deleted_drop_stats;
std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
XdsLocalityName::Less>

@ -45,12 +45,27 @@ XdsClusterDropStats::XdsClusterDropStats(RefCountedPtr<XdsClient> xds_client,
absl::string_view lrs_server_name,
absl::string_view cluster_name,
absl::string_view eds_service_name)
: xds_client_(std::move(xds_client)),
: RefCounted(&grpc_xds_client_trace),
xds_client_(std::move(xds_client)),
lrs_server_name_(lrs_server_name),
cluster_name_(cluster_name),
eds_service_name_(eds_service_name) {}
eds_service_name_(eds_service_name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] created drop stats %p for {%s, %s, %s}",
xds_client_.get(), this, std::string(lrs_server_name_).c_str(),
std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str());
}
}
XdsClusterDropStats::~XdsClusterDropStats() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] destroying drop stats %p for {%s, %s, %s}",
xds_client_.get(), this, std::string(lrs_server_name_).c_str(),
std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str());
}
xds_client_->RemoveClusterDropStats(lrs_server_name_, cluster_name_,
eds_service_name_, this);
xds_client_.reset(DEBUG_LOCATION, "DropStats");
@ -81,13 +96,31 @@ XdsClusterLocalityStats::XdsClusterLocalityStats(
RefCountedPtr<XdsClient> xds_client, absl::string_view lrs_server_name,
absl::string_view cluster_name, absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> name)
: xds_client_(std::move(xds_client)),
: RefCounted(&grpc_xds_client_trace),
xds_client_(std::move(xds_client)),
lrs_server_name_(lrs_server_name),
cluster_name_(cluster_name),
eds_service_name_(eds_service_name),
name_(std::move(name)) {}
name_(std::move(name)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] created locality stats %p for {%s, %s, %s, %s}",
xds_client_.get(), this, std::string(lrs_server_name_).c_str(),
std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str(),
name_->AsHumanReadableString().c_str());
}
}
XdsClusterLocalityStats::~XdsClusterLocalityStats() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] destroying locality stats %p for {%s, %s, %s, %s}",
xds_client_.get(), this, std::string(lrs_server_name_).c_str(),
std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str(),
name_->AsHumanReadableString().c_str());
}
xds_client_->RemoveClusterLocalityStats(lrs_server_name_, cluster_name_,
eds_service_name_, name_, this);
xds_client_.reset(DEBUG_LOCATION, "LocalityStats");

@ -5347,14 +5347,6 @@ TEST_P(BalancerUpdateTest, DeadUpdate) {
<< balancers_[2]->ads_service()->eds_response_state().error_message;
}
// The re-resolution tests are deferred because they rely on the fallback mode,
// which hasn't been supported.
// TODO(juanlishen): Add TEST_P(BalancerUpdateTest, ReresolveDeadBackend).
// TODO(juanlishen): Add TEST_P(UpdatesWithClientLoadReportingTest,
// ReresolveDeadBalancer)
class ClientLoadReportingTest : public XdsEnd2endTest {
public:
ClientLoadReportingTest() : XdsEnd2endTest(4, 1, 3) {}

Loading…
Cancel
Save