mirror of https://github.com/grpc/grpc.git
xDS stateful session affinity: implement C-core filter (#31788)
* stateful session affinity: implement filter * register filter config parser * fix unused parameter errors * remove some FIXMEs that are not longer needed * clang-tidy * iwyu * iwyu * revert iwyu changes to observability_logging_sink.cc * generate_projects * shorten filter name * don't use absl::optional for path * fix build * don't add cookie to trailing metadata unless it's Trailers-Onlypull/31793/head^2
parent
1bfc50f27e
commit
5e4d9f4bcf
52 changed files with 622 additions and 58 deletions
@ -0,0 +1,230 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h" |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <algorithm> |
||||
#include <functional> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
#include <vector> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/strings/escaping.h" |
||||
#include "absl/strings/match.h" |
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/strings/str_join.h" |
||||
#include "absl/strings/str_split.h" |
||||
#include "absl/strings/string_view.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/impl/codegen/gpr_types.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/ext/filters/stateful_session/stateful_session_service_config_parser.h" |
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/channel/context.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/promise/detail/basic_seq.h" |
||||
#include "src/core/lib/promise/latch.h" |
||||
#include "src/core/lib/promise/seq.h" |
||||
#include "src/core/lib/promise/try_concurrently.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/service_config/service_config_call_data.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
TraceFlag grpc_stateful_session_filter_trace(false, "stateful_session_filter"); |
||||
|
||||
UniqueTypeName XdsHostOverrideTypeName() { |
||||
static UniqueTypeName::Factory kFactory("xds_host_override"); |
||||
return kFactory.Create(); |
||||
} |
||||
|
||||
const grpc_channel_filter StatefulSessionFilter::kFilter = |
||||
MakePromiseBasedFilter<StatefulSessionFilter, FilterEndpoint::kClient>( |
||||
"stateful_session_filter"); |
||||
|
||||
absl::StatusOr<StatefulSessionFilter> StatefulSessionFilter::Create( |
||||
const ChannelArgs&, ChannelFilter::Args filter_args) { |
||||
return StatefulSessionFilter(filter_args); |
||||
} |
||||
|
||||
StatefulSessionFilter::StatefulSessionFilter(ChannelFilter::Args filter_args) |
||||
: index_(grpc_channel_stack_filter_instance_number( |
||||
filter_args.channel_stack(), |
||||
filter_args.uninitialized_channel_element())), |
||||
service_config_parser_index_( |
||||
StatefulSessionServiceConfigParser::ParserIndex()) {} |
||||
|
||||
namespace { |
||||
|
||||
// Adds the set-cookie header to the server initial metadata if needed.
|
||||
void MaybeUpdateServerInitialMetadata( |
||||
const StatefulSessionMethodParsedConfig::CookieConfig* cookie_config, |
||||
absl::optional<absl::string_view> cookie_value, |
||||
ServerMetadata* server_initial_metadata) { |
||||
// Get peer string.
|
||||
auto peer_string = server_initial_metadata->get(PeerString()); |
||||
if (!peer_string.has_value()) return; // Nothing we can do.
|
||||
// If there was no cookie or if the address changed, set the cookie.
|
||||
if (!cookie_value.has_value() || *peer_string != *cookie_value) { |
||||
std::vector<std::string> parts = { |
||||
absl::StrCat(*cookie_config->name, "=", |
||||
absl::Base64Escape(*peer_string), "; HttpOnly")}; |
||||
if (!cookie_config->path.empty()) { |
||||
parts.emplace_back(absl::StrCat("Path=", cookie_config->path)); |
||||
} |
||||
if (cookie_config->ttl > Duration::Zero()) { |
||||
parts.emplace_back( |
||||
absl::StrCat("Max-Age=", cookie_config->ttl.as_timespec().tv_sec)); |
||||
} |
||||
server_initial_metadata->Append( |
||||
"set-cookie", Slice::FromCopiedString(absl::StrJoin(parts, "; ")), |
||||
[](absl::string_view error, const Slice&) { |
||||
gpr_log(GPR_ERROR, "ERROR ADDING set-cookie METADATA: %s", |
||||
std::string(error).c_str()); |
||||
GPR_ASSERT(false); |
||||
}); |
||||
} |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
// Construct a promise for one call.
|
||||
ArenaPromise<ServerMetadataHandle> StatefulSessionFilter::MakeCallPromise( |
||||
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
||||
// Get config.
|
||||
auto* service_config_call_data = static_cast<ServiceConfigCallData*>( |
||||
GetContext< |
||||
grpc_call_context_element>()[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA] |
||||
.value); |
||||
GPR_ASSERT(service_config_call_data != nullptr); |
||||
auto* method_params = static_cast<StatefulSessionMethodParsedConfig*>( |
||||
service_config_call_data->GetMethodParsedConfig( |
||||
service_config_parser_index_)); |
||||
GPR_ASSERT(method_params != nullptr); |
||||
auto* cookie_config = method_params->GetConfig(index_); |
||||
GPR_ASSERT(cookie_config != nullptr); |
||||
if (!cookie_config->name.has_value()) { |
||||
return next_promise_factory(std::move(call_args)); |
||||
} |
||||
// We have a config.
|
||||
// If the config has a path, check to see if it matches the request path.
|
||||
if (!cookie_config->path.empty()) { |
||||
Slice* path_slice = |
||||
call_args.client_initial_metadata->get_pointer(HttpPathMetadata()); |
||||
GPR_ASSERT(path_slice != nullptr); |
||||
absl::string_view path = path_slice->as_string_view(); |
||||
// Matching criteria from
|
||||
// https://www.rfc-editor.org/rfc/rfc6265#section-5.1.4.
|
||||
if (!absl::StartsWith(path, cookie_config->path) || |
||||
(path.size() != cookie_config->path.size() && |
||||
cookie_config->path.back() != '/' && |
||||
path[cookie_config->path.size() + 1] != '/')) { |
||||
return next_promise_factory(std::move(call_args)); |
||||
} |
||||
} |
||||
// Check to see if we have a host override cookie.
|
||||
auto cookie_value = GetHostOverrideFromCookie( |
||||
call_args.client_initial_metadata, *cookie_config->name); |
||||
if (cookie_value.has_value()) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_stateful_session_filter_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"chand=%p: stateful session filter found cookie %s value %s", |
||||
this, cookie_config->name->c_str(), |
||||
std::string(*cookie_value).c_str()); |
||||
} |
||||
// We have a valid cookie, so add the call attribute to be used by the
|
||||
// xds_override_host LB policy.
|
||||
service_config_call_data->SetCallAttribute(XdsHostOverrideTypeName(), |
||||
*cookie_value); |
||||
} |
||||
// Intercept server initial metadata.
|
||||
auto* read_latch = GetContext<Arena>()->New<Latch<ServerMetadata*>>(); |
||||
auto* write_latch = |
||||
std::exchange(call_args.server_initial_metadata, read_latch); |
||||
return TryConcurrently( |
||||
Seq(next_promise_factory(std::move(call_args)), |
||||
[cookie_config, cookie_value](ServerMetadataHandle md) { |
||||
// If we got a Trailers-Only response, then add the
|
||||
// cookie to the trailing metadata instead of the
|
||||
// initial metadata.
|
||||
if (md->get(GrpcTrailersOnly()).value_or(false)) { |
||||
MaybeUpdateServerInitialMetadata(cookie_config, |
||||
cookie_value, md.get()); |
||||
} |
||||
return md; |
||||
})) |
||||
.NecessaryPull(Seq(read_latch->Wait(), |
||||
[write_latch, cookie_config, |
||||
cookie_value](ServerMetadata** md) -> absl::Status { |
||||
if (*md != nullptr) { |
||||
// Add cookie to server initial metadata if needed.
|
||||
MaybeUpdateServerInitialMetadata( |
||||
cookie_config, cookie_value, *md); |
||||
} |
||||
write_latch->Set(*md); |
||||
return absl::OkStatus(); |
||||
})); |
||||
} |
||||
|
||||
absl::optional<absl::string_view> |
||||
StatefulSessionFilter::GetHostOverrideFromCookie( |
||||
const ClientMetadataHandle& client_initial_metadata, |
||||
absl::string_view cookie_name) { |
||||
// Check to see if the cookie header is present.
|
||||
std::string buffer; |
||||
auto header_value = |
||||
client_initial_metadata->GetStringValue("cookie", &buffer); |
||||
if (!header_value.has_value()) return absl::nullopt; |
||||
// Parse cookie header.
|
||||
std::vector<absl::string_view> values; |
||||
for (absl::string_view cookie : absl::StrSplit(*header_value, "; ")) { |
||||
std::pair<absl::string_view, absl::string_view> kv = |
||||
absl::StrSplit(cookie, absl::MaxSplits('=', 1)); |
||||
if (kv.first == cookie_name) values.push_back(kv.second); |
||||
} |
||||
if (values.empty()) return absl::nullopt; |
||||
// TODO(roth): Figure out the right behavior for multiple cookies.
|
||||
// For now, just choose the first value.
|
||||
absl::string_view value = values.front(); |
||||
// Base64-decode it.
|
||||
std::string decoded_value; |
||||
if (!absl::Base64Unescape(value, &decoded_value)) return absl::nullopt; |
||||
// Copy it into the arena, since it will need to persist until the LB pick.
|
||||
char* arena_value = |
||||
static_cast<char*>(GetContext<Arena>()->Alloc(decoded_value.size())); |
||||
memcpy(arena_value, decoded_value.c_str(), decoded_value.size()); |
||||
return absl::string_view(arena_value, decoded_value.size()); |
||||
} |
||||
|
||||
void StatefulSessionFilterRegister(CoreConfiguration::Builder* builder) { |
||||
StatefulSessionServiceConfigParser::Register(builder); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,66 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_STATEFUL_SESSION_STATEFUL_SESSION_FILTER_H |
||||
#define GRPC_CORE_EXT_FILTERS_STATEFUL_SESSION_STATEFUL_SESSION_FILTER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/string_view.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_fwd.h" |
||||
#include "src/core/lib/channel/promise_based_filter.h" |
||||
#include "src/core/lib/gprpp/unique_type_name.h" |
||||
#include "src/core/lib/promise/arena_promise.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
UniqueTypeName XdsHostOverrideTypeName(); |
||||
|
||||
// A filter to provide cookie-based stateful session affinity.
|
||||
class StatefulSessionFilter : public ChannelFilter { |
||||
public: |
||||
static const grpc_channel_filter kFilter; |
||||
|
||||
static absl::StatusOr<StatefulSessionFilter> Create( |
||||
const ChannelArgs& args, ChannelFilter::Args filter_args); |
||||
|
||||
// Construct a promise for one call.
|
||||
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
||||
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
||||
|
||||
private: |
||||
explicit StatefulSessionFilter(ChannelFilter::Args filter_args); |
||||
|
||||
absl::optional<absl::string_view> GetHostOverrideFromCookie( |
||||
const ClientMetadataHandle& initial_metadata, |
||||
absl::string_view cookie_name); |
||||
|
||||
// The relative index of instances of the same filter.
|
||||
const size_t index_; |
||||
// Index of the service config parser.
|
||||
const size_t service_config_parser_index_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_CORE_EXT_FILTERS_STATEFUL_SESSION_STATEFUL_SESSION_FILTER_H
|
@ -0,0 +1,82 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/stateful_session/stateful_session_service_config_parser.h" |
||||
|
||||
#include <vector> |
||||
|
||||
#include "absl/types/optional.h" |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
const JsonLoaderInterface* |
||||
StatefulSessionMethodParsedConfig::CookieConfig::JsonLoader(const JsonArgs&) { |
||||
static const auto* loader = JsonObjectLoader<CookieConfig>() |
||||
.OptionalField("name", &CookieConfig::name) |
||||
.OptionalField("path", &CookieConfig::path) |
||||
.OptionalField("ttl", &CookieConfig::ttl) |
||||
.Finish(); |
||||
return loader; |
||||
} |
||||
|
||||
void StatefulSessionMethodParsedConfig::CookieConfig::JsonPostLoad( |
||||
const Json&, const JsonArgs&, ValidationErrors* errors) { |
||||
// Validate that cookie_name is non-empty.
|
||||
if (name.has_value() && name->empty()) { |
||||
ValidationErrors::ScopedField field(errors, ".name"); |
||||
errors->AddError("must be non-empty"); |
||||
} |
||||
} |
||||
|
||||
const JsonLoaderInterface* StatefulSessionMethodParsedConfig::JsonLoader( |
||||
const JsonArgs&) { |
||||
static const auto* loader = |
||||
JsonObjectLoader<StatefulSessionMethodParsedConfig>() |
||||
.OptionalField("stateful_session", |
||||
&StatefulSessionMethodParsedConfig::configs_) |
||||
.Finish(); |
||||
return loader; |
||||
} |
||||
|
||||
std::unique_ptr<ServiceConfigParser::ParsedConfig> |
||||
StatefulSessionServiceConfigParser::ParsePerMethodParams( |
||||
const ChannelArgs& args, const Json& json, ValidationErrors* errors) { |
||||
// Only parse config if the following channel arg is present.
|
||||
if (!args.GetBool(GRPC_ARG_PARSE_STATEFUL_SESSION_METHOD_CONFIG) |
||||
.value_or(false)) { |
||||
return nullptr; |
||||
} |
||||
// Parse config from json.
|
||||
return LoadFromJson<std::unique_ptr<StatefulSessionMethodParsedConfig>>( |
||||
json, JsonArgs(), errors); |
||||
} |
||||
|
||||
void StatefulSessionServiceConfigParser::Register( |
||||
CoreConfiguration::Builder* builder) { |
||||
builder->service_config_parser()->RegisterParser( |
||||
std::make_unique<StatefulSessionServiceConfigParser>()); |
||||
} |
||||
|
||||
size_t StatefulSessionServiceConfigParser::ParserIndex() { |
||||
return CoreConfiguration::Get().service_config_parser().GetParserIndex( |
||||
parser_name()); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,93 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_STATEFUL_SESSION_STATEFUL_SESSION_SERVICE_CONFIG_PARSER_H |
||||
#define GRPC_CORE_EXT_FILTERS_STATEFUL_SESSION_STATEFUL_SESSION_SERVICE_CONFIG_PARSER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <memory> |
||||
#include <string> |
||||
#include <vector> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/gprpp/validation_errors.h" |
||||
#include "src/core/lib/json/json.h" |
||||
#include "src/core/lib/json/json_args.h" |
||||
#include "src/core/lib/json/json_object_loader.h" |
||||
#include "src/core/lib/service_config/service_config_parser.h" |
||||
|
||||
// Channel arg key for enabling parsing fault injection via method config.
|
||||
#define GRPC_ARG_PARSE_STATEFUL_SESSION_METHOD_CONFIG \ |
||||
"grpc.internal.parse_stateful_session_method_config" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class StatefulSessionMethodParsedConfig |
||||
: public ServiceConfigParser::ParsedConfig { |
||||
public: |
||||
struct CookieConfig { |
||||
absl::optional<std::string> name; // Will be unset if disabled.
|
||||
std::string path; |
||||
Duration ttl; |
||||
|
||||
static const JsonLoaderInterface* JsonLoader(const JsonArgs&); |
||||
void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors); |
||||
}; |
||||
|
||||
// Returns the config at the specified index. There might be multiple
|
||||
// stateful session filters in the list of HTTP filters at the same time.
|
||||
// The order of the list is stable, and an index is used to keep track of
|
||||
// their relative positions. Each filter instance uses this method to
|
||||
// access the appropriate parsed config for that instance.
|
||||
const CookieConfig* GetConfig(size_t index) const { |
||||
if (index >= configs_.size()) return nullptr; |
||||
return &configs_[index]; |
||||
} |
||||
|
||||
static const JsonLoaderInterface* JsonLoader(const JsonArgs&); |
||||
|
||||
private: |
||||
std::vector<CookieConfig> configs_; |
||||
}; |
||||
|
||||
class StatefulSessionServiceConfigParser final |
||||
: public ServiceConfigParser::Parser { |
||||
public: |
||||
absl::string_view name() const override { return parser_name(); } |
||||
// Parses the per-method service config for the filter.
|
||||
std::unique_ptr<ServiceConfigParser::ParsedConfig> ParsePerMethodParams( |
||||
const ChannelArgs& args, const Json& json, |
||||
ValidationErrors* errors) override; |
||||
// Returns the parser index for the parser.
|
||||
static size_t ParserIndex(); |
||||
// Registers the parser.
|
||||
static void Register(CoreConfiguration::Builder* builder); |
||||
|
||||
private: |
||||
static absl::string_view parser_name() { return "stateful_session"; } |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_CORE_EXT_FILTERS_STATEFUL_SESSION_STATEFUL_SESSION_SERVICE_CONFIG_PARSER_H
|
Loading…
Reference in new issue