From 19460ea82f16c695d3a103e8333e9bb3eb899273 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 13 Jun 2023 07:23:46 -0700 Subject: [PATCH] Revert "Revert "Revert "[xDS LB] Override cluster with value from cookie"" (#33379)" (#33416) Reverts grpc/grpc#33388 Breaks import --- CMakeLists.txt | 4 - build_autogenerated.yaml | 1 - src/core/BUILD | 2 +- .../resolver/xds/xds_resolver.cc | 640 ++++++++---------- .../resolver/xds/xds_resolver.h | 18 +- .../stateful_session_filter.cc | 248 +++---- .../stateful_session_filter.h | 6 + .../service_config/service_config_call_data.h | 5 - test/cpp/end2end/xds/BUILD | 1 - test/cpp/end2end/xds/xds_end2end_test_lib.cc | 2 +- test/cpp/end2end/xds/xds_end2end_test_lib.h | 2 +- .../xds/xds_override_host_end2end_test.cc | 176 +---- .../end2end/xds/xds_routing_end2end_test.cc | 4 +- 13 files changed, 387 insertions(+), 722 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 77ca42e0951..2701c882eee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29672,10 +29672,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.h - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.cc - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.h - ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 6126d1ddf98..67e5645ad78 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -16772,7 +16772,6 @@ targets: - src/proto/grpc/testing/simple_messages.proto - src/proto/grpc/testing/xds/v3/address.proto - src/proto/grpc/testing/xds/v3/ads.proto - - src/proto/grpc/testing/xds/v3/aggregate_cluster.proto - src/proto/grpc/testing/xds/v3/base.proto - src/proto/grpc/testing/xds/v3/cluster.proto - src/proto/grpc/testing/xds/v3/config_source.proto diff --git a/src/core/BUILD b/src/core/BUILD index d896635a46d..81753688430 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3791,7 +3791,6 @@ grpc_cc_library( "channel_args", "channel_fwd", "context", - "grpc_resolver_xds_header", "grpc_service_config", "json", "json_args", @@ -5282,6 +5281,7 @@ grpc_cc_library( "ref_counted", "slice", "time", + "unique_type_name", "//:config", "//:debug_location", "//:gpr", diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index eb961a415d9..d4733e3ee26 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -47,6 +47,7 @@ #include +#include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/slice/slice.h" #define XXH_INLINE_ALL @@ -101,6 +102,11 @@ namespace grpc_core { TraceFlag grpc_xds_resolver_trace(false, "xds_resolver"); +UniqueTypeName XdsClusterAttribute::TypeName() { + static UniqueTypeName::Factory kFactory("xds_cluster_name"); + return kFactory.Create(); +} + namespace { std::string GetDefaultAuthorityInternal(const URI& uri) { @@ -240,15 +246,15 @@ class XdsResolver : public Resolver { // the cluster by the ConfigSelector. The ref for each call is held // until the call is committed. When the strong refs go away, we hop // back into the WorkSerializer to remove the entry from the map. - class ClusterRef : public DualRefCounted { + class ClusterState : public DualRefCounted { public: - ClusterRef(RefCountedPtr resolver, - absl::string_view cluster_name) + ClusterState(RefCountedPtr resolver, + absl::string_view cluster_name) : resolver_(std::move(resolver)), cluster_name_(cluster_name) {} void Orphan() override { - XdsResolver* resolver_ptr = resolver_.get(); - resolver_ptr->work_serializer_->Run( + auto* resolver = resolver_.get(); + resolver->work_serializer_->Run( [resolver = std::move(resolver_)]() { resolver->MaybeRemoveUnusedClusters(); }, @@ -262,49 +268,21 @@ class XdsResolver : public Resolver { std::string cluster_name_; }; - // A routing data including cluster refs and routes table held by the - // XdsConfigSelector. A ref to this map will be taken by each call processed - // by the XdsConfigSelector, stored in a the call's call attributes, and later - // unreffed by the ClusterSelection filter. - class RouteConfigData : public RefCounted { + // A map containing cluster refs held by the XdsConfigSelector. A ref to + // this map will be taken by each call processed by the XdsConfigSelector, + // stored in a the call's call attributes, and later unreffed + // by the ClusterSelection filter. + class XdsClusterMap : public RefCounted { public: - struct RouteEntry { - struct ClusterWeightState { - uint32_t range_end; - absl::string_view cluster; - RefCountedPtr method_config; - - bool operator==(const ClusterWeightState& other) const { - return range_end == other.range_end && cluster == other.cluster && - MethodConfigsEqual(method_config.get(), - other.method_config.get()); - } - }; - - XdsRouteConfigResource::Route route; - RefCountedPtr method_config; - std::vector weighted_cluster_state; - - explicit RouteEntry(const XdsRouteConfigResource::Route& r) : route(r) {} + explicit XdsClusterMap( + std::map> clusters) + : clusters_(std::move(clusters)) {} - bool operator==(const RouteEntry& other) const { - return route == other.route && - weighted_cluster_state == other.weighted_cluster_state && - MethodConfigsEqual(method_config.get(), - other.method_config.get()); - } - }; - - static absl::StatusOr> Create( - XdsResolver* resolver, - const std::vector& routes, - const Duration& default_max_stream_duration); - - bool operator==(const RouteConfigData& other) const { - return clusters_ == other.clusters_ && routes_ == other.routes_; + bool operator==(const XdsClusterMap& other) const { + return clusters_ == other.clusters_; } - RefCountedPtr FindClusterRef(absl::string_view name) const { + RefCountedPtr Find(absl::string_view name) const { auto it = clusters_.find(name); if (it == clusters_.end()) { return nullptr; @@ -312,36 +290,14 @@ class XdsResolver : public Resolver { return it->second; } - RouteEntry* GetRouteForRequest(absl::string_view path, - grpc_metadata_batch* initial_metadata); - private: - class RouteListIterator; - - static absl::StatusOr> CreateMethodConfig( - XdsResolver* resolver, const XdsRouteConfigResource::Route& route, - const XdsRouteConfigResource::Route::RouteAction::ClusterWeight* - cluster_weight); - - static bool MethodConfigsEqual(const ServiceConfig* sc1, - const ServiceConfig* sc2) { - if (sc1 == nullptr) return sc2 == nullptr; - if (sc2 == nullptr) return false; - return sc1->json_string() == sc2->json_string(); - } - - absl::Status AddRouteEntry(const XdsRouteConfigResource::Route& route, - const Duration& default_max_stream_duration, - XdsResolver* resolver); - - std::map> clusters_; - std::vector routes_; + std::map> clusters_; }; class XdsConfigSelector : public ConfigSelector { public: XdsConfigSelector(RefCountedPtr resolver, - RefCountedPtr route_config_data); + absl::Status* status); ~XdsConfigSelector() override; const char* name() const override { return "XdsConfigSelector"; } @@ -349,8 +305,8 @@ class XdsResolver : public Resolver { bool Equals(const ConfigSelector* other) const override { const auto* other_xds = static_cast(other); // Don't need to compare resolver_, since that will always be the same. - return *route_config_data_ == *other_xds->route_config_data_ && - filters_ == other_xds->filters_; + return route_table_ == other_xds->route_table_ && + *cluster_map_ == *other_xds->cluster_map_; } absl::Status GetCallConfig(GetCallConfigArgs args) override; @@ -360,28 +316,34 @@ class XdsResolver : public Resolver { } private: - RefCountedPtr resolver_; - RefCountedPtr route_config_data_; - std::vector filters_; - }; + struct Route { + struct ClusterWeightState { + uint32_t range_end; + absl::string_view cluster; + RefCountedPtr method_config; - class XdsRouteStateAttributeImpl : public XdsRouteStateAttribute { - public: - explicit XdsRouteStateAttributeImpl( - RefCountedPtr route_config_data, - RouteConfigData::RouteEntry* route) - : route_config_data_(std::move(route_config_data)), route_(route) {} + bool operator==(const ClusterWeightState& other) const; + }; + + XdsRouteConfigResource::Route route; + RefCountedPtr method_config; + std::vector weighted_cluster_state; - // This method can be called only once. The first call will release - // the reference to the cluster map, and subsequent calls will return - // nullptr. - RefCountedPtr LockAndGetCluster(absl::string_view cluster_name); + bool operator==(const Route& other) const; + }; + using RouteTable = std::vector; - bool HasClusterForRoute(absl::string_view cluster_name) const override; + class RouteListIterator; - private: - RefCountedPtr route_config_data_; - RouteConfigData::RouteEntry* route_; + absl::StatusOr> CreateMethodConfig( + const XdsRouteConfigResource::Route& route, + const XdsRouteConfigResource::Route::RouteAction::ClusterWeight* + cluster_weight); + + RefCountedPtr resolver_; + RouteTable route_table_; + RefCountedPtr cluster_map_; + std::vector filters_; }; class ClusterSelectionFilter : public ChannelFilter { @@ -395,7 +357,29 @@ class XdsResolver : public Resolver { // Construct a promise for one call. ArenaPromise MakeCallPromise( - CallArgs call_args, NextPromiseFactory next_promise_factory) override; + CallArgs call_args, NextPromiseFactory next_promise_factory) override { + auto* service_config_call_data = + static_cast( + GetContext() + [GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA] + .value); + GPR_ASSERT(service_config_call_data != nullptr); + auto* cluster_data = static_cast( + service_config_call_data->GetCallAttribute( + XdsClusterMapAttribute::TypeName())); + auto* cluster_name_attribute = static_cast( + service_config_call_data->GetCallAttribute( + XdsClusterAttribute::TypeName())); + if (cluster_data != nullptr && cluster_name_attribute != nullptr) { + auto cluster = + cluster_data->LockAndGetCluster(cluster_name_attribute->cluster()); + if (cluster != nullptr) { + service_config_call_data->SetOnCommit( + [cluster = std::move(cluster)]() mutable { cluster.reset(); }); + } + } + return next_promise_factory(std::move(call_args)); + } private: explicit ClusterSelectionFilter(ChannelFilter::Args filter_args) @@ -404,17 +388,46 @@ class XdsResolver : public Resolver { ChannelFilter::Args filter_args_; }; - RefCountedPtr GetOrCreateClusterRef( + RefCountedPtr GetOrCreateClusterState( absl::string_view cluster_name) { - auto it = cluster_ref_map_.find(cluster_name); - if (it == cluster_ref_map_.end()) { - auto cluster = MakeRefCounted(Ref(), cluster_name); - cluster_ref_map_.emplace(cluster->cluster_name(), cluster->WeakRef()); + auto it = cluster_state_map_.find(cluster_name); + if (it == cluster_state_map_.end()) { + auto cluster = MakeRefCounted(Ref(), cluster_name); + cluster_state_map_.emplace(cluster->cluster_name(), cluster->WeakRef()); return cluster; } return it->second->Ref(); } + class XdsClusterMapAttribute + : public ServiceConfigCallData::CallAttributeInterface { + public: + static UniqueTypeName TypeName() { + static UniqueTypeName::Factory factory("xds_cluster_lb_data"); + return factory.Create(); + } + + explicit XdsClusterMapAttribute(RefCountedPtr cluster_map) + : cluster_map_(std::move(cluster_map)) {} + + // This method can be called only once. The first call will release the + // reference to the cluster map, and subsequent calls will return nullptr. + RefCountedPtr LockAndGetCluster( + absl::string_view cluster_name) { + if (cluster_map_ == nullptr) { + return nullptr; + } + auto cluster = cluster_map_->Find(cluster_name); + cluster_map_.reset(); + return cluster; + } + + UniqueTypeName type() const override { return TypeName(); } + + private: + RefCountedPtr cluster_map_; + }; + void OnListenerUpdate(XdsListenerResource listener); void OnRouteConfigUpdate(XdsRouteConfigResource rds_update); void OnError(absl::string_view context, absl::Status status); @@ -448,72 +461,188 @@ class XdsResolver : public Resolver { std::string /*LB policy config*/> cluster_specifier_plugin_map_; - std::map> cluster_ref_map_; + std::map> + cluster_state_map_; }; // -// XdsResolver::RouteConfigData::RouteListIterator +// XdsResolver::XdsConfigSelector::Route // +bool MethodConfigsEqual(const ServiceConfig* sc1, const ServiceConfig* sc2) { + if (sc1 == nullptr) return sc2 == nullptr; + if (sc2 == nullptr) return false; + return sc1->json_string() == sc2->json_string(); +} + +const grpc_channel_filter XdsResolver::ClusterSelectionFilter::kFilter = + MakePromiseBasedFilter( + "cluster_selection_filter"); + +bool XdsResolver::XdsConfigSelector::Route::ClusterWeightState::operator==( + const ClusterWeightState& other) const { + return range_end == other.range_end && cluster == other.cluster && + MethodConfigsEqual(method_config.get(), other.method_config.get()); +} + +bool XdsResolver::XdsConfigSelector::Route::operator==( + const Route& other) const { + return route == other.route && + weighted_cluster_state == other.weighted_cluster_state && + MethodConfigsEqual(method_config.get(), other.method_config.get()); +} + // Implementation of XdsRouting::RouteListIterator for getting the matching // route for a request. -class XdsResolver::RouteConfigData::RouteListIterator +class XdsResolver::XdsConfigSelector::RouteListIterator : public XdsRouting::RouteListIterator { public: - explicit RouteListIterator(const RouteConfigData* route_table) + explicit RouteListIterator(const RouteTable* route_table) : route_table_(route_table) {} - size_t Size() const override { return route_table_->routes_.size(); } + size_t Size() const override { return route_table_->size(); } const XdsRouteConfigResource::Route::Matchers& GetMatchersForRoute( size_t index) const override { - return route_table_->routes_[index].route.matchers; + return (*route_table_)[index].route.matchers; } private: - const RouteConfigData* route_table_; + const RouteTable* route_table_; }; // -// XdsResolver::RouteConfigData +// XdsResolver::XdsConfigSelector // -absl::StatusOr> -XdsResolver::RouteConfigData::Create( - XdsResolver* resolver, - const std::vector& routes, - const Duration& default_max_stream_duration) { - auto data = MakeRefCounted(); +XdsResolver::XdsConfigSelector::XdsConfigSelector( + RefCountedPtr resolver, absl::Status* status) + : resolver_(std::move(resolver)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p", + resolver_.get(), this); + } + // 1. Construct the route table + // 2 Update resolver's cluster state map + // 3. Construct cluster list to hold on to entries in the cluster state + // map. // Reserve the necessary entries up-front to avoid reallocation as we add // elements. This is necessary because the string_view in the entry's // weighted_cluster_state field points to the memory in the route field, so // moving the entry in a reallocation will cause the string_view to point to // invalid data. - data->routes_.reserve(routes.size()); - for (auto& route : routes) { - absl::Status status = - data->AddRouteEntry(route, default_max_stream_duration, resolver); - if (!status.ok()) { - return status; + route_table_.reserve(resolver_->current_virtual_host_->routes.size()); + std::map> clusters; + auto maybe_add_cluster = [&](absl::string_view cluster_name) { + if (clusters.find(cluster_name) != clusters.end()) return; + auto cluster_state = resolver_->GetOrCreateClusterState(cluster_name); + absl::string_view name = cluster_state->cluster_name(); + clusters.emplace(name, std::move(cluster_state)); + }; + for (auto& route : resolver_->current_virtual_host_->routes) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s", + resolver_.get(), this, route.ToString().c_str()); + } + route_table_.emplace_back(); + auto& route_entry = route_table_.back(); + route_entry.route = route; + auto* route_action = + absl::get_if( + &route_entry.route.action); + if (route_action != nullptr) { + // If the route doesn't specify a timeout, set its timeout to the global + // one. + if (!route_action->max_stream_duration.has_value()) { + route_action->max_stream_duration = + resolver_->current_listener_.http_max_stream_duration; + } + Match( + route_action->action, + // cluster name + [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName& + cluster_name) { + auto result = CreateMethodConfig(route_entry.route, nullptr); + if (!result.ok()) { + *status = result.status(); + return; + } + route_entry.method_config = std::move(*result); + maybe_add_cluster( + absl::StrCat("cluster:", cluster_name.cluster_name)); + }, + // WeightedClusters + [&](const std::vector< + XdsRouteConfigResource::Route::RouteAction::ClusterWeight>& + weighted_clusters) { + uint32_t end = 0; + for (const auto& weighted_cluster : weighted_clusters) { + Route::ClusterWeightState cluster_weight_state; + auto result = + CreateMethodConfig(route_entry.route, &weighted_cluster); + if (!result.ok()) { + *status = result.status(); + return; + } + cluster_weight_state.method_config = std::move(*result); + end += weighted_cluster.weight; + cluster_weight_state.range_end = end; + cluster_weight_state.cluster = weighted_cluster.name; + route_entry.weighted_cluster_state.push_back( + std::move(cluster_weight_state)); + maybe_add_cluster( + absl::StrCat("cluster:", weighted_cluster.name)); + } + }, + // ClusterSpecifierPlugin + [&](const XdsRouteConfigResource::Route::RouteAction:: + ClusterSpecifierPluginName& cluster_specifier_plugin_name) { + auto result = CreateMethodConfig(route_entry.route, nullptr); + if (!result.ok()) { + *status = result.status(); + return; + } + route_entry.method_config = std::move(*result); + maybe_add_cluster(absl::StrCat( + "cluster_specifier_plugin:", + cluster_specifier_plugin_name.cluster_specifier_plugin_name)); + }); + if (!status->ok()) return; + } + } + cluster_map_ = MakeRefCounted(std::move(clusters)); + // Populate filter list. + const auto& http_filter_registry = + static_cast(resolver_->xds_client_->bootstrap()) + .http_filter_registry(); + for (const auto& http_filter : resolver_->current_listener_.http_filters) { + // Find filter. This is guaranteed to succeed, because it's checked + // at config validation time in the XdsApi code. + const XdsHttpFilterImpl* filter_impl = + http_filter_registry.GetFilterForType( + http_filter.config.config_proto_type_name); + GPR_ASSERT(filter_impl != nullptr); + // Add C-core filter to list. + if (filter_impl->channel_filter() != nullptr) { + filters_.push_back(filter_impl->channel_filter()); } } - return data; + filters_.push_back(&ClusterSelectionFilter::kFilter); } -XdsResolver::RouteConfigData::RouteEntry* -XdsResolver::RouteConfigData::GetRouteForRequest( - absl::string_view path, grpc_metadata_batch* initial_metadata) { - auto route_index = XdsRouting::GetRouteForRequest(RouteListIterator(this), - path, initial_metadata); - if (!route_index.has_value()) { - return nullptr; +XdsResolver::XdsConfigSelector::~XdsConfigSelector() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p", + resolver_.get(), this); } - return &routes_[*route_index]; + cluster_map_.reset(); + resolver_->MaybeRemoveUnusedClusters(); } absl::StatusOr> -XdsResolver::RouteConfigData::CreateMethodConfig( - XdsResolver* resolver, const XdsRouteConfigResource::Route& route, +XdsResolver::XdsConfigSelector::CreateMethodConfig( + const XdsRouteConfigResource::Route& route, const XdsRouteConfigResource::Route::RouteAction::ClusterWeight* cluster_weight) { std::vector fields; @@ -565,11 +694,11 @@ XdsResolver::RouteConfigData::CreateMethodConfig( } // Handle xDS HTTP filters. auto result = XdsRouting::GeneratePerHTTPFilterConfigs( - static_cast(resolver->xds_client_->bootstrap()) + static_cast(resolver_->xds_client_->bootstrap()) .http_filter_registry(), - resolver->current_listener_.http_filters, - resolver->current_virtual_host_.value(), route, cluster_weight, - resolver->args_); + resolver_->current_listener_.http_filters, + resolver_->current_virtual_host_.value(), route, cluster_weight, + resolver_->args_); if (!result.ok()) return result.status(); for (const auto& p : result->per_filter_configs) { fields.emplace_back(absl::StrCat(" \"", p.first, "\": [\n", @@ -593,128 +722,6 @@ XdsResolver::RouteConfigData::CreateMethodConfig( return nullptr; } -absl::Status XdsResolver::RouteConfigData::AddRouteEntry( - const XdsRouteConfigResource::Route& route, - const Duration& default_max_stream_duration, XdsResolver* resolver) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { - gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s", - resolver, this, route.ToString().c_str()); - } - routes_.emplace_back(route); - auto* route_entry = &routes_.back(); - auto maybe_add_cluster = [&](absl::string_view cluster_name) { - if (clusters_.find(cluster_name) != clusters_.end()) return; - auto cluster_state = resolver->GetOrCreateClusterRef(cluster_name); - absl::string_view name = cluster_state->cluster_name(); - clusters_.emplace(name, std::move(cluster_state)); - }; - auto* route_action = absl::get_if( - &route_entry->route.action); - if (route_action != nullptr) { - // If the route doesn't specify a timeout, set its timeout to the global - // one. - if (!route_action->max_stream_duration.has_value()) { - route_action->max_stream_duration = default_max_stream_duration; - } - absl::Status status = Match( - route_action->action, - // cluster name - [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName& - cluster_name) { - auto result = - CreateMethodConfig(resolver, route_entry->route, nullptr); - if (!result.ok()) { - return result.status(); - } - route_entry->method_config = std::move(*result); - maybe_add_cluster( - absl::StrCat("cluster:", cluster_name.cluster_name)); - return absl::OkStatus(); - }, - // WeightedClusters - [&](const std::vector< - XdsRouteConfigResource::Route::RouteAction::ClusterWeight>& - weighted_clusters) { - uint32_t end = 0; - for (const auto& weighted_cluster : weighted_clusters) { - auto result = CreateMethodConfig(resolver, route_entry->route, - &weighted_cluster); - if (!result.ok()) { - return result.status(); - } - RouteEntry::ClusterWeightState cluster_weight_state; - cluster_weight_state.method_config = std::move(*result); - end += weighted_cluster.weight; - cluster_weight_state.range_end = end; - cluster_weight_state.cluster = weighted_cluster.name; - route_entry->weighted_cluster_state.push_back( - std::move(cluster_weight_state)); - maybe_add_cluster(absl::StrCat("cluster:", weighted_cluster.name)); - } - return absl::OkStatus(); - }, - // ClusterSpecifierPlugin - [&](const XdsRouteConfigResource::Route::RouteAction:: - ClusterSpecifierPluginName& cluster_specifier_plugin_name) { - auto result = - CreateMethodConfig(resolver, route_entry->route, nullptr); - if (!result.ok()) { - return result.status(); - } - route_entry->method_config = std::move(*result); - maybe_add_cluster(absl::StrCat( - "cluster_specifier_plugin:", - cluster_specifier_plugin_name.cluster_specifier_plugin_name)); - return absl::OkStatus(); - }); - if (!status.ok()) { - return status; - } - } - return absl::OkStatus(); -} - -// -// XdsResolver::XdsConfigSelector -// - -XdsResolver::XdsConfigSelector::XdsConfigSelector( - RefCountedPtr resolver, - RefCountedPtr route_config_data) - : resolver_(std::move(resolver)), - route_config_data_(std::move(route_config_data)) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { - gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p", - resolver_.get(), this); - } - // Populate filter list. - const auto& http_filter_registry = - static_cast(resolver_->xds_client_->bootstrap()) - .http_filter_registry(); - for (const auto& http_filter : resolver_->current_listener_.http_filters) { - // Find filter. This is guaranteed to succeed, because it's checked - // at config validation time in the XdsApi code. - const XdsHttpFilterImpl* filter_impl = - http_filter_registry.GetFilterForType( - http_filter.config.config_proto_type_name); - GPR_ASSERT(filter_impl != nullptr); - // Add C-core filter to list. - if (filter_impl->channel_filter() != nullptr) { - filters_.push_back(filter_impl->channel_filter()); - } - } - filters_.push_back(&ClusterSelectionFilter::kFilter); -} - -XdsResolver::XdsConfigSelector::~XdsConfigSelector() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { - gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p", - resolver_.get(), this); - } - route_config_data_.reset(); - resolver_->MaybeRemoveUnusedClusters(); -} - absl::optional HeaderHashHelper( const XdsRouteConfigResource::Route::RouteAction::HashPolicy::Header& header_policy, @@ -740,16 +747,18 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( GetCallConfigArgs args) { Slice* path = args.initial_metadata->get_pointer(HttpPathMetadata()); GPR_ASSERT(path != nullptr); - auto* entry = route_config_data_->GetRouteForRequest(path->as_string_view(), - args.initial_metadata); - if (entry == nullptr) { + auto route_index = XdsRouting::GetRouteForRequest( + RouteListIterator(&route_table_), path->as_string_view(), + args.initial_metadata); + if (!route_index.has_value()) { return absl::UnavailableError( "No matching route found in xDS route config"); } + auto& entry = route_table_[*route_index]; // Found a route match const auto* route_action = absl::get_if( - &entry->route.action); + &entry.route.action); if (route_action == nullptr) { return absl::UnavailableError("Matching route has inappropriate action"); } @@ -762,24 +771,24 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( action_cluster_name) { cluster_name = absl::StrCat("cluster:", action_cluster_name.cluster_name); - method_config = entry->method_config; + method_config = entry.method_config; }, // WeightedClusters [&](const std::vector< XdsRouteConfigResource::Route::RouteAction::ClusterWeight>& /*weighted_clusters*/) { const uint32_t key = absl::Uniform( - absl::BitGen(), 0, entry->weighted_cluster_state.back().range_end); + absl::BitGen(), 0, entry.weighted_cluster_state.back().range_end); // Find the index in weighted clusters corresponding to key. size_t mid = 0; size_t start_index = 0; - size_t end_index = entry->weighted_cluster_state.size() - 1; + size_t end_index = entry.weighted_cluster_state.size() - 1; size_t index = 0; while (end_index > start_index) { mid = (start_index + end_index) / 2; - if (entry->weighted_cluster_state[mid].range_end > key) { + if (entry.weighted_cluster_state[mid].range_end > key) { end_index = mid; - } else if (entry->weighted_cluster_state[mid].range_end < key) { + } else if (entry.weighted_cluster_state[mid].range_end < key) { start_index = mid + 1; } else { index = mid + 1; @@ -787,10 +796,10 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( } } if (index == 0) index = start_index; - GPR_ASSERT(entry->weighted_cluster_state[index].range_end > key); + GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key); cluster_name = absl::StrCat( - "cluster:", entry->weighted_cluster_state[index].cluster); - method_config = entry->weighted_cluster_state[index].method_config; + "cluster:", entry.weighted_cluster_state[index].cluster); + method_config = entry.weighted_cluster_state[index].method_config; }, // ClusterSpecifierPlugin [&](const XdsRouteConfigResource::Route::RouteAction:: @@ -798,9 +807,9 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( cluster_name = absl::StrCat( "cluster_specifier_plugin:", cluster_specifier_plugin_name.cluster_specifier_plugin_name); - method_config = entry->method_config; + method_config = entry.method_config; }); - auto cluster = route_config_data_->FindClusterRef(cluster_name); + auto cluster = cluster_map_->Find(cluster_name); GPR_ASSERT(cluster != nullptr); // Generate a hash. absl::optional hash; @@ -848,85 +857,10 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( args.service_config_call_data->SetCallAttribute( args.arena->New(hash_value)); args.service_config_call_data->SetCallAttribute( - args.arena->ManagedNew(route_config_data_, - entry)); + args.arena->ManagedNew(cluster_map_)); return absl::OkStatus(); } -// -// XdsResolver::XdsRouteStateAttributeImpl -// - -bool XdsResolver::XdsRouteStateAttributeImpl::HasClusterForRoute( - absl::string_view cluster_name) const { - // Found a route match - const auto* route_action = - absl::get_if( - &static_cast(route_)->route.action); - if (route_action == nullptr) return false; - return Match( - route_action->action, - [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName& name) { - return name.cluster_name == cluster_name; - }, - [&](const std::vector< - XdsRouteConfigResource::Route::RouteAction::ClusterWeight>& - clusters) { - for (const auto& cluster : clusters) { - if (cluster.name == cluster_name) { - return true; - } - } - return false; - }, - [&](const XdsRouteConfigResource::Route::RouteAction:: - ClusterSpecifierPluginName& /* name */) { return false; }); -} - -RefCountedPtr -XdsResolver::XdsRouteStateAttributeImpl::LockAndGetCluster( - absl::string_view cluster_name) { - if (route_config_data_ == nullptr) { - return nullptr; - } - auto cluster = route_config_data_->FindClusterRef(cluster_name); - route_config_data_.reset(); - return cluster; -} - -// -// XdsResolver::ClusterSelectionFilter -// - -const grpc_channel_filter XdsResolver::ClusterSelectionFilter::kFilter = - MakePromiseBasedFilter( - "cluster_selection_filter"); - -ArenaPromise -XdsResolver::ClusterSelectionFilter::MakeCallPromise( - CallArgs call_args, NextPromiseFactory next_promise_factory) { - auto* service_config_call_data = - static_cast( - GetContext() - [GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA] - .value); - GPR_ASSERT(service_config_call_data != nullptr); - auto* route_state_attribute = static_cast( - service_config_call_data->GetCallAttribute()); - auto* cluster_name_attribute = - service_config_call_data->GetCallAttribute(); - if (route_state_attribute != nullptr && cluster_name_attribute != nullptr) { - auto cluster = route_state_attribute->LockAndGetCluster( - cluster_name_attribute->cluster()); - if (cluster != nullptr) { - service_config_call_data->SetOnCommit( - [cluster = std::move(cluster)]() mutable { cluster.reset(); }); - } - } - return next_promise_factory(std::move(call_args)); -} - // // XdsResolver // @@ -1082,6 +1016,7 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) { }); } +namespace { class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator { public: explicit VirtualHostListIterator( @@ -1098,6 +1033,7 @@ class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator { private: const std::vector* virtual_hosts_; }; +} // namespace void XdsResolver::OnRouteConfigUpdate(XdsRouteConfigResource rds_update) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { @@ -1164,7 +1100,7 @@ void XdsResolver::OnResourceDoesNotExist(std::string context) { absl::StatusOr> XdsResolver::CreateServiceConfig() { std::vector clusters; - for (const auto& cluster : cluster_ref_map_) { + for (const auto& cluster : cluster_state_map_) { absl::string_view child_name = cluster.first; if (absl::ConsumePrefix(&child_name, "cluster_specifier_plugin:")) { clusters.push_back(absl::StrFormat( @@ -1206,16 +1142,13 @@ void XdsResolver::GenerateResult() { if (!current_virtual_host_.has_value()) return; // First create XdsConfigSelector, which may add new entries to the cluster // state map, and then CreateServiceConfig for LB policies. - auto route_config_data = - RouteConfigData::Create(this, current_virtual_host_->routes, - current_listener_.http_max_stream_duration); - if (!route_config_data.ok()) { + absl::Status status; + auto config_selector = MakeRefCounted(Ref(), &status); + if (!status.ok()) { OnError("could not create ConfigSelector", - absl::UnavailableError(route_config_data.status().message())); + absl::UnavailableError(status.message())); return; } - auto config_selector = - MakeRefCounted(Ref(), std::move(*route_config_data)); Result result; result.addresses.emplace(); result.service_config = CreateServiceConfig(); @@ -1236,13 +1169,13 @@ void XdsResolver::GenerateResult() { void XdsResolver::MaybeRemoveUnusedClusters() { bool update_needed = false; - for (auto it = cluster_ref_map_.begin(); it != cluster_ref_map_.end();) { - RefCountedPtr cluster_state = it->second->RefIfNonZero(); + for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) { + RefCountedPtr cluster_state = it->second->RefIfNonZero(); if (cluster_state != nullptr) { ++it; } else { update_needed = true; - it = cluster_ref_map_.erase(it); + it = cluster_state_map_.erase(it); } } if (update_needed && xds_client_ != nullptr) { @@ -1252,7 +1185,7 @@ void XdsResolver::MaybeRemoveUnusedClusters() { } // -// XdsResolverFactory +// Factory // class XdsResolverFactory : public ResolverFactory { @@ -1277,6 +1210,7 @@ class XdsResolverFactory : public ResolverFactory { return MakeOrphanable(std::move(args)); } }; + } // namespace void RegisterXdsResolver(CoreConfiguration::Builder* builder) { diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h index 93c48ac0d7d..ff6e4523781 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h @@ -29,33 +29,17 @@ namespace grpc_core { class XdsClusterAttribute : public ServiceConfigCallData::CallAttributeInterface { public: - static UniqueTypeName TypeName() { - static UniqueTypeName::Factory kFactory("xds_cluster_name"); - return kFactory.Create(); - } + static UniqueTypeName TypeName(); explicit XdsClusterAttribute(absl::string_view cluster) : cluster_(cluster) {} absl::string_view cluster() const { return cluster_; } - void set_cluster(absl::string_view cluster) { cluster_ = cluster; } private: UniqueTypeName type() const override { return TypeName(); } absl::string_view cluster_; }; - -class XdsRouteStateAttribute - : public ServiceConfigCallData::CallAttributeInterface { - public: - static UniqueTypeName TypeName() { - static UniqueTypeName::Factory factory("xds_cluster_lb_data"); - return factory.Create(); - } - - virtual bool HasClusterForRoute(absl::string_view cluster_name) const = 0; - UniqueTypeName type() const override { return TypeName(); } -}; } // namespace grpc_core #endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_H diff --git a/src/core/ext/filters/stateful_session/stateful_session_filter.cc b/src/core/ext/filters/stateful_session/stateful_session_filter.cc index 315e7e5ae62..66d6bad0cc9 100644 --- a/src/core/ext/filters/stateful_session/stateful_session_filter.cc +++ b/src/core/ext/filters/stateful_session/stateful_session_filter.cc @@ -33,13 +33,11 @@ #include "absl/strings/str_join.h" #include "absl/strings/str_split.h" #include "absl/strings/string_view.h" -#include "absl/strings/strip.h" #include "absl/types/optional.h" #include #include -#include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h" #include "src/core/ext/filters/stateful_session/stateful_session_service_config_parser.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" @@ -85,142 +83,35 @@ StatefulSessionFilter::StatefulSessionFilter(ChannelFilter::Args filter_args) namespace { -absl::string_view AllocateStringOnArena( - absl::string_view src1, absl::string_view src2 = absl::string_view()) { - if (src1.empty() && src2.empty()) { - return absl::string_view(); - } - char* arena_allocated_value = - static_cast(GetContext()->Alloc(src1.size() + src2.size())); - memcpy(arena_allocated_value, src1.data(), src1.size()); - if (!src2.empty()) { - memcpy(arena_allocated_value + src1.size(), src2.data(), src2.size()); - } - return absl::string_view(arena_allocated_value, src1.size() + src2.size()); -} - // Adds the set-cookie header to the server initial metadata if needed. void MaybeUpdateServerInitialMetadata( const StatefulSessionMethodParsedConfig::CookieConfig* cookie_config, - bool cluster_changed, absl::string_view host_override, - absl::string_view actual_cluster, ServerMetadata* server_initial_metadata) { + absl::optional cookie_value, + ServerMetadata* server_initial_metadata) { // Get peer string. Slice* peer_string = server_initial_metadata->get_pointer(PeerString()); - if (peer_string == nullptr) { - // No changes, keep the same set-cookie header. - return; - } - if (host_override == peer_string->as_string_view() && !cluster_changed) { - return; - } - std::string new_value(peer_string->as_string_view()); - if (!actual_cluster.empty()) { - absl::StrAppend(&new_value, ";", actual_cluster); - } - std::vector parts = {absl::StrCat( - *cookie_config->name, "=", absl::Base64Escape(new_value), "; HttpOnly")}; - if (!cookie_config->path.empty()) { - parts.emplace_back(absl::StrCat("Path=", cookie_config->path)); - } - if (cookie_config->ttl > Duration::Zero()) { - parts.emplace_back( - absl::StrCat("Max-Age=", cookie_config->ttl.as_timespec().tv_sec)); - } - server_initial_metadata->Append( - "set-cookie", Slice::FromCopiedString(absl::StrJoin(parts, "; ")), - [](absl::string_view error, const Slice&) { - Crash(absl::StrCat("ERROR ADDING set-cookie METADATA: ", error)); - }); -} - -// Returns an arena-allocated string containing the cluster name -// to use for this RPC, which will live long enough to use when modifying -// the server's initial metadata. If cluster_from_cookie is non-empty and -// points to a cluster present in the selected route, uses that; otherwise, -// uses the cluster selected by the XdsConfigSelector. -// Returns the empty string if cluster override cannot be used (i.e., the route -// uses a cluster specifier plugin). -absl::string_view GetClusterToUse( - absl::string_view cluster_from_cookie, - ServiceConfigCallData* service_config_call_data) { - // Get cluster assigned by the XdsConfigSelector. - auto cluster_attribute = - service_config_call_data->GetCallAttribute(); - GPR_ASSERT(cluster_attribute != nullptr); - auto current_cluster = cluster_attribute->cluster(); - static constexpr absl::string_view kClusterPrefix = "cluster:"; - // If prefix is not "cluster:", then we can't use cluster override. - if (!absl::ConsumePrefix(¤t_cluster, kClusterPrefix)) { - return absl::string_view(); - } - // No cluster in cookie, use the cluster from the attribute - if (cluster_from_cookie.empty()) { - return AllocateStringOnArena(current_cluster); - } - // Use cluster from the cookie if it is configured for the route. - auto route_data = - service_config_call_data->GetCallAttribute(); - GPR_ASSERT(route_data != nullptr); - // Cookie cluster was not configured for route - use the one from the - // attribute - if (!route_data->HasClusterForRoute(cluster_from_cookie)) { - return AllocateStringOnArena(current_cluster); - } - auto arena_allocated_cluster = - AllocateStringOnArena(kClusterPrefix, cluster_from_cookie); - // Update the cluster name attribute with an arena allocated value. - cluster_attribute->set_cluster(arena_allocated_cluster); - return absl::StripPrefix(arena_allocated_cluster, kClusterPrefix); -} - -std::string GetCookieValue(const ClientMetadataHandle& client_initial_metadata, - absl::string_view cookie_name) { - // Check to see if the cookie header is present. - std::string buffer; - auto header_value = - client_initial_metadata->GetStringValue("cookie", &buffer); - if (!header_value.has_value()) return ""; - // Parse cookie header. - std::vector values; - for (absl::string_view cookie : absl::StrSplit(*header_value, "; ")) { - std::pair kv = - absl::StrSplit(cookie, absl::MaxSplits('=', 1)); - if (kv.first == cookie_name) values.push_back(kv.second); + if (peer_string == nullptr) return; // Nothing we can do. + // If there was no cookie or if the address changed, set the cookie. + if (!cookie_value.has_value() || + peer_string->as_string_view() != *cookie_value) { + std::vector parts = {absl::StrCat( + *cookie_config->name, "=", + absl::Base64Escape(peer_string->as_string_view()), "; HttpOnly")}; + if (!cookie_config->path.empty()) { + parts.emplace_back(absl::StrCat("Path=", cookie_config->path)); + } + if (cookie_config->ttl > Duration::Zero()) { + parts.emplace_back( + absl::StrCat("Max-Age=", cookie_config->ttl.as_timespec().tv_sec)); + } + server_initial_metadata->Append( + "set-cookie", Slice::FromCopiedString(absl::StrJoin(parts, "; ")), + [](absl::string_view error, const Slice&) { + Crash(absl::StrCat("ERROR ADDING set-cookie METADATA: ", error)); + }); } - if (values.empty()) return ""; - // TODO(roth): Figure out the right behavior for multiple cookies. - // For now, just choose the first value. - std::string decoded; - if (absl::Base64Unescape(values.front(), &decoded)) { - return decoded; - } - return ""; } -bool IsConfiguredPath(absl::string_view configured_path, - const ClientMetadataHandle& client_initial_metadata) { - // No path configured meaning all paths match - if (configured_path.empty()) { - return true; - } - // Check to see if the configured path matches the request path. - Slice* path_slice = client_initial_metadata->get_pointer(HttpPathMetadata()); - GPR_ASSERT(path_slice != nullptr); - absl::string_view path = path_slice->as_string_view(); - // Matching criteria from - // https://www.rfc-editor.org/rfc/rfc6265#section-5.1.4. - // The cookie-path is a prefix of the request-path (and) - if (!absl::StartsWith(path, configured_path)) { - return false; - } - // One of - // 1. The cookie-path and the request-path are identical. - // 2. The last character of the cookie-path is %x2F ("/"). - // 3. The first character of the request-path that is not included - // in the cookie-path is a %x2F ("/") character. - return path.length() == configured_path.length() || - configured_path.back() == '/' || path[configured_path.length()] == '/'; -} } // namespace // Construct a promise for one call. @@ -238,57 +129,90 @@ ArenaPromise StatefulSessionFilter::MakeCallPromise( GPR_ASSERT(method_params != nullptr); auto* cookie_config = method_params->GetConfig(index_); GPR_ASSERT(cookie_config != nullptr); - if (!cookie_config->name.has_value() || - !IsConfiguredPath(cookie_config->path, - call_args.client_initial_metadata)) { + if (!cookie_config->name.has_value()) { return next_promise_factory(std::move(call_args)); } - // Base64-decode cookie value. - std::string cookie_value = - GetCookieValue(call_args.client_initial_metadata, *cookie_config->name); - // Cookie format is "host;cluster" - std::pair host_cluster = - absl::StrSplit(cookie_value, absl::MaxSplits(';', 1)); - absl::string_view host_override; - // Set override host attribute. Allocate the string on the - // arena, so that it has the right lifetime. - if (!host_cluster.first.empty()) { - host_override = AllocateStringOnArena(host_cluster.first); + // We have a config. + // If the config has a path, check to see if it matches the request path. + if (!cookie_config->path.empty()) { + Slice* path_slice = + call_args.client_initial_metadata->get_pointer(HttpPathMetadata()); + GPR_ASSERT(path_slice != nullptr); + absl::string_view path = path_slice->as_string_view(); + // Matching criteria from + // https://www.rfc-editor.org/rfc/rfc6265#section-5.1.4. + if (!absl::StartsWith(path, cookie_config->path) || + (path.size() != cookie_config->path.size() && + cookie_config->path.back() != '/' && + path[cookie_config->path.size() + 1] != '/')) { + return next_promise_factory(std::move(call_args)); + } + } + // Check to see if we have a host override cookie. + auto cookie_value = GetOverrideHostFromCookie( + call_args.client_initial_metadata, *cookie_config->name); + if (cookie_value.has_value()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_stateful_session_filter_trace)) { + gpr_log(GPR_INFO, + "chand=%p: stateful session filter found cookie %s value %s", + this, cookie_config->name->c_str(), + std::string(*cookie_value).c_str()); + } + // We have a valid cookie, so add the call attribute to be used by the + // xds_override_host LB policy. service_config_call_data->SetCallAttribute( - GetContext()->New(host_override)); + GetContext()->New(*cookie_value)); } - // Check if the cluster override is valid, and apply it if necessary. - // Note that cluster_name will point to an arena-allocated string - // that will still be alive when we see the server initial metadata. - // If the cluster name is empty, that means we cannot use a - // cluster override (i.e., the route uses a cluster specifier plugin). - absl::string_view cluster_name = - GetClusterToUse(host_cluster.second, service_config_call_data); - bool cluster_changed = cluster_name != host_cluster.second; // Intercept server initial metadata. call_args.server_initial_metadata->InterceptAndMap( - [cookie_config, cluster_changed, host_override, - cluster_name](ServerMetadataHandle md) { + [cookie_config, cookie_value](ServerMetadataHandle md) { // Add cookie to server initial metadata if needed. - MaybeUpdateServerInitialMetadata(cookie_config, cluster_changed, - host_override, cluster_name, md.get()); + MaybeUpdateServerInitialMetadata(cookie_config, cookie_value, md.get()); return md; }); return Map(next_promise_factory(std::move(call_args)), - [cookie_config, cluster_changed, host_override, - cluster_name](ServerMetadataHandle md) { + [cookie_config, cookie_value](ServerMetadataHandle md) { // If we got a Trailers-Only response, then add the // cookie to the trailing metadata instead of the // initial metadata. if (md->get(GrpcTrailersOnly()).value_or(false)) { - MaybeUpdateServerInitialMetadata( - cookie_config, cluster_changed, host_override, - cluster_name, md.get()); + MaybeUpdateServerInitialMetadata(cookie_config, cookie_value, + md.get()); } return md; }); } +absl::optional +StatefulSessionFilter::GetOverrideHostFromCookie( + const ClientMetadataHandle& client_initial_metadata, + absl::string_view cookie_name) { + // Check to see if the cookie header is present. + std::string buffer; + auto header_value = + client_initial_metadata->GetStringValue("cookie", &buffer); + if (!header_value.has_value()) return absl::nullopt; + // Parse cookie header. + std::vector values; + for (absl::string_view cookie : absl::StrSplit(*header_value, "; ")) { + std::pair kv = + absl::StrSplit(cookie, absl::MaxSplits('=', 1)); + if (kv.first == cookie_name) values.push_back(kv.second); + } + if (values.empty()) return absl::nullopt; + // TODO(roth): Figure out the right behavior for multiple cookies. + // For now, just choose the first value. + absl::string_view value = values.front(); + // Base64-decode it. + std::string decoded_value; + if (!absl::Base64Unescape(value, &decoded_value)) return absl::nullopt; + // Copy it into the arena, since it will need to persist until the LB pick. + char* arena_value = + static_cast(GetContext()->Alloc(decoded_value.size())); + memcpy(arena_value, decoded_value.c_str(), decoded_value.size()); + return absl::string_view(arena_value, decoded_value.size()); +} + void StatefulSessionFilterRegister(CoreConfiguration::Builder* builder) { StatefulSessionServiceConfigParser::Register(builder); } diff --git a/src/core/ext/filters/stateful_session/stateful_session_filter.h b/src/core/ext/filters/stateful_session/stateful_session_filter.h index 2fea7f47391..c45d98db6ad 100644 --- a/src/core/ext/filters/stateful_session/stateful_session_filter.h +++ b/src/core/ext/filters/stateful_session/stateful_session_filter.h @@ -23,6 +23,7 @@ #include "absl/status/statusor.h" #include "absl/strings/string_view.h" +#include "absl/types/optional.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" @@ -64,6 +65,11 @@ class StatefulSessionFilter : public ChannelFilter { private: explicit StatefulSessionFilter(ChannelFilter::Args filter_args); + + absl::optional GetOverrideHostFromCookie( + const ClientMetadataHandle& initial_metadata, + absl::string_view cookie_name); + // The relative index of instances of the same filter. const size_t index_; // Index of the service config parser. diff --git a/src/core/lib/service_config/service_config_call_data.h b/src/core/lib/service_config/service_config_call_data.h index 661f92755d4..61f387ea45a 100644 --- a/src/core/lib/service_config/service_config_call_data.h +++ b/src/core/lib/service_config/service_config_call_data.h @@ -88,11 +88,6 @@ class ServiceConfigCallData { call_attributes_.EmplaceBack(value); } - template - A* GetCallAttribute() const { - return static_cast(GetCallAttribute(A::TypeName())); - } - CallAttributeInterface* GetCallAttribute(UniqueTypeName type) const { for (CallAttributeInterface* attribute : call_attributes_) { if (attribute->type() == type) return attribute; diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD index d31329f5e71..938bb66cce3 100644 --- a/test/cpp/end2end/xds/BUILD +++ b/test/cpp/end2end/xds/BUILD @@ -440,7 +440,6 @@ grpc_cc_test( "//:gpr", "//:grpc", "//:grpc++", - "//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto", "//src/proto/grpc/testing/xds/v3:router_proto", "//src/proto/grpc/testing/xds/v3:stateful_session_cookie_proto", "//src/proto/grpc/testing/xds/v3:stateful_session_proto", diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc index 145f1f201ef..bc6c55dd6cd 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -631,7 +631,7 @@ XdsEnd2endTest::CreateEndpointsForBackends(size_t start_index, } ClusterLoadAssignment XdsEnd2endTest::BuildEdsResource( - const EdsResourceArgs& args, absl::string_view eds_service_name) { + const EdsResourceArgs& args, const char* eds_service_name) { ClusterLoadAssignment assignment; assignment.set_cluster_name(eds_service_name); for (const auto& locality : args.locality_list) { diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.h b/test/cpp/end2end/xds/xds_end2end_test_lib.h index 2602840aa22..ad9ade0fff5 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.h +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.h @@ -654,7 +654,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { // Constructs an EDS resource. ClusterLoadAssignment BuildEdsResource( const EdsResourceArgs& args, - absl::string_view eds_service_name = kDefaultEdsServiceName); + const char* eds_service_name = kDefaultEdsServiceName); // // Backend management diff --git a/test/cpp/end2end/xds/xds_override_host_end2end_test.cc b/test/cpp/end2end/xds/xds_override_host_end2end_test.cc index 512493693b3..a3053dd51e5 100644 --- a/test/cpp/end2end/xds/xds_override_host_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_override_host_end2end_test.cc @@ -27,7 +27,6 @@ #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/lib/config/config_vars.h" #include "src/core/lib/gprpp/match.h" -#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/outlier_detection.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/router.grpc.pb.h" @@ -119,24 +118,19 @@ class OverrideHostTest : public XdsEnd2endTest { return listener; } - // Send requests until a desired backend is hit and returns cookie name/value - // pairs. Empty collection is returned if the backend was never hit. - // For weighted clusters, more than one request per backend may be necessary - // to obtain the cookie. max_requests_per_backend argument specifies - // the number of requests per backend to send. std::vector> GetAffinityCookieHeaderForBackend(grpc_core::DebugLocation debug_location, size_t backend_index, - size_t max_requests_per_backend = 1) { + RpcOptions rpc_options = RpcOptions()) { EXPECT_LT(backend_index, backends_.size()); if (backend_index >= backends_.size()) { return {}; } const auto& backend = backends_[backend_index]; - for (size_t i = 0; i < max_requests_per_backend * backends_.size(); ++i) { + for (size_t i = 0; i < backends_.size(); ++i) { std::multimap server_initial_metadata; grpc::Status status = - SendRpc(RpcOptions(), nullptr, &server_initial_metadata); + SendRpc(rpc_options, nullptr, &server_initial_metadata); EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << ", message=" << status.error_message() << "\n" @@ -156,44 +150,6 @@ class OverrideHostTest : public XdsEnd2endTest { << "Desired backend had not been hit"; return {}; } - - void SetClusterResource(absl::string_view cluster_name, - absl::string_view eds_resource_name) { - Cluster cluster = default_cluster_; - cluster.set_name(cluster_name); - cluster.mutable_eds_cluster_config()->set_service_name(eds_resource_name); - balancer_->ads_service()->SetCdsResource(cluster); - } - - RouteConfiguration BuildRouteConfigurationWithWeightedClusters( - const std::map clusters) { - RouteConfiguration new_route_config = default_route_config_; - auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); - for (const auto& cluster : clusters) { - auto* weighted_cluster = - route1->mutable_route()->mutable_weighted_clusters()->add_clusters(); - weighted_cluster->set_name(cluster.first); - weighted_cluster->mutable_weight()->set_value(cluster.second); - } - return new_route_config; - } - - void SetCdsAndEdsResources(absl::string_view cluster_name, - absl::string_view eds_service_name, - size_t start_index, size_t end_index) { - balancer_->ads_service()->SetEdsResource(BuildEdsResource( - EdsResourceArgs({{"locality0", - CreateEndpointsForBackends(start_index, end_index)}}), - eds_service_name)); - SetClusterResource(cluster_name, eds_service_name); - } - - static double BackendRequestPercentage( - const std::unique_ptr& backend, - size_t num_requests) { - return static_cast(backend->backend_service()->request_count()) / - num_requests; - } }; INSTANTIATE_TEST_SUITE_P(XdsTest, OverrideHostTest, @@ -311,132 +267,6 @@ TEST_P(OverrideHostTest, DrainingExcludedFromOverrideSet) { EXPECT_EQ(2, backends_[2]->backend_service()->request_count()); ResetBackendCounters(); } - -TEST_P(OverrideHostTest, OverrideWithWeightedClusters) { - CreateAndStartBackends(3); - const char* kNewCluster1Name = "new_cluster_1"; - const char* kNewEdsService1Name = "new_eds_service_name_1"; - const char* kNewCluster2Name = "new_cluster_2"; - const char* kNewEdsService2Name = "new_eds_service_name_2"; - const uint32_t kWeight1 = 1; - const uint32_t kWeight2 = 3; - const double kErrorTolerance = 0.025; - const size_t kNumEchoRpcs = ComputeIdealNumRpcs( - static_cast(kWeight1) / (kWeight1 + kWeight2), kErrorTolerance); - // Populate EDS and CDS resources. - SetCdsAndEdsResources(kNewCluster1Name, kNewEdsService1Name, 0, 1); - SetCdsAndEdsResources(kNewCluster2Name, kNewEdsService2Name, 1, 3); - // Populating Route Configurations for LDS. - SetListenerAndRouteConfiguration( - balancer_.get(), BuildListenerWithStatefulSessionFilter(), - BuildRouteConfigurationWithWeightedClusters( - {{kNewCluster1Name, kWeight1}, {kNewCluster2Name, kWeight2}})); - WaitForAllBackends(DEBUG_LOCATION, 0, 3); - // Get cookie - auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 3); - ASSERT_FALSE(session_cookie.empty()); - // All requests go to the backend we requested. - CheckRpcSendOk(DEBUG_LOCATION, kNumEchoRpcs, - RpcOptions().set_metadata(session_cookie)); - EXPECT_EQ(backends_[0]->backend_service()->request_count(), 0); - EXPECT_EQ(backends_[1]->backend_service()->request_count(), kNumEchoRpcs); - EXPECT_EQ(backends_[2]->backend_service()->request_count(), 0); -} - -TEST_P(OverrideHostTest, ClusterOverrideHonoredButHostGone) { - CreateAndStartBackends(4); - const char* kNewCluster1Name = "new_cluster_1"; - const char* kNewEdsService1Name = "new_eds_service_name_1"; - const char* kNewCluster2Name = "new_cluster_2"; - const char* kNewEdsService2Name = "new_eds_service_name_2"; - const uint32_t kWeight1 = 1; - const uint32_t kWeight2 = 3; - const double kErrorTolerance = 0.025; - const double kWeight2Percent = - static_cast(kWeight2) / (kWeight1 + kWeight2); - const size_t kNumEchoRpcs = - ComputeIdealNumRpcs(kWeight2Percent, kErrorTolerance); - // Populate EDS and CDS resources. - SetCdsAndEdsResources(kNewCluster1Name, kNewEdsService1Name, 0, 1); - SetCdsAndEdsResources(kNewCluster2Name, kNewEdsService2Name, 1, 3); - // Populating Route Configurations for LDS. - SetListenerAndRouteConfiguration( - balancer_.get(), BuildListenerWithStatefulSessionFilter(), - BuildRouteConfigurationWithWeightedClusters( - {{kNewCluster1Name, kWeight1}, {kNewCluster2Name, kWeight2}})); - WaitForAllBackends(DEBUG_LOCATION, 0, 3); - auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 3); - ASSERT_FALSE(session_cookie.empty()); - // Remove backends[1] from cluster2 - balancer_->ads_service()->SetEdsResource(BuildEdsResource( - EdsResourceArgs({{"locality0", CreateEndpointsForBackends(2, 4)}}), - kNewEdsService2Name)); - WaitForAllBackends(DEBUG_LOCATION, 3, 4); - CheckRpcSendOk(DEBUG_LOCATION, kNumEchoRpcs, - RpcOptions().set_metadata(session_cookie)); - // Traffic goes to a second cluster, where it is equally distributed between - // the two remaining hosts - EXPECT_THAT(BackendRequestPercentage(backends_[2], kNumEchoRpcs), - ::testing::DoubleNear(.5, kErrorTolerance)); - EXPECT_THAT(BackendRequestPercentage(backends_[3], kNumEchoRpcs), - ::testing::DoubleNear(.5, kErrorTolerance)); - EXPECT_NE(session_cookie, - GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 2, 3)); -} - -TEST_P(OverrideHostTest, ClusterGoneHostStays) { - CreateAndStartBackends(3); - const char* kNewCluster1Name = "new_cluster_1"; - const char* kNewEdsService1Name = "new_eds_service_name_1"; - const char* kNewCluster2Name = "new_cluster_2"; - const char* kNewEdsService2Name = "new_eds_service_name_2"; - const char* kNewCluster3Name = "new_cluster_3"; - const char* kNewEdsService3Name = "new_eds_service_name_3"; - const uint32_t kWeight1 = 3; - const uint32_t kWeight2 = 1; - const double kErrorTolerance = 0.025; - const double kPercentage1 = - static_cast(kWeight1) / (kWeight1 + kWeight2); - const size_t kNumEchoRpcs = - ComputeIdealNumRpcs(kPercentage1, kErrorTolerance); - // Populate EDS and CDS resources. - SetCdsAndEdsResources(kNewCluster1Name, kNewEdsService1Name, 0, 1); - SetCdsAndEdsResources(kNewCluster2Name, kNewEdsService2Name, 1, 2); - // Populating Route Configurations for LDS. - SetListenerAndRouteConfiguration( - balancer_.get(), BuildListenerWithStatefulSessionFilter(), - BuildRouteConfigurationWithWeightedClusters( - {{kNewCluster1Name, kWeight1}, {kNewCluster2Name, kWeight2}})); - WaitForAllBackends(DEBUG_LOCATION, 0, 2); - auto backend1_in_cluster2_cookie = - GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 5); - ASSERT_FALSE(backend1_in_cluster2_cookie.empty()); - // Create a new cluster, cluster 3, containing a new backend, backend 2. - SetCdsAndEdsResources(kNewCluster3Name, kNewEdsService3Name, 2, 3); - // Send an EDS update for cluster 1 that adds backend 1. (Now cluster 1 has - // backends 0 and 1.) - balancer_->ads_service()->SetEdsResource(BuildEdsResource( - EdsResourceArgs({{"locality0", CreateEndpointsForBackends(0, 2)}}), - kNewEdsService1Name)); - SetListenerAndRouteConfiguration( - balancer_.get(), BuildListenerWithStatefulSessionFilter(), - BuildRouteConfigurationWithWeightedClusters( - {{kNewCluster1Name, kWeight1}, {kNewCluster3Name, kWeight2}})); - WaitForAllBackends(DEBUG_LOCATION, 2); - CheckRpcSendOk(DEBUG_LOCATION, kNumEchoRpcs, - RpcOptions().set_metadata(backend1_in_cluster2_cookie)); - // Traffic is split between clusters. Cluster1 traffic is sent to backends_[1] - EXPECT_THAT(BackendRequestPercentage(backends_[0], kNumEchoRpcs), - ::testing::DoubleNear(0, kErrorTolerance)); - EXPECT_THAT(BackendRequestPercentage(backends_[1], kNumEchoRpcs), - ::testing::DoubleNear(kPercentage1, kErrorTolerance)); - EXPECT_THAT(BackendRequestPercentage(backends_[2], kNumEchoRpcs), - ::testing::DoubleNear(1 - kPercentage1, kErrorTolerance)); - // backends_[1] cookie is updated with a new cluster - EXPECT_NE(backend1_in_cluster2_cookie, - GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 5)); -} - } // namespace } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/xds/xds_routing_end2end_test.cc b/test/cpp/end2end/xds/xds_routing_end2end_test.cc index a4012732404..2785ce54994 100644 --- a/test/cpp/end2end/xds/xds_routing_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_routing_end2end_test.cc @@ -402,9 +402,7 @@ INSTANTIATE_TEST_SUITE_P( ::testing::Values(XdsTestType(), XdsTestType().set_enable_rds_testing()), &XdsTestType::Name); -MATCHER_P2(AdjustedClockInRange, t1, t2, - absl::StrFormat("time between %s and %s", t1.ToString().c_str(), - t2.ToString().c_str())) { +MATCHER_P2(AdjustedClockInRange, t1, t2, "equals time") { gpr_cycle_counter cycle_now = gpr_get_cycle_counter(); grpc_core::Timestamp cycle_time = grpc_core::Timestamp::FromCycleCounterRoundDown(cycle_now);