xds: notify watchers when NACKing resource updates (#26757)

* xds: notify watchers when NACKing resource updates

* check status of failed RPC when waiting for NACK
pull/26742/head^2
Mark D. Roth 3 years ago committed by GitHub
parent e98fa13a57
commit 246c57829b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 106
      src/core/ext/xds/xds_client.cc
  2. 4
      src/core/ext/xds/xds_client.h
  3. 657
      test/cpp/end2end/xds_end2end_test.cc

@ -275,6 +275,12 @@ class XdsClient::ChannelState::AdsCallState
XdsApi::EdsUpdateMap eds_update_map)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
template <typename StateMap>
void RejectAdsUpdateLocked(grpc_millis update_time,
const XdsApi::AdsParseResult& result,
StateMap* state_map)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
static void OnRequestSent(void* arg, grpc_error_handle error);
void OnRequestSentLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
@ -1145,6 +1151,47 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked(
}
}
namespace {
// Update resource_metadata for NACK.
void UpdateResourceMetadataNacked(const std::string& version,
const std::string& details,
grpc_millis update_time,
XdsApi::ResourceMetadata* resource_metadata) {
resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
resource_metadata->failed_version = version;
resource_metadata->failed_details = details;
resource_metadata->failed_update_time = update_time;
}
} // namespace
template <typename StateMap>
void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateLocked(
grpc_millis update_time, const XdsApi::AdsParseResult& result,
StateMap* state_map) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] %s update NACKed containing %" PRIuPTR
" resources",
xds_client(), result.type_url.c_str(),
result.resource_names_failed.size());
}
std::string details = grpc_error_std_string(result.parse_error);
for (auto& name : result.resource_names_failed) {
auto it = state_map->find(name);
if (it == state_map->end()) continue;
auto& state = it->second;
// Notify watchers of error.
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(result.parse_error));
}
// Update resource metadata for CSDS.
UpdateResourceMetadataNacked(result.version, details, update_time,
&state.meta);
}
}
void XdsClient::ChannelState::AdsCallState::OnRequestSent(
void* arg, grpc_error_handle error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
@ -1223,10 +1270,6 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
state.nonce = std::move(result.nonce);
// NACK or ACK the response.
if (result.parse_error != GRPC_ERROR_NONE) {
xds_client()->UpdateResourceMetadataWithFailedParseResultLocked(
update_time, result);
GRPC_ERROR_UNREF(state.error);
state.error = result.parse_error;
// NACK unacceptable update.
gpr_log(GPR_ERROR,
"[xds_client %p] ADS response invalid for resource type %s "
@ -1234,6 +1277,23 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
xds_client(), result.type_url.c_str(), result.version.c_str(),
state.nonce.c_str(),
grpc_error_std_string(result.parse_error).c_str());
result.parse_error =
grpc_error_set_int(result.parse_error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
GRPC_ERROR_UNREF(state.error);
state.error = result.parse_error;
if (result.type_url == XdsApi::kLdsTypeUrl) {
RejectAdsUpdateLocked(update_time, result,
&xds_client()->listener_map_);
} else if (result.type_url == XdsApi::kRdsTypeUrl) {
RejectAdsUpdateLocked(update_time, result,
&xds_client()->route_config_map_);
} else if (result.type_url == XdsApi::kCdsTypeUrl) {
RejectAdsUpdateLocked(update_time, result, &xds_client()->cluster_map_);
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
RejectAdsUpdateLocked(update_time, result,
&xds_client()->endpoint_map_);
}
SendMessageLocked(result.type_url);
} else {
seen_response_ = true;
@ -2253,44 +2313,6 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
return snapshot_map;
}
void XdsClient::UpdateResourceMetadataWithFailedParseResultLocked(
grpc_millis update_time, const XdsApi::AdsParseResult& result) {
// ADS update is rejected and the resource names in the failed update is
// available.
std::string details = grpc_error_std_string(result.parse_error);
for (auto& name : result.resource_names_failed) {
XdsApi::ResourceMetadata* resource_metadata = nullptr;
if (result.type_url == XdsApi::kLdsTypeUrl) {
auto it = listener_map_.find(name);
if (it != listener_map_.end()) {
resource_metadata = &it->second.meta;
}
} else if (result.type_url == XdsApi::kRdsTypeUrl) {
auto it = route_config_map_.find(name);
if (route_config_map_.find(name) != route_config_map_.end()) {
resource_metadata = &it->second.meta;
}
} else if (result.type_url == XdsApi::kCdsTypeUrl) {
auto it = cluster_map_.find(name);
if (cluster_map_.find(name) != cluster_map_.end()) {
resource_metadata = &it->second.meta;
}
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
auto it = endpoint_map_.find(name);
if (endpoint_map_.find(name) != endpoint_map_.end()) {
resource_metadata = &it->second.meta;
}
}
if (resource_metadata == nullptr) {
return;
}
resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
resource_metadata->failed_version = result.version;
resource_metadata->failed_details = details;
resource_metadata->failed_update_time = update_time;
}
}
std::string XdsClient::DumpClientConfigBinary() {
MutexLock lock(&mu_);
XdsApi::ResourceTypeMetadataMap resource_type_metadata_map;

@ -324,10 +324,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
bool send_all_clusters, const std::set<std::string>& clusters)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void UpdateResourceMetadataWithFailedParseResultLocked(
grpc_millis update_time, const XdsApi::AdsParseResult& result)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
std::unique_ptr<XdsBootstrap> bootstrap_;
grpc_channel_args* args_;
const grpc_millis request_timeout_;

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save