Merge pull request #24436 from markdroth/xds_resource_version

Fix xDS resource type version to persist across stream restarts.
pull/24472/head
Mark D. Roth 4 years ago committed by GitHub
commit 1bb28c90b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      src/core/ext/xds/xds_client.cc
  2. 3
      src/core/ext/xds/xds_client.h
  3. 42
      test/cpp/end2end/xds_end2end_test.cc

@ -143,8 +143,11 @@ class XdsClient::ChannelState::AdsCallState
private:
class ResourceState : public InternallyRefCounted<ResourceState> {
public:
ResourceState(const std::string& type_url, const std::string& name)
: type_url_(type_url), name_(name) {
ResourceState(const std::string& type_url, const std::string& name,
bool sent_initial_request)
: type_url_(type_url),
name_(name),
sent_initial_request_(sent_initial_request) {
GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
grpc_schedule_on_exec_ctx);
}
@ -155,8 +158,8 @@ class XdsClient::ChannelState::AdsCallState
}
void Start(RefCountedPtr<AdsCallState> ads_calld) {
if (sent_) return;
sent_ = true;
if (sent_initial_request_) return;
sent_initial_request_ = true;
ads_calld_ = std::move(ads_calld);
Ref(DEBUG_LOCATION, "timer").release();
timer_pending_ = true;
@ -229,7 +232,7 @@ class XdsClient::ChannelState::AdsCallState
const std::string name_;
RefCountedPtr<AdsCallState> ads_calld_;
bool sent_ = false;
bool sent_initial_request_;
bool timer_pending_ = false;
grpc_timer timer_;
grpc_closure timer_callback_;
@ -238,8 +241,7 @@ class XdsClient::ChannelState::AdsCallState
struct ResourceTypeState {
~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
// Version, nonce, and error for this resource type.
std::string version;
// Nonce and error for this resource type.
std::string nonce;
grpc_error* error = GRPC_ERROR_NONE;
@ -767,8 +769,8 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
std::set<absl::string_view> resource_names =
ResourceNamesForRequest(type_url);
request_payload_slice = xds_client()->api_.CreateAdsRequest(
type_url, resource_names, state.version, state.nonce,
GRPC_ERROR_REF(state.error), !sent_initial_message_);
type_url, resource_names, xds_client()->resource_version_map_[type_url],
state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_);
if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
state_map_.erase(type_url);
@ -778,7 +780,8 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
gpr_log(GPR_INFO,
"[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
"error=%s resources=%s",
xds_client(), type_url.c_str(), state.version.c_str(),
xds_client(), type_url.c_str(),
xds_client()->resource_version_map_[type_url].c_str(),
state.nonce.c_str(), grpc_error_string(state.error),
absl::StrJoin(resource_names, " ").c_str());
}
@ -810,7 +813,8 @@ void XdsClient::ChannelState::AdsCallState::Subscribe(
const std::string& type_url, const std::string& name) {
auto& state = state_map_[type_url].subscribed_resources[name];
if (state == nullptr) {
state = MakeOrphanable<ResourceState>(type_url, name);
state = MakeOrphanable<ResourceState>(
type_url, name, !xds_client()->resource_version_map_[type_url].empty());
SendMessageLocked(type_url);
}
}
@ -1174,7 +1178,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
AcceptEdsUpdate(std::move(result.eds_update_map));
}
state.version = std::move(result.version);
xds_client()->resource_version_map_[result.type_url] =
std::move(result.version);
// ACK the update.
SendMessageLocked(result.type_url);
// Start load reporting if needed.

@ -313,6 +313,9 @@ class XdsClient : public DualRefCounted<XdsClient> {
LoadReportState>
load_report_map_;
// Stores the most recent accepted resource version for each resource type.
std::map<std::string /*type*/, std::string /*version*/> resource_version_map_;
bool shutting_down_ = false;
};

@ -539,6 +539,11 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
resource_types_to_ignore_.emplace(type_url);
}
void SetResourceMinVersion(const std::string& type_url, int version) {
grpc_core::MutexLock lock(&ads_mu_);
resource_type_min_versions_[type_url] = version;
}
void UnsetResource(const std::string& type_url, const std::string& name) {
grpc_core::MutexLock lock(&ads_mu_);
ResourceTypeState& resource_type_state = resource_map_[type_url];
@ -887,7 +892,11 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
}
// Check the nonce sent by the client, if any.
// (This will be absent on the first request on a stream.)
if (!request.response_nonce().empty()) {
if (request.response_nonce().empty()) {
EXPECT_GE(client_resource_type_version,
parent_->resource_type_min_versions_[v3_resource_type])
<< "resource_type: " << v3_resource_type;
} else {
int client_nonce;
GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce));
// Ignore requests with stale nonces.
@ -1190,6 +1199,7 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
std::map<std::string /* type_url */, ResponseState>
resource_type_response_state_;
std::set<std::string /*resource_type*/> resource_types_to_ignore_;
std::map<std::string /*resource_type*/, int> resource_type_min_versions_;
// An instance data member containing the current state of all resources.
// Note that an entry will exist whenever either of the following is true:
// - The resource exists (i.e., has been created by SetResource() and has not
@ -2198,6 +2208,36 @@ TEST_P(BasicTest, IgnoresDuplicateUpdates) {
using XdsResolverOnlyTest = BasicTest;
TEST_P(XdsResolverOnlyTest, ResourceTypeVersionPersistsAcrossStreamRestarts) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args));
// Wait for backends to come online.
WaitForAllBackends(0, 1);
// Stop balancer.
balancers_[0]->Shutdown();
// Tell balancer to require minimum version 1 for all resource types.
balancers_[0]->ads_service()->SetResourceMinVersion(kLdsTypeUrl, 1);
balancers_[0]->ads_service()->SetResourceMinVersion(kRdsTypeUrl, 1);
balancers_[0]->ads_service()->SetResourceMinVersion(kCdsTypeUrl, 1);
balancers_[0]->ads_service()->SetResourceMinVersion(kEdsTypeUrl, 1);
// Update backend, just so we can be sure that the client has
// reconnected to the balancer.
AdsServiceImpl::EdsResourceArgs args2({
{"locality0", GetBackendPorts(1, 2)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args2));
// Restart balancer.
balancers_[0]->Start();
// Make sure client has reconnected.
WaitForAllBackends(1, 2);
}
// Tests switching over from one cluster to another.
TEST_P(XdsResolverOnlyTest, ChangeClusters) {
const char* kNewClusterName = "new_cluster_name";

Loading…
Cancel
Save