diff --git a/CMakeLists.txt b/CMakeLists.txt index 9389676a9a9..959a5185cf7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29618,6 +29618,10 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 95d5e06f1b6..7af28b6ce5f 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -16759,6 +16759,7 @@ targets: - src/proto/grpc/testing/simple_messages.proto - src/proto/grpc/testing/xds/v3/address.proto - src/proto/grpc/testing/xds/v3/ads.proto + - src/proto/grpc/testing/xds/v3/aggregate_cluster.proto - src/proto/grpc/testing/xds/v3/base.proto - src/proto/grpc/testing/xds/v3/cluster.proto - src/proto/grpc/testing/xds/v3/config_source.proto diff --git a/src/core/BUILD b/src/core/BUILD index 85472b4dc38..68902dbcdf8 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3789,6 +3789,7 @@ grpc_cc_library( "channel_args", "channel_fwd", "context", + "grpc_resolver_xds_header", "grpc_service_config", "json", "json_args", @@ -5279,7 +5280,6 @@ grpc_cc_library( "ref_counted", "slice", "time", - "unique_type_name", "//:config", "//:debug_location", "//:gpr", diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index d4733e3ee26..eb961a415d9 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -47,7 +47,6 @@ #include -#include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/slice/slice.h" #define XXH_INLINE_ALL @@ -102,11 +101,6 @@ namespace grpc_core { TraceFlag grpc_xds_resolver_trace(false, "xds_resolver"); -UniqueTypeName XdsClusterAttribute::TypeName() { - static UniqueTypeName::Factory kFactory("xds_cluster_name"); - return kFactory.Create(); -} - namespace { std::string GetDefaultAuthorityInternal(const URI& uri) { @@ -246,15 +240,15 @@ class XdsResolver : public Resolver { // the cluster by the ConfigSelector. The ref for each call is held // until the call is committed. When the strong refs go away, we hop // back into the WorkSerializer to remove the entry from the map. - class ClusterState : public DualRefCounted { + class ClusterRef : public DualRefCounted { public: - ClusterState(RefCountedPtr resolver, - absl::string_view cluster_name) + ClusterRef(RefCountedPtr resolver, + absl::string_view cluster_name) : resolver_(std::move(resolver)), cluster_name_(cluster_name) {} void Orphan() override { - auto* resolver = resolver_.get(); - resolver->work_serializer_->Run( + XdsResolver* resolver_ptr = resolver_.get(); + resolver_ptr->work_serializer_->Run( [resolver = std::move(resolver_)]() { resolver->MaybeRemoveUnusedClusters(); }, @@ -268,21 +262,49 @@ class XdsResolver : public Resolver { std::string cluster_name_; }; - // A map containing cluster refs held by the XdsConfigSelector. A ref to - // this map will be taken by each call processed by the XdsConfigSelector, - // stored in a the call's call attributes, and later unreffed - // by the ClusterSelection filter. - class XdsClusterMap : public RefCounted { + // A routing data including cluster refs and routes table held by the + // XdsConfigSelector. A ref to this map will be taken by each call processed + // by the XdsConfigSelector, stored in a the call's call attributes, and later + // unreffed by the ClusterSelection filter. + class RouteConfigData : public RefCounted { public: - explicit XdsClusterMap( - std::map> clusters) - : clusters_(std::move(clusters)) {} + struct RouteEntry { + struct ClusterWeightState { + uint32_t range_end; + absl::string_view cluster; + RefCountedPtr method_config; + + bool operator==(const ClusterWeightState& other) const { + return range_end == other.range_end && cluster == other.cluster && + MethodConfigsEqual(method_config.get(), + other.method_config.get()); + } + }; + + XdsRouteConfigResource::Route route; + RefCountedPtr method_config; + std::vector weighted_cluster_state; + + explicit RouteEntry(const XdsRouteConfigResource::Route& r) : route(r) {} - bool operator==(const XdsClusterMap& other) const { - return clusters_ == other.clusters_; + bool operator==(const RouteEntry& other) const { + return route == other.route && + weighted_cluster_state == other.weighted_cluster_state && + MethodConfigsEqual(method_config.get(), + other.method_config.get()); + } + }; + + static absl::StatusOr> Create( + XdsResolver* resolver, + const std::vector& routes, + const Duration& default_max_stream_duration); + + bool operator==(const RouteConfigData& other) const { + return clusters_ == other.clusters_ && routes_ == other.routes_; } - RefCountedPtr Find(absl::string_view name) const { + RefCountedPtr FindClusterRef(absl::string_view name) const { auto it = clusters_.find(name); if (it == clusters_.end()) { return nullptr; @@ -290,14 +312,36 @@ class XdsResolver : public Resolver { return it->second; } + RouteEntry* GetRouteForRequest(absl::string_view path, + grpc_metadata_batch* initial_metadata); + private: - std::map> clusters_; + class RouteListIterator; + + static absl::StatusOr> CreateMethodConfig( + XdsResolver* resolver, const XdsRouteConfigResource::Route& route, + const XdsRouteConfigResource::Route::RouteAction::ClusterWeight* + cluster_weight); + + static bool MethodConfigsEqual(const ServiceConfig* sc1, + const ServiceConfig* sc2) { + if (sc1 == nullptr) return sc2 == nullptr; + if (sc2 == nullptr) return false; + return sc1->json_string() == sc2->json_string(); + } + + absl::Status AddRouteEntry(const XdsRouteConfigResource::Route& route, + const Duration& default_max_stream_duration, + XdsResolver* resolver); + + std::map> clusters_; + std::vector routes_; }; class XdsConfigSelector : public ConfigSelector { public: XdsConfigSelector(RefCountedPtr resolver, - absl::Status* status); + RefCountedPtr route_config_data); ~XdsConfigSelector() override; const char* name() const override { return "XdsConfigSelector"; } @@ -305,8 +349,8 @@ class XdsResolver : public Resolver { bool Equals(const ConfigSelector* other) const override { const auto* other_xds = static_cast(other); // Don't need to compare resolver_, since that will always be the same. - return route_table_ == other_xds->route_table_ && - *cluster_map_ == *other_xds->cluster_map_; + return *route_config_data_ == *other_xds->route_config_data_ && + filters_ == other_xds->filters_; } absl::Status GetCallConfig(GetCallConfigArgs args) override; @@ -316,34 +360,28 @@ class XdsResolver : public Resolver { } private: - struct Route { - struct ClusterWeightState { - uint32_t range_end; - absl::string_view cluster; - RefCountedPtr method_config; - - bool operator==(const ClusterWeightState& other) const; - }; - - XdsRouteConfigResource::Route route; - RefCountedPtr method_config; - std::vector weighted_cluster_state; + RefCountedPtr resolver_; + RefCountedPtr route_config_data_; + std::vector filters_; + }; - bool operator==(const Route& other) const; - }; - using RouteTable = std::vector; + class XdsRouteStateAttributeImpl : public XdsRouteStateAttribute { + public: + explicit XdsRouteStateAttributeImpl( + RefCountedPtr route_config_data, + RouteConfigData::RouteEntry* route) + : route_config_data_(std::move(route_config_data)), route_(route) {} - class RouteListIterator; + // This method can be called only once. The first call will release + // the reference to the cluster map, and subsequent calls will return + // nullptr. + RefCountedPtr LockAndGetCluster(absl::string_view cluster_name); - absl::StatusOr> CreateMethodConfig( - const XdsRouteConfigResource::Route& route, - const XdsRouteConfigResource::Route::RouteAction::ClusterWeight* - cluster_weight); + bool HasClusterForRoute(absl::string_view cluster_name) const override; - RefCountedPtr resolver_; - RouteTable route_table_; - RefCountedPtr cluster_map_; - std::vector filters_; + private: + RefCountedPtr route_config_data_; + RouteConfigData::RouteEntry* route_; }; class ClusterSelectionFilter : public ChannelFilter { @@ -357,29 +395,7 @@ class XdsResolver : public Resolver { // Construct a promise for one call. ArenaPromise MakeCallPromise( - CallArgs call_args, NextPromiseFactory next_promise_factory) override { - auto* service_config_call_data = - static_cast( - GetContext() - [GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA] - .value); - GPR_ASSERT(service_config_call_data != nullptr); - auto* cluster_data = static_cast( - service_config_call_data->GetCallAttribute( - XdsClusterMapAttribute::TypeName())); - auto* cluster_name_attribute = static_cast( - service_config_call_data->GetCallAttribute( - XdsClusterAttribute::TypeName())); - if (cluster_data != nullptr && cluster_name_attribute != nullptr) { - auto cluster = - cluster_data->LockAndGetCluster(cluster_name_attribute->cluster()); - if (cluster != nullptr) { - service_config_call_data->SetOnCommit( - [cluster = std::move(cluster)]() mutable { cluster.reset(); }); - } - } - return next_promise_factory(std::move(call_args)); - } + CallArgs call_args, NextPromiseFactory next_promise_factory) override; private: explicit ClusterSelectionFilter(ChannelFilter::Args filter_args) @@ -388,46 +404,17 @@ class XdsResolver : public Resolver { ChannelFilter::Args filter_args_; }; - RefCountedPtr GetOrCreateClusterState( + RefCountedPtr GetOrCreateClusterRef( absl::string_view cluster_name) { - auto it = cluster_state_map_.find(cluster_name); - if (it == cluster_state_map_.end()) { - auto cluster = MakeRefCounted(Ref(), cluster_name); - cluster_state_map_.emplace(cluster->cluster_name(), cluster->WeakRef()); + auto it = cluster_ref_map_.find(cluster_name); + if (it == cluster_ref_map_.end()) { + auto cluster = MakeRefCounted(Ref(), cluster_name); + cluster_ref_map_.emplace(cluster->cluster_name(), cluster->WeakRef()); return cluster; } return it->second->Ref(); } - class XdsClusterMapAttribute - : public ServiceConfigCallData::CallAttributeInterface { - public: - static UniqueTypeName TypeName() { - static UniqueTypeName::Factory factory("xds_cluster_lb_data"); - return factory.Create(); - } - - explicit XdsClusterMapAttribute(RefCountedPtr cluster_map) - : cluster_map_(std::move(cluster_map)) {} - - // This method can be called only once. The first call will release the - // reference to the cluster map, and subsequent calls will return nullptr. - RefCountedPtr LockAndGetCluster( - absl::string_view cluster_name) { - if (cluster_map_ == nullptr) { - return nullptr; - } - auto cluster = cluster_map_->Find(cluster_name); - cluster_map_.reset(); - return cluster; - } - - UniqueTypeName type() const override { return TypeName(); } - - private: - RefCountedPtr cluster_map_; - }; - void OnListenerUpdate(XdsListenerResource listener); void OnRouteConfigUpdate(XdsRouteConfigResource rds_update); void OnError(absl::string_view context, absl::Status status); @@ -461,188 +448,72 @@ class XdsResolver : public Resolver { std::string /*LB policy config*/> cluster_specifier_plugin_map_; - std::map> - cluster_state_map_; + std::map> cluster_ref_map_; }; // -// XdsResolver::XdsConfigSelector::Route +// XdsResolver::RouteConfigData::RouteListIterator // -bool MethodConfigsEqual(const ServiceConfig* sc1, const ServiceConfig* sc2) { - if (sc1 == nullptr) return sc2 == nullptr; - if (sc2 == nullptr) return false; - return sc1->json_string() == sc2->json_string(); -} - -const grpc_channel_filter XdsResolver::ClusterSelectionFilter::kFilter = - MakePromiseBasedFilter( - "cluster_selection_filter"); - -bool XdsResolver::XdsConfigSelector::Route::ClusterWeightState::operator==( - const ClusterWeightState& other) const { - return range_end == other.range_end && cluster == other.cluster && - MethodConfigsEqual(method_config.get(), other.method_config.get()); -} - -bool XdsResolver::XdsConfigSelector::Route::operator==( - const Route& other) const { - return route == other.route && - weighted_cluster_state == other.weighted_cluster_state && - MethodConfigsEqual(method_config.get(), other.method_config.get()); -} - // Implementation of XdsRouting::RouteListIterator for getting the matching // route for a request. -class XdsResolver::XdsConfigSelector::RouteListIterator +class XdsResolver::RouteConfigData::RouteListIterator : public XdsRouting::RouteListIterator { public: - explicit RouteListIterator(const RouteTable* route_table) + explicit RouteListIterator(const RouteConfigData* route_table) : route_table_(route_table) {} - size_t Size() const override { return route_table_->size(); } + size_t Size() const override { return route_table_->routes_.size(); } const XdsRouteConfigResource::Route::Matchers& GetMatchersForRoute( size_t index) const override { - return (*route_table_)[index].route.matchers; + return route_table_->routes_[index].route.matchers; } private: - const RouteTable* route_table_; + const RouteConfigData* route_table_; }; // -// XdsResolver::XdsConfigSelector +// XdsResolver::RouteConfigData // -XdsResolver::XdsConfigSelector::XdsConfigSelector( - RefCountedPtr resolver, absl::Status* status) - : resolver_(std::move(resolver)) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { - gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p", - resolver_.get(), this); - } - // 1. Construct the route table - // 2 Update resolver's cluster state map - // 3. Construct cluster list to hold on to entries in the cluster state - // map. +absl::StatusOr> +XdsResolver::RouteConfigData::Create( + XdsResolver* resolver, + const std::vector& routes, + const Duration& default_max_stream_duration) { + auto data = MakeRefCounted(); // Reserve the necessary entries up-front to avoid reallocation as we add // elements. This is necessary because the string_view in the entry's // weighted_cluster_state field points to the memory in the route field, so // moving the entry in a reallocation will cause the string_view to point to // invalid data. - route_table_.reserve(resolver_->current_virtual_host_->routes.size()); - std::map> clusters; - auto maybe_add_cluster = [&](absl::string_view cluster_name) { - if (clusters.find(cluster_name) != clusters.end()) return; - auto cluster_state = resolver_->GetOrCreateClusterState(cluster_name); - absl::string_view name = cluster_state->cluster_name(); - clusters.emplace(name, std::move(cluster_state)); - }; - for (auto& route : resolver_->current_virtual_host_->routes) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { - gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s", - resolver_.get(), this, route.ToString().c_str()); - } - route_table_.emplace_back(); - auto& route_entry = route_table_.back(); - route_entry.route = route; - auto* route_action = - absl::get_if( - &route_entry.route.action); - if (route_action != nullptr) { - // If the route doesn't specify a timeout, set its timeout to the global - // one. - if (!route_action->max_stream_duration.has_value()) { - route_action->max_stream_duration = - resolver_->current_listener_.http_max_stream_duration; - } - Match( - route_action->action, - // cluster name - [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName& - cluster_name) { - auto result = CreateMethodConfig(route_entry.route, nullptr); - if (!result.ok()) { - *status = result.status(); - return; - } - route_entry.method_config = std::move(*result); - maybe_add_cluster( - absl::StrCat("cluster:", cluster_name.cluster_name)); - }, - // WeightedClusters - [&](const std::vector< - XdsRouteConfigResource::Route::RouteAction::ClusterWeight>& - weighted_clusters) { - uint32_t end = 0; - for (const auto& weighted_cluster : weighted_clusters) { - Route::ClusterWeightState cluster_weight_state; - auto result = - CreateMethodConfig(route_entry.route, &weighted_cluster); - if (!result.ok()) { - *status = result.status(); - return; - } - cluster_weight_state.method_config = std::move(*result); - end += weighted_cluster.weight; - cluster_weight_state.range_end = end; - cluster_weight_state.cluster = weighted_cluster.name; - route_entry.weighted_cluster_state.push_back( - std::move(cluster_weight_state)); - maybe_add_cluster( - absl::StrCat("cluster:", weighted_cluster.name)); - } - }, - // ClusterSpecifierPlugin - [&](const XdsRouteConfigResource::Route::RouteAction:: - ClusterSpecifierPluginName& cluster_specifier_plugin_name) { - auto result = CreateMethodConfig(route_entry.route, nullptr); - if (!result.ok()) { - *status = result.status(); - return; - } - route_entry.method_config = std::move(*result); - maybe_add_cluster(absl::StrCat( - "cluster_specifier_plugin:", - cluster_specifier_plugin_name.cluster_specifier_plugin_name)); - }); - if (!status->ok()) return; - } - } - cluster_map_ = MakeRefCounted(std::move(clusters)); - // Populate filter list. - const auto& http_filter_registry = - static_cast(resolver_->xds_client_->bootstrap()) - .http_filter_registry(); - for (const auto& http_filter : resolver_->current_listener_.http_filters) { - // Find filter. This is guaranteed to succeed, because it's checked - // at config validation time in the XdsApi code. - const XdsHttpFilterImpl* filter_impl = - http_filter_registry.GetFilterForType( - http_filter.config.config_proto_type_name); - GPR_ASSERT(filter_impl != nullptr); - // Add C-core filter to list. - if (filter_impl->channel_filter() != nullptr) { - filters_.push_back(filter_impl->channel_filter()); + data->routes_.reserve(routes.size()); + for (auto& route : routes) { + absl::Status status = + data->AddRouteEntry(route, default_max_stream_duration, resolver); + if (!status.ok()) { + return status; } } - filters_.push_back(&ClusterSelectionFilter::kFilter); + return data; } -XdsResolver::XdsConfigSelector::~XdsConfigSelector() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { - gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p", - resolver_.get(), this); +XdsResolver::RouteConfigData::RouteEntry* +XdsResolver::RouteConfigData::GetRouteForRequest( + absl::string_view path, grpc_metadata_batch* initial_metadata) { + auto route_index = XdsRouting::GetRouteForRequest(RouteListIterator(this), + path, initial_metadata); + if (!route_index.has_value()) { + return nullptr; } - cluster_map_.reset(); - resolver_->MaybeRemoveUnusedClusters(); + return &routes_[*route_index]; } absl::StatusOr> -XdsResolver::XdsConfigSelector::CreateMethodConfig( - const XdsRouteConfigResource::Route& route, +XdsResolver::RouteConfigData::CreateMethodConfig( + XdsResolver* resolver, const XdsRouteConfigResource::Route& route, const XdsRouteConfigResource::Route::RouteAction::ClusterWeight* cluster_weight) { std::vector fields; @@ -694,11 +565,11 @@ XdsResolver::XdsConfigSelector::CreateMethodConfig( } // Handle xDS HTTP filters. auto result = XdsRouting::GeneratePerHTTPFilterConfigs( - static_cast(resolver_->xds_client_->bootstrap()) + static_cast(resolver->xds_client_->bootstrap()) .http_filter_registry(), - resolver_->current_listener_.http_filters, - resolver_->current_virtual_host_.value(), route, cluster_weight, - resolver_->args_); + resolver->current_listener_.http_filters, + resolver->current_virtual_host_.value(), route, cluster_weight, + resolver->args_); if (!result.ok()) return result.status(); for (const auto& p : result->per_filter_configs) { fields.emplace_back(absl::StrCat(" \"", p.first, "\": [\n", @@ -722,6 +593,128 @@ XdsResolver::XdsConfigSelector::CreateMethodConfig( return nullptr; } +absl::Status XdsResolver::RouteConfigData::AddRouteEntry( + const XdsRouteConfigResource::Route& route, + const Duration& default_max_stream_duration, XdsResolver* resolver) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s", + resolver, this, route.ToString().c_str()); + } + routes_.emplace_back(route); + auto* route_entry = &routes_.back(); + auto maybe_add_cluster = [&](absl::string_view cluster_name) { + if (clusters_.find(cluster_name) != clusters_.end()) return; + auto cluster_state = resolver->GetOrCreateClusterRef(cluster_name); + absl::string_view name = cluster_state->cluster_name(); + clusters_.emplace(name, std::move(cluster_state)); + }; + auto* route_action = absl::get_if( + &route_entry->route.action); + if (route_action != nullptr) { + // If the route doesn't specify a timeout, set its timeout to the global + // one. + if (!route_action->max_stream_duration.has_value()) { + route_action->max_stream_duration = default_max_stream_duration; + } + absl::Status status = Match( + route_action->action, + // cluster name + [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName& + cluster_name) { + auto result = + CreateMethodConfig(resolver, route_entry->route, nullptr); + if (!result.ok()) { + return result.status(); + } + route_entry->method_config = std::move(*result); + maybe_add_cluster( + absl::StrCat("cluster:", cluster_name.cluster_name)); + return absl::OkStatus(); + }, + // WeightedClusters + [&](const std::vector< + XdsRouteConfigResource::Route::RouteAction::ClusterWeight>& + weighted_clusters) { + uint32_t end = 0; + for (const auto& weighted_cluster : weighted_clusters) { + auto result = CreateMethodConfig(resolver, route_entry->route, + &weighted_cluster); + if (!result.ok()) { + return result.status(); + } + RouteEntry::ClusterWeightState cluster_weight_state; + cluster_weight_state.method_config = std::move(*result); + end += weighted_cluster.weight; + cluster_weight_state.range_end = end; + cluster_weight_state.cluster = weighted_cluster.name; + route_entry->weighted_cluster_state.push_back( + std::move(cluster_weight_state)); + maybe_add_cluster(absl::StrCat("cluster:", weighted_cluster.name)); + } + return absl::OkStatus(); + }, + // ClusterSpecifierPlugin + [&](const XdsRouteConfigResource::Route::RouteAction:: + ClusterSpecifierPluginName& cluster_specifier_plugin_name) { + auto result = + CreateMethodConfig(resolver, route_entry->route, nullptr); + if (!result.ok()) { + return result.status(); + } + route_entry->method_config = std::move(*result); + maybe_add_cluster(absl::StrCat( + "cluster_specifier_plugin:", + cluster_specifier_plugin_name.cluster_specifier_plugin_name)); + return absl::OkStatus(); + }); + if (!status.ok()) { + return status; + } + } + return absl::OkStatus(); +} + +// +// XdsResolver::XdsConfigSelector +// + +XdsResolver::XdsConfigSelector::XdsConfigSelector( + RefCountedPtr resolver, + RefCountedPtr route_config_data) + : resolver_(std::move(resolver)), + route_config_data_(std::move(route_config_data)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p", + resolver_.get(), this); + } + // Populate filter list. + const auto& http_filter_registry = + static_cast(resolver_->xds_client_->bootstrap()) + .http_filter_registry(); + for (const auto& http_filter : resolver_->current_listener_.http_filters) { + // Find filter. This is guaranteed to succeed, because it's checked + // at config validation time in the XdsApi code. + const XdsHttpFilterImpl* filter_impl = + http_filter_registry.GetFilterForType( + http_filter.config.config_proto_type_name); + GPR_ASSERT(filter_impl != nullptr); + // Add C-core filter to list. + if (filter_impl->channel_filter() != nullptr) { + filters_.push_back(filter_impl->channel_filter()); + } + } + filters_.push_back(&ClusterSelectionFilter::kFilter); +} + +XdsResolver::XdsConfigSelector::~XdsConfigSelector() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p", + resolver_.get(), this); + } + route_config_data_.reset(); + resolver_->MaybeRemoveUnusedClusters(); +} + absl::optional HeaderHashHelper( const XdsRouteConfigResource::Route::RouteAction::HashPolicy::Header& header_policy, @@ -747,18 +740,16 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( GetCallConfigArgs args) { Slice* path = args.initial_metadata->get_pointer(HttpPathMetadata()); GPR_ASSERT(path != nullptr); - auto route_index = XdsRouting::GetRouteForRequest( - RouteListIterator(&route_table_), path->as_string_view(), - args.initial_metadata); - if (!route_index.has_value()) { + auto* entry = route_config_data_->GetRouteForRequest(path->as_string_view(), + args.initial_metadata); + if (entry == nullptr) { return absl::UnavailableError( "No matching route found in xDS route config"); } - auto& entry = route_table_[*route_index]; // Found a route match const auto* route_action = absl::get_if( - &entry.route.action); + &entry->route.action); if (route_action == nullptr) { return absl::UnavailableError("Matching route has inappropriate action"); } @@ -771,24 +762,24 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( action_cluster_name) { cluster_name = absl::StrCat("cluster:", action_cluster_name.cluster_name); - method_config = entry.method_config; + method_config = entry->method_config; }, // WeightedClusters [&](const std::vector< XdsRouteConfigResource::Route::RouteAction::ClusterWeight>& /*weighted_clusters*/) { const uint32_t key = absl::Uniform( - absl::BitGen(), 0, entry.weighted_cluster_state.back().range_end); + absl::BitGen(), 0, entry->weighted_cluster_state.back().range_end); // Find the index in weighted clusters corresponding to key. size_t mid = 0; size_t start_index = 0; - size_t end_index = entry.weighted_cluster_state.size() - 1; + size_t end_index = entry->weighted_cluster_state.size() - 1; size_t index = 0; while (end_index > start_index) { mid = (start_index + end_index) / 2; - if (entry.weighted_cluster_state[mid].range_end > key) { + if (entry->weighted_cluster_state[mid].range_end > key) { end_index = mid; - } else if (entry.weighted_cluster_state[mid].range_end < key) { + } else if (entry->weighted_cluster_state[mid].range_end < key) { start_index = mid + 1; } else { index = mid + 1; @@ -796,10 +787,10 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( } } if (index == 0) index = start_index; - GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key); + GPR_ASSERT(entry->weighted_cluster_state[index].range_end > key); cluster_name = absl::StrCat( - "cluster:", entry.weighted_cluster_state[index].cluster); - method_config = entry.weighted_cluster_state[index].method_config; + "cluster:", entry->weighted_cluster_state[index].cluster); + method_config = entry->weighted_cluster_state[index].method_config; }, // ClusterSpecifierPlugin [&](const XdsRouteConfigResource::Route::RouteAction:: @@ -807,9 +798,9 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( cluster_name = absl::StrCat( "cluster_specifier_plugin:", cluster_specifier_plugin_name.cluster_specifier_plugin_name); - method_config = entry.method_config; + method_config = entry->method_config; }); - auto cluster = cluster_map_->Find(cluster_name); + auto cluster = route_config_data_->FindClusterRef(cluster_name); GPR_ASSERT(cluster != nullptr); // Generate a hash. absl::optional hash; @@ -857,10 +848,85 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( args.service_config_call_data->SetCallAttribute( args.arena->New(hash_value)); args.service_config_call_data->SetCallAttribute( - args.arena->ManagedNew(cluster_map_)); + args.arena->ManagedNew(route_config_data_, + entry)); return absl::OkStatus(); } +// +// XdsResolver::XdsRouteStateAttributeImpl +// + +bool XdsResolver::XdsRouteStateAttributeImpl::HasClusterForRoute( + absl::string_view cluster_name) const { + // Found a route match + const auto* route_action = + absl::get_if( + &static_cast(route_)->route.action); + if (route_action == nullptr) return false; + return Match( + route_action->action, + [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName& name) { + return name.cluster_name == cluster_name; + }, + [&](const std::vector< + XdsRouteConfigResource::Route::RouteAction::ClusterWeight>& + clusters) { + for (const auto& cluster : clusters) { + if (cluster.name == cluster_name) { + return true; + } + } + return false; + }, + [&](const XdsRouteConfigResource::Route::RouteAction:: + ClusterSpecifierPluginName& /* name */) { return false; }); +} + +RefCountedPtr +XdsResolver::XdsRouteStateAttributeImpl::LockAndGetCluster( + absl::string_view cluster_name) { + if (route_config_data_ == nullptr) { + return nullptr; + } + auto cluster = route_config_data_->FindClusterRef(cluster_name); + route_config_data_.reset(); + return cluster; +} + +// +// XdsResolver::ClusterSelectionFilter +// + +const grpc_channel_filter XdsResolver::ClusterSelectionFilter::kFilter = + MakePromiseBasedFilter( + "cluster_selection_filter"); + +ArenaPromise +XdsResolver::ClusterSelectionFilter::MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) { + auto* service_config_call_data = + static_cast( + GetContext() + [GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA] + .value); + GPR_ASSERT(service_config_call_data != nullptr); + auto* route_state_attribute = static_cast( + service_config_call_data->GetCallAttribute()); + auto* cluster_name_attribute = + service_config_call_data->GetCallAttribute(); + if (route_state_attribute != nullptr && cluster_name_attribute != nullptr) { + auto cluster = route_state_attribute->LockAndGetCluster( + cluster_name_attribute->cluster()); + if (cluster != nullptr) { + service_config_call_data->SetOnCommit( + [cluster = std::move(cluster)]() mutable { cluster.reset(); }); + } + } + return next_promise_factory(std::move(call_args)); +} + // // XdsResolver // @@ -1016,7 +1082,6 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) { }); } -namespace { class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator { public: explicit VirtualHostListIterator( @@ -1033,7 +1098,6 @@ class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator { private: const std::vector* virtual_hosts_; }; -} // namespace void XdsResolver::OnRouteConfigUpdate(XdsRouteConfigResource rds_update) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { @@ -1100,7 +1164,7 @@ void XdsResolver::OnResourceDoesNotExist(std::string context) { absl::StatusOr> XdsResolver::CreateServiceConfig() { std::vector clusters; - for (const auto& cluster : cluster_state_map_) { + for (const auto& cluster : cluster_ref_map_) { absl::string_view child_name = cluster.first; if (absl::ConsumePrefix(&child_name, "cluster_specifier_plugin:")) { clusters.push_back(absl::StrFormat( @@ -1142,13 +1206,16 @@ void XdsResolver::GenerateResult() { if (!current_virtual_host_.has_value()) return; // First create XdsConfigSelector, which may add new entries to the cluster // state map, and then CreateServiceConfig for LB policies. - absl::Status status; - auto config_selector = MakeRefCounted(Ref(), &status); - if (!status.ok()) { + auto route_config_data = + RouteConfigData::Create(this, current_virtual_host_->routes, + current_listener_.http_max_stream_duration); + if (!route_config_data.ok()) { OnError("could not create ConfigSelector", - absl::UnavailableError(status.message())); + absl::UnavailableError(route_config_data.status().message())); return; } + auto config_selector = + MakeRefCounted(Ref(), std::move(*route_config_data)); Result result; result.addresses.emplace(); result.service_config = CreateServiceConfig(); @@ -1169,13 +1236,13 @@ void XdsResolver::GenerateResult() { void XdsResolver::MaybeRemoveUnusedClusters() { bool update_needed = false; - for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) { - RefCountedPtr cluster_state = it->second->RefIfNonZero(); + for (auto it = cluster_ref_map_.begin(); it != cluster_ref_map_.end();) { + RefCountedPtr cluster_state = it->second->RefIfNonZero(); if (cluster_state != nullptr) { ++it; } else { update_needed = true; - it = cluster_state_map_.erase(it); + it = cluster_ref_map_.erase(it); } } if (update_needed && xds_client_ != nullptr) { @@ -1185,7 +1252,7 @@ void XdsResolver::MaybeRemoveUnusedClusters() { } // -// Factory +// XdsResolverFactory // class XdsResolverFactory : public ResolverFactory { @@ -1210,7 +1277,6 @@ class XdsResolverFactory : public ResolverFactory { return MakeOrphanable(std::move(args)); } }; - } // namespace void RegisterXdsResolver(CoreConfiguration::Builder* builder) { diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h index ff6e4523781..93c48ac0d7d 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h @@ -29,17 +29,33 @@ namespace grpc_core { class XdsClusterAttribute : public ServiceConfigCallData::CallAttributeInterface { public: - static UniqueTypeName TypeName(); + static UniqueTypeName TypeName() { + static UniqueTypeName::Factory kFactory("xds_cluster_name"); + return kFactory.Create(); + } explicit XdsClusterAttribute(absl::string_view cluster) : cluster_(cluster) {} absl::string_view cluster() const { return cluster_; } + void set_cluster(absl::string_view cluster) { cluster_ = cluster; } private: UniqueTypeName type() const override { return TypeName(); } absl::string_view cluster_; }; + +class XdsRouteStateAttribute + : public ServiceConfigCallData::CallAttributeInterface { + public: + static UniqueTypeName TypeName() { + static UniqueTypeName::Factory factory("xds_cluster_lb_data"); + return factory.Create(); + } + + virtual bool HasClusterForRoute(absl::string_view cluster_name) const = 0; + UniqueTypeName type() const override { return TypeName(); } +}; } // namespace grpc_core #endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_XDS_XDS_RESOLVER_H diff --git a/src/core/ext/filters/stateful_session/stateful_session_filter.cc b/src/core/ext/filters/stateful_session/stateful_session_filter.cc index 66d6bad0cc9..315e7e5ae62 100644 --- a/src/core/ext/filters/stateful_session/stateful_session_filter.cc +++ b/src/core/ext/filters/stateful_session/stateful_session_filter.cc @@ -33,11 +33,13 @@ #include "absl/strings/str_join.h" #include "absl/strings/str_split.h" #include "absl/strings/string_view.h" +#include "absl/strings/strip.h" #include "absl/types/optional.h" #include #include +#include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h" #include "src/core/ext/filters/stateful_session/stateful_session_service_config_parser.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" @@ -83,35 +85,142 @@ StatefulSessionFilter::StatefulSessionFilter(ChannelFilter::Args filter_args) namespace { +absl::string_view AllocateStringOnArena( + absl::string_view src1, absl::string_view src2 = absl::string_view()) { + if (src1.empty() && src2.empty()) { + return absl::string_view(); + } + char* arena_allocated_value = + static_cast(GetContext()->Alloc(src1.size() + src2.size())); + memcpy(arena_allocated_value, src1.data(), src1.size()); + if (!src2.empty()) { + memcpy(arena_allocated_value + src1.size(), src2.data(), src2.size()); + } + return absl::string_view(arena_allocated_value, src1.size() + src2.size()); +} + // Adds the set-cookie header to the server initial metadata if needed. void MaybeUpdateServerInitialMetadata( const StatefulSessionMethodParsedConfig::CookieConfig* cookie_config, - absl::optional cookie_value, - ServerMetadata* server_initial_metadata) { + bool cluster_changed, absl::string_view host_override, + absl::string_view actual_cluster, ServerMetadata* server_initial_metadata) { // Get peer string. Slice* peer_string = server_initial_metadata->get_pointer(PeerString()); - if (peer_string == nullptr) return; // Nothing we can do. - // If there was no cookie or if the address changed, set the cookie. - if (!cookie_value.has_value() || - peer_string->as_string_view() != *cookie_value) { - std::vector parts = {absl::StrCat( - *cookie_config->name, "=", - absl::Base64Escape(peer_string->as_string_view()), "; HttpOnly")}; - if (!cookie_config->path.empty()) { - parts.emplace_back(absl::StrCat("Path=", cookie_config->path)); - } - if (cookie_config->ttl > Duration::Zero()) { - parts.emplace_back( - absl::StrCat("Max-Age=", cookie_config->ttl.as_timespec().tv_sec)); - } - server_initial_metadata->Append( - "set-cookie", Slice::FromCopiedString(absl::StrJoin(parts, "; ")), - [](absl::string_view error, const Slice&) { - Crash(absl::StrCat("ERROR ADDING set-cookie METADATA: ", error)); - }); + if (peer_string == nullptr) { + // No changes, keep the same set-cookie header. + return; + } + if (host_override == peer_string->as_string_view() && !cluster_changed) { + return; + } + std::string new_value(peer_string->as_string_view()); + if (!actual_cluster.empty()) { + absl::StrAppend(&new_value, ";", actual_cluster); + } + std::vector parts = {absl::StrCat( + *cookie_config->name, "=", absl::Base64Escape(new_value), "; HttpOnly")}; + if (!cookie_config->path.empty()) { + parts.emplace_back(absl::StrCat("Path=", cookie_config->path)); + } + if (cookie_config->ttl > Duration::Zero()) { + parts.emplace_back( + absl::StrCat("Max-Age=", cookie_config->ttl.as_timespec().tv_sec)); + } + server_initial_metadata->Append( + "set-cookie", Slice::FromCopiedString(absl::StrJoin(parts, "; ")), + [](absl::string_view error, const Slice&) { + Crash(absl::StrCat("ERROR ADDING set-cookie METADATA: ", error)); + }); +} + +// Returns an arena-allocated string containing the cluster name +// to use for this RPC, which will live long enough to use when modifying +// the server's initial metadata. If cluster_from_cookie is non-empty and +// points to a cluster present in the selected route, uses that; otherwise, +// uses the cluster selected by the XdsConfigSelector. +// Returns the empty string if cluster override cannot be used (i.e., the route +// uses a cluster specifier plugin). +absl::string_view GetClusterToUse( + absl::string_view cluster_from_cookie, + ServiceConfigCallData* service_config_call_data) { + // Get cluster assigned by the XdsConfigSelector. + auto cluster_attribute = + service_config_call_data->GetCallAttribute(); + GPR_ASSERT(cluster_attribute != nullptr); + auto current_cluster = cluster_attribute->cluster(); + static constexpr absl::string_view kClusterPrefix = "cluster:"; + // If prefix is not "cluster:", then we can't use cluster override. + if (!absl::ConsumePrefix(¤t_cluster, kClusterPrefix)) { + return absl::string_view(); + } + // No cluster in cookie, use the cluster from the attribute + if (cluster_from_cookie.empty()) { + return AllocateStringOnArena(current_cluster); + } + // Use cluster from the cookie if it is configured for the route. + auto route_data = + service_config_call_data->GetCallAttribute(); + GPR_ASSERT(route_data != nullptr); + // Cookie cluster was not configured for route - use the one from the + // attribute + if (!route_data->HasClusterForRoute(cluster_from_cookie)) { + return AllocateStringOnArena(current_cluster); + } + auto arena_allocated_cluster = + AllocateStringOnArena(kClusterPrefix, cluster_from_cookie); + // Update the cluster name attribute with an arena allocated value. + cluster_attribute->set_cluster(arena_allocated_cluster); + return absl::StripPrefix(arena_allocated_cluster, kClusterPrefix); +} + +std::string GetCookieValue(const ClientMetadataHandle& client_initial_metadata, + absl::string_view cookie_name) { + // Check to see if the cookie header is present. + std::string buffer; + auto header_value = + client_initial_metadata->GetStringValue("cookie", &buffer); + if (!header_value.has_value()) return ""; + // Parse cookie header. + std::vector values; + for (absl::string_view cookie : absl::StrSplit(*header_value, "; ")) { + std::pair kv = + absl::StrSplit(cookie, absl::MaxSplits('=', 1)); + if (kv.first == cookie_name) values.push_back(kv.second); } + if (values.empty()) return ""; + // TODO(roth): Figure out the right behavior for multiple cookies. + // For now, just choose the first value. + std::string decoded; + if (absl::Base64Unescape(values.front(), &decoded)) { + return decoded; + } + return ""; } +bool IsConfiguredPath(absl::string_view configured_path, + const ClientMetadataHandle& client_initial_metadata) { + // No path configured meaning all paths match + if (configured_path.empty()) { + return true; + } + // Check to see if the configured path matches the request path. + Slice* path_slice = client_initial_metadata->get_pointer(HttpPathMetadata()); + GPR_ASSERT(path_slice != nullptr); + absl::string_view path = path_slice->as_string_view(); + // Matching criteria from + // https://www.rfc-editor.org/rfc/rfc6265#section-5.1.4. + // The cookie-path is a prefix of the request-path (and) + if (!absl::StartsWith(path, configured_path)) { + return false; + } + // One of + // 1. The cookie-path and the request-path are identical. + // 2. The last character of the cookie-path is %x2F ("/"). + // 3. The first character of the request-path that is not included + // in the cookie-path is a %x2F ("/") character. + return path.length() == configured_path.length() || + configured_path.back() == '/' || path[configured_path.length()] == '/'; +} } // namespace // Construct a promise for one call. @@ -129,90 +238,57 @@ ArenaPromise StatefulSessionFilter::MakeCallPromise( GPR_ASSERT(method_params != nullptr); auto* cookie_config = method_params->GetConfig(index_); GPR_ASSERT(cookie_config != nullptr); - if (!cookie_config->name.has_value()) { + if (!cookie_config->name.has_value() || + !IsConfiguredPath(cookie_config->path, + call_args.client_initial_metadata)) { return next_promise_factory(std::move(call_args)); } - // We have a config. - // If the config has a path, check to see if it matches the request path. - if (!cookie_config->path.empty()) { - Slice* path_slice = - call_args.client_initial_metadata->get_pointer(HttpPathMetadata()); - GPR_ASSERT(path_slice != nullptr); - absl::string_view path = path_slice->as_string_view(); - // Matching criteria from - // https://www.rfc-editor.org/rfc/rfc6265#section-5.1.4. - if (!absl::StartsWith(path, cookie_config->path) || - (path.size() != cookie_config->path.size() && - cookie_config->path.back() != '/' && - path[cookie_config->path.size() + 1] != '/')) { - return next_promise_factory(std::move(call_args)); - } - } - // Check to see if we have a host override cookie. - auto cookie_value = GetOverrideHostFromCookie( - call_args.client_initial_metadata, *cookie_config->name); - if (cookie_value.has_value()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_stateful_session_filter_trace)) { - gpr_log(GPR_INFO, - "chand=%p: stateful session filter found cookie %s value %s", - this, cookie_config->name->c_str(), - std::string(*cookie_value).c_str()); - } - // We have a valid cookie, so add the call attribute to be used by the - // xds_override_host LB policy. + // Base64-decode cookie value. + std::string cookie_value = + GetCookieValue(call_args.client_initial_metadata, *cookie_config->name); + // Cookie format is "host;cluster" + std::pair host_cluster = + absl::StrSplit(cookie_value, absl::MaxSplits(';', 1)); + absl::string_view host_override; + // Set override host attribute. Allocate the string on the + // arena, so that it has the right lifetime. + if (!host_cluster.first.empty()) { + host_override = AllocateStringOnArena(host_cluster.first); service_config_call_data->SetCallAttribute( - GetContext()->New(*cookie_value)); + GetContext()->New(host_override)); } + // Check if the cluster override is valid, and apply it if necessary. + // Note that cluster_name will point to an arena-allocated string + // that will still be alive when we see the server initial metadata. + // If the cluster name is empty, that means we cannot use a + // cluster override (i.e., the route uses a cluster specifier plugin). + absl::string_view cluster_name = + GetClusterToUse(host_cluster.second, service_config_call_data); + bool cluster_changed = cluster_name != host_cluster.second; // Intercept server initial metadata. call_args.server_initial_metadata->InterceptAndMap( - [cookie_config, cookie_value](ServerMetadataHandle md) { + [cookie_config, cluster_changed, host_override, + cluster_name](ServerMetadataHandle md) { // Add cookie to server initial metadata if needed. - MaybeUpdateServerInitialMetadata(cookie_config, cookie_value, md.get()); + MaybeUpdateServerInitialMetadata(cookie_config, cluster_changed, + host_override, cluster_name, md.get()); return md; }); return Map(next_promise_factory(std::move(call_args)), - [cookie_config, cookie_value](ServerMetadataHandle md) { + [cookie_config, cluster_changed, host_override, + cluster_name](ServerMetadataHandle md) { // If we got a Trailers-Only response, then add the // cookie to the trailing metadata instead of the // initial metadata. if (md->get(GrpcTrailersOnly()).value_or(false)) { - MaybeUpdateServerInitialMetadata(cookie_config, cookie_value, - md.get()); + MaybeUpdateServerInitialMetadata( + cookie_config, cluster_changed, host_override, + cluster_name, md.get()); } return md; }); } -absl::optional -StatefulSessionFilter::GetOverrideHostFromCookie( - const ClientMetadataHandle& client_initial_metadata, - absl::string_view cookie_name) { - // Check to see if the cookie header is present. - std::string buffer; - auto header_value = - client_initial_metadata->GetStringValue("cookie", &buffer); - if (!header_value.has_value()) return absl::nullopt; - // Parse cookie header. - std::vector values; - for (absl::string_view cookie : absl::StrSplit(*header_value, "; ")) { - std::pair kv = - absl::StrSplit(cookie, absl::MaxSplits('=', 1)); - if (kv.first == cookie_name) values.push_back(kv.second); - } - if (values.empty()) return absl::nullopt; - // TODO(roth): Figure out the right behavior for multiple cookies. - // For now, just choose the first value. - absl::string_view value = values.front(); - // Base64-decode it. - std::string decoded_value; - if (!absl::Base64Unescape(value, &decoded_value)) return absl::nullopt; - // Copy it into the arena, since it will need to persist until the LB pick. - char* arena_value = - static_cast(GetContext()->Alloc(decoded_value.size())); - memcpy(arena_value, decoded_value.c_str(), decoded_value.size()); - return absl::string_view(arena_value, decoded_value.size()); -} - void StatefulSessionFilterRegister(CoreConfiguration::Builder* builder) { StatefulSessionServiceConfigParser::Register(builder); } diff --git a/src/core/ext/filters/stateful_session/stateful_session_filter.h b/src/core/ext/filters/stateful_session/stateful_session_filter.h index c45d98db6ad..2fea7f47391 100644 --- a/src/core/ext/filters/stateful_session/stateful_session_filter.h +++ b/src/core/ext/filters/stateful_session/stateful_session_filter.h @@ -23,7 +23,6 @@ #include "absl/status/statusor.h" #include "absl/strings/string_view.h" -#include "absl/types/optional.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" @@ -65,11 +64,6 @@ class StatefulSessionFilter : public ChannelFilter { private: explicit StatefulSessionFilter(ChannelFilter::Args filter_args); - - absl::optional GetOverrideHostFromCookie( - const ClientMetadataHandle& initial_metadata, - absl::string_view cookie_name); - // The relative index of instances of the same filter. const size_t index_; // Index of the service config parser. diff --git a/src/core/lib/service_config/service_config_call_data.h b/src/core/lib/service_config/service_config_call_data.h index 61f387ea45a..661f92755d4 100644 --- a/src/core/lib/service_config/service_config_call_data.h +++ b/src/core/lib/service_config/service_config_call_data.h @@ -88,6 +88,11 @@ class ServiceConfigCallData { call_attributes_.EmplaceBack(value); } + template + A* GetCallAttribute() const { + return static_cast(GetCallAttribute(A::TypeName())); + } + CallAttributeInterface* GetCallAttribute(UniqueTypeName type) const { for (CallAttributeInterface* attribute : call_attributes_) { if (attribute->type() == type) return attribute; diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD index 938bb66cce3..d31329f5e71 100644 --- a/test/cpp/end2end/xds/BUILD +++ b/test/cpp/end2end/xds/BUILD @@ -440,6 +440,7 @@ grpc_cc_test( "//:gpr", "//:grpc", "//:grpc++", + "//src/proto/grpc/testing/xds/v3:aggregate_cluster_proto", "//src/proto/grpc/testing/xds/v3:router_proto", "//src/proto/grpc/testing/xds/v3:stateful_session_cookie_proto", "//src/proto/grpc/testing/xds/v3:stateful_session_proto", diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc index bc6c55dd6cd..145f1f201ef 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -631,7 +631,7 @@ XdsEnd2endTest::CreateEndpointsForBackends(size_t start_index, } ClusterLoadAssignment XdsEnd2endTest::BuildEdsResource( - const EdsResourceArgs& args, const char* eds_service_name) { + const EdsResourceArgs& args, absl::string_view eds_service_name) { ClusterLoadAssignment assignment; assignment.set_cluster_name(eds_service_name); for (const auto& locality : args.locality_list) { diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.h b/test/cpp/end2end/xds/xds_end2end_test_lib.h index ad9ade0fff5..2602840aa22 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.h +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.h @@ -654,7 +654,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { // Constructs an EDS resource. ClusterLoadAssignment BuildEdsResource( const EdsResourceArgs& args, - const char* eds_service_name = kDefaultEdsServiceName); + absl::string_view eds_service_name = kDefaultEdsServiceName); // // Backend management diff --git a/test/cpp/end2end/xds/xds_override_host_end2end_test.cc b/test/cpp/end2end/xds/xds_override_host_end2end_test.cc index a3053dd51e5..512493693b3 100644 --- a/test/cpp/end2end/xds/xds_override_host_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_override_host_end2end_test.cc @@ -27,6 +27,7 @@ #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/lib/config/config_vars.h" #include "src/core/lib/gprpp/match.h" +#include "src/proto/grpc/testing/xds/v3/aggregate_cluster.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/outlier_detection.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/router.grpc.pb.h" @@ -118,19 +119,24 @@ class OverrideHostTest : public XdsEnd2endTest { return listener; } + // Send requests until a desired backend is hit and returns cookie name/value + // pairs. Empty collection is returned if the backend was never hit. + // For weighted clusters, more than one request per backend may be necessary + // to obtain the cookie. max_requests_per_backend argument specifies + // the number of requests per backend to send. std::vector> GetAffinityCookieHeaderForBackend(grpc_core::DebugLocation debug_location, size_t backend_index, - RpcOptions rpc_options = RpcOptions()) { + size_t max_requests_per_backend = 1) { EXPECT_LT(backend_index, backends_.size()); if (backend_index >= backends_.size()) { return {}; } const auto& backend = backends_[backend_index]; - for (size_t i = 0; i < backends_.size(); ++i) { + for (size_t i = 0; i < max_requests_per_backend * backends_.size(); ++i) { std::multimap server_initial_metadata; grpc::Status status = - SendRpc(rpc_options, nullptr, &server_initial_metadata); + SendRpc(RpcOptions(), nullptr, &server_initial_metadata); EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << ", message=" << status.error_message() << "\n" @@ -150,6 +156,44 @@ class OverrideHostTest : public XdsEnd2endTest { << "Desired backend had not been hit"; return {}; } + + void SetClusterResource(absl::string_view cluster_name, + absl::string_view eds_resource_name) { + Cluster cluster = default_cluster_; + cluster.set_name(cluster_name); + cluster.mutable_eds_cluster_config()->set_service_name(eds_resource_name); + balancer_->ads_service()->SetCdsResource(cluster); + } + + RouteConfiguration BuildRouteConfigurationWithWeightedClusters( + const std::map clusters) { + RouteConfiguration new_route_config = default_route_config_; + auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + for (const auto& cluster : clusters) { + auto* weighted_cluster = + route1->mutable_route()->mutable_weighted_clusters()->add_clusters(); + weighted_cluster->set_name(cluster.first); + weighted_cluster->mutable_weight()->set_value(cluster.second); + } + return new_route_config; + } + + void SetCdsAndEdsResources(absl::string_view cluster_name, + absl::string_view eds_service_name, + size_t start_index, size_t end_index) { + balancer_->ads_service()->SetEdsResource(BuildEdsResource( + EdsResourceArgs({{"locality0", + CreateEndpointsForBackends(start_index, end_index)}}), + eds_service_name)); + SetClusterResource(cluster_name, eds_service_name); + } + + static double BackendRequestPercentage( + const std::unique_ptr& backend, + size_t num_requests) { + return static_cast(backend->backend_service()->request_count()) / + num_requests; + } }; INSTANTIATE_TEST_SUITE_P(XdsTest, OverrideHostTest, @@ -267,6 +311,132 @@ TEST_P(OverrideHostTest, DrainingExcludedFromOverrideSet) { EXPECT_EQ(2, backends_[2]->backend_service()->request_count()); ResetBackendCounters(); } + +TEST_P(OverrideHostTest, OverrideWithWeightedClusters) { + CreateAndStartBackends(3); + const char* kNewCluster1Name = "new_cluster_1"; + const char* kNewEdsService1Name = "new_eds_service_name_1"; + const char* kNewCluster2Name = "new_cluster_2"; + const char* kNewEdsService2Name = "new_eds_service_name_2"; + const uint32_t kWeight1 = 1; + const uint32_t kWeight2 = 3; + const double kErrorTolerance = 0.025; + const size_t kNumEchoRpcs = ComputeIdealNumRpcs( + static_cast(kWeight1) / (kWeight1 + kWeight2), kErrorTolerance); + // Populate EDS and CDS resources. + SetCdsAndEdsResources(kNewCluster1Name, kNewEdsService1Name, 0, 1); + SetCdsAndEdsResources(kNewCluster2Name, kNewEdsService2Name, 1, 3); + // Populating Route Configurations for LDS. + SetListenerAndRouteConfiguration( + balancer_.get(), BuildListenerWithStatefulSessionFilter(), + BuildRouteConfigurationWithWeightedClusters( + {{kNewCluster1Name, kWeight1}, {kNewCluster2Name, kWeight2}})); + WaitForAllBackends(DEBUG_LOCATION, 0, 3); + // Get cookie + auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 3); + ASSERT_FALSE(session_cookie.empty()); + // All requests go to the backend we requested. + CheckRpcSendOk(DEBUG_LOCATION, kNumEchoRpcs, + RpcOptions().set_metadata(session_cookie)); + EXPECT_EQ(backends_[0]->backend_service()->request_count(), 0); + EXPECT_EQ(backends_[1]->backend_service()->request_count(), kNumEchoRpcs); + EXPECT_EQ(backends_[2]->backend_service()->request_count(), 0); +} + +TEST_P(OverrideHostTest, ClusterOverrideHonoredButHostGone) { + CreateAndStartBackends(4); + const char* kNewCluster1Name = "new_cluster_1"; + const char* kNewEdsService1Name = "new_eds_service_name_1"; + const char* kNewCluster2Name = "new_cluster_2"; + const char* kNewEdsService2Name = "new_eds_service_name_2"; + const uint32_t kWeight1 = 1; + const uint32_t kWeight2 = 3; + const double kErrorTolerance = 0.025; + const double kWeight2Percent = + static_cast(kWeight2) / (kWeight1 + kWeight2); + const size_t kNumEchoRpcs = + ComputeIdealNumRpcs(kWeight2Percent, kErrorTolerance); + // Populate EDS and CDS resources. + SetCdsAndEdsResources(kNewCluster1Name, kNewEdsService1Name, 0, 1); + SetCdsAndEdsResources(kNewCluster2Name, kNewEdsService2Name, 1, 3); + // Populating Route Configurations for LDS. + SetListenerAndRouteConfiguration( + balancer_.get(), BuildListenerWithStatefulSessionFilter(), + BuildRouteConfigurationWithWeightedClusters( + {{kNewCluster1Name, kWeight1}, {kNewCluster2Name, kWeight2}})); + WaitForAllBackends(DEBUG_LOCATION, 0, 3); + auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 3); + ASSERT_FALSE(session_cookie.empty()); + // Remove backends[1] from cluster2 + balancer_->ads_service()->SetEdsResource(BuildEdsResource( + EdsResourceArgs({{"locality0", CreateEndpointsForBackends(2, 4)}}), + kNewEdsService2Name)); + WaitForAllBackends(DEBUG_LOCATION, 3, 4); + CheckRpcSendOk(DEBUG_LOCATION, kNumEchoRpcs, + RpcOptions().set_metadata(session_cookie)); + // Traffic goes to a second cluster, where it is equally distributed between + // the two remaining hosts + EXPECT_THAT(BackendRequestPercentage(backends_[2], kNumEchoRpcs), + ::testing::DoubleNear(.5, kErrorTolerance)); + EXPECT_THAT(BackendRequestPercentage(backends_[3], kNumEchoRpcs), + ::testing::DoubleNear(.5, kErrorTolerance)); + EXPECT_NE(session_cookie, + GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 2, 3)); +} + +TEST_P(OverrideHostTest, ClusterGoneHostStays) { + CreateAndStartBackends(3); + const char* kNewCluster1Name = "new_cluster_1"; + const char* kNewEdsService1Name = "new_eds_service_name_1"; + const char* kNewCluster2Name = "new_cluster_2"; + const char* kNewEdsService2Name = "new_eds_service_name_2"; + const char* kNewCluster3Name = "new_cluster_3"; + const char* kNewEdsService3Name = "new_eds_service_name_3"; + const uint32_t kWeight1 = 3; + const uint32_t kWeight2 = 1; + const double kErrorTolerance = 0.025; + const double kPercentage1 = + static_cast(kWeight1) / (kWeight1 + kWeight2); + const size_t kNumEchoRpcs = + ComputeIdealNumRpcs(kPercentage1, kErrorTolerance); + // Populate EDS and CDS resources. + SetCdsAndEdsResources(kNewCluster1Name, kNewEdsService1Name, 0, 1); + SetCdsAndEdsResources(kNewCluster2Name, kNewEdsService2Name, 1, 2); + // Populating Route Configurations for LDS. + SetListenerAndRouteConfiguration( + balancer_.get(), BuildListenerWithStatefulSessionFilter(), + BuildRouteConfigurationWithWeightedClusters( + {{kNewCluster1Name, kWeight1}, {kNewCluster2Name, kWeight2}})); + WaitForAllBackends(DEBUG_LOCATION, 0, 2); + auto backend1_in_cluster2_cookie = + GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 5); + ASSERT_FALSE(backend1_in_cluster2_cookie.empty()); + // Create a new cluster, cluster 3, containing a new backend, backend 2. + SetCdsAndEdsResources(kNewCluster3Name, kNewEdsService3Name, 2, 3); + // Send an EDS update for cluster 1 that adds backend 1. (Now cluster 1 has + // backends 0 and 1.) + balancer_->ads_service()->SetEdsResource(BuildEdsResource( + EdsResourceArgs({{"locality0", CreateEndpointsForBackends(0, 2)}}), + kNewEdsService1Name)); + SetListenerAndRouteConfiguration( + balancer_.get(), BuildListenerWithStatefulSessionFilter(), + BuildRouteConfigurationWithWeightedClusters( + {{kNewCluster1Name, kWeight1}, {kNewCluster3Name, kWeight2}})); + WaitForAllBackends(DEBUG_LOCATION, 2); + CheckRpcSendOk(DEBUG_LOCATION, kNumEchoRpcs, + RpcOptions().set_metadata(backend1_in_cluster2_cookie)); + // Traffic is split between clusters. Cluster1 traffic is sent to backends_[1] + EXPECT_THAT(BackendRequestPercentage(backends_[0], kNumEchoRpcs), + ::testing::DoubleNear(0, kErrorTolerance)); + EXPECT_THAT(BackendRequestPercentage(backends_[1], kNumEchoRpcs), + ::testing::DoubleNear(kPercentage1, kErrorTolerance)); + EXPECT_THAT(BackendRequestPercentage(backends_[2], kNumEchoRpcs), + ::testing::DoubleNear(1 - kPercentage1, kErrorTolerance)); + // backends_[1] cookie is updated with a new cluster + EXPECT_NE(backend1_in_cluster2_cookie, + GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 1, 5)); +} + } // namespace } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/xds/xds_routing_end2end_test.cc b/test/cpp/end2end/xds/xds_routing_end2end_test.cc index 2785ce54994..a4012732404 100644 --- a/test/cpp/end2end/xds/xds_routing_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_routing_end2end_test.cc @@ -402,7 +402,9 @@ INSTANTIATE_TEST_SUITE_P( ::testing::Values(XdsTestType(), XdsTestType().set_enable_rds_testing()), &XdsTestType::Name); -MATCHER_P2(AdjustedClockInRange, t1, t2, "equals time") { +MATCHER_P2(AdjustedClockInRange, t1, t2, + absl::StrFormat("time between %s and %s", t1.ToString().c_str(), + t2.ToString().c_str())) { gpr_cycle_counter cycle_now = gpr_get_cycle_counter(); grpc_core::Timestamp cycle_time = grpc_core::Timestamp::FromCycleCounterRoundDown(cycle_now);