Revert "Revert "[xDS LB] Override cluster with value from cookie"" (#33388)

Reapplying #32973
pull/33414/head
Eugene Ostroukhov 1 year ago committed by GitHub
parent 6a04e9c7e5
commit 57bb6fb65c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CMakeLists.txt
  2. 1
      build_autogenerated.yaml
  3. 2
      src/core/BUILD
  4. 640
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  5. 18
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h
  6. 248
      src/core/ext/filters/stateful_session/stateful_session_filter.cc
  7. 6
      src/core/ext/filters/stateful_session/stateful_session_filter.h
  8. 5
      src/core/lib/service_config/service_config_call_data.h
  9. 1
      test/cpp/end2end/xds/BUILD
  10. 2
      test/cpp/end2end/xds/xds_end2end_test_lib.cc
  11. 2
      test/cpp/end2end/xds/xds_end2end_test_lib.h
  12. 176
      test/cpp/end2end/xds/xds_override_host_end2end_test.cc
  13. 4
      test/cpp/end2end/xds/xds_routing_end2end_test.cc

4
CMakeLists.txt generated

@ -29618,6 +29618,10 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h

@ -16759,6 +16759,7 @@ targets:
- src/proto/grpc/testing/simple_messages.proto
- src/proto/grpc/testing/xds/v3/address.proto
- src/proto/grpc/testing/xds/v3/ads.proto
- src/proto/grpc/testing/xds/v3/aggregate_cluster.proto
- src/proto/grpc/testing/xds/v3/base.proto
- src/proto/grpc/testing/xds/v3/cluster.proto
- src/proto/grpc/testing/xds/v3/config_source.proto

@ -3789,6 +3789,7 @@ grpc_cc_library(
"channel_args",
"channel_fwd",
"context",
"grpc_resolver_xds_header",
"grpc_service_config",
"json",
"json_args",
@ -5279,7 +5280,6 @@ grpc_cc_library(
"ref_counted",
"slice",
"time",
"unique_type_name",
"//:config",
"//:debug_location",
"//:gpr",

@ -47,7 +47,6 @@
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/slice/slice.h"
#define XXH_INLINE_ALL
@ -102,11 +101,6 @@ namespace grpc_core {
TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
UniqueTypeName XdsClusterAttribute::TypeName() {
static UniqueTypeName::Factory kFactory("xds_cluster_name");
return kFactory.Create();
}
namespace {
std::string GetDefaultAuthorityInternal(const URI& uri) {
@ -246,15 +240,15 @@ class XdsResolver : public Resolver {
// the cluster by the ConfigSelector. The ref for each call is held
// until the call is committed. When the strong refs go away, we hop
// back into the WorkSerializer to remove the entry from the map.
class ClusterState : public DualRefCounted<ClusterState> {
class ClusterRef : public DualRefCounted<ClusterRef> {
public:
ClusterState(RefCountedPtr<XdsResolver> resolver,
absl::string_view cluster_name)
ClusterRef(RefCountedPtr<XdsResolver> resolver,
absl::string_view cluster_name)
: resolver_(std::move(resolver)), cluster_name_(cluster_name) {}
void Orphan() override {
auto* resolver = resolver_.get();
resolver->work_serializer_->Run(
XdsResolver* resolver_ptr = resolver_.get();
resolver_ptr->work_serializer_->Run(
[resolver = std::move(resolver_)]() {
resolver->MaybeRemoveUnusedClusters();
},
@ -268,21 +262,49 @@ class XdsResolver : public Resolver {
std::string cluster_name_;
};
// A map containing cluster refs held by the XdsConfigSelector. A ref to
// this map will be taken by each call processed by the XdsConfigSelector,
// stored in a the call's call attributes, and later unreffed
// by the ClusterSelection filter.
class XdsClusterMap : public RefCounted<XdsClusterMap> {
// A routing data including cluster refs and routes table held by the
// XdsConfigSelector. A ref to this map will be taken by each call processed
// by the XdsConfigSelector, stored in a the call's call attributes, and later
// unreffed by the ClusterSelection filter.
class RouteConfigData : public RefCounted<RouteConfigData> {
public:
explicit XdsClusterMap(
std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters)
: clusters_(std::move(clusters)) {}
struct RouteEntry {
struct ClusterWeightState {
uint32_t range_end;
absl::string_view cluster;
RefCountedPtr<ServiceConfig> method_config;
bool operator==(const ClusterWeightState& other) const {
return range_end == other.range_end && cluster == other.cluster &&
MethodConfigsEqual(method_config.get(),
other.method_config.get());
}
};
XdsRouteConfigResource::Route route;
RefCountedPtr<ServiceConfig> method_config;
std::vector<ClusterWeightState> weighted_cluster_state;
explicit RouteEntry(const XdsRouteConfigResource::Route& r) : route(r) {}
bool operator==(const XdsClusterMap& other) const {
return clusters_ == other.clusters_;
bool operator==(const RouteEntry& other) const {
return route == other.route &&
weighted_cluster_state == other.weighted_cluster_state &&
MethodConfigsEqual(method_config.get(),
other.method_config.get());
}
};
static absl::StatusOr<RefCountedPtr<RouteConfigData>> Create(
XdsResolver* resolver,
const std::vector<XdsRouteConfigResource::Route>& routes,
const Duration& default_max_stream_duration);
bool operator==(const RouteConfigData& other) const {
return clusters_ == other.clusters_ && routes_ == other.routes_;
}
RefCountedPtr<ClusterState> Find(absl::string_view name) const {
RefCountedPtr<ClusterRef> FindClusterRef(absl::string_view name) const {
auto it = clusters_.find(name);
if (it == clusters_.end()) {
return nullptr;
@ -290,14 +312,36 @@ class XdsResolver : public Resolver {
return it->second;
}
RouteEntry* GetRouteForRequest(absl::string_view path,
grpc_metadata_batch* initial_metadata);
private:
std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
class RouteListIterator;
static absl::StatusOr<RefCountedPtr<ServiceConfig>> CreateMethodConfig(
XdsResolver* resolver, const XdsRouteConfigResource::Route& route,
const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
cluster_weight);
static bool MethodConfigsEqual(const ServiceConfig* sc1,
const ServiceConfig* sc2) {
if (sc1 == nullptr) return sc2 == nullptr;
if (sc2 == nullptr) return false;
return sc1->json_string() == sc2->json_string();
}
absl::Status AddRouteEntry(const XdsRouteConfigResource::Route& route,
const Duration& default_max_stream_duration,
XdsResolver* resolver);
std::map<absl::string_view, RefCountedPtr<ClusterRef>> clusters_;
std::vector<RouteEntry> routes_;
};
class XdsConfigSelector : public ConfigSelector {
public:
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
absl::Status* status);
RefCountedPtr<RouteConfigData> route_config_data);
~XdsConfigSelector() override;
const char* name() const override { return "XdsConfigSelector"; }
@ -305,8 +349,8 @@ class XdsResolver : public Resolver {
bool Equals(const ConfigSelector* other) const override {
const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
// Don't need to compare resolver_, since that will always be the same.
return route_table_ == other_xds->route_table_ &&
*cluster_map_ == *other_xds->cluster_map_;
return *route_config_data_ == *other_xds->route_config_data_ &&
filters_ == other_xds->filters_;
}
absl::Status GetCallConfig(GetCallConfigArgs args) override;
@ -316,34 +360,28 @@ class XdsResolver : public Resolver {
}
private:
struct Route {
struct ClusterWeightState {
uint32_t range_end;
absl::string_view cluster;
RefCountedPtr<ServiceConfig> method_config;
bool operator==(const ClusterWeightState& other) const;
};
XdsRouteConfigResource::Route route;
RefCountedPtr<ServiceConfig> method_config;
std::vector<ClusterWeightState> weighted_cluster_state;
RefCountedPtr<XdsResolver> resolver_;
RefCountedPtr<RouteConfigData> route_config_data_;
std::vector<const grpc_channel_filter*> filters_;
};
bool operator==(const Route& other) const;
};
using RouteTable = std::vector<Route>;
class XdsRouteStateAttributeImpl : public XdsRouteStateAttribute {
public:
explicit XdsRouteStateAttributeImpl(
RefCountedPtr<RouteConfigData> route_config_data,
RouteConfigData::RouteEntry* route)
: route_config_data_(std::move(route_config_data)), route_(route) {}
class RouteListIterator;
// This method can be called only once. The first call will release
// the reference to the cluster map, and subsequent calls will return
// nullptr.
RefCountedPtr<ClusterRef> LockAndGetCluster(absl::string_view cluster_name);
absl::StatusOr<RefCountedPtr<ServiceConfig>> CreateMethodConfig(
const XdsRouteConfigResource::Route& route,
const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
cluster_weight);
bool HasClusterForRoute(absl::string_view cluster_name) const override;
RefCountedPtr<XdsResolver> resolver_;
RouteTable route_table_;
RefCountedPtr<XdsClusterMap> cluster_map_;
std::vector<const grpc_channel_filter*> filters_;
private:
RefCountedPtr<RouteConfigData> route_config_data_;
RouteConfigData::RouteEntry* route_;
};
class ClusterSelectionFilter : public ChannelFilter {
@ -357,29 +395,7 @@ class XdsResolver : public Resolver {
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override {
auto* service_config_call_data =
static_cast<ClientChannelServiceConfigCallData*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA]
.value);
GPR_ASSERT(service_config_call_data != nullptr);
auto* cluster_data = static_cast<XdsClusterMapAttribute*>(
service_config_call_data->GetCallAttribute(
XdsClusterMapAttribute::TypeName()));
auto* cluster_name_attribute = static_cast<XdsClusterAttribute*>(
service_config_call_data->GetCallAttribute(
XdsClusterAttribute::TypeName()));
if (cluster_data != nullptr && cluster_name_attribute != nullptr) {
auto cluster =
cluster_data->LockAndGetCluster(cluster_name_attribute->cluster());
if (cluster != nullptr) {
service_config_call_data->SetOnCommit(
[cluster = std::move(cluster)]() mutable { cluster.reset(); });
}
}
return next_promise_factory(std::move(call_args));
}
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
explicit ClusterSelectionFilter(ChannelFilter::Args filter_args)
@ -388,46 +404,17 @@ class XdsResolver : public Resolver {
ChannelFilter::Args filter_args_;
};
RefCountedPtr<ClusterState> GetOrCreateClusterState(
RefCountedPtr<ClusterRef> GetOrCreateClusterRef(
absl::string_view cluster_name) {
auto it = cluster_state_map_.find(cluster_name);
if (it == cluster_state_map_.end()) {
auto cluster = MakeRefCounted<ClusterState>(Ref(), cluster_name);
cluster_state_map_.emplace(cluster->cluster_name(), cluster->WeakRef());
auto it = cluster_ref_map_.find(cluster_name);
if (it == cluster_ref_map_.end()) {
auto cluster = MakeRefCounted<ClusterRef>(Ref(), cluster_name);
cluster_ref_map_.emplace(cluster->cluster_name(), cluster->WeakRef());
return cluster;
}
return it->second->Ref();
}
class XdsClusterMapAttribute
: public ServiceConfigCallData::CallAttributeInterface {
public:
static UniqueTypeName TypeName() {
static UniqueTypeName::Factory factory("xds_cluster_lb_data");
return factory.Create();
}
explicit XdsClusterMapAttribute(RefCountedPtr<XdsClusterMap> cluster_map)
: cluster_map_(std::move(cluster_map)) {}
// This method can be called only once. The first call will release the
// reference to the cluster map, and subsequent calls will return nullptr.
RefCountedPtr<ClusterState> LockAndGetCluster(
absl::string_view cluster_name) {
if (cluster_map_ == nullptr) {
return nullptr;
}
auto cluster = cluster_map_->Find(cluster_name);
cluster_map_.reset();
return cluster;
}
UniqueTypeName type() const override { return TypeName(); }
private:
RefCountedPtr<XdsClusterMap> cluster_map_;
};
void OnListenerUpdate(XdsListenerResource listener);
void OnRouteConfigUpdate(XdsRouteConfigResource rds_update);
void OnError(absl::string_view context, absl::Status status);
@ -461,188 +448,72 @@ class XdsResolver : public Resolver {
std::string /*LB policy config*/>
cluster_specifier_plugin_map_;
std::map<absl::string_view, WeakRefCountedPtr<ClusterState>>
cluster_state_map_;
std::map<absl::string_view, WeakRefCountedPtr<ClusterRef>> cluster_ref_map_;
};
//
// XdsResolver::XdsConfigSelector::Route
// XdsResolver::RouteConfigData::RouteListIterator
//
bool MethodConfigsEqual(const ServiceConfig* sc1, const ServiceConfig* sc2) {
if (sc1 == nullptr) return sc2 == nullptr;
if (sc2 == nullptr) return false;
return sc1->json_string() == sc2->json_string();
}
const grpc_channel_filter XdsResolver::ClusterSelectionFilter::kFilter =
MakePromiseBasedFilter<ClusterSelectionFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata>(
"cluster_selection_filter");
bool XdsResolver::XdsConfigSelector::Route::ClusterWeightState::operator==(
const ClusterWeightState& other) const {
return range_end == other.range_end && cluster == other.cluster &&
MethodConfigsEqual(method_config.get(), other.method_config.get());
}
bool XdsResolver::XdsConfigSelector::Route::operator==(
const Route& other) const {
return route == other.route &&
weighted_cluster_state == other.weighted_cluster_state &&
MethodConfigsEqual(method_config.get(), other.method_config.get());
}
// Implementation of XdsRouting::RouteListIterator for getting the matching
// route for a request.
class XdsResolver::XdsConfigSelector::RouteListIterator
class XdsResolver::RouteConfigData::RouteListIterator
: public XdsRouting::RouteListIterator {
public:
explicit RouteListIterator(const RouteTable* route_table)
explicit RouteListIterator(const RouteConfigData* route_table)
: route_table_(route_table) {}
size_t Size() const override { return route_table_->size(); }
size_t Size() const override { return route_table_->routes_.size(); }
const XdsRouteConfigResource::Route::Matchers& GetMatchersForRoute(
size_t index) const override {
return (*route_table_)[index].route.matchers;
return route_table_->routes_[index].route.matchers;
}
private:
const RouteTable* route_table_;
const RouteConfigData* route_table_;
};
//
// XdsResolver::XdsConfigSelector
// XdsResolver::RouteConfigData
//
XdsResolver::XdsConfigSelector::XdsConfigSelector(
RefCountedPtr<XdsResolver> resolver, absl::Status* status)
: resolver_(std::move(resolver)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
resolver_.get(), this);
}
// 1. Construct the route table
// 2 Update resolver's cluster state map
// 3. Construct cluster list to hold on to entries in the cluster state
// map.
absl::StatusOr<RefCountedPtr<XdsResolver::RouteConfigData>>
XdsResolver::RouteConfigData::Create(
XdsResolver* resolver,
const std::vector<XdsRouteConfigResource::Route>& routes,
const Duration& default_max_stream_duration) {
auto data = MakeRefCounted<RouteConfigData>();
// Reserve the necessary entries up-front to avoid reallocation as we add
// elements. This is necessary because the string_view in the entry's
// weighted_cluster_state field points to the memory in the route field, so
// moving the entry in a reallocation will cause the string_view to point to
// invalid data.
route_table_.reserve(resolver_->current_virtual_host_->routes.size());
std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters;
auto maybe_add_cluster = [&](absl::string_view cluster_name) {
if (clusters.find(cluster_name) != clusters.end()) return;
auto cluster_state = resolver_->GetOrCreateClusterState(cluster_name);
absl::string_view name = cluster_state->cluster_name();
clusters.emplace(name, std::move(cluster_state));
};
for (auto& route : resolver_->current_virtual_host_->routes) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
resolver_.get(), this, route.ToString().c_str());
}
route_table_.emplace_back();
auto& route_entry = route_table_.back();
route_entry.route = route;
auto* route_action =
absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
&route_entry.route.action);
if (route_action != nullptr) {
// If the route doesn't specify a timeout, set its timeout to the global
// one.
if (!route_action->max_stream_duration.has_value()) {
route_action->max_stream_duration =
resolver_->current_listener_.http_max_stream_duration;
}
Match(
route_action->action,
// cluster name
[&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
cluster_name) {
auto result = CreateMethodConfig(route_entry.route, nullptr);
if (!result.ok()) {
*status = result.status();
return;
}
route_entry.method_config = std::move(*result);
maybe_add_cluster(
absl::StrCat("cluster:", cluster_name.cluster_name));
},
// WeightedClusters
[&](const std::vector<
XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
weighted_clusters) {
uint32_t end = 0;
for (const auto& weighted_cluster : weighted_clusters) {
Route::ClusterWeightState cluster_weight_state;
auto result =
CreateMethodConfig(route_entry.route, &weighted_cluster);
if (!result.ok()) {
*status = result.status();
return;
}
cluster_weight_state.method_config = std::move(*result);
end += weighted_cluster.weight;
cluster_weight_state.range_end = end;
cluster_weight_state.cluster = weighted_cluster.name;
route_entry.weighted_cluster_state.push_back(
std::move(cluster_weight_state));
maybe_add_cluster(
absl::StrCat("cluster:", weighted_cluster.name));
}
},
// ClusterSpecifierPlugin
[&](const XdsRouteConfigResource::Route::RouteAction::
ClusterSpecifierPluginName& cluster_specifier_plugin_name) {
auto result = CreateMethodConfig(route_entry.route, nullptr);
if (!result.ok()) {
*status = result.status();
return;
}
route_entry.method_config = std::move(*result);
maybe_add_cluster(absl::StrCat(
"cluster_specifier_plugin:",
cluster_specifier_plugin_name.cluster_specifier_plugin_name));
});
if (!status->ok()) return;
}
}
cluster_map_ = MakeRefCounted<XdsClusterMap>(std::move(clusters));
// Populate filter list.
const auto& http_filter_registry =
static_cast<const GrpcXdsBootstrap&>(resolver_->xds_client_->bootstrap())
.http_filter_registry();
for (const auto& http_filter : resolver_->current_listener_.http_filters) {
// Find filter. This is guaranteed to succeed, because it's checked
// at config validation time in the XdsApi code.
const XdsHttpFilterImpl* filter_impl =
http_filter_registry.GetFilterForType(
http_filter.config.config_proto_type_name);
GPR_ASSERT(filter_impl != nullptr);
// Add C-core filter to list.
if (filter_impl->channel_filter() != nullptr) {
filters_.push_back(filter_impl->channel_filter());
data->routes_.reserve(routes.size());
for (auto& route : routes) {
absl::Status status =
data->AddRouteEntry(route, default_max_stream_duration, resolver);
if (!status.ok()) {
return status;
}
}
filters_.push_back(&ClusterSelectionFilter::kFilter);
return data;
}
XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
resolver_.get(), this);
XdsResolver::RouteConfigData::RouteEntry*
XdsResolver::RouteConfigData::GetRouteForRequest(
absl::string_view path, grpc_metadata_batch* initial_metadata) {
auto route_index = XdsRouting::GetRouteForRequest(RouteListIterator(this),
path, initial_metadata);
if (!route_index.has_value()) {
return nullptr;
}
cluster_map_.reset();
resolver_->MaybeRemoveUnusedClusters();
return &routes_[*route_index];
}
absl::StatusOr<RefCountedPtr<ServiceConfig>>
XdsResolver::XdsConfigSelector::CreateMethodConfig(
const XdsRouteConfigResource::Route& route,
XdsResolver::RouteConfigData::CreateMethodConfig(
XdsResolver* resolver, const XdsRouteConfigResource::Route& route,
const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
cluster_weight) {
std::vector<std::string> fields;
@ -694,11 +565,11 @@ XdsResolver::XdsConfigSelector::CreateMethodConfig(
}
// Handle xDS HTTP filters.
auto result = XdsRouting::GeneratePerHTTPFilterConfigs(
static_cast<const GrpcXdsBootstrap&>(resolver_->xds_client_->bootstrap())
static_cast<const GrpcXdsBootstrap&>(resolver->xds_client_->bootstrap())
.http_filter_registry(),
resolver_->current_listener_.http_filters,
resolver_->current_virtual_host_.value(), route, cluster_weight,
resolver_->args_);
resolver->current_listener_.http_filters,
resolver->current_virtual_host_.value(), route, cluster_weight,
resolver->args_);
if (!result.ok()) return result.status();
for (const auto& p : result->per_filter_configs) {
fields.emplace_back(absl::StrCat(" \"", p.first, "\": [\n",
@ -722,6 +593,128 @@ XdsResolver::XdsConfigSelector::CreateMethodConfig(
return nullptr;
}
absl::Status XdsResolver::RouteConfigData::AddRouteEntry(
const XdsRouteConfigResource::Route& route,
const Duration& default_max_stream_duration, XdsResolver* resolver) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
resolver, this, route.ToString().c_str());
}
routes_.emplace_back(route);
auto* route_entry = &routes_.back();
auto maybe_add_cluster = [&](absl::string_view cluster_name) {
if (clusters_.find(cluster_name) != clusters_.end()) return;
auto cluster_state = resolver->GetOrCreateClusterRef(cluster_name);
absl::string_view name = cluster_state->cluster_name();
clusters_.emplace(name, std::move(cluster_state));
};
auto* route_action = absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
&route_entry->route.action);
if (route_action != nullptr) {
// If the route doesn't specify a timeout, set its timeout to the global
// one.
if (!route_action->max_stream_duration.has_value()) {
route_action->max_stream_duration = default_max_stream_duration;
}
absl::Status status = Match(
route_action->action,
// cluster name
[&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
cluster_name) {
auto result =
CreateMethodConfig(resolver, route_entry->route, nullptr);
if (!result.ok()) {
return result.status();
}
route_entry->method_config = std::move(*result);
maybe_add_cluster(
absl::StrCat("cluster:", cluster_name.cluster_name));
return absl::OkStatus();
},
// WeightedClusters
[&](const std::vector<
XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
weighted_clusters) {
uint32_t end = 0;
for (const auto& weighted_cluster : weighted_clusters) {
auto result = CreateMethodConfig(resolver, route_entry->route,
&weighted_cluster);
if (!result.ok()) {
return result.status();
}
RouteEntry::ClusterWeightState cluster_weight_state;
cluster_weight_state.method_config = std::move(*result);
end += weighted_cluster.weight;
cluster_weight_state.range_end = end;
cluster_weight_state.cluster = weighted_cluster.name;
route_entry->weighted_cluster_state.push_back(
std::move(cluster_weight_state));
maybe_add_cluster(absl::StrCat("cluster:", weighted_cluster.name));
}
return absl::OkStatus();
},
// ClusterSpecifierPlugin
[&](const XdsRouteConfigResource::Route::RouteAction::
ClusterSpecifierPluginName& cluster_specifier_plugin_name) {
auto result =
CreateMethodConfig(resolver, route_entry->route, nullptr);
if (!result.ok()) {
return result.status();
}
route_entry->method_config = std::move(*result);
maybe_add_cluster(absl::StrCat(
"cluster_specifier_plugin:",
cluster_specifier_plugin_name.cluster_specifier_plugin_name));
return absl::OkStatus();
});
if (!status.ok()) {
return status;
}
}
return absl::OkStatus();
}
//
// XdsResolver::XdsConfigSelector
//
XdsResolver::XdsConfigSelector::XdsConfigSelector(
RefCountedPtr<XdsResolver> resolver,
RefCountedPtr<RouteConfigData> route_config_data)
: resolver_(std::move(resolver)),
route_config_data_(std::move(route_config_data)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
resolver_.get(), this);
}
// Populate filter list.
const auto& http_filter_registry =
static_cast<const GrpcXdsBootstrap&>(resolver_->xds_client_->bootstrap())
.http_filter_registry();
for (const auto& http_filter : resolver_->current_listener_.http_filters) {
// Find filter. This is guaranteed to succeed, because it's checked
// at config validation time in the XdsApi code.
const XdsHttpFilterImpl* filter_impl =
http_filter_registry.GetFilterForType(
http_filter.config.config_proto_type_name);
GPR_ASSERT(filter_impl != nullptr);
// Add C-core filter to list.
if (filter_impl->channel_filter() != nullptr) {
filters_.push_back(filter_impl->channel_filter());
}
}
filters_.push_back(&ClusterSelectionFilter::kFilter);
}
XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
resolver_.get(), this);
}
route_config_data_.reset();
resolver_->MaybeRemoveUnusedClusters();
}
absl::optional<uint64_t> HeaderHashHelper(
const XdsRouteConfigResource::Route::RouteAction::HashPolicy::Header&
header_policy,
@ -747,18 +740,16 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
GetCallConfigArgs args) {
Slice* path = args.initial_metadata->get_pointer(HttpPathMetadata());
GPR_ASSERT(path != nullptr);
auto route_index = XdsRouting::GetRouteForRequest(
RouteListIterator(&route_table_), path->as_string_view(),
args.initial_metadata);
if (!route_index.has_value()) {
auto* entry = route_config_data_->GetRouteForRequest(path->as_string_view(),
args.initial_metadata);
if (entry == nullptr) {
return absl::UnavailableError(
"No matching route found in xDS route config");
}
auto& entry = route_table_[*route_index];
// Found a route match
const auto* route_action =
absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
&entry.route.action);
&entry->route.action);
if (route_action == nullptr) {
return absl::UnavailableError("Matching route has inappropriate action");
}
@ -771,24 +762,24 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
action_cluster_name) {
cluster_name =
absl::StrCat("cluster:", action_cluster_name.cluster_name);
method_config = entry.method_config;
method_config = entry->method_config;
},
// WeightedClusters
[&](const std::vector<
XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
/*weighted_clusters*/) {
const uint32_t key = absl::Uniform<uint32_t>(
absl::BitGen(), 0, entry.weighted_cluster_state.back().range_end);
absl::BitGen(), 0, entry->weighted_cluster_state.back().range_end);
// Find the index in weighted clusters corresponding to key.
size_t mid = 0;
size_t start_index = 0;
size_t end_index = entry.weighted_cluster_state.size() - 1;
size_t end_index = entry->weighted_cluster_state.size() - 1;
size_t index = 0;
while (end_index > start_index) {
mid = (start_index + end_index) / 2;
if (entry.weighted_cluster_state[mid].range_end > key) {
if (entry->weighted_cluster_state[mid].range_end > key) {
end_index = mid;
} else if (entry.weighted_cluster_state[mid].range_end < key) {
} else if (entry->weighted_cluster_state[mid].range_end < key) {
start_index = mid + 1;
} else {
index = mid + 1;
@ -796,10 +787,10 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
}
}
if (index == 0) index = start_index;
GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key);
GPR_ASSERT(entry->weighted_cluster_state[index].range_end > key);
cluster_name = absl::StrCat(
"cluster:", entry.weighted_cluster_state[index].cluster);
method_config = entry.weighted_cluster_state[index].method_config;
"cluster:", entry->weighted_cluster_state[index].cluster);
method_config = entry->weighted_cluster_state[index].method_config;
},
// ClusterSpecifierPlugin
[&](const XdsRouteConfigResource::Route::RouteAction::
@ -807,9 +798,9 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
cluster_name = absl::StrCat(
"cluster_specifier_plugin:",
cluster_specifier_plugin_name.cluster_specifier_plugin_name);
method_config = entry.method_config;
method_config = entry->method_config;
});
auto cluster = cluster_map_->Find(cluster_name);
auto cluster = route_config_data_->FindClusterRef(cluster_name);
GPR_ASSERT(cluster != nullptr);
// Generate a hash.
absl::optional<uint64_t> hash;
@ -857,10 +848,85 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
args.service_config_call_data->SetCallAttribute(
args.arena->New<RequestHashAttribute>(hash_value));
args.service_config_call_data->SetCallAttribute(
args.arena->ManagedNew<XdsClusterMapAttribute>(cluster_map_));
args.arena->ManagedNew<XdsRouteStateAttributeImpl>(route_config_data_,
entry));
return absl::OkStatus();
}
//
// XdsResolver::XdsRouteStateAttributeImpl
//
bool XdsResolver::XdsRouteStateAttributeImpl::HasClusterForRoute(
absl::string_view cluster_name) const {
// Found a route match
const auto* route_action =
absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
&static_cast<RouteConfigData::RouteEntry*>(route_)->route.action);
if (route_action == nullptr) return false;
return Match(
route_action->action,
[&](const XdsRouteConfigResource::Route::RouteAction::ClusterName& name) {
return name.cluster_name == cluster_name;
},
[&](const std::vector<
XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
clusters) {
for (const auto& cluster : clusters) {
if (cluster.name == cluster_name) {
return true;
}
}
return false;
},
[&](const XdsRouteConfigResource::Route::RouteAction::
ClusterSpecifierPluginName& /* name */) { return false; });
}
RefCountedPtr<XdsResolver::ClusterRef>
XdsResolver::XdsRouteStateAttributeImpl::LockAndGetCluster(
absl::string_view cluster_name) {
if (route_config_data_ == nullptr) {
return nullptr;
}
auto cluster = route_config_data_->FindClusterRef(cluster_name);
route_config_data_.reset();
return cluster;
}
//
// XdsResolver::ClusterSelectionFilter
//
const grpc_channel_filter XdsResolver::ClusterSelectionFilter::kFilter =
MakePromiseBasedFilter<ClusterSelectionFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata>(
"cluster_selection_filter");
ArenaPromise<ServerMetadataHandle>
XdsResolver::ClusterSelectionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto* service_config_call_data =
static_cast<ClientChannelServiceConfigCallData*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA]
.value);
GPR_ASSERT(service_config_call_data != nullptr);
auto* route_state_attribute = static_cast<XdsRouteStateAttributeImpl*>(
service_config_call_data->GetCallAttribute<XdsRouteStateAttribute>());
auto* cluster_name_attribute =
service_config_call_data->GetCallAttribute<XdsClusterAttribute>();
if (route_state_attribute != nullptr && cluster_name_attribute != nullptr) {
auto cluster = route_state_attribute->LockAndGetCluster(
cluster_name_attribute->cluster());
if (cluster != nullptr) {
service_config_call_data->SetOnCommit(
[cluster = std::move(cluster)]() mutable { cluster.reset(); });
}
}
return next_promise_factory(std::move(call_args));
}
//
// XdsResolver
//
@ -1016,7 +1082,6 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
});
}
namespace {
class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator {
public:
explicit VirtualHostListIterator(
@ -1033,7 +1098,6 @@ class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator {
private:
const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts_;
};
} // namespace
void XdsResolver::OnRouteConfigUpdate(XdsRouteConfigResource rds_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
@ -1100,7 +1164,7 @@ void XdsResolver::OnResourceDoesNotExist(std::string context) {
absl::StatusOr<RefCountedPtr<ServiceConfig>>
XdsResolver::CreateServiceConfig() {
std::vector<std::string> clusters;
for (const auto& cluster : cluster_state_map_) {
for (const auto& cluster : cluster_ref_map_) {
absl::string_view child_name = cluster.first;
if (absl::ConsumePrefix(&child_name, "cluster_specifier_plugin:")) {
clusters.push_back(absl::StrFormat(
@ -1142,13 +1206,16 @@ void XdsResolver::GenerateResult() {
if (!current_virtual_host_.has_value()) return;
// First create XdsConfigSelector, which may add new entries to the cluster
// state map, and then CreateServiceConfig for LB policies.
absl::Status status;
auto config_selector = MakeRefCounted<XdsConfigSelector>(Ref(), &status);
if (!status.ok()) {
auto route_config_data =
RouteConfigData::Create(this, current_virtual_host_->routes,
current_listener_.http_max_stream_duration);
if (!route_config_data.ok()) {
OnError("could not create ConfigSelector",
absl::UnavailableError(status.message()));
absl::UnavailableError(route_config_data.status().message()));
return;
}
auto config_selector =
MakeRefCounted<XdsConfigSelector>(Ref(), std::move(*route_config_data));
Result result;
result.addresses.emplace();
result.service_config = CreateServiceConfig();
@ -1169,13 +1236,13 @@ void XdsResolver::GenerateResult() {
void XdsResolver::MaybeRemoveUnusedClusters() {
bool update_needed = false;
for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
for (auto it = cluster_ref_map_.begin(); it != cluster_ref_map_.end();) {
RefCountedPtr<ClusterRef> cluster_state = it->second->RefIfNonZero();
if (cluster_state != nullptr) {
++it;
} else {
update_needed = true;
it = cluster_state_map_.erase(it);
it = cluster_ref_map_.erase(it);
}
}
if (update_needed && xds_client_ != nullptr) {
@ -1185,7 +1252,7 @@ void XdsResolver::MaybeRemoveUnusedClusters() {
}
//
// Factory
// XdsResolverFactory
//
class XdsResolverFactory : public ResolverFactory {
@ -1210,7 +1277,6 @@ class XdsResolverFactory : public ResolverFactory {
return MakeOrphanable<XdsResolver>(std::move(args));
}
};
} // namespace
void RegisterXdsResolver(CoreConfiguration::Builder* builder) {

@ -29,17 +29,33 @@ namespace grpc_core {
class XdsClusterAttribute
: public ServiceConfigCallData::CallAttributeInterface {
public:
static UniqueTypeName TypeName();
static UniqueTypeName TypeName() {
static UniqueTypeName::Factory kFactory("xds_cluster_name");
return kFactory.Create();
}
explicit XdsClusterAttribute(absl::string_view cluster) : cluster_(cluster) {}
absl::string_view cluster() const { return cluster_; }
void set_cluster(absl::string_view cluster) { cluster_ = cluster; }
private:
UniqueTypeName type() const override { return TypeName(); }
absl::string_view cluster_;
};
class XdsRouteStateAttribute
: public ServiceConfigCallData::CallAttributeInterface {
public:
static UniqueTypeName TypeName() {
static UniqueTypeName::Factory factory("xds_cluster_lb_data");
return factory.Create();
}
virtual bool HasClusterForRoute(absl::string_view cluster_name) const = 0;
UniqueTypeName type() const override { return TypeName(); }
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_H

@ -33,11 +33,13 @@
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h"
#include "src/core/ext/filters/stateful_session/stateful_session_service_config_parser.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
@ -83,35 +85,142 @@ StatefulSessionFilter::StatefulSessionFilter(ChannelFilter::Args filter_args)
namespace {
absl::string_view AllocateStringOnArena(
absl::string_view src1, absl::string_view src2 = absl::string_view()) {
if (src1.empty() && src2.empty()) {
return absl::string_view();
}
char* arena_allocated_value =
static_cast<char*>(GetContext<Arena>()->Alloc(src1.size() + src2.size()));
memcpy(arena_allocated_value, src1.data(), src1.size());
if (!src2.empty()) {
memcpy(arena_allocated_value + src1.size(), src2.data(), src2.size());
}
return absl::string_view(arena_allocated_value, src1.size() + src2.size());
}
// Adds the set-cookie header to the server initial metadata if needed.
void MaybeUpdateServerInitialMetadata(
const StatefulSessionMethodParsedConfig::CookieConfig* cookie_config,
absl::optional<absl::string_view> cookie_value,
ServerMetadata* server_initial_metadata) {
bool cluster_changed, absl::string_view host_override,
absl::string_view actual_cluster, ServerMetadata* server_initial_metadata) {
// Get peer string.
Slice* peer_string = server_initial_metadata->get_pointer(PeerString());
if (peer_string == nullptr) return; // Nothing we can do.
// If there was no cookie or if the address changed, set the cookie.
if (!cookie_value.has_value() ||
peer_string->as_string_view() != *cookie_value) {
std::vector<std::string> parts = {absl::StrCat(
*cookie_config->name, "=",
absl::Base64Escape(peer_string->as_string_view()), "; HttpOnly")};
if (!cookie_config->path.empty()) {
parts.emplace_back(absl::StrCat("Path=", cookie_config->path));
}
if (cookie_config->ttl > Duration::Zero()) {
parts.emplace_back(
absl::StrCat("Max-Age=", cookie_config->ttl.as_timespec().tv_sec));
}
server_initial_metadata->Append(
"set-cookie", Slice::FromCopiedString(absl::StrJoin(parts, "; ")),
[](absl::string_view error, const Slice&) {
Crash(absl::StrCat("ERROR ADDING set-cookie METADATA: ", error));
});
if (peer_string == nullptr) {
// No changes, keep the same set-cookie header.
return;
}
if (host_override == peer_string->as_string_view() && !cluster_changed) {
return;
}
std::string new_value(peer_string->as_string_view());
if (!actual_cluster.empty()) {
absl::StrAppend(&new_value, ";", actual_cluster);
}
std::vector<std::string> parts = {absl::StrCat(
*cookie_config->name, "=", absl::Base64Escape(new_value), "; HttpOnly")};
if (!cookie_config->path.empty()) {
parts.emplace_back(absl::StrCat("Path=", cookie_config->path));
}
if (cookie_config->ttl > Duration::Zero()) {
parts.emplace_back(
absl::StrCat("Max-Age=", cookie_config->ttl.as_timespec().tv_sec));
}
server_initial_metadata->Append(
"set-cookie", Slice::FromCopiedString(absl::StrJoin(parts, "; ")),
[](absl::string_view error, const Slice&) {
Crash(absl::StrCat("ERROR ADDING set-cookie METADATA: ", error));
});
}
// Returns an arena-allocated string containing the cluster name
// to use for this RPC, which will live long enough to use when modifying
// the server's initial metadata. If cluster_from_cookie is non-empty and
// points to a cluster present in the selected route, uses that; otherwise,
// uses the cluster selected by the XdsConfigSelector.
// Returns the empty string if cluster override cannot be used (i.e., the route
// uses a cluster specifier plugin).
absl::string_view GetClusterToUse(
absl::string_view cluster_from_cookie,
ServiceConfigCallData* service_config_call_data) {
// Get cluster assigned by the XdsConfigSelector.
auto cluster_attribute =
service_config_call_data->GetCallAttribute<XdsClusterAttribute>();
GPR_ASSERT(cluster_attribute != nullptr);
auto current_cluster = cluster_attribute->cluster();
static constexpr absl::string_view kClusterPrefix = "cluster:";
// If prefix is not "cluster:", then we can't use cluster override.
if (!absl::ConsumePrefix(&current_cluster, kClusterPrefix)) {
return absl::string_view();
}
// No cluster in cookie, use the cluster from the attribute
if (cluster_from_cookie.empty()) {
return AllocateStringOnArena(current_cluster);
}
// Use cluster from the cookie if it is configured for the route.
auto route_data =
service_config_call_data->GetCallAttribute<XdsRouteStateAttribute>();
GPR_ASSERT(route_data != nullptr);
// Cookie cluster was not configured for route - use the one from the
// attribute
if (!route_data->HasClusterForRoute(cluster_from_cookie)) {
return AllocateStringOnArena(current_cluster);
}
auto arena_allocated_cluster =
AllocateStringOnArena(kClusterPrefix, cluster_from_cookie);
// Update the cluster name attribute with an arena allocated value.
cluster_attribute->set_cluster(arena_allocated_cluster);
return absl::StripPrefix(arena_allocated_cluster, kClusterPrefix);
}
std::string GetCookieValue(const ClientMetadataHandle& client_initial_metadata,
absl::string_view cookie_name) {
// Check to see if the cookie header is present.
std::string buffer;
auto header_value =
client_initial_metadata->GetStringValue("cookie", &buffer);
if (!header_value.has_value()) return "";
// Parse cookie header.
std::vector<absl::string_view> values;
for (absl::string_view cookie : absl::StrSplit(*header_value, "; ")) {
std::pair<absl::string_view, absl::string_view> kv =
absl::StrSplit(cookie, absl::MaxSplits('=', 1));
if (kv.first == cookie_name) values.push_back(kv.second);
}
if (values.empty()) return "";
// TODO(roth): Figure out the right behavior for multiple cookies.
// For now, just choose the first value.
std::string decoded;
if (absl::Base64Unescape(values.front(), &decoded)) {
return decoded;
}
return "";
}
bool IsConfiguredPath(absl::string_view configured_path,
const ClientMetadataHandle& client_initial_metadata) {
// No path configured meaning all paths match
if (configured_path.empty()) {
return true;
}
// Check to see if the configured path matches the request path.
Slice* path_slice = client_initial_metadata->get_pointer(HttpPathMetadata());
GPR_ASSERT(path_slice != nullptr);
absl::string_view path = path_slice->as_string_view();
// Matching criteria from
// https://www.rfc-editor.org/rfc/rfc6265#section-5.1.4.
// The cookie-path is a prefix of the request-path (and)
if (!absl::StartsWith(path, configured_path)) {
return false;
}
// One of
// 1. The cookie-path and the request-path are identical.
// 2. The last character of the cookie-path is %x2F ("/").
// 3. The first character of the request-path that is not included
// in the cookie-path is a %x2F ("/") character.
return path.length() == configured_path.length() ||
configured_path.back() == '/' || path[configured_path.length()] == '/';
}
} // namespace
// Construct a promise for one call.
@ -129,90 +238,57 @@ ArenaPromise<ServerMetadataHandle> StatefulSessionFilter::MakeCallPromise(
GPR_ASSERT(method_params != nullptr);
auto* cookie_config = method_params->GetConfig(index_);
GPR_ASSERT(cookie_config != nullptr);
if (!cookie_config->name.has_value()) {
if (!cookie_config->name.has_value() ||
!IsConfiguredPath(cookie_config->path,
call_args.client_initial_metadata)) {
return next_promise_factory(std::move(call_args));
}
// We have a config.
// If the config has a path, check to see if it matches the request path.
if (!cookie_config->path.empty()) {
Slice* path_slice =
call_args.client_initial_metadata->get_pointer(HttpPathMetadata());
GPR_ASSERT(path_slice != nullptr);
absl::string_view path = path_slice->as_string_view();
// Matching criteria from
// https://www.rfc-editor.org/rfc/rfc6265#section-5.1.4.
if (!absl::StartsWith(path, cookie_config->path) ||
(path.size() != cookie_config->path.size() &&
cookie_config->path.back() != '/' &&
path[cookie_config->path.size() + 1] != '/')) {
return next_promise_factory(std::move(call_args));
}
}
// Check to see if we have a host override cookie.
auto cookie_value = GetOverrideHostFromCookie(
call_args.client_initial_metadata, *cookie_config->name);
if (cookie_value.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_stateful_session_filter_trace)) {
gpr_log(GPR_INFO,
"chand=%p: stateful session filter found cookie %s value %s",
this, cookie_config->name->c_str(),
std::string(*cookie_value).c_str());
}
// We have a valid cookie, so add the call attribute to be used by the
// xds_override_host LB policy.
// Base64-decode cookie value.
std::string cookie_value =
GetCookieValue(call_args.client_initial_metadata, *cookie_config->name);
// Cookie format is "host;cluster"
std::pair<absl::string_view, absl::string_view> host_cluster =
absl::StrSplit(cookie_value, absl::MaxSplits(';', 1));
absl::string_view host_override;
// Set override host attribute. Allocate the string on the
// arena, so that it has the right lifetime.
if (!host_cluster.first.empty()) {
host_override = AllocateStringOnArena(host_cluster.first);
service_config_call_data->SetCallAttribute(
GetContext<Arena>()->New<XdsOverrideHostAttribute>(*cookie_value));
GetContext<Arena>()->New<XdsOverrideHostAttribute>(host_override));
}
// Check if the cluster override is valid, and apply it if necessary.
// Note that cluster_name will point to an arena-allocated string
// that will still be alive when we see the server initial metadata.
// If the cluster name is empty, that means we cannot use a
// cluster override (i.e., the route uses a cluster specifier plugin).
absl::string_view cluster_name =
GetClusterToUse(host_cluster.second, service_config_call_data);
bool cluster_changed = cluster_name != host_cluster.second;
// Intercept server initial metadata.
call_args.server_initial_metadata->InterceptAndMap(
[cookie_config, cookie_value](ServerMetadataHandle md) {
[cookie_config, cluster_changed, host_override,
cluster_name](ServerMetadataHandle md) {
// Add cookie to server initial metadata if needed.
MaybeUpdateServerInitialMetadata(cookie_config, cookie_value, md.get());
MaybeUpdateServerInitialMetadata(cookie_config, cluster_changed,
host_override, cluster_name, md.get());
return md;
});
return Map(next_promise_factory(std::move(call_args)),
[cookie_config, cookie_value](ServerMetadataHandle md) {
[cookie_config, cluster_changed, host_override,
cluster_name](ServerMetadataHandle md) {
// If we got a Trailers-Only response, then add the
// cookie to the trailing metadata instead of the
// initial metadata.
if (md->get(GrpcTrailersOnly()).value_or(false)) {
MaybeUpdateServerInitialMetadata(cookie_config, cookie_value,
md.get());
MaybeUpdateServerInitialMetadata(
cookie_config, cluster_changed, host_override,
cluster_name, md.get());
}
return md;
});
}
absl::optional<absl::string_view>
StatefulSessionFilter::GetOverrideHostFromCookie(
const ClientMetadataHandle& client_initial_metadata,
absl::string_view cookie_name) {
// Check to see if the cookie header is present.
std::string buffer;
auto header_value =
client_initial_metadata->GetStringValue("cookie", &buffer);
if (!header_value.has_value()) return absl::nullopt;
// Parse cookie header.
std::vector<absl::string_view> values;
for (absl::string_view cookie : absl::StrSplit(*header_value, "; ")) {
std::pair<absl::string_view, absl::string_view> kv =
absl::StrSplit(cookie, absl::MaxSplits('=', 1));
if (kv.first == cookie_name) values.push_back(kv.second);
}
if (values.empty()) return absl::nullopt;
// TODO(roth): Figure out the right behavior for multiple cookies.
// For now, just choose the first value.
absl::string_view value = values.front();
// Base64-decode it.
std::string decoded_value;
if (!absl::Base64Unescape(value, &decoded_value)) return absl::nullopt;
// Copy it into the arena, since it will need to persist until the LB pick.
char* arena_value =
static_cast<char*>(GetContext<Arena>()->Alloc(decoded_value.size()));
memcpy(arena_value, decoded_value.c_str(), decoded_value.size());
return absl::string_view(arena_value, decoded_value.size());
}
void StatefulSessionFilterRegister(CoreConfiguration::Builder* builder) {
StatefulSessionServiceConfigParser::Register(builder);
}

@ -23,7 +23,6 @@
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
@ -65,11 +64,6 @@ class StatefulSessionFilter : public ChannelFilter {
private:
explicit StatefulSessionFilter(ChannelFilter::Args filter_args);
absl::optional<absl::string_view> GetOverrideHostFromCookie(
const ClientMetadataHandle& initial_metadata,
absl::string_view cookie_name);
// The relative index of instances of the same filter.
const size_t index_;
// Index of the service config parser.

@ -88,6 +88,11 @@ class ServiceConfigCallData {
call_attributes_.EmplaceBack(value);
}
template <typename A>
A* GetCallAttribute() const {
return static_cast<A*>(GetCallAttribute(A::TypeName()));
}
CallAttributeInterface* GetCallAttribute(UniqueTypeName type) const {
for (CallAttributeInterface* attribute : call_attributes_) {
if (attribute->type() == type) return attribute;

@ -440,6 +440,7 @@ grpc_cc_test(
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto",
"//src/proto/grpc/testing/xds/v3:router_proto",
"//src/proto/grpc/testing/xds/v3:stateful_session_cookie_proto",
"//src/proto/grpc/testing/xds/v3:stateful_session_proto",

@ -631,7 +631,7 @@ XdsEnd2endTest::CreateEndpointsForBackends(size_t start_index,
}
ClusterLoadAssignment XdsEnd2endTest::BuildEdsResource(
const EdsResourceArgs& args, const char* eds_service_name) {
const EdsResourceArgs& args, absl::string_view eds_service_name) {
ClusterLoadAssignment assignment;
assignment.set_cluster_name(eds_service_name);
for (const auto& locality : args.locality_list) {

@ -654,7 +654,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType> {
// Constructs an EDS resource.
ClusterLoadAssignment BuildEdsResource(
const EdsResourceArgs& args,
const char* eds_service_name = kDefaultEdsServiceName);
absl::string_view eds_service_name = kDefaultEdsServiceName);
//
// Backend management

@ -27,6 +27,7 @@
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/gprpp/match.h"
#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/outlier_detection.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/router.grpc.pb.h"
@ -118,19 +119,24 @@ class OverrideHostTest : public XdsEnd2endTest {
return listener;
}
// Send requests until a desired backend is hit and returns cookie name/value
// pairs. Empty collection is returned if the backend was never hit.
// For weighted clusters, more than one request per backend may be necessary
// to obtain the cookie. max_requests_per_backend argument specifies
// the number of requests per backend to send.
std::vector<std::pair<std::string, std::string>>
GetAffinityCookieHeaderForBackend(grpc_core::DebugLocation debug_location,
size_t backend_index,
RpcOptions rpc_options = RpcOptions()) {
size_t max_requests_per_backend = 1) {
EXPECT_LT(backend_index, backends_.size());
if (backend_index >= backends_.size()) {
return {};
}
const auto& backend = backends_[backend_index];
for (size_t i = 0; i < backends_.size(); ++i) {
for (size_t i = 0; i < max_requests_per_backend * backends_.size(); ++i) {
std::multimap<std::string, std::string> server_initial_metadata;
grpc::Status status =
SendRpc(rpc_options, nullptr, &server_initial_metadata);
SendRpc(RpcOptions(), nullptr, &server_initial_metadata);
EXPECT_TRUE(status.ok())
<< "code=" << status.error_code()
<< ", message=" << status.error_message() << "\n"
@ -150,6 +156,44 @@ class OverrideHostTest : public XdsEnd2endTest {
<< "Desired backend had not been hit";
return {};
}
void SetClusterResource(absl::string_view cluster_name,
absl::string_view eds_resource_name) {
Cluster cluster = default_cluster_;
cluster.set_name(cluster_name);
cluster.mutable_eds_cluster_config()->set_service_name(eds_resource_name);
balancer_->ads_service()->SetCdsResource(cluster);
}
RouteConfiguration BuildRouteConfigurationWithWeightedClusters(
const std::map<absl::string_view, uint32_t> clusters) {
RouteConfiguration new_route_config = default_route_config_;
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
for (const auto& cluster : clusters) {
auto* weighted_cluster =
route1->mutable_route()->mutable_weighted_clusters()->add_clusters();
weighted_cluster->set_name(cluster.first);
weighted_cluster->mutable_weight()->set_value(cluster.second);
}
return new_route_config;
}
void SetCdsAndEdsResources(absl::string_view cluster_name,
absl::string_view eds_service_name,
size_t start_index, size_t end_index) {
balancer_->ads_service()->SetEdsResource(BuildEdsResource(
EdsResourceArgs({{"locality0",
CreateEndpointsForBackends(start_index, end_index)}}),
eds_service_name));
SetClusterResource(cluster_name, eds_service_name);
}
static double BackendRequestPercentage(
const std::unique_ptr<BackendServerThread>& backend,
size_t num_requests) {
return static_cast<double>(backend->backend_service()->request_count()) /
num_requests;
}
};
INSTANTIATE_TEST_SUITE_P(XdsTest, OverrideHostTest,
@ -267,6 +311,132 @@ TEST_P(OverrideHostTest, DrainingExcludedFromOverrideSet) {
EXPECT_EQ(2, backends_[2]->backend_service()->request_count());
ResetBackendCounters();
}
TEST_P(OverrideHostTest, OverrideWithWeightedClusters) {
CreateAndStartBackends(3);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewEdsService1Name = "new_eds_service_name_1";
const char* kNewCluster2Name = "new_cluster_2";
const char* kNewEdsService2Name = "new_eds_service_name_2";
const uint32_t kWeight1 = 1;
const uint32_t kWeight2 = 3;
const double kErrorTolerance = 0.025;
const size_t kNumEchoRpcs = ComputeIdealNumRpcs(
static_cast<double>(kWeight1) / (kWeight1 + kWeight2), kErrorTolerance);
// Populate EDS and CDS resources.
SetCdsAndEdsResources(kNewCluster1Name, kNewEdsService1Name, 0, 1);
SetCdsAndEdsResources(kNewCluster2Name, kNewEdsService2Name, 1, 3);
// Populating Route Configurations for LDS.
SetListenerAndRouteConfiguration(
balancer_.get(), BuildListenerWithStatefulSessionFilter(),
BuildRouteConfigurationWithWeightedClusters(
{{kNewCluster1Name, kWeight1}, {kNewCluster2Name, kWeight2}}));
WaitForAllBackends(DEBUG_LOCATION, 0, 3);
// Get cookie
auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 3);
ASSERT_FALSE(session_cookie.empty());
// All requests go to the backend we requested.
CheckRpcSendOk(DEBUG_LOCATION, kNumEchoRpcs,
RpcOptions().set_metadata(session_cookie));
EXPECT_EQ(backends_[0]->backend_service()->request_count(), 0);
EXPECT_EQ(backends_[1]->backend_service()->request_count(), kNumEchoRpcs);
EXPECT_EQ(backends_[2]->backend_service()->request_count(), 0);
}
TEST_P(OverrideHostTest, ClusterOverrideHonoredButHostGone) {
CreateAndStartBackends(4);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewEdsService1Name = "new_eds_service_name_1";
const char* kNewCluster2Name = "new_cluster_2";
const char* kNewEdsService2Name = "new_eds_service_name_2";
const uint32_t kWeight1 = 1;
const uint32_t kWeight2 = 3;
const double kErrorTolerance = 0.025;
const double kWeight2Percent =
static_cast<double>(kWeight2) / (kWeight1 + kWeight2);
const size_t kNumEchoRpcs =
ComputeIdealNumRpcs(kWeight2Percent, kErrorTolerance);
// Populate EDS and CDS resources.
SetCdsAndEdsResources(kNewCluster1Name, kNewEdsService1Name, 0, 1);
SetCdsAndEdsResources(kNewCluster2Name, kNewEdsService2Name, 1, 3);
// Populating Route Configurations for LDS.
SetListenerAndRouteConfiguration(
balancer_.get(), BuildListenerWithStatefulSessionFilter(),
BuildRouteConfigurationWithWeightedClusters(
{{kNewCluster1Name, kWeight1}, {kNewCluster2Name, kWeight2}}));
WaitForAllBackends(DEBUG_LOCATION, 0, 3);
auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 3);
ASSERT_FALSE(session_cookie.empty());
// Remove backends[1] from cluster2
balancer_->ads_service()->SetEdsResource(BuildEdsResource(
EdsResourceArgs({{"locality0", CreateEndpointsForBackends(2, 4)}}),
kNewEdsService2Name));
WaitForAllBackends(DEBUG_LOCATION, 3, 4);
CheckRpcSendOk(DEBUG_LOCATION, kNumEchoRpcs,
RpcOptions().set_metadata(session_cookie));
// Traffic goes to a second cluster, where it is equally distributed between
// the two remaining hosts
EXPECT_THAT(BackendRequestPercentage(backends_[2], kNumEchoRpcs),
::testing::DoubleNear(.5, kErrorTolerance));
EXPECT_THAT(BackendRequestPercentage(backends_[3], kNumEchoRpcs),
::testing::DoubleNear(.5, kErrorTolerance));
EXPECT_NE(session_cookie,
GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 2, 3));
}
TEST_P(OverrideHostTest, ClusterGoneHostStays) {
CreateAndStartBackends(3);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewEdsService1Name = "new_eds_service_name_1";
const char* kNewCluster2Name = "new_cluster_2";
const char* kNewEdsService2Name = "new_eds_service_name_2";
const char* kNewCluster3Name = "new_cluster_3";
const char* kNewEdsService3Name = "new_eds_service_name_3";
const uint32_t kWeight1 = 3;
const uint32_t kWeight2 = 1;
const double kErrorTolerance = 0.025;
const double kPercentage1 =
static_cast<double>(kWeight1) / (kWeight1 + kWeight2);
const size_t kNumEchoRpcs =
ComputeIdealNumRpcs(kPercentage1, kErrorTolerance);
// Populate EDS and CDS resources.
SetCdsAndEdsResources(kNewCluster1Name, kNewEdsService1Name, 0, 1);
SetCdsAndEdsResources(kNewCluster2Name, kNewEdsService2Name, 1, 2);
// Populating Route Configurations for LDS.
SetListenerAndRouteConfiguration(
balancer_.get(), BuildListenerWithStatefulSessionFilter(),
BuildRouteConfigurationWithWeightedClusters(
{{kNewCluster1Name, kWeight1}, {kNewCluster2Name, kWeight2}}));
WaitForAllBackends(DEBUG_LOCATION, 0, 2);
auto backend1_in_cluster2_cookie =
GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 5);
ASSERT_FALSE(backend1_in_cluster2_cookie.empty());
// Create a new cluster, cluster 3, containing a new backend, backend 2.
SetCdsAndEdsResources(kNewCluster3Name, kNewEdsService3Name, 2, 3);
// Send an EDS update for cluster 1 that adds backend 1. (Now cluster 1 has
// backends 0 and 1.)
balancer_->ads_service()->SetEdsResource(BuildEdsResource(
EdsResourceArgs({{"locality0", CreateEndpointsForBackends(0, 2)}}),
kNewEdsService1Name));
SetListenerAndRouteConfiguration(
balancer_.get(), BuildListenerWithStatefulSessionFilter(),
BuildRouteConfigurationWithWeightedClusters(
{{kNewCluster1Name, kWeight1}, {kNewCluster3Name, kWeight2}}));
WaitForAllBackends(DEBUG_LOCATION, 2);
CheckRpcSendOk(DEBUG_LOCATION, kNumEchoRpcs,
RpcOptions().set_metadata(backend1_in_cluster2_cookie));
// Traffic is split between clusters. Cluster1 traffic is sent to backends_[1]
EXPECT_THAT(BackendRequestPercentage(backends_[0], kNumEchoRpcs),
::testing::DoubleNear(0, kErrorTolerance));
EXPECT_THAT(BackendRequestPercentage(backends_[1], kNumEchoRpcs),
::testing::DoubleNear(kPercentage1, kErrorTolerance));
EXPECT_THAT(BackendRequestPercentage(backends_[2], kNumEchoRpcs),
::testing::DoubleNear(1 - kPercentage1, kErrorTolerance));
// backends_[1] cookie is updated with a new cluster
EXPECT_NE(backend1_in_cluster2_cookie,
GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 5));
}
} // namespace
} // namespace testing
} // namespace grpc

@ -402,7 +402,9 @@ INSTANTIATE_TEST_SUITE_P(
::testing::Values(XdsTestType(), XdsTestType().set_enable_rds_testing()),
&XdsTestType::Name);
MATCHER_P2(AdjustedClockInRange, t1, t2, "equals time") {
MATCHER_P2(AdjustedClockInRange, t1, t2,
absl::StrFormat("time between %s and %s", t1.ToString().c_str(),
t2.ToString().c_str())) {
gpr_cycle_counter cycle_now = gpr_get_cycle_counter();
grpc_core::Timestamp cycle_time =
grpc_core::Timestamp::FromCycleCounterRoundDown(cycle_now);

Loading…
Cancel
Save