diff --git a/BUILD b/BUILD index a09c2d9c081..84ea536b555 100644 --- a/BUILD +++ b/BUILD @@ -1968,16 +1968,30 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "idle_filter_state", + srcs = [ + "src/core/ext/filters/client_idle/idle_filter_state.cc", + ], + hdrs = [ + "src/core/ext/filters/client_idle/idle_filter_state.h", + ], + language = "c++", + deps = [ + "gpr_platform", + ], +) + grpc_cc_library( name = "grpc_client_idle_filter", srcs = [ "src/core/ext/filters/client_idle/client_idle_filter.cc", ], - language = "c++", deps = [ "config", "gpr_base", "grpc_base", + "idle_filter_state", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index f53f94c8ecb..5fd8d555793 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -837,6 +837,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx hpack_encoder_index_test) add_dependencies(buildtests_cxx http2_client) add_dependencies(buildtests_cxx hybrid_end2end_test) + add_dependencies(buildtests_cxx idle_filter_state_test) add_dependencies(buildtests_cxx if_test) add_dependencies(buildtests_cxx init_test) add_dependencies(buildtests_cxx initial_settings_frame_bad_client_test) @@ -1550,6 +1551,7 @@ add_library(grpc src/core/ext/filters/client_channel/subchannel.cc src/core/ext/filters/client_channel/subchannel_pool_interface.cc src/core/ext/filters/client_idle/client_idle_filter.cc + src/core/ext/filters/client_idle/idle_filter_state.cc src/core/ext/filters/deadline/deadline_filter.cc src/core/ext/filters/fault_injection/fault_injection_filter.cc src/core/ext/filters/fault_injection/service_config_parser.cc @@ -2383,6 +2385,7 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/subchannel.cc src/core/ext/filters/client_channel/subchannel_pool_interface.cc src/core/ext/filters/client_idle/client_idle_filter.cc + src/core/ext/filters/client_idle/idle_filter_state.cc src/core/ext/filters/deadline/deadline_filter.cc src/core/ext/filters/fault_injection/fault_injection_filter.cc src/core/ext/filters/fault_injection/service_config_parser.cc @@ -11845,6 +11848,41 @@ target_link_libraries(hybrid_end2end_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(idle_filter_state_test + src/core/ext/filters/client_idle/idle_filter_state.cc + test/core/client_idle/idle_filter_state_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(idle_filter_state_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(idle_filter_state_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index fec9f2592da..722a52f8938 100644 --- a/Makefile +++ b/Makefile @@ -1107,6 +1107,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/subchannel.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ src/core/ext/filters/client_idle/client_idle_filter.cc \ + src/core/ext/filters/client_idle/idle_filter_state.cc \ src/core/ext/filters/deadline/deadline_filter.cc \ src/core/ext/filters/fault_injection/fault_injection_filter.cc \ src/core/ext/filters/fault_injection/service_config_parser.cc \ @@ -1787,6 +1788,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/subchannel.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ src/core/ext/filters/client_idle/client_idle_filter.cc \ + src/core/ext/filters/client_idle/idle_filter_state.cc \ src/core/ext/filters/deadline/deadline_filter.cc \ src/core/ext/filters/fault_injection/fault_injection_filter.cc \ src/core/ext/filters/fault_injection/service_config_parser.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 5a10eed28ba..a5f88e689a1 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -484,6 +484,7 @@ libs: - src/core/ext/filters/client_channel/subchannel.h - src/core/ext/filters/client_channel/subchannel_interface.h - src/core/ext/filters/client_channel/subchannel_pool_interface.h + - src/core/ext/filters/client_idle/idle_filter_state.h - src/core/ext/filters/deadline/deadline_filter.h - src/core/ext/filters/fault_injection/fault_injection_filter.h - src/core/ext/filters/fault_injection/service_config_parser.h @@ -1021,6 +1022,7 @@ libs: - src/core/ext/filters/client_channel/subchannel.cc - src/core/ext/filters/client_channel/subchannel_pool_interface.cc - src/core/ext/filters/client_idle/client_idle_filter.cc + - src/core/ext/filters/client_idle/idle_filter_state.cc - src/core/ext/filters/deadline/deadline_filter.cc - src/core/ext/filters/fault_injection/fault_injection_filter.cc - src/core/ext/filters/fault_injection/service_config_parser.cc @@ -1727,6 +1729,7 @@ libs: - src/core/ext/filters/client_channel/subchannel.h - src/core/ext/filters/client_channel/subchannel_interface.h - src/core/ext/filters/client_channel/subchannel_pool_interface.h + - src/core/ext/filters/client_idle/idle_filter_state.h - src/core/ext/filters/deadline/deadline_filter.h - src/core/ext/filters/fault_injection/fault_injection_filter.h - src/core/ext/filters/fault_injection/service_config_parser.h @@ -1991,6 +1994,7 @@ libs: - src/core/ext/filters/client_channel/subchannel.cc - src/core/ext/filters/client_channel/subchannel_pool_interface.cc - src/core/ext/filters/client_idle/client_idle_filter.cc + - src/core/ext/filters/client_idle/idle_filter_state.cc - src/core/ext/filters/deadline/deadline_filter.cc - src/core/ext/filters/fault_injection/fault_injection_filter.cc - src/core/ext/filters/fault_injection/service_config_parser.cc @@ -6122,6 +6126,17 @@ targets: - test/cpp/end2end/test_service_impl.cc deps: - grpc++_test_util +- name: idle_filter_state_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/filters/client_idle/idle_filter_state.h + src: + - src/core/ext/filters/client_idle/idle_filter_state.cc + - test/core/client_idle/idle_filter_state_test.cc + deps: [] + uses_polling: false - name: if_test gtest: true build: test diff --git a/config.m4 b/config.m4 index 184043c495b..9259d4ec4b0 100644 --- a/config.m4 +++ b/config.m4 @@ -102,6 +102,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/subchannel.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ src/core/ext/filters/client_idle/client_idle_filter.cc \ + src/core/ext/filters/client_idle/idle_filter_state.cc \ src/core/ext/filters/deadline/deadline_filter.cc \ src/core/ext/filters/fault_injection/fault_injection_filter.cc \ src/core/ext/filters/fault_injection/service_config_parser.cc \ diff --git a/config.w32 b/config.w32 index 34b4e135602..501751772e0 100644 --- a/config.w32 +++ b/config.w32 @@ -68,6 +68,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\subchannel.cc " + "src\\core\\ext\\filters\\client_channel\\subchannel_pool_interface.cc " + "src\\core\\ext\\filters\\client_idle\\client_idle_filter.cc " + + "src\\core\\ext\\filters\\client_idle\\idle_filter_state.cc " + "src\\core\\ext\\filters\\deadline\\deadline_filter.cc " + "src\\core\\ext\\filters\\fault_injection\\fault_injection_filter.cc " + "src\\core\\ext\\filters\\fault_injection\\service_config_parser.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index cb0b5e0c453..eb069b6495d 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -254,6 +254,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/subchannel.h', 'src/core/ext/filters/client_channel/subchannel_interface.h', 'src/core/ext/filters/client_channel/subchannel_pool_interface.h', + 'src/core/ext/filters/client_idle/idle_filter_state.h', 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_filter.h', 'src/core/ext/filters/fault_injection/service_config_parser.h', @@ -924,6 +925,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/subchannel.h', 'src/core/ext/filters/client_channel/subchannel_interface.h', 'src/core/ext/filters/client_channel/subchannel_pool_interface.h', + 'src/core/ext/filters/client_idle/idle_filter_state.h', 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_filter.h', 'src/core/ext/filters/fault_injection/service_config_parser.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index e4d9f4a9d88..232b3ea7da4 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -309,6 +309,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/subchannel_pool_interface.cc', 'src/core/ext/filters/client_channel/subchannel_pool_interface.h', 'src/core/ext/filters/client_idle/client_idle_filter.cc', + 'src/core/ext/filters/client_idle/idle_filter_state.cc', + 'src/core/ext/filters/client_idle/idle_filter_state.h', 'src/core/ext/filters/deadline/deadline_filter.cc', 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_filter.cc', @@ -1511,6 +1513,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/subchannel.h', 'src/core/ext/filters/client_channel/subchannel_interface.h', 'src/core/ext/filters/client_channel/subchannel_pool_interface.h', + 'src/core/ext/filters/client_idle/idle_filter_state.h', 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/filters/fault_injection/fault_injection_filter.h', 'src/core/ext/filters/fault_injection/service_config_parser.h', diff --git a/grpc.gemspec b/grpc.gemspec index 0d3ef9b9414..0fcf656eaf2 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -220,6 +220,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/subchannel_pool_interface.cc ) s.files += %w( src/core/ext/filters/client_channel/subchannel_pool_interface.h ) s.files += %w( src/core/ext/filters/client_idle/client_idle_filter.cc ) + s.files += %w( src/core/ext/filters/client_idle/idle_filter_state.cc ) + s.files += %w( src/core/ext/filters/client_idle/idle_filter_state.h ) s.files += %w( src/core/ext/filters/deadline/deadline_filter.cc ) s.files += %w( src/core/ext/filters/deadline/deadline_filter.h ) s.files += %w( src/core/ext/filters/fault_injection/fault_injection_filter.cc ) diff --git a/grpc.gyp b/grpc.gyp index e133e6d580d..d05820b7ada 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -549,6 +549,7 @@ 'src/core/ext/filters/client_channel/subchannel.cc', 'src/core/ext/filters/client_channel/subchannel_pool_interface.cc', 'src/core/ext/filters/client_idle/client_idle_filter.cc', + 'src/core/ext/filters/client_idle/idle_filter_state.cc', 'src/core/ext/filters/deadline/deadline_filter.cc', 'src/core/ext/filters/fault_injection/fault_injection_filter.cc', 'src/core/ext/filters/fault_injection/service_config_parser.cc', @@ -1204,6 +1205,7 @@ 'src/core/ext/filters/client_channel/subchannel.cc', 'src/core/ext/filters/client_channel/subchannel_pool_interface.cc', 'src/core/ext/filters/client_idle/client_idle_filter.cc', + 'src/core/ext/filters/client_idle/idle_filter_state.cc', 'src/core/ext/filters/deadline/deadline_filter.cc', 'src/core/ext/filters/fault_injection/fault_injection_filter.cc', 'src/core/ext/filters/fault_injection/service_config_parser.cc', diff --git a/package.xml b/package.xml index 410aca28d79..7331a384fe9 100644 --- a/package.xml +++ b/package.xml @@ -200,6 +200,8 @@ + + diff --git a/src/core/ext/filters/client_idle/client_idle_filter.cc b/src/core/ext/filters/client_idle/client_idle_filter.cc index ee2e0ddbefb..6cd3d12ec70 100644 --- a/src/core/ext/filters/client_idle/client_idle_filter.cc +++ b/src/core/ext/filters/client_idle/client_idle_filter.cc @@ -22,6 +22,7 @@ #include +#include "src/core/ext/filters/client_idle/idle_filter_state.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/config/core_configuration.h" @@ -47,76 +48,6 @@ TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); namespace { -/* - client_idle_filter maintains a state tracking if there are active calls in the - channel and its internal idle_timer_. The states are specified as following: - - +--------------------------------------------+-------------+---------+ - | ChannelState | idle_timer_ | channel | - +--------------------------------------------+-------------+---------+ - | IDLE | unset | idle | - | CALLS_ACTIVE | unset | busy | - | TIMER_PENDING | set-valid | idle | - | TIMER_PENDING_CALLS_ACTIVE | set-invalid | busy | - | TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START | set-invalid | idle | - +--------------------------------------------+-------------+---------+ - - IDLE: The initial state of the client_idle_filter, indicating the channel is - in IDLE. - - CALLS_ACTIVE: The channel has 1 or 1+ active calls and the timer is not set. - - TIMER_PENDING: The state after the timer is set and no calls have arrived - after the timer is set. The channel must have 0 active call in this state. If - the timer is fired in this state, the channel will go into IDLE state. - - TIMER_PENDING_CALLS_ACTIVE: The state after the timer is set and at least one - call has arrived after the timer is set. The channel must have 1 or 1+ active - calls in this state. If the timer is fired in this state, we won't reschedule - it. - - TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: The state after the timer is set - and at least one call has arrived after the timer is set, BUT the channel - currently has 0 active call. If the timer is fired in this state, we will - reschedule it according to the finish time of the latest call. - - PROCESSING: The state set to block other threads when the setting thread is - doing some work to keep state consistency. - - idle_timer_ will not be cancelled (unless the channel is shutting down). - If the timer callback is called when the idle_timer_ is valid (i.e. idle_state - is TIMER_PENDING), the channel will enter IDLE, otherwise the channel won't be - changed. - - State transitions: - IDLE - | ^ - --------------------------------- * - | * - v * - CALLS_ACTIVE =================> TIMER_PENDING - ^ | ^ - * ------------------------------ * - * | * - * v * -TIMER_PENDING_CALLS_ACTIVE ===> TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START - ^ | - | | - --------------------------------- - - ---> Triggered by IncreaseCallCount() - ===> Triggered by DecreaseCallCount() - ***> Triggered by IdleTimerCallback() -*/ -enum ChannelState { - IDLE, - CALLS_ACTIVE, - TIMER_PENDING, - TIMER_PENDING_CALLS_ACTIVE, - TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START, - PROCESSING -}; - grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) { return std::max( grpc_channel_arg_get_integer( @@ -159,9 +90,7 @@ class ChannelData { const grpc_millis client_idle_timeout_; // Member data used to track the state of channel. - grpc_millis last_idle_time_; - std::atomic call_count_{0}; - std::atomic state_{IDLE}; + IdleFilterState idle_filter_state_{false}; // Idle timer and its callback closure. grpc_timer idle_timer_; @@ -202,86 +131,13 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem, } void ChannelData::IncreaseCallCount() { - const intptr_t previous_value = - call_count_.fetch_add(1, std::memory_order_relaxed); - GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR, - previous_value + 1); - if (previous_value == 0) { - // This call is the one that makes the channel busy. - // Loop here to make sure the previous decrease operation has finished. - ChannelState state = state_.load(std::memory_order_relaxed); - while (true) { - switch (state) { - // Timer has not been set. Switch to CALLS_ACTIVE. - case IDLE: - // In this case, no other threads will modify the state, so we can - // just store the value. - state_.store(CALLS_ACTIVE, std::memory_order_relaxed); - return; - // Timer has been set. Switch to TIMER_PENDING_CALLS_ACTIVE. - case TIMER_PENDING: - case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: - // At this point, the state may have been switched to IDLE by the - // idle timer callback. Therefore, use CAS operation to change the - // state atomically. - // Use std::memory_order_acquire on success to ensure last_idle_time_ - // has been properly set in DecreaseCallCount(). - if (state_.compare_exchange_weak(state, TIMER_PENDING_CALLS_ACTIVE, - std::memory_order_acquire, - std::memory_order_relaxed)) { - return; - } - break; - default: - // The state has not been switched to desired value yet, try again. - state = state_.load(std::memory_order_relaxed); - break; - } - } - } + idle_filter_state_.IncreaseCallCount(); } void ChannelData::DecreaseCallCount() { - const intptr_t previous_value = - call_count_.fetch_sub(1, std::memory_order_relaxed); - GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR, - previous_value - 1); - if (previous_value == 1) { - // This call is the one that makes the channel idle. - // last_idle_time_ does not need to be std::atomic<> because busy-loops in - // IncreaseCallCount(), DecreaseCallCount() and IdleTimerCallback() will - // prevent multiple threads from simultaneously accessing this variable. - last_idle_time_ = ExecCtx::Get()->Now(); - ChannelState state = state_.load(std::memory_order_relaxed); - while (true) { - switch (state) { - // Timer has not been set. Set the timer and switch to TIMER_PENDING - case CALLS_ACTIVE: - // Release store here to make other threads see the updated value of - // last_idle_time_. - StartIdleTimer(); - state_.store(TIMER_PENDING, std::memory_order_release); - return; - // Timer has been set. Switch to - // TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START - case TIMER_PENDING_CALLS_ACTIVE: - // At this point, the state may have been switched to CALLS_ACTIVE by - // the idle timer callback. Therefore, use CAS operation to change the - // state atomically. - // Release store here to make the idle timer callback see the updated - // value of last_idle_time_ to properly reset the idle timer. - if (state_.compare_exchange_weak( - state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START, - std::memory_order_release, std::memory_order_relaxed)) { - return; - } - break; - default: - // The state has not been switched to desired value yet, try again. - state = state_.load(std::memory_order_relaxed); - break; - } - } + if (idle_filter_state_.DecreaseCallCount()) { + // If there are no more calls in progress, start the idle timer. + StartIdleTimer(); } } @@ -315,44 +171,10 @@ void ChannelData::IdleTimerCallback(void* arg, grpc_error_handle error) { GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback"); return; } - bool finished = false; - ChannelState state = chand->state_.load(std::memory_order_relaxed); - while (!finished) { - switch (state) { - case TIMER_PENDING: - // Change the state to PROCESSING to block IncreaseCallCout() until the - // EnterIdle() operation finishes, preventing mistakenly entering IDLE - // when active RPC exists. - finished = chand->state_.compare_exchange_weak( - state, PROCESSING, std::memory_order_acquire, - std::memory_order_relaxed); - if (finished) { - chand->EnterIdle(); - chand->state_.store(IDLE, std::memory_order_relaxed); - } - break; - case TIMER_PENDING_CALLS_ACTIVE: - finished = chand->state_.compare_exchange_weak( - state, CALLS_ACTIVE, std::memory_order_relaxed, - std::memory_order_relaxed); - break; - case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: - // Change the state to PROCESSING to block IncreaseCallCount() until the - // StartIdleTimer() operation finishes, preventing mistakenly restarting - // the timer after grpc_timer_cancel() when shutdown. - finished = chand->state_.compare_exchange_weak( - state, PROCESSING, std::memory_order_acquire, - std::memory_order_relaxed); - if (finished) { - chand->StartIdleTimer(); - chand->state_.store(TIMER_PENDING, std::memory_order_relaxed); - } - break; - default: - // The state has not been switched to desired value yet, try again. - state = chand->state_.load(std::memory_order_relaxed); - break; - } + if (chand->idle_filter_state_.CheckTimer()) { + chand->StartIdleTimer(); + } else { + chand->EnterIdle(); } GRPC_IDLE_FILTER_LOG("timer finishes"); GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback"); @@ -368,7 +190,7 @@ void ChannelData::StartIdleTimer() { GRPC_IDLE_FILTER_LOG("timer has started"); // Hold a ref to the channel stack for the timer callback. GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback"); - grpc_timer_init(&idle_timer_, last_idle_time_ + client_idle_timeout_, + grpc_timer_init(&idle_timer_, ExecCtx::Get()->Now() + client_idle_timeout_, &idle_timer_callback_); } diff --git a/src/core/ext/filters/client_idle/idle_filter_state.cc b/src/core/ext/filters/client_idle/idle_filter_state.cc new file mode 100644 index 00000000000..cd0689721e1 --- /dev/null +++ b/src/core/ext/filters/client_idle/idle_filter_state.cc @@ -0,0 +1,96 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/ext/filters/client_idle/idle_filter_state.h" + +#include + +namespace grpc_core { + +IdleFilterState::IdleFilterState(bool start_timer) + : state_(start_timer ? kTimerStarted : 0) {} + +void IdleFilterState::IncreaseCallCount() { + uintptr_t state = state_.load(std::memory_order_relaxed); + uintptr_t new_state; + do { + // Increment the counter, and flag that there's been activity. + new_state = state; + new_state |= kCallsStartedSinceLastTimerCheck; + new_state += kCallIncrement; + } while (!state_.compare_exchange_weak( + state, new_state, std::memory_order_acq_rel, std::memory_order_relaxed)); +} + +bool IdleFilterState::DecreaseCallCount() { + uintptr_t state = state_.load(std::memory_order_relaxed); + uintptr_t new_state; + bool start_timer; + do { + start_timer = false; + new_state = state; + // Decrement call count (and assert there's at least one call outstanding!) + assert(new_state >= kCallIncrement); + new_state -= kCallIncrement; + // If that decrement reaches a call count of zero and we have not started a + // timer + if ((new_state >> kCallsInProgressShift) == 0 && + (new_state & kTimerStarted) == 0) { + // Flag that we will start a timer, and mark it started so nobody else + // does. + start_timer = true; + new_state |= kTimerStarted; + new_state &= ~kCallsInProgressShift; + } + } while (!state_.compare_exchange_weak( + state, new_state, std::memory_order_acq_rel, std::memory_order_relaxed)); + return start_timer; +} + +bool IdleFilterState::CheckTimer() { + uintptr_t state = state_.load(std::memory_order_relaxed); + uintptr_t new_state; + bool start_timer; + do { + if ((state >> kCallsInProgressShift) != 0) { + // Still calls in progress: nothing needs updating, just return + // and keep the timer going! + return true; + } + new_state = state; + bool is_active = false; + if (new_state & kCallsStartedSinceLastTimerCheck) { + // If any calls started since the last time we checked, then consider the + // channel still active and try again. + is_active = true; + new_state &= ~kCallsStartedSinceLastTimerCheck; + } + if (is_active) { + // If we are still active, we should signal that the timer should start + // again. + start_timer = true; + } else { + // Otherwise, we should not start the timer again, and we should signal + // that in the updated state. + start_timer = false; + new_state &= ~kTimerStarted; + } + } while (!state_.compare_exchange_weak( + state, new_state, std::memory_order_acq_rel, std::memory_order_relaxed)); + return start_timer; +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/client_idle/idle_filter_state.h b/src/core/ext/filters/client_idle/idle_filter_state.h new file mode 100644 index 00000000000..ef236b14b4d --- /dev/null +++ b/src/core/ext/filters/client_idle/idle_filter_state.h @@ -0,0 +1,66 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_IDLE_IDLE_FILTER_STATE_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_IDLE_IDLE_FILTER_STATE_H + +#include + +#include + +namespace grpc_core { + +// State machine for the idle filter. +// Keeps track of how many calls are in progress, whether there is a timer +// started, and whether we've seen calls since the previous timer fired. +class IdleFilterState { + public: + explicit IdleFilterState(bool start_timer); + ~IdleFilterState() = default; + + IdleFilterState(const IdleFilterState&); + IdleFilterState& operator=(const IdleFilterState&); + + // Increment the number of calls in progress. + void IncreaseCallCount(); + + // Decrement the number of calls in progress. + // Return true if we reached idle with no timer started. + GRPC_MUST_USE_RESULT bool DecreaseCallCount(); + + // Check if there's been any activity since the last timer check. + // If there was, reset the activity flag and return true to indicated that + // a new timer should be started. + // If there was not, reset the timer flag and return false - in this case + // we know that the channel is idle and has been for one full cycle. + GRPC_MUST_USE_RESULT bool CheckTimer(); + + private: + // Bit in state_ indicating that the timer has been started. + static constexpr uintptr_t kTimerStarted = 1; + // Bit in state_ indicating that we've seen a call start or stop since the + // last timer. + static constexpr uintptr_t kCallsStartedSinceLastTimerCheck = 2; + // How much should we shift to get the number of calls in progress. + static constexpr uintptr_t kCallsInProgressShift = 2; + // How much to increment/decrement the state_ when a call is started/stopped. + // Ensures we don't clobber the preceding bits. + static constexpr uintptr_t kCallIncrement = uintptr_t(1) + << kCallsInProgressShift; + std::atomic state_; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_EXT_FILTERS_CLIENT_IDLE_IDLE_FILTER_STATE_H diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 86503d5b9eb..cd2bbc0a153 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -77,6 +77,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/subchannel.cc', 'src/core/ext/filters/client_channel/subchannel_pool_interface.cc', 'src/core/ext/filters/client_idle/client_idle_filter.cc', + 'src/core/ext/filters/client_idle/idle_filter_state.cc', 'src/core/ext/filters/deadline/deadline_filter.cc', 'src/core/ext/filters/fault_injection/fault_injection_filter.cc', 'src/core/ext/filters/fault_injection/service_config_parser.cc', diff --git a/test/core/client_idle/BUILD b/test/core/client_idle/BUILD new file mode 100644 index 00000000000..d9897f7838b --- /dev/null +++ b/test/core/client_idle/BUILD @@ -0,0 +1,33 @@ +# Copyright 2021 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") + +licenses(["notice"]) + +grpc_package(name = "test/core/client_idle") + +grpc_cc_test( + name = "idle_filter_state_test", + srcs = ["idle_filter_state_test.cc"], + external_deps = [ + "gtest", + ], + language = "c++", + uses_polling = False, + deps = [ + "//:idle_filter_state", + "//test/core/util:grpc_suppressions", + ], +) diff --git a/test/core/client_idle/idle_filter_state_test.cc b/test/core/client_idle/idle_filter_state_test.cc new file mode 100644 index 00000000000..2911b683d74 --- /dev/null +++ b/test/core/client_idle/idle_filter_state_test.cc @@ -0,0 +1,109 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/ext/filters/client_idle/idle_filter_state.h" + +#include + +#include +#include +#include + +#include + +namespace grpc_core { +namespace testing { + +TEST(IdleFilterStateTest, IdlenessStartsTimer) { + IdleFilterState s(false); + s.IncreaseCallCount(); + // First idle should start the timer + EXPECT_TRUE(s.DecreaseCallCount()); + for (int i = 0; i < 10; i++) { + // Next idle should not! + s.IncreaseCallCount(); + EXPECT_FALSE(s.DecreaseCallCount()); + } +} + +TEST(IdleFilterStateTest, TimerStopsAfterIdle) { + IdleFilterState s(true); + EXPECT_FALSE(s.CheckTimer()); +} + +TEST(IdleFilterStateTest, TimerKeepsGoingWithActivity) { + IdleFilterState s(true); + for (int i = 0; i < 10; i++) { + s.IncreaseCallCount(); + (void)s.DecreaseCallCount(); + EXPECT_TRUE(s.CheckTimer()); + } + EXPECT_FALSE(s.CheckTimer()); +} + +TEST(IdleFilterStateTest, StressTest) { + IdleFilterState s(false); + std::atomic done{false}; + int idle_polls = 0; + int thread_jumps = 0; + std::vector threads; + for (int idx = 0; idx < 100; idx++) { + std::thread t([&] { + int ctr = 0; + auto increase = [&] { + s.IncreaseCallCount(); + ctr++; + }; + auto decrease = [&] { + ctr--; + if (s.DecreaseCallCount()) { + thread_jumps++; + if (thread_jumps == 10) done.store(true, std::memory_order_relaxed); + EXPECT_EQ(ctr, 0); + do { + idle_polls++; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } while (s.CheckTimer()); + } + }; + std::mt19937 g{std::random_device()()}; + while (!done.load(std::memory_order_relaxed)) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + for (int i = 0; i < 100; i++) { + if (g() & 1) { + increase(); + } else if (ctr > 0) { + decrease(); + } + } + while (ctr > 0) { + decrease(); + } + } + while (ctr > 0) { + decrease(); + } + }); + threads.emplace_back(std::move(t)); + } + for (auto& thread : threads) thread.join(); +} + +} // namespace testing +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 1d134fd897d..7ad2af98111 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1157,6 +1157,8 @@ src/core/ext/filters/client_channel/subchannel_interface.h \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.h \ src/core/ext/filters/client_idle/client_idle_filter.cc \ +src/core/ext/filters/client_idle/idle_filter_state.cc \ +src/core/ext/filters/client_idle/idle_filter_state.h \ src/core/ext/filters/deadline/deadline_filter.cc \ src/core/ext/filters/deadline/deadline_filter.h \ src/core/ext/filters/fault_injection/fault_injection_filter.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index bcc0da7d51c..3ed9fe567fc 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -984,6 +984,8 @@ src/core/ext/filters/client_channel/subchannel_interface.h \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.h \ src/core/ext/filters/client_idle/client_idle_filter.cc \ +src/core/ext/filters/client_idle/idle_filter_state.cc \ +src/core/ext/filters/client_idle/idle_filter_state.h \ src/core/ext/filters/deadline/deadline_filter.cc \ src/core/ext/filters/deadline/deadline_filter.h \ src/core/ext/filters/fault_injection/fault_injection_filter.cc \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 93d4e770b1f..bb1c6a0f072 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -4979,6 +4979,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "idle_filter_state_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,