diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 40a3e66a513..9bd8231c584 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -716,6 +716,11 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( grpc_schedule_on_exec_ctx); if (xds_client()->service_config_watcher_ != nullptr) { Subscribe(XdsApi::kLdsTypeUrl, xds_client()->server_name_); + if (xds_client()->lds_result_.has_value() && + !xds_client()->lds_result_->route_config_name.empty()) { + Subscribe(XdsApi::kRdsTypeUrl, + xds_client()->lds_result_->route_config_name); + } } for (const auto& p : xds_client()->cluster_map_) { Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first)); diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index c0e5aba9dc9..0f4c26ea65e 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -577,6 +577,12 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, Status StreamAggregatedResources(ServerContext* context, Stream* stream) override { gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this); + // Resources (type/name pairs) that have changed since the client + // subscribed to them. + UpdateQueue update_queue; + // Resources that the client will be subscribed to keyed by resource type + // url. + SubscriptionMap subscription_map; [&]() { { grpc_core::MutexLock lock(&ads_mu_); @@ -585,12 +591,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // Balancer shouldn't receive the call credentials metadata. EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey), context->client_metadata().end()); - // Resources (type/name pairs) that have changed since the client - // subscribed to them. - UpdateQueue update_queue; - // Resources that the client will be subscribed to keyed by resource type - // url. - SubscriptionMap subscription_map; // Current Version map keyed by resource type url. std::map resource_type_version; // Creating blocking thread to read from stream. @@ -742,6 +742,21 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, } reader.join(); }(); + // Clean up any subscriptions that were still active when the call finished. + { + grpc_core::MutexLock lock(&ads_mu_); + for (auto& p : subscription_map) { + const std::string& type_url = p.first; + SubscriptionNameMap& subscription_name_map = p.second; + for (auto& q : subscription_name_map) { + const std::string& resource_name = q.first; + SubscriptionState& subscription_state = q.second; + ResourceState& resource_state = + resource_map_[type_url][resource_name]; + resource_state.subscriptions.erase(&subscription_state); + } + } + } gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources done", this); return Status::OK; } @@ -857,7 +872,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, { grpc_core::MutexLock lock(&ads_mu_); NotifyDoneWithAdsCallLocked(); - resource_map_.clear(); resource_type_response_state_.clear(); } gpr_log(GPR_INFO, "ADS[%p]: shut down", this); @@ -1072,8 +1086,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { protected: XdsEnd2endTest(size_t num_backends, size_t num_balancers, int client_load_reporting_interval_seconds = 100) - : server_host_("localhost"), - num_backends_(num_backends), + : num_backends_(num_backends), num_balancers_(num_balancers), client_load_reporting_interval_seconds_( client_load_reporting_interval_seconds) {} @@ -1101,7 +1114,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { // Start the backends. for (size_t i = 0; i < num_backends_; ++i) { backends_.emplace_back(new BackendServerThread); - backends_.back()->Start(server_host_); + backends_.back()->Start(); } // Start the load balancers. for (size_t i = 0; i < num_balancers_; ++i) { @@ -1109,7 +1122,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { new BalancerServerThread(GetParam().enable_load_reporting() ? client_load_reporting_interval_seconds_ : 0)); - balancers_.back()->Start(server_host_); + balancers_.back()->Start(); } ResetStub(); } @@ -1120,10 +1133,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam { } void StartAllBackends() { - for (auto& backend : backends_) backend->Start(server_host_); + for (auto& backend : backends_) backend->Start(); } - void StartBackend(size_t index) { backends_[index]->Start(server_host_); } + void StartBackend(size_t index) { backends_[index]->Start(); } void ShutdownAllBackends() { for (auto& backend : backends_) backend->Shutdown(); @@ -1376,7 +1389,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { ServerThread() : port_(g_port_saver->GetPort()) {} virtual ~ServerThread(){}; - void Start(const grpc::string& server_host) { + void Start() { gpr_log(GPR_INFO, "starting %s server on port %d", Type(), port_); GPR_ASSERT(!running_); running_ = true; @@ -1386,19 +1399,18 @@ class XdsEnd2endTest : public ::testing::TestWithParam { // by ServerThread::Serve from firing before the wait below is hit. grpc_core::MutexLock lock(&mu); grpc_core::CondVar cond; - thread_.reset(new std::thread( - std::bind(&ServerThread::Serve, this, server_host, &mu, &cond))); + thread_.reset( + new std::thread(std::bind(&ServerThread::Serve, this, &mu, &cond))); cond.Wait(&mu); gpr_log(GPR_INFO, "%s server startup complete", Type()); } - void Serve(const grpc::string& server_host, grpc_core::Mutex* mu, - grpc_core::CondVar* cond) { + void Serve(grpc_core::Mutex* mu, grpc_core::CondVar* cond) { // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. grpc_core::MutexLock lock(mu); std::ostringstream server_address; - server_address << server_host << ":" << port_; + server_address << "localhost:" << port_; ServerBuilder builder; std::shared_ptr creds(new SecureServerCredentials( grpc_fake_transport_security_server_credentials_create())); @@ -1457,8 +1469,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam { : ads_service_(new AdsServiceImpl(client_load_reporting_interval > 0)), lrs_service_(new LrsServiceImpl(client_load_reporting_interval)) {} - std::shared_ptr ads_service() { return ads_service_; } - std::shared_ptr lrs_service() { return lrs_service_; } + AdsServiceImpl* ads_service() { return ads_service_.get(); } + LrsServiceImpl* lrs_service() { return lrs_service_.get(); } private: void RegisterAllServices(ServerBuilder* builder) override { @@ -1482,7 +1494,6 @@ class XdsEnd2endTest : public ::testing::TestWithParam { std::shared_ptr lrs_service_; }; - const grpc::string server_host_; const size_t num_backends_; const size_t num_balancers_; const int client_load_reporting_interval_seconds_; @@ -1736,6 +1747,52 @@ TEST_P(XdsResolverOnlyTest, ClusterRemoved) { AdsServiceImpl::ACKED); } +// Tests that we restart all xDS requests when we reestablish the ADS call. +TEST_P(XdsResolverOnlyTest, RestartsRequestsUponReconnection) { + balancers_[0]->ads_service()->SetLdsToUseDynamicRds(); + const char* kNewClusterName = "new_cluster_name"; + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts(0, 2)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName); + // We need to wait for all backends to come online. + WaitForAllBackends(0, 2); + // Now shut down and restart the balancer. When the client + // reconnects, it should automatically restart the requests for all + // resource types. + balancers_[0]->Shutdown(); + balancers_[0]->Start(); + // Make sure things are still working. + CheckRpcSendOk(100); + // Populate new EDS resource. + AdsServiceImpl::EdsResourceArgs args2({ + {"locality0", GetBackendPorts(2, 4)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args2, kNewClusterName), + kNewClusterName); + // Populate new CDS resource. + Cluster new_cluster = balancers_[0]->ads_service()->default_cluster(); + new_cluster.set_name(kNewClusterName); + balancers_[0]->ads_service()->SetCdsResource(new_cluster, kNewClusterName); + // Change RDS resource to point to new cluster. + RouteConfiguration new_route_config = + balancers_[0]->ads_service()->default_route_config(); + new_route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->set_cluster(kNewClusterName); + balancers_[0]->ads_service()->SetRdsResource(new_route_config, + kDefaultResourceName); + // Wait for all new backends to be used. + std::tuple counts = WaitForAllBackends(2, 4); + // Make sure no RPCs failed in the transition. + EXPECT_EQ(0, std::get<1>(counts)); +} + class XdsResolverLoadReportingOnlyTest : public XdsEnd2endTest { public: XdsResolverLoadReportingOnlyTest() : XdsEnd2endTest(4, 1, 3) {} @@ -3473,7 +3530,7 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) { int num_started = std::get<0>(WaitForAllBackends( /* start_index */ 0, /* stop_index */ kNumBackendsFirstPass)); // Now restart the balancer, this time pointing to the new backends. - balancers_[0]->Start(server_host_); + balancers_[0]->Start(); args = AdsServiceImpl::EdsResourceArgs({ {"locality0", GetBackendPorts(kNumBackendsFirstPass)}, });