diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 5e7d34823fb..51e250a3cad 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -295,7 +295,8 @@ void CdsLb::UpdateLocked(UpdateArgs args) { old_config->cluster().c_str()); } xds_client_->CancelClusterDataWatch( - StringView(old_config->cluster().c_str()), cluster_watcher_); + StringView(old_config->cluster().c_str()), cluster_watcher_, + /*delay_unsubscription=*/true); } if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this, diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 5277f22a32a..5053222e69c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -792,7 +792,8 @@ void XdsLb::UpdateLocked(UpdateArgs args) { old_eds_service_name); } xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name), - endpoint_watcher_); + endpoint_watcher_, + /*delay_unsubscription=*/true); } if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] starting watch for %s", this, diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 08ab447d794..65e1e5c4308 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -128,7 +128,8 @@ class XdsClient::ChannelState::AdsCallState bool seen_response() const { return seen_response_; } void Subscribe(const std::string& type_url, const std::string& name); - void Unsubscribe(const std::string& type_url, const std::string& name); + void Unsubscribe(const std::string& type_url, const std::string& name, + bool delay_unsubscription); bool HasSubscribedResources() const; @@ -557,9 +558,10 @@ void XdsClient::ChannelState::Subscribe(const std::string& type_url, } void XdsClient::ChannelState::Unsubscribe(const std::string& type_url, - const std::string& name) { + const std::string& name, + bool delay_unsubscription) { if (ads_calld_ != nullptr) { - ads_calld_->calld()->Unsubscribe(type_url, name); + ads_calld_->calld()->Unsubscribe(type_url, name, delay_unsubscription); if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset(); } } @@ -862,9 +864,10 @@ void XdsClient::ChannelState::AdsCallState::Subscribe( } void XdsClient::ChannelState::AdsCallState::Unsubscribe( - const std::string& type_url, const std::string& name) { + const std::string& type_url, const std::string& name, + bool delay_unsubscription) { state_map_[type_url].subscribed_resources.erase(name); - SendMessageLocked(type_url); + if (!delay_unsubscription) SendMessageLocked(type_url); } bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const { @@ -910,7 +913,9 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( return; } if (!xds_client()->route_config_name_.empty()) { - Unsubscribe(XdsApi::kRdsTypeUrl, xds_client()->route_config_name_); + Unsubscribe( + XdsApi::kRdsTypeUrl, xds_client()->route_config_name_, + /*delay_unsubscription=*/!lds_update->route_config_name.empty()); } xds_client()->route_config_name_ = std::move(lds_update->route_config_name); if (lds_update->rds_update.has_value()) { @@ -1874,7 +1879,8 @@ void XdsClient::WatchClusterData( } void XdsClient::CancelClusterDataWatch(StringView cluster_name, - ClusterWatcherInterface* watcher) { + ClusterWatcherInterface* watcher, + bool delay_unsubscription) { if (shutting_down_) return; std::string cluster_name_str = std::string(cluster_name); ClusterState& cluster_state = cluster_map_[cluster_name_str]; @@ -1883,7 +1889,8 @@ void XdsClient::CancelClusterDataWatch(StringView cluster_name, cluster_state.watchers.erase(it); if (cluster_state.watchers.empty()) { cluster_map_.erase(cluster_name_str); - chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str); + chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str, + delay_unsubscription); } } } @@ -1908,7 +1915,8 @@ void XdsClient::WatchEndpointData( } void XdsClient::CancelEndpointDataWatch(StringView eds_service_name, - EndpointWatcherInterface* watcher) { + EndpointWatcherInterface* watcher, + bool delay_unsubscription) { if (shutting_down_) return; std::string eds_service_name_str = std::string(eds_service_name); EndpointState& endpoint_state = endpoint_map_[eds_service_name_str]; @@ -1917,7 +1925,8 @@ void XdsClient::CancelEndpointDataWatch(StringView eds_service_name, endpoint_state.watchers.erase(it); if (endpoint_state.watchers.empty()) { endpoint_map_.erase(eds_service_name_str); - chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str); + chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str, + delay_unsubscription); } } } diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index f3975e80e55..228b9a21b47 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -86,20 +86,26 @@ class XdsClient : public InternallyRefCounted { // keep a raw pointer to the watcher, which may be used only for // cancellation. (Because the caller does not own the watcher, the // pointer must not be used for any other purpose.) + // If the caller is going to start a new watch after cancelling the + // old one, it should set delay_unsubscription to true. void WatchClusterData(StringView cluster_name, std::unique_ptr watcher); void CancelClusterDataWatch(StringView cluster_name, - ClusterWatcherInterface* watcher); + ClusterWatcherInterface* watcher, + bool delay_unsubscription = false); // Start and cancel endpoint data watch for a cluster. // The XdsClient takes ownership of the watcher, but the caller may // keep a raw pointer to the watcher, which may be used only for // cancellation. (Because the caller does not own the watcher, the // pointer must not be used for any other purpose.) + // If the caller is going to start a new watch after cancelling the + // old one, it should set delay_unsubscription to true. void WatchEndpointData(StringView eds_service_name, std::unique_ptr watcher); void CancelEndpointDataWatch(StringView eds_service_name, - EndpointWatcherInterface* watcher); + EndpointWatcherInterface* watcher, + bool delay_unsubscription = false); // Adds and removes drop stats for cluster_name and eds_service_name. RefCountedPtr AddClusterDropStats( @@ -167,7 +173,8 @@ class XdsClient : public InternallyRefCounted { void CancelConnectivityWatchLocked(); void Subscribe(const std::string& type_url, const std::string& name); - void Unsubscribe(const std::string& type_url, const std::string& name); + void Unsubscribe(const std::string& type_url, const std::string& name, + bool delay_unsubscription); private: class StateWatcher;