weighted_target and xds_cluster_manager: don't update picker while update is in flight (#29313)

* weighted_target and xds_cluster_manager: don't update picker while update is in flight

* include deactivation and new child creation

* add test
pull/29342/head
Mark D. Roth 3 years ago committed by GitHub
parent 817420318a
commit 96c19e8c98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  2. 21
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  3. 28
      test/cpp/end2end/xds/xds_end2end_test.cc

@ -210,6 +210,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
// Internal state.
bool shutting_down_ = false;
bool update_in_progress_ = false;
// Children.
std::map<std::string, OrphanablePtr<WeightedChild>> targets_;
@ -281,6 +282,7 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this);
}
update_in_progress_ = true;
// Update config.
config_ = std::move(args.config);
// Deactivate the targets not in the new config.
@ -291,38 +293,37 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
child->DeactivateLocked();
}
}
// Create any children that don't already exist.
// Note that we add all children before updating any of them, because
// an update may trigger a child to immediately update its
// connectivity state (e.g., reporting TRANSIENT_FAILURE immediately when
// receiving an empty address list), and we don't want to return an
// overall state with incomplete data.
for (const auto& p : config_->target_map()) {
const std::string& name = p.first;
auto it = targets_.find(name);
if (it == targets_.end()) {
targets_.emplace(name, MakeOrphanable<WeightedChild>(
Ref(DEBUG_LOCATION, "WeightedChild"), name));
}
}
// Update all children.
absl::StatusOr<HierarchicalAddressMap> address_map =
MakeHierarchicalAddressMap(args.addresses);
for (const auto& p : config_->target_map()) {
const std::string& name = p.first;
const WeightedTargetLbConfig::ChildConfig& config = p.second;
auto& target = targets_[name];
// Create child if it does not already exist.
if (target == nullptr) {
target = MakeOrphanable<WeightedChild>(
Ref(DEBUG_LOCATION, "WeightedChild"), name);
}
absl::StatusOr<ServerAddressList> addresses;
if (address_map.ok()) {
addresses = std::move((*address_map)[name]);
} else {
addresses = address_map.status();
}
targets_[name]->UpdateLocked(config, std::move(addresses), args.args);
target->UpdateLocked(config, std::move(addresses), args.args);
}
update_in_progress_ = false;
UpdateStateLocked();
}
void WeightedTargetLb::UpdateStateLocked() {
// If we're in the process of propagating an update from our parent to
// our children, ignore any updates that come from the children. We
// will instead return a new picker once the update has been seen by
// all children. This avoids unnecessary picker churn while an update
// is being propagated to our children.
if (update_in_progress_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] scanning children to determine "
@ -356,6 +357,7 @@ void WeightedTargetLb::UpdateStateLocked() {
}
switch (child->connectivity_state()) {
case GRPC_CHANNEL_READY: {
GPR_ASSERT(child->weight() > 0);
end += child->weight();
picker_list.push_back(std::make_pair(end, child->picker_wrapper()));
break;

@ -196,6 +196,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
// Internal state.
bool shutting_down_ = false;
bool update_in_progress_ = false;
// Children.
std::map<std::string, OrphanablePtr<ClusterChild>> children_;
@ -254,6 +255,7 @@ void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
}
update_in_progress_ = true;
// Update config.
config_ = std::move(args.config);
// Deactivate the children not in the new config.
@ -268,19 +270,24 @@ void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
for (const auto& p : config_->cluster_map()) {
const std::string& name = p.first;
const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
auto it = children_.find(name);
if (it == children_.end()) {
it = children_
.emplace(name, MakeOrphanable<ClusterChild>(
Ref(DEBUG_LOCATION, "ClusterChild"), name))
.first;
auto& child = children_[name];
if (child == nullptr) {
child = MakeOrphanable<ClusterChild>(Ref(DEBUG_LOCATION, "ClusterChild"),
name);
}
it->second->UpdateLocked(config, args.addresses, args.args);
child->UpdateLocked(config, args.addresses, args.args);
}
update_in_progress_ = false;
UpdateStateLocked();
}
void XdsClusterManagerLb::UpdateStateLocked() {
// If we're in the process of propagating an update from our parent to
// our children, ignore any updates that come from the children. We
// will instead return a new picker once the update has been seen by
// all children. This avoids unnecessary picker churn while an update
// is being propagated to our children.
if (update_in_progress_) return;
// Also count the number of children in each state, to determine the
// overall state.
size_t num_ready = 0;

@ -11860,6 +11860,34 @@ TEST_P(LocalityMapTest, ReplaceAllLocalitiesInPriority) {
delayed_resource_setter.join();
}
TEST_P(LocalityMapTest, ConsistentWeightedTargetUpdates) {
CreateAndStartBackends(4);
// Initial update has two localities.
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(1, 2)},
{"locality1", CreateEndpointsForBackends(2, 3)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForAllBackends(1, 3);
// Next update removes locality1.
// Also add backend 0 to locality0, so that we can tell when the
// update has been seen.
args = EdsResourceArgs({
{"locality0", CreateEndpointsForBackends(0, 2)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(0);
// Next update re-adds locality1.
// Also add backend 3 to locality1, so that we can tell when the
// update has been seen.
args = EdsResourceArgs({
{"locality0", CreateEndpointsForBackends(0, 2)},
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForBackend(3);
}
class FailoverTest : public XdsEnd2endTest {
public:
void SetUp() override {

Loading…
Cancel
Save