From e810225bc02a73691f6ab0d74d46f2a86600dc88 Mon Sep 17 00:00:00 2001 From: Alisha Nanda Date: Fri, 13 Oct 2023 13:47:02 -0700 Subject: [PATCH] [server] Add new flat_hash_map structure for per-channel registered methods (#34612) Protected by new experiment. Will be followed up by #34286. --- BUILD | 1 + src/core/lib/experiments/experiments.cc | 15 +++++ src/core/lib/experiments/experiments.h | 8 +++ src/core/lib/experiments/experiments.yaml | 6 ++ src/core/lib/experiments/rollouts.yaml | 2 + src/core/lib/surface/server.cc | 76 ++++++++++++++++++----- src/core/lib/surface/server.h | 39 +++++++++++- 7 files changed, 131 insertions(+), 16 deletions(-) diff --git a/BUILD b/BUILD index e61d4f73a9a..ef735a2e93e 100644 --- a/BUILD +++ b/BUILD @@ -1487,6 +1487,7 @@ grpc_cc_library( "absl/container:inlined_vector", "absl/functional:any_invocable", "absl/functional:function_ref", + "absl/hash", "absl/meta:type_traits", "absl/status", "absl/status:statusor", diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index b1e02710020..41dc5f8413b 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -114,6 +114,9 @@ const char* const description_registered_method_lookup_in_transport = "Change registered method's lookup point to transport"; const char* const additional_constraints_registered_method_lookup_in_transport = "{}"; +const char* const description_registered_methods_map = + "Use absl::flat_hash_map for registered methods."; +const char* const additional_constraints_registered_methods_map = "{}"; const char* const description_round_robin_delegate_to_pick_first = "Change round_robin code to delegate to pick_first as per dualstack " "backend design."; @@ -245,6 +248,8 @@ const ExperimentMetadata g_experiment_metadata[] = { {"registered_method_lookup_in_transport", description_registered_method_lookup_in_transport, additional_constraints_registered_method_lookup_in_transport, true, true}, + {"registered_methods_map", description_registered_methods_map, + additional_constraints_registered_methods_map, false, true}, {"round_robin_delegate_to_pick_first", description_round_robin_delegate_to_pick_first, additional_constraints_round_robin_delegate_to_pick_first, true, true}, @@ -377,6 +382,9 @@ const char* const description_registered_method_lookup_in_transport = "Change registered method's lookup point to transport"; const char* const additional_constraints_registered_method_lookup_in_transport = "{}"; +const char* const description_registered_methods_map = + "Use absl::flat_hash_map for registered methods."; +const char* const additional_constraints_registered_methods_map = "{}"; const char* const description_round_robin_delegate_to_pick_first = "Change round_robin code to delegate to pick_first as per dualstack " "backend design."; @@ -508,6 +516,8 @@ const ExperimentMetadata g_experiment_metadata[] = { {"registered_method_lookup_in_transport", description_registered_method_lookup_in_transport, additional_constraints_registered_method_lookup_in_transport, true, true}, + {"registered_methods_map", description_registered_methods_map, + additional_constraints_registered_methods_map, false, true}, {"round_robin_delegate_to_pick_first", description_round_robin_delegate_to_pick_first, additional_constraints_round_robin_delegate_to_pick_first, true, true}, @@ -640,6 +650,9 @@ const char* const description_registered_method_lookup_in_transport = "Change registered method's lookup point to transport"; const char* const additional_constraints_registered_method_lookup_in_transport = "{}"; +const char* const description_registered_methods_map = + "Use absl::flat_hash_map for registered methods."; +const char* const additional_constraints_registered_methods_map = "{}"; const char* const description_round_robin_delegate_to_pick_first = "Change round_robin code to delegate to pick_first as per dualstack " "backend design."; @@ -771,6 +784,8 @@ const ExperimentMetadata g_experiment_metadata[] = { {"registered_method_lookup_in_transport", description_registered_method_lookup_in_transport, additional_constraints_registered_method_lookup_in_transport, true, true}, + {"registered_methods_map", description_registered_methods_map, + additional_constraints_registered_methods_map, false, true}, {"round_robin_delegate_to_pick_first", description_round_robin_delegate_to_pick_first, additional_constraints_round_robin_delegate_to_pick_first, true, true}, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 966460eb693..b3bdc60f55c 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -102,6 +102,7 @@ inline bool IsPromiseBasedServerCallEnabled() { return false; } inline bool IsRedMaxConcurrentStreamsEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT inline bool IsRegisteredMethodLookupInTransportEnabled() { return true; } +inline bool IsRegisteredMethodsMapEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; } inline bool IsRstpitEnabled() { return false; } @@ -173,6 +174,7 @@ inline bool IsPromiseBasedServerCallEnabled() { return false; } inline bool IsRedMaxConcurrentStreamsEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT inline bool IsRegisteredMethodLookupInTransportEnabled() { return true; } +inline bool IsRegisteredMethodsMapEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; } inline bool IsRstpitEnabled() { return false; } @@ -244,6 +246,7 @@ inline bool IsPromiseBasedServerCallEnabled() { return false; } inline bool IsRedMaxConcurrentStreamsEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT inline bool IsRegisteredMethodLookupInTransportEnabled() { return true; } +inline bool IsRegisteredMethodsMapEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; } inline bool IsRstpitEnabled() { return false; } @@ -298,6 +301,7 @@ enum ExperimentIds { kExperimentIdPromiseBasedServerCall, kExperimentIdRedMaxConcurrentStreams, kExperimentIdRegisteredMethodLookupInTransport, + kExperimentIdRegisteredMethodsMap, kExperimentIdRoundRobinDelegateToPickFirst, kExperimentIdRstpit, kExperimentIdScheduleCancellationOverWrite, @@ -421,6 +425,10 @@ inline bool IsRedMaxConcurrentStreamsEnabled() { inline bool IsRegisteredMethodLookupInTransportEnabled() { return IsExperimentEnabled(kExperimentIdRegisteredMethodLookupInTransport); } +#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHODS_MAP +inline bool IsRegisteredMethodsMapEnabled() { + return IsExperimentEnabled(kExperimentIdRegisteredMethodsMap); +} #define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST inline bool IsRoundRobinDelegateToPickFirstEnabled() { return IsExperimentEnabled(kExperimentIdRoundRobinDelegateToPickFirst); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 4bc4fe4f75a..1981d69d526 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -199,6 +199,12 @@ expiry: 2024/03/31 owner: yashkt@google.com test_tags: ["surface_registered_method_lookup"] +- name: registered_methods_map + description: + Use absl::flat_hash_map for registered methods. + expiry: 2024/01/31 + owner: alishananda@google.com + test_tags: [] - name: round_robin_delegate_to_pick_first description: Change round_robin code to delegate to pick_first as per dualstack diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index de5c09d9979..3cc2aaddebc 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -98,6 +98,8 @@ default: false - name: registered_method_lookup_in_transport default: true +- name: registered_methods_map + default: false - name: round_robin_delegate_to_pick_first default: true - name: rstpit diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 6f76ef26f7e..a452020c0a2 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -32,6 +32,7 @@ #include #include "absl/cleanup/cleanup.h" +#include "absl/container/flat_hash_map.h" #include "absl/status/status.h" #include "absl/types/optional.h" #include "absl/types/variant.h" @@ -40,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -825,6 +827,10 @@ Server::RegisteredMethod* Server::RegisterMethod( const char* method, const char* host, grpc_server_register_method_payload_handling payload_handling, uint32_t flags) { + if (IsRegisteredMethodsMapEnabled() && started_) { + Crash("Attempting to register method after server started"); + } + if (!method) { gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL"); @@ -1132,7 +1138,7 @@ class Server::ChannelData::ConnectivityWatcher // Server::ChannelData::~ChannelData() { - registered_methods_.reset(); + old_registered_methods_.reset(); if (server_ != nullptr) { if (server_->channelz_node_ != nullptr && channelz_socket_uuid_ != 0) { server_->channelz_node_->RemoveChildSocket(channelz_socket_uuid_); @@ -1160,27 +1166,27 @@ void Server::ChannelData::InitTransport(RefCountedPtr server, // Build a lookup table phrased in terms of mdstr's in this channels context // to quickly find registered methods. size_t num_registered_methods = server_->registered_methods_.size(); - if (num_registered_methods > 0) { + if (!IsRegisteredMethodsMapEnabled() && num_registered_methods > 0) { uint32_t max_probes = 0; size_t slots = 2 * num_registered_methods; - registered_methods_ = + old_registered_methods_ = std::make_unique>(slots); for (std::unique_ptr& rm : server_->registered_methods_) { Slice host; Slice method = Slice::FromExternalString(rm->method); const bool has_host = !rm->host.empty(); if (has_host) { - host = Slice::FromExternalString(rm->host.c_str()); + host = Slice::FromExternalString(rm->host); } uint32_t hash = MixHash32(has_host ? host.Hash() : 0, method.Hash()); uint32_t probes = 0; - for (probes = 0; (*registered_methods_)[(hash + probes) % slots] + for (probes = 0; (*old_registered_methods_)[(hash + probes) % slots] .server_registered_method != nullptr; probes++) { } if (probes > max_probes) max_probes = probes; ChannelRegisteredMethod* crm = - &(*registered_methods_)[(hash + probes) % slots]; + &(*old_registered_methods_)[(hash + probes) % slots]; crm->server_registered_method = rm.get(); crm->flags = rm->flags; crm->has_host = has_host; @@ -1191,6 +1197,15 @@ void Server::ChannelData::InitTransport(RefCountedPtr server, } GPR_ASSERT(slots <= UINT32_MAX); registered_method_max_probes_ = max_probes; + } else if (IsRegisteredMethodsMapEnabled()) { + for (std::unique_ptr& rm : server_->registered_methods_) { + auto key = std::make_pair(!rm->host.empty() ? rm->host : "", rm->method); + registered_methods_.emplace( + key, std::make_unique( + rm.get(), rm->flags, /*has_host=*/!rm->host.empty(), + Slice::FromExternalString(rm->method), + Slice::FromExternalString(rm->host))); + } } // Publish channel. { @@ -1216,13 +1231,13 @@ void Server::ChannelData::InitTransport(RefCountedPtr server, Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod( const grpc_slice& host, const grpc_slice& path) { - if (registered_methods_ == nullptr) return nullptr; + if (old_registered_methods_ == nullptr) return nullptr; // TODO(ctiller): unify these two searches // check for an exact match with host uint32_t hash = MixHash32(grpc_slice_hash(host), grpc_slice_hash(path)); for (size_t i = 0; i <= registered_method_max_probes_; i++) { - ChannelRegisteredMethod* rm = - &(*registered_methods_)[(hash + i) % registered_methods_->size()]; + ChannelRegisteredMethod* rm = &( + *old_registered_methods_)[(hash + i) % old_registered_methods_->size()]; if (rm->server_registered_method == nullptr) break; if (!rm->has_host) continue; if (rm->host != host) continue; @@ -1232,8 +1247,8 @@ Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod( // check for a wildcard method definition (no host set) hash = MixHash32(0, grpc_slice_hash(path)); for (size_t i = 0; i <= registered_method_max_probes_; i++) { - ChannelRegisteredMethod* rm = - &(*registered_methods_)[(hash + i) % registered_methods_->size()]; + ChannelRegisteredMethod* rm = &( + *old_registered_methods_)[(hash + i) % old_registered_methods_->size()]; if (rm->server_registered_method == nullptr) break; if (rm->has_host) continue; if (rm->method != path) continue; @@ -1242,6 +1257,22 @@ Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod( return nullptr; } +Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod( + const absl::string_view& host, const absl::string_view& path) { + if (registered_methods_.empty()) return nullptr; + // check for an exact match with host + auto it = registered_methods_.find(std::make_pair(host, path)); + if (it != registered_methods_.end()) { + return it->second.get(); + } + // check for wildcard method definition (no host set) + it = registered_methods_.find(std::make_pair("", path)); + if (it != registered_methods_.end()) { + return it->second.get(); + } + return nullptr; +} + void Server::ChannelData::SetRegisteredMethodOnMetadata( void* arg, ServerMetadata* metadata) { auto* chand = static_cast(arg); @@ -1258,8 +1289,13 @@ void Server::ChannelData::SetRegisteredMethodOnMetadata( // Path not being set would result in an RPC error. return; } - ChannelRegisteredMethod* method = - chand->GetRegisteredMethod(authority->c_slice(), path->c_slice()); + ChannelRegisteredMethod* method; + if (!IsRegisteredMethodsMapEnabled()) { + method = chand->GetRegisteredMethod(authority->c_slice(), path->c_slice()); + } else { + method = chand->GetRegisteredMethod(authority->as_string_view(), + path->as_string_view()); + } // insert in metadata metadata->Set(GrpcRegisteredMethod(), method); } @@ -1336,7 +1372,12 @@ ArenaPromise Server::ChannelData::MakeCallPromise( call_args.client_initial_metadata->get(GrpcRegisteredMethod()) .value_or(nullptr)); } else { - rm = chand->GetRegisteredMethod(host_ptr->c_slice(), path->c_slice()); + if (!IsRegisteredMethodsMapEnabled()) { + rm = chand->GetRegisteredMethod(host_ptr->c_slice(), path->c_slice()); + } else { + rm = chand->GetRegisteredMethod(host_ptr->as_string_view(), + path->as_string_view()); + } } ArenaPromise>> maybe_read_first_message([] { return NextResult(); }); @@ -1602,7 +1643,12 @@ void Server::CallData::StartNewRpc(grpc_call_element* elem) { recv_initial_metadata_->get(GrpcRegisteredMethod()) .value_or(nullptr)); } else { - rm = chand->GetRegisteredMethod(host_->c_slice(), path_->c_slice()); + if (!IsRegisteredMethodsMapEnabled()) { + rm = chand->GetRegisteredMethod(host_->c_slice(), path_->c_slice()); + } else { + rm = chand->GetRegisteredMethod(host_->as_string_view(), + path_->as_string_view()); + } } if (rm != nullptr) { matcher_ = rm->server_registered_method->matcher.get(); diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 757f73dd9ae..fc6b8acb3ca 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -31,7 +31,11 @@ #include #include "absl/base/thread_annotations.h" +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" +#include "absl/hash/hash.h" #include "absl/status/statusor.h" +#include "absl/strings/string_view.h" #include "absl/types/optional.h" #include @@ -202,6 +206,18 @@ class Server : public InternallyRefCounted, struct RequestedCall; struct ChannelRegisteredMethod { + ChannelRegisteredMethod() = default; + ChannelRegisteredMethod(RegisteredMethod* server_registered_method_arg, + uint32_t flags_arg, bool has_host_arg, + Slice method_arg, Slice host_arg) + : server_registered_method(server_registered_method_arg), + flags(flags_arg), + has_host(has_host_arg), + method(std::move(method_arg)), + host(std::move(host_arg)) {} + + ~ChannelRegisteredMethod() = default; + RegisteredMethod* server_registered_method = nullptr; uint32_t flags; bool has_host; @@ -231,6 +247,9 @@ class Server : public InternallyRefCounted, ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host, const grpc_slice& path); + + ChannelRegisteredMethod* GetRegisteredMethod(const absl::string_view& host, + const absl::string_view& path); // Filter vtable functions. static grpc_error_handle InitChannelElement( grpc_channel_element* elem, grpc_channel_element_args* args); @@ -250,6 +269,17 @@ class Server : public InternallyRefCounted, static void FinishDestroy(void* arg, grpc_error_handle error); + struct StringViewStringViewPairHash + : absl::flat_hash_set< + std::pair>::hasher { + using is_transparent = void; + }; + + struct StringViewStringViewPairEq + : std::equal_to> { + using is_transparent = void; + }; + RefCountedPtr server_; RefCountedPtr channel_; // The index into Server::cqs_ of the CQ used as a starting point for @@ -260,7 +290,14 @@ class Server : public InternallyRefCounted, // TODO(vjpai): Convert this to an STL map type as opposed to a direct // bucket implementation. (Consider performance impact, hash function to // use, etc.) - std::unique_ptr> registered_methods_; + std::unique_ptr> + old_registered_methods_; + // Map of registered methods. + absl::flat_hash_map /*host, method*/, + std::unique_ptr, + StringViewStringViewPairHash, + StringViewStringViewPairEq> + registered_methods_; uint32_t registered_method_max_probes_; grpc_closure finish_destroy_channel_closure_; intptr_t channelz_socket_uuid_;