|
|
|
@ -27,12 +27,12 @@ |
|
|
|
|
#include "src/core/lib/surface/channel_init.h" |
|
|
|
|
#include "src/core/lib/transport/http2_errors.h" |
|
|
|
|
|
|
|
|
|
// The idle filter is disabled in client channel by default.
|
|
|
|
|
// To enable the idle filte, set GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS to [0, INT_MAX)
|
|
|
|
|
// in channel args.
|
|
|
|
|
// TODO(qianchengz): Find a reasonable default value. Maybe check what deault
|
|
|
|
|
// value Java uses.
|
|
|
|
|
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX |
|
|
|
|
// The idle filter is enabled in client channel by default.
|
|
|
|
|
// Set GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS to [1000, INT_MAX) in channel args to
|
|
|
|
|
// configure the idle timeout.
|
|
|
|
|
#define DEFAULT_IDLE_TIMEOUT_MS (30 /*minutes*/ * 60 * 1000) |
|
|
|
|
// The user input idle timeout smaller than this would be capped to it.
|
|
|
|
|
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000) |
|
|
|
|
|
|
|
|
|
namespace grpc_core { |
|
|
|
|
|
|
|
|
@ -47,10 +47,82 @@ TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
client_idle_filter maintains a state tracking if there are active calls in the |
|
|
|
|
channel and its internal idle_timer_. The states are specified as following: |
|
|
|
|
|
|
|
|
|
+--------------------------------------------+-------------+---------+ |
|
|
|
|
| ChannelState | idle_timer_ | channel | |
|
|
|
|
+--------------------------------------------+-------------+---------+ |
|
|
|
|
| IDLE | unset | idle | |
|
|
|
|
| CALLS_ACTIVE | unset | busy | |
|
|
|
|
| TIMER_PENDING | set-valid | idle | |
|
|
|
|
| TIMER_PENDING_CALLS_ACTIVE | set-invalid | busy | |
|
|
|
|
| TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START | set-invalid | idle | |
|
|
|
|
+--------------------------------------------+-------------+---------+ |
|
|
|
|
|
|
|
|
|
IDLE: The initial state of the client_idle_filter, indicating the channel is |
|
|
|
|
in IDLE. |
|
|
|
|
|
|
|
|
|
CALLS_ACTIVE: The channel has 1 or 1+ active calls and the timer is not set. |
|
|
|
|
|
|
|
|
|
TIMER_PENDING: The state after the timer is set and no calls have arrived |
|
|
|
|
after the timer is set. The channel must have 0 active call in this state. If |
|
|
|
|
the timer is fired in this state, the channel will go into IDLE state. |
|
|
|
|
|
|
|
|
|
TIMER_PENDING_CALLS_ACTIVE: The state after the timer is set and at least one |
|
|
|
|
call has arrived after the timer is set. The channel must have 1 or 1+ active |
|
|
|
|
calls in this state. If the timer is fired in this state, we won't reschedule |
|
|
|
|
it. |
|
|
|
|
|
|
|
|
|
TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: The state after the timer is set |
|
|
|
|
and at least one call has arrived after the timer is set, BUT the channel |
|
|
|
|
currently has 0 active call. If the timer is fired in this state, we will |
|
|
|
|
reschedule it according to the finish time of the latest call. |
|
|
|
|
|
|
|
|
|
PROCESSING: The state set to block other threads when the setting thread is |
|
|
|
|
doing some work to keep state consistency. |
|
|
|
|
|
|
|
|
|
idle_timer_ will not be cancelled (unless the channel is shutting down). |
|
|
|
|
If the timer callback is called when the idle_timer_ is valid (i.e. idle_state |
|
|
|
|
is TIMER_PENDING), the channel will enter IDLE, otherwise the channel won't be |
|
|
|
|
changed. |
|
|
|
|
|
|
|
|
|
State transitions: |
|
|
|
|
IDLE |
|
|
|
|
| ^ |
|
|
|
|
--------------------------------- * |
|
|
|
|
| * |
|
|
|
|
v * |
|
|
|
|
CALLS_ACTIVE =================> TIMER_PENDING |
|
|
|
|
^ | ^ |
|
|
|
|
* ------------------------------ * |
|
|
|
|
* | * |
|
|
|
|
* v * |
|
|
|
|
TIMER_PENDING_CALLS_ACTIVE ===> TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START |
|
|
|
|
^ | |
|
|
|
|
| | |
|
|
|
|
--------------------------------- |
|
|
|
|
|
|
|
|
|
---> Triggered by IncreaseCallCount() |
|
|
|
|
===> Triggered by DecreaseCallCount() |
|
|
|
|
***> Triggered by IdleTimerCallback() |
|
|
|
|
*/ |
|
|
|
|
enum ChannelState { |
|
|
|
|
IDLE, |
|
|
|
|
CALLS_ACTIVE, |
|
|
|
|
TIMER_PENDING, |
|
|
|
|
TIMER_PENDING_CALLS_ACTIVE, |
|
|
|
|
TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START, |
|
|
|
|
PROCESSING |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) { |
|
|
|
|
return grpc_channel_arg_get_integer( |
|
|
|
|
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS), |
|
|
|
|
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}); |
|
|
|
|
return GPR_MAX( |
|
|
|
|
grpc_channel_arg_get_integer( |
|
|
|
|
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS), |
|
|
|
|
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}), |
|
|
|
|
MIN_IDLE_TIMEOUT_MS); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class ChannelData { |
|
|
|
@ -86,8 +158,9 @@ class ChannelData { |
|
|
|
|
const grpc_millis client_idle_timeout_; |
|
|
|
|
|
|
|
|
|
// Member data used to track the state of channel.
|
|
|
|
|
Mutex call_count_mu_; |
|
|
|
|
size_t call_count_; |
|
|
|
|
grpc_millis last_idle_time_; |
|
|
|
|
Atomic<intptr_t> call_count_{0}; |
|
|
|
|
Atomic<ChannelState> state_{IDLE}; |
|
|
|
|
|
|
|
|
|
// Idle timer and its callback closure.
|
|
|
|
|
grpc_timer idle_timer_; |
|
|
|
@ -115,37 +188,105 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem, |
|
|
|
|
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); |
|
|
|
|
// Catch the disconnect_with_error transport op.
|
|
|
|
|
if (op->disconnect_with_error != nullptr) { |
|
|
|
|
// Disconnect. Cancel the timer if we set it before.
|
|
|
|
|
// IncreaseCallCount() introduces a dummy call. It will cancel the timer and
|
|
|
|
|
// prevent the timer from being reset by other threads.
|
|
|
|
|
// IncreaseCallCount() introduces a dummy call and prevent the timer from
|
|
|
|
|
// being reset by other threads.
|
|
|
|
|
chand->IncreaseCallCount(); |
|
|
|
|
// If the timer has been set, cancel the timer.
|
|
|
|
|
// No synchronization issues here. grpc_timer_cancel() is valid as long as
|
|
|
|
|
// the timer has been init()ed before.
|
|
|
|
|
grpc_timer_cancel(&chand->idle_timer_); |
|
|
|
|
} |
|
|
|
|
// Pass the op to the next filter.
|
|
|
|
|
grpc_channel_next_op(elem, op); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::IncreaseCallCount() { |
|
|
|
|
MutexLock lock(&call_count_mu_); |
|
|
|
|
if (call_count_++ == 0) { |
|
|
|
|
grpc_timer_cancel(&idle_timer_); |
|
|
|
|
const intptr_t previous_value = call_count_.FetchAdd(1, MemoryOrder::RELAXED); |
|
|
|
|
GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR, |
|
|
|
|
previous_value + 1); |
|
|
|
|
if (previous_value == 0) { |
|
|
|
|
// This call is the one that makes the channel busy.
|
|
|
|
|
// Loop here to make sure the previous decrease operation has finished.
|
|
|
|
|
ChannelState state = state_.Load(MemoryOrder::RELAXED); |
|
|
|
|
while (true) { |
|
|
|
|
switch (state) { |
|
|
|
|
// Timer has not been set. Switch to CALLS_ACTIVE.
|
|
|
|
|
case IDLE: |
|
|
|
|
// In this case, no other threads will modify the state, so we can
|
|
|
|
|
// just store the value.
|
|
|
|
|
state_.Store(CALLS_ACTIVE, MemoryOrder::RELAXED); |
|
|
|
|
return; |
|
|
|
|
// Timer has been set. Switch to TIMER_PENDING_CALLS_ACTIVE.
|
|
|
|
|
case TIMER_PENDING: |
|
|
|
|
case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: |
|
|
|
|
// At this point, the state may have been switched to IDLE by the
|
|
|
|
|
// idle timer callback. Therefore, use CAS operation to change the
|
|
|
|
|
// state atomically.
|
|
|
|
|
// Use MemoryOrder::ACQUIRE on success to ensure last_idle_time_ has
|
|
|
|
|
// been properly set in DecreaseCallCount().
|
|
|
|
|
if (state_.CompareExchangeWeak(&state, TIMER_PENDING_CALLS_ACTIVE, |
|
|
|
|
MemoryOrder::ACQUIRE, |
|
|
|
|
MemoryOrder::RELAXED)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
// The state has not been switched to desired value yet, try again.
|
|
|
|
|
state = state_.Load(MemoryOrder::RELAXED); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR, call_count_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ChannelData::DecreaseCallCount() { |
|
|
|
|
MutexLock lock(&call_count_mu_); |
|
|
|
|
if (call_count_-- == 1) { |
|
|
|
|
StartIdleTimer(); |
|
|
|
|
const intptr_t previous_value = call_count_.FetchSub(1, MemoryOrder::RELAXED); |
|
|
|
|
GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR, |
|
|
|
|
previous_value - 1); |
|
|
|
|
if (previous_value == 1) { |
|
|
|
|
// This call is the one that makes the channel idle.
|
|
|
|
|
// last_idle_time_ does not need to be Atomic<> because busy-loops in
|
|
|
|
|
// IncreaseCallCount(), DecreaseCallCount() and IdleTimerCallback() will
|
|
|
|
|
// prevent multiple threads from simultaneously accessing this variable.
|
|
|
|
|
last_idle_time_ = ExecCtx::Get()->Now(); |
|
|
|
|
ChannelState state = state_.Load(MemoryOrder::RELAXED); |
|
|
|
|
while (true) { |
|
|
|
|
switch (state) { |
|
|
|
|
// Timer has not been set. Set the timer and switch to TIMER_PENDING
|
|
|
|
|
case CALLS_ACTIVE: |
|
|
|
|
// Release store here to make other threads see the updated value of
|
|
|
|
|
// last_idle_time_.
|
|
|
|
|
StartIdleTimer(); |
|
|
|
|
state_.Store(TIMER_PENDING, MemoryOrder::RELEASE); |
|
|
|
|
return; |
|
|
|
|
// Timer has been set. Switch to
|
|
|
|
|
// TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
|
|
|
|
|
case TIMER_PENDING_CALLS_ACTIVE: |
|
|
|
|
// At this point, the state may have been switched to CALLS_ACTIVE by
|
|
|
|
|
// the idle timer callback. Therefore, use CAS operation to change the
|
|
|
|
|
// state atomically.
|
|
|
|
|
// Release store here to make the idle timer callback see the updated
|
|
|
|
|
// value of last_idle_time_ to properly reset the idle timer.
|
|
|
|
|
if (state_.CompareExchangeWeak( |
|
|
|
|
&state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START, |
|
|
|
|
MemoryOrder::RELEASE, MemoryOrder::RELAXED)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
// The state has not been switched to desired value yet, try again.
|
|
|
|
|
state = state_.Load(MemoryOrder::RELAXED); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR, call_count_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ChannelData::ChannelData(grpc_channel_element* elem, |
|
|
|
|
grpc_channel_element_args* args, grpc_error** error) |
|
|
|
|
: elem_(elem), |
|
|
|
|
channel_stack_(args->channel_stack), |
|
|
|
|
client_idle_timeout_(GetClientIdleTimeout(args->channel_args)), |
|
|
|
|
call_count_(0) { |
|
|
|
|
client_idle_timeout_(GetClientIdleTimeout(args->channel_args)) { |
|
|
|
|
// If the idle filter is explicitly disabled in channel args, this ctor should
|
|
|
|
|
// not get called.
|
|
|
|
|
GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE); |
|
|
|
@ -165,10 +306,45 @@ ChannelData::ChannelData(grpc_channel_element* elem, |
|
|
|
|
void ChannelData::IdleTimerCallback(void* arg, grpc_error* error) { |
|
|
|
|
GRPC_IDLE_FILTER_LOG("timer alarms"); |
|
|
|
|
ChannelData* chand = static_cast<ChannelData*>(arg); |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&chand->call_count_mu_); |
|
|
|
|
if (error == GRPC_ERROR_NONE && chand->call_count_ == 0) { |
|
|
|
|
chand->EnterIdle(); |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
GRPC_IDLE_FILTER_LOG("timer canceled"); |
|
|
|
|
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
bool finished = false; |
|
|
|
|
ChannelState state = chand->state_.Load(MemoryOrder::RELAXED); |
|
|
|
|
while (!finished) { |
|
|
|
|
switch (state) { |
|
|
|
|
case TIMER_PENDING: |
|
|
|
|
// Change the state to PROCESSING to block IncreaseCallCout() until the
|
|
|
|
|
// EnterIdle() operation finishes, preventing mistakenly entering IDLE
|
|
|
|
|
// when active RPC exists.
|
|
|
|
|
finished = chand->state_.CompareExchangeWeak( |
|
|
|
|
&state, PROCESSING, MemoryOrder::RELAXED, MemoryOrder::RELAXED); |
|
|
|
|
if (finished) { |
|
|
|
|
chand->EnterIdle(); |
|
|
|
|
chand->state_.Store(IDLE, MemoryOrder::RELAXED); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case TIMER_PENDING_CALLS_ACTIVE: |
|
|
|
|
finished = chand->state_.CompareExchangeWeak( |
|
|
|
|
&state, CALLS_ACTIVE, MemoryOrder::RELAXED, MemoryOrder::RELAXED); |
|
|
|
|
break; |
|
|
|
|
case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: |
|
|
|
|
// Change the state to PROCESSING to block IncreaseCallCount() until the
|
|
|
|
|
// StartIdleTimer() operation finishes, preventing mistakenly restarting
|
|
|
|
|
// the timer after grpc_timer_cancel() when shutdown.
|
|
|
|
|
finished = chand->state_.CompareExchangeWeak( |
|
|
|
|
&state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED); |
|
|
|
|
if (finished) { |
|
|
|
|
chand->StartIdleTimer(); |
|
|
|
|
chand->state_.Store(TIMER_PENDING, MemoryOrder::RELAXED); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
// The state has not been switched to desired value yet, try again.
|
|
|
|
|
state = chand->state_.Load(MemoryOrder::RELAXED); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GRPC_IDLE_FILTER_LOG("timer finishes"); |
|
|
|
@ -185,7 +361,7 @@ void ChannelData::StartIdleTimer() { |
|
|
|
|
GRPC_IDLE_FILTER_LOG("timer has started"); |
|
|
|
|
// Hold a ref to the channel stack for the timer callback.
|
|
|
|
|
GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback"); |
|
|
|
|
grpc_timer_init(&idle_timer_, ExecCtx::Get()->Now() + client_idle_timeout_, |
|
|
|
|
grpc_timer_init(&idle_timer_, last_idle_time_ + client_idle_timeout_, |
|
|
|
|
&idle_timer_callback_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|