Merge pull request #22330 from markdroth/xds_locality_map_update_fix

If an EDS update replaces all localities in a priority, go into CONNECTING
pull/22349/head
Mark D. Roth 5 years ago committed by GitHub
commit be7da6b149
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  2. 34
      test/cpp/end2end/xds_end2end_test.cc

@@ -298,7 +298,7 @@ class XdsLb : public LoadBalancingPolicy {
~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); } ~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); }
void UpdateLocked( void UpdateLocked(
const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update, const XdsApi::PriorityListUpdate::LocalityMap& priority_update,
bool update_locality_stats); bool update_locality_stats);
void ResetBackoffLocked(); void ResetBackoffLocked();
void UpdateXdsPickerLocked(); void UpdateXdsPickerLocked();
@@ -1033,7 +1033,7 @@ XdsLb::LocalityMap::LocalityMap(RefCountedPtr<XdsLb> xds_policy,
} }
void XdsLb::LocalityMap::UpdateLocked( void XdsLb::LocalityMap::UpdateLocked(
const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update, const XdsApi::PriorityListUpdate::LocalityMap& priority_update,
bool update_locality_stats) { bool update_locality_stats) {
if (xds_policy_->shutting_down_) return; if (xds_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
@@ -1043,11 +1043,11 @@ void XdsLb::LocalityMap::UpdateLocked(
// Maybe reactivate the locality map in case all the active locality maps have // Maybe reactivate the locality map in case all the active locality maps have
// failed. // failed.
MaybeReactivateLocked(); MaybeReactivateLocked();
// Remove (later) the localities not in locality_map_update. // Remove (later) the localities not in priority_update.
for (auto iter = localities_.begin(); iter != localities_.end();) { for (auto iter = localities_.begin(); iter != localities_.end();) {
const auto& name = iter->first; const auto& name = iter->first;
Locality* locality = iter->second.get(); Locality* locality = iter->second.get();
if (locality_map_update.Contains(name)) { if (priority_update.Contains(name)) {
++iter; ++iter;
continue; continue;
} }
@@ -1058,8 +1058,8 @@ void XdsLb::LocalityMap::UpdateLocked(
++iter; ++iter;
} }
} }
// Add or update the localities in locality_map_update. // Add or update the localities in priority_update.
for (const auto& p : locality_map_update.localities) { for (const auto& p : priority_update.localities) {
const auto& name = p.first; const auto& name = p.first;
const auto& locality_update = p.second; const auto& locality_update = p.second;
OrphanablePtr<Locality>& locality = localities_[name]; OrphanablePtr<Locality>& locality = localities_[name];
@@ -1079,6 +1079,32 @@ void XdsLb::LocalityMap::UpdateLocked(
locality->UpdateLocked(locality_update.lb_weight, locality->UpdateLocked(locality_update.lb_weight,
locality_update.serverlist, update_locality_stats); locality_update.serverlist, update_locality_stats);
} }
// If this is the current priority and we removed all of the READY
// localities, go into state CONNECTING.
// TODO(roth): Ideally, we should model this as a graceful policy
// switch: we should keep using the old localities for a short period
// of time, long enough to give the new localities a chance to get
// connected. As part of refactoring this policy, we should try to
// fix that.
if (priority_ == xds_policy()->current_priority_) {
bool found_ready = false;
for (auto& p : localities_) {
const auto& locality_name = p.first;
Locality* locality = p.second.get();
if (!locality_map_update()->Contains(locality_name)) continue;
if (locality->connectivity_state() == GRPC_CHANNEL_READY) {
found_ready = true;
break;
}
}
if (!found_ready) {
xds_policy_->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING,
absl::make_unique<QueuePicker>(
xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")));
xds_policy_->current_priority_ = UINT32_MAX;
}
}
} }
void XdsLb::LocalityMap::ResetBackoffLocked() { void XdsLb::LocalityMap::ResetBackoffLocked() {

@@ -1209,11 +1209,16 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
return std::make_tuple(num_ok, num_failure, num_drops); return std::make_tuple(num_ok, num_failure, num_drops);
} }
void WaitForBackend(size_t backend_idx, bool reset_counters = true) { void WaitForBackend(size_t backend_idx, bool reset_counters = true,
bool require_success = false) {
gpr_log(GPR_INFO, "========= WAITING FOR BACKEND %lu ==========", gpr_log(GPR_INFO, "========= WAITING FOR BACKEND %lu ==========",
static_cast<unsigned long>(backend_idx)); static_cast<unsigned long>(backend_idx));
do { do {
(void)SendRpc(); Status status = SendRpc();
if (require_success) {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
}
} while (backends_[backend_idx]->backend_service()->request_count() == 0); } while (backends_[backend_idx]->backend_service()->request_count() == 0);
if (reset_counters) ResetBackendCounters(); if (reset_counters) ResetBackendCounters();
gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========", gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========",
@@ -2302,6 +2307,31 @@ TEST_P(LocalityMapTest, UpdateMap) {
} }
} }
// Tests that we don't fail RPCs when replacing all of the localities in
// a given priority.
TEST_P(LocalityMapTest, ReplaceAllLocalitiesInPriority) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Initial EDS resource: a single locality ("locality0") routing to backend 0.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
// Replacement EDS resource: only "locality1" (backend 1), i.e. every
// locality from the original update is removed and replaced.
args = AdsServiceImpl::EdsResourceArgs({
{"locality1", GetBackendPorts(1, 2)},
});
// Push the replacement resource from a separate thread after a 5000 ms
// delay, so the client is already serving traffic on locality0 when the
// update arrives. Joined below before the test ends.
std::thread delayed_resource_setter(std::bind(
&BasicTest::SetEdsResourceWithDelay, this, 0,
AdsServiceImpl::BuildEdsResource(args), 5000, kDefaultResourceName));
// Wait for the first backend to be ready.
WaitForBackend(0);
// Keep sending RPCs until we switch over to backend 1, which tells us
// that we received the update. No RPCs should fail during this
// transition.
WaitForBackend(1, /*reset_counters=*/true, /*require_success=*/true);
delayed_resource_setter.join();
}
class FailoverTest : public BasicTest { class FailoverTest : public BasicTest {
public: public:
void SetUp() override { void SetUp() override {

Loading…
Cancel
Save