diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 003729ee722..86405de397e 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -143,8 +143,11 @@ class XdsClient::ChannelState::AdsCallState private: class ResourceState : public InternallyRefCounted { public: - ResourceState(const std::string& type_url, const std::string& name) - : type_url_(type_url), name_(name) { + ResourceState(const std::string& type_url, const std::string& name, + bool sent_initial_request) + : type_url_(type_url), + name_(name), + sent_initial_request_(sent_initial_request) { GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this, grpc_schedule_on_exec_ctx); } @@ -155,8 +158,8 @@ class XdsClient::ChannelState::AdsCallState } void Start(RefCountedPtr ads_calld) { - if (sent_) return; - sent_ = true; + if (sent_initial_request_) return; + sent_initial_request_ = true; ads_calld_ = std::move(ads_calld); Ref(DEBUG_LOCATION, "timer").release(); timer_pending_ = true; @@ -229,7 +232,7 @@ class XdsClient::ChannelState::AdsCallState const std::string name_; RefCountedPtr ads_calld_; - bool sent_ = false; + bool sent_initial_request_; bool timer_pending_ = false; grpc_timer timer_; grpc_closure timer_callback_; @@ -238,8 +241,7 @@ class XdsClient::ChannelState::AdsCallState struct ResourceTypeState { ~ResourceTypeState() { GRPC_ERROR_UNREF(error); } - // Version, nonce, and error for this resource type. - std::string version; + // Nonce and error for this resource type. std::string nonce; grpc_error* error = GRPC_ERROR_NONE; @@ -767,8 +769,8 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( std::set resource_names = ResourceNamesForRequest(type_url); request_payload_slice = xds_client()->api_.CreateAdsRequest( - type_url, resource_names, state.version, state.nonce, - GRPC_ERROR_REF(state.error), !sent_initial_message_); + type_url, resource_names, xds_client()->resource_version_map_[type_url], + state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_); if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl && type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) { state_map_.erase(type_url); @@ -778,7 +780,8 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( gpr_log(GPR_INFO, "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s " "error=%s resources=%s", - xds_client(), type_url.c_str(), state.version.c_str(), + xds_client(), type_url.c_str(), + xds_client()->resource_version_map_[type_url].c_str(), state.nonce.c_str(), grpc_error_string(state.error), absl::StrJoin(resource_names, " ").c_str()); } @@ -810,7 +813,8 @@ void XdsClient::ChannelState::AdsCallState::Subscribe( const std::string& type_url, const std::string& name) { auto& state = state_map_[type_url].subscribed_resources[name]; if (state == nullptr) { - state = MakeOrphanable(type_url, name); + state = MakeOrphanable( + type_url, name, !xds_client()->resource_version_map_[type_url].empty()); SendMessageLocked(type_url); } } @@ -1174,7 +1178,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { } else if (result.type_url == XdsApi::kEdsTypeUrl) { AcceptEdsUpdate(std::move(result.eds_update_map)); } - state.version = std::move(result.version); + xds_client()->resource_version_map_[result.type_url] = + std::move(result.version); // ACK the update. SendMessageLocked(result.type_url); // Start load reporting if needed. diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 9e222fbb536..3e07e8e8052 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -313,6 +313,9 @@ class XdsClient : public DualRefCounted { LoadReportState> load_report_map_; + // Stores the most recent accepted resource version for each resource type. + std::map resource_version_map_; + bool shutting_down_ = false; }; diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index a0cddc86274..99ac22ed867 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -538,6 +538,11 @@ class AdsServiceImpl : public std::enable_shared_from_this { resource_types_to_ignore_.emplace(type_url); } + void SetResourceMinVersion(const std::string& type_url, int version) { + grpc_core::MutexLock lock(&ads_mu_); + resource_type_min_versions_[type_url] = version; + } + void UnsetResource(const std::string& type_url, const std::string& name) { grpc_core::MutexLock lock(&ads_mu_); ResourceTypeState& resource_type_state = resource_map_[type_url]; @@ -886,7 +891,11 @@ class AdsServiceImpl : public std::enable_shared_from_this { } // Check the nonce sent by the client, if any. // (This will be absent on the first request on a stream.) - if (!request.response_nonce().empty()) { + if (request.response_nonce().empty()) { + EXPECT_GE(client_resource_type_version, + parent_->resource_type_min_versions_[v3_resource_type]) + << "resource_type: " << v3_resource_type; + } else { int client_nonce; GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce)); // Ignore requests with stale nonces. @@ -1189,6 +1198,7 @@ class AdsServiceImpl : public std::enable_shared_from_this { std::map resource_type_response_state_; std::set resource_types_to_ignore_; + std::map resource_type_min_versions_; // An instance data member containing the current state of all resources. // Note that an entry will exist whenever either of the following is true: // - The resource exists (i.e., has been created by SetResource() and has not @@ -2197,6 +2207,36 @@ TEST_P(BasicTest, IgnoresDuplicateUpdates) { using XdsResolverOnlyTest = BasicTest; +TEST_P(XdsResolverOnlyTest, ResourceTypeVersionPersistsAcrossStreamRestarts) { + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args)); + // Wait for backends to come online. + WaitForAllBackends(0, 1); + // Stop balancer. + balancers_[0]->Shutdown(); + // Tell balancer to require minimum version 1 for all resource types. + balancers_[0]->ads_service()->SetResourceMinVersion(kLdsTypeUrl, 1); + balancers_[0]->ads_service()->SetResourceMinVersion(kRdsTypeUrl, 1); + balancers_[0]->ads_service()->SetResourceMinVersion(kCdsTypeUrl, 1); + balancers_[0]->ads_service()->SetResourceMinVersion(kEdsTypeUrl, 1); + // Update backend, just so we can be sure that the client has + // reconnected to the balancer. + AdsServiceImpl::EdsResourceArgs args2({ + {"locality0", GetBackendPorts(1, 2)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args2)); + // Restart balancer. + balancers_[0]->Start(); + // Make sure client has reconnected. + WaitForAllBackends(1, 2); +} + // Tests switching over from one cluster to another. TEST_P(XdsResolverOnlyTest, ChangeClusters) { const char* kNewClusterName = "new_cluster_name";