From c306d7940f311e2382cac637a794ae4f7e2a1c1c Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 10 Mar 2020 10:53:39 -0700 Subject: [PATCH 1/2] add test for changing clusters --- test/cpp/end2end/xds_end2end_test.cc | 57 ++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index e8430e3b01c..90922fb96ad 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -765,6 +765,18 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, resource_types_to_ignore_.emplace(type_url); } + void UnsetResource(const std::string& type_url, const std::string& name) { + grpc_core::MutexLock lock(&ads_mu_); + ResourceState& state = resource_map_[type_url][name]; + ++state.version; + state.resource.reset(); + gpr_log(GPR_INFO, "ADS[%p]: Unsetting %s resource %s to version %u", this, + type_url.c_str(), name.c_str(), state.version); + for (SubscriptionState* subscription : state.subscriptions) { + subscription->update_queue->emplace_back(type_url, name); + } + } + void SetResource(google::protobuf::Any resource, const std::string& type_url, const std::string& name) { grpc_core::MutexLock lock(&ads_mu_); @@ -1639,6 +1651,45 @@ TEST_P(BasicTest, BackendsRestart) { true /* wait_for_ready */); } +using XdsResolverOnlyTest = BasicTest; + +// Tests switching over from one cluster to another. +TEST_P(XdsResolverOnlyTest, ChangeClusters) { + const char* kNewClusterName = "new_cluster_name"; + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts(0, 2)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName); + // We need to wait for all backends to come online. + WaitForAllBackends(0, 2); + // Populate new EDS resource. + AdsServiceImpl::EdsResourceArgs args2({ + {"locality0", GetBackendPorts(2, 4)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args2, kNewClusterName), + kNewClusterName); + // Populate new CDS resource. + Cluster new_cluster = balancers_[0]->ads_service()->default_cluster(); + new_cluster.set_name(kNewClusterName); + balancers_[0]->ads_service()->SetCdsResource(new_cluster, kNewClusterName); + // Change RDS resource to point to new cluster. + RouteConfiguration new_route_config = + balancers_[0]->ads_service()->default_route_config(); + new_route_config.mutable_virtual_hosts(0)->mutable_routes(0) + ->mutable_route()->set_cluster(kNewClusterName); + Listener listener = + balancers_[0]->ads_service()->BuildListener(new_route_config); + balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName); + // Wait for all new backends to be used. + std::tuple counts = WaitForAllBackends(2, 4); + // Make sure no RPCs failed in the transition. + EXPECT_EQ(0, std::get<1>(counts)); +} + using SecureNamingTest = BasicTest; // Tests that secure naming check passes if target name is expected. @@ -3307,6 +3358,12 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, EdsTest, TestType(true, true)), &TestTypeName); +// XdsResolverOnlyTest depends on XdsResolver. +INSTANTIATE_TEST_SUITE_P(XdsTest, XdsResolverOnlyTest, + ::testing::Values(TestType(true, false), + TestType(true, true)), + &TestTypeName); + INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest, ::testing::Values(TestType(false, true), TestType(false, false), From 46b44146552ce4a3dc112d844ce9bbc6b9546d28 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 10 Mar 2020 11:44:39 -0700 Subject: [PATCH 2/2] change logic to send empty response when resources go away --- test/cpp/end2end/xds_end2end_test.cc | 31 ++++++++++++++-------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 90922fb96ad..1b3d792d9fb 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -351,7 +351,6 @@ class ClientStats { std::map dropped_requests_; }; -// TODO(roth): Change this service to a real fake. class AdsServiceImpl : public AggregatedDiscoveryService::Service, public std::enable_shared_from_this { public: @@ -594,7 +593,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // Main loop to look for requests and updates. while (true) { // Look for new requests and and decide what to handle. - DiscoveryResponse response; + absl::optional response; // Boolean to keep track if the loop received any work to do: a request // or an update; regardless whether a response was actually sent out. bool did_work = false; @@ -647,8 +646,9 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, this, request.type_url().c_str(), resource_name.c_str(), resource_state.version); resources_added_to_response.emplace(resource_name); + if (!response.has_value()) response.emplace(); if (resource_state.resource.has_value()) { - response.add_resources()->CopyFrom( + response->add_resources()->CopyFrom( resource_state.resource.value()); } } @@ -664,17 +664,17 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, request.type_url(), ++resource_type_version[request.type_url()], subscription_name_map, resources_added_to_response, - &response); + &response.value()); } } } } - if (!response.resources().empty()) { + if (response.has_value()) { gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this, - response.DebugString().c_str()); - stream->Write(response); + response->DebugString().c_str()); + stream->Write(response.value()); } - response.Clear(); + response.reset(); // Look for updates and decide what to handle. { grpc_core::MutexLock lock(&ads_mu_); @@ -700,21 +700,22 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, "ADS[%p]: Sending update for type=%s name=%s version=%d", this, resource_type.c_str(), resource_name.c_str(), resource_state.version); + response.emplace(); if (resource_state.resource.has_value()) { - response.add_resources()->CopyFrom( + response->add_resources()->CopyFrom( resource_state.resource.value()); - CompleteBuildingDiscoveryResponse( - resource_type, ++resource_type_version[resource_type], - subscription_name_map, {resource_name}, &response); } + CompleteBuildingDiscoveryResponse( + resource_type, ++resource_type_version[resource_type], + subscription_name_map, {resource_name}, &response.value()); } } } } - if (!response.resources().empty()) { + if (response.has_value()) { gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this, - response.DebugString().c_str()); - stream->Write(response); + response->DebugString().c_str()); + stream->Write(response.value()); } // If we didn't find anything to do, delay before the next loop // iteration; otherwise, check whether we should exit and then