Use WorkSerializer in XdsClient to propagate updates in a synchronized manner (#27975)

* Use WorkSerializer in XdsClient to propagate updates

* Fix breakage

* Add missing Drain

* More fixes

* Get around msvc issue

* Fix asan leak

* Reviewer comments

* Get around TSAN annotations

* Remove notes

* Clang-format

* Reviewer comments
pull/28034/head
Yash Tibrewal 3 years ago committed by GitHub
parent 817eed0928
commit 6b34d961be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 107
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  2. 127
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  3. 157
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  4. 336
      src/core/ext/xds/xds_client.cc
  5. 81
      src/core/ext/xds/xds_client.h
  6. 2
      src/core/ext/xds/xds_server_config_fetcher.cc

@ -72,35 +72,36 @@ class CdsLb : public LoadBalancingPolicy {
: parent_(std::move(parent)), name_(std::move(name)) {}
void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override {
new Notifier(parent_, name_, std::move(cluster_data));
Ref().release(); // Ref held by lambda
parent_->work_serializer()->Run(
// TODO(roth): When we move to C++14, capture cluster_data with
// std::move().
[this, cluster_data]() mutable {
parent_->OnClusterChanged(name_, std::move(cluster_data));
Unref();
},
DEBUG_LOCATION);
}
void OnError(grpc_error_handle error) override {
new Notifier(parent_, name_, error);
Ref().release(); // Ref held by lambda
parent_->work_serializer()->Run(
[this, error]() {
parent_->OnError(name_, error);
Unref();
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
Ref().release(); // Ref held by lambda
parent_->work_serializer()->Run(
[this]() {
parent_->OnResourceDoesNotExist(name_);
Unref();
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override { new Notifier(parent_, name_); }
private:
class Notifier {
public:
Notifier(RefCountedPtr<CdsLb> parent, std::string name,
XdsApi::CdsUpdate update);
Notifier(RefCountedPtr<CdsLb> parent, std::string name,
grpc_error_handle error);
explicit Notifier(RefCountedPtr<CdsLb> parent, std::string name);
private:
enum Type { kUpdate, kError, kDoesNotExist };
static void RunInExecCtx(void* arg, grpc_error_handle error);
void RunInWorkSerializer(grpc_error_handle error);
RefCountedPtr<CdsLb> parent_;
std::string name_;
grpc_closure closure_;
XdsApi::CdsUpdate update_;
Type type_;
};
RefCountedPtr<CdsLb> parent_;
std::string name_;
};
@ -174,60 +175,6 @@ class CdsLb : public LoadBalancingPolicy {
bool shutting_down_ = false;
};
//
// CdsLb::ClusterWatcher::Notifier
//
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
std::string name,
XdsApi::CdsUpdate update)
: parent_(std::move(parent)),
name_(std::move(name)),
update_(std::move(update)),
type_(kUpdate) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
std::string name,
grpc_error_handle error)
: parent_(std::move(parent)), name_(std::move(name)), type_(kError) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
}
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
std::string name)
: parent_(std::move(parent)), name_(std::move(name)), type_(kDoesNotExist) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
void CdsLb::ClusterWatcher::Notifier::RunInExecCtx(void* arg,
grpc_error_handle error) {
Notifier* self = static_cast<Notifier*>(arg);
(void)GRPC_ERROR_REF(error);
self->parent_->work_serializer()->Run(
[self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
}
void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer(
grpc_error_handle error) {
switch (type_) {
case kUpdate:
parent_->OnClusterChanged(name_, std::move(update_));
break;
case kError:
parent_->OnError(name_, error);
break;
case kDoesNotExist:
parent_->OnResourceDoesNotExist(name_);
break;
};
delete this;
}
//
// CdsLb::Helper
//
@ -352,7 +299,7 @@ void CdsLb::UpdateLocked(UpdateArgs args) {
}
watchers_.clear();
}
auto watcher = absl::make_unique<ClusterWatcher>(Ref(), config_->cluster());
auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), config_->cluster());
watchers_[config_->cluster()].watcher = watcher.get();
xds_client_->WatchClusterData(config_->cluster(), std::move(watcher));
}
@ -373,7 +320,7 @@ bool CdsLb::GenerateDiscoveryMechanismForCluster(
auto& state = watchers_[name];
// Create a new watcher if needed.
if (state.watcher == nullptr) {
auto watcher = absl::make_unique<ClusterWatcher>(Ref(), name);
auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), name);
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
name.c_str());

@ -176,40 +176,57 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
discovery_mechanism_.reset(DEBUG_LOCATION, "EndpointWatcher");
}
void OnEndpointChanged(XdsApi::EdsUpdate update) override {
new Notifier(discovery_mechanism_, std::move(update));
Ref().release(); // ref held by callback
discovery_mechanism_->parent()->work_serializer()->Run(
// TODO(yashykt): When we move to C++14, capture update with
// std::move
[this, update]() mutable {
OnEndpointChangedHelper(std::move(update));
Unref();
},
DEBUG_LOCATION);
}
void OnError(grpc_error_handle error) override {
new Notifier(discovery_mechanism_, error);
Ref().release(); // ref held by callback
discovery_mechanism_->parent()->work_serializer()->Run(
[this, error]() {
OnErrorHelper(error);
Unref();
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
new Notifier(discovery_mechanism_);
Ref().release(); // ref held by callback
discovery_mechanism_->parent()->work_serializer()->Run(
[this]() {
OnResourceDoesNotExistHelper();
Unref();
},
DEBUG_LOCATION);
}
private:
class Notifier {
public:
Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism,
XdsApi::EdsUpdate update);
Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism,
grpc_error_handle error);
explicit Notifier(
RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism);
~Notifier() { discovery_mechanism_.reset(DEBUG_LOCATION, "Notifier"); }
private:
enum Type { kUpdate, kError, kDoesNotExist };
static void RunInExecCtx(void* arg, grpc_error_handle error);
void RunInWorkSerializer(grpc_error_handle error);
// Code accessing protected methods of `DiscoveryMechanism` need to be
// in methods of this class rather than in lambdas to work around an MSVC
// bug.
void OnEndpointChangedHelper(XdsApi::EdsUpdate update) {
discovery_mechanism_->parent()->OnEndpointChanged(
discovery_mechanism_->index(), std::move(update));
}
void OnErrorHelper(grpc_error_handle error) {
discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(),
error);
}
void OnResourceDoesNotExistHelper() {
discovery_mechanism_->parent()->OnResourceDoesNotExist(
discovery_mechanism_->index());
}
RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
grpc_closure closure_;
XdsApi::EdsUpdate update_;
Type type_;
};
RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
};
// This is necessary only because of a bug in msvc where nested class
// cannot access protected member in base class.
friend class EndpointWatcher;
absl::string_view GetEdsResourceName() const {
if (!parent()->is_xds_uri_) return parent()->server_name_;
@ -403,7 +420,7 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::Start() {
":%p starting xds watch for %s",
parent(), index(), this, std::string(GetEdsResourceName()).c_str());
}
auto watcher = absl::make_unique<EndpointWatcher>(
auto watcher = MakeRefCounted<EndpointWatcher>(
Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"));
watcher_ = watcher.get();
parent()->xds_client_->WatchEndpointData(GetEdsResourceName(),
@ -422,66 +439,6 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() {
Unref();
}
//
// XdsClusterResolverLb::EndpointWatcher::Notifier
//
XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
discovery_mechanism,
XdsApi::EdsUpdate update)
: discovery_mechanism_(std::move(discovery_mechanism)),
update_(std::move(update)),
type_(kUpdate) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
discovery_mechanism,
grpc_error_handle error)
: discovery_mechanism_(std::move(discovery_mechanism)), type_(kError) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
}
XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
discovery_mechanism)
: discovery_mechanism_(std::move(discovery_mechanism)),
type_(kDoesNotExist) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
RunInExecCtx(void* arg, grpc_error_handle error) {
Notifier* self = static_cast<Notifier*>(arg);
(void)GRPC_ERROR_REF(error);
self->discovery_mechanism_->parent()->work_serializer()->Run(
[self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
}
void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
RunInWorkSerializer(grpc_error_handle error) {
switch (type_) {
case kUpdate:
discovery_mechanism_->parent()->OnEndpointChanged(
discovery_mechanism_->index(), std::move(update_));
break;
case kError:
discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(),
error);
break;
case kDoesNotExist:
discovery_mechanism_->parent()->OnResourceDoesNotExist(
discovery_mechanism_->index());
break;
};
delete this;
}
//
// XdsClusterResolverLb::LogicalDNSDiscoveryMechanism
//

@ -78,36 +78,39 @@ class XdsResolver : public Resolver {
}
private:
class Notifier {
public:
Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::LdsUpdate update);
Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::RdsUpdate update);
Notifier(RefCountedPtr<XdsResolver> resolver, grpc_error_handle error);
explicit Notifier(RefCountedPtr<XdsResolver> resolver);
private:
enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist };
static void RunInExecCtx(void* arg, grpc_error_handle error);
void RunInWorkSerializer(grpc_error_handle error);
RefCountedPtr<XdsResolver> resolver_;
grpc_closure closure_;
XdsApi::LdsUpdate update_;
Type type_;
};
class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
public:
explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnListenerChanged(XdsApi::LdsUpdate listener) override {
new Notifier(resolver_, std::move(listener));
Ref().release(); // ref held by lambda
resolver_->work_serializer_->Run(
// TODO(yashykt): When we move to C++14, capture listener with
// std::move
[this, listener]() mutable {
resolver_->OnListenerUpdate(std::move(listener));
Unref();
},
DEBUG_LOCATION);
}
void OnError(grpc_error_handle error) override {
new Notifier(resolver_, error);
Ref().release(); // ref held by lambda
resolver_->work_serializer_->Run(
[this, error]() {
resolver_->OnError(error);
Unref();
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
Ref().release(); // ref held by lambda
resolver_->work_serializer_->Run(
[this]() {
resolver_->OnResourceDoesNotExist();
Unref();
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override { new Notifier(resolver_); }
private:
RefCountedPtr<XdsResolver> resolver_;
@ -118,12 +121,34 @@ class XdsResolver : public Resolver {
explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
new Notifier(resolver_, std::move(route_config));
Ref().release(); // ref held by lambda
resolver_->work_serializer_->Run(
// TODO(yashykt): When we move to C++14, capture route_config with
// std::move
[this, route_config]() mutable {
resolver_->OnRouteConfigUpdate(std::move(route_config));
Unref();
},
DEBUG_LOCATION);
}
void OnError(grpc_error_handle error) override {
new Notifier(resolver_, error);
Ref().release(); // ref held by lambda
resolver_->work_serializer_->Run(
[this, error]() {
resolver_->OnError(error);
Unref();
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
Ref().release(); // ref held by lambda
resolver_->work_serializer_->Run(
[this]() {
resolver_->OnResourceDoesNotExist();
Unref();
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override { new Notifier(resolver_); }
private:
RefCountedPtr<XdsResolver> resolver_;
@ -275,71 +300,6 @@ class XdsResolver : public Resolver {
ClusterState::ClusterStateMap cluster_state_map_;
};
//
// XdsResolver::Notifier
//
XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
XdsApi::LdsUpdate update)
: resolver_(std::move(resolver)),
update_(std::move(update)),
type_(kLdsUpdate) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
XdsApi::RdsUpdate update)
: resolver_(std::move(resolver)), type_(kRdsUpdate) {
update_.http_connection_manager.rds_update = std::move(update);
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
grpc_error_handle error)
: resolver_(std::move(resolver)), type_(kError) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
}
XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)), type_(kDoesNotExist) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error_handle error) {
Notifier* self = static_cast<Notifier*>(arg);
(void)GRPC_ERROR_REF(error);
self->resolver_->work_serializer_->Run(
[self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
}
void XdsResolver::Notifier::RunInWorkSerializer(grpc_error_handle error) {
if (resolver_->xds_client_ == nullptr) {
GRPC_ERROR_UNREF(error);
delete this;
return;
}
switch (type_) {
case kLdsUpdate:
resolver_->OnListenerUpdate(std::move(update_));
break;
case kRdsUpdate:
resolver_->OnRouteConfigUpdate(
std::move(*update_.http_connection_manager.rds_update));
break;
case kError:
resolver_->OnError(error);
break;
case kDoesNotExist:
resolver_->OnResourceDoesNotExist();
break;
};
delete this;
}
//
// XdsResolver::XdsConfigSelector::Route
//
@ -786,7 +746,7 @@ void XdsResolver::StartLocked() {
}
grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
interested_parties_);
auto watcher = absl::make_unique<ListenerWatcher>(Ref());
auto watcher = MakeRefCounted<ListenerWatcher>(Ref());
listener_watcher_ = watcher.get();
xds_client_->WatchListenerData(server_name_, std::move(watcher));
}
@ -814,6 +774,9 @@ void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
}
if (xds_client_ == nullptr) {
return;
}
if (listener.http_connection_manager.route_config_name !=
route_config_name_) {
if (route_config_watcher_ != nullptr) {
@ -827,7 +790,7 @@ void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
std::move(listener.http_connection_manager.route_config_name);
if (!route_config_name_.empty()) {
current_virtual_host_.routes.clear();
auto watcher = absl::make_unique<RouteConfigWatcher>(Ref());
auto watcher = MakeRefCounted<RouteConfigWatcher>(Ref());
route_config_watcher_ = watcher.get();
xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
}
@ -849,6 +812,9 @@ void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
}
if (xds_client_ == nullptr) {
return;
}
// Find the relevant VirtualHost from the RouteConfiguration.
XdsApi::RdsUpdate::VirtualHost* vhost =
rds_update.FindVirtualHostForDomain(server_name_);
@ -867,6 +833,10 @@ void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
void XdsResolver::OnError(grpc_error_handle error) {
gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
this, grpc_error_std_string(error).c_str());
if (xds_client_ == nullptr) {
GRPC_ERROR_UNREF(error);
return;
}
Result result;
grpc_arg new_arg = xds_client_->MakeChannelArg();
result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
@ -879,6 +849,9 @@ void XdsResolver::OnResourceDoesNotExist() {
"[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
"update and returning empty service config",
this);
if (xds_client_ == nullptr) {
return;
}
current_virtual_host_.routes.clear();
Result result;
result.service_config =

@ -78,6 +78,43 @@ char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
} // namespace
class XdsClient::Notifier {
public:
// Helper template function to invoke `OnError()` on a list of watchers \a
// watchers_list within \a work_serializer. Works with all 4 resource types.
template <class T>
static void ScheduleNotifyWatchersOnErrorInWorkSerializer(
XdsClient* xds_client, const T& watchers_list, grpc_error_handle error,
const grpc_core::DebugLocation& location) {
xds_client->work_serializer_.Schedule(
[watchers_list, error]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client->work_serializer_) {
for (const auto& p : watchers_list) {
p.first->OnError(GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
},
location);
}
// Helper template function to invoke `OnResourceDoesNotExist()` on a list of
// watchers \a watchers_list within \a work_serializer. Works with all 4
// resource types.
template <class T>
static void ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer(
XdsClient* xds_client, const T& watchers_list,
const grpc_core::DebugLocation& location) {
xds_client->work_serializer_.Schedule(
[watchers_list]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client->work_serializer_) {
for (const auto& p : watchers_list) {
p.first->OnResourceDoesNotExist();
}
},
location);
}
};
//
// Internal class declarations
//
@ -189,6 +226,7 @@ class XdsClient::ChannelState::AdsCallState
MutexLock lock(&self->ads_calld_->xds_client()->mu_);
self->OnTimerLocked(GRPC_ERROR_REF(error));
}
self->ads_calld_->xds_client()->work_serializer_.DrainQueue();
self->ads_calld_.reset();
self->Unref(DEBUG_LOCATION, "timer");
}
@ -214,28 +252,28 @@ class XdsClient::ChannelState::AdsCallState
if (type_url_ == XdsApi::kLdsTypeUrl) {
ListenerState& state = authority_state.listener_map[resource_.id];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(watcher_error));
}
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
ads_calld_->xds_client(), state.watchers,
GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION);
} else if (type_url_ == XdsApi::kRdsTypeUrl) {
RouteConfigState& state =
authority_state.route_config_map[resource_.id];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(watcher_error));
}
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
ads_calld_->xds_client(), state.watchers,
GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION);
} else if (type_url_ == XdsApi::kCdsTypeUrl) {
ClusterState& state = authority_state.cluster_map[resource_.id];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(watcher_error));
}
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
ads_calld_->xds_client(), state.watchers,
GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION);
} else if (type_url_ == XdsApi::kEdsTypeUrl) {
EndpointState& state = authority_state.endpoint_map[resource_.id];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(watcher_error));
}
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
ads_calld_->xds_client(), state.watchers,
GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION);
} else {
GPR_UNREACHABLE_CODE(return );
}
@ -464,6 +502,7 @@ class XdsClient::ChannelState::StateWatcher
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
{
MutexLock lock(&parent_->xds_client_->mu_);
if (!parent_->shutting_down_ &&
new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@ -477,6 +516,8 @@ class XdsClient::ChannelState::StateWatcher
"xds channel in TRANSIENT_FAILURE"));
}
}
parent_->xds_client()->work_serializer_.DrainQueue();
}
WeakRefCountedPtr<ChannelState> parent_;
};
@ -1000,9 +1041,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
listener_state.meta = CreateResourceMetadataAcked(
std::move(p.second.serialized_proto), version, update_time);
// Notify watchers.
for (const auto& p : listener_state.watchers) {
p.first->OnListenerChanged(*listener_state.update);
auto& watchers_list = listener_state.watchers;
auto& value = listener_state.update.value();
xds_client()->work_serializer_.Schedule(
[watchers_list, value]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) {
for (const auto& p : watchers_list) {
p.first->OnListenerChanged(value);
}
},
DEBUG_LOCATION);
}
// For invalid resources in the update, if they are already in the
// cache, pretend that they are present in the update, so that we
@ -1041,9 +1089,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
// For that case, we rely on the request timeout instead.
if (!listener_state.update.has_value()) continue;
listener_state.update.reset();
for (const auto& p : listener_state.watchers) {
p.first->OnResourceDoesNotExist();
}
Notifier::ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer(
xds_client(), listener_state.watchers, DEBUG_LOCATION);
}
}
}
@ -1063,9 +1110,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
->authority_state_map_[authority_name]
.route_config_map[listener_name];
route_config_state.update.reset();
for (const auto& p : route_config_state.watchers) {
p.first->OnResourceDoesNotExist();
}
Notifier::ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer(
xds_client(), route_config_state.watchers, DEBUG_LOCATION);
}
}
}
@ -1110,9 +1156,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked(
route_config_state.meta = CreateResourceMetadataAcked(
std::move(p.second.serialized_proto), version, update_time);
// Notify all watchers.
for (const auto& p : route_config_state.watchers) {
p.first->OnRouteConfigChanged(*route_config_state.update);
auto& watchers_list = route_config_state.watchers;
auto& value = route_config_state.update.value();
xds_client()->work_serializer_.Schedule(
[watchers_list, value]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) {
for (const auto& p : watchers_list) {
p.first->OnRouteConfigChanged(value);
}
},
DEBUG_LOCATION);
}
}
@ -1165,9 +1218,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
cluster_state.meta = CreateResourceMetadataAcked(
std::move(p.second.serialized_proto), version, update_time);
// Notify all watchers.
for (const auto& p : cluster_state.watchers) {
p.first->OnClusterChanged(cluster_state.update.value());
auto& watchers_list = cluster_state.watchers;
auto& value = cluster_state.update.value();
xds_client()->work_serializer_.Schedule(
[watchers_list, value]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) {
for (const auto& p : watchers_list) {
p.first->OnClusterChanged(value);
}
},
DEBUG_LOCATION);
}
// For invalid resources in the update, if they are already in the
// cache, pretend that they are present in the update, so that we
@ -1206,9 +1266,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
// For that case, we rely on the request timeout instead.
if (!cluster_state.update.has_value()) continue;
cluster_state.update.reset();
for (const auto& p : cluster_state.watchers) {
p.first->OnResourceDoesNotExist();
}
Notifier::ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer(
xds_client(), cluster_state.watchers, DEBUG_LOCATION);
}
}
}
@ -1227,9 +1286,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
->authority_state_map_[authority]
.endpoint_map[eds_resource_name];
endpoint_state.update.reset();
for (const auto& p : endpoint_state.watchers) {
p.first->OnResourceDoesNotExist();
}
Notifier::ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer(
xds_client(), endpoint_state.watchers, DEBUG_LOCATION);
}
}
}
@ -1277,9 +1335,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked(
endpoint_state.meta = CreateResourceMetadataAcked(
std::move(p.second.serialized_proto), version, update_time);
// Notify all watchers.
for (const auto& p : endpoint_state.watchers) {
p.first->OnEndpointChanged(endpoint_state.update.value());
auto& watchers_list = endpoint_state.watchers;
auto& value = endpoint_state.update.value();
xds_client()->work_serializer_.Schedule(
[watchers_list, value]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) {
for (const auto& p : watchers_list) {
p.first->OnEndpointChanged(value);
}
},
DEBUG_LOCATION);
}
}
@ -1306,9 +1371,9 @@ void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateHelperLocked(
auto it = state_map->find(resource_name);
if (it == state_map->end()) return;
auto& state = it->second;
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(result.parse_error));
}
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
xds_client(), state.watchers, GRPC_ERROR_REF(result.parse_error),
DEBUG_LOCATION);
UpdateResourceMetadataNacked(result.version, error_details, update_time,
&state.meta);
}
@ -1388,6 +1453,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
MutexLock lock(&ads_calld->xds_client()->mu_);
done = ads_calld->OnResponseReceivedLocked();
}
ads_calld->xds_client()->work_serializer_.DrainQueue();
if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
}
@ -1494,6 +1560,7 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
MutexLock lock(&ads_calld->xds_client()->mu_);
ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
}
ads_calld->xds_client()->work_serializer_.DrainQueue();
ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
}
@ -2081,31 +2148,50 @@ RefCountedPtr<XdsClient::ChannelState> XdsClient::GetOrCreateChannelStateLocked(
void XdsClient::WatchListenerData(
absl::string_view listener_name,
std::unique_ptr<ListenerWatcherInterface> watcher) {
RefCountedPtr<ListenerWatcherInterface> watcher) {
std::string listener_name_str = std::string(listener_name);
MutexLock lock(&mu_);
ListenerWatcherInterface* w = watcher.get();
auto resource = XdsApi::ParseResourceName(listener_name, XdsApi::IsLds);
if (!resource.ok()) {
invalid_listener_watchers_[w] = std::move(watcher);
{
MutexLock lock(&mu_);
invalid_listener_watchers_[w] = watcher;
}
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat(
"Unable to parse resource name for listener %s", listener_name));
w->OnError(GRPC_ERROR_REF(error));
work_serializer_.Run(
// TODO(yashykt): When we move to C++14, capture watcher using
// std::move()
[watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnError(error);
},
DEBUG_LOCATION);
return;
}
{
MutexLock lock(&mu_);
AuthorityState& authority_state = authority_state_map_[resource->authority];
ListenerState& listener_state = authority_state.listener_map[resource->id];
listener_state.watchers[w] = std::move(watcher);
listener_state.watchers[w] = watcher;
// If we've already received an LDS update, notify the new watcher
// immediately.
if (listener_state.update.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
this, listener_name_str.c_str());
}
w->OnListenerChanged(*listener_state.update);
}
// If the authority doesn't yet have a channel, set it, creating it if needed.
gpr_log(GPR_INFO,
"[xds_client %p] returning cached listener data for %s", this,
listener_name_str.c_str());
}
auto& value = listener_state.update.value();
work_serializer_.Schedule(
// TODO(yashykt): When we move to C++14, capture watcher using
// std::move()
[watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) {
watcher->OnListenerChanged(value);
},
DEBUG_LOCATION);
}
// If the authority doesn't yet have a channel, set it, creating it if
// needed.
if (authority_state.channel_state == nullptr) {
authority_state.channel_state =
GetOrCreateChannelStateLocked(bootstrap_->server());
@ -2113,6 +2199,8 @@ void XdsClient::WatchListenerData(
authority_state.channel_state->SubscribeLocked(XdsApi::kLdsTypeUrl,
*resource);
}
work_serializer_.DrainQueue();
}
void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
ListenerWatcherInterface* watcher,
@ -2140,34 +2228,50 @@ void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
void XdsClient::WatchRouteConfigData(
absl::string_view route_config_name,
std::unique_ptr<RouteConfigWatcherInterface> watcher) {
RefCountedPtr<RouteConfigWatcherInterface> watcher) {
std::string route_config_name_str = std::string(route_config_name);
MutexLock lock(&mu_);
RouteConfigWatcherInterface* w = watcher.get();
auto resource = XdsApi::ParseResourceName(route_config_name, XdsApi::IsRds);
if (!resource.ok()) {
invalid_route_config_watchers_[w] = std::move(watcher);
{
MutexLock lock(&mu_);
invalid_route_config_watchers_[w] = watcher;
}
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrFormat("Unable to parse resource name for route config %s",
route_config_name));
w->OnError(GRPC_ERROR_REF(error));
work_serializer_.Run(
// TODO(yashykt): When we move to C++14, capture watcher using
// std::move()
[watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnError(error);
},
DEBUG_LOCATION);
return;
}
{
MutexLock lock(&mu_);
auto& authority_state = authority_state_map_[resource->authority];
RouteConfigState& route_config_state =
authority_state.route_config_map[resource->id];
route_config_state.watchers[w] = std::move(watcher);
route_config_state.watchers[w] = watcher;
// If we've already received an RDS update, notify the new watcher
// immediately.
if (route_config_state.update.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] returning cached route config data for %s", this,
route_config_name_str.c_str());
}
w->OnRouteConfigChanged(*route_config_state.update);
}
// If the authority doesn't yet have a channel, set it, creating it if needed.
"[xds_client %p] returning cached route config data for %s",
this, route_config_name_str.c_str());
}
auto& value = route_config_state.update.value();
work_serializer_.Schedule(
[watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) {
watcher->OnRouteConfigChanged(value);
},
DEBUG_LOCATION);
}
// If the authority doesn't yet have a channel, set it, creating it if
// needed.
if (authority_state.channel_state == nullptr) {
authority_state.channel_state =
GetOrCreateChannelStateLocked(bootstrap_->server());
@ -2175,6 +2279,8 @@ void XdsClient::WatchRouteConfigData(
authority_state.channel_state->SubscribeLocked(XdsApi::kRdsTypeUrl,
*resource);
}
work_serializer_.DrainQueue();
}
void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
RouteConfigWatcherInterface* watcher,
@ -2203,31 +2309,46 @@ void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
void XdsClient::WatchClusterData(
absl::string_view cluster_name,
std::unique_ptr<ClusterWatcherInterface> watcher) {
RefCountedPtr<ClusterWatcherInterface> watcher) {
std::string cluster_name_str = std::string(cluster_name);
MutexLock lock(&mu_);
ClusterWatcherInterface* w = watcher.get();
auto resource = XdsApi::ParseResourceName(cluster_name, XdsApi::IsCds);
if (!resource.ok()) {
invalid_cluster_watchers_[w] = std::move(watcher);
{
MutexLock lock(&mu_);
invalid_cluster_watchers_[w] = watcher;
}
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat(
"Unable to parse resource name for cluster %s", cluster_name));
w->OnError(GRPC_ERROR_REF(error));
work_serializer_.Run([watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
work_serializer_) { watcher->OnError(error); },
DEBUG_LOCATION);
return;
}
{
MutexLock lock(&mu_);
auto& authority_state = authority_state_map_[resource->authority];
ClusterState& cluster_state = authority_state.cluster_map[resource->id];
cluster_state.watchers[w] = std::move(watcher);
cluster_state.watchers[w] = watcher;
// If we've already received a CDS update, notify the new watcher
// immediately.
if (cluster_state.update.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
this, cluster_name_str.c_str());
}
w->OnClusterChanged(cluster_state.update.value());
}
// If the authority doesn't yet have a channel, set it, creating it if needed.
gpr_log(GPR_INFO,
"[xds_client %p] returning cached cluster data for %s", this,
cluster_name_str.c_str());
}
auto& value = cluster_state.update.value();
work_serializer_.Schedule(
// TODO(yashykt): When we move to C++14, capture watcher using
// std::move()
[watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) {
watcher->OnClusterChanged(value);
},
DEBUG_LOCATION);
}
// If the authority doesn't yet have a channel, set it, creating it if
// needed.
if (authority_state.channel_state == nullptr) {
authority_state.channel_state =
GetOrCreateChannelStateLocked(bootstrap_->server());
@ -2235,6 +2356,8 @@ void XdsClient::WatchClusterData(
authority_state.channel_state->SubscribeLocked(XdsApi::kCdsTypeUrl,
*resource);
}
work_serializer_.DrainQueue();
}
void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
ClusterWatcherInterface* watcher,
@ -2262,32 +2385,45 @@ void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
void XdsClient::WatchEndpointData(
absl::string_view eds_service_name,
std::unique_ptr<EndpointWatcherInterface> watcher) {
RefCountedPtr<EndpointWatcherInterface> watcher) {
std::string eds_service_name_str = std::string(eds_service_name);
MutexLock lock(&mu_);
EndpointWatcherInterface* w = watcher.get();
auto resource = XdsApi::ParseResourceName(eds_service_name, XdsApi::IsEds);
if (!resource.ok()) {
invalid_endpoint_watchers_[w] = std::move(watcher);
{
MutexLock lock(&mu_);
invalid_endpoint_watchers_[w] = watcher;
}
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrFormat("Unable to parse resource name for endpoint service %s",
eds_service_name));
w->OnError(GRPC_ERROR_REF(error));
work_serializer_.Run([watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
work_serializer_) { watcher->OnError(error); },
DEBUG_LOCATION);
return;
}
{
MutexLock lock(&mu_);
auto& authority_state = authority_state_map_[resource->authority];
EndpointState& endpoint_state = authority_state.endpoint_map[resource->id];
endpoint_state.watchers[w] = std::move(watcher);
endpoint_state.watchers[w] = watcher;
// If we've already received an EDS update, notify the new watcher
// immediately.
if (endpoint_state.update.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
this, eds_service_name_str.c_str());
}
w->OnEndpointChanged(endpoint_state.update.value());
}
// If the authority doesn't yet have a channel, set it, creating it if needed.
gpr_log(GPR_INFO,
"[xds_client %p] returning cached endpoint data for %s", this,
eds_service_name_str.c_str());
}
auto& value = endpoint_state.update.value();
work_serializer_.Schedule(
[watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) {
watcher->OnEndpointChanged(value);
},
DEBUG_LOCATION);
}
// If the authority doesn't yet have a channel, set it, creating it if
// needed.
if (authority_state.channel_state == nullptr) {
authority_state.channel_state =
GetOrCreateChannelStateLocked(bootstrap_->server());
@ -2295,6 +2431,8 @@ void XdsClient::WatchEndpointData(
authority_state.channel_state->SubscribeLocked(XdsApi::kEdsTypeUrl,
*resource);
}
work_serializer_.DrainQueue();
}
void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
EndpointWatcherInterface* watcher,
@ -2453,33 +2591,57 @@ void XdsClient::ResetBackoff() {
}
void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) {
std::set<RefCountedPtr<ListenerWatcherInterface>> listener_watchers;
std::set<RefCountedPtr<RouteConfigWatcherInterface>> route_config_watchers;
std::set<RefCountedPtr<ClusterWatcherInterface>> cluster_watchers;
std::set<RefCountedPtr<EndpointWatcherInterface>> endpoint_watchers;
for (const auto& a : authority_state_map_) {
for (const auto& p : a.second.listener_map) {
const ListenerState& listener_state = p.second;
for (const auto& p : listener_state.watchers) {
p.first->OnError(GRPC_ERROR_REF(error));
for (const auto& q : listener_state.watchers) {
listener_watchers.insert(q.second);
}
}
for (const auto& p : a.second.route_config_map) {
const RouteConfigState& route_config_state = p.second;
for (const auto& p : route_config_state.watchers) {
p.first->OnError(GRPC_ERROR_REF(error));
for (const auto& q : route_config_state.watchers) {
route_config_watchers.insert(q.second);
}
}
for (const auto& p : a.second.cluster_map) {
const ClusterState& cluster_state = p.second;
for (const auto& p : cluster_state.watchers) {
p.first->OnError(GRPC_ERROR_REF(error));
for (const auto& q : cluster_state.watchers) {
cluster_watchers.insert(q.second);
}
}
for (const auto& p : a.second.endpoint_map) {
const EndpointState& endpoint_state = p.second;
for (const auto& p : endpoint_state.watchers) {
p.first->OnError(GRPC_ERROR_REF(error));
for (const auto& q : endpoint_state.watchers) {
endpoint_watchers.insert(q.second);
}
}
}
work_serializer_.Schedule(
// TODO(yashykt): When we move to C++14, capture *_watchers using
// std::move()
[listener_watchers, route_config_watchers, cluster_watchers,
endpoint_watchers, error]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) {
for (const auto& watcher : listener_watchers) {
watcher->OnError(GRPC_ERROR_REF(error));
}
for (const auto& watcher : route_config_watchers) {
watcher->OnError(GRPC_ERROR_REF(error));
}
for (const auto& watcher : cluster_watchers) {
watcher->OnError(GRPC_ERROR_REF(error));
}
for (const auto& watcher : endpoint_watchers) {
watcher->OnError(GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
},
DEBUG_LOCATION);
}
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(

@ -44,39 +44,48 @@ extern TraceFlag grpc_xds_client_refcount_trace;
class XdsClient : public DualRefCounted<XdsClient> {
public:
// Listener data watcher interface. Implemented by callers.
class ListenerWatcherInterface {
class ListenerWatcherInterface : public RefCounted<ListenerWatcherInterface> {
public:
virtual ~ListenerWatcherInterface() = default;
virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0;
virtual void OnError(grpc_error_handle error) = 0;
virtual void OnResourceDoesNotExist() = 0;
virtual void OnListenerChanged(XdsApi::LdsUpdate listener)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnResourceDoesNotExist()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
};
// RouteConfiguration data watcher interface. Implemented by callers.
class RouteConfigWatcherInterface {
class RouteConfigWatcherInterface
: public RefCounted<RouteConfigWatcherInterface> {
public:
virtual ~RouteConfigWatcherInterface() = default;
virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0;
virtual void OnError(grpc_error_handle error) = 0;
virtual void OnResourceDoesNotExist() = 0;
virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnResourceDoesNotExist()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
};
// Cluster data watcher interface. Implemented by callers.
class ClusterWatcherInterface {
class ClusterWatcherInterface : public RefCounted<ClusterWatcherInterface> {
public:
virtual ~ClusterWatcherInterface() = default;
virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
virtual void OnError(grpc_error_handle error) = 0;
virtual void OnResourceDoesNotExist() = 0;
virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnResourceDoesNotExist()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
};
// Endpoint data watcher interface. Implemented by callers.
class EndpointWatcherInterface {
class EndpointWatcherInterface : public RefCounted<EndpointWatcherInterface> {
public:
virtual ~EndpointWatcherInterface() = default;
virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
virtual void OnError(grpc_error_handle error) = 0;
virtual void OnResourceDoesNotExist() = 0;
virtual void OnEndpointChanged(XdsApi::EdsUpdate update)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnResourceDoesNotExist()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
};
// Factory function to get or create the global XdsClient instance.
@ -112,7 +121,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
// If the caller is going to start a new watch after cancelling the
// old one, it should set delay_unsubscription to true.
void WatchListenerData(absl::string_view listener_name,
std::unique_ptr<ListenerWatcherInterface> watcher);
RefCountedPtr<ListenerWatcherInterface> watcher);
void CancelListenerDataWatch(absl::string_view listener_name,
ListenerWatcherInterface* watcher,
bool delay_unsubscription = false);
@ -124,9 +133,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
// pointer must not be used for any other purpose.)
// If the caller is going to start a new watch after cancelling the
// old one, it should set delay_unsubscription to true.
void WatchRouteConfigData(
absl::string_view route_config_name,
std::unique_ptr<RouteConfigWatcherInterface> watcher);
void WatchRouteConfigData(absl::string_view route_config_name,
RefCountedPtr<RouteConfigWatcherInterface> watcher);
void CancelRouteConfigDataWatch(absl::string_view route_config_name,
RouteConfigWatcherInterface* watcher,
bool delay_unsubscription = false);
@ -139,7 +147,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
// If the caller is going to start a new watch after cancelling the
// old one, it should set delay_unsubscription to true.
void WatchClusterData(absl::string_view cluster_name,
std::unique_ptr<ClusterWatcherInterface> watcher);
RefCountedPtr<ClusterWatcherInterface> watcher);
void CancelClusterDataWatch(absl::string_view cluster_name,
ClusterWatcherInterface* watcher,
bool delay_unsubscription = false);
@ -152,7 +160,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
// If the caller is going to start a new watch after cancelling the
// old one, it should set delay_unsubscription to true.
void WatchEndpointData(absl::string_view eds_service_name,
std::unique_ptr<EndpointWatcherInterface> watcher);
RefCountedPtr<EndpointWatcherInterface> watcher);
void CancelEndpointDataWatch(absl::string_view eds_service_name,
EndpointWatcherInterface* watcher,
bool delay_unsubscription = false);
@ -257,8 +265,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
};
struct ListenerState {
std::map<ListenerWatcherInterface*,
std::unique_ptr<ListenerWatcherInterface>>
std::map<ListenerWatcherInterface*, RefCountedPtr<ListenerWatcherInterface>>
watchers;
// The latest data seen from LDS.
absl::optional<XdsApi::LdsUpdate> update;
@ -267,7 +274,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
struct RouteConfigState {
std::map<RouteConfigWatcherInterface*,
std::unique_ptr<RouteConfigWatcherInterface>>
RefCountedPtr<RouteConfigWatcherInterface>>
watchers;
// The latest data seen from RDS.
absl::optional<XdsApi::RdsUpdate> update;
@ -275,7 +282,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
};
struct ClusterState {
std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
std::map<ClusterWatcherInterface*, RefCountedPtr<ClusterWatcherInterface>>
watchers;
// The latest data seen from CDS.
absl::optional<XdsApi::CdsUpdate> update;
@ -283,8 +290,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
};
struct EndpointState {
std::map<EndpointWatcherInterface*,
std::unique_ptr<EndpointWatcherInterface>>
std::map<EndpointWatcherInterface*, RefCountedPtr<EndpointWatcherInterface>>
watchers;
// The latest data seen from EDS.
absl::optional<XdsApi::EdsUpdate> update;
@ -319,6 +325,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
grpc_millis last_report_time = ExecCtx::Get()->Now();
};
class Notifier;
// Sends an error notification to all watchers.
void NotifyOnErrorLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
@ -336,6 +344,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
grpc_pollset_set* interested_parties_;
OrphanablePtr<CertificateProviderStore> certificate_provider_store_;
XdsApi api_;
WorkSerializer work_serializer_;
Mutex mu_;
@ -354,14 +363,14 @@ class XdsClient : public DualRefCounted<XdsClient> {
// Stores started watchers whose resource name was not parsed successfully,
// waiting to be cancelled or reset in Orphan().
std::map<ListenerWatcherInterface*, std::unique_ptr<ListenerWatcherInterface>>
std::map<ListenerWatcherInterface*, RefCountedPtr<ListenerWatcherInterface>>
invalid_listener_watchers_ ABSL_GUARDED_BY(mu_);
std::map<RouteConfigWatcherInterface*,
std::unique_ptr<RouteConfigWatcherInterface>>
RefCountedPtr<RouteConfigWatcherInterface>>
invalid_route_config_watchers_ ABSL_GUARDED_BY(mu_);
std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
std::map<ClusterWatcherInterface*, RefCountedPtr<ClusterWatcherInterface>>
invalid_cluster_watchers_ ABSL_GUARDED_BY(mu_);
std::map<EndpointWatcherInterface*, std::unique_ptr<EndpointWatcherInterface>>
std::map<EndpointWatcherInterface*, RefCountedPtr<EndpointWatcherInterface>>
invalid_endpoint_watchers_ ABSL_GUARDED_BY(mu_);
bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;

@ -357,7 +357,7 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
watcher) override {
grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get();
auto listener_watcher = absl::make_unique<ListenerWatcher>(
auto listener_watcher = MakeRefCounted<ListenerWatcher>(
std::move(watcher), xds_client_, serving_status_notifier_,
listening_address);
auto* listener_watcher_ptr = listener_watcher.get();

Loading…
Cancel
Save