Integrating weighted target policy into xds routing policy

pull/22749/head
Donna Dionne 5 years ago
parent 6640651bcf
commit 9ad561c436
  1. 1
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  2. 101
      src/core/ext/filters/client_channel/xds/xds_api.cc
  3. 35
      src/core/ext/filters/client_channel/xds/xds_api.h
  4. 206
      src/core/ext/filters/client_channel/xds/xds_client.cc
  5. 32
      src/core/ext/filters/client_channel/xds/xds_client.h
  6. 16
      src/proto/grpc/testing/xds/lds_rds_for_test.proto
  7. 657
      test/cpp/end2end/xds_end2end_test.cc

@ -302,6 +302,7 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
targets_[name]->UpdateLocked(config, std::move(address_map[name]),
args.args);
}
UpdateStateLocked();
}
void WeightedTargetLb::UpdateStateLocked() {

@ -298,6 +298,10 @@ inline absl::string_view UpbStringToAbsl(const upb_strview& str) {
return absl::string_view(str.data, str.size);
}
inline std::string UpbStringToStdString(const upb_strview& str) {
return std::string(str.data, str.size);
}
inline void AddStringField(const char* name, const upb_strview& value,
std::vector<std::string>* fields,
bool add_if_empty = false) {
@ -1023,7 +1027,7 @@ grpc_error* RouteConfigParse(
const envoy_api_v2_route_Route* route = routes[i];
const envoy_api_v2_route_RouteMatch* match =
envoy_api_v2_route_Route_match(route);
XdsApi::RdsRoute rds_route;
XdsApi::RdsUpdate::RdsRoute rds_route;
if (envoy_api_v2_route_RouteMatch_has_prefix(match)) {
upb_strview prefix = envoy_api_v2_route_RouteMatch_prefix(match);
// Empty prefix "" is accepted.
@ -1085,18 +1089,65 @@ grpc_error* RouteConfigParse(
}
const envoy_api_v2_route_RouteAction* route_action =
envoy_api_v2_route_Route_route(route);
// Get the cluster in the RouteAction.
if (!envoy_api_v2_route_RouteAction_has_cluster(route_action)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No cluster found in RouteAction.");
}
const upb_strview action =
envoy_api_v2_route_RouteAction_cluster(route_action);
if (action.size == 0) {
// Get the cluster or weighted_clusters in the RouteAction.
if (envoy_api_v2_route_RouteAction_has_cluster(route_action)) {
const upb_strview cluster_name =
envoy_api_v2_route_RouteAction_cluster(route_action);
if (cluster_name.size == 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction cluster contains empty cluster name.");
}
rds_route.cluster_name = UpbStringToStdString(cluster_name);
} else if (envoy_api_v2_route_RouteAction_has_weighted_clusters(
route_action)) {
const envoy_api_v2_route_WeightedCluster* weighted_cluster =
envoy_api_v2_route_RouteAction_weighted_clusters(route_action);
uint32_t total_weight = 100;
const google_protobuf_UInt32Value* weight =
envoy_api_v2_route_WeightedCluster_total_weight(weighted_cluster);
if (weight != nullptr) {
total_weight = google_protobuf_UInt32Value_value(weight);
}
size_t clusters_size;
const envoy_api_v2_route_WeightedCluster_ClusterWeight* const* clusters =
envoy_api_v2_route_WeightedCluster_clusters(weighted_cluster,
&clusters_size);
uint32_t sum_of_weights = 0;
for (size_t j = 0; j < clusters_size; ++j) {
const envoy_api_v2_route_WeightedCluster_ClusterWeight* cluster_weight =
clusters[j];
XdsApi::RdsUpdate::RdsRoute::ClusterWeight cluster;
cluster.name = UpbStringToStdString(
envoy_api_v2_route_WeightedCluster_ClusterWeight_name(
cluster_weight));
if (cluster.name.empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction weighted_cluster cluster contains empty cluster "
"name.");
}
const google_protobuf_UInt32Value* weight =
envoy_api_v2_route_WeightedCluster_ClusterWeight_weight(
cluster_weight);
if (weight == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction weighted_cluster cluster missing weight");
}
cluster.weight = google_protobuf_UInt32Value_value(weight);
sum_of_weights += cluster.weight;
rds_route.weighted_clusters.emplace_back(std::move(cluster));
}
if (total_weight != sum_of_weights) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction weighted_cluster has incorrect total weight");
}
if (rds_route.weighted_clusters.empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction weighted_cluster has no valid clusters specified.");
}
} else {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction contains empty cluster.");
"No cluster or weighted_clusters found in RouteAction.");
}
rds_route.cluster_name = std::string(action.data, action.size);
rds_update->routes.emplace_back(std::move(rds_route));
}
if (rds_update->routes.empty()) {
@ -1190,12 +1241,10 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
"HttpConnectionManager ConfigSource for RDS does not specify ADS.");
}
// Get the route_config_name.
const upb_strview route_config_name =
envoy_config_filter_network_http_connection_manager_v2_Rds_route_config_name(
rds);
lds_update->emplace();
(*lds_update)->route_config_name =
std::string(route_config_name.data, route_config_name.size);
(*lds_update)->route_config_name = UpbStringToStdString(
envoy_config_filter_network_http_connection_manager_v2_Rds_route_config_name(
rds));
return GRPC_ERROR_NONE;
}
return GRPC_ERROR_NONE;
@ -1297,8 +1346,7 @@ grpc_error* CdsResponseParse(
upb_strview service_name =
envoy_api_v2_Cluster_EdsClusterConfig_service_name(eds_cluster_config);
if (service_name.size != 0) {
cds_update.eds_service_name =
std::string(service_name.data, service_name.size);
cds_update.eds_service_name = UpbStringToStdString(service_name);
}
// Check the LB policy.
if (envoy_api_v2_Cluster_lb_policy(cluster) !=
@ -1316,7 +1364,7 @@ grpc_error* CdsResponseParse(
}
cds_update.lrs_load_reporting_server_name.emplace("");
}
cds_update_map->emplace(std::string(cluster_name.data, cluster_name.size),
cds_update_map->emplace(UpbStringToStdString(cluster_name),
std::move(cds_update));
}
return GRPC_ERROR_NONE;
@ -1377,8 +1425,8 @@ grpc_error* LocalityParse(
upb_strview zone = envoy_api_v2_core_Locality_region(locality);
upb_strview sub_zone = envoy_api_v2_core_Locality_sub_zone(locality);
output_locality->name = MakeRefCounted<XdsLocalityName>(
std::string(region.data, region.size), std::string(zone.data, zone.size),
std::string(sub_zone.data, sub_zone.size));
UpbStringToStdString(region), UpbStringToStdString(zone),
UpbStringToStdString(sub_zone));
// Parse the addresses.
size_t size;
const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints =
@ -1428,8 +1476,7 @@ grpc_error* DropParseAndAppend(
}
// Cap numerator to 1000000.
numerator = GPR_MIN(numerator, 1000000);
drop_config->AddCategory(std::string(category.data, category.size),
numerator);
drop_config->AddCategory(UpbStringToStdString(category), numerator);
return GRPC_ERROR_NONE;
}
@ -1508,7 +1555,7 @@ grpc_error* EdsResponseParse(
if (error != GRPC_ERROR_NONE) return error;
}
}
eds_update_map->emplace(std::string(cluster_name.data, cluster_name.size),
eds_update_map->emplace(UpbStringToStdString(cluster_name),
std::move(eds_update));
}
return GRPC_ERROR_NONE;
@ -1542,12 +1589,12 @@ grpc_error* XdsApi::ParseAdsResponse(
// Record the type_url, the version_info, and the nonce of the response.
upb_strview type_url_strview =
envoy_api_v2_DiscoveryResponse_type_url(response);
*type_url = std::string(type_url_strview.data, type_url_strview.size);
*type_url = UpbStringToStdString(type_url_strview);
upb_strview version_info =
envoy_api_v2_DiscoveryResponse_version_info(response);
*version = std::string(version_info.data, version_info.size);
*version = UpbStringToStdString(version_info);
upb_strview nonce_strview = envoy_api_v2_DiscoveryResponse_nonce(response);
*nonce = std::string(nonce_strview.data, nonce_strview.size);
*nonce = UpbStringToStdString(nonce_strview);
// Parse the response according to the resource type.
if (*type_url == kLdsTypeUrl) {
return LdsResponseParse(client_, tracer_, response, expected_server_name,

@ -45,18 +45,31 @@ class XdsApi {
static const char* kCdsTypeUrl;
static const char* kEdsTypeUrl;
struct RdsRoute {
std::string service;
std::string method;
std::string cluster_name;
bool operator==(const RdsRoute& other) const {
return (service == other.service && method == other.method &&
cluster_name == other.cluster_name);
}
};
struct RdsUpdate {
struct RdsRoute {
std::string service;
std::string method;
// TODO(donnadionne): When we can use absl::variant<>, consider using that
// here, to enforce the fact that only one of cluster_name and
// weighted_clusters can be set.
std::string cluster_name;
struct ClusterWeight {
std::string name;
uint32_t weight;
bool operator==(const ClusterWeight& other) const {
return (name == other.name && weight == other.weight);
}
};
std::vector<ClusterWeight> weighted_clusters;
bool operator==(const RdsRoute& other) const {
return (service == other.service && method == other.method &&
cluster_name == other.cluster_name &&
weighted_clusters == other.weighted_clusters);
}
};
std::vector<RdsRoute> routes;
bool operator==(const RdsUpdate& other) const {

@ -2055,10 +2055,10 @@ std::string CreateServiceConfigActionCluster(const std::string& cluster_name) {
" }\n"
" } ]\n"
" }",
cluster_name.c_str(), cluster_name.c_str());
cluster_name, cluster_name);
}
std::string CreateServiceConfigRoute(const std::string& cluster_name,
std::string CreateServiceConfigRoute(const std::string& action_name,
const std::string& service,
const std::string& method) {
return absl::StrFormat(
@ -2067,43 +2067,203 @@ std::string CreateServiceConfigRoute(const std::string& cluster_name,
" \"service\": \"%s\",\n"
" \"method\": \"%s\"\n"
" },\n"
" \"action\": \"cds:%s\"\n"
" \"action\": \"%s\"\n"
" }",
service.c_str(), method.c_str(), cluster_name.c_str());
service, method, action_name);
}
// Create the service config for one weighted cluster.
std::string CreateServiceConfigActionWeightedCluster(
const std::string& name,
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>& clusters) {
std::vector<std::string> config_parts;
config_parts.push_back(
absl::StrFormat(" \"weighted:%s\":{\n"
" \"child_policy\":[ {\n"
" \"weighted_target_experimental\":{\n"
" \"targets\":{\n",
name));
std::vector<std::string> weighted_targets;
weighted_targets.reserve(clusters.size());
for (const auto& cluster_weight : clusters) {
weighted_targets.push_back(absl::StrFormat(
" \"%s\":{\n"
" \"weight\":%d,\n"
" \"childPolicy\":[ {\n"
" \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" }\n"
" } ]\n"
" }",
cluster_weight.name, cluster_weight.weight, cluster_weight.name));
}
config_parts.push_back(absl::StrJoin(weighted_targets, ",\n"));
config_parts.push_back(
" }\n"
" }\n"
" } ]\n"
" }");
return absl::StrJoin(config_parts, "");
}
struct WeightedClustersKeys {
std::string cluster_names_key;
std::string cluster_weights_key;
};
// Returns the cluster names and weights key or the cluster names only key.
WeightedClustersKeys GetWeightedClustersKey(
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
weighted_clusters) {
std::set<std::string> cluster_names;
std::set<std::string> cluster_weights;
for (const auto& cluster_weight : weighted_clusters) {
cluster_names.emplace(absl::StrFormat("%s", cluster_weight.name));
cluster_weights.emplace(
absl::StrFormat("%s_%d", cluster_weight.name, cluster_weight.weight));
}
return {absl::StrJoin(cluster_names, "_"),
absl::StrJoin(cluster_weights, "_")};
}
} // namespace
std::string XdsClient::WeightedClustersActionName(
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
weighted_clusters) {
WeightedClustersKeys keys = GetWeightedClustersKey(weighted_clusters);
auto cluster_names_map_it =
weighted_cluster_index_map_.find(keys.cluster_names_key);
GPR_ASSERT(cluster_names_map_it != weighted_cluster_index_map_.end());
const auto& cluster_weights_map =
cluster_names_map_it->second.cluster_weights_map;
auto cluster_weights_map_it =
cluster_weights_map.find(keys.cluster_weights_key);
GPR_ASSERT(cluster_weights_map_it != cluster_weights_map.end());
return absl::StrFormat("%s_%d", keys.cluster_names_key,
cluster_weights_map_it->second);
}
void XdsClient::UpdateWeightedClusterIndexMap(
const XdsApi::RdsUpdate& rds_update) {
// Construct a list of unique WeightedCluster
// actions which we need to process: to find action names
std::map<std::string /* cluster_weights_key */,
std::string /* cluster_names_key */>
actions_to_process;
for (const auto& route : rds_update.routes) {
if (!route.weighted_clusters.empty()) {
WeightedClustersKeys keys =
GetWeightedClustersKey(route.weighted_clusters);
auto action_it = actions_to_process.find(keys.cluster_weights_key);
if (action_it == actions_to_process.end()) {
actions_to_process[std::move(keys.cluster_weights_key)] =
std::move(keys.cluster_names_key);
}
}
}
// First pass of all unique WeightedCluster actions: if the exact same
// weighted target policy (same clusters and weights) appears in the old map,
// then that old action name is taken again and should be moved to the new
// map; any other action names from the old set of actions are candidates for
// reuse.
XdsClient::WeightedClusterIndexMap new_weighted_cluster_index_map;
for (auto action_it = actions_to_process.begin();
action_it != actions_to_process.end();) {
const std::string& cluster_names_key = action_it->second;
const std::string& cluster_weights_key = action_it->first;
auto old_cluster_names_map_it =
weighted_cluster_index_map_.find(cluster_names_key);
if (old_cluster_names_map_it != weighted_cluster_index_map_.end()) {
// Add cluster_names_key to the new map and copy next_index.
auto& new_cluster_names_info =
new_weighted_cluster_index_map[cluster_names_key];
new_cluster_names_info.next_index =
old_cluster_names_map_it->second.next_index;
// Lookup cluster_weights_key in old map.
auto& old_cluster_weights_map =
old_cluster_names_map_it->second.cluster_weights_map;
auto old_cluster_weights_map_it =
old_cluster_weights_map.find(cluster_weights_key);
if (old_cluster_weights_map_it != old_cluster_weights_map.end()) {
// same policy found, move from old map to new map.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
old_cluster_weights_map_it->second;
old_cluster_weights_map.erase(old_cluster_weights_map_it);
// This action has been added to new map, so no need to process it
// again.
action_it = actions_to_process.erase(action_it);
continue;
}
}
++action_it;
}
// Second pass of all remaining unique WeightedCluster actions: if clusters
// for a new action are the same as an old unused action, reuse the name. If
// clusters differ, use a brand new name.
for (const auto& action : actions_to_process) {
const std::string& cluster_names_key = action.second;
const std::string& cluster_weights_key = action.first;
auto& new_cluster_names_info =
new_weighted_cluster_index_map[cluster_names_key];
auto& old_cluster_weights_map =
weighted_cluster_index_map_[cluster_names_key].cluster_weights_map;
auto old_cluster_weights_it = old_cluster_weights_map.begin();
if (old_cluster_weights_it != old_cluster_weights_map.end()) {
// There is something to reuse: this action uses the same set
// of clusters as a previous action and that action name is not
// already taken.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
old_cluster_weights_it->second;
// Remove the name from being able to reuse again.
old_cluster_weights_map.erase(old_cluster_weights_it);
} else {
// There is nothing to reuse, take the next index to use and
// increment.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
new_cluster_names_info.next_index++;
}
}
weighted_cluster_index_map_ = std::move(new_weighted_cluster_index_map);
}
grpc_error* XdsClient::CreateServiceConfig(
const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config) const {
RefCountedPtr<ServiceConfig>* service_config) {
UpdateWeightedClusterIndexMap(rds_update);
std::vector<std::string> actions_vector;
std::vector<std::string> route_table;
std::set<std::string> actions_set;
for (const auto& route : rds_update.routes) {
const std::string action_name =
route.weighted_clusters.empty()
? route.cluster_name
: WeightedClustersActionName(route.weighted_clusters);
if (actions_set.find(action_name) == actions_set.end()) {
actions_set.emplace(action_name);
actions_vector.push_back(
route.weighted_clusters.empty()
? CreateServiceConfigActionCluster(action_name)
: CreateServiceConfigActionWeightedCluster(
action_name, route.weighted_clusters));
}
route_table.push_back(CreateServiceConfigRoute(
absl::StrFormat("%s:%s",
route.weighted_clusters.empty() ? "cds" : "weighted",
action_name),
route.service, route.method));
}
std::vector<std::string> config_parts;
config_parts.push_back(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"xds_routing_experimental\":{\n"
" \"actions\":{\n");
std::vector<std::string> actions_vector;
std::set<std::string> actions_set;
for (size_t i = 0; i < rds_update.routes.size(); ++i) {
auto route = rds_update.routes[i];
if (actions_set.find(route.cluster_name) == actions_set.end()) {
actions_vector.push_back(
CreateServiceConfigActionCluster(route.cluster_name.c_str()));
actions_set.emplace(route.cluster_name);
}
}
config_parts.push_back(absl::StrJoin(actions_vector, ",\n"));
config_parts.push_back(
" },\n"
" \"routes\":[\n");
std::vector<std::string> routes_vector;
for (size_t i = 0; i < rds_update.routes.size(); ++i) {
auto route_info = rds_update.routes[i];
routes_vector.push_back(CreateServiceConfigRoute(
route_info.cluster_name.c_str(), route_info.service.c_str(),
route_info.method.c_str()));
}
config_parts.push_back(absl::StrJoin(routes_vector, ",\n"));
config_parts.push_back(absl::StrJoin(route_table, ",\n"));
config_parts.push_back(
" ]\n"
" } }\n"

@ -232,9 +232,19 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// Sends an error notification to all watchers.
void NotifyOnError(grpc_error* error);
grpc_error* CreateServiceConfig(
const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config) const;
// Returns the weighted_clusters action name to use from
// weighted_cluster_index_map_ for a WeightedClusters route action.
std::string WeightedClustersActionName(
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
weighted_clusters);
// Updates weighted_cluster_index_map_ that will
// determine the names of the WeightedCluster actions for the current update.
void UpdateWeightedClusterIndexMap(const XdsApi::RdsUpdate& rds_update);
// Create the service config generated by the RdsUpdate.
grpc_error* CreateServiceConfig(const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config);
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot(
bool send_all_clusters, const std::set<std::string>& clusters);
@ -275,6 +285,22 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
LoadReportState>
load_report_map_;
// 2-level map to store WeightedCluster action names.
// Top level map is keyed by cluster names without weight like a_b_c; bottom
// level map is keyed by cluster names + weights like a10_b50_c40.
struct ClusterNamesInfo {
uint64_t next_index = 0;
std::map<std::string /*cluster names + weights*/,
uint64_t /*policy index number*/>
cluster_weights_map;
};
using WeightedClusterIndexMap =
std::map<std::string /*cluster names*/, ClusterNamesInfo>;
// Cache of action names for WeightedCluster targets in the current
// service config.
WeightedClusterIndexMap weighted_cluster_index_map_;
bool shutting_down_ = false;
};

@ -26,6 +26,7 @@ syntax = "proto3";
package envoy.api.v2;
import "google/protobuf/any.proto";
import "google/protobuf/wrappers.proto";
import "src/proto/grpc/testing/xds/cds_for_test.proto";
@ -38,6 +39,15 @@ message RouteMatch {
}
}
message WeightedCluster {
message ClusterWeight {
string name = 1;
google.protobuf.UInt32Value weight = 2;
}
repeated ClusterWeight clusters = 1;
google.protobuf.UInt32Value total_weight = 3;
}
message RouteAction {
oneof cluster_specifier {
// Indicates the upstream cluster to which the request should be routed
@ -54,6 +64,12 @@ message RouteAction {
// Internally, Envoy always uses the HTTP/2 *:authority* header to represent the HTTP/1
// *Host* header. Thus, if attempting to match on *Host*, match on *:authority* instead.
string cluster_header = 2;
// Multiple upstream clusters can be specified for a given route. The
// request is routed to one of the upstream clusters based on weights
// assigned to each cluster. See
// :ref:`traffic splitting <config_http_conn_man_route_table_traffic_splitting_split>`
// for additional documentation.
WeightedCluster weighted_clusters = 3;
}
}

@ -1226,21 +1226,102 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
stub2_ = grpc::testing::EchoTest2Service::NewStub(channel_);
}
void ResetBackendCounters() {
for (auto& backend : backends_) backend->backend_service()->ResetCounters();
enum RpcService {
SERVICE_ECHO,
SERVICE_ECHO1,
SERVICE_ECHO2,
};
enum RpcMethod {
METHOD_ECHO,
METHOD_ECHO1,
METHOD_ECHO2,
};
struct RpcOptions {
RpcService service = SERVICE_ECHO;
RpcMethod method = METHOD_ECHO;
int timeout_ms = 1000;
bool wait_for_ready = false;
bool server_fail = false;
RpcOptions() {}
RpcOptions& set_rpc_service(RpcService rpc_service) {
service = rpc_service;
return *this;
}
RpcOptions& set_rpc_method(RpcMethod rpc_method) {
method = rpc_method;
return *this;
}
RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
timeout_ms = rpc_timeout_ms;
return *this;
}
RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
wait_for_ready = rpc_wait_for_ready;
return *this;
}
RpcOptions& set_server_fail(bool rpc_server_fail) {
server_fail = rpc_server_fail;
return *this;
}
};
template <typename Stub>
Status SendRpcMethod(Stub* stub, const RpcOptions& rpc_options,
ClientContext* context, EchoRequest& request,
EchoResponse* response) {
switch (rpc_options.method) {
case METHOD_ECHO:
return (*stub)->Echo(context, request, response);
case METHOD_ECHO1:
return (*stub)->Echo1(context, request, response);
case METHOD_ECHO2:
return (*stub)->Echo2(context, request, response);
}
}
bool SeenAllBackends(size_t start_index = 0, size_t stop_index = 0) {
void ResetBackendCounters(size_t start_index = 0, size_t stop_index = 0) {
if (stop_index == 0) stop_index = backends_.size();
for (size_t i = start_index; i < stop_index; ++i) {
if (backends_[i]->backend_service()->request_count() == 0) return false;
backends_[i]->backend_service()->ResetCounters();
backends_[i]->backend_service1()->ResetCounters();
backends_[i]->backend_service2()->ResetCounters();
}
}
bool SeenAllBackends(size_t start_index = 0, size_t stop_index = 0,
const RpcOptions& rpc_options = RpcOptions()) {
if (stop_index == 0) stop_index = backends_.size();
for (size_t i = start_index; i < stop_index; ++i) {
switch (rpc_options.service) {
case SERVICE_ECHO:
if (backends_[i]->backend_service()->request_count() == 0)
return false;
break;
case SERVICE_ECHO1:
if (backends_[i]->backend_service1()->request_count() == 0)
return false;
break;
case SERVICE_ECHO2:
if (backends_[i]->backend_service2()->request_count() == 0)
return false;
break;
}
}
return true;
}
void SendRpcAndCount(int* num_total, int* num_ok, int* num_failure,
int* num_drops) {
const Status status = SendRpc();
int* num_drops,
const RpcOptions& rpc_options = RpcOptions()) {
const Status status = SendRpc(rpc_options);
if (status.ok()) {
++*num_ok;
} else {
@ -1253,15 +1334,16 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
++*num_total;
}
std::tuple<int, int, int> WaitForAllBackends(size_t start_index = 0,
size_t stop_index = 0,
bool reset_counters = true) {
std::tuple<int, int, int> WaitForAllBackends(
size_t start_index = 0, size_t stop_index = 0, bool reset_counters = true,
const RpcOptions& rpc_options = RpcOptions()) {
int num_ok = 0;
int num_failure = 0;
int num_drops = 0;
int num_total = 0;
while (!SeenAllBackends(start_index, stop_index)) {
SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
while (!SeenAllBackends(start_index, stop_index, rpc_options)) {
SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops,
rpc_options);
}
if (reset_counters) ResetBackendCounters();
gpr_log(GPR_INFO,
@ -1377,67 +1459,6 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
return backend_ports;
}
enum RpcService {
SERVICE_ECHO,
SERVICE_ECHO1,
SERVICE_ECHO2,
};
enum RpcMethod {
METHOD_ECHO,
METHOD_ECHO1,
METHOD_ECHO2,
};
struct RpcOptions {
RpcService service = SERVICE_ECHO;
RpcMethod method = METHOD_ECHO;
int timeout_ms = 1000;
bool wait_for_ready = false;
bool server_fail = false;
RpcOptions() {}
RpcOptions& set_rpc_service(RpcService rpc_service) {
service = rpc_service;
return *this;
}
RpcOptions& set_rpc_method(RpcMethod rpc_method) {
method = rpc_method;
return *this;
}
RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
timeout_ms = rpc_timeout_ms;
return *this;
}
RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
wait_for_ready = rpc_wait_for_ready;
return *this;
}
RpcOptions& set_server_fail(bool rpc_server_fail) {
server_fail = rpc_server_fail;
return *this;
}
};
template <typename Stub>
Status SendRpcMethod(Stub* stub, const RpcOptions& rpc_options,
ClientContext* context, EchoRequest& request,
EchoResponse* response) {
switch (rpc_options.method) {
case METHOD_ECHO:
return (*stub)->Echo(context, request, response);
case METHOD_ECHO1:
return (*stub)->Echo1(context, request, response);
case METHOD_ECHO2:
return (*stub)->Echo2(context, request, response);
}
}
Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
EchoResponse* response = nullptr) {
const bool local_response = (response == nullptr);
@ -2607,9 +2628,10 @@ TEST_P(LdsRdsTest, RouteHasNoRouteAction) {
EXPECT_EQ(response_state.error_message, "No RouteAction found in route.");
}
// Tests that LDS client should send a NACK if RouteAction has a
// cluster_specifier other than cluster in the LDS response.
TEST_P(LdsRdsTest, RouteActionHasNoCluster) {
// Tests that LDS client should send a NACK if route has a
// cluster_specifier other than cluster or weighted_clusters in the LDS
// response.
TEST_P(LdsRdsTest, RouteActionUnsupportedClusterSpecifier) {
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
route_config.mutable_virtual_hosts(0)
@ -2622,7 +2644,126 @@ TEST_P(LdsRdsTest, RouteActionHasNoCluster) {
CheckRpcSendFailure();
const auto& response_state = RouteConfigurationResponseState(0);
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_EQ(response_state.error_message, "No cluster found in RouteAction.");
EXPECT_EQ(response_state.error_message,
"No cluster or weighted_clusters found in RouteAction.");
}
TEST_P(LdsRdsTest, RouteActionClusterHasEmptyClusterName) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout*/ 0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service/");
route1->mutable_route()->set_cluster("");
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
SetRouteConfiguration(0, route_config);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
const auto& response_state = RouteConfigurationResponseState(0);
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_EQ(response_state.error_message,
"RouteAction cluster contains empty cluster name.");
}
TEST_P(LdsRdsTest, RouteActionWeightedTargetHasIncorrectTotalWeightSet) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout*/ 0,
/*xds_routing_enabled=*/true);
const size_t kWeight75 = 75;
const char* kNewCluster1Name = "new_cluster_1";
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service/");
auto* weighted_cluster1 =
route1->mutable_route()->mutable_weighted_clusters()->add_clusters();
weighted_cluster1->set_name(kNewCluster1Name);
weighted_cluster1->mutable_weight()->set_value(kWeight75);
route1->mutable_route()
->mutable_weighted_clusters()
->mutable_total_weight()
->set_value(kWeight75 + 1);
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
SetRouteConfiguration(0, route_config);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
const auto& response_state = RouteConfigurationResponseState(0);
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_EQ(response_state.error_message,
"RouteAction weighted_cluster has incorrect total weight");
}
TEST_P(LdsRdsTest, RouteActionWeightedTargetClusterHasEmptyClusterName) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout*/ 0,
/*xds_routing_enabled=*/true);
const size_t kWeight75 = 75;
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service/");
auto* weighted_cluster1 =
route1->mutable_route()->mutable_weighted_clusters()->add_clusters();
weighted_cluster1->set_name("");
weighted_cluster1->mutable_weight()->set_value(kWeight75);
route1->mutable_route()
->mutable_weighted_clusters()
->mutable_total_weight()
->set_value(kWeight75);
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
SetRouteConfiguration(0, route_config);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
const auto& response_state = RouteConfigurationResponseState(0);
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_EQ(
response_state.error_message,
"RouteAction weighted_cluster cluster contains empty cluster name.");
}
TEST_P(LdsRdsTest, RouteActionWeightedTargetClusterHasNoWeight) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout*/ 0,
/*xds_routing_enabled=*/true);
const size_t kWeight75 = 75;
const char* kNewCluster1Name = "new_cluster_1";
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service/");
auto* weighted_cluster1 =
route1->mutable_route()->mutable_weighted_clusters()->add_clusters();
weighted_cluster1->set_name(kNewCluster1Name);
route1->mutable_route()
->mutable_weighted_clusters()
->mutable_total_weight()
->set_value(kWeight75);
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
SetRouteConfiguration(0, route_config);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
const auto& response_state = RouteConfigurationResponseState(0);
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
EXPECT_EQ(response_state.error_message,
"RouteAction weighted_cluster cluster missing weight");
}
// Tests that LDS client times out when no response received.
@ -2787,6 +2928,370 @@ TEST_P(LdsRdsTest, XdsRoutingPrefixMatching) {
EXPECT_EQ(kNumEcho2Rpcs, backends_[3]->backend_service2()->request_count());
}
TEST_P(LdsRdsTest, XdsRoutingWeightedCluster) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout*/ 0,
/*xds_routing_enabled=*/true);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewCluster2Name = "new_cluster_2";
const size_t kNumEcho1Rpcs = 1000;
const size_t kNumEchoRpcs = 10;
const size_t kWeight75 = 75;
const size_t kWeight25 = 25;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(0, 1)},
});
AdsServiceImpl::EdsResourceArgs args1({
{"locality0", GetBackendPorts(1, 2)},
});
AdsServiceImpl::EdsResourceArgs args2({
{"locality0", GetBackendPorts(2, 3)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args));
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args1, kNewCluster1Name));
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args2, kNewCluster2Name));
// Populate new CDS resources.
Cluster new_cluster1 = balancers_[0]->ads_service()->default_cluster();
new_cluster1.set_name(kNewCluster1Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster1);
Cluster new_cluster2 = balancers_[0]->ads_service()->default_cluster();
new_cluster2.set_name(kNewCluster2Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster2);
// Populating Route Configurations for LDS.
RouteConfiguration new_route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service/");
auto* weighted_cluster1 =
route1->mutable_route()->mutable_weighted_clusters()->add_clusters();
weighted_cluster1->set_name(kNewCluster1Name);
weighted_cluster1->mutable_weight()->set_value(kWeight75);
auto* weighted_cluster2 =
route1->mutable_route()->mutable_weighted_clusters()->add_clusters();
weighted_cluster2->set_name(kNewCluster2Name);
weighted_cluster2->mutable_weight()->set_value(kWeight25);
route1->mutable_route()
->mutable_weighted_clusters()
->mutable_total_weight()
->set_value(kWeight75 + kWeight25);
auto* default_route = new_route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
SetRouteConfiguration(0, new_route_config);
WaitForAllBackends(0, 1);
WaitForAllBackends(1, 3, true, RpcOptions().set_rpc_service(SERVICE_ECHO1));
CheckRpcSendOk(kNumEchoRpcs);
CheckRpcSendOk(kNumEcho1Rpcs, RpcOptions().set_rpc_service(SERVICE_ECHO1));
// Make sure RPCs all go to the correct backend.
EXPECT_EQ(kNumEchoRpcs, backends_[0]->backend_service()->request_count());
EXPECT_EQ(0, backends_[0]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
const int weight_75_request_count =
backends_[1]->backend_service1()->request_count();
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
const int weight_25_request_count =
backends_[2]->backend_service1()->request_count();
const double kErrorTolerance = 0.2;
EXPECT_THAT(weight_75_request_count,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight75 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight75 / 100 *
(1 + kErrorTolerance))));
EXPECT_THAT(weight_25_request_count,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight25 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight25 / 100 *
(1 + kErrorTolerance))));
}
TEST_P(LdsRdsTest, XdsRoutingWeightedClusterUpdateWeights) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout*/ 0,
/*xds_routing_enabled=*/true);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewCluster2Name = "anew_cluster_2";
const char* kNewCluster3Name = "new_cluster_3";
const size_t kNumEcho1Rpcs = 1000;
const size_t kNumEchoRpcs = 10;
const size_t kWeight75 = 75;
const size_t kWeight25 = 25;
const size_t kWeight50 = 50;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(0, 1)},
});
AdsServiceImpl::EdsResourceArgs args1({
{"locality0", GetBackendPorts(1, 2)},
});
AdsServiceImpl::EdsResourceArgs args2({
{"locality0", GetBackendPorts(2, 3)},
});
AdsServiceImpl::EdsResourceArgs args3({
{"locality0", GetBackendPorts(3, 4)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args));
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args1, kNewCluster1Name));
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args2, kNewCluster2Name));
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args3, kNewCluster3Name));
// Populate new CDS resources.
Cluster new_cluster1 = balancers_[0]->ads_service()->default_cluster();
new_cluster1.set_name(kNewCluster1Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster1);
Cluster new_cluster2 = balancers_[0]->ads_service()->default_cluster();
new_cluster2.set_name(kNewCluster2Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster2);
Cluster new_cluster3 = balancers_[0]->ads_service()->default_cluster();
new_cluster3.set_name(kNewCluster3Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster3);
// Populating Route Configurations.
RouteConfiguration new_route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service/");
auto* weighted_cluster1 =
route1->mutable_route()->mutable_weighted_clusters()->add_clusters();
weighted_cluster1->set_name(kNewCluster1Name);
weighted_cluster1->mutable_weight()->set_value(kWeight75);
auto* weighted_cluster2 =
route1->mutable_route()->mutable_weighted_clusters()->add_clusters();
weighted_cluster2->set_name(kNewCluster2Name);
weighted_cluster2->mutable_weight()->set_value(kWeight25);
route1->mutable_route()
->mutable_weighted_clusters()
->mutable_total_weight()
->set_value(kWeight75 + kWeight25);
auto* default_route = new_route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
SetRouteConfiguration(0, new_route_config);
WaitForAllBackends(0, 1);
WaitForAllBackends(1, 3, true, RpcOptions().set_rpc_service(SERVICE_ECHO1));
CheckRpcSendOk(kNumEchoRpcs);
CheckRpcSendOk(kNumEcho1Rpcs, RpcOptions().set_rpc_service(SERVICE_ECHO1));
// Make sure RPCs all go to the correct backend.
EXPECT_EQ(kNumEchoRpcs, backends_[0]->backend_service()->request_count());
EXPECT_EQ(0, backends_[0]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
const int weight_75_request_count =
backends_[1]->backend_service1()->request_count();
EXPECT_EQ(0, backends_[1]->backend_service2()->request_count());
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
const int weight_25_request_count =
backends_[2]->backend_service1()->request_count();
EXPECT_EQ(0, backends_[3]->backend_service()->request_count());
EXPECT_EQ(0, backends_[3]->backend_service1()->request_count());
const double kErrorTolerance = 0.2;
EXPECT_THAT(weight_75_request_count,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight75 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight75 / 100 *
(1 + kErrorTolerance))));
EXPECT_THAT(weight_25_request_count,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight25 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight25 / 100 *
(1 + kErrorTolerance))));
// Change Route Configurations: same clusters different weights.
weighted_cluster1->mutable_weight()->set_value(kWeight50);
weighted_cluster2->mutable_weight()->set_value(kWeight50);
// Change default route to a new cluster to help to identify when new polices
// are seen by the client.
default_route->mutable_route()->set_cluster(kNewCluster3Name);
SetRouteConfiguration(0, new_route_config);
ResetBackendCounters();
WaitForAllBackends(3, 4);
CheckRpcSendOk(kNumEchoRpcs);
CheckRpcSendOk(kNumEcho1Rpcs, RpcOptions().set_rpc_service(SERVICE_ECHO1));
// Make sure RPCs all go to the correct backend.
EXPECT_EQ(0, backends_[0]->backend_service()->request_count());
EXPECT_EQ(0, backends_[0]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
const int weight_50_request_count_1 =
backends_[1]->backend_service1()->request_count();
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
const int weight_50_request_count_2 =
backends_[2]->backend_service1()->request_count();
EXPECT_EQ(kNumEchoRpcs, backends_[3]->backend_service()->request_count());
EXPECT_EQ(0, backends_[3]->backend_service1()->request_count());
EXPECT_THAT(weight_50_request_count_1,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight50 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight50 / 100 *
(1 + kErrorTolerance))));
EXPECT_THAT(weight_50_request_count_2,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight50 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight50 / 100 *
(1 + kErrorTolerance))));
}
TEST_P(LdsRdsTest, XdsRoutingWeightedClusterUpdateClusters) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout*/ 0,
/*xds_routing_enabled=*/true);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewCluster2Name = "anew_cluster_2";
const char* kNewCluster3Name = "new_cluster_3";
const size_t kNumEcho1Rpcs = 1000;
const size_t kNumEchoRpcs = 10;
const size_t kWeight75 = 75;
const size_t kWeight25 = 25;
const size_t kWeight50 = 50;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(0, 1)},
});
AdsServiceImpl::EdsResourceArgs args1({
{"locality0", GetBackendPorts(1, 2)},
});
AdsServiceImpl::EdsResourceArgs args2({
{"locality0", GetBackendPorts(2, 3)},
});
AdsServiceImpl::EdsResourceArgs args3({
{"locality0", GetBackendPorts(3, 4)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args));
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args1, kNewCluster1Name));
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args2, kNewCluster2Name));
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args3, kNewCluster3Name));
// Populate new CDS resources.
Cluster new_cluster1 = balancers_[0]->ads_service()->default_cluster();
new_cluster1.set_name(kNewCluster1Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster1);
Cluster new_cluster2 = balancers_[0]->ads_service()->default_cluster();
new_cluster2.set_name(kNewCluster2Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster2);
Cluster new_cluster3 = balancers_[0]->ads_service()->default_cluster();
new_cluster3.set_name(kNewCluster3Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster3);
// Populating Route Configurations.
RouteConfiguration new_route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service/");
auto* weighted_cluster1 =
route1->mutable_route()->mutable_weighted_clusters()->add_clusters();
weighted_cluster1->set_name(kNewCluster1Name);
weighted_cluster1->mutable_weight()->set_value(kWeight75);
auto* weighted_cluster2 =
route1->mutable_route()->mutable_weighted_clusters()->add_clusters();
weighted_cluster2->set_name(kDefaultResourceName);
weighted_cluster2->mutable_weight()->set_value(kWeight25);
route1->mutable_route()
->mutable_weighted_clusters()
->mutable_total_weight()
->set_value(kWeight75 + kWeight25);
auto* default_route = new_route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
SetRouteConfiguration(0, new_route_config);
WaitForAllBackends(0, 1);
WaitForAllBackends(1, 2, true, RpcOptions().set_rpc_service(SERVICE_ECHO1));
CheckRpcSendOk(kNumEchoRpcs);
CheckRpcSendOk(kNumEcho1Rpcs, RpcOptions().set_rpc_service(SERVICE_ECHO1));
// Make sure RPCs all go to the correct backend.
EXPECT_EQ(kNumEchoRpcs, backends_[0]->backend_service()->request_count());
int weight_25_request_count =
backends_[0]->backend_service1()->request_count();
EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
int weight_75_request_count =
backends_[1]->backend_service1()->request_count();
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
EXPECT_EQ(0, backends_[2]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[3]->backend_service()->request_count());
EXPECT_EQ(0, backends_[3]->backend_service1()->request_count());
const double kErrorTolerance = 0.2;
EXPECT_THAT(weight_75_request_count,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight75 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight75 / 100 *
(1 + kErrorTolerance))));
EXPECT_THAT(weight_25_request_count,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight25 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight25 / 100 *
(1 + kErrorTolerance))));
// Change Route Configurations: new set of clusters with different weights.
weighted_cluster1->mutable_weight()->set_value(kWeight50);
weighted_cluster2->set_name(kNewCluster2Name);
weighted_cluster2->mutable_weight()->set_value(kWeight50);
SetRouteConfiguration(0, new_route_config);
ResetBackendCounters();
WaitForAllBackends(2, 3, true, RpcOptions().set_rpc_service(SERVICE_ECHO1));
CheckRpcSendOk(kNumEchoRpcs);
CheckRpcSendOk(kNumEcho1Rpcs, RpcOptions().set_rpc_service(SERVICE_ECHO1));
// Make sure RPCs all go to the correct backend.
EXPECT_EQ(kNumEchoRpcs, backends_[0]->backend_service()->request_count());
EXPECT_EQ(0, backends_[0]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
const int weight_50_request_count_1 =
backends_[1]->backend_service1()->request_count();
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
const int weight_50_request_count_2 =
backends_[2]->backend_service1()->request_count();
EXPECT_EQ(0, backends_[3]->backend_service()->request_count());
EXPECT_EQ(0, backends_[3]->backend_service1()->request_count());
EXPECT_THAT(weight_50_request_count_1,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight50 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight50 / 100 *
(1 + kErrorTolerance))));
EXPECT_THAT(weight_50_request_count_2,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight50 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight50 / 100 *
(1 + kErrorTolerance))));
// Change Route Configurations.
weighted_cluster1->mutable_weight()->set_value(kWeight75);
weighted_cluster2->set_name(kNewCluster3Name);
weighted_cluster2->mutable_weight()->set_value(kWeight25);
SetRouteConfiguration(0, new_route_config);
ResetBackendCounters();
WaitForAllBackends(3, 4, true, RpcOptions().set_rpc_service(SERVICE_ECHO1));
CheckRpcSendOk(kNumEchoRpcs);
CheckRpcSendOk(kNumEcho1Rpcs, RpcOptions().set_rpc_service(SERVICE_ECHO1));
// Make sure RPCs all go to the correct backend.
EXPECT_EQ(kNumEchoRpcs, backends_[0]->backend_service()->request_count());
EXPECT_EQ(0, backends_[0]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
weight_75_request_count = backends_[1]->backend_service1()->request_count();
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
EXPECT_EQ(0, backends_[2]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[3]->backend_service()->request_count());
weight_25_request_count = backends_[3]->backend_service1()->request_count();
EXPECT_THAT(weight_75_request_count,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight75 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight75 / 100 *
(1 + kErrorTolerance))));
EXPECT_THAT(weight_25_request_count,
::testing::AllOf(::testing::Ge(kNumEcho1Rpcs * kWeight25 / 100 *
(1 - kErrorTolerance)),
::testing::Le(kNumEcho1Rpcs * kWeight25 / 100 *
(1 + kErrorTolerance))));
}
using CdsTest = BasicTest;
// Tests that CDS client should send an ACK upon correct CDS response.

Loading…
Cancel
Save