From a78df68e96868bcfb78b64b8c81e36dd735ea966 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 13 Jun 2022 14:01:13 -0700 Subject: [PATCH] xds: add "ignore_resource_deletion" server feature (#29633) * xds: add "ignore_resource_deletion" server feature * add logging * clang-format * fix build * fix build * add test for LDS resource deletion on gRPC server * clang-format --- src/core/ext/xds/xds_bootstrap.cc | 16 ++- src/core/ext/xds/xds_bootstrap.h | 3 +- src/core/ext/xds/xds_client.cc | 38 ++++++- src/core/ext/xds/xds_client.h | 1 + .../end2end/xds/xds_cluster_end2end_test.cc | 97 ++++++++++++---- test/cpp/end2end/xds/xds_end2end_test.cc | 63 +++++++++-- test/cpp/end2end/xds/xds_end2end_test_lib.cc | 10 +- test/cpp/end2end/xds/xds_end2end_test_lib.h | 5 + .../end2end/xds/xds_routing_end2end_test.cc | 104 +++++++++++++----- 9 files changed, 273 insertions(+), 64 deletions(-) diff --git a/src/core/ext/xds/xds_bootstrap.cc b/src/core/ext/xds/xds_bootstrap.cc index d05b972e091..37c8cfd487e 100644 --- a/src/core/ext/xds/xds_bootstrap.cc +++ b/src/core/ext/xds/xds_bootstrap.cc @@ -56,6 +56,10 @@ bool XdsFederationEnabled() { namespace { +const absl::string_view kServerFeatureXdsV3 = "xds_v3"; +const absl::string_view kServerFeatureIgnoreResourceDeletion = + "ignore_resource_deletion"; + grpc_error_handle ParseChannelCreds(const Json::Object& json, size_t idx, XdsBootstrap::XdsServer* server) { std::vector error_list; @@ -129,7 +133,9 @@ XdsBootstrap::XdsServer XdsBootstrap::XdsServer::Parse( if (server_features_array != nullptr) { for (const Json& feature_json : *server_features_array) { if (feature_json.type() == Json::Type::STRING && - feature_json.string_value() == "xds_v3") { + (feature_json.string_value() == kServerFeatureXdsV3 || + feature_json.string_value() == + kServerFeatureIgnoreResourceDeletion)) { server.server_features.insert(feature_json.string_value()); } } @@ -159,7 +165,13 @@ Json::Object XdsBootstrap::XdsServer::ToJson() const { } bool XdsBootstrap::XdsServer::ShouldUseV3() const { - return server_features.find("xds_v3") != server_features.end(); + return server_features.find(std::string(kServerFeatureXdsV3)) != + server_features.end(); +} + +bool XdsBootstrap::XdsServer::IgnoreResourceDeletion() const { + return server_features.find(std::string( + kServerFeatureIgnoreResourceDeletion)) != server_features.end(); } // diff --git a/src/core/ext/xds/xds_bootstrap.h b/src/core/ext/xds/xds_bootstrap.h index 6b8c4e1eb72..3d155fec806 100644 --- a/src/core/ext/xds/xds_bootstrap.h +++ b/src/core/ext/xds/xds_bootstrap.h @@ -76,6 +76,7 @@ class XdsBootstrap { Json::Object ToJson() const; bool ShouldUseV3() const; + bool IgnoreResourceDeletion() const; }; struct Authority { @@ -136,4 +137,4 @@ class XdsBootstrap { } // namespace grpc_core -#endif /* GRPC_CORE_EXT_XDS_XDS_BOOTSTRAP_H */ +#endif // GRPC_CORE_EXT_XDS_XDS_BOOTSTRAP_H diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 3d72c43780b..92ec00f7eb4 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -873,6 +873,17 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( if (result_.type->AllResourcesRequiredInSotW()) { result_.resources_seen[resource_name->authority].insert(resource_name->key); } + // If we previously ignored the resource's deletion, log that we're + // now re-adding it. + if (resource_state.ignored_deletion) { + gpr_log(GPR_INFO, + "[xds_client %p] xds server %s: server returned new version of " + "resource for which we previously ignored a deletion: type %s " + "name %s", + xds_client(), ads_call_state_->chand()->server_.server_uri.c_str(), + std::string(type_url).c_str(), result->name.c_str()); + resource_state.ignored_deletion = false; + } // Update resource state based on whether the resource is valid. if (!result->resource.ok()) { result_.errors.emplace_back(absl::StrCat( @@ -1248,9 +1259,23 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { // does not exist. For that case, we rely on the request timeout // instead. if (resource_state.resource == nullptr) continue; - resource_state.resource.reset(); - xds_client()->NotifyWatchersOnResourceDoesNotExist( - resource_state.watchers); + if (chand()->server_.IgnoreResourceDeletion()) { + if (!resource_state.ignored_deletion) { + gpr_log(GPR_ERROR, + "[xds_client %p] xds server %s: ignoring deletion " + "for resource type %s name %s", + xds_client(), chand()->server_.server_uri.c_str(), + result.type_url.c_str(), + XdsClient::ConstructFullXdsResourceName( + authority, result.type_url.c_str(), resource_key) + .c_str()); + resource_state.ignored_deletion = true; + } + } else { + resource_state.resource.reset(); + xds_client()->NotifyWatchersOnResourceDoesNotExist( + resource_state.watchers); + } } } } @@ -1988,6 +2013,13 @@ void XdsClient::CancelResourceWatch(const XdsResourceType* type, resource_state.watchers.erase(watcher); // Clean up empty map entries, if any. if (resource_state.watchers.empty()) { + if (resource_state.ignored_deletion) { + gpr_log(GPR_INFO, + "[xds_client %p] unsubscribing from a resource for which we " + "previously ignored a deletion: type %s name %s", + this, std::string(type->type_url()).c_str(), + std::string(name).c_str()); + } authority_state.channel_state->UnsubscribeLocked(type, *resource_name, delay_unsubscription); type_map.erase(resource_it); diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 5d8211f51e4..a7b6457cc8e 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -247,6 +247,7 @@ class XdsClient : public DualRefCounted { // The latest data seen for the resource. std::unique_ptr resource; XdsApi::ResourceMetadata meta; + bool ignored_deletion = false; }; struct AuthorityState { diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index f911a9612a6..8776e3ec42a 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -230,30 +230,6 @@ TEST_P(CdsTest, ChangeClusters) { WaitForAllBackends(DEBUG_LOCATION, 1, 2); } -// Tests that we go into TRANSIENT_FAILURE if the Cluster disappears. -TEST_P(CdsTest, ClusterRemoved) { - CreateAndStartBackends(1); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // We need to wait for all backends to come online. - WaitForAllBackends(DEBUG_LOCATION); - // Unset CDS resource. - balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName); - // Wait for RPCs to start failing. - SendRpcsUntil(DEBUG_LOCATION, [](const RpcResult& result) { - if (result.status.ok()) return true; // Keep going. - EXPECT_EQ(StatusCode::UNAVAILABLE, result.status.error_code()); - EXPECT_EQ(absl::StrCat("CDS resource \"", kDefaultClusterName, - "\" does not exist"), - result.status.error_message()); - return false; - }); - // Make sure we ACK'ed the update. - auto response_state = balancer_->ads_service()->cds_response_state(); - ASSERT_TRUE(response_state.has_value()); - EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); -} - TEST_P(CdsTest, CircuitBreaking) { CreateAndStartBackends(1); constexpr size_t kMaxConcurrentRequests = 10; @@ -356,6 +332,79 @@ TEST_P(CdsTest, ClusterChangeAfterAdsCallFails) { WaitForBackend(DEBUG_LOCATION, 1); } +// +// CDS deletion tests +// + +class CdsDeletionTest : public XdsEnd2endTest { + protected: + void SetUp() override {} // Individual tests call InitClient(). +}; + +INSTANTIATE_TEST_SUITE_P(XdsTest, CdsDeletionTest, + ::testing::Values(XdsTestType()), &XdsTestType::Name); + +// Tests that we go into TRANSIENT_FAILURE if the Cluster is deleted. +TEST_P(CdsDeletionTest, ClusterDeleted) { + InitClient(); + CreateAndStartBackends(1); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // We need to wait for all backends to come online. + WaitForAllBackends(DEBUG_LOCATION); + // Unset CDS resource. + balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName); + // Wait for RPCs to start failing. + SendRpcsUntil(DEBUG_LOCATION, [](const RpcResult& result) { + if (result.status.ok()) return true; // Keep going. + EXPECT_EQ(StatusCode::UNAVAILABLE, result.status.error_code()); + EXPECT_EQ(absl::StrCat("CDS resource \"", kDefaultClusterName, + "\" does not exist"), + result.status.error_message()); + return false; + }); + // Make sure we ACK'ed the update. + auto response_state = balancer_->ads_service()->cds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); +} + +// Tests that we ignore Cluster deletions if configured to do so. +TEST_P(CdsDeletionTest, ClusterDeletionIgnored) { + InitClient(BootstrapBuilder().SetIgnoreResourceDeletion()); + CreateAndStartBackends(2); + // Bring up client pointing to backend 0 and wait for it to connect. + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + WaitForAllBackends(DEBUG_LOCATION, 0, 1); + // Make sure we ACKed the CDS update. + auto response_state = balancer_->ads_service()->cds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Unset CDS resource and wait for client to ACK the update. + balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName); + const auto deadline = absl::Now() + absl::Seconds(30); + while (true) { + ASSERT_LT(absl::Now(), deadline) << "timed out waiting for CDS ACK"; + response_state = balancer_->ads_service()->cds_response_state(); + if (response_state.has_value()) break; + } + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Make sure we can still send RPCs. + CheckRpcSendOk(DEBUG_LOCATION); + // Now recreate the CDS resource pointing to a new EDS resource that + // specified backend 1, and make sure the client uses it. + const char* kNewEdsResourceName = "new_eds_resource_name"; + auto cluster = default_cluster_; + cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName); + balancer_->ads_service()->SetCdsResource(cluster); + args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args, kNewEdsResourceName)); + // Wait for client to start using backend 1. + WaitForAllBackends(DEBUG_LOCATION, 1, 2); +} + // // EDS tests // diff --git a/test/cpp/end2end/xds/xds_end2end_test.cc b/test/cpp/end2end/xds/xds_end2end_test.cc index ae1ec19fbdb..81fb72c5075 100644 --- a/test/cpp/end2end/xds/xds_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_end2end_test.cc @@ -1068,22 +1068,63 @@ TEST_P(XdsSecurityTest, TestFileWatcherCertificateProvider) { class XdsEnabledServerTest : public XdsEnd2endTest { protected: - void SetUp() override { - XdsEnd2endTest::SetUp(); + void SetUp() override {} // No-op -- individual tests do this themselves. + + void DoSetUp(BootstrapBuilder builder = BootstrapBuilder()) { + InitClient(builder); CreateBackends(1, /*xds_enabled=*/true); - EdsResourceArgs args({ - {"locality0", CreateEndpointsForBackends(0, 1)}, - }); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); } }; TEST_P(XdsEnabledServerTest, Basic) { + DoSetUp(); + backends_[0]->Start(); + WaitForBackend(DEBUG_LOCATION, 0); +} + +TEST_P(XdsEnabledServerTest, ListenerDeletionIgnored) { + DoSetUp(BootstrapBuilder().SetIgnoreResourceDeletion()); backends_[0]->Start(); WaitForBackend(DEBUG_LOCATION, 0); + // Check that we ACKed. + // TODO(roth): There may be multiple entries in the resource state response + // queue, because the client doesn't necessarily subscribe to all resources + // in a single message, and the server currently (I suspect incorrectly?) + // thinks that each subscription message is an ACK. So for now, we + // drain the entire LDS resource state response queue, ensuring that + // all responses are ACKs. Need to look more closely at the protocol + // semantics here and make sure the server is doing the right thing, + // in which case we may be able to avoid this. + while (true) { + auto response_state = balancer_->ads_service()->lds_response_state(); + if (!response_state.has_value()) break; + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + } + // Now unset the resource. + balancer_->ads_service()->UnsetResource( + kLdsTypeUrl, GetServerListenerName(backends_[0]->port())); + // Wait for update to be ACKed. + absl::Time deadline = + absl::Now() + (absl::Seconds(10) * grpc_test_slowdown_factor()); + while (true) { + auto response_state = balancer_->ads_service()->lds_response_state(); + if (!response_state.has_value()) { + gpr_sleep_until(grpc_timeout_seconds_to_deadline(1)); + continue; + } + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + ASSERT_LT(absl::Now(), deadline); + break; + } + // Make sure server is still serving. + CheckRpcSendOk(DEBUG_LOCATION); } TEST_P(XdsEnabledServerTest, BadLdsUpdateNoApiListenerNorAddress) { + DoSetUp(); Listener listener = default_server_listener_; listener.clear_address(); listener.set_name( @@ -1101,6 +1142,7 @@ TEST_P(XdsEnabledServerTest, BadLdsUpdateNoApiListenerNorAddress) { // TODO(roth): Re-enable the following test once // github.com/istio/istio/issues/38914 is resolved. TEST_P(XdsEnabledServerTest, DISABLED_BadLdsUpdateBothApiListenerAndAddress) { + DoSetUp(); Listener listener = default_server_listener_; listener.mutable_api_listener(); SetServerListenerNameAndRouteConfiguration(balancer_.get(), listener, @@ -1115,6 +1157,7 @@ TEST_P(XdsEnabledServerTest, DISABLED_BadLdsUpdateBothApiListenerAndAddress) { } TEST_P(XdsEnabledServerTest, NacksNonZeroXffNumTrusterHops) { + DoSetUp(); Listener listener = default_server_listener_; HttpConnectionManager http_connection_manager = ServerHcmAccessor().Unpack(listener); @@ -1131,6 +1174,7 @@ TEST_P(XdsEnabledServerTest, NacksNonZeroXffNumTrusterHops) { } TEST_P(XdsEnabledServerTest, NacksNonEmptyOriginalIpDetectionExtensions) { + DoSetUp(); Listener listener = default_server_listener_; HttpConnectionManager http_connection_manager = ServerHcmAccessor().Unpack(listener); @@ -1148,6 +1192,7 @@ TEST_P(XdsEnabledServerTest, NacksNonEmptyOriginalIpDetectionExtensions) { } TEST_P(XdsEnabledServerTest, UnsupportedL4Filter) { + DoSetUp(); Listener listener = default_server_listener_; listener.mutable_default_filter_chain()->clear_filters(); listener.mutable_default_filter_chain()->add_filters()->mutable_typed_config()->PackFrom(default_listener_ /* any proto object other than HttpConnectionManager */); @@ -1161,6 +1206,7 @@ TEST_P(XdsEnabledServerTest, UnsupportedL4Filter) { } TEST_P(XdsEnabledServerTest, NacksEmptyHttpFilterList) { + DoSetUp(); Listener listener = default_server_listener_; HttpConnectionManager http_connection_manager = ServerHcmAccessor().Unpack(listener); @@ -1177,6 +1223,7 @@ TEST_P(XdsEnabledServerTest, NacksEmptyHttpFilterList) { } TEST_P(XdsEnabledServerTest, UnsupportedHttpFilter) { + DoSetUp(); Listener listener = default_server_listener_; HttpConnectionManager http_connection_manager = ServerHcmAccessor().Unpack(listener); @@ -1202,6 +1249,7 @@ TEST_P(XdsEnabledServerTest, UnsupportedHttpFilter) { } TEST_P(XdsEnabledServerTest, HttpFilterNotSupportedOnServer) { + DoSetUp(); Listener listener = default_server_listener_; HttpConnectionManager http_connection_manager = ServerHcmAccessor().Unpack(listener); @@ -1229,6 +1277,7 @@ TEST_P(XdsEnabledServerTest, HttpFilterNotSupportedOnServer) { TEST_P(XdsEnabledServerTest, HttpFilterNotSupportedOnServerIgnoredWhenOptional) { + DoSetUp(); Listener listener = default_server_listener_; HttpConnectionManager http_connection_manager = ServerHcmAccessor().Unpack(listener); @@ -1256,6 +1305,7 @@ TEST_P(XdsEnabledServerTest, // Verify that a mismatch of listening address results in "not serving" // status. TEST_P(XdsEnabledServerTest, ListenerAddressMismatch) { + DoSetUp(); Listener listener = default_server_listener_; // Set a different listening address in the LDS update listener.mutable_address()->mutable_socket_address()->set_address( @@ -1270,6 +1320,7 @@ TEST_P(XdsEnabledServerTest, ListenerAddressMismatch) { } TEST_P(XdsEnabledServerTest, UseOriginalDstNotSupported) { + DoSetUp(); Listener listener = default_server_listener_; listener.mutable_use_original_dst()->set_value(true); SetServerListenerNameAndRouteConfiguration(balancer_.get(), listener, @@ -1319,8 +1370,6 @@ class XdsServerSecurityTest : public XdsEnd2endTest { balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); } - void TearDown() override { XdsEnd2endTest::TearDown(); } - void SetLdsUpdate(absl::string_view root_instance_name, absl::string_view root_certificate_name, absl::string_view identity_instance_name, diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc index 02d4a2d68c6..49cd4cd1276 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -336,9 +336,15 @@ std::string XdsEnd2endTest::BootstrapBuilder::MakeXdsServersText( " \"server_features\": []\n" " }\n" " ]"; + std::vector server_features; + if (!v2_) server_features.push_back("\"xds_v3\""); + if (ignore_resource_deletion_) { + server_features.push_back("\"ignore_resource_deletion\""); + } return absl::StrReplaceAll( - kXdsServerTemplate, {{"", server_uri}, - {"", (v2_ ? "" : "\"xds_v3\"")}}); + kXdsServerTemplate, + {{"", server_uri}, + {"", absl::StrJoin(server_features, ", ")}}); } std::string XdsEnd2endTest::BootstrapBuilder::MakeNodeText() { diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.h b/test/cpp/end2end/xds/xds_end2end_test_lib.h index b7ea4053c7b..f96bda984bb 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.h +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.h @@ -402,6 +402,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam { v2_ = true; return *this; } + BootstrapBuilder& SetIgnoreResourceDeletion() { + ignore_resource_deletion_ = true; + return *this; + } BootstrapBuilder& SetDefaultServer(const std::string& server) { top_server_ = server; return *this; @@ -450,6 +454,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { std::string MakeAuthorityText(); bool v2_ = false; + bool ignore_resource_deletion_ = false; std::string top_server_; std::string client_default_listener_resource_name_template_; std::map plugins_; diff --git a/test/cpp/end2end/xds/xds_routing_end2end_test.cc b/test/cpp/end2end/xds/xds_routing_end2end_test.cc index b17be76e10f..4f1f541fe3f 100644 --- a/test/cpp/end2end/xds/xds_routing_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_routing_end2end_test.cc @@ -455,6 +455,85 @@ TEST_P(LdsV2Test, IgnoresHttpFilters) { CheckRpcSendOk(DEBUG_LOCATION); } +class LdsDeletionTest : public XdsEnd2endTest { + protected: + void SetUp() override {} // Individual tests call InitClient(). +}; + +INSTANTIATE_TEST_SUITE_P(XdsTest, LdsDeletionTest, + ::testing::Values(XdsTestType()), &XdsTestType::Name); + +// Tests that we go into TRANSIENT_FAILURE if the Listener is deleted. +TEST_P(LdsDeletionTest, ListenerDeleted) { + InitClient(); + CreateAndStartBackends(1); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // We need to wait for all backends to come online. + WaitForAllBackends(DEBUG_LOCATION); + // Unset LDS resource. + balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName); + // Wait for RPCs to start failing. + SendRpcsUntil(DEBUG_LOCATION, [](const RpcResult& result) { + if (result.status.ok()) return true; // Keep going. + EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE); + EXPECT_EQ(result.status.error_message(), + // TODO(roth): Improve this error message as part of + // https://github.com/grpc/grpc/issues/22883. + "empty address list: "); + return false; + }); + // Make sure we ACK'ed the update. + auto response_state = balancer_->ads_service()->lds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); +} + +// Tests that we ignore Listener deletions if configured to do so. +TEST_P(LdsDeletionTest, ListenerDeletionIgnored) { + InitClient(BootstrapBuilder().SetIgnoreResourceDeletion()); + CreateAndStartBackends(2); + // Bring up client pointing to backend 0 and wait for it to connect. + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + WaitForAllBackends(DEBUG_LOCATION, 0, 1); + // Make sure we ACKed the LDS update. + auto response_state = balancer_->ads_service()->lds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Unset LDS resource and wait for client to ACK the update. + balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName); + const auto deadline = absl::Now() + absl::Seconds(30); + while (true) { + ASSERT_LT(absl::Now(), deadline) << "timed out waiting for LDS ACK"; + response_state = balancer_->ads_service()->lds_response_state(); + if (response_state.has_value()) break; + } + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Make sure we can still send RPCs. + CheckRpcSendOk(DEBUG_LOCATION); + // Now recreate the LDS resource pointing to a different CDS and EDS + // resource, pointing to backend 1, and make sure the client uses it. + const char* kNewClusterName = "new_cluster_name"; + const char* kNewEdsResourceName = "new_eds_resource_name"; + auto cluster = default_cluster_; + cluster.set_name(kNewClusterName); + cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName); + balancer_->ads_service()->SetCdsResource(cluster); + args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args, kNewEdsResourceName)); + RouteConfiguration new_route_config = default_route_config_; + new_route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->set_cluster(kNewClusterName); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + // Wait for client to start using backend 1. + WaitForAllBackends(DEBUG_LOCATION, 1, 2); +} + using LdsRdsTest = XdsEnd2endTest; // Test with and without RDS. @@ -505,31 +584,6 @@ TEST_P(LdsRdsTest, DefaultRouteSpecifiesSlashPrefix) { WaitForAllBackends(DEBUG_LOCATION); } -// Tests that we go into TRANSIENT_FAILURE if the Listener is removed. -TEST_P(LdsRdsTest, ListenerRemoved) { - CreateAndStartBackends(1); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); - // We need to wait for all backends to come online. - WaitForAllBackends(DEBUG_LOCATION); - // Unset LDS resource. - balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName); - // Wait for RPCs to start failing. - SendRpcsUntil(DEBUG_LOCATION, [](const RpcResult& result) { - if (result.status.ok()) return true; // Keep going. - EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE); - EXPECT_EQ(result.status.error_message(), - // TODO(roth): Improve this error message as part of - // https://github.com/grpc/grpc/issues/22883. - "empty address list: "); - return false; - }); - // Make sure we ACK'ed the update. - auto response_state = balancer_->ads_service()->lds_response_state(); - ASSERT_TRUE(response_state.has_value()); - EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); -} - // Tests that LDS client ACKs but fails if matching domain can't be found in // the LDS response. TEST_P(LdsRdsTest, NoMatchedDomain) {