From ee84fb9beb9fd02e055a3cf239a3e1a3614442eb Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 1 Apr 2022 18:51:24 -0700 Subject: [PATCH] Convert server config selector filter (#28877) * server_config_selector_filter * fix * progress * update * x * compiles! * fixed! * fix * fix --- BUILD | 2 + .../server_config_selector.cc | 10 +- .../server_config_selector.h | 4 +- .../server_config_selector_filter.cc | 278 +++++------------- .../server_config_selector_test.cc | 9 +- 5 files changed, 85 insertions(+), 218 deletions(-) diff --git a/BUILD b/BUILD index 8ca21f01119..18d15f78e32 100644 --- a/BUILD +++ b/BUILD @@ -2572,10 +2572,12 @@ grpc_cc_library( ], language = "c++", deps = [ + "arena", "gpr_base", "grpc_base", "grpc_server_config_selector", "grpc_service_config", + "promise", ], ) diff --git a/src/core/ext/filters/server_config_selector/server_config_selector.cc b/src/core/ext/filters/server_config_selector/server_config_selector.cc index 85df64e4c2a..8d21905cd98 100644 --- a/src/core/ext/filters/server_config_selector/server_config_selector.cc +++ b/src/core/ext/filters/server_config_selector/server_config_selector.cc @@ -54,14 +54,8 @@ grpc_arg ServerConfigSelectorProvider::MakeChannelArg() const { const_cast(this), &kChannelArgVtable); } -RefCountedPtr -ServerConfigSelectorProvider::GetFromChannelArgs( - const grpc_channel_args& args) { - ServerConfigSelectorProvider* config_selector_provider = - grpc_channel_args_find_pointer( - &args, kServerConfigSelectorProviderChannelArgName); - return config_selector_provider != nullptr ? config_selector_provider->Ref() - : nullptr; +absl::string_view ServerConfigSelectorProvider::ChannelArgName() { + return kServerConfigSelectorProviderChannelArgName; } } // namespace grpc_core diff --git a/src/core/ext/filters/server_config_selector/server_config_selector.h b/src/core/ext/filters/server_config_selector/server_config_selector.h index 8f8951be91e..590763f66ab 100644 --- a/src/core/ext/filters/server_config_selector/server_config_selector.h +++ b/src/core/ext/filters/server_config_selector/server_config_selector.h @@ -61,9 +61,9 @@ class ServerConfigSelectorProvider std::unique_ptr watcher) = 0; virtual void CancelWatch() = 0; + static absl::string_view ChannelArgName(); + grpc_arg MakeChannelArg() const; - static RefCountedPtr GetFromChannelArgs( - const grpc_channel_args& args); }; } // namespace grpc_core diff --git a/src/core/ext/filters/server_config_selector/server_config_selector_filter.cc b/src/core/ext/filters/server_config_selector/server_config_selector_filter.cc index a8f6fb595c8..bb1b9a77955 100644 --- a/src/core/ext/filters/server_config_selector/server_config_selector_filter.cc +++ b/src/core/ext/filters/server_config_selector/server_config_selector_filter.cc @@ -1,5 +1,3 @@ -// -// // Copyright 2021 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,14 +11,16 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -// -// #include #include "src/core/ext/filters/server_config_selector/server_config_selector_filter.h" #include "src/core/ext/filters/server_config_selector/server_config_selector.h" +#include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/promise/promise.h" +#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/service_config/service_config_call_data.h" #include "src/core/lib/transport/error_utils.h" @@ -28,248 +28,116 @@ namespace grpc_core { namespace { -class ChannelData { +class ServerConfigSelectorFilter final : public ChannelFilter { public: - static grpc_error_handle Init(grpc_channel_element* elem, - grpc_channel_element_args* args); - static void Destroy(grpc_channel_element* elem); + ~ServerConfigSelectorFilter() override; + + ServerConfigSelectorFilter(const ServerConfigSelectorFilter&) = delete; + ServerConfigSelectorFilter& operator=(const ServerConfigSelectorFilter&) = + delete; + ServerConfigSelectorFilter(ServerConfigSelectorFilter&&) = default; + ServerConfigSelectorFilter& operator=(ServerConfigSelectorFilter&&) = default; + + static absl::StatusOr Create(ChannelArgs args, + ChannelFilter::Args); + + ArenaPromise MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) override; absl::StatusOr> config_selector() { - MutexLock lock(&mu_); - return config_selector_.value(); + MutexLock lock(&state_->mu); + return state_->config_selector.value(); } private: + struct State { + Mutex mu; + absl::optional>> + config_selector ABSL_GUARDED_BY(mu); + }; class ServerConfigSelectorWatcher : public ServerConfigSelectorProvider::ServerConfigSelectorWatcher { public: - explicit ServerConfigSelectorWatcher(ChannelData* chand) : chand_(chand) {} + explicit ServerConfigSelectorWatcher(std::shared_ptr state) + : state_(state) {} void OnServerConfigSelectorUpdate( absl::StatusOr> update) override { - MutexLock lock(&chand_->mu_); - chand_->config_selector_ = std::move(update); + MutexLock lock(&state_->mu); + state_->config_selector = std::move(update); } private: - ChannelData* chand_; + std::shared_ptr state_; }; - explicit ChannelData(RefCountedPtr - server_config_selector_provider); - ~ChannelData(); + explicit ServerConfigSelectorFilter( + RefCountedPtr + server_config_selector_provider); RefCountedPtr server_config_selector_provider_; - Mutex mu_; - absl::optional>> - config_selector_ ABSL_GUARDED_BY(mu_); + std::shared_ptr state_; }; -class CallData { - public: - static grpc_error_handle Init(grpc_call_element* elem, - const grpc_call_element_args* args); - static void Destroy(grpc_call_element* elem, - const grpc_call_final_info* /* final_info */, - grpc_closure* /* then_schedule_closure */); - static void StartTransportStreamOpBatch(grpc_call_element* elem, - grpc_transport_stream_op_batch* op); - - private: - CallData(grpc_call_element* elem, const grpc_call_element_args& args); - ~CallData(); - static void RecvInitialMetadataReady(void* user_data, - grpc_error_handle error); - static void RecvTrailingMetadataReady(void* user_data, - grpc_error_handle error); - void MaybeResumeRecvTrailingMetadataReady(); - - grpc_call_context_element* call_context_; - CallCombiner* call_combiner_; - ServiceConfigCallData service_config_call_data_; - // Overall error for the call - grpc_error_handle error_ = GRPC_ERROR_NONE; - // State for keeping track of recv_initial_metadata - grpc_metadata_batch* recv_initial_metadata_ = nullptr; - grpc_closure* original_recv_initial_metadata_ready_ = nullptr; - grpc_closure recv_initial_metadata_ready_; - // State for keeping of track of recv_trailing_metadata - grpc_closure* original_recv_trailing_metadata_ready_; - grpc_closure recv_trailing_metadata_ready_; - grpc_error_handle recv_trailing_metadata_ready_error_; - bool seen_recv_trailing_metadata_ready_ = false; -}; - -// ChannelData - -grpc_error_handle ChannelData::Init(grpc_channel_element* elem, - grpc_channel_element_args* args) { - GPR_ASSERT(elem->filter == &kServerConfigSelectorFilter); - RefCountedPtr server_config_selector_provider = - ServerConfigSelectorProvider::GetFromChannelArgs(*args->channel_args); +absl::StatusOr ServerConfigSelectorFilter::Create( + ChannelArgs args, ChannelFilter::Args) { + ServerConfigSelectorProvider* server_config_selector_provider = + args.GetObject(); if (server_config_selector_provider == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "No ServerConfigSelectorProvider object found"); + return absl::UnknownError("No ServerConfigSelectorProvider object found"); } - new (elem->channel_data) - ChannelData(std::move(server_config_selector_provider)); - return GRPC_ERROR_NONE; + return ServerConfigSelectorFilter(server_config_selector_provider->Ref()); } -void ChannelData::Destroy(grpc_channel_element* elem) { - auto* chand = static_cast(elem->channel_data); - chand->~ChannelData(); -} - -ChannelData::ChannelData( +ServerConfigSelectorFilter::ServerConfigSelectorFilter( RefCountedPtr server_config_selector_provider) : server_config_selector_provider_( - std::move(server_config_selector_provider)) { + std::move(server_config_selector_provider)), + state_(std::make_shared()) { GPR_ASSERT(server_config_selector_provider_ != nullptr); auto server_config_selector_watcher = - absl::make_unique(this); + absl::make_unique(state_); auto config_selector = server_config_selector_provider_->Watch( std::move(server_config_selector_watcher)); - MutexLock lock(&mu_); + MutexLock lock(&state_->mu); // It's possible for the watcher to have already updated config_selector_ - if (!config_selector_.has_value()) { - config_selector_ = std::move(config_selector); - } -} - -ChannelData::~ChannelData() { server_config_selector_provider_->CancelWatch(); } - -// CallData - -grpc_error_handle CallData::Init(grpc_call_element* elem, - const grpc_call_element_args* args) { - new (elem->call_data) CallData(elem, *args); - return GRPC_ERROR_NONE; -} - -void CallData::Destroy(grpc_call_element* elem, - const grpc_call_final_info* /*final_info*/, - grpc_closure* /*then_schedule_closure*/) { - auto* calld = static_cast(elem->call_data); - calld->~CallData(); -} - -void CallData::StartTransportStreamOpBatch(grpc_call_element* elem, - grpc_transport_stream_op_batch* op) { - CallData* calld = static_cast(elem->call_data); - if (op->recv_initial_metadata) { - calld->recv_initial_metadata_ = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->original_recv_initial_metadata_ready_ = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->recv_initial_metadata_ready_; - } - if (op->recv_trailing_metadata) { - // We might generate errors on receiving initial metadata which we need to - // bubble up through recv_trailing_metadata_ready - calld->original_recv_trailing_metadata_ready_ = - op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &calld->recv_trailing_metadata_ready_; + if (!state_->config_selector.has_value()) { + state_->config_selector = std::move(config_selector); } - // Chain to the next filter. - grpc_call_next_op(elem, op); -} - -CallData::CallData(grpc_call_element* elem, const grpc_call_element_args& args) - : call_context_(args.context), call_combiner_(args.call_combiner) { - GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady, - elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, - elem, grpc_schedule_on_exec_ctx); } -CallData::~CallData() { - // Remove the entry from call context, just in case anyone above us - // tries to look at it during call stack destruction. - call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value = nullptr; - GRPC_ERROR_UNREF(error_); -} - -void CallData::RecvInitialMetadataReady(void* user_data, - grpc_error_handle error) { - grpc_call_element* elem = static_cast(user_data); - CallData* calld = static_cast(elem->call_data); - ChannelData* chand = static_cast(elem->channel_data); - if (error == GRPC_ERROR_NONE) { - auto config_selector = chand->config_selector(); - if (config_selector.ok()) { - auto call_config = - config_selector.value()->GetCallConfig(calld->recv_initial_metadata_); - if (call_config.error != GRPC_ERROR_NONE) { - calld->error_ = call_config.error; - } else { - calld->service_config_call_data_ = - ServiceConfigCallData(std::move(call_config.service_config), - call_config.method_configs, {}); - calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value = - &calld->service_config_call_data_; - } - } else { - calld->error_ = GRPC_ERROR_CREATE_FROM_CPP_STRING( - config_selector.status().ToString()); - } - if (calld->error_ != GRPC_ERROR_NONE) { - calld->error_ = grpc_error_set_int( - calld->error_, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - error = calld->error_; // Does not take a ref - } +ServerConfigSelectorFilter::~ServerConfigSelectorFilter() { + if (server_config_selector_provider_ != nullptr) { + server_config_selector_provider_->CancelWatch(); } - calld->MaybeResumeRecvTrailingMetadataReady(); - grpc_closure* closure = calld->original_recv_initial_metadata_ready_; - calld->original_recv_initial_metadata_ready_ = nullptr; - Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error)); } -void CallData::RecvTrailingMetadataReady(void* user_data, - grpc_error_handle error) { - grpc_call_element* elem = static_cast(user_data); - CallData* calld = static_cast(elem->call_data); - if (calld->original_recv_initial_metadata_ready_ != nullptr) { - calld->seen_recv_trailing_metadata_ready_ = true; - calld->recv_trailing_metadata_ready_error_ = GRPC_ERROR_REF(error); - GRPC_CALL_COMBINER_STOP(calld->call_combiner_, - "Deferring RecvTrailingMetadataReady until after " - "RecvInitialMetadataReady"); - return; - } - error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error_); - calld->error_ = GRPC_ERROR_NONE; - grpc_closure* closure = calld->original_recv_trailing_metadata_ready_; - calld->original_recv_trailing_metadata_ready_ = nullptr; - Closure::Run(DEBUG_LOCATION, closure, error); -} - -void CallData::MaybeResumeRecvTrailingMetadataReady() { - if (seen_recv_trailing_metadata_ready_) { - seen_recv_trailing_metadata_ready_ = false; - grpc_error_handle error = recv_trailing_metadata_ready_error_; - recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE; - GRPC_CALL_COMBINER_START(call_combiner_, &recv_trailing_metadata_ready_, - error, "Continuing RecvTrailingMetadataReady"); +ArenaPromise ServerConfigSelectorFilter::MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) { + auto sel = config_selector(); + if (!sel.ok()) return Immediate(ServerMetadataHandle(sel.status())); + auto call_config = + sel.value()->GetCallConfig(call_args.client_initial_metadata.get()); + if (call_config.error != GRPC_ERROR_NONE) { + auto r = Immediate(ServerMetadataHandle( + absl::UnavailableError(grpc_error_std_string(call_config.error)))); + GRPC_ERROR_UNREF(call_config.error); + return std::move(r); } + auto& ctx = GetContext< + grpc_call_context_element>()[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA]; + ctx.value = GetContext()->New( + std::move(call_config.service_config), call_config.method_configs, + ServiceConfigCallData::CallAttributes{}); + ctx.destroy = [](void* p) { + static_cast(p)->~ServiceConfigCallData(); + }; + return next_promise_factory(std::move(call_args)); } } // namespace -const grpc_channel_filter kServerConfigSelectorFilter = { - CallData::StartTransportStreamOpBatch, - nullptr, - grpc_channel_next_op, - sizeof(CallData), - CallData::Init, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - CallData::Destroy, - sizeof(ChannelData), - ChannelData::Init, - ChannelData::Destroy, - grpc_channel_next_get_info, - "server_config_selector_filter", -}; +const grpc_channel_filter kServerConfigSelectorFilter = + MakePromiseBasedFilter( + "server_config_selector_filter"); } // namespace grpc_core diff --git a/test/core/server_config_selector/server_config_selector_test.cc b/test/core/server_config_selector/server_config_selector_test.cc index 908021af868..69b03cb2278 100644 --- a/test/core/server_config_selector/server_config_selector_test.cc +++ b/test/core/server_config_selector/server_config_selector_test.cc @@ -52,7 +52,8 @@ TEST(ServerConfigSelectorProviderTest, CopyChannelArgs) { grpc_arg arg = server_config_selector_provider->MakeChannelArg(); grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); EXPECT_EQ(server_config_selector_provider, - ServerConfigSelectorProvider::GetFromChannelArgs(*args)); + grpc_core::ChannelArgs::FromC(args) + .GetObject()); grpc_channel_args_destroy(args); } @@ -63,8 +64,10 @@ TEST(ServerConfigSelectorProviderTest, ChannelArgsCompare) { grpc_arg arg = server_config_selector_provider->MakeChannelArg(); grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); grpc_channel_args* new_args = grpc_channel_args_copy(args); - EXPECT_EQ(ServerConfigSelectorProvider::GetFromChannelArgs(*new_args), - ServerConfigSelectorProvider::GetFromChannelArgs(*args)); + EXPECT_EQ(grpc_core::ChannelArgs::FromC(new_args) + .GetObject(), + grpc_core::ChannelArgs::FromC(args) + .GetObject()); grpc_channel_args_destroy(args); grpc_channel_args_destroy(new_args); }