Merge pull request #22289 from markdroth/xds_test_cleanup

Clean up xds_end2end_test code
pull/22309/head
Mark D. Roth 5 years ago committed by GitHub
commit 35ee039c29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 302
      test/cpp/end2end/xds_end2end_test.cc

@ -400,7 +400,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
std::pair<std::string /* type url */, std::string /* resource name */>>; std::pair<std::string /* type url */, std::string /* resource name */>>;
// A struct representing a client's subscription to a particular resource. // A struct representing a client's subscription to a particular resource.
struct SubscriberState { struct SubscriptionState {
// Version that the client currently knows about. // Version that the client currently knows about.
int current_version = 0; int current_version = 0;
// The queue upon which to place updates when the resource is updated. // The queue upon which to place updates when the resource is updated.
@ -408,23 +408,25 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
}; };
// A struct representing the a client's subscription to all the resources. // A struct representing the a client's subscription to all the resources.
using SubscriptionNameMap =
std::map<std::string /* resource_name */, SubscriptionState>;
using SubscriptionMap = using SubscriptionMap =
std::map<std::string /* type_url */, std::map<std::string /* type_url */, SubscriptionNameMap>;
std::map<std::string /* resource_name */, SubscriberState>>;
// A struct representing the current state for a resource: // A struct representing the current state for a resource:
// - the version of the resource that is set by the SetResource() methods. // - the version of the resource that is set by the SetResource() methods.
// - a list of subscribers interested in this resource. // - a list of subscriptions interested in this resource.
struct ResourceState { struct ResourceState {
int version = 0; int version = 0;
absl::optional<google::protobuf::Any> resource; absl::optional<google::protobuf::Any> resource;
std::set<SubscriberState*> subscribers; std::set<SubscriptionState*> subscriptions;
}; };
// A struct representing the current state for all resources: // A struct representing the current state for all resources:
// LDS, CDS, EDS, and RDS for the class as a whole. // LDS, CDS, EDS, and RDS for the class as a whole.
using ResourcesMap = using ResourceNameMap =
std::map<std::string, std::map<std::string, ResourceState>>; std::map<std::string /* resource_name */, ResourceState>;
using ResourceMap = std::map<std::string /* type_url */, ResourceNameMap>;
AdsServiceImpl(bool enable_load_reporting) { AdsServiceImpl(bool enable_load_reporting) {
// Construct RDS response data. // Construct RDS response data.
@ -475,101 +477,61 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
} }
// Checks whether the client needs to receive a newer version of // Checks whether the client needs to receive a newer version of
// the resource. // the resource. If so, updates subscription_state->current_version and
bool ClientNeedsResourceUpdate(const string& resource_type, // returns true.
const string& name, bool ClientNeedsResourceUpdate(const ResourceState& resource_state,
SubscriptionMap* subscription_map) { SubscriptionState* subscription_state) {
auto subscriber_it = (*subscription_map)[resource_type].find(name); if (subscription_state->current_version < resource_state.version) {
if (subscriber_it == (*subscription_map)[resource_type].end()) { subscription_state->current_version = resource_state.version;
gpr_log(GPR_INFO,
"ADS[%p]: Skipping an unsubscribed update for resource %s and "
"name %s",
this, resource_type.c_str(), name.c_str());
return false;
}
const auto& resource_state = resources_map_[resource_type][name];
if (subscriber_it->second.current_version < resource_state.version) {
subscriber_it->second.current_version = resource_state.version;
gpr_log(GPR_INFO,
"ADS[%p]: Need to process new %s update %s, bring current to %d",
this, resource_type.c_str(), name.c_str(),
subscriber_it->second.current_version);
return true; return true;
} else {
gpr_log(GPR_INFO,
"ADS[%p]: Skipping an old %s update %s, current is at %d", this,
resource_type.c_str(), name.c_str(),
subscriber_it->second.current_version);
return false;
} }
return false; return false;
} }
// Resource subscription: // Subscribes to a resource if not already subscribed:
// 1. inserting an entry into the subscription map indexed by resource // 1. Sets the update_queue field in subscription_state.
// type/name pair. // 2. Adds subscription_state to resource_state->subscriptions.
// 2. inserting or updating an entry into the resources map indexed void MaybeSubscribe(const std::string& resource_type,
// by resource type/name pair about this subscription. const std::string& resource_name,
void ResourceSubscribe(const std::string& resource_type, SubscriptionState* subscription_state,
const std::string& name, UpdateQueue* update_queue, ResourceState* resource_state,
SubscriptionMap* subscription_map) { UpdateQueue* update_queue) {
SubscriberState& subscriber_state = if (subscription_state->update_queue != nullptr) return;
(*subscription_map)[resource_type][name]; subscription_state->update_queue = update_queue;
subscriber_state.update_queue = update_queue; resource_state->subscriptions.emplace(subscription_state);
ResourceState& resource_state = resources_map_[resource_type][name]; gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p",
resource_state.subscribers.emplace(&subscriber_state); this, resource_type.c_str(), resource_name.c_str(),
gpr_log( &subscription_state);
GPR_INFO, }
"ADS[%p]: subscribe to resource type %s name %s version %d state %p",
this, resource_type.c_str(), name.c_str(), resource_state.version, // Removes subscriptions for resources no longer present in the
&subscriber_state); // current request.
} void ProcessUnsubscriptions(
const std::string& resource_type,
// Resource unsubscription: const std::set<std::string>& resources_in_current_request,
// 1. update the entry in the resources map indexed SubscriptionNameMap* subscription_name_map,
// by resource type/name pair to remove this subscription ResourceNameMap* resource_name_map) {
// 2. remove this entry from the subscription map. for (auto it = subscription_name_map->begin();
// 3. remove this resource type from the subscription map if there are no more it != subscription_name_map->end();) {
// resources subscribed for the resource type. const std::string& resource_name = it->first;
void ResourceUnsubscribe(const std::string& resource_type, SubscriptionState& subscription_state = it->second;
const std::string& name, if (resources_in_current_request.find(resource_name) !=
SubscriptionMap* subscription_map) { resources_in_current_request.end()) {
auto subscription_by_type_it = subscription_map->find(resource_type); ++it;
if (subscription_by_type_it == subscription_map->end()) { continue;
gpr_log(GPR_INFO, "ADS[%p]: resource type %s not subscribed", this,
resource_type.c_str());
return;
}
auto& subscription_by_type_map = subscription_by_type_it->second;
auto subscription_it = subscription_by_type_map.find(name);
if (subscription_it == subscription_by_type_map.end()) {
gpr_log(GPR_INFO, "ADS[%p]: resource name %s of type %s not subscribed",
this, name.c_str(), resource_type.c_str());
return;
} }
gpr_log(GPR_INFO, gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p",
"ADS[%p]: Unsubscribe to resource type %s name %s state %p", this, this, resource_type.c_str(), resource_name.c_str(),
resource_type.c_str(), name.c_str(), &subscription_it->second); &subscription_state);
auto resource_by_type_it = resources_map_.find(resource_type); auto resource_it = resource_name_map->find(resource_name);
GPR_ASSERT(resource_by_type_it != resources_map_.end()); GPR_ASSERT(resource_it != resource_name_map->end());
auto& resource_by_type_map = resource_by_type_it->second; auto& resource_state = resource_it->second;
auto resource_it = resource_by_type_map.find(name); resource_state.subscriptions.erase(&subscription_state);
GPR_ASSERT(resource_it != resource_by_type_map.end()); if (resource_state.subscriptions.empty() &&
resource_it->second.subscribers.erase(&subscription_it->second); !resource_state.resource.has_value()) {
if (resource_it->second.subscribers.empty() && resource_name_map->erase(resource_it);
!resource_it->second.resource.has_value()) {
gpr_log(GPR_INFO,
"ADS[%p]: Erasing resource type %s name %s from resource map "
"since there are no more subscribers for this unset resource",
this, resource_type.c_str(), name.c_str());
resource_by_type_map.erase(resource_it);
} }
subscription_by_type_map.erase(subscription_it); it = subscription_name_map->erase(it);
if (subscription_by_type_map.empty()) {
gpr_log(GPR_INFO,
"ADS[%p]: Erasing resource type %s from subscription_map", this,
resource_type.c_str());
subscription_map->erase(subscription_by_type_it);
} }
} }
@ -577,7 +539,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// for all resources and by adding all subscribed resources for LDS and CDS. // for all resources and by adding all subscribed resources for LDS and CDS.
void CompleteBuildingDiscoveryResponse( void CompleteBuildingDiscoveryResponse(
const std::string& resource_type, const int version, const std::string& resource_type, const int version,
const SubscriptionMap& subscription_map, const SubscriptionNameMap& subscription_name_map,
const std::set<std::string>& resources_added_to_response, const std::set<std::string>& resources_added_to_response,
DiscoveryResponse* response) { DiscoveryResponse* response) {
resource_type_response_state_[resource_type] = SENT; resource_type_response_state_[resource_type] = SENT;
@ -587,18 +549,15 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) { if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) {
// For LDS and CDS we must send back all subscribed resources // For LDS and CDS we must send back all subscribed resources
// (even the unchanged ones) // (even the unchanged ones)
auto subscription_map_by_type_it = subscription_map.find(resource_type); for (const auto& p : subscription_name_map) {
GPR_ASSERT(subscription_map_by_type_it != subscription_map.end()); const std::string& resource_name = p.first;
for (const auto& subscription : subscription_map_by_type_it->second) { if (resources_added_to_response.find(resource_name) ==
if (resources_added_to_response.find(subscription.first) ==
resources_added_to_response.end()) { resources_added_to_response.end()) {
absl::optional<google::protobuf::Any>& resource = const ResourceState& resource_state =
resources_map_[resource_type][subscription.first].resource; resource_map_[resource_type][resource_name];
if (resource.has_value()) { if (resource_state.resource.has_value()) {
response->add_resources()->CopyFrom(resource.value()); response->add_resources()->CopyFrom(
} else { resource_state.resource.value());
gpr_log(GPR_INFO, "ADS[%p]: Unknown resource type %s and name %s",
this, resource_type.c_str(), subscription.first.c_str());
} }
} }
} }
@ -622,7 +581,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// Resources that the client will be subscribed to keyed by resource type // Resources that the client will be subscribed to keyed by resource type
// url. // url.
SubscriptionMap subscription_map; SubscriptionMap subscription_map;
std::map<std::string, SubscriberState> subscriber_map;
// Current Version map keyed by resource type url. // Current Version map keyed by resource type url.
std::map<std::string, int> resource_type_version; std::map<std::string, int> resource_type_version;
// Creating blocking thread to read from stream. // Creating blocking thread to read from stream.
@ -647,7 +605,8 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
DiscoveryRequest request = std::move(requests.front()); DiscoveryRequest request = std::move(requests.front());
requests.pop_front(); requests.pop_front();
did_work = true; did_work = true;
gpr_log(GPR_INFO, "ADS[%p]: Handling request %s with content %s", gpr_log(GPR_INFO,
"ADS[%p]: Received request for type %s with content %s",
this, request.type_url().c_str(), this, request.type_url().c_str(),
request.DebugString().c_str()); request.DebugString().c_str());
// Identify ACK and NACK by looking for version information and // Identify ACK and NACK by looking for version information and
@ -667,58 +626,51 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// 3. unsubscribe if necessary // 3. unsubscribe if necessary
if (resource_types_to_ignore_.find(request.type_url()) == if (resource_types_to_ignore_.find(request.type_url()) ==
resource_types_to_ignore_.end()) { resource_types_to_ignore_.end()) {
auto& subscription_name_map =
subscription_map[request.type_url()];
auto& resource_name_map = resource_map_[request.type_url()];
std::set<std::string> resources_in_current_request; std::set<std::string> resources_in_current_request;
std::set<std::string> resources_added_to_response; std::set<std::string> resources_added_to_response;
for (const std::string& resource_name : for (const std::string& resource_name :
request.resource_names()) { request.resource_names()) {
resources_in_current_request.emplace(resource_name); resources_in_current_request.emplace(resource_name);
auto subscriber_it = auto& subscription_state = subscription_name_map[resource_name];
subscription_map[request.type_url()].find(resource_name); auto& resource_state = resource_name_map[resource_name];
if (subscriber_it == MaybeSubscribe(request.type_url(), resource_name,
subscription_map[request.type_url()].end()) { &subscription_state, &resource_state,
ResourceSubscribe(request.type_url(), resource_name, &update_queue);
&update_queue, &subscription_map); if (ClientNeedsResourceUpdate(resource_state,
} &subscription_state)) {
if (ClientNeedsResourceUpdate(request.type_url(), resource_name, gpr_log(
&subscription_map)) { GPR_INFO,
"ADS[%p]: Sending update for type=%s name=%s version=%d",
this, request.type_url().c_str(), resource_name.c_str(),
resource_state.version);
resources_added_to_response.emplace(resource_name); resources_added_to_response.emplace(resource_name);
gpr_log(GPR_INFO, if (resource_state.resource.has_value()) {
"ADS[%p]: Handling resource type %s and name %s", response.add_resources()->CopyFrom(
this, request.type_url().c_str(), resource_state.resource.value());
resource_name.c_str());
auto resource =
resources_map_[request.type_url()][resource_name];
GPR_ASSERT(resource.resource.has_value());
response.add_resources()->CopyFrom(resource.resource.value());
}
}
// Remove subscriptions no longer requested: build a list of
// unsubscriber names first while iterating the subscription_map
// and then erase from the subscription_map in
// ResourceUnsubscribe.
std::set<std::string> unsubscriber_list;
for (const auto& subscription :
subscription_map[request.type_url()]) {
if (resources_in_current_request.find(subscription.first) ==
resources_in_current_request.end()) {
unsubscriber_list.emplace(subscription.first);
} }
} }
for (const auto& name : unsubscriber_list) {
ResourceUnsubscribe(request.type_url(), name,
&subscription_map);
} }
if (!response.resources().empty()) { // Process unsubscriptions for any resource no longer
// present in the request's resource list.
ProcessUnsubscriptions(
request.type_url(), resources_in_current_request,
&subscription_name_map, &resource_name_map);
// Send response if needed.
if (!resources_added_to_response.empty()) {
CompleteBuildingDiscoveryResponse( CompleteBuildingDiscoveryResponse(
request.type_url(), request.type_url(),
++resource_type_version[request.type_url()], ++resource_type_version[request.type_url()],
subscription_map, resources_added_to_response, &response); subscription_name_map, resources_added_to_response,
&response);
} }
} }
} }
} }
if (!response.resources().empty()) { if (!response.resources().empty()) {
gpr_log(GPR_INFO, "ADS[%p]: sending request response '%s'", this, gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this,
response.DebugString().c_str()); response.DebugString().c_str());
stream->Write(response); stream->Write(response);
} }
@ -727,32 +679,40 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
{ {
grpc_core::MutexLock lock(&ads_mu_); grpc_core::MutexLock lock(&ads_mu_);
if (!update_queue.empty()) { if (!update_queue.empty()) {
std::pair<std::string, std::string> update = const std::string resource_type =
std::move(update_queue.front()); std::move(update_queue.front().first);
const std::string resource_name =
std::move(update_queue.front().second);
update_queue.pop_front(); update_queue.pop_front();
did_work = true; did_work = true;
gpr_log(GPR_INFO, "ADS[%p]: Handling update type %s name %s", this, gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s",
update.first.c_str(), update.second.c_str()); this, resource_type.c_str(), resource_name.c_str());
auto subscriber_it = auto& subscription_name_map = subscription_map[resource_type];
subscription_map[update.first].find(update.second); auto& resource_name_map = resource_map_[resource_type];
if (subscriber_it != subscription_map[update.first].end()) { auto it = subscription_name_map.find(resource_name);
if (ClientNeedsResourceUpdate(update.first, update.second, if (it != subscription_name_map.end()) {
&subscription_map)) { SubscriptionState& subscription_state = it->second;
gpr_log(GPR_INFO, ResourceState& resource_state = resource_name_map[resource_name];
"ADS[%p]: Updating resource type %s and name %s", this, if (ClientNeedsResourceUpdate(resource_state,
update.first.c_str(), update.second.c_str()); &subscription_state)) {
auto resource = resources_map_[update.first][update.second]; gpr_log(
GPR_ASSERT(resource.resource.has_value()); GPR_INFO,
response.add_resources()->CopyFrom(resource.resource.value()); "ADS[%p]: Sending update for type=%s name=%s version=%d",
this, resource_type.c_str(), resource_name.c_str(),
resource_state.version);
if (resource_state.resource.has_value()) {
response.add_resources()->CopyFrom(
resource_state.resource.value());
CompleteBuildingDiscoveryResponse( CompleteBuildingDiscoveryResponse(
update.first, ++resource_type_version[update.first], resource_type, ++resource_type_version[resource_type],
subscription_map, {update.second}, &response); subscription_name_map, {resource_name}, &response);
}
} }
} }
} }
} }
if (!response.resources().empty()) { if (!response.resources().empty()) {
gpr_log(GPR_INFO, "ADS[%p]: sending update response '%s'", this, gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this,
response.DebugString().c_str()); response.DebugString().c_str());
stream->Write(response); stream->Write(response);
} }
@ -808,13 +768,13 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
void SetResource(google::protobuf::Any resource, const std::string& type_url, void SetResource(google::protobuf::Any resource, const std::string& type_url,
const std::string& name) { const std::string& name) {
grpc_core::MutexLock lock(&ads_mu_); grpc_core::MutexLock lock(&ads_mu_);
ResourceState& state = resources_map_[type_url][name]; ResourceState& state = resource_map_[type_url][name];
++state.version; ++state.version;
state.resource = std::move(resource); state.resource = std::move(resource);
gpr_log(GPR_INFO, "ADS[%p]: Updating %s resource %s to version %u", this, gpr_log(GPR_INFO, "ADS[%p]: Updating %s resource %s to version %u", this,
type_url.c_str(), name.c_str(), state.version); type_url.c_str(), name.c_str(), state.version);
for (SubscriberState* subscriber : state.subscribers) { for (SubscriptionState* subscription : state.subscriptions) {
subscriber->update_queue->emplace_back(type_url, name); subscription->update_queue->emplace_back(type_url, name);
} }
} }
@ -873,15 +833,17 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
{ {
grpc_core::MutexLock lock(&ads_mu_); grpc_core::MutexLock lock(&ads_mu_);
NotifyDoneWithAdsCallLocked(); NotifyDoneWithAdsCallLocked();
resources_map_.clear(); resource_map_.clear();
resource_type_response_state_.clear(); resource_type_response_state_.clear();
} }
gpr_log(GPR_INFO, "ADS[%p]: shut down", this); gpr_log(GPR_INFO, "ADS[%p]: shut down", this);
} }
static ClusterLoadAssignment BuildEdsResource(const EdsResourceArgs& args) { static ClusterLoadAssignment BuildEdsResource(
const EdsResourceArgs& args,
const char* cluster_name = kDefaultResourceName) {
ClusterLoadAssignment assignment; ClusterLoadAssignment assignment;
assignment.set_cluster_name(kDefaultResourceName); assignment.set_cluster_name(cluster_name);
for (const auto& locality : args.locality_list) { for (const auto& locality : args.locality_list) {
auto* endpoints = assignment.add_endpoints(); auto* endpoints = assignment.add_endpoints();
endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight); endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight);
@ -946,8 +908,8 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// Note that an entry will exist whenever either of the following is true: // Note that an entry will exist whenever either of the following is true:
// - The resource exists (i.e., has been created by SetResource() and has not // - The resource exists (i.e., has been created by SetResource() and has not
// yet been destroyed by UnsetResource()). // yet been destroyed by UnsetResource()).
// - There is at least one subscriber for the resource. // - There is at least one subscription for the resource.
ResourcesMap resources_map_; ResourceMap resource_map_;
}; };
class LrsServiceImpl : public LrsService, class LrsServiceImpl : public LrsService,

Loading…
Cancel
Save