diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 49a4496e97d..7b97d9be07b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -72,35 +72,36 @@ class CdsLb : public LoadBalancingPolicy { : parent_(std::move(parent)), name_(std::move(name)) {} void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override { - new Notifier(parent_, name_, std::move(cluster_data)); + Ref().release(); // Ref held by lambda + parent_->work_serializer()->Run( + // TODO(roth): When we move to C++14, capture cluster_data with + // std::move(). + [this, cluster_data]() mutable { + parent_->OnClusterChanged(name_, std::move(cluster_data)); + Unref(); + }, + DEBUG_LOCATION); } void OnError(grpc_error_handle error) override { - new Notifier(parent_, name_, error); + Ref().release(); // Ref held by lambda + parent_->work_serializer()->Run( + [this, error]() { + parent_->OnError(name_, error); + Unref(); + }, + DEBUG_LOCATION); + } + void OnResourceDoesNotExist() override { + Ref().release(); // Ref held by lambda + parent_->work_serializer()->Run( + [this]() { + parent_->OnResourceDoesNotExist(name_); + Unref(); + }, + DEBUG_LOCATION); } - void OnResourceDoesNotExist() override { new Notifier(parent_, name_); } private: - class Notifier { - public: - Notifier(RefCountedPtr parent, std::string name, - XdsApi::CdsUpdate update); - Notifier(RefCountedPtr parent, std::string name, - grpc_error_handle error); - explicit Notifier(RefCountedPtr parent, std::string name); - - private: - enum Type { kUpdate, kError, kDoesNotExist }; - - static void RunInExecCtx(void* arg, grpc_error_handle error); - void RunInWorkSerializer(grpc_error_handle error); - - RefCountedPtr parent_; - std::string name_; - grpc_closure closure_; - XdsApi::CdsUpdate update_; - Type type_; - }; - RefCountedPtr parent_; std::string name_; }; @@ -174,60 +175,6 @@ class CdsLb : public LoadBalancingPolicy { bool shutting_down_ = false; }; -// -// CdsLb::ClusterWatcher::Notifier -// - -CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr parent, - std::string name, - XdsApi::CdsUpdate update) - : parent_(std::move(parent)), - name_(std::move(name)), - update_(std::move(update)), - type_(kUpdate) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); -} - -CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr parent, - std::string name, - grpc_error_handle error) - : parent_(std::move(parent)), name_(std::move(name)), type_(kError) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, error); -} - -CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr parent, - std::string name) - : parent_(std::move(parent)), name_(std::move(name)), type_(kDoesNotExist) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); -} - -void CdsLb::ClusterWatcher::Notifier::RunInExecCtx(void* arg, - grpc_error_handle error) { - Notifier* self = static_cast(arg); - (void)GRPC_ERROR_REF(error); - self->parent_->work_serializer()->Run( - [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION); -} - -void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer( - grpc_error_handle error) { - switch (type_) { - case kUpdate: - parent_->OnClusterChanged(name_, std::move(update_)); - break; - case kError: - parent_->OnError(name_, error); - break; - case kDoesNotExist: - parent_->OnResourceDoesNotExist(name_); - break; - }; - delete this; -} - // // CdsLb::Helper // @@ -352,7 +299,7 @@ void CdsLb::UpdateLocked(UpdateArgs args) { } watchers_.clear(); } - auto watcher = absl::make_unique(Ref(), config_->cluster()); + auto watcher = MakeRefCounted(Ref(), config_->cluster()); watchers_[config_->cluster()].watcher = watcher.get(); xds_client_->WatchClusterData(config_->cluster(), std::move(watcher)); } @@ -373,7 +320,7 @@ bool CdsLb::GenerateDiscoveryMechanismForCluster( auto& state = watchers_[name]; // Create a new watcher if needed. if (state.watcher == nullptr) { - auto watcher = absl::make_unique(Ref(), name); + auto watcher = MakeRefCounted(Ref(), name); if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this, name.c_str()); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index 8ca5f065ba4..5998c36038c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -176,41 +176,58 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { discovery_mechanism_.reset(DEBUG_LOCATION, "EndpointWatcher"); } void OnEndpointChanged(XdsApi::EdsUpdate update) override { - new Notifier(discovery_mechanism_, std::move(update)); + Ref().release(); // ref held by callback + discovery_mechanism_->parent()->work_serializer()->Run( + // TODO(yashykt): When we move to C++14, capture update with + // std::move + [this, update]() mutable { + OnEndpointChangedHelper(std::move(update)); + Unref(); + }, + DEBUG_LOCATION); } void OnError(grpc_error_handle error) override { - new Notifier(discovery_mechanism_, error); + Ref().release(); // ref held by callback + discovery_mechanism_->parent()->work_serializer()->Run( + [this, error]() { + OnErrorHelper(error); + Unref(); + }, + DEBUG_LOCATION); } void OnResourceDoesNotExist() override { - new Notifier(discovery_mechanism_); + Ref().release(); // ref held by callback + discovery_mechanism_->parent()->work_serializer()->Run( + [this]() { + OnResourceDoesNotExistHelper(); + Unref(); + }, + DEBUG_LOCATION); } private: - class Notifier { - public: - Notifier(RefCountedPtr discovery_mechanism, - XdsApi::EdsUpdate update); - Notifier(RefCountedPtr discovery_mechanism, - grpc_error_handle error); - explicit Notifier( - RefCountedPtr discovery_mechanism); - ~Notifier() { discovery_mechanism_.reset(DEBUG_LOCATION, "Notifier"); } - - private: - enum Type { kUpdate, kError, kDoesNotExist }; - - static void RunInExecCtx(void* arg, grpc_error_handle error); - void RunInWorkSerializer(grpc_error_handle error); - - RefCountedPtr discovery_mechanism_; - grpc_closure closure_; - XdsApi::EdsUpdate update_; - Type type_; - }; - + // Code accessing protected methods of `DiscoveryMechanism` need to be + // in methods of this class rather than in lambdas to work around an MSVC + // bug. + void OnEndpointChangedHelper(XdsApi::EdsUpdate update) { + discovery_mechanism_->parent()->OnEndpointChanged( + discovery_mechanism_->index(), std::move(update)); + } + void OnErrorHelper(grpc_error_handle error) { + discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), + error); + } + void OnResourceDoesNotExistHelper() { + discovery_mechanism_->parent()->OnResourceDoesNotExist( + discovery_mechanism_->index()); + } RefCountedPtr discovery_mechanism_; }; + // This is necessary only because of a bug in msvc where nested class + // cannot access protected member in base class. + friend class EndpointWatcher; + absl::string_view GetEdsResourceName() const { if (!parent()->is_xds_uri_) return parent()->server_name_; if (!parent() @@ -403,7 +420,7 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::Start() { ":%p starting xds watch for %s", parent(), index(), this, std::string(GetEdsResourceName()).c_str()); } - auto watcher = absl::make_unique( + auto watcher = MakeRefCounted( Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism")); watcher_ = watcher.get(); parent()->xds_client_->WatchEndpointData(GetEdsResourceName(), @@ -422,66 +439,6 @@ void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() { Unref(); } -// -// XdsClusterResolverLb::EndpointWatcher::Notifier -// - -XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: - Notifier(RefCountedPtr - discovery_mechanism, - XdsApi::EdsUpdate update) - : discovery_mechanism_(std::move(discovery_mechanism)), - update_(std::move(update)), - type_(kUpdate) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); -} - -XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: - Notifier(RefCountedPtr - discovery_mechanism, - grpc_error_handle error) - : discovery_mechanism_(std::move(discovery_mechanism)), type_(kError) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, error); -} - -XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: - Notifier(RefCountedPtr - discovery_mechanism) - : discovery_mechanism_(std::move(discovery_mechanism)), - type_(kDoesNotExist) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); -} - -void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: - RunInExecCtx(void* arg, grpc_error_handle error) { - Notifier* self = static_cast(arg); - (void)GRPC_ERROR_REF(error); - self->discovery_mechanism_->parent()->work_serializer()->Run( - [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION); -} - -void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier:: - RunInWorkSerializer(grpc_error_handle error) { - switch (type_) { - case kUpdate: - discovery_mechanism_->parent()->OnEndpointChanged( - discovery_mechanism_->index(), std::move(update_)); - break; - case kError: - discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), - error); - break; - case kDoesNotExist: - discovery_mechanism_->parent()->OnResourceDoesNotExist( - discovery_mechanism_->index()); - break; - }; - delete this; -} - // // XdsClusterResolverLb::LogicalDNSDiscoveryMechanism // diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index fc2dac9586f..dc8a569b891 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -78,36 +78,39 @@ class XdsResolver : public Resolver { } private: - class Notifier { - public: - Notifier(RefCountedPtr resolver, XdsApi::LdsUpdate update); - Notifier(RefCountedPtr resolver, XdsApi::RdsUpdate update); - Notifier(RefCountedPtr resolver, grpc_error_handle error); - explicit Notifier(RefCountedPtr resolver); - - private: - enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist }; - - static void RunInExecCtx(void* arg, grpc_error_handle error); - void RunInWorkSerializer(grpc_error_handle error); - - RefCountedPtr resolver_; - grpc_closure closure_; - XdsApi::LdsUpdate update_; - Type type_; - }; - class ListenerWatcher : public XdsClient::ListenerWatcherInterface { public: explicit ListenerWatcher(RefCountedPtr resolver) : resolver_(std::move(resolver)) {} void OnListenerChanged(XdsApi::LdsUpdate listener) override { - new Notifier(resolver_, std::move(listener)); + Ref().release(); // ref held by lambda + resolver_->work_serializer_->Run( + // TODO(yashykt): When we move to C++14, capture listener with + // std::move + [this, listener]() mutable { + resolver_->OnListenerUpdate(std::move(listener)); + Unref(); + }, + DEBUG_LOCATION); } void OnError(grpc_error_handle error) override { - new Notifier(resolver_, error); + Ref().release(); // ref held by lambda + resolver_->work_serializer_->Run( + [this, error]() { + resolver_->OnError(error); + Unref(); + }, + DEBUG_LOCATION); + } + void OnResourceDoesNotExist() override { + Ref().release(); // ref held by lambda + resolver_->work_serializer_->Run( + [this]() { + resolver_->OnResourceDoesNotExist(); + Unref(); + }, + DEBUG_LOCATION); } - void OnResourceDoesNotExist() override { new Notifier(resolver_); } private: RefCountedPtr resolver_; @@ -118,12 +121,34 @@ class XdsResolver : public Resolver { explicit RouteConfigWatcher(RefCountedPtr resolver) : resolver_(std::move(resolver)) {} void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override { - new Notifier(resolver_, std::move(route_config)); + Ref().release(); // ref held by lambda + resolver_->work_serializer_->Run( + // TODO(yashykt): When we move to C++14, capture route_config with + // std::move + [this, route_config]() mutable { + resolver_->OnRouteConfigUpdate(std::move(route_config)); + Unref(); + }, + DEBUG_LOCATION); } void OnError(grpc_error_handle error) override { - new Notifier(resolver_, error); + Ref().release(); // ref held by lambda + resolver_->work_serializer_->Run( + [this, error]() { + resolver_->OnError(error); + Unref(); + }, + DEBUG_LOCATION); + } + void OnResourceDoesNotExist() override { + Ref().release(); // ref held by lambda + resolver_->work_serializer_->Run( + [this]() { + resolver_->OnResourceDoesNotExist(); + Unref(); + }, + DEBUG_LOCATION); } - void OnResourceDoesNotExist() override { new Notifier(resolver_); } private: RefCountedPtr resolver_; @@ -275,71 +300,6 @@ class XdsResolver : public Resolver { ClusterState::ClusterStateMap cluster_state_map_; }; -// -// XdsResolver::Notifier -// - -XdsResolver::Notifier::Notifier(RefCountedPtr resolver, - XdsApi::LdsUpdate update) - : resolver_(std::move(resolver)), - update_(std::move(update)), - type_(kLdsUpdate) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); -} - -XdsResolver::Notifier::Notifier(RefCountedPtr resolver, - XdsApi::RdsUpdate update) - : resolver_(std::move(resolver)), type_(kRdsUpdate) { - update_.http_connection_manager.rds_update = std::move(update); - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); -} - -XdsResolver::Notifier::Notifier(RefCountedPtr resolver, - grpc_error_handle error) - : resolver_(std::move(resolver)), type_(kError) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, error); -} - -XdsResolver::Notifier::Notifier(RefCountedPtr resolver) - : resolver_(std::move(resolver)), type_(kDoesNotExist) { - GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); -} - -void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error_handle error) { - Notifier* self = static_cast(arg); - (void)GRPC_ERROR_REF(error); - self->resolver_->work_serializer_->Run( - [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION); -} - -void XdsResolver::Notifier::RunInWorkSerializer(grpc_error_handle error) { - if (resolver_->xds_client_ == nullptr) { - GRPC_ERROR_UNREF(error); - delete this; - return; - } - switch (type_) { - case kLdsUpdate: - resolver_->OnListenerUpdate(std::move(update_)); - break; - case kRdsUpdate: - resolver_->OnRouteConfigUpdate( - std::move(*update_.http_connection_manager.rds_update)); - break; - case kError: - resolver_->OnError(error); - break; - case kDoesNotExist: - resolver_->OnResourceDoesNotExist(); - break; - }; - delete this; -} - // // XdsResolver::XdsConfigSelector::Route // @@ -786,7 +746,7 @@ void XdsResolver::StartLocked() { } grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), interested_parties_); - auto watcher = absl::make_unique(Ref()); + auto watcher = MakeRefCounted(Ref()); listener_watcher_ = watcher.get(); xds_client_->WatchListenerData(server_name_, std::move(watcher)); } @@ -814,6 +774,9 @@ void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this); } + if (xds_client_ == nullptr) { + return; + } if (listener.http_connection_manager.route_config_name != route_config_name_) { if (route_config_watcher_ != nullptr) { @@ -827,7 +790,7 @@ void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) { std::move(listener.http_connection_manager.route_config_name); if (!route_config_name_.empty()) { current_virtual_host_.routes.clear(); - auto watcher = absl::make_unique(Ref()); + auto watcher = MakeRefCounted(Ref()); route_config_watcher_ = watcher.get(); xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher)); } @@ -849,6 +812,9 @@ void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this); } + if (xds_client_ == nullptr) { + return; + } // Find the relevant VirtualHost from the RouteConfiguration. XdsApi::RdsUpdate::VirtualHost* vhost = rds_update.FindVirtualHostForDomain(server_name_); @@ -867,6 +833,10 @@ void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) { void XdsResolver::OnError(grpc_error_handle error) { gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s", this, grpc_error_std_string(error).c_str()); + if (xds_client_ == nullptr) { + GRPC_ERROR_UNREF(error); + return; + } Result result; grpc_arg new_arg = xds_client_->MakeChannelArg(); result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1); @@ -879,6 +849,9 @@ void XdsResolver::OnResourceDoesNotExist() { "[xds_resolver %p] LDS/RDS resource does not exist -- clearing " "update and returning empty service config", this); + if (xds_client_ == nullptr) { + return; + } current_virtual_host_.routes.clear(); Result result; result.service_config = diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 4e94198dbaf..034556bd347 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -78,6 +78,43 @@ char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; } // namespace +class XdsClient::Notifier { + public: + // Helper template function to invoke `OnError()` on a list of watchers \a + // watchers_list within \a work_serializer. Works with all 4 resource types. + template + static void ScheduleNotifyWatchersOnErrorInWorkSerializer( + XdsClient* xds_client, const T& watchers_list, grpc_error_handle error, + const grpc_core::DebugLocation& location) { + xds_client->work_serializer_.Schedule( + [watchers_list, error]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client->work_serializer_) { + for (const auto& p : watchers_list) { + p.first->OnError(GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); + }, + location); + } + + // Helper template function to invoke `OnResourceDoesNotExist()` on a list of + // watchers \a watchers_list within \a work_serializer. Works with all 4 + // resource types. + template + static void ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer( + XdsClient* xds_client, const T& watchers_list, + const grpc_core::DebugLocation& location) { + xds_client->work_serializer_.Schedule( + [watchers_list]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client->work_serializer_) { + for (const auto& p : watchers_list) { + p.first->OnResourceDoesNotExist(); + } + }, + location); + } +}; + // // Internal class declarations // @@ -189,6 +226,7 @@ class XdsClient::ChannelState::AdsCallState MutexLock lock(&self->ads_calld_->xds_client()->mu_); self->OnTimerLocked(GRPC_ERROR_REF(error)); } + self->ads_calld_->xds_client()->work_serializer_.DrainQueue(); self->ads_calld_.reset(); self->Unref(DEBUG_LOCATION, "timer"); } @@ -214,28 +252,28 @@ class XdsClient::ChannelState::AdsCallState if (type_url_ == XdsApi::kLdsTypeUrl) { ListenerState& state = authority_state.listener_map[resource_.id]; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - for (const auto& p : state.watchers) { - p.first->OnError(GRPC_ERROR_REF(watcher_error)); - } + Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( + ads_calld_->xds_client(), state.watchers, + GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); } else if (type_url_ == XdsApi::kRdsTypeUrl) { RouteConfigState& state = authority_state.route_config_map[resource_.id]; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - for (const auto& p : state.watchers) { - p.first->OnError(GRPC_ERROR_REF(watcher_error)); - } + Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( + ads_calld_->xds_client(), state.watchers, + GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); } else if (type_url_ == XdsApi::kCdsTypeUrl) { ClusterState& state = authority_state.cluster_map[resource_.id]; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - for (const auto& p : state.watchers) { - p.first->OnError(GRPC_ERROR_REF(watcher_error)); - } + Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( + ads_calld_->xds_client(), state.watchers, + GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); } else if (type_url_ == XdsApi::kEdsTypeUrl) { EndpointState& state = authority_state.endpoint_map[resource_.id]; state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - for (const auto& p : state.watchers) { - p.first->OnError(GRPC_ERROR_REF(watcher_error)); - } + Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( + ads_calld_->xds_client(), state.watchers, + GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); } else { GPR_UNREACHABLE_CODE(return ); } @@ -464,18 +502,21 @@ class XdsClient::ChannelState::StateWatcher private: void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { - MutexLock lock(&parent_->xds_client_->mu_); - if (!parent_->shutting_down_ && - new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - // In TRANSIENT_FAILURE. Notify all watchers of error. - gpr_log(GPR_INFO, - "[xds_client %p] xds channel in state:TRANSIENT_FAILURE " - "status_message:(%s)", - parent_->xds_client(), status.ToString().c_str()); - parent_->xds_client_->NotifyOnErrorLocked( - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "xds channel in TRANSIENT_FAILURE")); + { + MutexLock lock(&parent_->xds_client_->mu_); + if (!parent_->shutting_down_ && + new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // In TRANSIENT_FAILURE. Notify all watchers of error. + gpr_log(GPR_INFO, + "[xds_client %p] xds channel in state:TRANSIENT_FAILURE " + "status_message:(%s)", + parent_->xds_client(), status.ToString().c_str()); + parent_->xds_client_->NotifyOnErrorLocked( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "xds channel in TRANSIENT_FAILURE")); + } } + parent_->xds_client()->work_serializer_.DrainQueue(); } WeakRefCountedPtr parent_; @@ -1000,9 +1041,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( listener_state.meta = CreateResourceMetadataAcked( std::move(p.second.serialized_proto), version, update_time); // Notify watchers. - for (const auto& p : listener_state.watchers) { - p.first->OnListenerChanged(*listener_state.update); - } + auto& watchers_list = listener_state.watchers; + auto& value = listener_state.update.value(); + xds_client()->work_serializer_.Schedule( + [watchers_list, value]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) { + for (const auto& p : watchers_list) { + p.first->OnListenerChanged(value); + } + }, + DEBUG_LOCATION); } // For invalid resources in the update, if they are already in the // cache, pretend that they are present in the update, so that we @@ -1041,9 +1089,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( // For that case, we rely on the request timeout instead. if (!listener_state.update.has_value()) continue; listener_state.update.reset(); - for (const auto& p : listener_state.watchers) { - p.first->OnResourceDoesNotExist(); - } + Notifier::ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer( + xds_client(), listener_state.watchers, DEBUG_LOCATION); } } } @@ -1063,9 +1110,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( ->authority_state_map_[authority_name] .route_config_map[listener_name]; route_config_state.update.reset(); - for (const auto& p : route_config_state.watchers) { - p.first->OnResourceDoesNotExist(); - } + Notifier::ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer( + xds_client(), route_config_state.watchers, DEBUG_LOCATION); } } } @@ -1110,9 +1156,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked( route_config_state.meta = CreateResourceMetadataAcked( std::move(p.second.serialized_proto), version, update_time); // Notify all watchers. - for (const auto& p : route_config_state.watchers) { - p.first->OnRouteConfigChanged(*route_config_state.update); - } + auto& watchers_list = route_config_state.watchers; + auto& value = route_config_state.update.value(); + xds_client()->work_serializer_.Schedule( + [watchers_list, value]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) { + for (const auto& p : watchers_list) { + p.first->OnRouteConfigChanged(value); + } + }, + DEBUG_LOCATION); } } @@ -1165,9 +1218,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( cluster_state.meta = CreateResourceMetadataAcked( std::move(p.second.serialized_proto), version, update_time); // Notify all watchers. - for (const auto& p : cluster_state.watchers) { - p.first->OnClusterChanged(cluster_state.update.value()); - } + auto& watchers_list = cluster_state.watchers; + auto& value = cluster_state.update.value(); + xds_client()->work_serializer_.Schedule( + [watchers_list, value]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) { + for (const auto& p : watchers_list) { + p.first->OnClusterChanged(value); + } + }, + DEBUG_LOCATION); } // For invalid resources in the update, if they are already in the // cache, pretend that they are present in the update, so that we @@ -1206,9 +1266,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( // For that case, we rely on the request timeout instead. if (!cluster_state.update.has_value()) continue; cluster_state.update.reset(); - for (const auto& p : cluster_state.watchers) { - p.first->OnResourceDoesNotExist(); - } + Notifier::ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer( + xds_client(), cluster_state.watchers, DEBUG_LOCATION); } } } @@ -1227,9 +1286,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( ->authority_state_map_[authority] .endpoint_map[eds_resource_name]; endpoint_state.update.reset(); - for (const auto& p : endpoint_state.watchers) { - p.first->OnResourceDoesNotExist(); - } + Notifier::ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer( + xds_client(), endpoint_state.watchers, DEBUG_LOCATION); } } } @@ -1277,9 +1335,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked( endpoint_state.meta = CreateResourceMetadataAcked( std::move(p.second.serialized_proto), version, update_time); // Notify all watchers. - for (const auto& p : endpoint_state.watchers) { - p.first->OnEndpointChanged(endpoint_state.update.value()); - } + auto& watchers_list = endpoint_state.watchers; + auto& value = endpoint_state.update.value(); + xds_client()->work_serializer_.Schedule( + [watchers_list, value]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) { + for (const auto& p : watchers_list) { + p.first->OnEndpointChanged(value); + } + }, + DEBUG_LOCATION); } } @@ -1306,9 +1371,9 @@ void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateHelperLocked( auto it = state_map->find(resource_name); if (it == state_map->end()) return; auto& state = it->second; - for (const auto& p : state.watchers) { - p.first->OnError(GRPC_ERROR_REF(result.parse_error)); - } + Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( + xds_client(), state.watchers, GRPC_ERROR_REF(result.parse_error), + DEBUG_LOCATION); UpdateResourceMetadataNacked(result.version, error_details, update_time, &state.meta); } @@ -1388,6 +1453,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceived( MutexLock lock(&ads_calld->xds_client()->mu_); done = ads_calld->OnResponseReceivedLocked(); } + ads_calld->xds_client()->work_serializer_.DrainQueue(); if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); } @@ -1494,6 +1560,7 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived( MutexLock lock(&ads_calld->xds_client()->mu_); ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error)); } + ads_calld->xds_client()->work_serializer_.DrainQueue(); ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked"); } @@ -2081,37 +2148,58 @@ RefCountedPtr XdsClient::GetOrCreateChannelStateLocked( void XdsClient::WatchListenerData( absl::string_view listener_name, - std::unique_ptr watcher) { + RefCountedPtr watcher) { std::string listener_name_str = std::string(listener_name); - MutexLock lock(&mu_); ListenerWatcherInterface* w = watcher.get(); auto resource = XdsApi::ParseResourceName(listener_name, XdsApi::IsLds); if (!resource.ok()) { - invalid_listener_watchers_[w] = std::move(watcher); + { + MutexLock lock(&mu_); + invalid_listener_watchers_[w] = watcher; + } grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( "Unable to parse resource name for listener %s", listener_name)); - w->OnError(GRPC_ERROR_REF(error)); + work_serializer_.Run( + // TODO(yashykt): When we move to C++14, capture watcher using + // std::move() + [watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { + watcher->OnError(error); + }, + DEBUG_LOCATION); return; } - AuthorityState& authority_state = authority_state_map_[resource->authority]; - ListenerState& listener_state = authority_state.listener_map[resource->id]; - listener_state.watchers[w] = std::move(watcher); - // If we've already received an LDS update, notify the new watcher - // immediately. - if (listener_state.update.has_value()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s", - this, listener_name_str.c_str()); + { + MutexLock lock(&mu_); + AuthorityState& authority_state = authority_state_map_[resource->authority]; + ListenerState& listener_state = authority_state.listener_map[resource->id]; + listener_state.watchers[w] = watcher; + // If we've already received an LDS update, notify the new watcher + // immediately. + if (listener_state.update.has_value()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] returning cached listener data for %s", this, + listener_name_str.c_str()); + } + auto& value = listener_state.update.value(); + work_serializer_.Schedule( + // TODO(yashykt): When we move to C++14, capture watcher using + // std::move() + [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { + watcher->OnListenerChanged(value); + }, + DEBUG_LOCATION); } - w->OnListenerChanged(*listener_state.update); - } - // If the authority doesn't yet have a channel, set it, creating it if needed. - if (authority_state.channel_state == nullptr) { - authority_state.channel_state = - GetOrCreateChannelStateLocked(bootstrap_->server()); + // If the authority doesn't yet have a channel, set it, creating it if + // needed. + if (authority_state.channel_state == nullptr) { + authority_state.channel_state = + GetOrCreateChannelStateLocked(bootstrap_->server()); + } + authority_state.channel_state->SubscribeLocked(XdsApi::kLdsTypeUrl, + *resource); } - authority_state.channel_state->SubscribeLocked(XdsApi::kLdsTypeUrl, - *resource); + work_serializer_.DrainQueue(); } void XdsClient::CancelListenerDataWatch(absl::string_view listener_name, @@ -2140,40 +2228,58 @@ void XdsClient::CancelListenerDataWatch(absl::string_view listener_name, void XdsClient::WatchRouteConfigData( absl::string_view route_config_name, - std::unique_ptr watcher) { + RefCountedPtr watcher) { std::string route_config_name_str = std::string(route_config_name); - MutexLock lock(&mu_); RouteConfigWatcherInterface* w = watcher.get(); auto resource = XdsApi::ParseResourceName(route_config_name, XdsApi::IsRds); if (!resource.ok()) { - invalid_route_config_watchers_[w] = std::move(watcher); + { + MutexLock lock(&mu_); + invalid_route_config_watchers_[w] = watcher; + } grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING( absl::StrFormat("Unable to parse resource name for route config %s", route_config_name)); - w->OnError(GRPC_ERROR_REF(error)); + work_serializer_.Run( + // TODO(yashykt): When we move to C++14, capture watcher using + // std::move() + [watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { + watcher->OnError(error); + }, + DEBUG_LOCATION); return; } - auto& authority_state = authority_state_map_[resource->authority]; - RouteConfigState& route_config_state = - authority_state.route_config_map[resource->id]; - route_config_state.watchers[w] = std::move(watcher); - // If we've already received an RDS update, notify the new watcher - // immediately. - if (route_config_state.update.has_value()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] returning cached route config data for %s", this, - route_config_name_str.c_str()); + { + MutexLock lock(&mu_); + auto& authority_state = authority_state_map_[resource->authority]; + RouteConfigState& route_config_state = + authority_state.route_config_map[resource->id]; + route_config_state.watchers[w] = watcher; + // If we've already received an RDS update, notify the new watcher + // immediately. + if (route_config_state.update.has_value()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] returning cached route config data for %s", + this, route_config_name_str.c_str()); + } + auto& value = route_config_state.update.value(); + work_serializer_.Schedule( + [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { + watcher->OnRouteConfigChanged(value); + }, + DEBUG_LOCATION); } - w->OnRouteConfigChanged(*route_config_state.update); - } - // If the authority doesn't yet have a channel, set it, creating it if needed. - if (authority_state.channel_state == nullptr) { - authority_state.channel_state = - GetOrCreateChannelStateLocked(bootstrap_->server()); + // If the authority doesn't yet have a channel, set it, creating it if + // needed. + if (authority_state.channel_state == nullptr) { + authority_state.channel_state = + GetOrCreateChannelStateLocked(bootstrap_->server()); + } + authority_state.channel_state->SubscribeLocked(XdsApi::kRdsTypeUrl, + *resource); } - authority_state.channel_state->SubscribeLocked(XdsApi::kRdsTypeUrl, - *resource); + work_serializer_.DrainQueue(); } void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name, @@ -2203,37 +2309,54 @@ void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name, void XdsClient::WatchClusterData( absl::string_view cluster_name, - std::unique_ptr watcher) { + RefCountedPtr watcher) { std::string cluster_name_str = std::string(cluster_name); - MutexLock lock(&mu_); ClusterWatcherInterface* w = watcher.get(); auto resource = XdsApi::ParseResourceName(cluster_name, XdsApi::IsCds); if (!resource.ok()) { - invalid_cluster_watchers_[w] = std::move(watcher); + { + MutexLock lock(&mu_); + invalid_cluster_watchers_[w] = watcher; + } grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( "Unable to parse resource name for cluster %s", cluster_name)); - w->OnError(GRPC_ERROR_REF(error)); + work_serializer_.Run([watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( + work_serializer_) { watcher->OnError(error); }, + DEBUG_LOCATION); return; } - auto& authority_state = authority_state_map_[resource->authority]; - ClusterState& cluster_state = authority_state.cluster_map[resource->id]; - cluster_state.watchers[w] = std::move(watcher); - // If we've already received a CDS update, notify the new watcher - // immediately. - if (cluster_state.update.has_value()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s", - this, cluster_name_str.c_str()); + { + MutexLock lock(&mu_); + auto& authority_state = authority_state_map_[resource->authority]; + ClusterState& cluster_state = authority_state.cluster_map[resource->id]; + cluster_state.watchers[w] = watcher; + // If we've already received a CDS update, notify the new watcher + // immediately. + if (cluster_state.update.has_value()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] returning cached cluster data for %s", this, + cluster_name_str.c_str()); + } + auto& value = cluster_state.update.value(); + work_serializer_.Schedule( + // TODO(yashykt): When we move to C++14, capture watcher using + // std::move() + [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { + watcher->OnClusterChanged(value); + }, + DEBUG_LOCATION); } - w->OnClusterChanged(cluster_state.update.value()); - } - // If the authority doesn't yet have a channel, set it, creating it if needed. - if (authority_state.channel_state == nullptr) { - authority_state.channel_state = - GetOrCreateChannelStateLocked(bootstrap_->server()); + // If the authority doesn't yet have a channel, set it, creating it if + // needed. + if (authority_state.channel_state == nullptr) { + authority_state.channel_state = + GetOrCreateChannelStateLocked(bootstrap_->server()); + } + authority_state.channel_state->SubscribeLocked(XdsApi::kCdsTypeUrl, + *resource); } - authority_state.channel_state->SubscribeLocked(XdsApi::kCdsTypeUrl, - *resource); + work_serializer_.DrainQueue(); } void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name, @@ -2262,38 +2385,53 @@ void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name, void XdsClient::WatchEndpointData( absl::string_view eds_service_name, - std::unique_ptr watcher) { + RefCountedPtr watcher) { std::string eds_service_name_str = std::string(eds_service_name); - MutexLock lock(&mu_); EndpointWatcherInterface* w = watcher.get(); auto resource = XdsApi::ParseResourceName(eds_service_name, XdsApi::IsEds); if (!resource.ok()) { - invalid_endpoint_watchers_[w] = std::move(watcher); + { + MutexLock lock(&mu_); + invalid_endpoint_watchers_[w] = watcher; + } grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING( absl::StrFormat("Unable to parse resource name for endpoint service %s", eds_service_name)); - w->OnError(GRPC_ERROR_REF(error)); + work_serializer_.Run([watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( + work_serializer_) { watcher->OnError(error); }, + DEBUG_LOCATION); return; } - auto& authority_state = authority_state_map_[resource->authority]; - EndpointState& endpoint_state = authority_state.endpoint_map[resource->id]; - endpoint_state.watchers[w] = std::move(watcher); - // If we've already received an EDS update, notify the new watcher - // immediately. - if (endpoint_state.update.has_value()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s", - this, eds_service_name_str.c_str()); + { + MutexLock lock(&mu_); + auto& authority_state = authority_state_map_[resource->authority]; + EndpointState& endpoint_state = authority_state.endpoint_map[resource->id]; + endpoint_state.watchers[w] = watcher; + // If we've already received an EDS update, notify the new watcher + // immediately. + if (endpoint_state.update.has_value()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] returning cached endpoint data for %s", this, + eds_service_name_str.c_str()); + } + auto& value = endpoint_state.update.value(); + work_serializer_.Schedule( + [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { + watcher->OnEndpointChanged(value); + }, + DEBUG_LOCATION); } - w->OnEndpointChanged(endpoint_state.update.value()); - } - // If the authority doesn't yet have a channel, set it, creating it if needed. - if (authority_state.channel_state == nullptr) { - authority_state.channel_state = - GetOrCreateChannelStateLocked(bootstrap_->server()); + // If the authority doesn't yet have a channel, set it, creating it if + // needed. + if (authority_state.channel_state == nullptr) { + authority_state.channel_state = + GetOrCreateChannelStateLocked(bootstrap_->server()); + } + authority_state.channel_state->SubscribeLocked(XdsApi::kEdsTypeUrl, + *resource); } - authority_state.channel_state->SubscribeLocked(XdsApi::kEdsTypeUrl, - *resource); + work_serializer_.DrainQueue(); } void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name, @@ -2453,33 +2591,57 @@ void XdsClient::ResetBackoff() { } void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) { + std::set> listener_watchers; + std::set> route_config_watchers; + std::set> cluster_watchers; + std::set> endpoint_watchers; for (const auto& a : authority_state_map_) { for (const auto& p : a.second.listener_map) { const ListenerState& listener_state = p.second; - for (const auto& p : listener_state.watchers) { - p.first->OnError(GRPC_ERROR_REF(error)); + for (const auto& q : listener_state.watchers) { + listener_watchers.insert(q.second); } } for (const auto& p : a.second.route_config_map) { const RouteConfigState& route_config_state = p.second; - for (const auto& p : route_config_state.watchers) { - p.first->OnError(GRPC_ERROR_REF(error)); + for (const auto& q : route_config_state.watchers) { + route_config_watchers.insert(q.second); } } for (const auto& p : a.second.cluster_map) { const ClusterState& cluster_state = p.second; - for (const auto& p : cluster_state.watchers) { - p.first->OnError(GRPC_ERROR_REF(error)); + for (const auto& q : cluster_state.watchers) { + cluster_watchers.insert(q.second); } } for (const auto& p : a.second.endpoint_map) { const EndpointState& endpoint_state = p.second; - for (const auto& p : endpoint_state.watchers) { - p.first->OnError(GRPC_ERROR_REF(error)); + for (const auto& q : endpoint_state.watchers) { + endpoint_watchers.insert(q.second); } } } - GRPC_ERROR_UNREF(error); + work_serializer_.Schedule( + // TODO(yashykt): When we move to C++14, capture *_watchers using + // std::move() + [listener_watchers, route_config_watchers, cluster_watchers, + endpoint_watchers, error]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { + for (const auto& watcher : listener_watchers) { + watcher->OnError(GRPC_ERROR_REF(error)); + } + for (const auto& watcher : route_config_watchers) { + watcher->OnError(GRPC_ERROR_REF(error)); + } + for (const auto& watcher : cluster_watchers) { + watcher->OnError(GRPC_ERROR_REF(error)); + } + for (const auto& watcher : endpoint_watchers) { + watcher->OnError(GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); + }, + DEBUG_LOCATION); } XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index dfcee72dc4a..153b8c495c8 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -44,39 +44,48 @@ extern TraceFlag grpc_xds_client_refcount_trace; class XdsClient : public DualRefCounted { public: // Listener data watcher interface. Implemented by callers. - class ListenerWatcherInterface { + class ListenerWatcherInterface : public RefCounted { public: - virtual ~ListenerWatcherInterface() = default; - virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0; - virtual void OnError(grpc_error_handle error) = 0; - virtual void OnResourceDoesNotExist() = 0; + virtual void OnListenerChanged(XdsApi::LdsUpdate listener) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnError(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnResourceDoesNotExist() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; }; // RouteConfiguration data watcher interface. Implemented by callers. - class RouteConfigWatcherInterface { + class RouteConfigWatcherInterface + : public RefCounted { public: - virtual ~RouteConfigWatcherInterface() = default; - virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0; - virtual void OnError(grpc_error_handle error) = 0; - virtual void OnResourceDoesNotExist() = 0; + virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnError(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnResourceDoesNotExist() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; }; // Cluster data watcher interface. Implemented by callers. - class ClusterWatcherInterface { + class ClusterWatcherInterface : public RefCounted { public: - virtual ~ClusterWatcherInterface() = default; - virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0; - virtual void OnError(grpc_error_handle error) = 0; - virtual void OnResourceDoesNotExist() = 0; + virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnError(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnResourceDoesNotExist() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; }; // Endpoint data watcher interface. Implemented by callers. - class EndpointWatcherInterface { + class EndpointWatcherInterface : public RefCounted { public: - virtual ~EndpointWatcherInterface() = default; - virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0; - virtual void OnError(grpc_error_handle error) = 0; - virtual void OnResourceDoesNotExist() = 0; + virtual void OnEndpointChanged(XdsApi::EdsUpdate update) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnError(grpc_error_handle error) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnResourceDoesNotExist() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; }; // Factory function to get or create the global XdsClient instance. @@ -112,7 +121,7 @@ class XdsClient : public DualRefCounted { // If the caller is going to start a new watch after cancelling the // old one, it should set delay_unsubscription to true. void WatchListenerData(absl::string_view listener_name, - std::unique_ptr watcher); + RefCountedPtr watcher); void CancelListenerDataWatch(absl::string_view listener_name, ListenerWatcherInterface* watcher, bool delay_unsubscription = false); @@ -124,9 +133,8 @@ class XdsClient : public DualRefCounted { // pointer must not be used for any other purpose.) // If the caller is going to start a new watch after cancelling the // old one, it should set delay_unsubscription to true. - void WatchRouteConfigData( - absl::string_view route_config_name, - std::unique_ptr watcher); + void WatchRouteConfigData(absl::string_view route_config_name, + RefCountedPtr watcher); void CancelRouteConfigDataWatch(absl::string_view route_config_name, RouteConfigWatcherInterface* watcher, bool delay_unsubscription = false); @@ -139,7 +147,7 @@ class XdsClient : public DualRefCounted { // If the caller is going to start a new watch after cancelling the // old one, it should set delay_unsubscription to true. void WatchClusterData(absl::string_view cluster_name, - std::unique_ptr watcher); + RefCountedPtr watcher); void CancelClusterDataWatch(absl::string_view cluster_name, ClusterWatcherInterface* watcher, bool delay_unsubscription = false); @@ -152,7 +160,7 @@ class XdsClient : public DualRefCounted { // If the caller is going to start a new watch after cancelling the // old one, it should set delay_unsubscription to true. void WatchEndpointData(absl::string_view eds_service_name, - std::unique_ptr watcher); + RefCountedPtr watcher); void CancelEndpointDataWatch(absl::string_view eds_service_name, EndpointWatcherInterface* watcher, bool delay_unsubscription = false); @@ -257,8 +265,7 @@ class XdsClient : public DualRefCounted { }; struct ListenerState { - std::map> + std::map> watchers; // The latest data seen from LDS. absl::optional update; @@ -267,7 +274,7 @@ class XdsClient : public DualRefCounted { struct RouteConfigState { std::map> + RefCountedPtr> watchers; // The latest data seen from RDS. absl::optional update; @@ -275,7 +282,7 @@ class XdsClient : public DualRefCounted { }; struct ClusterState { - std::map> + std::map> watchers; // The latest data seen from CDS. absl::optional update; @@ -283,8 +290,7 @@ class XdsClient : public DualRefCounted { }; struct EndpointState { - std::map> + std::map> watchers; // The latest data seen from EDS. absl::optional update; @@ -319,6 +325,8 @@ class XdsClient : public DualRefCounted { grpc_millis last_report_time = ExecCtx::Get()->Now(); }; + class Notifier; + // Sends an error notification to all watchers. void NotifyOnErrorLocked(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); @@ -336,6 +344,7 @@ class XdsClient : public DualRefCounted { grpc_pollset_set* interested_parties_; OrphanablePtr certificate_provider_store_; XdsApi api_; + WorkSerializer work_serializer_; Mutex mu_; @@ -354,14 +363,14 @@ class XdsClient : public DualRefCounted { // Stores started watchers whose resource name was not parsed successfully, // waiting to be cancelled or reset in Orphan(). - std::map> + std::map> invalid_listener_watchers_ ABSL_GUARDED_BY(mu_); std::map> + RefCountedPtr> invalid_route_config_watchers_ ABSL_GUARDED_BY(mu_); - std::map> + std::map> invalid_cluster_watchers_ ABSL_GUARDED_BY(mu_); - std::map> + std::map> invalid_endpoint_watchers_ ABSL_GUARDED_BY(mu_); bool shutting_down_ ABSL_GUARDED_BY(mu_) = false; diff --git a/src/core/ext/xds/xds_server_config_fetcher.cc b/src/core/ext/xds/xds_server_config_fetcher.cc index aed2f2dc145..3cc18b32bdc 100644 --- a/src/core/ext/xds/xds_server_config_fetcher.cc +++ b/src/core/ext/xds/xds_server_config_fetcher.cc @@ -357,7 +357,7 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher { std::unique_ptr watcher) override { grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get(); - auto listener_watcher = absl::make_unique( + auto listener_watcher = MakeRefCounted( std::move(watcher), xds_client_, serving_status_notifier_, listening_address); auto* listener_watcher_ptr = listener_watcher.get();