XdsEnd2EndTest : Use a queue to save resource updates (#28467)

* Fix XdsClient for multiple watchers on the same resource

* xds_end2end_test: Don't use XdsCredentials for XdsRbacNackTests

* Use separate response states for EDS and RDS resources

* Reviewer comments

* Reviewer comments

* Reviewer comments

* Remove blank link

* Reviewer comments
pull/28538/head
Yash Tibrewal 3 years ago committed by GitHub
parent 44763b471f
commit 9ffbc2d360
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1125
      test/cpp/end2end/xds/xds_end2end_test.cc
  2. 83
      test/cpp/end2end/xds/xds_server.h

File diff suppressed because it is too large Load Diff

@ -69,12 +69,10 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
// State for a given xDS resource type.
struct ResponseState {
enum State {
NOT_SENT, // No response sent yet.
SENT, // Response was sent, but no ACK/NACK received.
ACKED, // ACK received.
NACKED, // NACK received; error_message will contain the error.
ACKED, // ACK received.
NACKED, // NACK received; error_message will contain the error.
};
State state = NOT_SENT;
State state = ACKED;
std::string error_message;
};
@ -143,15 +141,28 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
resource_type_min_versions_[type_url] = version;
}
// Get the latest response state for each resource type.
ResponseState GetResponseState(const std::string& type_url) {
// Get the list of response state for each resource type.
absl::optional<ResponseState> GetResponseState(const std::string& type_url) {
grpc_core::MutexLock lock(&ads_mu_);
return resource_type_response_state_[type_url];
if (resource_type_response_state_[type_url].empty()) {
return absl::nullopt;
}
auto response = resource_type_response_state_[type_url].front();
resource_type_response_state_[type_url].pop_front();
return response;
}
absl::optional<ResponseState> lds_response_state() {
return GetResponseState(kLdsTypeUrl);
}
absl::optional<ResponseState> rds_response_state() {
return GetResponseState(kRdsTypeUrl);
}
absl::optional<ResponseState> cds_response_state() {
return GetResponseState(kCdsTypeUrl);
}
absl::optional<ResponseState> eds_response_state() {
return GetResponseState(kEdsTypeUrl);
}
ResponseState lds_response_state() { return GetResponseState(kLdsTypeUrl); }
ResponseState rds_response_state() { return GetResponseState(kRdsTypeUrl); }
ResponseState cds_response_state() { return GetResponseState(kCdsTypeUrl); }
ResponseState eds_response_state() { return GetResponseState(kEdsTypeUrl); }
// Starts the service.
void Start();
@ -387,29 +398,28 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
} else {
int client_nonce;
GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce));
// Ignore requests with stale nonces.
if (client_nonce < sent_state->nonce) return;
// Check for ACK or NACK.
auto it = parent_->resource_type_response_state_.find(v3_resource_type);
if (it != parent_->resource_type_response_state_.end()) {
if (!request.has_error_detail()) {
it->second.state = ResponseState::ACKED;
it->second.error_message.clear();
gpr_log(GPR_INFO,
"ADS[%p]: client ACKed resource_type=%s version=%s", this,
request.type_url().c_str(), request.version_info().c_str());
} else {
it->second.state = ResponseState::NACKED;
EXPECT_EQ(request.error_detail().code(),
GRPC_STATUS_INVALID_ARGUMENT);
it->second.error_message = request.error_detail().message();
gpr_log(GPR_INFO,
"ADS[%p]: client NACKed resource_type=%s version=%s: %s",
this, request.type_url().c_str(),
request.version_info().c_str(),
it->second.error_message.c_str());
}
ResponseState response_state;
if (!request.has_error_detail()) {
response_state.state = ResponseState::ACKED;
gpr_log(GPR_INFO, "ADS[%p]: client ACKed resource_type=%s version=%s",
this, request.type_url().c_str(),
request.version_info().c_str());
} else {
response_state.state = ResponseState::NACKED;
EXPECT_EQ(request.error_detail().code(),
GRPC_STATUS_INVALID_ARGUMENT);
response_state.error_message = request.error_detail().message();
gpr_log(GPR_INFO,
"ADS[%p]: client NACKed resource_type=%s version=%s: %s",
this, request.type_url().c_str(),
request.version_info().c_str(),
response_state.error_message.c_str());
}
parent_->resource_type_response_state_[v3_resource_type].emplace_back(
std::move(response_state));
// Ignore requests with stale nonces.
if (client_nonce < sent_state->nonce) return;
}
// Ignore resource types as requested by tests.
if (parent_->resource_types_to_ignore_.find(v3_resource_type) !=
@ -536,11 +546,6 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
SentState* sent_state, DiscoveryResponse* response)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->ads_mu_) {
NoopMutexLock mu(parent_->ads_mu_);
auto& response_state =
parent_->resource_type_response_state_[resource_type];
if (response_state.state == ResponseState::NOT_SENT) {
response_state.state = ResponseState::SENT;
}
response->set_type_url(is_v2_ ? v2_resource_type : resource_type);
response->set_version_info(std::to_string(version));
response->set_nonce(std::to_string(++sent_state->nonce));
@ -643,7 +648,7 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
grpc_core::CondVar ads_cond_;
grpc_core::Mutex ads_mu_;
bool ads_done_ ABSL_GUARDED_BY(ads_mu_) = false;
std::map<std::string /* type_url */, ResponseState>
std::map<std::string /* type_url */, std::deque<ResponseState>>
resource_type_response_state_ ABSL_GUARDED_BY(ads_mu_);
std::set<std::string /*resource_type*/> resource_types_to_ignore_
ABSL_GUARDED_BY(ads_mu_);

Loading…
Cancel
Save