From 0f424ae5ccbef807b8e1cb7e97867bbbd711819e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 16 Feb 2022 18:50:29 -0800 Subject: [PATCH] Move client-idle-filter to promises (#28838) * Promise based sleep * Embrace absl::Status * Automated change: Fix sanity tests * Add another test, fix bug * fix * fix * Move client-idle-filter to promises * better call counting * x * x * x * review feedback * progress * Automated change: Fix sanity tests * progress * Automated change: Fix sanity tests * fix * channel stack refcount * remove op from lock, refcount * compiles * Automated change: Fix sanity tests * nicer api * fix * Automated change: Fix sanity tests * fix * speculative fix for windows Co-authored-by: ctiller --- BUILD | 9 + CMakeLists.txt | 4 +- Makefile | 2 + build_autogenerated.yaml | 14 +- config.m4 | 1 + config.w32 | 1 + gRPC-C++.podspec | 6 + gRPC-Core.podspec | 7 + grpc.gemspec | 4 + grpc.gyp | 2 + package.xml | 4 + .../filters/client_idle/client_idle_filter.cc | 237 +++++++----------- .../filters/client_idle/idle_filter_state.h | 4 +- .../filters/http/client_authority_filter.cc | 2 +- .../filters/http/client_authority_filter.h | 6 +- src/core/lib/channel/channel_stack.h | 21 ++ src/core/lib/channel/promise_based_filter.h | 65 +++-- src/core/lib/promise/loop.h | 42 +++- .../authorization/grpc_server_authz_filter.cc | 2 +- .../authorization/grpc_server_authz_filter.h | 7 +- src/core/lib/transport/transport.cc | 4 +- src/python/grpcio/grpc_core_dependencies.py | 1 + .../filters/client_authority_filter_test.cc | 19 +- tools/doxygen/Doxyfile.c++.internal | 4 + tools/doxygen/Doxyfile.core.internal | 4 + 25 files changed, 275 insertions(+), 197 deletions(-) diff --git a/BUILD b/BUILD index e844d933405..7d79013ea12 100644 --- a/BUILD +++ b/BUILD @@ -1188,6 +1188,10 @@ grpc_cc_library( grpc_cc_library( name = "loop", + external_deps = [ + "absl/types:variant", + "absl/status:statusor", + ], language = "c++", public_hdrs = [ "src/core/lib/promise/loop.h", @@ -2477,10 +2481,15 @@ grpc_cc_library( "src/core/ext/filters/client_idle/client_idle_filter.cc", ], deps = [ + "capture", "config", + "exec_ctx_wakeup_scheduler", "gpr_base", "grpc_base", "idle_filter_state", + "loop", + "sleep", + "try_seq", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index 85828ad6cdf..d837eb357e2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2091,6 +2091,7 @@ add_library(grpc src/core/lib/json/json_writer.cc src/core/lib/matchers/matchers.cc src/core/lib/promise/activity.cc + src/core/lib/promise/sleep.cc src/core/lib/resolver/resolver.cc src/core/lib/resolver/resolver_registry.cc src/core/lib/resolver/server_address.cc @@ -2730,6 +2731,7 @@ add_library(grpc_unsecure src/core/lib/json/json_util.cc src/core/lib/json/json_writer.cc src/core/lib/promise/activity.cc + src/core/lib/promise/sleep.cc src/core/lib/resolver/resolver.cc src/core/lib/resolver/resolver_registry.cc src/core/lib/resolver/server_address.cc @@ -12798,6 +12800,7 @@ target_include_directories(loop_test target_link_libraries(loop_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::statusor absl::variant ) @@ -15195,7 +15198,6 @@ endif() if(gRPC_BUILD_TESTS) add_executable(sleep_test - src/core/lib/promise/sleep.cc test/core/promise/sleep_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc diff --git a/Makefile b/Makefile index 2667c420735..86ea73b0695 100644 --- a/Makefile +++ b/Makefile @@ -1538,6 +1538,7 @@ LIBGRPC_SRC = \ src/core/lib/json/json_writer.cc \ src/core/lib/matchers/matchers.cc \ src/core/lib/promise/activity.cc \ + src/core/lib/promise/sleep.cc \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver_registry.cc \ src/core/lib/resolver/server_address.cc \ @@ -2024,6 +2025,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/json/json_util.cc \ src/core/lib/json/json_writer.cc \ src/core/lib/promise/activity.cc \ + src/core/lib/promise/sleep.cc \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver_registry.cc \ src/core/lib/resolver/server_address.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 202a3e1bd35..a89fd212940 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -836,6 +836,7 @@ libs: - src/core/lib/event_engine/sockaddr.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h + - src/core/lib/gprpp/capture.h - src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/dual_ref_counted.h @@ -939,6 +940,8 @@ libs: - src/core/lib/promise/promise.h - src/core/lib/promise/race.h - src/core/lib/promise/seq.h + - src/core/lib/promise/sleep.h + - src/core/lib/promise/try_seq.h - src/core/lib/resolver/resolver.h - src/core/lib/resolver/resolver_factory.h - src/core/lib/resolver/resolver_registry.h @@ -1585,6 +1588,7 @@ libs: - src/core/lib/json/json_writer.cc - src/core/lib/matchers/matchers.cc - src/core/lib/promise/activity.cc + - src/core/lib/promise/sleep.cc - src/core/lib/resolver/resolver.cc - src/core/lib/resolver/resolver_registry.cc - src/core/lib/resolver/server_address.cc @@ -1999,6 +2003,7 @@ libs: - src/core/lib/event_engine/sockaddr.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h + - src/core/lib/gprpp/capture.h - src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/dual_ref_counted.h @@ -2100,6 +2105,8 @@ libs: - src/core/lib/promise/promise.h - src/core/lib/promise/race.h - src/core/lib/promise/seq.h + - src/core/lib/promise/sleep.h + - src/core/lib/promise/try_seq.h - src/core/lib/resolver/resolver.h - src/core/lib/resolver/resolver_factory.h - src/core/lib/resolver/resolver_registry.h @@ -2400,6 +2407,7 @@ libs: - src/core/lib/json/json_util.cc - src/core/lib/json/json_writer.cc - src/core/lib/promise/activity.cc + - src/core/lib/promise/sleep.cc - src/core/lib/resolver/resolver.cc - src/core/lib/resolver/resolver_registry.cc - src/core/lib/resolver/server_address.cc @@ -6712,6 +6720,7 @@ targets: src: - test/core/promise/loop_test.cc deps: + - absl/status:statusor - absl/types:variant uses_polling: false - name: match_test @@ -7718,10 +7727,8 @@ targets: build: test language: c++ headers: - - src/core/lib/promise/sleep.h - test/core/promise/test_wakeup_schedulers.h src: - - src/core/lib/promise/sleep.cc - test/core/promise/sleep_test.cc deps: - grpc @@ -8205,8 +8212,7 @@ targets: gtest: true build: test language: c++ - headers: - - src/core/lib/promise/try_seq.h + headers: [] src: - test/core/promise/try_seq_metadata_test.cc deps: diff --git a/config.m4 b/config.m4 index 0c551ea8a46..8b7a1a35aa6 100644 --- a/config.m4 +++ b/config.m4 @@ -600,6 +600,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/profiling/basic_timers.cc \ src/core/lib/profiling/stap_timers.cc \ src/core/lib/promise/activity.cc \ + src/core/lib/promise/sleep.cc \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver_registry.cc \ src/core/lib/resolver/server_address.cc \ diff --git a/config.w32 b/config.w32 index 595a962e8d6..8b6d19f8926 100644 --- a/config.w32 +++ b/config.w32 @@ -566,6 +566,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\profiling\\basic_timers.cc " + "src\\core\\lib\\profiling\\stap_timers.cc " + "src\\core\\lib\\promise\\activity.cc " + + "src\\core\\lib\\promise\\sleep.cc " + "src\\core\\lib\\resolver\\resolver.cc " + "src\\core\\lib\\resolver\\resolver_registry.cc " + "src\\core\\lib\\resolver\\server_address.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 2ad1f4b1954..70d4c9314cd 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -670,6 +670,7 @@ Pod::Spec.new do |s| 'src/core/lib/gpr/useful.h', 'src/core/lib/gprpp/atomic_utils.h', 'src/core/lib/gprpp/bitset.h', + 'src/core/lib/gprpp/capture.h', 'src/core/lib/gprpp/chunked_vector.h', 'src/core/lib/gprpp/construct_destruct.h', 'src/core/lib/gprpp/cpp_impl_of.h', @@ -791,6 +792,8 @@ Pod::Spec.new do |s| 'src/core/lib/promise/promise.h', 'src/core/lib/promise/race.h', 'src/core/lib/promise/seq.h', + 'src/core/lib/promise/sleep.h', + 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.h', 'src/core/lib/resolver/resolver_factory.h', 'src/core/lib/resolver/resolver_registry.h', @@ -1462,6 +1465,7 @@ Pod::Spec.new do |s| 'src/core/lib/gpr/useful.h', 'src/core/lib/gprpp/atomic_utils.h', 'src/core/lib/gprpp/bitset.h', + 'src/core/lib/gprpp/capture.h', 'src/core/lib/gprpp/chunked_vector.h', 'src/core/lib/gprpp/construct_destruct.h', 'src/core/lib/gprpp/cpp_impl_of.h', @@ -1583,6 +1587,8 @@ Pod::Spec.new do |s| 'src/core/lib/promise/promise.h', 'src/core/lib/promise/race.h', 'src/core/lib/promise/seq.h', + 'src/core/lib/promise/sleep.h', + 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.h', 'src/core/lib/resolver/resolver_factory.h', 'src/core/lib/resolver/resolver_registry.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index c73f7b5e578..f30bc22804b 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1061,6 +1061,7 @@ Pod::Spec.new do |s| 'src/core/lib/gpr/wrap_memcpy.cc', 'src/core/lib/gprpp/atomic_utils.h', 'src/core/lib/gprpp/bitset.h', + 'src/core/lib/gprpp/capture.h', 'src/core/lib/gprpp/chunked_vector.h', 'src/core/lib/gprpp/construct_destruct.h', 'src/core/lib/gprpp/cpp_impl_of.h', @@ -1290,6 +1291,9 @@ Pod::Spec.new do |s| 'src/core/lib/promise/promise.h', 'src/core/lib/promise/race.h', 'src/core/lib/promise/seq.h', + 'src/core/lib/promise/sleep.cc', + 'src/core/lib/promise/sleep.h', + 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.cc', 'src/core/lib/resolver/resolver.h', 'src/core/lib/resolver/resolver_factory.h', @@ -2051,6 +2055,7 @@ Pod::Spec.new do |s| 'src/core/lib/gpr/useful.h', 'src/core/lib/gprpp/atomic_utils.h', 'src/core/lib/gprpp/bitset.h', + 'src/core/lib/gprpp/capture.h', 'src/core/lib/gprpp/chunked_vector.h', 'src/core/lib/gprpp/construct_destruct.h', 'src/core/lib/gprpp/cpp_impl_of.h', @@ -2172,6 +2177,8 @@ Pod::Spec.new do |s| 'src/core/lib/promise/promise.h', 'src/core/lib/promise/race.h', 'src/core/lib/promise/seq.h', + 'src/core/lib/promise/sleep.h', + 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/resolver.h', 'src/core/lib/resolver/resolver_factory.h', 'src/core/lib/resolver/resolver_registry.h', diff --git a/grpc.gemspec b/grpc.gemspec index f470ffc4912..f012358d135 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -980,6 +980,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/gpr/wrap_memcpy.cc ) s.files += %w( src/core/lib/gprpp/atomic_utils.h ) s.files += %w( src/core/lib/gprpp/bitset.h ) + s.files += %w( src/core/lib/gprpp/capture.h ) s.files += %w( src/core/lib/gprpp/chunked_vector.h ) s.files += %w( src/core/lib/gprpp/construct_destruct.h ) s.files += %w( src/core/lib/gprpp/cpp_impl_of.h ) @@ -1209,6 +1210,9 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/promise/promise.h ) s.files += %w( src/core/lib/promise/race.h ) s.files += %w( src/core/lib/promise/seq.h ) + s.files += %w( src/core/lib/promise/sleep.cc ) + s.files += %w( src/core/lib/promise/sleep.h ) + s.files += %w( src/core/lib/promise/try_seq.h ) s.files += %w( src/core/lib/resolver/resolver.cc ) s.files += %w( src/core/lib/resolver/resolver.h ) s.files += %w( src/core/lib/resolver/resolver_factory.h ) diff --git a/grpc.gyp b/grpc.gyp index d350502e7f3..1369e5990ee 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -994,6 +994,7 @@ 'src/core/lib/json/json_writer.cc', 'src/core/lib/matchers/matchers.cc', 'src/core/lib/promise/activity.cc', + 'src/core/lib/promise/sleep.cc', 'src/core/lib/resolver/resolver.cc', 'src/core/lib/resolver/resolver_registry.cc', 'src/core/lib/resolver/server_address.cc', @@ -1453,6 +1454,7 @@ 'src/core/lib/json/json_util.cc', 'src/core/lib/json/json_writer.cc', 'src/core/lib/promise/activity.cc', + 'src/core/lib/promise/sleep.cc', 'src/core/lib/resolver/resolver.cc', 'src/core/lib/resolver/resolver_registry.cc', 'src/core/lib/resolver/server_address.cc', diff --git a/package.xml b/package.xml index 5a50cd371b6..1a58c959ee2 100644 --- a/package.xml +++ b/package.xml @@ -960,6 +960,7 @@ + @@ -1189,6 +1190,9 @@ + + + diff --git a/src/core/ext/filters/client_idle/client_idle_filter.cc b/src/core/ext/filters/client_idle/client_idle_filter.cc index 2abbe90ed9c..b0637d04fb4 100644 --- a/src/core/ext/filters/client_idle/client_idle_filter.cc +++ b/src/core/ext/filters/client_idle/client_idle_filter.cc @@ -25,8 +25,14 @@ #include "src/core/ext/filters/client_idle/idle_filter_state.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/gprpp/capture.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" +#include "src/core/lib/promise/loop.h" +#include "src/core/lib/promise/sleep.h" +#include "src/core/lib/promise/try_seq.h" #include "src/core/lib/transport/http2_errors.h" // TODO(juanlishen): The idle filter is disabled in client channel by default @@ -56,194 +62,125 @@ grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) { MIN_IDLE_TIMEOUT_MS); } -class ChannelData { +class ClientIdleFilter : public ChannelFilter { public: - static grpc_error_handle Init(grpc_channel_element* elem, - grpc_channel_element_args* args); - static void Destroy(grpc_channel_element* elem); + static absl::StatusOr Create( + const grpc_channel_args* args, ChannelFilter::Args filter_args); + ~ClientIdleFilter() override = default; - static void StartTransportOp(grpc_channel_element* elem, - grpc_transport_op* op); + ClientIdleFilter(const ClientIdleFilter&) = delete; + ClientIdleFilter& operator=(const ClientIdleFilter&) = delete; + ClientIdleFilter(ClientIdleFilter&&) = default; + ClientIdleFilter& operator=(ClientIdleFilter&&) = default; - void IncreaseCallCount(); + // Construct a promise for one call. + ArenaPromise MakeCallPromise( + ClientInitialMetadata initial_metadata, + NextPromiseFactory next_promise_factory) override; - void DecreaseCallCount(); + bool StartTransportOp(grpc_transport_op* op) override; private: - ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args, - grpc_error_handle* error); - ~ChannelData() = default; - - static void IdleTimerCallback(void* arg, grpc_error_handle error); - static void IdleTransportOpCompleteCallback(void* arg, - grpc_error_handle error); + ClientIdleFilter(grpc_channel_stack* channel_stack, + grpc_millis client_idle_timeout) + : channel_stack_(channel_stack), + client_idle_timeout_(client_idle_timeout) {} void StartIdleTimer(); - void EnterIdle(); + void IncreaseCallCount(); + void DecreaseCallCount(); + + struct CallCountDecreaser { + void operator()(ClientIdleFilter* filter) const { + filter->DecreaseCallCount(); + } + }; - grpc_channel_element* elem_; // The channel stack to which we take refs for pending callbacks. grpc_channel_stack* channel_stack_; - // Timeout after the last RPC finishes on the client channel at which the - // channel goes back into IDLE state. - const grpc_millis client_idle_timeout_; - - // Member data used to track the state of channel. - IdleFilterState idle_filter_state_{false}; + grpc_millis client_idle_timeout_; + std::shared_ptr idle_filter_state_{ + std::make_shared(false)}; - // Idle timer and its callback closure. - grpc_timer idle_timer_; - grpc_closure idle_timer_callback_; - - // The transport op telling the client channel to enter IDLE. - grpc_transport_op idle_transport_op_; - grpc_closure idle_transport_op_complete_callback_; + ActivityPtr activity_; }; -grpc_error_handle ChannelData::Init(grpc_channel_element* elem, - grpc_channel_element_args* args) { - grpc_error_handle error = GRPC_ERROR_NONE; - new (elem->channel_data) ChannelData(elem, args, &error); - return error; +absl::StatusOr ClientIdleFilter::Create( + const grpc_channel_args* args, ChannelFilter::Args filter_args) { + ClientIdleFilter filter(filter_args.channel_stack(), + GetClientIdleTimeout(args)); + return absl::StatusOr(std::move(filter)); } -void ChannelData::Destroy(grpc_channel_element* elem) { - ChannelData* chand = static_cast(elem->channel_data); - chand->~ChannelData(); +// Construct a promise for one call. +ArenaPromise ClientIdleFilter::MakeCallPromise( + ClientInitialMetadata initial_metadata, + NextPromiseFactory next_promise_factory) { + using Decrementer = std::unique_ptr; + IncreaseCallCount(); + return ArenaPromise(Capture( + [](Decrementer*, ArenaPromise* next) + -> Poll { return (*next)(); }, + Decrementer(this), next_promise_factory(std::move(initial_metadata)))); } -void ChannelData::StartTransportOp(grpc_channel_element* elem, - grpc_transport_op* op) { - ChannelData* chand = static_cast(elem->channel_data); +bool ClientIdleFilter::StartTransportOp(grpc_transport_op* op) { // Catch the disconnect_with_error transport op. if (op->disconnect_with_error != GRPC_ERROR_NONE) { // IncreaseCallCount() introduces a phony call and prevent the timer from // being reset by other threads. - chand->IncreaseCallCount(); - // If the timer has been set, cancel the timer. - // No synchronization issues here. grpc_timer_cancel() is valid as long as - // the timer has been init()ed before. - grpc_timer_cancel(&chand->idle_timer_); + IncreaseCallCount(); + activity_.reset(); } // Pass the op to the next filter. - grpc_channel_next_op(elem, op); + return false; } -void ChannelData::IncreaseCallCount() { - idle_filter_state_.IncreaseCallCount(); +void ClientIdleFilter::IncreaseCallCount() { + idle_filter_state_->IncreaseCallCount(); } -void ChannelData::DecreaseCallCount() { - if (idle_filter_state_.DecreaseCallCount()) { +void ClientIdleFilter::DecreaseCallCount() { + if (idle_filter_state_->DecreaseCallCount()) { // If there are no more calls in progress, start the idle timer. StartIdleTimer(); } } -ChannelData::ChannelData(grpc_channel_element* elem, - grpc_channel_element_args* args, - grpc_error_handle* /*error*/) - : elem_(elem), - channel_stack_(args->channel_stack), - client_idle_timeout_(GetClientIdleTimeout(args->channel_args)) { - // If the idle filter is explicitly disabled in channel args, this ctor should - // not get called. - GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE); - GRPC_IDLE_FILTER_LOG("created with max_leisure_time = %" PRId64 " ms", - client_idle_timeout_); - // Initialize the idle timer without setting it. - grpc_timer_init_unset(&idle_timer_); - // Initialize the idle timer callback closure. - GRPC_CLOSURE_INIT(&idle_timer_callback_, IdleTimerCallback, this, - grpc_schedule_on_exec_ctx); - // Initialize the idle transport op complete callback. - GRPC_CLOSURE_INIT(&idle_transport_op_complete_callback_, - IdleTransportOpCompleteCallback, this, - grpc_schedule_on_exec_ctx); -} - -void ChannelData::IdleTimerCallback(void* arg, grpc_error_handle error) { - GRPC_IDLE_FILTER_LOG("timer alarms"); - ChannelData* chand = static_cast(arg); - if (error != GRPC_ERROR_NONE) { - GRPC_IDLE_FILTER_LOG("timer canceled"); - GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback"); - return; - } - if (chand->idle_filter_state_.CheckTimer()) { - chand->StartIdleTimer(); - } else { - chand->EnterIdle(); - } - GRPC_IDLE_FILTER_LOG("timer finishes"); - GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback"); -} - -void ChannelData::IdleTransportOpCompleteCallback(void* arg, - grpc_error_handle /*error*/) { - ChannelData* chand = static_cast(arg); - GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "idle transport op"); -} - -void ChannelData::StartIdleTimer() { +void ClientIdleFilter::StartIdleTimer() { GRPC_IDLE_FILTER_LOG("timer has started"); + auto idle_filter_state = idle_filter_state_; // Hold a ref to the channel stack for the timer callback. - GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback"); - grpc_timer_init(&idle_timer_, ExecCtx::Get()->Now() + client_idle_timeout_, - &idle_timer_callback_); -} - -void ChannelData::EnterIdle() { - GRPC_IDLE_FILTER_LOG("the channel will enter IDLE"); - // Hold a ref to the channel stack for the transport op. - GRPC_CHANNEL_STACK_REF(channel_stack_, "idle transport op"); - // Initialize the transport op. - idle_transport_op_ = {}; - idle_transport_op_.disconnect_with_error = grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"), - GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE); - idle_transport_op_.on_consumed = &idle_transport_op_complete_callback_; - // Pass the transport op down to the channel stack. - grpc_channel_next_op(elem_, &idle_transport_op_); -} - -class CallData { - public: - static grpc_error_handle Init(grpc_call_element* elem, - const grpc_call_element_args* args); - static void Destroy(grpc_call_element* elem, - const grpc_call_final_info* final_info, - grpc_closure* then_schedule_closure); -}; - -grpc_error_handle CallData::Init(grpc_call_element* elem, - const grpc_call_element_args* /*args*/) { - ChannelData* chand = static_cast(elem->channel_data); - chand->IncreaseCallCount(); - return GRPC_ERROR_NONE; -} - -void CallData::Destroy(grpc_call_element* elem, - const grpc_call_final_info* /*final_info*/, - grpc_closure* /*ignored*/) { - ChannelData* chand = static_cast(elem->channel_data); - chand->DecreaseCallCount(); + auto channel_stack = channel_stack_->Ref(); + auto timeout = client_idle_timeout_; + auto promise = Loop([timeout, idle_filter_state]() { + return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout), + [idle_filter_state]() -> Poll> { + if (idle_filter_state->CheckTimer()) { + return Continue{}; + } else { + return absl::OkStatus(); + } + }); + }); + activity_ = MakeActivity( + std::move(promise), ExecCtxWakeupScheduler{}, + [channel_stack](absl::Status status) { + if (!status.ok()) return; + auto* op = grpc_make_transport_op(nullptr); + op->disconnect_with_error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"), + GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE); + // Pass the transport op down to the channel stack. + auto* elem = grpc_channel_stack_element(channel_stack.get(), 0); + elem->filter->start_transport_op(elem, op); + }); } -const grpc_channel_filter grpc_client_idle_filter = { - grpc_call_next_op, - nullptr, - ChannelData::StartTransportOp, - sizeof(CallData), - CallData::Init, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - CallData::Destroy, - sizeof(ChannelData), - ChannelData::Init, - ChannelData::Destroy, - grpc_channel_next_get_info, - "client_idle"}; +const grpc_channel_filter grpc_client_idle_filter = + MakePromiseBasedFilter( + "client_idle"); } // namespace diff --git a/src/core/ext/filters/client_idle/idle_filter_state.h b/src/core/ext/filters/client_idle/idle_filter_state.h index ef236b14b4d..3c3a1952164 100644 --- a/src/core/ext/filters/client_idle/idle_filter_state.h +++ b/src/core/ext/filters/client_idle/idle_filter_state.h @@ -29,8 +29,8 @@ class IdleFilterState { explicit IdleFilterState(bool start_timer); ~IdleFilterState() = default; - IdleFilterState(const IdleFilterState&); - IdleFilterState& operator=(const IdleFilterState&); + IdleFilterState(const IdleFilterState&) = delete; + IdleFilterState& operator=(const IdleFilterState&) = delete; // Increment the number of calls in progress. void IncreaseCallCount(); diff --git a/src/core/ext/filters/http/client_authority_filter.cc b/src/core/ext/filters/http/client_authority_filter.cc index 580645399b6..318f5ebc76d 100644 --- a/src/core/ext/filters/http/client_authority_filter.cc +++ b/src/core/ext/filters/http/client_authority_filter.cc @@ -40,7 +40,7 @@ namespace grpc_core { absl::StatusOr ClientAuthorityFilter::Create( - const grpc_channel_args* args) { + const grpc_channel_args* args, ChannelFilter::Args) { const grpc_arg* default_authority_arg = grpc_channel_args_find(args, GRPC_ARG_DEFAULT_AUTHORITY); if (default_authority_arg == nullptr) { diff --git a/src/core/ext/filters/http/client_authority_filter.h b/src/core/ext/filters/http/client_authority_filter.h index 70aa01adef9..9d16f00a619 100644 --- a/src/core/ext/filters/http/client_authority_filter.h +++ b/src/core/ext/filters/http/client_authority_filter.h @@ -31,15 +31,15 @@ namespace grpc_core { -class ClientAuthorityFilter { +class ClientAuthorityFilter final : public ChannelFilter { public: static absl::StatusOr Create( - const grpc_channel_args* args); + const grpc_channel_args* args, ChannelFilter::Args); // Construct a promise for one call. ArenaPromise MakeCallPromise( ClientInitialMetadata initial_metadata, - NextPromiseFactory next_promise_factory); + NextPromiseFactory next_promise_factory) override; private: explicit ClientAuthorityFilter(Slice default_authority) diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index c7038ae2e47..e78c2d5db4d 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -56,6 +56,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/time_precise.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/resource_quota/arena.h" @@ -201,6 +202,18 @@ struct grpc_channel_stack { /* Memory required for a call stack (computed at channel stack initialization) */ size_t call_stack_size; + + // Minimal infrastructure to act like a RefCounted thing without converting + // everything. + // It's likely that we'll want to replace grpc_channel_stack with something + // less regimented once the promise conversion completes, so avoiding doing a + // full C++-ification for now. + void IncrementRefCount(); + void Unref(); + grpc_core::RefCountedPtr Ref() { + IncrementRefCount(); + return grpc_core::RefCountedPtr(this); + } }; /* A call stack tracks a set of related filters for one call, and guarantees @@ -286,6 +299,14 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack, } while (0); #endif +inline void grpc_channel_stack::IncrementRefCount() { + GRPC_CHANNEL_STACK_REF(this, "smart_pointer"); +} + +inline void grpc_channel_stack::Unref() { + GRPC_CHANNEL_STACK_UNREF(this, "smart_pointer"); +} + /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_call_stack* stack, const grpc_call_final_info* final_info, diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index cb4f4b91518..96114bba45a 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -35,6 +35,37 @@ namespace grpc_core { +class ChannelFilter { + public: + class Args { + public: + Args() : Args(nullptr) {} + explicit Args(grpc_channel_stack* channel_stack) + : channel_stack_(channel_stack) {} + + grpc_channel_stack* channel_stack() const { return channel_stack_; } + + private: + friend class ChannelFilter; + grpc_channel_stack* channel_stack_; + }; + + // Construct a promise for one call. + virtual ArenaPromise MakeCallPromise( + ClientInitialMetadata initial_metadata, + NextPromiseFactory next_promise_factory) = 0; + + // Start a legacy transport op + // Return true if the op was handled, false if it should be passed to the + // next filter. + // TODO(ctiller): design a new API for this - we probably don't want big op + // structures going forward. + virtual bool StartTransportOp(grpc_transport_op*) { return false; } + + protected: + virtual ~ChannelFilter() = default; +}; + // Designator for whether a filter is client side or server side. // Please don't use this outside calls to MakePromiseBasedFilter - it's intended // to be deleted once the promise conversion is complete. @@ -666,19 +697,18 @@ class CallData : public BaseCallData { } // namespace promise_filter_detail -// ChannelFilter contains the following: -// class SomeChannelFilter { +// F implements ChannelFilter and : +// class SomeChannelFilter : public ChannelFilter { // public: // static absl::StatusOr Create( -// const grpc_channel_args* args); -// ArenaPromise MakeCallPromise( -// InitialMetadata* initial_metadata, NextPromiseFactory next_promise); +// ChannelFilter::Args filter_args); // }; // TODO(ctiller): allow implementing get_channel_info, start_transport_op in // some way on ChannelFilter. -template -grpc_channel_filter MakePromiseBasedFilter(const char* name) { - using CallData = promise_filter_detail::CallData; +template +absl::enable_if_t::value, grpc_channel_filter> +MakePromiseBasedFilter(const char* name) { + using CallData = promise_filter_detail::CallData; return grpc_channel_filter{ // start_transport_stream_op_batch @@ -688,12 +718,16 @@ grpc_channel_filter MakePromiseBasedFilter(const char* name) { // make_call_promise [](grpc_channel_element* elem, ClientInitialMetadata initial_metadata, NextPromiseFactory next_promise_factory) { - return static_cast(elem->channel_data) + return static_cast(elem->channel_data) ->MakeCallPromise(std::move(initial_metadata), std::move(next_promise_factory)); }, - // start_transport_op - for now unsupported - grpc_channel_next_op, + // start_transport_op + [](grpc_channel_element* elem, grpc_transport_op* op) { + if (!static_cast(elem->channel_data)->StartTransportOp(op)) { + grpc_channel_next_op(elem, op); + } + }, // sizeof_call_data sizeof(CallData), // init_call_elem @@ -708,18 +742,19 @@ grpc_channel_filter MakePromiseBasedFilter(const char* name) { static_cast(elem->call_data)->~CallData(); }, // sizeof_channel_data - sizeof(ChannelFilter), + sizeof(F), // init_channel_elem [](grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); - auto status = ChannelFilter::Create(args->channel_args); + auto status = F::Create(args->channel_args, + ChannelFilter::Args(args->channel_stack)); if (!status.ok()) return absl_status_to_grpc_error(status.status()); - new (elem->channel_data) ChannelFilter(std::move(*status)); + new (elem->channel_data) F(std::move(*status)); return GRPC_ERROR_NONE; }, // destroy_channel_elem [](grpc_channel_element* elem) { - static_cast(elem->channel_data)->~ChannelFilter(); + static_cast(elem->channel_data)->~F(); }, // get_channel_info grpc_channel_next_get_info, diff --git a/src/core/lib/promise/loop.h b/src/core/lib/promise/loop.h index 847b7d654fc..3de266727c3 100644 --- a/src/core/lib/promise/loop.h +++ b/src/core/lib/promise/loop.h @@ -20,6 +20,7 @@ #include #include +#include "absl/status/statusor.h" #include "absl/types/variant.h" #include "src/core/lib/promise/detail/promise_factory.h" @@ -44,20 +45,44 @@ struct LoopTraits; template struct LoopTraits> { using Result = T; + static LoopCtl ToLoopCtl(LoopCtl value) { return value; } +}; + +template +struct LoopTraits>> { + using Result = absl::StatusOr; + static LoopCtl ToLoopCtl(absl::StatusOr> value) { + if (!value.ok()) return value.status(); + const auto& inner = *value; + if (absl::holds_alternative(inner)) return Continue{}; + return absl::get(inner); + } +}; + +template <> +struct LoopTraits>> { + using Result = absl::Status; + static LoopCtl ToLoopCtl( + absl::StatusOr> value) { + if (!value.ok()) return value.status(); + const auto& inner = *value; + if (absl::holds_alternative(inner)) return Continue{}; + return absl::get(inner); + } }; template class Loop { private: using Factory = promise_detail::PromiseFactory; - using Promise = decltype(std::declval().Repeated()); - using PromiseResult = typename Promise::Result; + using PromiseType = decltype(std::declval().Repeated()); + using PromiseResult = typename PromiseType::Result; public: using Result = typename LoopTraits::Result; explicit Loop(F f) : factory_(std::move(f)), promise_(factory_.Repeated()) {} - ~Loop() { promise_.~Promise(); } + ~Loop() { promise_.~PromiseType(); } Loop(Loop&& loop) noexcept : factory_(std::move(loop.factory_)), @@ -74,13 +99,14 @@ class Loop { if (auto* p = absl::get_if(&promise_result)) { // - then if it's Continue, destroy the promise and recreate a new one // from our factory. - if (absl::holds_alternative(*p)) { - promise_.~Promise(); - new (&promise_) Promise(factory_.Repeated()); + auto lc = LoopTraits::ToLoopCtl(*p); + if (absl::holds_alternative(lc)) { + promise_.~PromiseType(); + new (&promise_) PromiseType(factory_.Repeated()); continue; } // - otherwise there's our result... return it out. - return absl::get(*p); + return absl::get(lc); } else { // Otherwise the inner promise was pending, so we are pending. return Pending(); @@ -90,7 +116,7 @@ class Loop { private: GPR_NO_UNIQUE_ADDRESS Factory factory_; - GPR_NO_UNIQUE_ADDRESS union { GPR_NO_UNIQUE_ADDRESS Promise promise_; }; + GPR_NO_UNIQUE_ADDRESS union { GPR_NO_UNIQUE_ADDRESS PromiseType promise_; }; }; } // namespace promise_detail diff --git a/src/core/lib/security/authorization/grpc_server_authz_filter.cc b/src/core/lib/security/authorization/grpc_server_authz_filter.cc index b483678b05b..12a500b9309 100644 --- a/src/core/lib/security/authorization/grpc_server_authz_filter.cc +++ b/src/core/lib/security/authorization/grpc_server_authz_filter.cc @@ -32,7 +32,7 @@ GrpcServerAuthzFilter::GrpcServerAuthzFilter( provider_(std::move(provider)) {} absl::StatusOr GrpcServerAuthzFilter::Create( - const grpc_channel_args* args) { + const grpc_channel_args* args, ChannelFilter::Args) { grpc_auth_context* auth_context = grpc_find_auth_context_in_args(args); grpc_authorization_policy_provider* provider = grpc_channel_args_find_pointer( diff --git a/src/core/lib/security/authorization/grpc_server_authz_filter.h b/src/core/lib/security/authorization/grpc_server_authz_filter.h index e4c64461fba..866c1ad5420 100644 --- a/src/core/lib/security/authorization/grpc_server_authz_filter.h +++ b/src/core/lib/security/authorization/grpc_server_authz_filter.h @@ -18,20 +18,21 @@ #include #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/security/authorization/authorization_policy_provider.h" namespace grpc_core { -class GrpcServerAuthzFilter { +class GrpcServerAuthzFilter final : public ChannelFilter { public: static const grpc_channel_filter kFilterVtable; static absl::StatusOr Create( - const grpc_channel_args* args); + const grpc_channel_args* args, ChannelFilter::Args); ArenaPromise MakeCallPromise( ClientInitialMetadata initial_metadata, - NextPromiseFactory next_promise_factory); + NextPromiseFactory next_promise_factory) override; private: GrpcServerAuthzFilter( diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 5d74ec254f9..cb1c47890d8 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -224,7 +224,9 @@ static void destroy_made_transport_stream_op(void* arg, made_transport_stream_op* op = static_cast(arg); grpc_closure* c = op->inner_on_complete; delete op; - grpc_core::Closure::Run(DEBUG_LOCATION, c, GRPC_ERROR_REF(error)); + if (c != nullptr) { + grpc_core::Closure::Run(DEBUG_LOCATION, c, GRPC_ERROR_REF(error)); + } } grpc_transport_stream_op_batch* grpc_make_transport_stream_op( diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 2f5dcfac5f8..d0701ba32db 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -575,6 +575,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/profiling/basic_timers.cc', 'src/core/lib/profiling/stap_timers.cc', 'src/core/lib/promise/activity.cc', + 'src/core/lib/promise/sleep.cc', 'src/core/lib/resolver/resolver.cc', 'src/core/lib/resolver/resolver_registry.cc', 'src/core/lib/resolver/server_address.cc', diff --git a/test/core/filters/client_authority_filter_test.cc b/test/core/filters/client_authority_filter_test.cc index 1d2c970da8b..136f5b6f2b4 100644 --- a/test/core/filters/client_authority_filter_test.cc +++ b/test/core/filters/client_authority_filter_test.cc @@ -40,26 +40,29 @@ class TestChannelArgs { }; TEST(ClientAuthorityFilterTest, DefaultFails) { - EXPECT_FALSE(ClientAuthorityFilter::Create(nullptr).ok()); + EXPECT_FALSE( + ClientAuthorityFilter::Create(nullptr, ChannelFilter::Args()).ok()); } TEST(ClientAuthorityFilterTest, WithArgSucceeds) { - EXPECT_EQ(ClientAuthorityFilter::Create( - TestChannelArgs("foo.test.google.au").args()) - .status(), - absl::OkStatus()); + EXPECT_EQ( + ClientAuthorityFilter::Create( + TestChannelArgs("foo.test.google.au").args(), ChannelFilter::Args()) + .status(), + absl::OkStatus()); } TEST(ClientAuthorityFilterTest, NonStringArgFails) { grpc_arg arg = grpc_channel_arg_integer_create( const_cast(GRPC_ARG_DEFAULT_AUTHORITY), 123); grpc_channel_args args = {1, &arg}; - EXPECT_FALSE(ClientAuthorityFilter::Create(&args).ok()); + EXPECT_FALSE( + ClientAuthorityFilter::Create(&args, ChannelFilter::Args()).ok()); } TEST(ClientAuthorityFilterTest, PromiseCompletesImmediatelyAndSetsAuthority) { auto filter = *ClientAuthorityFilter::Create( - TestChannelArgs("foo.test.google.au").args()); + TestChannelArgs("foo.test.google.au").args(), ChannelFilter::Args()); auto arena = MakeScopedArena(1024, g_memory_allocator); grpc_metadata_batch initial_metadata_batch(arena.get()); grpc_metadata_batch trailing_metadata_batch(arena.get()); @@ -85,7 +88,7 @@ TEST(ClientAuthorityFilterTest, PromiseCompletesImmediatelyAndSetsAuthority) { TEST(ClientAuthorityFilterTest, PromiseCompletesImmediatelyAndDoesNotClobberAlreadySetsAuthority) { auto filter = *ClientAuthorityFilter::Create( - TestChannelArgs("foo.test.google.au").args()); + TestChannelArgs("foo.test.google.au").args(), ChannelFilter::Args()); auto arena = MakeScopedArena(1024, g_memory_allocator); grpc_metadata_batch initial_metadata_batch(arena.get()); grpc_metadata_batch trailing_metadata_batch(arena.get()); diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 6224de3cf16..dd1f9ae8c53 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1959,6 +1959,7 @@ src/core/lib/gpr/useful.h \ src/core/lib/gpr/wrap_memcpy.cc \ src/core/lib/gprpp/atomic_utils.h \ src/core/lib/gprpp/bitset.h \ +src/core/lib/gprpp/capture.h \ src/core/lib/gprpp/chunked_vector.h \ src/core/lib/gprpp/construct_destruct.h \ src/core/lib/gprpp/cpp_impl_of.h \ @@ -2188,6 +2189,9 @@ src/core/lib/promise/poll.h \ src/core/lib/promise/promise.h \ src/core/lib/promise/race.h \ src/core/lib/promise/seq.h \ +src/core/lib/promise/sleep.cc \ +src/core/lib/promise/sleep.h \ +src/core/lib/promise/try_seq.h \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver.h \ src/core/lib/resolver/resolver_factory.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index abc17d07edf..b59ba1a76b9 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1753,6 +1753,7 @@ src/core/lib/gpr/wrap_memcpy.cc \ src/core/lib/gprpp/README.md \ src/core/lib/gprpp/atomic_utils.h \ src/core/lib/gprpp/bitset.h \ +src/core/lib/gprpp/capture.h \ src/core/lib/gprpp/chunked_vector.h \ src/core/lib/gprpp/construct_destruct.h \ src/core/lib/gprpp/cpp_impl_of.h \ @@ -1983,6 +1984,9 @@ src/core/lib/promise/poll.h \ src/core/lib/promise/promise.h \ src/core/lib/promise/race.h \ src/core/lib/promise/seq.h \ +src/core/lib/promise/sleep.cc \ +src/core/lib/promise/sleep.h \ +src/core/lib/promise/try_seq.h \ src/core/lib/resolver/resolver.cc \ src/core/lib/resolver/resolver.h \ src/core/lib/resolver/resolver_factory.h \