diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc
index 1fbdbad953e..e8430e3b01c 100644
--- a/test/cpp/end2end/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds_end2end_test.cc
@@ -400,7 +400,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
std::pair>;
// A struct representing a client's subscription to a particular resource.
- struct SubscriberState {
+ struct SubscriptionState {
// Version that the client currently knows about.
int current_version = 0;
// The queue upon which to place updates when the resource is updated.
@@ -408,23 +408,25 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
};
// A struct representing the a client's subscription to all the resources.
+ using SubscriptionNameMap =
+ std::map;
using SubscriptionMap =
- std::map>;
+ std::map;
// A struct representing the current state for a resource:
// - the version of the resource that is set by the SetResource() methods.
- // - a list of subscribers interested in this resource.
+ // - a list of subscriptions interested in this resource.
struct ResourceState {
int version = 0;
absl::optional resource;
- std::set subscribers;
+ std::set subscriptions;
};
// A struct representing the current state for all resources:
// LDS, CDS, EDS, and RDS for the class as a whole.
- using ResourcesMap =
- std::map>;
+ using ResourceNameMap =
+ std::map;
+ using ResourceMap = std::map;
AdsServiceImpl(bool enable_load_reporting) {
// Construct RDS response data.
@@ -475,101 +477,61 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
}
// Checks whether the client needs to receive a newer version of
- // the resource.
- bool ClientNeedsResourceUpdate(const string& resource_type,
- const string& name,
- SubscriptionMap* subscription_map) {
- auto subscriber_it = (*subscription_map)[resource_type].find(name);
- if (subscriber_it == (*subscription_map)[resource_type].end()) {
- gpr_log(GPR_INFO,
- "ADS[%p]: Skipping an unsubscribed update for resource %s and "
- "name %s",
- this, resource_type.c_str(), name.c_str());
- return false;
- }
- const auto& resource_state = resources_map_[resource_type][name];
- if (subscriber_it->second.current_version < resource_state.version) {
- subscriber_it->second.current_version = resource_state.version;
- gpr_log(GPR_INFO,
- "ADS[%p]: Need to process new %s update %s, bring current to %d",
- this, resource_type.c_str(), name.c_str(),
- subscriber_it->second.current_version);
+ // the resource. If so, updates subscription_state->current_version and
+ // returns true.
+ bool ClientNeedsResourceUpdate(const ResourceState& resource_state,
+ SubscriptionState* subscription_state) {
+ if (subscription_state->current_version < resource_state.version) {
+ subscription_state->current_version = resource_state.version;
return true;
- } else {
- gpr_log(GPR_INFO,
- "ADS[%p]: Skipping an old %s update %s, current is at %d", this,
- resource_type.c_str(), name.c_str(),
- subscriber_it->second.current_version);
- return false;
}
return false;
}
- // Resource subscription:
- // 1. inserting an entry into the subscription map indexed by resource
- // type/name pair.
- // 2. inserting or updating an entry into the resources map indexed
- // by resource type/name pair about this subscription.
- void ResourceSubscribe(const std::string& resource_type,
- const std::string& name, UpdateQueue* update_queue,
- SubscriptionMap* subscription_map) {
- SubscriberState& subscriber_state =
- (*subscription_map)[resource_type][name];
- subscriber_state.update_queue = update_queue;
- ResourceState& resource_state = resources_map_[resource_type][name];
- resource_state.subscribers.emplace(&subscriber_state);
- gpr_log(
- GPR_INFO,
- "ADS[%p]: subscribe to resource type %s name %s version %d state %p",
- this, resource_type.c_str(), name.c_str(), resource_state.version,
- &subscriber_state);
- }
-
- // Resource unsubscription:
- // 1. update the entry in the resources map indexed
- // by resource type/name pair to remove this subscription
- // 2. remove this entry from the subscription map.
- // 3. remove this resource type from the subscription map if there are no more
- // resources subscribed for the resource type.
- void ResourceUnsubscribe(const std::string& resource_type,
- const std::string& name,
- SubscriptionMap* subscription_map) {
- auto subscription_by_type_it = subscription_map->find(resource_type);
- if (subscription_by_type_it == subscription_map->end()) {
- gpr_log(GPR_INFO, "ADS[%p]: resource type %s not subscribed", this,
- resource_type.c_str());
- return;
- }
- auto& subscription_by_type_map = subscription_by_type_it->second;
- auto subscription_it = subscription_by_type_map.find(name);
- if (subscription_it == subscription_by_type_map.end()) {
- gpr_log(GPR_INFO, "ADS[%p]: resource name %s of type %s not subscribed",
- this, name.c_str(), resource_type.c_str());
- return;
- }
- gpr_log(GPR_INFO,
- "ADS[%p]: Unsubscribe to resource type %s name %s state %p", this,
- resource_type.c_str(), name.c_str(), &subscription_it->second);
- auto resource_by_type_it = resources_map_.find(resource_type);
- GPR_ASSERT(resource_by_type_it != resources_map_.end());
- auto& resource_by_type_map = resource_by_type_it->second;
- auto resource_it = resource_by_type_map.find(name);
- GPR_ASSERT(resource_it != resource_by_type_map.end());
- resource_it->second.subscribers.erase(&subscription_it->second);
- if (resource_it->second.subscribers.empty() &&
- !resource_it->second.resource.has_value()) {
- gpr_log(GPR_INFO,
- "ADS[%p]: Erasing resource type %s name %s from resource map "
- "since there are no more subscribers for this unset resource",
- this, resource_type.c_str(), name.c_str());
- resource_by_type_map.erase(resource_it);
- }
- subscription_by_type_map.erase(subscription_it);
- if (subscription_by_type_map.empty()) {
- gpr_log(GPR_INFO,
- "ADS[%p]: Erasing resource type %s from subscription_map", this,
- resource_type.c_str());
- subscription_map->erase(subscription_by_type_it);
+ // Subscribes to a resource if not already subscribed:
+ // 1. Sets the update_queue field in subscription_state.
+ // 2. Adds subscription_state to resource_state->subscriptions.
+ void MaybeSubscribe(const std::string& resource_type,
+ const std::string& resource_name,
+ SubscriptionState* subscription_state,
+ ResourceState* resource_state,
+ UpdateQueue* update_queue) {
+ if (subscription_state->update_queue != nullptr) return;
+ subscription_state->update_queue = update_queue;
+ resource_state->subscriptions.emplace(subscription_state);
+ gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p",
+ this, resource_type.c_str(), resource_name.c_str(),
+ &subscription_state);
+ }
+
+ // Removes subscriptions for resources no longer present in the
+ // current request.
+ void ProcessUnsubscriptions(
+ const std::string& resource_type,
+ const std::set& resources_in_current_request,
+ SubscriptionNameMap* subscription_name_map,
+ ResourceNameMap* resource_name_map) {
+ for (auto it = subscription_name_map->begin();
+ it != subscription_name_map->end();) {
+ const std::string& resource_name = it->first;
+ SubscriptionState& subscription_state = it->second;
+ if (resources_in_current_request.find(resource_name) !=
+ resources_in_current_request.end()) {
+ ++it;
+ continue;
+ }
+ gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p",
+ this, resource_type.c_str(), resource_name.c_str(),
+ &subscription_state);
+ auto resource_it = resource_name_map->find(resource_name);
+ GPR_ASSERT(resource_it != resource_name_map->end());
+ auto& resource_state = resource_it->second;
+ resource_state.subscriptions.erase(&subscription_state);
+ if (resource_state.subscriptions.empty() &&
+ !resource_state.resource.has_value()) {
+ resource_name_map->erase(resource_it);
+ }
+ it = subscription_name_map->erase(it);
}
}
@@ -577,7 +539,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// for all resources and by adding all subscribed resources for LDS and CDS.
void CompleteBuildingDiscoveryResponse(
const std::string& resource_type, const int version,
- const SubscriptionMap& subscription_map,
+ const SubscriptionNameMap& subscription_name_map,
const std::set& resources_added_to_response,
DiscoveryResponse* response) {
resource_type_response_state_[resource_type] = SENT;
@@ -587,18 +549,15 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) {
// For LDS and CDS we must send back all subscribed resources
// (even the unchanged ones)
- auto subscription_map_by_type_it = subscription_map.find(resource_type);
- GPR_ASSERT(subscription_map_by_type_it != subscription_map.end());
- for (const auto& subscription : subscription_map_by_type_it->second) {
- if (resources_added_to_response.find(subscription.first) ==
+ for (const auto& p : subscription_name_map) {
+ const std::string& resource_name = p.first;
+ if (resources_added_to_response.find(resource_name) ==
resources_added_to_response.end()) {
- absl::optional& resource =
- resources_map_[resource_type][subscription.first].resource;
- if (resource.has_value()) {
- response->add_resources()->CopyFrom(resource.value());
- } else {
- gpr_log(GPR_INFO, "ADS[%p]: Unknown resource type %s and name %s",
- this, resource_type.c_str(), subscription.first.c_str());
+ const ResourceState& resource_state =
+ resource_map_[resource_type][resource_name];
+ if (resource_state.resource.has_value()) {
+ response->add_resources()->CopyFrom(
+ resource_state.resource.value());
}
}
}
@@ -622,7 +581,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// Resources that the client will be subscribed to keyed by resource type
// url.
SubscriptionMap subscription_map;
- std::map subscriber_map;
// Current Version map keyed by resource type url.
std::map resource_type_version;
// Creating blocking thread to read from stream.
@@ -647,7 +605,8 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
DiscoveryRequest request = std::move(requests.front());
requests.pop_front();
did_work = true;
- gpr_log(GPR_INFO, "ADS[%p]: Handling request %s with content %s",
+ gpr_log(GPR_INFO,
+ "ADS[%p]: Received request for type %s with content %s",
this, request.type_url().c_str(),
request.DebugString().c_str());
// Identify ACK and NACK by looking for version information and
@@ -667,58 +626,51 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// 3. unsubscribe if necessary
if (resource_types_to_ignore_.find(request.type_url()) ==
resource_types_to_ignore_.end()) {
+ auto& subscription_name_map =
+ subscription_map[request.type_url()];
+ auto& resource_name_map = resource_map_[request.type_url()];
std::set resources_in_current_request;
std::set resources_added_to_response;
for (const std::string& resource_name :
request.resource_names()) {
resources_in_current_request.emplace(resource_name);
- auto subscriber_it =
- subscription_map[request.type_url()].find(resource_name);
- if (subscriber_it ==
- subscription_map[request.type_url()].end()) {
- ResourceSubscribe(request.type_url(), resource_name,
- &update_queue, &subscription_map);
- }
- if (ClientNeedsResourceUpdate(request.type_url(), resource_name,
- &subscription_map)) {
+ auto& subscription_state = subscription_name_map[resource_name];
+ auto& resource_state = resource_name_map[resource_name];
+ MaybeSubscribe(request.type_url(), resource_name,
+ &subscription_state, &resource_state,
+ &update_queue);
+ if (ClientNeedsResourceUpdate(resource_state,
+ &subscription_state)) {
+ gpr_log(
+ GPR_INFO,
+ "ADS[%p]: Sending update for type=%s name=%s version=%d",
+ this, request.type_url().c_str(), resource_name.c_str(),
+ resource_state.version);
resources_added_to_response.emplace(resource_name);
- gpr_log(GPR_INFO,
- "ADS[%p]: Handling resource type %s and name %s",
- this, request.type_url().c_str(),
- resource_name.c_str());
- auto resource =
- resources_map_[request.type_url()][resource_name];
- GPR_ASSERT(resource.resource.has_value());
- response.add_resources()->CopyFrom(resource.resource.value());
- }
- }
- // Remove subscriptions no longer requested: build a list of
- // unsubscriber names first while iterating the subscription_map
- // and then erase from the subscription_map in
- // ResourceUnsubscribe.
- std::set unsubscriber_list;
- for (const auto& subscription :
- subscription_map[request.type_url()]) {
- if (resources_in_current_request.find(subscription.first) ==
- resources_in_current_request.end()) {
- unsubscriber_list.emplace(subscription.first);
+ if (resource_state.resource.has_value()) {
+ response.add_resources()->CopyFrom(
+ resource_state.resource.value());
+ }
}
}
- for (const auto& name : unsubscriber_list) {
- ResourceUnsubscribe(request.type_url(), name,
- &subscription_map);
- }
- if (!response.resources().empty()) {
+ // Process unsubscriptions for any resource no longer
+ // present in the request's resource list.
+ ProcessUnsubscriptions(
+ request.type_url(), resources_in_current_request,
+ &subscription_name_map, &resource_name_map);
+ // Send response if needed.
+ if (!resources_added_to_response.empty()) {
CompleteBuildingDiscoveryResponse(
request.type_url(),
++resource_type_version[request.type_url()],
- subscription_map, resources_added_to_response, &response);
+ subscription_name_map, resources_added_to_response,
+ &response);
}
}
}
}
if (!response.resources().empty()) {
- gpr_log(GPR_INFO, "ADS[%p]: sending request response '%s'", this,
+ gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this,
response.DebugString().c_str());
stream->Write(response);
}
@@ -727,32 +679,40 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
{
grpc_core::MutexLock lock(&ads_mu_);
if (!update_queue.empty()) {
- std::pair update =
- std::move(update_queue.front());
+ const std::string resource_type =
+ std::move(update_queue.front().first);
+ const std::string resource_name =
+ std::move(update_queue.front().second);
update_queue.pop_front();
did_work = true;
- gpr_log(GPR_INFO, "ADS[%p]: Handling update type %s name %s", this,
- update.first.c_str(), update.second.c_str());
- auto subscriber_it =
- subscription_map[update.first].find(update.second);
- if (subscriber_it != subscription_map[update.first].end()) {
- if (ClientNeedsResourceUpdate(update.first, update.second,
- &subscription_map)) {
- gpr_log(GPR_INFO,
- "ADS[%p]: Updating resource type %s and name %s", this,
- update.first.c_str(), update.second.c_str());
- auto resource = resources_map_[update.first][update.second];
- GPR_ASSERT(resource.resource.has_value());
- response.add_resources()->CopyFrom(resource.resource.value());
- CompleteBuildingDiscoveryResponse(
- update.first, ++resource_type_version[update.first],
- subscription_map, {update.second}, &response);
+ gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s",
+ this, resource_type.c_str(), resource_name.c_str());
+ auto& subscription_name_map = subscription_map[resource_type];
+ auto& resource_name_map = resource_map_[resource_type];
+ auto it = subscription_name_map.find(resource_name);
+ if (it != subscription_name_map.end()) {
+ SubscriptionState& subscription_state = it->second;
+ ResourceState& resource_state = resource_name_map[resource_name];
+ if (ClientNeedsResourceUpdate(resource_state,
+ &subscription_state)) {
+ gpr_log(
+ GPR_INFO,
+ "ADS[%p]: Sending update for type=%s name=%s version=%d",
+ this, resource_type.c_str(), resource_name.c_str(),
+ resource_state.version);
+ if (resource_state.resource.has_value()) {
+ response.add_resources()->CopyFrom(
+ resource_state.resource.value());
+ CompleteBuildingDiscoveryResponse(
+ resource_type, ++resource_type_version[resource_type],
+ subscription_name_map, {resource_name}, &response);
+ }
}
}
}
}
if (!response.resources().empty()) {
- gpr_log(GPR_INFO, "ADS[%p]: sending update response '%s'", this,
+ gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this,
response.DebugString().c_str());
stream->Write(response);
}
@@ -808,13 +768,13 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
void SetResource(google::protobuf::Any resource, const std::string& type_url,
const std::string& name) {
grpc_core::MutexLock lock(&ads_mu_);
- ResourceState& state = resources_map_[type_url][name];
+ ResourceState& state = resource_map_[type_url][name];
++state.version;
state.resource = std::move(resource);
gpr_log(GPR_INFO, "ADS[%p]: Updating %s resource %s to version %u", this,
type_url.c_str(), name.c_str(), state.version);
- for (SubscriberState* subscriber : state.subscribers) {
- subscriber->update_queue->emplace_back(type_url, name);
+ for (SubscriptionState* subscription : state.subscriptions) {
+ subscription->update_queue->emplace_back(type_url, name);
}
}
@@ -873,15 +833,17 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
{
grpc_core::MutexLock lock(&ads_mu_);
NotifyDoneWithAdsCallLocked();
- resources_map_.clear();
+ resource_map_.clear();
resource_type_response_state_.clear();
}
gpr_log(GPR_INFO, "ADS[%p]: shut down", this);
}
- static ClusterLoadAssignment BuildEdsResource(const EdsResourceArgs& args) {
+ static ClusterLoadAssignment BuildEdsResource(
+ const EdsResourceArgs& args,
+ const char* cluster_name = kDefaultResourceName) {
ClusterLoadAssignment assignment;
- assignment.set_cluster_name(kDefaultResourceName);
+ assignment.set_cluster_name(cluster_name);
for (const auto& locality : args.locality_list) {
auto* endpoints = assignment.add_endpoints();
endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight);
@@ -946,8 +908,8 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// Note that an entry will exist whenever either of the following is true:
// - The resource exists (i.e., has been created by SetResource() and has not
// yet been destroyed by UnsetResource()).
- // - There is at least one subscriber for the resource.
- ResourcesMap resources_map_;
+ // - There is at least one subscription for the resource.
+ ResourceMap resource_map_;
};
class LrsServiceImpl : public LrsService,