Merge pull request #23781 from markdroth/xds_client_api

Move logic for generating service config from XdsClient to xds resolver.
pull/23787/head
Mark D. Roth 5 years ago committed by GitHub
commit a2f77ffe19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 387
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  2. 268
      src/core/ext/xds/xds_api.cc
  3. 37
      src/core/ext/xds/xds_api.h
  4. 360
      src/core/ext/xds/xds_client.cc
  5. 45
      src/core/ext/xds/xds_client.h

@ -18,6 +18,8 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "absl/strings/str_join.h"
#include "src/core/ext/filters/client_channel/config_selector.h" #include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client.h"
@ -68,12 +70,11 @@ class XdsResolver : public Resolver {
} }
private: private:
class ServiceConfigWatcher : public XdsClient::ServiceConfigWatcherInterface { class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
public: public:
explicit ServiceConfigWatcher(RefCountedPtr<XdsResolver> resolver) explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {} : resolver_(std::move(resolver)) {}
void OnServiceConfigChanged( void OnListenerChanged(XdsApi::LdsUpdate listener_data) override;
RefCountedPtr<ServiceConfig> service_config) override;
void OnError(grpc_error* error) override; void OnError(grpc_error* error) override;
void OnResourceDoesNotExist() override; void OnResourceDoesNotExist() override;
@ -88,32 +89,75 @@ class XdsResolver : public Resolver {
} }
}; };
// Returns the weighted_clusters action name to use from
// weighted_cluster_index_map_ for a WeightedClusters route action.
std::string WeightedClustersActionName(
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
weighted_clusters);
// Updates weighted_cluster_index_map_ that will
// determine the names of the WeightedCluster actions for the current update.
void UpdateWeightedClusterIndexMap(const XdsApi::RdsUpdate& rds_update);
// Create the service config generated by the RdsUpdate.
grpc_error* CreateServiceConfig(const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config);
std::string server_name_; std::string server_name_;
const grpc_channel_args* args_; const grpc_channel_args* args_;
grpc_pollset_set* interested_parties_; grpc_pollset_set* interested_parties_;
OrphanablePtr<XdsClient> xds_client_; OrphanablePtr<XdsClient> xds_client_;
RefCountedPtr<XdsConfigSelector> config_selector_; RefCountedPtr<XdsConfigSelector> config_selector_;
// 2-level map to store WeightedCluster action names.
// Top level map is keyed by cluster names without weight like a_b_c; bottom
// level map is keyed by cluster names + weights like a10_b50_c40.
struct ClusterNamesInfo {
uint64_t next_index = 0;
std::map<std::string /*cluster names + weights*/,
uint64_t /*policy index number*/>
cluster_weights_map;
};
using WeightedClusterIndexMap =
std::map<std::string /*cluster names*/, ClusterNamesInfo>;
// Cache of action names for WeightedCluster targets in the current
// service config.
WeightedClusterIndexMap weighted_cluster_index_map_;
}; };
void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged( //
RefCountedPtr<ServiceConfig> service_config) { // XdsResolver::ListenerWatcher
//
void XdsResolver::ListenerWatcher::OnListenerChanged(
XdsApi::LdsUpdate listener_data) {
if (resolver_->xds_client_ == nullptr) return; if (resolver_->xds_client_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] received updated service config: %s", gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data",
resolver_.get(), service_config->json_string().c_str()); resolver_.get());
}
Result result;
grpc_error* error = resolver_->CreateServiceConfig(*listener_data.rds_update,
&result.service_config);
if (error != GRPC_ERROR_NONE) {
OnError(error);
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s",
resolver_.get(), result.service_config->json_string().c_str());
} }
grpc_arg new_args[] = { grpc_arg new_args[] = {
resolver_->xds_client_->MakeChannelArg(), resolver_->xds_client_->MakeChannelArg(),
resolver_->config_selector_->MakeChannelArg(), resolver_->config_selector_->MakeChannelArg(),
}; };
Result result;
result.args = grpc_channel_args_copy_and_add(resolver_->args_, new_args, result.args = grpc_channel_args_copy_and_add(resolver_->args_, new_args,
GPR_ARRAY_SIZE(new_args)); GPR_ARRAY_SIZE(new_args));
result.service_config = std::move(service_config);
resolver_->result_handler()->ReturnResult(std::move(result)); resolver_->result_handler()->ReturnResult(std::move(result));
} }
void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) { void XdsResolver::ListenerWatcher::OnError(grpc_error* error) {
if (resolver_->xds_client_ == nullptr) return; if (resolver_->xds_client_ == nullptr) return;
gpr_log(GPR_ERROR, "[xds_resolver %p] received error: %s", resolver_.get(), gpr_log(GPR_ERROR, "[xds_resolver %p] received error: %s", resolver_.get(),
grpc_error_string(error)); grpc_error_string(error));
@ -125,7 +169,7 @@ void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) {
resolver_->result_handler()->ReturnResult(std::move(result)); resolver_->result_handler()->ReturnResult(std::move(result));
} }
void XdsResolver::ServiceConfigWatcher::OnResourceDoesNotExist() { void XdsResolver::ListenerWatcher::OnResourceDoesNotExist() {
if (resolver_->xds_client_ == nullptr) return; if (resolver_->xds_client_ == nullptr) return;
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"[xds_resolver %p] LDS/RDS resource does not exist -- returning " "[xds_resolver %p] LDS/RDS resource does not exist -- returning "
@ -139,11 +183,15 @@ void XdsResolver::ServiceConfigWatcher::OnResourceDoesNotExist() {
resolver_->result_handler()->ReturnResult(std::move(result)); resolver_->result_handler()->ReturnResult(std::move(result));
} }
//
// XdsResolver
//
void XdsResolver::StartLocked() { void XdsResolver::StartLocked() {
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>( xds_client_ = MakeOrphanable<XdsClient>(
work_serializer(), interested_parties_, server_name_, work_serializer(), interested_parties_, server_name_,
absl::make_unique<ServiceConfigWatcher>(Ref()), *args_, &error); absl::make_unique<ListenerWatcher>(Ref()), *args_, &error);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"Failed to create xds client -- channel will remain in " "Failed to create xds client -- channel will remain in "
@ -153,6 +201,319 @@ void XdsResolver::StartLocked() {
} }
} }
std::string CreateServiceConfigActionCluster(const std::string& cluster_name) {
return absl::StrFormat(
" \"cds:%s\":{\n"
" \"childPolicy\":[ {\n"
" \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" }\n"
" } ]\n"
" }",
cluster_name, cluster_name);
}
std::string CreateServiceConfigRoute(const std::string& action_name,
const XdsApi::RdsUpdate::RdsRoute& route) {
std::vector<std::string> headers;
for (const auto& header : route.matchers.header_matchers) {
std::string header_matcher;
switch (header.type) {
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::EXACT:
header_matcher = absl::StrFormat(" \"exact_match\": \"%s\"",
header.string_matcher);
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::REGEX:
header_matcher = absl::StrFormat(" \"regex_match\": \"%s\"",
header.regex_match->pattern());
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::RANGE:
header_matcher = absl::StrFormat(
" \"range_match\":{\n"
" \"start\":%d,\n"
" \"end\":%d\n"
" }",
header.range_start, header.range_end);
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::PRESENT:
header_matcher =
absl::StrFormat(" \"present_match\": %s",
header.present_match ? "true" : "false");
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::PREFIX:
header_matcher = absl::StrFormat(
" \"prefix_match\": \"%s\"", header.string_matcher);
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::SUFFIX:
header_matcher = absl::StrFormat(
" \"suffix_match\": \"%s\"", header.string_matcher);
break;
default:
break;
}
std::vector<std::string> header_parts;
header_parts.push_back(
absl::StrFormat(" { \n"
" \"name\": \"%s\",\n",
header.name));
header_parts.push_back(header_matcher);
if (header.invert_match) {
header_parts.push_back(
absl::StrFormat(",\n"
" \"invert_match\": true"));
}
header_parts.push_back(
absl::StrFormat("\n"
" }"));
headers.push_back(absl::StrJoin(header_parts, ""));
}
std::vector<std::string> headers_service_config;
if (!headers.empty()) {
headers_service_config.push_back("\"headers\":[\n");
headers_service_config.push_back(absl::StrJoin(headers, ","));
headers_service_config.push_back(" ],\n");
}
std::string path_match_str;
switch (route.matchers.path_matcher.type) {
case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
PREFIX:
path_match_str = absl::StrFormat(
"\"prefix\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
PATH:
path_match_str = absl::StrFormat(
"\"path\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
REGEX:
path_match_str =
absl::StrFormat("\"regex\": \"%s\",\n",
route.matchers.path_matcher.regex_matcher->pattern());
break;
}
return absl::StrFormat(
" { \n"
" %s"
" %s"
" %s"
" \"action\": \"%s\"\n"
" }",
path_match_str, absl::StrJoin(headers_service_config, ""),
route.matchers.fraction_per_million.has_value()
? absl::StrFormat("\"match_fraction\":%d,\n",
route.matchers.fraction_per_million.value())
: "",
action_name);
}
// Create the service config for one weighted cluster.
std::string CreateServiceConfigActionWeightedCluster(
const std::string& name,
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>& clusters) {
std::vector<std::string> config_parts;
config_parts.push_back(
absl::StrFormat(" \"weighted:%s\":{\n"
" \"childPolicy\":[ {\n"
" \"weighted_target_experimental\":{\n"
" \"targets\":{\n",
name));
std::vector<std::string> weighted_targets;
weighted_targets.reserve(clusters.size());
for (const auto& cluster_weight : clusters) {
weighted_targets.push_back(absl::StrFormat(
" \"%s\":{\n"
" \"weight\":%d,\n"
" \"childPolicy\":[ {\n"
" \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" }\n"
" } ]\n"
" }",
cluster_weight.name, cluster_weight.weight, cluster_weight.name));
}
config_parts.push_back(absl::StrJoin(weighted_targets, ",\n"));
config_parts.push_back(
" }\n"
" }\n"
" } ]\n"
" }");
return absl::StrJoin(config_parts, "");
}
struct WeightedClustersKeys {
std::string cluster_names_key;
std::string cluster_weights_key;
};
// Returns the cluster names and weights key or the cluster names only key.
WeightedClustersKeys GetWeightedClustersKey(
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
weighted_clusters) {
std::set<std::string> cluster_names;
std::set<std::string> cluster_weights;
for (const auto& cluster_weight : weighted_clusters) {
cluster_names.emplace(absl::StrFormat("%s", cluster_weight.name));
cluster_weights.emplace(
absl::StrFormat("%s_%d", cluster_weight.name, cluster_weight.weight));
}
return {absl::StrJoin(cluster_names, "_"),
absl::StrJoin(cluster_weights, "_")};
}
std::string XdsResolver::WeightedClustersActionName(
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
weighted_clusters) {
WeightedClustersKeys keys = GetWeightedClustersKey(weighted_clusters);
auto cluster_names_map_it =
weighted_cluster_index_map_.find(keys.cluster_names_key);
GPR_ASSERT(cluster_names_map_it != weighted_cluster_index_map_.end());
const auto& cluster_weights_map =
cluster_names_map_it->second.cluster_weights_map;
auto cluster_weights_map_it =
cluster_weights_map.find(keys.cluster_weights_key);
GPR_ASSERT(cluster_weights_map_it != cluster_weights_map.end());
return absl::StrFormat("%s_%d", keys.cluster_names_key,
cluster_weights_map_it->second);
}
void XdsResolver::UpdateWeightedClusterIndexMap(
const XdsApi::RdsUpdate& rds_update) {
// Construct a list of unique WeightedCluster
// actions which we need to process: to find action names
std::map<std::string /* cluster_weights_key */,
std::string /* cluster_names_key */>
actions_to_process;
for (const auto& route : rds_update.routes) {
if (!route.weighted_clusters.empty()) {
WeightedClustersKeys keys =
GetWeightedClustersKey(route.weighted_clusters);
auto action_it = actions_to_process.find(keys.cluster_weights_key);
if (action_it == actions_to_process.end()) {
actions_to_process[std::move(keys.cluster_weights_key)] =
std::move(keys.cluster_names_key);
}
}
}
// First pass of all unique WeightedCluster actions: if the exact same
// weighted target policy (same clusters and weights) appears in the old map,
// then that old action name is taken again and should be moved to the new
// map; any other action names from the old set of actions are candidates for
// reuse.
XdsResolver::WeightedClusterIndexMap new_weighted_cluster_index_map;
for (auto action_it = actions_to_process.begin();
action_it != actions_to_process.end();) {
const std::string& cluster_names_key = action_it->second;
const std::string& cluster_weights_key = action_it->first;
auto old_cluster_names_map_it =
weighted_cluster_index_map_.find(cluster_names_key);
if (old_cluster_names_map_it != weighted_cluster_index_map_.end()) {
// Add cluster_names_key to the new map and copy next_index.
auto& new_cluster_names_info =
new_weighted_cluster_index_map[cluster_names_key];
new_cluster_names_info.next_index =
old_cluster_names_map_it->second.next_index;
// Lookup cluster_weights_key in old map.
auto& old_cluster_weights_map =
old_cluster_names_map_it->second.cluster_weights_map;
auto old_cluster_weights_map_it =
old_cluster_weights_map.find(cluster_weights_key);
if (old_cluster_weights_map_it != old_cluster_weights_map.end()) {
// same policy found, move from old map to new map.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
old_cluster_weights_map_it->second;
old_cluster_weights_map.erase(old_cluster_weights_map_it);
// This action has been added to new map, so no need to process it
// again.
action_it = actions_to_process.erase(action_it);
continue;
}
}
++action_it;
}
// Second pass of all remaining unique WeightedCluster actions: if clusters
// for a new action are the same as an old unused action, reuse the name. If
// clusters differ, use a brand new name.
for (const auto& action : actions_to_process) {
const std::string& cluster_names_key = action.second;
const std::string& cluster_weights_key = action.first;
auto& new_cluster_names_info =
new_weighted_cluster_index_map[cluster_names_key];
auto& old_cluster_weights_map =
weighted_cluster_index_map_[cluster_names_key].cluster_weights_map;
auto old_cluster_weights_it = old_cluster_weights_map.begin();
if (old_cluster_weights_it != old_cluster_weights_map.end()) {
// There is something to reuse: this action uses the same set
// of clusters as a previous action and that action name is not
// already taken.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
old_cluster_weights_it->second;
// Remove the name from being able to reuse again.
old_cluster_weights_map.erase(old_cluster_weights_it);
} else {
// There is nothing to reuse, take the next index to use and
// increment.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
new_cluster_names_info.next_index++;
}
}
weighted_cluster_index_map_ = std::move(new_weighted_cluster_index_map);
}
grpc_error* XdsResolver::CreateServiceConfig(
const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config) {
UpdateWeightedClusterIndexMap(rds_update);
std::vector<std::string> actions_vector;
std::vector<std::string> route_table;
std::set<std::string> actions_set;
for (const auto& route : rds_update.routes) {
const std::string action_name =
route.weighted_clusters.empty()
? route.cluster_name
: WeightedClustersActionName(route.weighted_clusters);
if (actions_set.find(action_name) == actions_set.end()) {
actions_set.emplace(action_name);
actions_vector.push_back(
route.weighted_clusters.empty()
? CreateServiceConfigActionCluster(action_name)
: CreateServiceConfigActionWeightedCluster(
action_name, route.weighted_clusters));
}
route_table.push_back(CreateServiceConfigRoute(
absl::StrFormat("%s:%s",
route.weighted_clusters.empty() ? "cds" : "weighted",
action_name),
route));
}
std::vector<std::string> config_parts;
config_parts.push_back(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"xds_routing_experimental\":{\n"
" \"actions\":{\n");
config_parts.push_back(absl::StrJoin(actions_vector, ",\n"));
config_parts.push_back(
" },\n"
" \"routes\":[\n");
config_parts.push_back(absl::StrJoin(route_table, ",\n"));
config_parts.push_back(
" ]\n"
" } }\n"
" ]\n"
"}");
std::string json = absl::StrJoin(config_parts, "");
grpc_error* error = GRPC_ERROR_NONE;
*service_config = ServiceConfig::Create(json.c_str(), &error);
return error;
}
// //
// Factory // Factory
// //

@ -74,101 +74,44 @@
namespace grpc_core { namespace grpc_core {
// //
// XdsApi::PriorityListUpdate // XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher
// //
bool XdsApi::PriorityListUpdate::operator==( XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcher(
const XdsApi::PriorityListUpdate& other) const { const PathMatcher& other)
if (priorities_.size() != other.priorities_.size()) return false; : type(other.type) {
for (size_t i = 0; i < priorities_.size(); ++i) { if (type == PathMatcherType::REGEX) {
if (priorities_[i].localities != other.priorities_[i].localities) { regex_matcher = absl::make_unique<RE2>(other.regex_matcher->pattern());
return false; } else {
} string_matcher = other.string_matcher;
} }
return true;
} }
void XdsApi::PriorityListUpdate::Add( XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher&
XdsApi::PriorityListUpdate::LocalityMap::Locality locality) { XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::operator=(
// Pad the missing priorities in case the localities are not ordered by const PathMatcher& other) {
// priority. type = other.type;
if (!Contains(locality.priority)) priorities_.resize(locality.priority + 1); if (type == PathMatcherType::REGEX) {
LocalityMap& locality_map = priorities_[locality.priority]; regex_matcher = absl::make_unique<RE2>(other.regex_matcher->pattern());
locality_map.localities.emplace(locality.name, std::move(locality)); } else {
string_matcher = other.string_matcher;
} }
return *this;
const XdsApi::PriorityListUpdate::LocalityMap* XdsApi::PriorityListUpdate::Find(
uint32_t priority) const {
if (!Contains(priority)) return nullptr;
return &priorities_[priority];
} }
bool XdsApi::PriorityListUpdate::Contains( bool XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::operator==(
const RefCountedPtr<XdsLocalityName>& name) { const PathMatcher& other) const {
for (size_t i = 0; i < priorities_.size(); ++i) { if (type != other.type) return false;
const LocalityMap& locality_map = priorities_[i]; if (type == PathMatcherType::REGEX) {
if (locality_map.Contains(name)) return true; // Should never be null.
} if (regex_matcher == nullptr || other.regex_matcher == nullptr) {
return false; return false;
} }
return regex_matcher->pattern() == other.regex_matcher->pattern();
//
// XdsApi::DropConfig
//
bool XdsApi::DropConfig::ShouldDrop(const std::string** category_name) const {
for (size_t i = 0; i < drop_category_list_.size(); ++i) {
const auto& drop_category = drop_category_list_[i];
// Generate a random number in [0, 1000000).
const uint32_t random = static_cast<uint32_t>(rand()) % 1000000;
if (random < drop_category.parts_per_million) {
*category_name = &drop_category.name;
return true;
}
} }
return false; return string_matcher == other.string_matcher;
} }
//
// XdsApi
//
const char* XdsApi::kLdsTypeUrl =
"type.googleapis.com/envoy.config.listener.v3.Listener";
const char* XdsApi::kRdsTypeUrl =
"type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
const char* XdsApi::kCdsTypeUrl =
"type.googleapis.com/envoy.config.cluster.v3.Cluster";
const char* XdsApi::kEdsTypeUrl =
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
namespace {
const char* kLdsV2TypeUrl = "type.googleapis.com/envoy.api.v2.Listener";
const char* kRdsV2TypeUrl =
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
const char* kCdsV2TypeUrl = "type.googleapis.com/envoy.api.v2.Cluster";
const char* kEdsV2TypeUrl =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
bool IsLds(absl::string_view type_url) {
return type_url == XdsApi::kLdsTypeUrl || type_url == kLdsV2TypeUrl;
}
bool IsRds(absl::string_view type_url) {
return type_url == XdsApi::kRdsTypeUrl || type_url == kRdsV2TypeUrl;
}
bool IsCds(absl::string_view type_url) {
return type_url == XdsApi::kCdsTypeUrl || type_url == kCdsV2TypeUrl;
}
bool IsEds(absl::string_view type_url) {
return type_url == XdsApi::kEdsTypeUrl || type_url == kEdsV2TypeUrl;
}
} // namespace
std::string XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::ToString() std::string XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::ToString()
const { const {
std::string path_type_string; std::string path_type_string;
@ -185,12 +128,75 @@ std::string XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::ToString()
default: default:
break; break;
} }
return absl::StrFormat("Path %s:/%s/", path_type_string, return absl::StrFormat("Path %s:%s", path_type_string,
type == PathMatcherType::REGEX type == PathMatcherType::REGEX
? regex_matcher->pattern() ? regex_matcher->pattern()
: string_matcher); : string_matcher);
} }
//
// XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher
//
XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::HeaderMatcher(
const HeaderMatcher& other)
: name(other.name), type(other.type), invert_match(other.invert_match) {
switch (type) {
case HeaderMatcherType::REGEX:
regex_match = absl::make_unique<RE2>(other.regex_match->pattern());
break;
case HeaderMatcherType::RANGE:
range_start = other.range_start;
range_end = other.range_end;
break;
case HeaderMatcherType::PRESENT:
present_match = other.present_match;
break;
default:
string_matcher = other.string_matcher;
}
}
XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher&
XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::operator=(
const HeaderMatcher& other) {
name = other.name;
type = other.type;
invert_match = other.invert_match;
switch (type) {
case HeaderMatcherType::REGEX:
regex_match = absl::make_unique<RE2>(other.regex_match->pattern());
break;
case HeaderMatcherType::RANGE:
range_start = other.range_start;
range_end = other.range_end;
break;
case HeaderMatcherType::PRESENT:
present_match = other.present_match;
break;
default:
string_matcher = other.string_matcher;
}
return *this;
}
bool XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::operator==(
const HeaderMatcher& other) const {
if (name != other.name) return false;
if (type != other.type) return false;
if (invert_match != other.invert_match) return false;
switch (type) {
case HeaderMatcherType::REGEX:
return regex_match->pattern() != other.regex_match->pattern();
case HeaderMatcherType::RANGE:
return range_start != other.range_start && range_end != other.range_end;
case HeaderMatcherType::PRESENT:
return present_match != other.present_match;
default:
return string_matcher != other.string_matcher;
}
}
std::string XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::ToString() std::string XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::ToString()
const { const {
switch (type) { switch (type) {
@ -257,6 +263,102 @@ std::string XdsApi::RdsUpdate::ToString() const {
return absl::StrJoin(contents, ",\n"); return absl::StrJoin(contents, ",\n");
} }
//
// XdsApi::PriorityListUpdate
//
bool XdsApi::PriorityListUpdate::operator==(
const XdsApi::PriorityListUpdate& other) const {
if (priorities_.size() != other.priorities_.size()) return false;
for (size_t i = 0; i < priorities_.size(); ++i) {
if (priorities_[i].localities != other.priorities_[i].localities) {
return false;
}
}
return true;
}
void XdsApi::PriorityListUpdate::Add(
XdsApi::PriorityListUpdate::LocalityMap::Locality locality) {
// Pad the missing priorities in case the localities are not ordered by
// priority.
if (!Contains(locality.priority)) priorities_.resize(locality.priority + 1);
LocalityMap& locality_map = priorities_[locality.priority];
locality_map.localities.emplace(locality.name, std::move(locality));
}
const XdsApi::PriorityListUpdate::LocalityMap* XdsApi::PriorityListUpdate::Find(
uint32_t priority) const {
if (!Contains(priority)) return nullptr;
return &priorities_[priority];
}
bool XdsApi::PriorityListUpdate::Contains(
const RefCountedPtr<XdsLocalityName>& name) {
for (size_t i = 0; i < priorities_.size(); ++i) {
const LocalityMap& locality_map = priorities_[i];
if (locality_map.Contains(name)) return true;
}
return false;
}
//
// XdsApi::DropConfig
//
bool XdsApi::DropConfig::ShouldDrop(const std::string** category_name) const {
for (size_t i = 0; i < drop_category_list_.size(); ++i) {
const auto& drop_category = drop_category_list_[i];
// Generate a random number in [0, 1000000).
const uint32_t random = static_cast<uint32_t>(rand()) % 1000000;
if (random < drop_category.parts_per_million) {
*category_name = &drop_category.name;
return true;
}
}
return false;
}
//
// XdsApi
//
const char* XdsApi::kLdsTypeUrl =
"type.googleapis.com/envoy.config.listener.v3.Listener";
const char* XdsApi::kRdsTypeUrl =
"type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
const char* XdsApi::kCdsTypeUrl =
"type.googleapis.com/envoy.config.cluster.v3.Cluster";
const char* XdsApi::kEdsTypeUrl =
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
namespace {
const char* kLdsV2TypeUrl = "type.googleapis.com/envoy.api.v2.Listener";
const char* kRdsV2TypeUrl =
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
const char* kCdsV2TypeUrl = "type.googleapis.com/envoy.api.v2.Cluster";
const char* kEdsV2TypeUrl =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
bool IsLds(absl::string_view type_url) {
return type_url == XdsApi::kLdsTypeUrl || type_url == kLdsV2TypeUrl;
}
bool IsRds(absl::string_view type_url) {
return type_url == XdsApi::kRdsTypeUrl || type_url == kRdsV2TypeUrl;
}
bool IsCds(absl::string_view type_url) {
return type_url == XdsApi::kCdsTypeUrl || type_url == kCdsV2TypeUrl;
}
bool IsEds(absl::string_view type_url) {
return type_url == XdsApi::kEdsTypeUrl || type_url == kEdsV2TypeUrl;
}
} // namespace
XdsApi::XdsApi(XdsClient* client, TraceFlag* tracer, XdsApi::XdsApi(XdsClient* client, TraceFlag* tracer,
const XdsBootstrap* bootstrap) const XdsBootstrap* bootstrap)
: client_(client), : client_(client),

@ -61,19 +61,14 @@ class XdsApi {
PathMatcherType type; PathMatcherType type;
std::string string_matcher; std::string string_matcher;
std::unique_ptr<RE2> regex_matcher; std::unique_ptr<RE2> regex_matcher;
bool operator==(const PathMatcher& other) const {
if (type != other.type) return false; PathMatcher() = default;
if (type == PathMatcherType::REGEX) { PathMatcher(const PathMatcher& other);
// Should never be null. PathMatcher& operator=(const PathMatcher& other);
if (regex_matcher == nullptr || other.regex_matcher == nullptr) { bool operator==(const PathMatcher& other) const;
return false;
}
return regex_matcher->pattern() == other.regex_matcher->pattern();
}
return string_matcher == other.string_matcher;
}
std::string ToString() const; std::string ToString() const;
}; };
struct HeaderMatcher { struct HeaderMatcher {
enum class HeaderMatcherType { enum class HeaderMatcherType {
EXACT, // value stored in string_matcher field EXACT, // value stored in string_matcher field
@ -93,19 +88,18 @@ class XdsApi {
// invert_match field may or may not exisit, so initialize it to // invert_match field may or may not exisit, so initialize it to
// false. // false.
bool invert_match = false; bool invert_match = false;
bool operator==(const HeaderMatcher& other) const {
return (name == other.name && type == other.type && HeaderMatcher() = default;
range_start == other.range_start && HeaderMatcher(const HeaderMatcher& other);
range_end == other.range_end && HeaderMatcher& operator=(const HeaderMatcher& other);
string_matcher == other.string_matcher && bool operator==(const HeaderMatcher& other) const;
present_match == other.present_match &&
invert_match == other.invert_match);
}
std::string ToString() const; std::string ToString() const;
}; };
PathMatcher path_matcher; PathMatcher path_matcher;
std::vector<HeaderMatcher> header_matchers; std::vector<HeaderMatcher> header_matchers;
absl::optional<uint32_t> fraction_per_million; absl::optional<uint32_t> fraction_per_million;
bool operator==(const Matchers& other) const { bool operator==(const Matchers& other) const {
return (path_matcher == other.path_matcher && return (path_matcher == other.path_matcher &&
header_matchers == other.header_matchers && header_matchers == other.header_matchers &&
@ -113,13 +107,16 @@ class XdsApi {
} }
std::string ToString() const; std::string ToString() const;
}; };
Matchers matchers; Matchers matchers;
// Action for this route. // Action for this route.
// TODO(roth): When we can use absl::variant<>, consider using that
// here, to enforce the fact that only one of the two fields can be set.
std::string cluster_name; std::string cluster_name;
struct ClusterWeight { struct ClusterWeight {
std::string name; std::string name;
uint32_t weight; uint32_t weight;
bool operator==(const ClusterWeight& other) const { bool operator==(const ClusterWeight& other) const {
return (name == other.name && weight == other.weight); return (name == other.name && weight == other.weight);
} }

@ -187,8 +187,7 @@ class XdsClient::ChannelState::AdsCallState
} }
if (type_url_ == XdsApi::kLdsTypeUrl || if (type_url_ == XdsApi::kLdsTypeUrl ||
type_url_ == XdsApi::kRdsTypeUrl) { type_url_ == XdsApi::kRdsTypeUrl) {
ads_calld_->xds_client()->service_config_watcher_->OnError( ads_calld_->xds_client()->listener_watcher_->OnError(watcher_error);
watcher_error);
} else if (type_url_ == XdsApi::kCdsTypeUrl) { } else if (type_url_ == XdsApi::kCdsTypeUrl) {
ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_]; ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
for (const auto& p : state.watchers) { for (const auto& p : state.watchers) {
@ -719,7 +718,7 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
// Op: send request message. // Op: send request message.
GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
if (xds_client()->service_config_watcher_ != nullptr) { if (xds_client()->listener_watcher_ != nullptr) {
Subscribe(XdsApi::kLdsTypeUrl, xds_client()->server_name_); Subscribe(XdsApi::kLdsTypeUrl, xds_client()->server_name_);
if (xds_client()->lds_result_.has_value() && if (xds_client()->lds_result_.has_value() &&
!xds_client()->lds_result_->route_config_name.empty()) { !xds_client()->lds_result_->route_config_name.empty()) {
@ -882,7 +881,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
xds_client()->rds_result_.reset(); xds_client()->rds_result_.reset();
} }
xds_client()->lds_result_.reset(); xds_client()->lds_result_.reset();
xds_client()->service_config_watcher_->OnResourceDoesNotExist(); xds_client()->listener_watcher_->OnResourceDoesNotExist();
return; return;
} }
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
@ -925,15 +924,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
if (xds_client()->lds_result_->rds_update.has_value()) { if (xds_client()->lds_result_->rds_update.has_value()) {
// If the RouteConfiguration was found inlined in LDS response, notify // If the RouteConfiguration was found inlined in LDS response, notify
// the watcher immediately. // the watcher immediately.
RefCountedPtr<ServiceConfig> service_config; xds_client()->listener_watcher_->OnListenerChanged(
grpc_error* error = xds_client()->CreateServiceConfig( *xds_client()->lds_result_);
xds_client()->lds_result_->rds_update.value(), &service_config);
if (error == GRPC_ERROR_NONE) {
xds_client()->service_config_watcher_->OnServiceConfigChanged(
std::move(service_config));
} else {
xds_client()->service_config_watcher_->OnError(error);
}
} else { } else {
// Send RDS request for dynamic resolution. // Send RDS request for dynamic resolution.
Subscribe(XdsApi::kRdsTypeUrl, Subscribe(XdsApi::kRdsTypeUrl,
@ -948,7 +940,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
"[xds_client %p] RDS update does not include requested resource", "[xds_client %p] RDS update does not include requested resource",
xds_client()); xds_client());
xds_client()->rds_result_.reset(); xds_client()->rds_result_.reset();
xds_client()->service_config_watcher_->OnResourceDoesNotExist(); xds_client()->listener_watcher_->OnResourceDoesNotExist();
return; return;
} }
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
@ -977,15 +969,9 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
} }
xds_client()->rds_result_ = std::move(rds_update); xds_client()->rds_result_ = std::move(rds_update);
// Notify the watcher. // Notify the watcher.
RefCountedPtr<ServiceConfig> service_config; XdsApi::LdsUpdate lds_result = *xds_client()->lds_result_;
grpc_error* error = xds_client()->CreateServiceConfig( lds_result.rds_update = xds_client()->rds_result_;
xds_client()->rds_result_.value(), &service_config); xds_client()->listener_watcher_->OnListenerChanged(lds_result);
if (error == GRPC_ERROR_NONE) {
xds_client()->service_config_watcher_->OnServiceConfigChanged(
std::move(service_config));
} else {
xds_client()->service_config_watcher_->OnError(error);
}
} }
void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
@ -1759,7 +1745,7 @@ grpc_millis GetRequestTimeout(const grpc_channel_args& args) {
XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer, XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties,
absl::string_view server_name, absl::string_view server_name,
std::unique_ptr<ServiceConfigWatcherInterface> watcher, std::unique_ptr<ListenerWatcherInterface> watcher,
const grpc_channel_args& channel_args, grpc_error** error) const grpc_channel_args& channel_args, grpc_error** error)
: InternallyRefCounted<XdsClient>(&grpc_xds_client_trace), : InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
request_timeout_(GetRequestTimeout(channel_args)), request_timeout_(GetRequestTimeout(channel_args)),
@ -1769,7 +1755,7 @@ XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)), XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
api_(this, &grpc_xds_client_trace, bootstrap_.get()), api_(this, &grpc_xds_client_trace, bootstrap_.get()),
server_name_(server_name), server_name_(server_name),
service_config_watcher_(std::move(watcher)) { listener_watcher_(std::move(watcher)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
} }
@ -1792,7 +1778,7 @@ XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
} }
chand_ = MakeOrphanable<ChannelState>( chand_ = MakeOrphanable<ChannelState>(
Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel); Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
if (service_config_watcher_ != nullptr) { if (listener_watcher_ != nullptr) {
chand_->Subscribe(XdsApi::kLdsTypeUrl, std::string(server_name)); chand_->Subscribe(XdsApi::kLdsTypeUrl, std::string(server_name));
} }
} }
@ -1815,7 +1801,7 @@ void XdsClient::Orphan() {
// possible for ADS calls to be in progress. Unreffing the loadbalancing // possible for ADS calls to be in progress. Unreffing the loadbalancing
// policies before those calls are done would lead to issues such as // policies before those calls are done would lead to issues such as
// https://github.com/grpc/grpc/issues/20928. // https://github.com/grpc/grpc/issues/20928.
if (service_config_watcher_ != nullptr) { if (listener_watcher_ != nullptr) {
cluster_map_.clear(); cluster_map_.clear();
endpoint_map_.clear(); endpoint_map_.clear();
} }
@ -1990,322 +1976,6 @@ void XdsClient::ResetBackoff() {
} }
} }
namespace {
std::string CreateServiceConfigActionCluster(const std::string& cluster_name) {
return absl::StrFormat(
" \"cds:%s\":{\n"
" \"childPolicy\":[ {\n"
" \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" }\n"
" } ]\n"
" }",
cluster_name, cluster_name);
}
std::string CreateServiceConfigRoute(const std::string& action_name,
const XdsApi::RdsUpdate::RdsRoute& route) {
std::vector<std::string> headers;
for (const auto& header : route.matchers.header_matchers) {
std::string header_matcher;
switch (header.type) {
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::EXACT:
header_matcher = absl::StrFormat(" \"exact_match\": \"%s\"",
header.string_matcher);
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::REGEX:
header_matcher = absl::StrFormat(" \"regex_match\": \"%s\"",
header.regex_match->pattern());
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::RANGE:
header_matcher = absl::StrFormat(
" \"range_match\":{\n"
" \"start\":%d,\n"
" \"end\":%d\n"
" }",
header.range_start, header.range_end);
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::PRESENT:
header_matcher =
absl::StrFormat(" \"present_match\": %s",
header.present_match ? "true" : "false");
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::PREFIX:
header_matcher = absl::StrFormat(
" \"prefix_match\": \"%s\"", header.string_matcher);
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
HeaderMatcherType::SUFFIX:
header_matcher = absl::StrFormat(
" \"suffix_match\": \"%s\"", header.string_matcher);
break;
default:
break;
}
std::vector<std::string> header_parts;
header_parts.push_back(
absl::StrFormat(" { \n"
" \"name\": \"%s\",\n",
header.name));
header_parts.push_back(header_matcher);
if (header.invert_match) {
header_parts.push_back(
absl::StrFormat(",\n"
" \"invert_match\": true"));
}
header_parts.push_back(
absl::StrFormat("\n"
" }"));
headers.push_back(absl::StrJoin(header_parts, ""));
}
std::vector<std::string> headers_service_config;
if (!headers.empty()) {
headers_service_config.push_back("\"headers\":[\n");
headers_service_config.push_back(absl::StrJoin(headers, ","));
headers_service_config.push_back(" ],\n");
}
std::string path_match_str;
switch (route.matchers.path_matcher.type) {
case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
PREFIX:
path_match_str = absl::StrFormat(
"\"prefix\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
PATH:
path_match_str = absl::StrFormat(
"\"path\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
break;
case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
REGEX:
path_match_str =
absl::StrFormat("\"regex\": \"%s\",\n",
route.matchers.path_matcher.regex_matcher->pattern());
break;
}
return absl::StrFormat(
" { \n"
" %s"
" %s"
" %s"
" \"action\": \"%s\"\n"
" }",
path_match_str, absl::StrJoin(headers_service_config, ""),
route.matchers.fraction_per_million.has_value()
? absl::StrFormat("\"match_fraction\":%d,\n",
route.matchers.fraction_per_million.value())
: "",
action_name);
}
// Create the service config for one weighted cluster.
std::string CreateServiceConfigActionWeightedCluster(
const std::string& name,
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>& clusters) {
std::vector<std::string> config_parts;
config_parts.push_back(
absl::StrFormat(" \"weighted:%s\":{\n"
" \"childPolicy\":[ {\n"
" \"weighted_target_experimental\":{\n"
" \"targets\":{\n",
name));
std::vector<std::string> weighted_targets;
weighted_targets.reserve(clusters.size());
for (const auto& cluster_weight : clusters) {
weighted_targets.push_back(absl::StrFormat(
" \"%s\":{\n"
" \"weight\":%d,\n"
" \"childPolicy\":[ {\n"
" \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" }\n"
" } ]\n"
" }",
cluster_weight.name, cluster_weight.weight, cluster_weight.name));
}
config_parts.push_back(absl::StrJoin(weighted_targets, ",\n"));
config_parts.push_back(
" }\n"
" }\n"
" } ]\n"
" }");
return absl::StrJoin(config_parts, "");
}
struct WeightedClustersKeys {
std::string cluster_names_key;
std::string cluster_weights_key;
};
// Returns the cluster names and weights key or the cluster names only key.
WeightedClustersKeys GetWeightedClustersKey(
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
weighted_clusters) {
std::set<std::string> cluster_names;
std::set<std::string> cluster_weights;
for (const auto& cluster_weight : weighted_clusters) {
cluster_names.emplace(absl::StrFormat("%s", cluster_weight.name));
cluster_weights.emplace(
absl::StrFormat("%s_%d", cluster_weight.name, cluster_weight.weight));
}
return {absl::StrJoin(cluster_names, "_"),
absl::StrJoin(cluster_weights, "_")};
}
} // namespace
std::string XdsClient::WeightedClustersActionName(
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
weighted_clusters) {
WeightedClustersKeys keys = GetWeightedClustersKey(weighted_clusters);
auto cluster_names_map_it =
weighted_cluster_index_map_.find(keys.cluster_names_key);
GPR_ASSERT(cluster_names_map_it != weighted_cluster_index_map_.end());
const auto& cluster_weights_map =
cluster_names_map_it->second.cluster_weights_map;
auto cluster_weights_map_it =
cluster_weights_map.find(keys.cluster_weights_key);
GPR_ASSERT(cluster_weights_map_it != cluster_weights_map.end());
return absl::StrFormat("%s_%d", keys.cluster_names_key,
cluster_weights_map_it->second);
}
void XdsClient::UpdateWeightedClusterIndexMap(
const XdsApi::RdsUpdate& rds_update) {
// Construct a list of unique WeightedCluster
// actions which we need to process: to find action names
std::map<std::string /* cluster_weights_key */,
std::string /* cluster_names_key */>
actions_to_process;
for (const auto& route : rds_update.routes) {
if (!route.weighted_clusters.empty()) {
WeightedClustersKeys keys =
GetWeightedClustersKey(route.weighted_clusters);
auto action_it = actions_to_process.find(keys.cluster_weights_key);
if (action_it == actions_to_process.end()) {
actions_to_process[std::move(keys.cluster_weights_key)] =
std::move(keys.cluster_names_key);
}
}
}
// First pass of all unique WeightedCluster actions: if the exact same
// weighted target policy (same clusters and weights) appears in the old map,
// then that old action name is taken again and should be moved to the new
// map; any other action names from the old set of actions are candidates for
// reuse.
XdsClient::WeightedClusterIndexMap new_weighted_cluster_index_map;
for (auto action_it = actions_to_process.begin();
action_it != actions_to_process.end();) {
const std::string& cluster_names_key = action_it->second;
const std::string& cluster_weights_key = action_it->first;
auto old_cluster_names_map_it =
weighted_cluster_index_map_.find(cluster_names_key);
if (old_cluster_names_map_it != weighted_cluster_index_map_.end()) {
// Add cluster_names_key to the new map and copy next_index.
auto& new_cluster_names_info =
new_weighted_cluster_index_map[cluster_names_key];
new_cluster_names_info.next_index =
old_cluster_names_map_it->second.next_index;
// Lookup cluster_weights_key in old map.
auto& old_cluster_weights_map =
old_cluster_names_map_it->second.cluster_weights_map;
auto old_cluster_weights_map_it =
old_cluster_weights_map.find(cluster_weights_key);
if (old_cluster_weights_map_it != old_cluster_weights_map.end()) {
// same policy found, move from old map to new map.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
old_cluster_weights_map_it->second;
old_cluster_weights_map.erase(old_cluster_weights_map_it);
// This action has been added to new map, so no need to process it
// again.
action_it = actions_to_process.erase(action_it);
continue;
}
}
++action_it;
}
// Second pass of all remaining unique WeightedCluster actions: if clusters
// for a new action are the same as an old unused action, reuse the name. If
// clusters differ, use a brand new name.
for (const auto& action : actions_to_process) {
const std::string& cluster_names_key = action.second;
const std::string& cluster_weights_key = action.first;
auto& new_cluster_names_info =
new_weighted_cluster_index_map[cluster_names_key];
auto& old_cluster_weights_map =
weighted_cluster_index_map_[cluster_names_key].cluster_weights_map;
auto old_cluster_weights_it = old_cluster_weights_map.begin();
if (old_cluster_weights_it != old_cluster_weights_map.end()) {
// There is something to reuse: this action uses the same set
// of clusters as a previous action and that action name is not
// already taken.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
old_cluster_weights_it->second;
// Remove the name from being able to reuse again.
old_cluster_weights_map.erase(old_cluster_weights_it);
} else {
// There is nothing to reuse, take the next index to use and
// increment.
new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
new_cluster_names_info.next_index++;
}
}
weighted_cluster_index_map_ = std::move(new_weighted_cluster_index_map);
}
grpc_error* XdsClient::CreateServiceConfig(
const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config) {
UpdateWeightedClusterIndexMap(rds_update);
std::vector<std::string> actions_vector;
std::vector<std::string> route_table;
std::set<std::string> actions_set;
for (const auto& route : rds_update.routes) {
const std::string action_name =
route.weighted_clusters.empty()
? route.cluster_name
: WeightedClustersActionName(route.weighted_clusters);
if (actions_set.find(action_name) == actions_set.end()) {
actions_set.emplace(action_name);
actions_vector.push_back(
route.weighted_clusters.empty()
? CreateServiceConfigActionCluster(action_name)
: CreateServiceConfigActionWeightedCluster(
action_name, route.weighted_clusters));
}
route_table.push_back(CreateServiceConfigRoute(
absl::StrFormat("%s:%s",
route.weighted_clusters.empty() ? "cds" : "weighted",
action_name),
route));
}
std::vector<std::string> config_parts;
config_parts.push_back(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"xds_routing_experimental\":{\n"
" \"actions\":{\n");
config_parts.push_back(absl::StrJoin(actions_vector, ",\n"));
config_parts.push_back(
" },\n"
" \"routes\":[\n");
config_parts.push_back(absl::StrJoin(route_table, ",\n"));
config_parts.push_back(
" ]\n"
" } }\n"
" ]\n"
"}");
std::string json = absl::StrJoin(config_parts, "");
grpc_error* error = GRPC_ERROR_NONE;
*service_config = ServiceConfig::Create(json.c_str(), &error);
return error;
}
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot( XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot(
bool send_all_clusters, const std::set<std::string>& clusters) { bool send_all_clusters, const std::set<std::string>& clusters) {
XdsApi::ClusterLoadReportMap snapshot_map; XdsApi::ClusterLoadReportMap snapshot_map;
@ -2375,8 +2045,8 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot(
} }
void XdsClient::NotifyOnError(grpc_error* error) { void XdsClient::NotifyOnError(grpc_error* error) {
if (service_config_watcher_ != nullptr) { if (listener_watcher_ != nullptr) {
service_config_watcher_->OnError(GRPC_ERROR_REF(error)); listener_watcher_->OnError(GRPC_ERROR_REF(error));
} }
for (const auto& p : cluster_map_) { for (const auto& p : cluster_map_) {
const ClusterState& cluster_state = p.second; const ClusterState& cluster_state = p.second;

@ -24,7 +24,6 @@
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_client_stats.h"
@ -41,13 +40,12 @@ extern TraceFlag xds_client_trace;
class XdsClient : public InternallyRefCounted<XdsClient> { class XdsClient : public InternallyRefCounted<XdsClient> {
public: public:
// Service config watcher interface. Implemented by callers. // Listener data watcher interface. Implemented by callers.
class ServiceConfigWatcherInterface { class ListenerWatcherInterface {
public: public:
virtual ~ServiceConfigWatcherInterface() = default; virtual ~ListenerWatcherInterface() = default;
virtual void OnServiceConfigChanged( virtual void OnListenerChanged(XdsApi::LdsUpdate listener_data) = 0;
RefCountedPtr<ServiceConfig> service_config) = 0;
virtual void OnError(grpc_error* error) = 0; virtual void OnError(grpc_error* error) = 0;
@ -82,7 +80,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// an error initializing the client. // an error initializing the client.
XdsClient(std::shared_ptr<WorkSerializer> work_serializer, XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
grpc_pollset_set* interested_parties, absl::string_view server_name, grpc_pollset_set* interested_parties, absl::string_view server_name,
std::unique_ptr<ServiceConfigWatcherInterface> watcher, std::unique_ptr<ListenerWatcherInterface> watcher,
const grpc_channel_args& channel_args, grpc_error** error); const grpc_channel_args& channel_args, grpc_error** error);
~XdsClient(); ~XdsClient();
@ -234,20 +232,6 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// Sends an error notification to all watchers. // Sends an error notification to all watchers.
void NotifyOnError(grpc_error* error); void NotifyOnError(grpc_error* error);
// Returns the weighted_clusters action name to use from
// weighted_cluster_index_map_ for a WeightedClusters route action.
std::string WeightedClustersActionName(
const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
weighted_clusters);
// Updates weighted_cluster_index_map_ that will
// determine the names of the WeightedCluster actions for the current update.
void UpdateWeightedClusterIndexMap(const XdsApi::RdsUpdate& rds_update);
// Create the service config generated by the RdsUpdate.
grpc_error* CreateServiceConfig(const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config);
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot( XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot(
bool send_all_clusters, const std::set<std::string>& clusters); bool send_all_clusters, const std::set<std::string>& clusters);
@ -267,8 +251,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
XdsApi api_; XdsApi api_;
const std::string server_name_; const std::string server_name_;
std::unique_ptr<ListenerWatcherInterface> listener_watcher_;
std::unique_ptr<ServiceConfigWatcherInterface> service_config_watcher_;
// The channel for communicating with the xds server. // The channel for communicating with the xds server.
OrphanablePtr<ChannelState> chand_; OrphanablePtr<ChannelState> chand_;
@ -285,22 +268,6 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
LoadReportState> LoadReportState>
load_report_map_; load_report_map_;
// 2-level map to store WeightedCluster action names.
// Top level map is keyed by cluster names without weight like a_b_c; bottom
// level map is keyed by cluster names + weights like a10_b50_c40.
struct ClusterNamesInfo {
uint64_t next_index = 0;
std::map<std::string /*cluster names + weights*/,
uint64_t /*policy index number*/>
cluster_weights_map;
};
using WeightedClusterIndexMap =
std::map<std::string /*cluster names*/, ClusterNamesInfo>;
// Cache of action names for WeightedCluster targets in the current
// service config.
WeightedClusterIndexMap weighted_cluster_index_map_;
bool shutting_down_ = false; bool shutting_down_ = false;
}; };

Loading…
Cancel
Save