diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 1fbdbad953e..e8430e3b01c 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -400,7 +400,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, std::pair>; // A struct representing a client's subscription to a particular resource. - struct SubscriberState { + struct SubscriptionState { // Version that the client currently knows about. int current_version = 0; // The queue upon which to place updates when the resource is updated. @@ -408,23 +408,25 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, }; // A struct representing the a client's subscription to all the resources. + using SubscriptionNameMap = + std::map; using SubscriptionMap = - std::map>; + std::map; // A struct representing the current state for a resource: // - the version of the resource that is set by the SetResource() methods. - // - a list of subscribers interested in this resource. + // - a list of subscriptions interested in this resource. struct ResourceState { int version = 0; absl::optional resource; - std::set subscribers; + std::set subscriptions; }; // A struct representing the current state for all resources: // LDS, CDS, EDS, and RDS for the class as a whole. - using ResourcesMap = - std::map>; + using ResourceNameMap = + std::map; + using ResourceMap = std::map; AdsServiceImpl(bool enable_load_reporting) { // Construct RDS response data. @@ -475,101 +477,61 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, } // Checks whether the client needs to receive a newer version of - // the resource. - bool ClientNeedsResourceUpdate(const string& resource_type, - const string& name, - SubscriptionMap* subscription_map) { - auto subscriber_it = (*subscription_map)[resource_type].find(name); - if (subscriber_it == (*subscription_map)[resource_type].end()) { - gpr_log(GPR_INFO, - "ADS[%p]: Skipping an unsubscribed update for resource %s and " - "name %s", - this, resource_type.c_str(), name.c_str()); - return false; - } - const auto& resource_state = resources_map_[resource_type][name]; - if (subscriber_it->second.current_version < resource_state.version) { - subscriber_it->second.current_version = resource_state.version; - gpr_log(GPR_INFO, - "ADS[%p]: Need to process new %s update %s, bring current to %d", - this, resource_type.c_str(), name.c_str(), - subscriber_it->second.current_version); + // the resource. If so, updates subscription_state->current_version and + // returns true. + bool ClientNeedsResourceUpdate(const ResourceState& resource_state, + SubscriptionState* subscription_state) { + if (subscription_state->current_version < resource_state.version) { + subscription_state->current_version = resource_state.version; return true; - } else { - gpr_log(GPR_INFO, - "ADS[%p]: Skipping an old %s update %s, current is at %d", this, - resource_type.c_str(), name.c_str(), - subscriber_it->second.current_version); - return false; } return false; } - // Resource subscription: - // 1. inserting an entry into the subscription map indexed by resource - // type/name pair. - // 2. inserting or updating an entry into the resources map indexed - // by resource type/name pair about this subscription. - void ResourceSubscribe(const std::string& resource_type, - const std::string& name, UpdateQueue* update_queue, - SubscriptionMap* subscription_map) { - SubscriberState& subscriber_state = - (*subscription_map)[resource_type][name]; - subscriber_state.update_queue = update_queue; - ResourceState& resource_state = resources_map_[resource_type][name]; - resource_state.subscribers.emplace(&subscriber_state); - gpr_log( - GPR_INFO, - "ADS[%p]: subscribe to resource type %s name %s version %d state %p", - this, resource_type.c_str(), name.c_str(), resource_state.version, - &subscriber_state); - } - - // Resource unsubscription: - // 1. update the entry in the resources map indexed - // by resource type/name pair to remove this subscription - // 2. remove this entry from the subscription map. - // 3. remove this resource type from the subscription map if there are no more - // resources subscribed for the resource type. - void ResourceUnsubscribe(const std::string& resource_type, - const std::string& name, - SubscriptionMap* subscription_map) { - auto subscription_by_type_it = subscription_map->find(resource_type); - if (subscription_by_type_it == subscription_map->end()) { - gpr_log(GPR_INFO, "ADS[%p]: resource type %s not subscribed", this, - resource_type.c_str()); - return; - } - auto& subscription_by_type_map = subscription_by_type_it->second; - auto subscription_it = subscription_by_type_map.find(name); - if (subscription_it == subscription_by_type_map.end()) { - gpr_log(GPR_INFO, "ADS[%p]: resource name %s of type %s not subscribed", - this, name.c_str(), resource_type.c_str()); - return; - } - gpr_log(GPR_INFO, - "ADS[%p]: Unsubscribe to resource type %s name %s state %p", this, - resource_type.c_str(), name.c_str(), &subscription_it->second); - auto resource_by_type_it = resources_map_.find(resource_type); - GPR_ASSERT(resource_by_type_it != resources_map_.end()); - auto& resource_by_type_map = resource_by_type_it->second; - auto resource_it = resource_by_type_map.find(name); - GPR_ASSERT(resource_it != resource_by_type_map.end()); - resource_it->second.subscribers.erase(&subscription_it->second); - if (resource_it->second.subscribers.empty() && - !resource_it->second.resource.has_value()) { - gpr_log(GPR_INFO, - "ADS[%p]: Erasing resource type %s name %s from resource map " - "since there are no more subscribers for this unset resource", - this, resource_type.c_str(), name.c_str()); - resource_by_type_map.erase(resource_it); - } - subscription_by_type_map.erase(subscription_it); - if (subscription_by_type_map.empty()) { - gpr_log(GPR_INFO, - "ADS[%p]: Erasing resource type %s from subscription_map", this, - resource_type.c_str()); - subscription_map->erase(subscription_by_type_it); + // Subscribes to a resource if not already subscribed: + // 1. Sets the update_queue field in subscription_state. + // 2. Adds subscription_state to resource_state->subscriptions. + void MaybeSubscribe(const std::string& resource_type, + const std::string& resource_name, + SubscriptionState* subscription_state, + ResourceState* resource_state, + UpdateQueue* update_queue) { + if (subscription_state->update_queue != nullptr) return; + subscription_state->update_queue = update_queue; + resource_state->subscriptions.emplace(subscription_state); + gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p", + this, resource_type.c_str(), resource_name.c_str(), + &subscription_state); + } + + // Removes subscriptions for resources no longer present in the + // current request. + void ProcessUnsubscriptions( + const std::string& resource_type, + const std::set& resources_in_current_request, + SubscriptionNameMap* subscription_name_map, + ResourceNameMap* resource_name_map) { + for (auto it = subscription_name_map->begin(); + it != subscription_name_map->end();) { + const std::string& resource_name = it->first; + SubscriptionState& subscription_state = it->second; + if (resources_in_current_request.find(resource_name) != + resources_in_current_request.end()) { + ++it; + continue; + } + gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p", + this, resource_type.c_str(), resource_name.c_str(), + &subscription_state); + auto resource_it = resource_name_map->find(resource_name); + GPR_ASSERT(resource_it != resource_name_map->end()); + auto& resource_state = resource_it->second; + resource_state.subscriptions.erase(&subscription_state); + if (resource_state.subscriptions.empty() && + !resource_state.resource.has_value()) { + resource_name_map->erase(resource_it); + } + it = subscription_name_map->erase(it); } } @@ -577,7 +539,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // for all resources and by adding all subscribed resources for LDS and CDS. void CompleteBuildingDiscoveryResponse( const std::string& resource_type, const int version, - const SubscriptionMap& subscription_map, + const SubscriptionNameMap& subscription_name_map, const std::set& resources_added_to_response, DiscoveryResponse* response) { resource_type_response_state_[resource_type] = SENT; @@ -587,18 +549,15 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) { // For LDS and CDS we must send back all subscribed resources // (even the unchanged ones) - auto subscription_map_by_type_it = subscription_map.find(resource_type); - GPR_ASSERT(subscription_map_by_type_it != subscription_map.end()); - for (const auto& subscription : subscription_map_by_type_it->second) { - if (resources_added_to_response.find(subscription.first) == + for (const auto& p : subscription_name_map) { + const std::string& resource_name = p.first; + if (resources_added_to_response.find(resource_name) == resources_added_to_response.end()) { - absl::optional& resource = - resources_map_[resource_type][subscription.first].resource; - if (resource.has_value()) { - response->add_resources()->CopyFrom(resource.value()); - } else { - gpr_log(GPR_INFO, "ADS[%p]: Unknown resource type %s and name %s", - this, resource_type.c_str(), subscription.first.c_str()); + const ResourceState& resource_state = + resource_map_[resource_type][resource_name]; + if (resource_state.resource.has_value()) { + response->add_resources()->CopyFrom( + resource_state.resource.value()); } } } @@ -622,7 +581,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // Resources that the client will be subscribed to keyed by resource type // url. SubscriptionMap subscription_map; - std::map subscriber_map; // Current Version map keyed by resource type url. std::map resource_type_version; // Creating blocking thread to read from stream. @@ -647,7 +605,8 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, DiscoveryRequest request = std::move(requests.front()); requests.pop_front(); did_work = true; - gpr_log(GPR_INFO, "ADS[%p]: Handling request %s with content %s", + gpr_log(GPR_INFO, + "ADS[%p]: Received request for type %s with content %s", this, request.type_url().c_str(), request.DebugString().c_str()); // Identify ACK and NACK by looking for version information and @@ -667,58 +626,51 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // 3. unsubscribe if necessary if (resource_types_to_ignore_.find(request.type_url()) == resource_types_to_ignore_.end()) { + auto& subscription_name_map = + subscription_map[request.type_url()]; + auto& resource_name_map = resource_map_[request.type_url()]; std::set resources_in_current_request; std::set resources_added_to_response; for (const std::string& resource_name : request.resource_names()) { resources_in_current_request.emplace(resource_name); - auto subscriber_it = - subscription_map[request.type_url()].find(resource_name); - if (subscriber_it == - subscription_map[request.type_url()].end()) { - ResourceSubscribe(request.type_url(), resource_name, - &update_queue, &subscription_map); - } - if (ClientNeedsResourceUpdate(request.type_url(), resource_name, - &subscription_map)) { + auto& subscription_state = subscription_name_map[resource_name]; + auto& resource_state = resource_name_map[resource_name]; + MaybeSubscribe(request.type_url(), resource_name, + &subscription_state, &resource_state, + &update_queue); + if (ClientNeedsResourceUpdate(resource_state, + &subscription_state)) { + gpr_log( + GPR_INFO, + "ADS[%p]: Sending update for type=%s name=%s version=%d", + this, request.type_url().c_str(), resource_name.c_str(), + resource_state.version); resources_added_to_response.emplace(resource_name); - gpr_log(GPR_INFO, - "ADS[%p]: Handling resource type %s and name %s", - this, request.type_url().c_str(), - resource_name.c_str()); - auto resource = - resources_map_[request.type_url()][resource_name]; - GPR_ASSERT(resource.resource.has_value()); - response.add_resources()->CopyFrom(resource.resource.value()); - } - } - // Remove subscriptions no longer requested: build a list of - // unsubscriber names first while iterating the subscription_map - // and then erase from the subscription_map in - // ResourceUnsubscribe. - std::set unsubscriber_list; - for (const auto& subscription : - subscription_map[request.type_url()]) { - if (resources_in_current_request.find(subscription.first) == - resources_in_current_request.end()) { - unsubscriber_list.emplace(subscription.first); + if (resource_state.resource.has_value()) { + response.add_resources()->CopyFrom( + resource_state.resource.value()); + } } } - for (const auto& name : unsubscriber_list) { - ResourceUnsubscribe(request.type_url(), name, - &subscription_map); - } - if (!response.resources().empty()) { + // Process unsubscriptions for any resource no longer + // present in the request's resource list. + ProcessUnsubscriptions( + request.type_url(), resources_in_current_request, + &subscription_name_map, &resource_name_map); + // Send response if needed. + if (!resources_added_to_response.empty()) { CompleteBuildingDiscoveryResponse( request.type_url(), ++resource_type_version[request.type_url()], - subscription_map, resources_added_to_response, &response); + subscription_name_map, resources_added_to_response, + &response); } } } } if (!response.resources().empty()) { - gpr_log(GPR_INFO, "ADS[%p]: sending request response '%s'", this, + gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this, response.DebugString().c_str()); stream->Write(response); } @@ -727,32 +679,40 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, { grpc_core::MutexLock lock(&ads_mu_); if (!update_queue.empty()) { - std::pair update = - std::move(update_queue.front()); + const std::string resource_type = + std::move(update_queue.front().first); + const std::string resource_name = + std::move(update_queue.front().second); update_queue.pop_front(); did_work = true; - gpr_log(GPR_INFO, "ADS[%p]: Handling update type %s name %s", this, - update.first.c_str(), update.second.c_str()); - auto subscriber_it = - subscription_map[update.first].find(update.second); - if (subscriber_it != subscription_map[update.first].end()) { - if (ClientNeedsResourceUpdate(update.first, update.second, - &subscription_map)) { - gpr_log(GPR_INFO, - "ADS[%p]: Updating resource type %s and name %s", this, - update.first.c_str(), update.second.c_str()); - auto resource = resources_map_[update.first][update.second]; - GPR_ASSERT(resource.resource.has_value()); - response.add_resources()->CopyFrom(resource.resource.value()); - CompleteBuildingDiscoveryResponse( - update.first, ++resource_type_version[update.first], - subscription_map, {update.second}, &response); + gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s", + this, resource_type.c_str(), resource_name.c_str()); + auto& subscription_name_map = subscription_map[resource_type]; + auto& resource_name_map = resource_map_[resource_type]; + auto it = subscription_name_map.find(resource_name); + if (it != subscription_name_map.end()) { + SubscriptionState& subscription_state = it->second; + ResourceState& resource_state = resource_name_map[resource_name]; + if (ClientNeedsResourceUpdate(resource_state, + &subscription_state)) { + gpr_log( + GPR_INFO, + "ADS[%p]: Sending update for type=%s name=%s version=%d", + this, resource_type.c_str(), resource_name.c_str(), + resource_state.version); + if (resource_state.resource.has_value()) { + response.add_resources()->CopyFrom( + resource_state.resource.value()); + CompleteBuildingDiscoveryResponse( + resource_type, ++resource_type_version[resource_type], + subscription_name_map, {resource_name}, &response); + } } } } } if (!response.resources().empty()) { - gpr_log(GPR_INFO, "ADS[%p]: sending update response '%s'", this, + gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this, response.DebugString().c_str()); stream->Write(response); } @@ -808,13 +768,13 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, void SetResource(google::protobuf::Any resource, const std::string& type_url, const std::string& name) { grpc_core::MutexLock lock(&ads_mu_); - ResourceState& state = resources_map_[type_url][name]; + ResourceState& state = resource_map_[type_url][name]; ++state.version; state.resource = std::move(resource); gpr_log(GPR_INFO, "ADS[%p]: Updating %s resource %s to version %u", this, type_url.c_str(), name.c_str(), state.version); - for (SubscriberState* subscriber : state.subscribers) { - subscriber->update_queue->emplace_back(type_url, name); + for (SubscriptionState* subscription : state.subscriptions) { + subscription->update_queue->emplace_back(type_url, name); } } @@ -873,15 +833,17 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, { grpc_core::MutexLock lock(&ads_mu_); NotifyDoneWithAdsCallLocked(); - resources_map_.clear(); + resource_map_.clear(); resource_type_response_state_.clear(); } gpr_log(GPR_INFO, "ADS[%p]: shut down", this); } - static ClusterLoadAssignment BuildEdsResource(const EdsResourceArgs& args) { + static ClusterLoadAssignment BuildEdsResource( + const EdsResourceArgs& args, + const char* cluster_name = kDefaultResourceName) { ClusterLoadAssignment assignment; - assignment.set_cluster_name(kDefaultResourceName); + assignment.set_cluster_name(cluster_name); for (const auto& locality : args.locality_list) { auto* endpoints = assignment.add_endpoints(); endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight); @@ -946,8 +908,8 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // Note that an entry will exist whenever either of the following is true: // - The resource exists (i.e., has been created by SetResource() and has not // yet been destroyed by UnsetResource()). - // - There is at least one subscriber for the resource. - ResourcesMap resources_map_; + // - There is at least one subscription for the resource. + ResourceMap resource_map_; }; class LrsServiceImpl : public LrsService,