From fcd8bbd34ce95d855211c27f44a3bd68eef227a5 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 27 Mar 2020 09:07:24 -0700 Subject: [PATCH] minor cleanups in xds_end2end_test --- test/cpp/end2end/xds_end2end_test.cc | 305 +++++++++++++-------------- 1 file changed, 149 insertions(+), 156 deletions(-) diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 0b9e55dc2dc..9d0f3900d88 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -403,40 +403,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, using Stream = ServerReaderWriter; - // A queue of resource type/name pairs that have changed since the client - // subscribed to them. - using UpdateQueue = std::deque< - std::pair>; - - // A struct representing a client's subscription to a particular resource. - struct SubscriptionState { - // Version that the client currently knows about. - int current_version = 0; - // The queue upon which to place updates when the resource is updated. - UpdateQueue* update_queue; - }; - - // A struct representing the a client's subscription to all the resources. - using SubscriptionNameMap = - std::map; - using SubscriptionMap = - std::map; - - // A struct representing the current state for a resource: - // - the version of the resource that is set by the SetResource() methods. - // - a list of subscriptions interested in this resource. - struct ResourceState { - int version = 0; - absl::optional resource; - std::set subscriptions; - }; - - // A struct representing the current state for all resources: - // LDS, CDS, EDS, and RDS for the class as a whole. - using ResourceNameMap = - std::map; - using ResourceMap = std::map; - AdsServiceImpl(bool enable_load_reporting) { // Construct RDS response data. default_route_config_.set_name(kDefaultResourceName); @@ -462,118 +428,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, SetCdsResource(default_cluster_, kDefaultResourceName); } - // Starting a thread to do blocking read on the stream until cancel. - void BlockingRead(Stream* stream, std::deque* requests, - bool* stream_closed) { - DiscoveryRequest request; - bool seen_first_request = false; - while (stream->Read(&request)) { - if (!seen_first_request) { - EXPECT_TRUE(request.has_node()); - ASSERT_FALSE(request.node().client_features().empty()); - EXPECT_EQ(request.node().client_features(0), - "envoy.lb.does_not_support_overprovisioning"); - seen_first_request = true; - } - { - grpc_core::MutexLock lock(&ads_mu_); - requests->emplace_back(std::move(request)); - } - } - gpr_log(GPR_INFO, "ADS[%p]: Null read, stream closed", this); - grpc_core::MutexLock lock(&ads_mu_); - *stream_closed = true; - } - - // Checks whether the client needs to receive a newer version of - // the resource. If so, updates subscription_state->current_version and - // returns true. - bool ClientNeedsResourceUpdate(const ResourceState& resource_state, - SubscriptionState* subscription_state) { - if (subscription_state->current_version < resource_state.version) { - subscription_state->current_version = resource_state.version; - return true; - } - return false; - } - - // Subscribes to a resource if not already subscribed: - // 1. Sets the update_queue field in subscription_state. - // 2. Adds subscription_state to resource_state->subscriptions. - void MaybeSubscribe(const std::string& resource_type, - const std::string& resource_name, - SubscriptionState* subscription_state, - ResourceState* resource_state, - UpdateQueue* update_queue) { - // The update_queue will be null if we were not previously subscribed. - if (subscription_state->update_queue != nullptr) return; - subscription_state->update_queue = update_queue; - resource_state->subscriptions.emplace(subscription_state); - gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p", - this, resource_type.c_str(), resource_name.c_str(), - &subscription_state); - } - - // Removes subscriptions for resources no longer present in the - // current request. - void ProcessUnsubscriptions( - const std::string& resource_type, - const std::set& resources_in_current_request, - SubscriptionNameMap* subscription_name_map, - ResourceNameMap* resource_name_map) { - for (auto it = subscription_name_map->begin(); - it != subscription_name_map->end();) { - const std::string& resource_name = it->first; - SubscriptionState& subscription_state = it->second; - if (resources_in_current_request.find(resource_name) != - resources_in_current_request.end()) { - ++it; - continue; - } - gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p", - this, resource_type.c_str(), resource_name.c_str(), - &subscription_state); - auto resource_it = resource_name_map->find(resource_name); - GPR_ASSERT(resource_it != resource_name_map->end()); - auto& resource_state = resource_it->second; - resource_state.subscriptions.erase(&subscription_state); - if (resource_state.subscriptions.empty() && - !resource_state.resource.has_value()) { - resource_name_map->erase(resource_it); - } - it = subscription_name_map->erase(it); - } - } - - // Completing the building a DiscoveryResponse by adding common information - // for all resources and by adding all subscribed resources for LDS and CDS. - void CompleteBuildingDiscoveryResponse( - const std::string& resource_type, const int version, - const SubscriptionNameMap& subscription_name_map, - const std::set& resources_added_to_response, - DiscoveryResponse* response) { - resource_type_response_state_[resource_type] = SENT; - response->set_type_url(resource_type); - response->set_version_info(absl::StrCat(version)); - response->set_nonce(absl::StrCat(version)); - if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) { - // For LDS and CDS we must send back all subscribed resources - // (even the unchanged ones) - for (const auto& p : subscription_name_map) { - const std::string& resource_name = p.first; - if (resources_added_to_response.find(resource_name) == - resources_added_to_response.end()) { - const ResourceState& resource_state = - resource_map_[resource_type][resource_name]; - if (resource_state.resource.has_value()) { - response->add_resources()->CopyFrom( - resource_state.resource.value()); - } - } - } - } - } - Status StreamAggregatedResources(ServerContext* context, Stream* stream) override { gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this); @@ -932,6 +786,152 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, } private: + // A queue of resource type/name pairs that have changed since the client + // subscribed to them. + using UpdateQueue = std::deque< + std::pair>; + + // A struct representing a client's subscription to a particular resource. + struct SubscriptionState { + // Version that the client currently knows about. + int current_version = 0; + // The queue upon which to place updates when the resource is updated. + UpdateQueue* update_queue; + }; + + // A struct representing the a client's subscription to all the resources. + using SubscriptionNameMap = + std::map; + using SubscriptionMap = + std::map; + + // A struct representing the current state for a resource: + // - the version of the resource that is set by the SetResource() methods. + // - a list of subscriptions interested in this resource. + struct ResourceState { + int version = 0; + absl::optional resource; + std::set subscriptions; + }; + + // A struct representing the current state for all resources: + // LDS, CDS, EDS, and RDS for the class as a whole. + using ResourceNameMap = + std::map; + using ResourceMap = std::map; + + // Starting a thread to do blocking read on the stream until cancel. + void BlockingRead(Stream* stream, std::deque* requests, + bool* stream_closed) { + DiscoveryRequest request; + bool seen_first_request = false; + while (stream->Read(&request)) { + if (!seen_first_request) { + EXPECT_TRUE(request.has_node()); + ASSERT_FALSE(request.node().client_features().empty()); + EXPECT_EQ(request.node().client_features(0), + "envoy.lb.does_not_support_overprovisioning"); + seen_first_request = true; + } + { + grpc_core::MutexLock lock(&ads_mu_); + requests->emplace_back(std::move(request)); + } + } + gpr_log(GPR_INFO, "ADS[%p]: Null read, stream closed", this); + grpc_core::MutexLock lock(&ads_mu_); + *stream_closed = true; + } + + // Checks whether the client needs to receive a newer version of + // the resource. If so, updates subscription_state->current_version and + // returns true. + bool ClientNeedsResourceUpdate(const ResourceState& resource_state, + SubscriptionState* subscription_state) { + if (subscription_state->current_version < resource_state.version) { + subscription_state->current_version = resource_state.version; + return true; + } + return false; + } + + // Subscribes to a resource if not already subscribed: + // 1. Sets the update_queue field in subscription_state. + // 2. Adds subscription_state to resource_state->subscriptions. + void MaybeSubscribe(const std::string& resource_type, + const std::string& resource_name, + SubscriptionState* subscription_state, + ResourceState* resource_state, + UpdateQueue* update_queue) { + // The update_queue will be null if we were not previously subscribed. + if (subscription_state->update_queue != nullptr) return; + subscription_state->update_queue = update_queue; + resource_state->subscriptions.emplace(subscription_state); + gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p", + this, resource_type.c_str(), resource_name.c_str(), + &subscription_state); + } + + // Removes subscriptions for resources no longer present in the + // current request. + void ProcessUnsubscriptions( + const std::string& resource_type, + const std::set& resources_in_current_request, + SubscriptionNameMap* subscription_name_map, + ResourceNameMap* resource_name_map) { + for (auto it = subscription_name_map->begin(); + it != subscription_name_map->end();) { + const std::string& resource_name = it->first; + SubscriptionState& subscription_state = it->second; + if (resources_in_current_request.find(resource_name) != + resources_in_current_request.end()) { + ++it; + continue; + } + gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p", + this, resource_type.c_str(), resource_name.c_str(), + &subscription_state); + auto resource_it = resource_name_map->find(resource_name); + GPR_ASSERT(resource_it != resource_name_map->end()); + auto& resource_state = resource_it->second; + resource_state.subscriptions.erase(&subscription_state); + if (resource_state.subscriptions.empty() && + !resource_state.resource.has_value()) { + resource_name_map->erase(resource_it); + } + it = subscription_name_map->erase(it); + } + } + + // Completing the building a DiscoveryResponse by adding common information + // for all resources and by adding all subscribed resources for LDS and CDS. + void CompleteBuildingDiscoveryResponse( + const std::string& resource_type, const int version, + const SubscriptionNameMap& subscription_name_map, + const std::set& resources_added_to_response, + DiscoveryResponse* response) { + resource_type_response_state_[resource_type] = SENT; + response->set_type_url(resource_type); + response->set_version_info(absl::StrCat(version)); + response->set_nonce(absl::StrCat(version)); + if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) { + // For LDS and CDS we must send back all subscribed resources + // (even the unchanged ones) + for (const auto& p : subscription_name_map) { + const std::string& resource_name = p.first; + if (resources_added_to_response.find(resource_name) == + resources_added_to_response.end()) { + const ResourceState& resource_state = + resource_map_[resource_type][resource_name]; + if (resource_state.resource.has_value()) { + response->add_resources()->CopyFrom( + resource_state.resource.value()); + } + } + } + } + } + grpc_core::CondVar ads_cond_; // Protect the members below. grpc_core::Mutex ads_mu_; @@ -2424,17 +2424,10 @@ TEST_P(LocalityMapTest, LocalityContainingNoEndpoints) { SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; - const int kLocalityWeight0 = 2; - const int kLocalityWeight1 = 8; - const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1; - const double kLocalityWeightRate0 = - static_cast(kLocalityWeight0) / kTotalLocalityWeight; - const double kLocalityWeightRate1 = - static_cast(kLocalityWeight1) / kTotalLocalityWeight; - // ADS response contains 2 localities, each of which contains 1 backend. + // EDS response contains 2 localities, one with no endpoints. AdsServiceImpl::EdsResourceArgs args({ - {"locality0", GetBackendPorts(), kLocalityWeight0}, - {"locality1", {}, kLocalityWeight1}, + {"locality0", GetBackendPorts()}, + {"locality1", {}}, }); balancers_[0]->ads_service()->SetEdsResource( AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);