From 1391c93a95cc8efeca80ecefe69697418dcb3354 Mon Sep 17 00:00:00 2001
From: Qiancheng Zhao <qianchengz@google.com>
Date: Fri, 16 Aug 2019 14:15:21 -0700
Subject: [PATCH] atomic client idle filter

---
 include/grpc/impl/codegen/grpc_types.h        |   6 +-
 .../filters/client_idle/client_idle_filter.cc | 234 +++++++++++++++---
 .../channel/minimal_stack_is_minimal_test.cc  |   4 +-
 test/cpp/end2end/client_lb_end2end_test.cc    |   4 +-
 4 files changed, 210 insertions(+), 38 deletions(-)

diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 68ae3606e80..8108b853fca 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -169,11 +169,7 @@ typedef struct {
 #define GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS "grpc.max_connection_age_grace_ms"
 /** Timeout after the last RPC finishes on the client channel at which the
  * channel goes back into IDLE state. Int valued, milliseconds. INT_MAX means
- * unlimited. */
-/** TODO(qianchengz): Currently the default value is INT_MAX, which means the
- * client idle filter is disabled by default. After the client idle filter
- * proves no perfomance issue, we will change the default value to a reasonable
- * value. */
+ * unlimited. The default value is 30 minutes and the min value is 1 second. */
 #define GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS "grpc.client_idle_timeout_ms"
 /** Enable/disable support for per-message compression. Defaults to 1, unless
     GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */
diff --git a/src/core/ext/filters/client_idle/client_idle_filter.cc b/src/core/ext/filters/client_idle/client_idle_filter.cc
index d2b23a9d20f..13c35ae3730 100644
--- a/src/core/ext/filters/client_idle/client_idle_filter.cc
+++ b/src/core/ext/filters/client_idle/client_idle_filter.cc
@@ -27,12 +27,12 @@
 #include "src/core/lib/surface/channel_init.h"
 #include "src/core/lib/transport/http2_errors.h"
 
-// The idle filter is disabled in client channel by default.
-// To enable the idle filte, set GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS to [0, INT_MAX)
-// in channel args.
-// TODO(qianchengz): Find a reasonable default value. Maybe check what deault
-// value Java uses.
-#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX
+// The idle filter is enabled in client channel by default.
+// Set GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS to [1000, INT_MAX) in channel args to
+// configure the idle timeout.
+#define DEFAULT_IDLE_TIMEOUT_MS (30 /*minutes*/ * 60 * 1000)
+// A user-supplied idle timeout smaller than this is raised to this value.
+#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000)
 
 namespace grpc_core {
 
@@ -47,10 +47,82 @@ TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");
 
 namespace {
 
+/*
+  client_idle_filter maintains a state tracking if there are active calls in the
+  channel and its internal idle_timer_. The states are specified as following:
+
+  +--------------------------------------------+-------------+---------+
+  |               ChannelState                 | idle_timer_ | channel |
+  +--------------------------------------------+-------------+---------+
+  | IDLE                                       | unset       | idle    |
+  | CALLS_ACTIVE                               | unset       | busy    |
+  | TIMER_PENDING                              | set-valid   | idle    |
+  | TIMER_PENDING_CALLS_ACTIVE                 | set-invalid | busy    |
+  | TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START | set-invalid | idle    |
+  +--------------------------------------------+-------------+---------+
+
+  IDLE: The initial state of the client_idle_filter, indicating the channel is
+  in IDLE.
+
+  CALLS_ACTIVE: The channel has 1 or more active calls and the timer is not set.
+
+  TIMER_PENDING: The state after the timer is set and no calls have arrived
+  after the timer is set. The channel must have 0 active calls in this state.
+  If the timer is fired in this state, the channel will go into IDLE state.
+
+  TIMER_PENDING_CALLS_ACTIVE: The state after the timer is set and at least one
+  call has arrived after the timer is set. The channel must have 1 or more
+  active calls in this state. If the timer is fired in this state, we won't
+  reschedule it.
+
+  TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: The state after the timer is set
+  and at least one call has arrived after the timer is set, BUT the channel
+  currently has 0 active calls. If the timer is fired in this state, we will
+  reschedule it according to the finish time of the latest call.
+
+  PROCESSING: The state set to block other threads when the setting thread is
+  doing some work to keep state consistency.
+
+  idle_timer_ will not be cancelled (unless the channel is shutting down).
+  If the timer callback is called when the idle_timer_ is valid (i.e. state_ is
+  TIMER_PENDING), the channel will enter IDLE; otherwise the channel won't be
+  changed.
+
+  State transitions:
+                                            IDLE
+                                            |  ^
+            ---------------------------------  *
+            |                                  *
+            v                                  *
+      CALLS_ACTIVE =================> TIMER_PENDING
+            ^                               |  ^
+            *  ------------------------------  *
+            *  |                               *
+            *  v                               *
+TIMER_PENDING_CALLS_ACTIVE ===> TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
+            ^                               |
+            |                               |
+            ---------------------------------
+
+  ---> Triggered by IncreaseCallCount()
+  ===> Triggered by DecreaseCallCount()
+  ***> Triggered by IdleTimerCallback()
+*/
+enum ChannelState {
+  IDLE,
+  CALLS_ACTIVE,
+  TIMER_PENDING,
+  TIMER_PENDING_CALLS_ACTIVE,
+  TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
+  PROCESSING
+};
+
 grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) {
-  return grpc_channel_arg_get_integer(
-      grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS),
-      {DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX});
+  // Look the arg up once: GPR_MAX is a macro and may evaluate operands twice.
+  const int timeout_ms = grpc_channel_arg_get_integer(
+      grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS),
+      {DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX});
+  return GPR_MAX(timeout_ms, MIN_IDLE_TIMEOUT_MS);
 }
 
 class ChannelData {
@@ -86,8 +158,9 @@ class ChannelData {
   const grpc_millis client_idle_timeout_;
 
   // Member data used to track the state of channel.
-  Mutex call_count_mu_;
-  size_t call_count_;
+  grpc_millis last_idle_time_;
+  Atomic<intptr_t> call_count_{0};
+  Atomic<ChannelState> state_{IDLE};
 
   // Idle timer and its callback closure.
   grpc_timer idle_timer_;
@@ -115,37 +188,105 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem,
   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
   // Catch the disconnect_with_error transport op.
   if (op->disconnect_with_error != nullptr) {
-    // Disconnect. Cancel the timer if we set it before.
-    // IncreaseCallCount() introduces a dummy call. It will cancel the timer and
-    // prevent the timer from being reset by other threads.
+    // IncreaseCallCount() introduces a dummy call and prevents the timer from
+    // being reset by other threads.
     chand->IncreaseCallCount();
+    // If the timer has been set, cancel the timer.
+    // No synchronization issues here. grpc_timer_cancel() is valid as long as
+    // the timer has been init()ed before.
+    grpc_timer_cancel(&chand->idle_timer_);
   }
   // Pass the op to the next filter.
   grpc_channel_next_op(elem, op);
 }
 
 void ChannelData::IncreaseCallCount() {
-  MutexLock lock(&call_count_mu_);
-  if (call_count_++ == 0) {
-    grpc_timer_cancel(&idle_timer_);
+  const intptr_t previous_value = call_count_.FetchAdd(1, MemoryOrder::RELAXED);
+  GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIdPTR,
+                       previous_value + 1);
+  if (previous_value == 0) {
+    // This call is the one that makes the channel busy.
+    // Loop here to make sure the previous decrease operation has finished.
+    ChannelState state = state_.Load(MemoryOrder::RELAXED);
+    while (true) {
+      switch (state) {
+        // Timer has not been set. Switch to CALLS_ACTIVE.
+        case IDLE:
+          // In this case, no other threads will modify the state, so we can
+          // just store the value.
+          state_.Store(CALLS_ACTIVE, MemoryOrder::RELAXED);
+          return;
+        // Timer has been set. Switch to TIMER_PENDING_CALLS_ACTIVE.
+        case TIMER_PENDING:
+        case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
+          // At this point, the state may have been switched to IDLE by the
+          // idle timer callback. Therefore, use a CAS operation to change the
+          // state atomically; on failure `state` holds the fresh value.
+          // Use MemoryOrder::ACQUIRE on success to ensure last_idle_time_ has
+          // been properly set in DecreaseCallCount().
+          if (state_.CompareExchangeWeak(&state, TIMER_PENDING_CALLS_ACTIVE,
+                                         MemoryOrder::ACQUIRE,
+                                         MemoryOrder::RELAXED)) {
+            return;
+          }
+          break;
+        default:
+          // Another transition (e.g. PROCESSING) is still in flight; retry.
+          state = state_.Load(MemoryOrder::RELAXED);
+          break;
+      }
+    }
   }
-  GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR, call_count_);
 }
 
 void ChannelData::DecreaseCallCount() {
-  MutexLock lock(&call_count_mu_);
-  if (call_count_-- == 1) {
-    StartIdleTimer();
+  const intptr_t previous_value = call_count_.FetchSub(1, MemoryOrder::RELAXED);
+  GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIdPTR,
+                       previous_value - 1);
+  if (previous_value == 1) {
+    // This call is the one that makes the channel idle.
+    // last_idle_time_ does not need to be Atomic<> because busy-loops in
+    // IncreaseCallCount(), DecreaseCallCount() and IdleTimerCallback() will
+    // prevent multiple threads from simultaneously accessing this variable.
+    last_idle_time_ = ExecCtx::Get()->Now();
+    ChannelState state = state_.Load(MemoryOrder::RELAXED);
+    while (true) {
+      switch (state) {
+        // Timer has not been set. Set the timer and switch to TIMER_PENDING.
+        case CALLS_ACTIVE:
+          // Start the timer first, then release-store the new state so other
+          // threads see the updated value of last_idle_time_.
+          StartIdleTimer();
+          state_.Store(TIMER_PENDING, MemoryOrder::RELEASE);
+          return;
+        // Timer has been set. Switch to
+        // TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
+        case TIMER_PENDING_CALLS_ACTIVE:
+          // At this point, the state may have been switched to CALLS_ACTIVE by
+          // the idle timer callback. Therefore, use a CAS operation to change
+          // the state atomically; on failure `state` holds the fresh value.
+          // Release store here to make the idle timer callback see the updated
+          // value of last_idle_time_ to properly reset the idle timer.
+          if (state_.CompareExchangeWeak(
+                  &state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
+                  MemoryOrder::RELEASE, MemoryOrder::RELAXED)) {
+            return;
+          }
+          break;
+        default:
+          // Another transition (e.g. PROCESSING) is still in flight; retry.
+          state = state_.Load(MemoryOrder::RELAXED);
+          break;
+      }
+    }
   }
-  GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR, call_count_);
 }
 
 ChannelData::ChannelData(grpc_channel_element* elem,
                          grpc_channel_element_args* args, grpc_error** error)
     : elem_(elem),
       channel_stack_(args->channel_stack),
-      client_idle_timeout_(GetClientIdleTimeout(args->channel_args)),
-      call_count_(0) {
+      client_idle_timeout_(GetClientIdleTimeout(args->channel_args)) {
   // If the idle filter is explicitly disabled in channel args, this ctor should
   // not get called.
   GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE);
@@ -165,10 +306,45 @@ ChannelData::ChannelData(grpc_channel_element* elem,
 void ChannelData::IdleTimerCallback(void* arg, grpc_error* error) {
   GRPC_IDLE_FILTER_LOG("timer alarms");
   ChannelData* chand = static_cast<ChannelData*>(arg);
-  {
-    MutexLock lock(&chand->call_count_mu_);
-    if (error == GRPC_ERROR_NONE && chand->call_count_ == 0) {
-      chand->EnterIdle();
+  if (error != GRPC_ERROR_NONE) {
+    GRPC_IDLE_FILTER_LOG("timer canceled");
+    GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
+    return;
+  }
+  bool finished = false;
+  ChannelState state = chand->state_.Load(MemoryOrder::RELAXED);
+  while (!finished) {
+    switch (state) {
+      case TIMER_PENDING:
+        // Change the state to PROCESSING to block IncreaseCallCount() until the
+        // EnterIdle() operation finishes, preventing mistakenly entering IDLE
+        // while an active RPC exists.
+        finished = chand->state_.CompareExchangeWeak(
+            &state, PROCESSING, MemoryOrder::RELAXED, MemoryOrder::RELAXED);
+        if (finished) {
+          chand->EnterIdle();
+          chand->state_.Store(IDLE, MemoryOrder::RELAXED);
+        }
+        break;
+      case TIMER_PENDING_CALLS_ACTIVE:
+        finished = chand->state_.CompareExchangeWeak(
+            &state, CALLS_ACTIVE, MemoryOrder::RELAXED, MemoryOrder::RELAXED);
+        break;
+      case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
+        // Change the state to PROCESSING to block IncreaseCallCount() until the
+        // StartIdleTimer() operation finishes, preventing mistakenly restarting
+        // the timer after grpc_timer_cancel() during shutdown.
+        finished = chand->state_.CompareExchangeWeak(
+            &state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
+        if (finished) {
+          chand->StartIdleTimer();
+          chand->state_.Store(TIMER_PENDING, MemoryOrder::RELAXED);
+        }
+        break;
+      default:
+        // The state has not been switched to desired value yet, try again.
+        state = chand->state_.Load(MemoryOrder::RELAXED);
+        break;
     }
   }
   GRPC_IDLE_FILTER_LOG("timer finishes");
@@ -185,7 +361,7 @@ void ChannelData::StartIdleTimer() {
   GRPC_IDLE_FILTER_LOG("timer has started");
   // Hold a ref to the channel stack for the timer callback.
   GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback");
-  grpc_timer_init(&idle_timer_, ExecCtx::Get()->Now() + client_idle_timeout_,
+  grpc_timer_init(&idle_timer_, last_idle_time_ + client_idle_timeout_,
                   &idle_timer_callback_);
 }
 
diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc
index bee0bfb41f2..3ca1992195b 100644
--- a/test/core/channel/minimal_stack_is_minimal_test.cc
+++ b/test/core/channel/minimal_stack_is_minimal_test.cc
@@ -100,8 +100,8 @@ int main(int argc, char** argv) {
   errors += CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server",
                         "message_size", "deadline", "http-server",
                         "message_compress", "connected", NULL);
-  errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL, "client-channel",
-                        NULL);
+  errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL,
+                        "client_idle, client-channel", NULL);
 
   GPR_ASSERT(errors == 0);
   grpc_shutdown();
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index dac7860141c..ceab7506729 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -1471,7 +1471,7 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
   StartServers(kNumServers);
   // Set max idle time and build the channel.
   ChannelArguments args;
-  args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 100);
+  args.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS, 1000);
   auto response_generator = BuildResolverResponseGenerator();
   auto channel = BuildChannel("", response_generator, args);
   auto stub = BuildStub(channel);
@@ -1483,7 +1483,7 @@ TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
   // After a period time not using the channel, the channel state should switch
   // to IDLE.
-  gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(120));
+  gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1200));
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
   // Sending a new RPC should awake the IDLE channel.
   response_generator.SetNextResolution(GetServersPorts());