implement improved xDS NACK semantics (#27276)

* refactor xDS response parsing

* fix build

* implement improved xDS NACK semantics

* fix clang-tidy

* fix test
reviewable/pr27302/r1
Mark D. Roth 3 years ago committed by GitHub
parent 5765d320d2
commit f3497eb790
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      src/core/ext/xds/xds_api.cc
  2. 89
      src/core/ext/xds/xds_client.cc
  3. 183
      test/cpp/end2end/xds_end2end_test.cc

@ -3408,15 +3408,6 @@ upb_strview EdsResourceName(
eds_resource);
}
template <typename UpdateMap>
void MoveUpdatesToFailedSet(UpdateMap* update_map,
std::set<std::string>* resource_names_failed) {
for (const auto& p : *update_map) {
resource_names_failed->insert(p.first);
}
update_map->clear();
}
} // namespace
XdsApi::AdsParseResult XdsApi::ParseAdsResponse(
@ -3462,40 +3453,24 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse(
IsLds, MaybeLogListener, LdsResourceParse, response, "LDS",
expected_listener_names, &result.lds_update_map,
&result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.lds_update_map,
&result.resource_names_failed);
}
} else if (IsRds(result.type_url)) {
result.parse_error = AdsResponseParse(
context, envoy_config_route_v3_RouteConfiguration_parse,
RdsResourceName, IsRds, MaybeLogRouteConfiguration, RouteConfigParse,
response, "RDS", expected_route_configuration_names,
&result.rds_update_map, &result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.rds_update_map,
&result.resource_names_failed);
}
} else if (IsCds(result.type_url)) {
result.parse_error = AdsResponseParse(
context, envoy_config_cluster_v3_Cluster_parse, CdsResourceName, IsCds,
MaybeLogCluster, CdsResourceParse, response, "CDS",
expected_cluster_names, &result.cds_update_map,
&result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.cds_update_map,
&result.resource_names_failed);
}
} else if (IsEds(result.type_url)) {
result.parse_error = AdsResponseParse(
context, envoy_config_endpoint_v3_ClusterLoadAssignment_parse,
EdsResourceName, IsEds, MaybeLogClusterLoadAssignment, EdsResourceParse,
response, "EDS", expected_eds_service_names, &result.eds_update_map,
&result.resource_names_failed);
if (result.parse_error != GRPC_ERROR_NONE) {
MoveUpdatesToFailedSet(&result.eds_update_map,
&result.resource_names_failed);
}
}
return result;
}

@ -264,13 +264,15 @@ class XdsClient::ChannelState::AdsCallState
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void AcceptLdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::LdsUpdateMap lds_update_map)
XdsApi::LdsUpdateMap lds_update_map,
const std::set<std::string>& resource_names_failed)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::RdsUpdateMap rds_update_map)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void AcceptCdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::CdsUpdateMap cds_update_map)
XdsApi::CdsUpdateMap cds_update_map,
const std::set<std::string>& resource_names_failed)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time,
XdsApi::EdsUpdateMap eds_update_map)
@ -905,7 +907,8 @@ XdsApi::ResourceMetadata CreateResourceMetadataAcked(
void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
std::string version, grpc_millis update_time,
XdsApi::LdsUpdateMap lds_update_map) {
XdsApi::LdsUpdateMap lds_update_map,
const std::set<std::string>& resource_names_failed) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] LDS update received containing %" PRIuPTR
@ -949,6 +952,21 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
p.first->OnListenerChanged(*listener_state.update);
}
}
// For invalid resources in the update, if they are already in the
// cache, pretend that they are present in the update, so that we
// don't incorrectly consider them deleted below.
for (const std::string& listener_name : resource_names_failed) {
auto it = xds_client()->listener_map_.find(listener_name);
if (it != xds_client()->listener_map_.end()) {
auto& resource = it->second.update;
if (!resource.has_value()) continue;
lds_update_map[listener_name];
if (!resource->http_connection_manager.route_config_name.empty()) {
rds_resource_names_seen.insert(
resource->http_connection_manager.route_config_name);
}
}
}
// For any subscribed resource that is not present in the update,
// remove it from the cache and notify watchers that it does not exist.
for (const auto& p : lds_state.subscribed_resources) {
@ -1031,7 +1049,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked(
void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
std::string version, grpc_millis update_time,
XdsApi::CdsUpdateMap cds_update_map) {
XdsApi::CdsUpdateMap cds_update_map,
const std::set<std::string>& resource_names_failed) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] CDS update received containing %" PRIuPTR
@ -1073,6 +1092,20 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
p.first->OnClusterChanged(cluster_state.update.value());
}
}
// For invalid resources in the update, if they are already in the
// cache, pretend that they are present in the update, so that we
// don't incorrectly consider them deleted below.
for (const std::string& cluster_name : resource_names_failed) {
auto it = xds_client()->cluster_map_.find(cluster_name);
if (it != xds_client()->cluster_map_.end()) {
auto& resource = it->second.update;
if (!resource.has_value()) continue;
cds_update_map[cluster_name];
eds_resource_names_seen.insert(resource->eds_service_name.empty()
? cluster_name
: resource->eds_service_name);
}
}
// For any subscribed resource that is not present in the update,
// remove it from the cache and notify watchers that it does not exist.
for (const auto& p : cds_state.subscribed_resources) {
@ -1174,7 +1207,7 @@ void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateLocked(
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] %s update NACKed containing %" PRIuPTR
" resources",
" invalid resources",
xds_client(), result.type_url.c_str(),
result.resource_names_failed.size());
}
@ -1269,9 +1302,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
// Update nonce.
auto& state = state_map_[result.type_url];
state.nonce = std::move(result.nonce);
// NACK or ACK the response.
// If we got an error, we'll NACK the update.
if (result.parse_error != GRPC_ERROR_NONE) {
// NACK unacceptable update.
gpr_log(GPR_ERROR,
"[xds_client %p] ADS response invalid for resource type %s "
"version %s, will NACK: nonce=%s error=%s",
@ -1295,27 +1327,32 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
RejectAdsUpdateLocked(update_time, result,
&xds_client()->endpoint_map_);
}
SendMessageLocked(result.type_url);
} else {
}
// Process any valid resources.
bool have_valid_resources = false;
if (result.type_url == XdsApi::kLdsTypeUrl) {
have_valid_resources = !result.lds_update_map.empty();
AcceptLdsUpdateLocked(result.version, update_time,
std::move(result.lds_update_map),
result.resource_names_failed);
} else if (result.type_url == XdsApi::kRdsTypeUrl) {
have_valid_resources = !result.rds_update_map.empty();
AcceptRdsUpdateLocked(result.version, update_time,
std::move(result.rds_update_map));
} else if (result.type_url == XdsApi::kCdsTypeUrl) {
have_valid_resources = !result.cds_update_map.empty();
AcceptCdsUpdateLocked(result.version, update_time,
std::move(result.cds_update_map),
result.resource_names_failed);
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
have_valid_resources = !result.eds_update_map.empty();
AcceptEdsUpdateLocked(result.version, update_time,
std::move(result.eds_update_map));
}
if (have_valid_resources) {
seen_response_ = true;
// Accept the ADS response according to the type_url.
if (result.type_url == XdsApi::kLdsTypeUrl) {
AcceptLdsUpdateLocked(result.version, update_time,
std::move(result.lds_update_map));
} else if (result.type_url == XdsApi::kRdsTypeUrl) {
AcceptRdsUpdateLocked(result.version, update_time,
std::move(result.rds_update_map));
} else if (result.type_url == XdsApi::kCdsTypeUrl) {
AcceptCdsUpdateLocked(result.version, update_time,
std::move(result.cds_update_map));
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
AcceptEdsUpdateLocked(result.version, update_time,
std::move(result.eds_update_map));
}
xds_client()->resource_version_map_[result.type_url] =
std::move(result.version);
// ACK the update.
SendMessageLocked(result.type_url);
// Start load reporting if needed.
auto& lrs_call = chand()->lrs_calld_;
if (lrs_call != nullptr) {
@ -1323,6 +1360,8 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
}
}
// Send ACK or NACK.
SendMessageLocked(result.type_url);
}
if (xds_client()->shutting_down_) return true;
// Keep listening for updates.

@ -3389,44 +3389,98 @@ TEST_P(GlobalXdsClientTest, MultipleChannelsShareXdsClient) {
// Tests that the NACK for multiple bad LDS resources includes both errors.
TEST_P(GlobalXdsClientTest, MultipleBadResources) {
constexpr char kServerName2[] = "server.other.com";
constexpr char kServerName3[] = "server.another.com";
auto listener = default_listener_;
listener.clear_api_listener();
balancers_[0]->ads_service()->SetLdsResource(listener);
listener.set_name(kServerName2);
balancers_[0]->ads_service()->SetLdsResource(listener);
listener = default_listener_;
listener.set_name(kServerName3);
SetListenerAndRouteConfiguration(0, listener, default_route_config_);
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
// Need to create a second channel to subscribe to a second LDS resource.
auto channel2 = CreateChannel(0, kServerName2);
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
ClientContext context;
EchoRequest request;
request.set_message(kRequestMessage);
EchoResponse response;
grpc::Status status = stub2->Echo(&context, request, &response);
EXPECT_FALSE(status.ok());
// Wait for second NACK to be reported to xDS server.
{
ClientContext context;
EchoRequest request;
request.set_message(kRequestMessage);
EchoResponse response;
grpc::Status status = stub2->Echo(&context, request, &response);
EXPECT_FALSE(status.ok());
// Wait for second NACK to be reported to xDS server.
auto deadline = absl::Now() + absl::Seconds(30);
bool timed_out = false;
CheckRpcSendFailure(
CheckRpcSendFailureOptions().set_continue_predicate([&](size_t) {
if (absl::Now() >= deadline) {
timed_out = true;
return false;
}
const auto response_state =
balancers_[0]->ads_service()->lds_response_state();
return response_state.state !=
AdsServiceImpl::ResponseState::NACKED ||
::testing::Matches(::testing::ContainsRegex(absl::StrCat(
kServerName,
": validation error.*"
"Listener has neither address nor ApiListener.*",
kServerName2,
": validation error.*"
"Listener has neither address nor ApiListener")))(
response_state.error_message);
}));
ASSERT_FALSE(timed_out);
}
// Now start a new channel with a third server name, this one with a
// valid resource.
auto channel3 = CreateChannel(0, kServerName3);
auto stub3 = grpc::testing::EchoTestService::NewStub(channel3);
{
ClientContext context;
EchoRequest request;
request.set_message(kRequestMessage);
EchoResponse response;
grpc::Status status = stub3->Echo(&context, request, &response);
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
}
}
// Tests that we don't trigger does-not-exist callbacks for a resource
// that was previously valid but is updated to be invalid.
TEST_P(GlobalXdsClientTest, InvalidListenerStillExistsIfPreviouslyCached) {
// Set up valid resources and check that the channel works.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendOk();
// Now send an update changing the Listener to be invalid.
auto listener = default_listener_;
listener.clear_api_listener();
balancers_[0]->ads_service()->SetLdsResource(listener);
// Wait for xDS server to see NACK.
auto deadline = absl::Now() + absl::Seconds(30);
bool timed_out = false;
CheckRpcSendFailure(
CheckRpcSendFailureOptions().set_continue_predicate([&](size_t) {
if (absl::Now() >= deadline) {
timed_out = true;
return false;
}
const auto response_state =
balancers_[0]->ads_service()->lds_response_state();
return response_state.state != AdsServiceImpl::ResponseState::NACKED ||
::testing::Matches(::testing::ContainsRegex(absl::StrCat(
kServerName,
": validation error.*"
"Listener has neither address nor ApiListener.*",
kServerName2,
": validation error.*"
"Listener has neither address nor ApiListener")))(
response_state.error_message);
}));
ASSERT_FALSE(timed_out);
do {
CheckRpcSendOk();
ASSERT_LT(absl::Now(), deadline);
} while (balancers_[0]->ads_service()->lds_response_state().state !=
AdsServiceImpl::ResponseState::NACKED);
EXPECT_THAT(balancers_[0]->ads_service()->lds_response_state().error_message,
::testing::ContainsRegex(absl::StrCat(
kServerName,
": validation error.*"
"Listener has neither address nor ApiListener")));
// Check one more time, just to make sure it still works after NACK.
CheckRpcSendOk();
}
class XdsResolverLoadReportingOnlyTest : public XdsEnd2endTest {
@ -7233,19 +7287,42 @@ TEST_P(CdsTest, UnsupportedClusterType) {
// Tests that the NACK for multiple bad resources includes both errors.
TEST_P(CdsTest, MultipleBadResources) {
constexpr char kClusterName2[] = "cluster_name_2";
// Use unsupported type for default cluster.
constexpr char kClusterName3[] = "cluster_name_3";
// Add cluster with unsupported type.
auto cluster = default_cluster_;
cluster.set_name(kClusterName2);
cluster.set_type(Cluster::STATIC);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Add second cluster with the same error.
cluster.set_name(kClusterName2);
cluster.set_name(kClusterName3);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Change RouteConfig to point to both clusters.
// Change RouteConfig to point to all clusters.
RouteConfiguration route_config = default_route_config_;
route_config.mutable_virtual_hosts(0)->clear_routes();
// First route: default cluster, selected based on header.
auto* route = route_config.mutable_virtual_hosts(0)->add_routes();
route->mutable_match()->set_prefix("");
auto* header_matcher = route->mutable_match()->add_headers();
header_matcher->set_name("cluster");
header_matcher->set_exact_match(kDefaultClusterName);
route->mutable_route()->set_cluster(kDefaultClusterName);
// Second route: cluster 2, selected based on header.
route = route_config.mutable_virtual_hosts(0)->add_routes();
route->mutable_match()->set_prefix("");
header_matcher = route->mutable_match()->add_headers();
header_matcher->set_name("cluster");
header_matcher->set_exact_match(kClusterName2);
route->mutable_route()->set_cluster(kClusterName2);
// Third route: cluster 3, used by default.
route = route_config.mutable_virtual_hosts(0)->add_routes();
route->mutable_match()->set_prefix("");
route->mutable_route()->set_cluster(kClusterName3);
SetRouteConfiguration(0, route_config);
// Add EDS resource.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
// Send RPC.
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
@ -7255,12 +7332,54 @@ TEST_P(CdsTest, MultipleBadResources) {
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_THAT(
response_state.error_message,
::testing::ContainsRegex(absl::StrCat(kDefaultClusterName,
::testing::ContainsRegex(absl::StrCat(kClusterName2,
": validation error.*"
"DiscoveryType is not valid.*",
kClusterName2,
kClusterName3,
": validation error.*"
"DiscoveryType is not valid")));
// RPCs for default cluster should succeed.
std::vector<std::pair<std::string, std::string>> metadata_default_cluster = {
{"cluster", kDefaultClusterName},
};
CheckRpcSendOk(
1, RpcOptions().set_metadata(std::move(metadata_default_cluster)));
// RPCs for cluster 2 should fail.
std::vector<std::pair<std::string, std::string>> metadata_cluster_2 = {
{"cluster", kClusterName2},
};
CheckRpcSendFailure(CheckRpcSendFailureOptions().set_rpc_options(
RpcOptions().set_metadata(std::move(metadata_cluster_2))));
}
// Tests that we don't trigger does-not-exist callbacks for a resource
// that was previously valid but is updated to be invalid.
TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) {
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(BuildEdsResource(args));
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Check that everything works.
CheckRpcSendOk();
// Now send an update changing the Cluster to be invalid.
auto cluster = default_cluster_;
cluster.set_type(Cluster::STATIC);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Wait for xDS server to see NACK.
auto deadline = absl::Now() + absl::Seconds(30);
do {
CheckRpcSendOk();
ASSERT_LT(absl::Now(), deadline);
} while (balancers_[0]->ads_service()->cds_response_state().state !=
AdsServiceImpl::ResponseState::NACKED);
EXPECT_THAT(balancers_[0]->ads_service()->cds_response_state().error_message,
::testing::ContainsRegex(absl::StrCat(
kDefaultClusterName,
": validation error.*DiscoveryType is not valid")));
// Check one more time, just to make sure it still works after NACK.
CheckRpcSendOk();
}
// Tests that CDS client should send a NACK if the eds_config in CDS response

Loading…
Cancel
Save