xDS watchers: use C++14 and skip events on stale RDS watchers (#31675)

* xDS watchers: use C++14 and skip events on stale RDS watchers

* Automated change: Fix sanity tests

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/31688/head
Mark D. Roth 2 years ago committed by GitHub
parent 3d68bb735a
commit cf666c4c20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  2. 23
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  3. 56
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc

@ -122,31 +122,28 @@ class CdsLb : public LoadBalancingPolicy {
: parent_(std::move(parent)), name_(std::move(name)) {}
void OnResourceChanged(XdsClusterResource cluster_data) override {
Ref().release(); // Ref held by lambda
RefCountedPtr<ClusterWatcher> self = Ref();
parent_->work_serializer()->Run(
// TODO(roth): When we move to C++14, capture cluster_data with
// std::move().
[this, cluster_data]() mutable {
parent_->OnClusterChanged(name_, std::move(cluster_data));
Unref();
[self = std::move(self),
cluster_data = std::move(cluster_data)]() mutable {
self->parent_->OnClusterChanged(self->name_,
std::move(cluster_data));
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
Ref().release(); // Ref held by lambda
RefCountedPtr<ClusterWatcher> self = Ref();
parent_->work_serializer()->Run(
[this, status]() {
parent_->OnError(name_, status);
Unref();
[self = std::move(self), status = std::move(status)]() mutable {
self->parent_->OnError(self->name_, std::move(status));
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
Ref().release(); // Ref held by lambda
RefCountedPtr<ClusterWatcher> self = Ref();
parent_->work_serializer()->Run(
[this]() {
parent_->OnResourceDoesNotExist(name_);
Unref();
[self = std::move(self)]() {
self->parent_->OnResourceDoesNotExist(self->name_);
},
DEBUG_LOCATION);
}

@ -209,31 +209,26 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
discovery_mechanism_.reset(DEBUG_LOCATION, "EndpointWatcher");
}
void OnResourceChanged(XdsEndpointResource update) override {
Ref().release(); // ref held by callback
RefCountedPtr<EndpointWatcher> self = Ref();
discovery_mechanism_->parent()->work_serializer()->Run(
// TODO(yashykt): When we move to C++14, capture update with
// std::move
[this, update]() mutable {
OnResourceChangedHelper(std::move(update));
Unref();
[self = std::move(self), update = std::move(update)]() mutable {
self->OnResourceChangedHelper(std::move(update));
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
Ref().release(); // ref held by callback
RefCountedPtr<EndpointWatcher> self = Ref();
discovery_mechanism_->parent()->work_serializer()->Run(
[this, status]() {
OnErrorHelper(status);
Unref();
[self = std::move(self), status = std::move(status)]() mutable {
self->OnErrorHelper(std::move(status));
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
Ref().release(); // ref held by callback
RefCountedPtr<EndpointWatcher> self = Ref();
discovery_mechanism_->parent()->work_serializer()->Run(
[this]() {
OnResourceDoesNotExistHelper();
Unref();
[self = std::move(self)]() {
self->OnResourceDoesNotExistHelper();
},
DEBUG_LOCATION);
}

@ -156,33 +156,29 @@ class XdsResolver : public Resolver {
explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnResourceChanged(XdsListenerResource listener) override {
Ref().release(); // ref held by lambda
RefCountedPtr<ListenerWatcher> self = Ref();
resolver_->work_serializer_->Run(
// TODO(yashykt): When we move to C++14, capture listener with
// std::move
[this, listener]() mutable {
resolver_->OnListenerUpdate(std::move(listener));
Unref();
[self = std::move(self), listener = std::move(listener)]() mutable {
self->resolver_->OnListenerUpdate(std::move(listener));
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
Ref().release(); // ref held by lambda
RefCountedPtr<ListenerWatcher> self = Ref();
resolver_->work_serializer_->Run(
[this, status]() {
resolver_->OnError(resolver_->lds_resource_name_, status);
Unref();
[self = std::move(self), status = std::move(status)]() mutable {
self->resolver_->OnError(self->resolver_->lds_resource_name_,
std::move(status));
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
Ref().release(); // ref held by lambda
RefCountedPtr<ListenerWatcher> self = Ref();
resolver_->work_serializer_->Run(
[this]() {
resolver_->OnResourceDoesNotExist(
absl::StrCat(resolver_->lds_resource_name_,
[self = std::move(self)]() {
self->resolver_->OnResourceDoesNotExist(
absl::StrCat(self->resolver_->lds_resource_name_,
": xDS listener resource does not exist"));
Unref();
},
DEBUG_LOCATION);
}
@ -197,33 +193,33 @@ class XdsResolver : public Resolver {
explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnResourceChanged(XdsRouteConfigResource route_config) override {
Ref().release(); // ref held by lambda
RefCountedPtr<RouteConfigWatcher> self = Ref();
resolver_->work_serializer_->Run(
// TODO(yashykt): When we move to C++14, capture route_config with
// std::move
[this, route_config]() mutable {
resolver_->OnRouteConfigUpdate(std::move(route_config));
Unref();
[self = std::move(self),
route_config = std::move(route_config)]() mutable {
if (self != self->resolver_->route_config_watcher_) return;
self->resolver_->OnRouteConfigUpdate(std::move(route_config));
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
Ref().release(); // ref held by lambda
RefCountedPtr<RouteConfigWatcher> self = Ref();
resolver_->work_serializer_->Run(
[this, status]() {
resolver_->OnError(resolver_->route_config_name_, status);
Unref();
[self = std::move(self), status = std::move(status)]() mutable {
if (self != self->resolver_->route_config_watcher_) return;
self->resolver_->OnError(self->resolver_->route_config_name_,
std::move(status));
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
Ref().release(); // ref held by lambda
RefCountedPtr<RouteConfigWatcher> self = Ref();
resolver_->work_serializer_->Run(
[this]() {
resolver_->OnResourceDoesNotExist(absl::StrCat(
resolver_->route_config_name_,
[self = std::move(self)]() {
if (self != self->resolver_->route_config_watcher_) return;
self->resolver_->OnResourceDoesNotExist(absl::StrCat(
self->resolver_->route_config_name_,
": xDS route configuration resource does not exist"));
Unref();
},
DEBUG_LOCATION);
}

Loading…
Cancel
Save