minor cleanups in xds_end2end_test

pull/22500/head
Mark D. Roth 5 years ago
parent 5186b231b1
commit fcd8bbd34c
  1. 305
      test/cpp/end2end/xds_end2end_test.cc

@ -403,40 +403,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
// A queue of resource type/name pairs that have changed since the client
// subscribed to them.
using UpdateQueue = std::deque<
std::pair<std::string /* type url */, std::string /* resource name */>>;
// A struct representing a client's subscription to a particular resource.
struct SubscriptionState {
// Version that the client currently knows about.
int current_version = 0;
// The queue upon which to place updates when the resource is updated.
UpdateQueue* update_queue;
};
// A struct representing the a client's subscription to all the resources.
using SubscriptionNameMap =
std::map<std::string /* resource_name */, SubscriptionState>;
using SubscriptionMap =
std::map<std::string /* type_url */, SubscriptionNameMap>;
// A struct representing the current state for a resource:
// - the version of the resource that is set by the SetResource() methods.
// - a list of subscriptions interested in this resource.
struct ResourceState {
int version = 0;
absl::optional<google::protobuf::Any> resource;
std::set<SubscriptionState*> subscriptions;
};
// A struct representing the current state for all resources:
// LDS, CDS, EDS, and RDS for the class as a whole.
using ResourceNameMap =
std::map<std::string /* resource_name */, ResourceState>;
using ResourceMap = std::map<std::string /* type_url */, ResourceNameMap>;
AdsServiceImpl(bool enable_load_reporting) {
// Construct RDS response data.
default_route_config_.set_name(kDefaultResourceName);
@ -462,118 +428,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
SetCdsResource(default_cluster_, kDefaultResourceName);
}
// Starting a thread to do blocking read on the stream until cancel.
void BlockingRead(Stream* stream, std::deque<DiscoveryRequest>* requests,
bool* stream_closed) {
DiscoveryRequest request;
bool seen_first_request = false;
while (stream->Read(&request)) {
if (!seen_first_request) {
EXPECT_TRUE(request.has_node());
ASSERT_FALSE(request.node().client_features().empty());
EXPECT_EQ(request.node().client_features(0),
"envoy.lb.does_not_support_overprovisioning");
seen_first_request = true;
}
{
grpc_core::MutexLock lock(&ads_mu_);
requests->emplace_back(std::move(request));
}
}
gpr_log(GPR_INFO, "ADS[%p]: Null read, stream closed", this);
grpc_core::MutexLock lock(&ads_mu_);
*stream_closed = true;
}
// Checks whether the client needs to receive a newer version of
// the resource. If so, updates subscription_state->current_version and
// returns true.
bool ClientNeedsResourceUpdate(const ResourceState& resource_state,
SubscriptionState* subscription_state) {
if (subscription_state->current_version < resource_state.version) {
subscription_state->current_version = resource_state.version;
return true;
}
return false;
}
// Subscribes to a resource if not already subscribed:
// 1. Sets the update_queue field in subscription_state.
// 2. Adds subscription_state to resource_state->subscriptions.
void MaybeSubscribe(const std::string& resource_type,
const std::string& resource_name,
SubscriptionState* subscription_state,
ResourceState* resource_state,
UpdateQueue* update_queue) {
// The update_queue will be null if we were not previously subscribed.
if (subscription_state->update_queue != nullptr) return;
subscription_state->update_queue = update_queue;
resource_state->subscriptions.emplace(subscription_state);
gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p",
this, resource_type.c_str(), resource_name.c_str(),
&subscription_state);
}
// Removes subscriptions for resources no longer present in the
// current request.
void ProcessUnsubscriptions(
const std::string& resource_type,
const std::set<std::string>& resources_in_current_request,
SubscriptionNameMap* subscription_name_map,
ResourceNameMap* resource_name_map) {
for (auto it = subscription_name_map->begin();
it != subscription_name_map->end();) {
const std::string& resource_name = it->first;
SubscriptionState& subscription_state = it->second;
if (resources_in_current_request.find(resource_name) !=
resources_in_current_request.end()) {
++it;
continue;
}
gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p",
this, resource_type.c_str(), resource_name.c_str(),
&subscription_state);
auto resource_it = resource_name_map->find(resource_name);
GPR_ASSERT(resource_it != resource_name_map->end());
auto& resource_state = resource_it->second;
resource_state.subscriptions.erase(&subscription_state);
if (resource_state.subscriptions.empty() &&
!resource_state.resource.has_value()) {
resource_name_map->erase(resource_it);
}
it = subscription_name_map->erase(it);
}
}
// Completing the building a DiscoveryResponse by adding common information
// for all resources and by adding all subscribed resources for LDS and CDS.
void CompleteBuildingDiscoveryResponse(
const std::string& resource_type, const int version,
const SubscriptionNameMap& subscription_name_map,
const std::set<std::string>& resources_added_to_response,
DiscoveryResponse* response) {
resource_type_response_state_[resource_type] = SENT;
response->set_type_url(resource_type);
response->set_version_info(absl::StrCat(version));
response->set_nonce(absl::StrCat(version));
if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) {
// For LDS and CDS we must send back all subscribed resources
// (even the unchanged ones)
for (const auto& p : subscription_name_map) {
const std::string& resource_name = p.first;
if (resources_added_to_response.find(resource_name) ==
resources_added_to_response.end()) {
const ResourceState& resource_state =
resource_map_[resource_type][resource_name];
if (resource_state.resource.has_value()) {
response->add_resources()->CopyFrom(
resource_state.resource.value());
}
}
}
}
}
Status StreamAggregatedResources(ServerContext* context,
Stream* stream) override {
gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this);
@ -932,6 +786,152 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
}
private:
// A queue of resource type/name pairs that have changed since the client
// subscribed to them.
using UpdateQueue = std::deque<
std::pair<std::string /* type url */, std::string /* resource name */>>;
// A struct representing a client's subscription to a particular resource.
struct SubscriptionState {
// Version that the client currently knows about.
int current_version = 0;
// The queue upon which to place updates when the resource is updated.
UpdateQueue* update_queue;
};
// A struct representing the a client's subscription to all the resources.
using SubscriptionNameMap =
std::map<std::string /* resource_name */, SubscriptionState>;
using SubscriptionMap =
std::map<std::string /* type_url */, SubscriptionNameMap>;
// A struct representing the current state for a resource:
// - the version of the resource that is set by the SetResource() methods.
// - a list of subscriptions interested in this resource.
struct ResourceState {
int version = 0;
absl::optional<google::protobuf::Any> resource;
std::set<SubscriptionState*> subscriptions;
};
// A struct representing the current state for all resources:
// LDS, CDS, EDS, and RDS for the class as a whole.
using ResourceNameMap =
std::map<std::string /* resource_name */, ResourceState>;
using ResourceMap = std::map<std::string /* type_url */, ResourceNameMap>;
// Starting a thread to do blocking read on the stream until cancel.
void BlockingRead(Stream* stream, std::deque<DiscoveryRequest>* requests,
bool* stream_closed) {
DiscoveryRequest request;
bool seen_first_request = false;
while (stream->Read(&request)) {
if (!seen_first_request) {
EXPECT_TRUE(request.has_node());
ASSERT_FALSE(request.node().client_features().empty());
EXPECT_EQ(request.node().client_features(0),
"envoy.lb.does_not_support_overprovisioning");
seen_first_request = true;
}
{
grpc_core::MutexLock lock(&ads_mu_);
requests->emplace_back(std::move(request));
}
}
gpr_log(GPR_INFO, "ADS[%p]: Null read, stream closed", this);
grpc_core::MutexLock lock(&ads_mu_);
*stream_closed = true;
}
// Checks whether the client needs to receive a newer version of
// the resource. If so, updates subscription_state->current_version and
// returns true.
bool ClientNeedsResourceUpdate(const ResourceState& resource_state,
SubscriptionState* subscription_state) {
if (subscription_state->current_version < resource_state.version) {
subscription_state->current_version = resource_state.version;
return true;
}
return false;
}
// Subscribes to a resource if not already subscribed:
// 1. Sets the update_queue field in subscription_state.
// 2. Adds subscription_state to resource_state->subscriptions.
void MaybeSubscribe(const std::string& resource_type,
const std::string& resource_name,
SubscriptionState* subscription_state,
ResourceState* resource_state,
UpdateQueue* update_queue) {
// The update_queue will be null if we were not previously subscribed.
if (subscription_state->update_queue != nullptr) return;
subscription_state->update_queue = update_queue;
resource_state->subscriptions.emplace(subscription_state);
gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p",
this, resource_type.c_str(), resource_name.c_str(),
&subscription_state);
}
// Removes subscriptions for resources no longer present in the
// current request.
void ProcessUnsubscriptions(
const std::string& resource_type,
const std::set<std::string>& resources_in_current_request,
SubscriptionNameMap* subscription_name_map,
ResourceNameMap* resource_name_map) {
for (auto it = subscription_name_map->begin();
it != subscription_name_map->end();) {
const std::string& resource_name = it->first;
SubscriptionState& subscription_state = it->second;
if (resources_in_current_request.find(resource_name) !=
resources_in_current_request.end()) {
++it;
continue;
}
gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p",
this, resource_type.c_str(), resource_name.c_str(),
&subscription_state);
auto resource_it = resource_name_map->find(resource_name);
GPR_ASSERT(resource_it != resource_name_map->end());
auto& resource_state = resource_it->second;
resource_state.subscriptions.erase(&subscription_state);
if (resource_state.subscriptions.empty() &&
!resource_state.resource.has_value()) {
resource_name_map->erase(resource_it);
}
it = subscription_name_map->erase(it);
}
}
// Completing the building a DiscoveryResponse by adding common information
// for all resources and by adding all subscribed resources for LDS and CDS.
void CompleteBuildingDiscoveryResponse(
const std::string& resource_type, const int version,
const SubscriptionNameMap& subscription_name_map,
const std::set<std::string>& resources_added_to_response,
DiscoveryResponse* response) {
resource_type_response_state_[resource_type] = SENT;
response->set_type_url(resource_type);
response->set_version_info(absl::StrCat(version));
response->set_nonce(absl::StrCat(version));
if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) {
// For LDS and CDS we must send back all subscribed resources
// (even the unchanged ones)
for (const auto& p : subscription_name_map) {
const std::string& resource_name = p.first;
if (resources_added_to_response.find(resource_name) ==
resources_added_to_response.end()) {
const ResourceState& resource_state =
resource_map_[resource_type][resource_name];
if (resource_state.resource.has_value()) {
response->add_resources()->CopyFrom(
resource_state.resource.value());
}
}
}
}
}
grpc_core::CondVar ads_cond_;
// Protect the members below.
grpc_core::Mutex ads_mu_;
@ -2424,17 +2424,10 @@ TEST_P(LocalityMapTest, LocalityContainingNoEndpoints) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 5000;
const int kLocalityWeight0 = 2;
const int kLocalityWeight1 = 8;
const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
const double kLocalityWeightRate0 =
static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
const double kLocalityWeightRate1 =
static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
// ADS response contains 2 localities, each of which contains 1 backend.
// EDS response contains 2 localities, one with no endpoints.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(), kLocalityWeight0},
{"locality1", {}, kLocalityWeight1},
{"locality0", GetBackendPorts()},
{"locality1", {}},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);

Loading…
Cancel
Save