From 1391c93a95cc8efeca80ecefe69697418dcb3354 Mon Sep 17 00:00:00 2001 From: Qiancheng Zhao Date: Fri, 16 Aug 2019 14:15:21 -0700 Subject: [PATCH] atomic client idle filter --- include/grpc/impl/codegen/grpc_types.h | 6 +- .../filters/client_idle/client_idle_filter.cc | 234 +++++++++++++++--- .../channel/minimal_stack_is_minimal_test.cc | 4 +- test/cpp/end2end/client_lb_end2end_test.cc | 4 +- 4 files changed, 210 insertions(+), 38 deletions(-) diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 68ae3606e80..8108b853fca 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -169,11 +169,7 @@ typedef struct { #define GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS "grpc.max_connection_age_grace_ms" /** Timeout after the last RPC finishes on the client channel at which the * channel goes back into IDLE state. Int valued, milliseconds. INT_MAX means - * unlimited. */ -/** TODO(qianchengz): Currently the default value is INT_MAX, which means the - * client idle filter is disabled by default. After the client idle filter - * proves no perfomance issue, we will change the default value to a reasonable - * value. */ + * unlimited. The default value is 30 minutes and the min value is 1 second. */ #define GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS "grpc.client_idle_timeout_ms" /** Enable/disable support for per-message compression. Defaults to 1, unless GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */ diff --git a/src/core/ext/filters/client_idle/client_idle_filter.cc b/src/core/ext/filters/client_idle/client_idle_filter.cc index d2b23a9d20f..13c35ae3730 100644 --- a/src/core/ext/filters/client_idle/client_idle_filter.cc +++ b/src/core/ext/filters/client_idle/client_idle_filter.cc @@ -27,12 +27,12 @@ #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/http2_errors.h" -// The idle filter is disabled in client channel by default. -// To enable the idle filte, set GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS to [0, INT_MAX) -// in channel args. -// TODO(qianchengz): Find a reasonable default value. Maybe check what deault -// value Java uses. -#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX +// The idle filter is enabled in client channel by default. +// Set GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS to [1000, INT_MAX) in channel args to +// configure the idle timeout. +#define DEFAULT_IDLE_TIMEOUT_MS (30 /*minutes*/ * 60 * 1000) +// The user input idle timeout smaller than this would be capped to it. +#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000) namespace grpc_core { @@ -47,10 +47,82 @@ TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); namespace { +/* + client_idle_filter maintains a state tracking if there are active calls in the + channel and its internal idle_timer_. The states are specified as following: + + +--------------------------------------------+-------------+---------+ + | ChannelState | idle_timer_ | channel | + +--------------------------------------------+-------------+---------+ + | IDLE | unset | idle | + | CALLS_ACTIVE | unset | busy | + | TIMER_PENDING | set-valid | idle | + | TIMER_PENDING_CALLS_ACTIVE | set-invalid | busy | + | TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START | set-invalid | idle | + +--------------------------------------------+-------------+---------+ + + IDLE: The initial state of the client_idle_filter, indicating the channel is + in IDLE. + + CALLS_ACTIVE: The channel has 1 or 1+ active calls and the timer is not set. + + TIMER_PENDING: The state after the timer is set and no calls have arrived + after the timer is set. The channel must have 0 active call in this state. If + the timer is fired in this state, the channel will go into IDLE state. + + TIMER_PENDING_CALLS_ACTIVE: The state after the timer is set and at least one + call has arrived after the timer is set. The channel must have 1 or 1+ active + calls in this state. If the timer is fired in this state, we won't reschedule + it. + + TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: The state after the timer is set + and at least one call has arrived after the timer is set, BUT the channel + currently has 0 active call. If the timer is fired in this state, we will + reschedule it according to the finish time of the latest call. + + PROCESSING: The state set to block other threads when the setting thread is + doing some work to keep state consistency. + + idle_timer_ will not be cancelled (unless the channel is shutting down). + If the timer callback is called when the idle_timer_ is valid (i.e. idle_state + is TIMER_PENDING), the channel will enter IDLE, otherwise the channel won't be + changed. + + State transitions: + IDLE + | ^ + --------------------------------- * + | * + v * + CALLS_ACTIVE =================> TIMER_PENDING + ^ | ^ + * ------------------------------ * + * | * + * v * +TIMER_PENDING_CALLS_ACTIVE ===> TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START + ^ | + | | + --------------------------------- + + ---> Triggered by IncreaseCallCount() + ===> Triggered by DecreaseCallCount() + ***> Triggered by IdleTimerCallback() +*/ +enum ChannelState { + IDLE, + CALLS_ACTIVE, + TIMER_PENDING, + TIMER_PENDING_CALLS_ACTIVE, + TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START, + PROCESSING +}; + grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) { - return grpc_channel_arg_get_integer( - grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS), - {DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}); + return GPR_MAX( + grpc_channel_arg_get_integer( + grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS), + {DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}), + MIN_IDLE_TIMEOUT_MS); } class ChannelData { @@ -86,8 +158,9 @@ class ChannelData { const grpc_millis client_idle_timeout_; // Member data used to track the state of channel. - Mutex call_count_mu_; - size_t call_count_; + grpc_millis last_idle_time_; + Atomic call_count_{0}; + Atomic state_{IDLE}; // Idle timer and its callback closure. grpc_timer idle_timer_; @@ -115,37 +188,105 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem, ChannelData* chand = static_cast(elem->channel_data); // Catch the disconnect_with_error transport op. if (op->disconnect_with_error != nullptr) { - // Disconnect. Cancel the timer if we set it before. - // IncreaseCallCount() introduces a dummy call. It will cancel the timer and - // prevent the timer from being reset by other threads. + // IncreaseCallCount() introduces a dummy call and prevent the timer from + // being reset by other threads. chand->IncreaseCallCount(); + // If the timer has been set, cancel the timer. + // No synchronization issues here. grpc_timer_cancel() is valid as long as + // the timer has been init()ed before. + grpc_timer_cancel(&chand->idle_timer_); } // Pass the op to the next filter. grpc_channel_next_op(elem, op); } void ChannelData::IncreaseCallCount() { - MutexLock lock(&call_count_mu_); - if (call_count_++ == 0) { - grpc_timer_cancel(&idle_timer_); + const intptr_t previous_value = call_count_.FetchAdd(1, MemoryOrder::RELAXED); + GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR, + previous_value + 1); + if (previous_value == 0) { + // This call is the one that makes the channel busy. + // Loop here to make sure the previous decrease operation has finished. + ChannelState state = state_.Load(MemoryOrder::RELAXED); + while (true) { + switch (state) { + // Timer has not been set. Switch to CALLS_ACTIVE. + case IDLE: + // In this case, no other threads will modify the state, so we can + // just store the value. + state_.Store(CALLS_ACTIVE, MemoryOrder::RELAXED); + return; + // Timer has been set. Switch to TIMER_PENDING_CALLS_ACTIVE. + case TIMER_PENDING: + case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: + // At this point, the state may have been switched to IDLE by the + // idle timer callback. Therefore, use CAS operation to change the + // state atomically. + // Use MemoryOrder::ACQUIRE on success to ensure last_idle_time_ has + // been properly set in DecreaseCallCount(). + if (state_.CompareExchangeWeak(&state, TIMER_PENDING_CALLS_ACTIVE, + MemoryOrder::ACQUIRE, + MemoryOrder::RELAXED)) { + return; + } + break; + default: + // The state has not been switched to desired value yet, try again. + state = state_.Load(MemoryOrder::RELAXED); + break; + } + } } - GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR, call_count_); } void ChannelData::DecreaseCallCount() { - MutexLock lock(&call_count_mu_); - if (call_count_-- == 1) { - StartIdleTimer(); + const intptr_t previous_value = call_count_.FetchSub(1, MemoryOrder::RELAXED); + GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR, + previous_value - 1); + if (previous_value == 1) { + // This call is the one that makes the channel idle. + // last_idle_time_ does not need to be Atomic<> because busy-loops in + // IncreaseCallCount(), DecreaseCallCount() and IdleTimerCallback() will + // prevent multiple threads from simultaneously accessing this variable. + last_idle_time_ = ExecCtx::Get()->Now(); + ChannelState state = state_.Load(MemoryOrder::RELAXED); + while (true) { + switch (state) { + // Timer has not been set. Set the timer and switch to TIMER_PENDING + case CALLS_ACTIVE: + // Release store here to make other threads see the updated value of + // last_idle_time_. + StartIdleTimer(); + state_.Store(TIMER_PENDING, MemoryOrder::RELEASE); + return; + // Timer has been set. Switch to + // TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START + case TIMER_PENDING_CALLS_ACTIVE: + // At this point, the state may have been switched to CALLS_ACTIVE by + // the idle timer callback. Therefore, use CAS operation to change the + // state atomically. + // Release store here to make the idle timer callback see the updated + // value of last_idle_time_ to properly reset the idle timer. + if (state_.CompareExchangeWeak( + &state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START, + MemoryOrder::RELEASE, MemoryOrder::RELAXED)) { + return; + } + break; + default: + // The state has not been switched to desired value yet, try again. + state = state_.Load(MemoryOrder::RELAXED); + break; + } + } } - GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR, call_count_); } ChannelData::ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args, grpc_error** error) : elem_(elem), channel_stack_(args->channel_stack), - client_idle_timeout_(GetClientIdleTimeout(args->channel_args)), - call_count_(0) { + client_idle_timeout_(GetClientIdleTimeout(args->channel_args)) { // If the idle filter is explicitly disabled in channel args, this ctor should // not get called. GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE); @@ -165,10 +306,45 @@ ChannelData::ChannelData(grpc_channel_element* elem, void ChannelData::IdleTimerCallback(void* arg, grpc_error* error) { GRPC_IDLE_FILTER_LOG("timer alarms"); ChannelData* chand = static_cast(arg); - { - MutexLock lock(&chand->call_count_mu_); - if (error == GRPC_ERROR_NONE && chand->call_count_ == 0) { - chand->EnterIdle(); + if (error != GRPC_ERROR_NONE) { + GRPC_IDLE_FILTER_LOG("timer canceled"); + GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback"); + return; + } + bool finished = false; + ChannelState state = chand->state_.Load(MemoryOrder::RELAXED); + while (!finished) { + switch (state) { + case TIMER_PENDING: + // Change the state to PROCESSING to block IncreaseCallCout() until the + // EnterIdle() operation finishes, preventing mistakenly entering IDLE + // when active RPC exists. + finished = chand->state_.CompareExchangeWeak( + &state, PROCESSING, MemoryOrder::RELAXED, MemoryOrder::RELAXED); + if (finished) { + chand->EnterIdle(); + chand->state_.Store(IDLE, MemoryOrder::RELAXED); + } + break; + case TIMER_PENDING_CALLS_ACTIVE: + finished = chand->state_.CompareExchangeWeak( + &state, CALLS_ACTIVE, MemoryOrder::RELAXED, MemoryOrder::RELAXED); + break; + case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: + // Change the state to PROCESSING to block IncreaseCallCount() until the + // StartIdleTimer() operation finishes, preventing mistakenly restarting + // the timer after grpc_timer_cancel() when shutdown. + finished = chand->state_.CompareExchangeWeak( + &state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED); + if (finished) { + chand->StartIdleTimer(); + chand->state_.Store(TIMER_PENDING, MemoryOrder::RELAXED); + } + break; + default: + // The state has not been switched to desired value yet, try again. + state = chand->state_.Load(MemoryOrder::RELAXED); + break; } } GRPC_IDLE_FILTER_LOG("timer finishes"); @@ -185,7 +361,7 @@ void ChannelData::StartIdleTimer() { GRPC_IDLE_FILTER_LOG("timer has started"); // Hold a ref to the channel stack for the timer callback. GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback"); - grpc_timer_init(&idle_timer_, ExecCtx::Get()->Now() + client_idle_timeout_, + grpc_timer_init(&idle_timer_, last_idle_time_ + client_idle_timeout_, &idle_timer_callback_); } diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc index bee0bfb41f2..3ca1992195b 100644 --- a/test/core/channel/minimal_stack_is_minimal_test.cc +++ b/test/core/channel/minimal_stack_is_minimal_test.cc @@ -100,8 +100,8 @@ int main(int argc, char** argv) { errors += CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server", "message_size", "deadline", "http-server", "message_compress", "connected", NULL); - errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL, "client-channel", - NULL); + errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL, + "client_idle, client-channel", NULL); GPR_ASSERT(errors == 0); grpc_shutdown(); diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index dac7860141c..ceab7506729 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -1471,7 +1471,7 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) { StartServers(kNumServers); // Set max idle time and build the channel. ChannelArguments args; - args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 100); + args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000); auto response_generator = BuildResolverResponseGenerator(); auto channel = BuildChannel("", response_generator, args); auto stub = BuildStub(channel); @@ -1483,7 +1483,7 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) { EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); // After a period time not using the channel, the channel state should switch // to IDLE. - gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(120)); + gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200)); EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE); // Sending a new RPC should awake the IDLE channel. response_generator.SetNextResolution(GetServersPorts());