mirror of https://github.com/grpc/grpc.git
This reverts commit d589f4e6ca
.
pull/29106/head
parent
f82245555e
commit
ff14d1d7f3
23 changed files with 900 additions and 480 deletions
@ -1,402 +0,0 @@ |
|||||||
// Copyright 2022 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
// TODO(ctiller): Add a unit test suite for these filters once it's practical to
|
|
||||||
// mock transport operations.
|
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <limits.h> |
|
||||||
#include <stdlib.h> |
|
||||||
|
|
||||||
#include <atomic> |
|
||||||
#include <limits> |
|
||||||
|
|
||||||
#include "src/core/ext/filters/channel_idle/idle_filter_state.h" |
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/channel/channel_stack_builder.h" |
|
||||||
#include "src/core/lib/channel/promise_based_filter.h" |
|
||||||
#include "src/core/lib/config/core_configuration.h" |
|
||||||
#include "src/core/lib/gprpp/capture.h" |
|
||||||
#include "src/core/lib/iomgr/timer.h" |
|
||||||
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" |
|
||||||
#include "src/core/lib/promise/loop.h" |
|
||||||
#include "src/core/lib/promise/sleep.h" |
|
||||||
#include "src/core/lib/promise/try_seq.h" |
|
||||||
#include "src/core/lib/transport/http2_errors.h" |
|
||||||
|
|
||||||
// TODO(juanlishen): The idle filter was disabled in client channel by default
|
|
||||||
// due to b/143502997. Now the bug is fixed enable the filter by default.
|
|
||||||
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX |
|
||||||
// The user input idle timeout smaller than this would be capped to it.
|
|
||||||
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000) |
|
||||||
|
|
||||||
// If these settings change, make sure that we are not sending a GOAWAY for
|
|
||||||
// inproc transport, since a GOAWAY to inproc ends up destroying the transport.
|
|
||||||
#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX |
|
||||||
#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX |
|
||||||
#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX |
|
||||||
#define MAX_CONNECTION_AGE_JITTER 0.1 |
|
||||||
|
|
||||||
#define MAX_CONNECTION_AGE_INTEGER_OPTIONS \ |
|
||||||
{ DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX } |
|
||||||
#define MAX_CONNECTION_IDLE_INTEGER_OPTIONS \ |
|
||||||
{ DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX } |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); |
|
||||||
|
|
||||||
#define GRPC_IDLE_FILTER_LOG(format, ...) \ |
|
||||||
do { \
|
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
|
|
||||||
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
} while (0) |
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
Duration GetClientIdleTimeout(const grpc_channel_args* args) { |
|
||||||
int ms = std::max( |
|
||||||
grpc_channel_arg_get_integer( |
|
||||||
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS), |
|
||||||
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}), |
|
||||||
MIN_IDLE_TIMEOUT_MS); |
|
||||||
return ms == INT_MAX ? Duration::Infinity() : Duration::Milliseconds(ms); |
|
||||||
} |
|
||||||
|
|
||||||
struct MaxAgeConfig { |
|
||||||
Duration max_connection_age; |
|
||||||
Duration max_connection_idle; |
|
||||||
Duration max_connection_age_grace; |
|
||||||
|
|
||||||
bool enable() const { |
|
||||||
return max_connection_age != Duration::Infinity() || |
|
||||||
max_connection_idle != Duration::Infinity(); |
|
||||||
} |
|
||||||
}; |
|
||||||
|
|
||||||
/* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out
|
|
||||||
connection storms. Note that the MAX_CONNECTION_AGE option without jitter |
|
||||||
would not create connection storms by itself, but if there happened to be a |
|
||||||
connection storm it could cause it to repeat at a fixed period. */ |
|
||||||
MaxAgeConfig GetMaxAgeConfig(const grpc_channel_args* args) { |
|
||||||
const int args_max_age = grpc_channel_arg_get_integer( |
|
||||||
grpc_channel_args_find(args, GRPC_ARG_MAX_CONNECTION_AGE_MS), |
|
||||||
MAX_CONNECTION_AGE_INTEGER_OPTIONS); |
|
||||||
const int args_max_idle = grpc_channel_arg_get_integer( |
|
||||||
grpc_channel_args_find(args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), |
|
||||||
MAX_CONNECTION_IDLE_INTEGER_OPTIONS); |
|
||||||
const int args_max_age_grace = grpc_channel_arg_get_integer( |
|
||||||
grpc_channel_args_find(args, GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS), |
|
||||||
{DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0, INT_MAX}); |
|
||||||
/* generate a random number between 1 - MAX_CONNECTION_AGE_JITTER and
|
|
||||||
1 + MAX_CONNECTION_AGE_JITTER */ |
|
||||||
const double multiplier = |
|
||||||
rand() * MAX_CONNECTION_AGE_JITTER * 2.0 / RAND_MAX + 1.0 - |
|
||||||
MAX_CONNECTION_AGE_JITTER; |
|
||||||
/* GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result
|
|
||||||
will not be cast to int implicitly before the comparison. */ |
|
||||||
return MaxAgeConfig{ |
|
||||||
args_max_age == INT_MAX |
|
||||||
? Duration::Infinity() |
|
||||||
: Duration::FromSecondsAsDouble(multiplier * args_max_age / 1000.0), |
|
||||||
args_max_idle == INT_MAX ? Duration::Infinity() |
|
||||||
: Duration::Milliseconds(args_max_idle), |
|
||||||
Duration::Milliseconds(args_max_age_grace)}; |
|
||||||
} |
|
||||||
|
|
||||||
class ChannelIdleFilter : public ChannelFilter { |
|
||||||
public: |
|
||||||
~ChannelIdleFilter() override = default; |
|
||||||
|
|
||||||
ChannelIdleFilter(const ChannelIdleFilter&) = delete; |
|
||||||
ChannelIdleFilter& operator=(const ChannelIdleFilter&) = delete; |
|
||||||
ChannelIdleFilter(ChannelIdleFilter&&) = default; |
|
||||||
ChannelIdleFilter& operator=(ChannelIdleFilter&&) = default; |
|
||||||
|
|
||||||
// Construct a promise for one call.
|
|
||||||
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
|
||||||
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
|
||||||
|
|
||||||
bool StartTransportOp(grpc_transport_op* op) override; |
|
||||||
|
|
||||||
protected: |
|
||||||
ChannelIdleFilter(grpc_channel_stack* channel_stack, |
|
||||||
Duration client_idle_timeout) |
|
||||||
: channel_stack_(channel_stack), |
|
||||||
client_idle_timeout_(client_idle_timeout) {} |
|
||||||
|
|
||||||
grpc_channel_stack* channel_stack() { return channel_stack_; }; |
|
||||||
|
|
||||||
virtual void Shutdown(); |
|
||||||
void CloseChannel(); |
|
||||||
|
|
||||||
void IncreaseCallCount(); |
|
||||||
void DecreaseCallCount(); |
|
||||||
|
|
||||||
private: |
|
||||||
void StartIdleTimer(); |
|
||||||
|
|
||||||
struct CallCountDecreaser { |
|
||||||
void operator()(ChannelIdleFilter* filter) const { |
|
||||||
filter->DecreaseCallCount(); |
|
||||||
} |
|
||||||
}; |
|
||||||
|
|
||||||
// The channel stack to which we take refs for pending callbacks.
|
|
||||||
grpc_channel_stack* channel_stack_; |
|
||||||
Duration client_idle_timeout_; |
|
||||||
std::shared_ptr<IdleFilterState> idle_filter_state_{ |
|
||||||
std::make_shared<IdleFilterState>(false)}; |
|
||||||
|
|
||||||
ActivityPtr activity_; |
|
||||||
}; |
|
||||||
|
|
||||||
class ClientIdleFilter final : public ChannelIdleFilter { |
|
||||||
public: |
|
||||||
static absl::StatusOr<ClientIdleFilter> Create( |
|
||||||
const grpc_channel_args* args, ChannelFilter::Args filter_args); |
|
||||||
|
|
||||||
private: |
|
||||||
using ChannelIdleFilter::ChannelIdleFilter; |
|
||||||
}; |
|
||||||
|
|
||||||
class MaxAgeFilter final : public ChannelIdleFilter { |
|
||||||
public: |
|
||||||
static absl::StatusOr<MaxAgeFilter> Create(const grpc_channel_args* args, |
|
||||||
ChannelFilter::Args filter_args); |
|
||||||
|
|
||||||
void Start(); |
|
||||||
|
|
||||||
private: |
|
||||||
class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { |
|
||||||
public: |
|
||||||
explicit ConnectivityWatcher(MaxAgeFilter* filter) |
|
||||||
: channel_stack_(filter->channel_stack()->Ref()), filter_(filter) {} |
|
||||||
~ConnectivityWatcher() override = default; |
|
||||||
|
|
||||||
void OnConnectivityStateChange(grpc_connectivity_state new_state, |
|
||||||
const absl::Status&) override { |
|
||||||
if (new_state == GRPC_CHANNEL_SHUTDOWN) filter_->Shutdown(); |
|
||||||
} |
|
||||||
|
|
||||||
private: |
|
||||||
RefCountedPtr<grpc_channel_stack> channel_stack_; |
|
||||||
MaxAgeFilter* filter_; |
|
||||||
}; |
|
||||||
|
|
||||||
MaxAgeFilter(grpc_channel_stack* channel_stack, |
|
||||||
const MaxAgeConfig& max_age_config) |
|
||||||
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle), |
|
||||||
max_connection_age_(max_age_config.max_connection_age), |
|
||||||
max_connection_age_grace_(max_age_config.max_connection_age_grace) {} |
|
||||||
|
|
||||||
void Shutdown() override; |
|
||||||
|
|
||||||
ActivityPtr max_age_activity_; |
|
||||||
Duration max_connection_age_; |
|
||||||
Duration max_connection_age_grace_; |
|
||||||
}; |
|
||||||
|
|
||||||
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create( |
|
||||||
const grpc_channel_args* args, ChannelFilter::Args filter_args) { |
|
||||||
ClientIdleFilter filter(filter_args.channel_stack(), |
|
||||||
GetClientIdleTimeout(args)); |
|
||||||
return absl::StatusOr<ClientIdleFilter>(std::move(filter)); |
|
||||||
} |
|
||||||
|
|
||||||
absl::StatusOr<MaxAgeFilter> MaxAgeFilter::Create( |
|
||||||
const grpc_channel_args* args, ChannelFilter::Args filter_args) { |
|
||||||
const auto config = GetMaxAgeConfig(args); |
|
||||||
MaxAgeFilter filter(filter_args.channel_stack(), config); |
|
||||||
return absl::StatusOr<MaxAgeFilter>(std::move(filter)); |
|
||||||
} |
|
||||||
|
|
||||||
void MaxAgeFilter::Shutdown() { |
|
||||||
max_age_activity_.reset(); |
|
||||||
ChannelIdleFilter::Shutdown(); |
|
||||||
} |
|
||||||
|
|
||||||
void MaxAgeFilter::Start() { |
|
||||||
// Trigger idle timer immediately
|
|
||||||
IncreaseCallCount(); |
|
||||||
DecreaseCallCount(); |
|
||||||
|
|
||||||
struct StartupClosure { |
|
||||||
RefCountedPtr<grpc_channel_stack> channel_stack; |
|
||||||
MaxAgeFilter* filter; |
|
||||||
grpc_closure closure; |
|
||||||
}; |
|
||||||
auto run_startup = [](void* p, grpc_error_handle) { |
|
||||||
auto* startup = static_cast<StartupClosure*>(p); |
|
||||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
|
||||||
op->start_connectivity_watch.reset( |
|
||||||
new ConnectivityWatcher(startup->filter)); |
|
||||||
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; |
|
||||||
grpc_channel_next_op( |
|
||||||
grpc_channel_stack_element(startup->channel_stack.get(), 0), op); |
|
||||||
delete startup; |
|
||||||
}; |
|
||||||
auto* startup = |
|
||||||
new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}}; |
|
||||||
GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr); |
|
||||||
ExecCtx::Run(DEBUG_LOCATION, &startup->closure, GRPC_ERROR_NONE); |
|
||||||
|
|
||||||
auto channel_stack = this->channel_stack()->Ref(); |
|
||||||
|
|
||||||
// Start the max age timer
|
|
||||||
if (max_connection_age_ != Duration::Infinity()) { |
|
||||||
max_age_activity_ = MakeActivity( |
|
||||||
TrySeq( |
|
||||||
// First sleep until the max connection age
|
|
||||||
Sleep(ExecCtx::Get()->Now() + max_connection_age_), |
|
||||||
// Then send a goaway.
|
|
||||||
[this] { |
|
||||||
GRPC_CHANNEL_STACK_REF(this->channel_stack(), |
|
||||||
"max_age send_goaway"); |
|
||||||
// Jump out of the activity to send the goaway.
|
|
||||||
auto fn = [](void* arg, grpc_error_handle) { |
|
||||||
auto* channel_stack = static_cast<grpc_channel_stack*>(arg); |
|
||||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
|
||||||
op->goaway_error = grpc_error_set_int( |
|
||||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"), |
|
||||||
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); |
|
||||||
grpc_channel_element* elem = |
|
||||||
grpc_channel_stack_element(channel_stack, 0); |
|
||||||
elem->filter->start_transport_op(elem, op); |
|
||||||
GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway"); |
|
||||||
}; |
|
||||||
ExecCtx::Run( |
|
||||||
DEBUG_LOCATION, |
|
||||||
GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr), |
|
||||||
GRPC_ERROR_NONE); |
|
||||||
return Immediate(absl::OkStatus()); |
|
||||||
}, |
|
||||||
// Sleep for the grace period
|
|
||||||
[this] { |
|
||||||
return Sleep(ExecCtx::Get()->Now() + max_connection_age_grace_); |
|
||||||
}), |
|
||||||
ExecCtxWakeupScheduler(), [channel_stack, this](absl::Status status) { |
|
||||||
// OnDone -- close the connection if the promise completed
|
|
||||||
// successfully.
|
|
||||||
// (if it did not, it was cancelled)
|
|
||||||
if (status.ok()) CloseChannel(); |
|
||||||
}); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// Construct a promise for one call.
|
|
||||||
ArenaPromise<ServerMetadataHandle> ChannelIdleFilter::MakeCallPromise( |
|
||||||
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
|
||||||
using Decrementer = std::unique_ptr<ChannelIdleFilter, CallCountDecreaser>; |
|
||||||
IncreaseCallCount(); |
|
||||||
return ArenaPromise<ServerMetadataHandle>( |
|
||||||
Capture([](Decrementer*, ArenaPromise<ServerMetadataHandle>* next) |
|
||||||
-> Poll<ServerMetadataHandle> { return (*next)(); }, |
|
||||||
Decrementer(this), next_promise_factory(std::move(call_args)))); |
|
||||||
} |
|
||||||
|
|
||||||
bool ChannelIdleFilter::StartTransportOp(grpc_transport_op* op) { |
|
||||||
// Catch the disconnect_with_error transport op.
|
|
||||||
if (op->disconnect_with_error != GRPC_ERROR_NONE) Shutdown(); |
|
||||||
// Pass the op to the next filter.
|
|
||||||
return false; |
|
||||||
} |
|
||||||
|
|
||||||
void ChannelIdleFilter::Shutdown() { |
|
||||||
// IncreaseCallCount() introduces a phony call and prevent the timer from
|
|
||||||
// being reset by other threads.
|
|
||||||
IncreaseCallCount(); |
|
||||||
activity_.reset(); |
|
||||||
} |
|
||||||
|
|
||||||
void ChannelIdleFilter::IncreaseCallCount() { |
|
||||||
idle_filter_state_->IncreaseCallCount(); |
|
||||||
} |
|
||||||
|
|
||||||
void ChannelIdleFilter::DecreaseCallCount() { |
|
||||||
if (idle_filter_state_->DecreaseCallCount()) { |
|
||||||
// If there are no more calls in progress, start the idle timer.
|
|
||||||
StartIdleTimer(); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void ChannelIdleFilter::StartIdleTimer() { |
|
||||||
GRPC_IDLE_FILTER_LOG("timer has started"); |
|
||||||
auto idle_filter_state = idle_filter_state_; |
|
||||||
// Hold a ref to the channel stack for the timer callback.
|
|
||||||
auto channel_stack = channel_stack_->Ref(); |
|
||||||
auto timeout = client_idle_timeout_; |
|
||||||
auto promise = Loop([timeout, idle_filter_state]() { |
|
||||||
return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout), |
|
||||||
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> { |
|
||||||
if (idle_filter_state->CheckTimer()) { |
|
||||||
return Continue{}; |
|
||||||
} else { |
|
||||||
return absl::OkStatus(); |
|
||||||
} |
|
||||||
}); |
|
||||||
}); |
|
||||||
activity_ = MakeActivity(std::move(promise), ExecCtxWakeupScheduler{}, |
|
||||||
[channel_stack, this](absl::Status status) { |
|
||||||
if (status.ok()) CloseChannel(); |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
void ChannelIdleFilter::CloseChannel() { |
|
||||||
auto* op = grpc_make_transport_op(nullptr); |
|
||||||
op->disconnect_with_error = grpc_error_set_int( |
|
||||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"), |
|
||||||
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE); |
|
||||||
// Pass the transport op down to the channel stack.
|
|
||||||
auto* elem = grpc_channel_stack_element(channel_stack_, 0); |
|
||||||
elem->filter->start_transport_op(elem, op); |
|
||||||
} |
|
||||||
|
|
||||||
const grpc_channel_filter grpc_client_idle_filter = |
|
||||||
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>( |
|
||||||
"client_idle"); |
|
||||||
const grpc_channel_filter grpc_max_age_filter = |
|
||||||
MakePromiseBasedFilter<MaxAgeFilter, FilterEndpoint::kServer>("max_age"); |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) { |
|
||||||
builder->channel_init()->RegisterStage( |
|
||||||
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
|
||||||
[](ChannelStackBuilder* builder) { |
|
||||||
const grpc_channel_args* channel_args = builder->channel_args(); |
|
||||||
if (!grpc_channel_args_want_minimal_stack(channel_args) && |
|
||||||
GetClientIdleTimeout(channel_args) != Duration::Infinity()) { |
|
||||||
builder->PrependFilter(&grpc_client_idle_filter, nullptr); |
|
||||||
} |
|
||||||
return true; |
|
||||||
}); |
|
||||||
builder->channel_init()->RegisterStage( |
|
||||||
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
|
||||||
[](ChannelStackBuilder* builder) { |
|
||||||
const grpc_channel_args* channel_args = builder->channel_args(); |
|
||||||
if (!grpc_channel_args_want_minimal_stack(channel_args) && |
|
||||||
GetMaxAgeConfig(channel_args).enable()) { |
|
||||||
builder->PrependFilter( |
|
||||||
&grpc_max_age_filter, |
|
||||||
[](grpc_channel_stack*, grpc_channel_element* elem) { |
|
||||||
static_cast<MaxAgeFilter*>(elem->channel_data)->Start(); |
|
||||||
}); |
|
||||||
} |
|
||||||
return true; |
|
||||||
}); |
|
||||||
} |
|
||||||
} // namespace grpc_core
|
|
@ -0,0 +1,199 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2019 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <limits.h> |
||||||
|
|
||||||
|
#include <atomic> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_idle/idle_filter_state.h" |
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/channel/channel_stack_builder.h" |
||||||
|
#include "src/core/lib/channel/promise_based_filter.h" |
||||||
|
#include "src/core/lib/config/core_configuration.h" |
||||||
|
#include "src/core/lib/gprpp/capture.h" |
||||||
|
#include "src/core/lib/iomgr/timer.h" |
||||||
|
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" |
||||||
|
#include "src/core/lib/promise/loop.h" |
||||||
|
#include "src/core/lib/promise/sleep.h" |
||||||
|
#include "src/core/lib/promise/try_seq.h" |
||||||
|
#include "src/core/lib/transport/http2_errors.h" |
||||||
|
|
||||||
|
// TODO(juanlishen): The idle filter is disabled in client channel by default
|
||||||
|
// due to b/143502997. Try to fix the bug and enable the filter by default.
|
||||||
|
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX |
||||||
|
// The user input idle timeout smaller than this would be capped to it.
|
||||||
|
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000) |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); |
||||||
|
|
||||||
|
#define GRPC_IDLE_FILTER_LOG(format, ...) \ |
||||||
|
do { \
|
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
|
||||||
|
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
|
||||||
|
} \
|
||||||
|
} while (0) |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
Duration GetClientIdleTimeout(const grpc_channel_args* args) { |
||||||
|
auto millis = std::max( |
||||||
|
grpc_channel_arg_get_integer( |
||||||
|
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS), |
||||||
|
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}), |
||||||
|
MIN_IDLE_TIMEOUT_MS); |
||||||
|
if (millis == INT_MAX) return Duration::Infinity(); |
||||||
|
return Duration::Milliseconds(millis); |
||||||
|
} |
||||||
|
|
||||||
|
class ClientIdleFilter : public ChannelFilter { |
||||||
|
public: |
||||||
|
static absl::StatusOr<ClientIdleFilter> Create( |
||||||
|
const grpc_channel_args* args, ChannelFilter::Args filter_args); |
||||||
|
~ClientIdleFilter() override = default; |
||||||
|
|
||||||
|
ClientIdleFilter(const ClientIdleFilter&) = delete; |
||||||
|
ClientIdleFilter& operator=(const ClientIdleFilter&) = delete; |
||||||
|
ClientIdleFilter(ClientIdleFilter&&) = default; |
||||||
|
ClientIdleFilter& operator=(ClientIdleFilter&&) = default; |
||||||
|
|
||||||
|
// Construct a promise for one call.
|
||||||
|
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
||||||
|
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
||||||
|
|
||||||
|
bool StartTransportOp(grpc_transport_op* op) override; |
||||||
|
|
||||||
|
private: |
||||||
|
ClientIdleFilter(grpc_channel_stack* channel_stack, |
||||||
|
Duration client_idle_timeout) |
||||||
|
: channel_stack_(channel_stack), |
||||||
|
client_idle_timeout_(client_idle_timeout) {} |
||||||
|
|
||||||
|
void StartIdleTimer(); |
||||||
|
|
||||||
|
void IncreaseCallCount(); |
||||||
|
void DecreaseCallCount(); |
||||||
|
|
||||||
|
struct CallCountDecreaser { |
||||||
|
void operator()(ClientIdleFilter* filter) const { |
||||||
|
filter->DecreaseCallCount(); |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
// The channel stack to which we take refs for pending callbacks.
|
||||||
|
grpc_channel_stack* channel_stack_; |
||||||
|
Duration client_idle_timeout_; |
||||||
|
std::shared_ptr<IdleFilterState> idle_filter_state_{ |
||||||
|
std::make_shared<IdleFilterState>(false)}; |
||||||
|
|
||||||
|
ActivityPtr activity_; |
||||||
|
}; |
||||||
|
|
||||||
|
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create( |
||||||
|
const grpc_channel_args* args, ChannelFilter::Args filter_args) { |
||||||
|
ClientIdleFilter filter(filter_args.channel_stack(), |
||||||
|
GetClientIdleTimeout(args)); |
||||||
|
return absl::StatusOr<ClientIdleFilter>(std::move(filter)); |
||||||
|
} |
||||||
|
|
||||||
|
// Construct a promise for one call.
|
||||||
|
ArenaPromise<ServerMetadataHandle> ClientIdleFilter::MakeCallPromise( |
||||||
|
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
||||||
|
using Decrementer = std::unique_ptr<ClientIdleFilter, CallCountDecreaser>; |
||||||
|
IncreaseCallCount(); |
||||||
|
return ArenaPromise<ServerMetadataHandle>( |
||||||
|
Capture([](Decrementer*, ArenaPromise<ServerMetadataHandle>* next) |
||||||
|
-> Poll<ServerMetadataHandle> { return (*next)(); }, |
||||||
|
Decrementer(this), next_promise_factory(std::move(call_args)))); |
||||||
|
} |
||||||
|
|
||||||
|
bool ClientIdleFilter::StartTransportOp(grpc_transport_op* op) { |
||||||
|
// Catch the disconnect_with_error transport op.
|
||||||
|
if (op->disconnect_with_error != GRPC_ERROR_NONE) { |
||||||
|
// IncreaseCallCount() introduces a phony call and prevent the timer from
|
||||||
|
// being reset by other threads.
|
||||||
|
IncreaseCallCount(); |
||||||
|
activity_.reset(); |
||||||
|
} |
||||||
|
// Pass the op to the next filter.
|
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
void ClientIdleFilter::IncreaseCallCount() { |
||||||
|
idle_filter_state_->IncreaseCallCount(); |
||||||
|
} |
||||||
|
|
||||||
|
void ClientIdleFilter::DecreaseCallCount() { |
||||||
|
if (idle_filter_state_->DecreaseCallCount()) { |
||||||
|
// If there are no more calls in progress, start the idle timer.
|
||||||
|
StartIdleTimer(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void ClientIdleFilter::StartIdleTimer() { |
||||||
|
GRPC_IDLE_FILTER_LOG("timer has started"); |
||||||
|
auto idle_filter_state = idle_filter_state_; |
||||||
|
// Hold a ref to the channel stack for the timer callback.
|
||||||
|
auto channel_stack = channel_stack_->Ref(); |
||||||
|
auto timeout = client_idle_timeout_; |
||||||
|
auto promise = Loop([timeout, idle_filter_state]() { |
||||||
|
return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout), |
||||||
|
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> { |
||||||
|
if (idle_filter_state->CheckTimer()) { |
||||||
|
return Continue{}; |
||||||
|
} else { |
||||||
|
return absl::OkStatus(); |
||||||
|
} |
||||||
|
}); |
||||||
|
}); |
||||||
|
activity_ = MakeActivity( |
||||||
|
std::move(promise), ExecCtxWakeupScheduler{}, |
||||||
|
[channel_stack](absl::Status status) { |
||||||
|
if (!status.ok()) return; |
||||||
|
auto* op = grpc_make_transport_op(nullptr); |
||||||
|
op->disconnect_with_error = grpc_error_set_int( |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"), |
||||||
|
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE); |
||||||
|
// Pass the transport op down to the channel stack.
|
||||||
|
auto* elem = grpc_channel_stack_element(channel_stack.get(), 0); |
||||||
|
elem->filter->start_transport_op(elem, op); |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
const grpc_channel_filter grpc_client_idle_filter = |
||||||
|
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>( |
||||||
|
"client_idle"); |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
void RegisterClientIdleFilter(CoreConfiguration::Builder* builder) { |
||||||
|
builder->channel_init()->RegisterStage( |
||||||
|
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
||||||
|
[](ChannelStackBuilder* builder) { |
||||||
|
const grpc_channel_args* channel_args = builder->channel_args(); |
||||||
|
if (!grpc_channel_args_want_minimal_stack(channel_args) && |
||||||
|
GetClientIdleTimeout(channel_args) != Duration::Infinity()) { |
||||||
|
builder->PrependFilter(&grpc_client_idle_filter, nullptr); |
||||||
|
} |
||||||
|
return true; |
||||||
|
}); |
||||||
|
} |
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,566 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2017 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/max_age/max_age_filter.h" |
||||||
|
|
||||||
|
#include <limits.h> |
||||||
|
#include <string.h> |
||||||
|
|
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/channel/channel_stack_builder.h" |
||||||
|
#include "src/core/lib/config/core_configuration.h" |
||||||
|
#include "src/core/lib/iomgr/timer.h" |
||||||
|
#include "src/core/lib/transport/http2_errors.h" |
||||||
|
|
||||||
|
/* If these settings change, make sure that we are not sending a GOAWAY for
|
||||||
|
* inproc transport, since a GOAWAY to inproc ends up destroying the transport. |
||||||
|
*/ |
||||||
|
#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX |
||||||
|
#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX |
||||||
|
#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX |
||||||
|
#define MAX_CONNECTION_AGE_JITTER 0.1 |
||||||
|
|
||||||
|
#define MAX_CONNECTION_AGE_INTEGER_OPTIONS \ |
||||||
|
{ DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX } |
||||||
|
#define MAX_CONNECTION_IDLE_INTEGER_OPTIONS \ |
||||||
|
{ DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX } |
||||||
|
|
||||||
|
/* States for idle_state in channel_data */ |
||||||
|
#define MAX_IDLE_STATE_INIT ((gpr_atm)0) |
||||||
|
#define MAX_IDLE_STATE_SEEN_EXIT_IDLE ((gpr_atm)1) |
||||||
|
#define MAX_IDLE_STATE_SEEN_ENTER_IDLE ((gpr_atm)2) |
||||||
|
#define MAX_IDLE_STATE_TIMER_SET ((gpr_atm)3) |
||||||
|
|
||||||
|
namespace { |
||||||
|
struct channel_data { |
||||||
|
/* The channel stack to which we take refs for pending callbacks. */ |
||||||
|
grpc_channel_stack* channel_stack; |
||||||
|
/* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer
|
||||||
|
and max_age_grace_timer_pending */ |
||||||
|
grpc_core::Mutex max_age_timer_mu; |
||||||
|
/* True if the max_age timer callback is currently pending */ |
||||||
|
bool max_age_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false; |
||||||
|
/* True if the max_age_grace timer callback is currently pending */ |
||||||
|
bool max_age_grace_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false; |
||||||
|
/* The timer for checking if the channel has reached its max age */ |
||||||
|
grpc_timer max_age_timer ABSL_GUARDED_BY(max_age_timer_mu); |
||||||
|
/* The timer for checking if the max-aged channel has uesed up the grace
|
||||||
|
period */ |
||||||
|
grpc_timer max_age_grace_timer ABSL_GUARDED_BY(max_age_timer_mu); |
||||||
|
/* The timer for checking if the channel's idle duration reaches
|
||||||
|
max_connection_idle */ |
||||||
|
grpc_timer max_idle_timer; |
||||||
|
/* Allowed max time a channel may have no outstanding rpcs */ |
||||||
|
grpc_core::Duration max_connection_idle; |
||||||
|
/* Allowed max time a channel may exist */ |
||||||
|
grpc_core::Duration max_connection_age; |
||||||
|
/* Allowed grace period after the channel reaches its max age */ |
||||||
|
grpc_core::Duration max_connection_age_grace; |
||||||
|
/* Closure to run when the channel's idle duration reaches max_connection_idle
|
||||||
|
and should be closed gracefully */ |
||||||
|
grpc_closure max_idle_timer_cb; |
||||||
|
/* Closure to run when the channel reaches its max age and should be closed
|
||||||
|
gracefully */ |
||||||
|
grpc_closure close_max_age_channel; |
||||||
|
/* Closure to run the channel uses up its max age grace time and should be
|
||||||
|
closed forcibly */ |
||||||
|
grpc_closure force_close_max_age_channel; |
||||||
|
/* Closure to run when the init fo channel stack is done and the max_idle
|
||||||
|
timer should be started */ |
||||||
|
grpc_closure start_max_idle_timer_after_init; |
||||||
|
/* Closure to run when the init fo channel stack is done and the max_age timer
|
||||||
|
should be started */ |
||||||
|
grpc_closure start_max_age_timer_after_init; |
||||||
|
/* Closure to run when the goaway op is finished and the max_age_timer */ |
||||||
|
grpc_closure start_max_age_grace_timer_after_goaway_op; |
||||||
|
/* Number of active calls */ |
||||||
|
gpr_atm call_count; |
||||||
|
/* TODO(zyc): C++lize this state machine */ |
||||||
|
/* 'idle_state' holds the states of max_idle_timer and channel idleness.
|
||||||
|
It can contain one of the following values: |
||||||
|
+--------------------------------+----------------+---------+ |
||||||
|
| idle_state | max_idle_timer | channel | |
||||||
|
+--------------------------------+----------------+---------+ |
||||||
|
| MAX_IDLE_STATE_INIT | unset | busy | |
||||||
|
| MAX_IDLE_STATE_TIMER_SET | set, valid | idle | |
||||||
|
| MAX_IDLE_STATE_SEEN_EXIT_IDLE | set, invalid | busy | |
||||||
|
| MAX_IDLE_STATE_SEEN_ENTER_IDLE | set, invalid | idle | |
||||||
|
+--------------------------------+----------------+---------+ |
||||||
|
|
||||||
|
MAX_IDLE_STATE_INIT: The initial and final state of 'idle_state'. The |
||||||
|
channel has 1 or 1+ active calls, and the timer is not set. Note that |
||||||
|
we may put a virtual call to hold this state at channel initialization or |
||||||
|
shutdown, so that the channel won't enter other states. |
||||||
|
|
||||||
|
MAX_IDLE_STATE_TIMER_SET: The state after the timer is set and no calls |
||||||
|
have arrived after the timer is set. The channel must have 0 active call in |
||||||
|
this state. If the timer is fired in this state, we will close the channel |
||||||
|
due to idleness. |
||||||
|
|
||||||
|
MAX_IDLE_STATE_SEEN_EXIT_IDLE: The state after the timer is set and at |
||||||
|
least one call has arrived after the timer is set. The channel must have 1 |
||||||
|
or 1+ active calls in this state. If the timer is fired in this state, we |
||||||
|
won't reschudle it. |
||||||
|
|
||||||
|
MAX_IDLE_STATE_SEEN_ENTER_IDLE: The state after the timer is set and the at |
||||||
|
least one call has arrived after the timer is set, BUT the channel |
||||||
|
currently has 0 active calls. If the timer is fired in this state, we will |
||||||
|
reschudle it. |
||||||
|
|
||||||
|
max_idle_timer will not be cancelled (unless the channel is shutting down). |
||||||
|
If the timer callback is called when the max_idle_timer is valid (i.e. |
||||||
|
idle_state is MAX_IDLE_STATE_TIMER_SET), the channel will be closed due to |
||||||
|
idleness, otherwise the channel won't be changed. |
||||||
|
|
||||||
|
State transitions: |
||||||
|
MAX_IDLE_STATE_INIT <-------3------ MAX_IDLE_STATE_SEEN_EXIT_IDLE |
||||||
|
^ | ^ ^ | |
||||||
|
| | | | | |
||||||
|
1 2 +-----------4------------+ 6 7 |
||||||
|
| | | | | |
||||||
|
| v | | v |
||||||
|
MAX_IDLE_STATE_TIMER_SET <----5------ MAX_IDLE_STATE_SEEN_ENTER_IDLE |
||||||
|
|
||||||
|
For 1, 3, 5 : See max_idle_timer_cb() function |
||||||
|
For 2, 7 : See decrease_call_count() function |
||||||
|
For 4, 6 : See increase_call_count() function */ |
||||||
|
gpr_atm idle_state; |
||||||
|
/* Time when the channel finished its last outstanding call, in
|
||||||
|
* grpc_core::Timestamp */ |
||||||
|
gpr_atm last_enter_idle_time_millis; |
||||||
|
}; |
||||||
|
} // namespace
|
||||||
|
|
||||||
|
/* Increase the nubmer of active calls. Before the increasement, if there are no
|
||||||
|
calls, the max_idle_timer should be cancelled. */ |
||||||
|
static void increase_call_count(channel_data* chand) { |
||||||
|
/* Exit idle */ |
||||||
|
if (gpr_atm_full_fetch_add(&chand->call_count, 1) == 0) { |
||||||
|
while (true) { |
||||||
|
gpr_atm idle_state = gpr_atm_acq_load(&chand->idle_state); |
||||||
|
switch (idle_state) { |
||||||
|
case MAX_IDLE_STATE_TIMER_SET: |
||||||
|
/* max_idle_timer_cb may have already set idle_state to
|
||||||
|
MAX_IDLE_STATE_INIT, in this case, we don't need to set it to |
||||||
|
MAX_IDLE_STATE_SEEN_EXIT_IDLE */ |
||||||
|
gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_TIMER_SET, |
||||||
|
MAX_IDLE_STATE_SEEN_EXIT_IDLE); |
||||||
|
return; |
||||||
|
case MAX_IDLE_STATE_SEEN_ENTER_IDLE: |
||||||
|
gpr_atm_rel_store(&chand->idle_state, MAX_IDLE_STATE_SEEN_EXIT_IDLE); |
||||||
|
return; |
||||||
|
default: |
||||||
|
/* try again */ |
||||||
|
break; |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
/* Decrease the nubmer of active calls. After the decrement, if there are no
|
||||||
|
calls, the max_idle_timer should be started. */ |
||||||
|
static void decrease_call_count(channel_data* chand) { |
||||||
|
/* Enter idle */ |
||||||
|
if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) { |
||||||
|
gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, |
||||||
|
(gpr_atm)grpc_core::ExecCtx::Get() |
||||||
|
->Now() |
||||||
|
.milliseconds_after_process_epoch()); |
||||||
|
while (true) { |
||||||
|
gpr_atm idle_state = gpr_atm_acq_load(&chand->idle_state); |
||||||
|
switch (idle_state) { |
||||||
|
case MAX_IDLE_STATE_INIT: |
||||||
|
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
||||||
|
"max_age max_idle_timer"); |
||||||
|
grpc_timer_init( |
||||||
|
&chand->max_idle_timer, |
||||||
|
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_idle, |
||||||
|
&chand->max_idle_timer_cb); |
||||||
|
gpr_atm_rel_store(&chand->idle_state, MAX_IDLE_STATE_TIMER_SET); |
||||||
|
return; |
||||||
|
case MAX_IDLE_STATE_SEEN_EXIT_IDLE: |
||||||
|
if (gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_SEEN_EXIT_IDLE, |
||||||
|
MAX_IDLE_STATE_SEEN_ENTER_IDLE)) { |
||||||
|
return; |
||||||
|
} |
||||||
|
break; |
||||||
|
default: |
||||||
|
/* try again */ |
||||||
|
break; |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
static void start_max_idle_timer_after_init(void* arg, |
||||||
|
grpc_error_handle /*error*/) { |
||||||
|
channel_data* chand = static_cast<channel_data*>(arg); |
||||||
|
/* Decrease call_count. If there are no active calls at this time,
|
||||||
|
max_idle_timer will start here. If the number of active calls is not 0, |
||||||
|
max_idle_timer will start after all the active calls end. */ |
||||||
|
decrease_call_count(chand); |
||||||
|
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, |
||||||
|
"max_age start_max_idle_timer_after_init"); |
||||||
|
} |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { |
||||||
|
public: |
||||||
|
explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) { |
||||||
|
GRPC_CHANNEL_STACK_REF(chand_->channel_stack, "max_age conn_watch"); |
||||||
|
} |
||||||
|
|
||||||
|
~ConnectivityWatcher() override { |
||||||
|
GRPC_CHANNEL_STACK_UNREF(chand_->channel_stack, "max_age conn_watch"); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
void OnConnectivityStateChange(grpc_connectivity_state new_state, |
||||||
|
const absl::Status& /* status */) override { |
||||||
|
if (new_state != GRPC_CHANNEL_SHUTDOWN) return; |
||||||
|
{ |
||||||
|
MutexLock lock(&chand_->max_age_timer_mu); |
||||||
|
if (chand_->max_age_timer_pending) { |
||||||
|
grpc_timer_cancel(&chand_->max_age_timer); |
||||||
|
chand_->max_age_timer_pending = false; |
||||||
|
} |
||||||
|
if (chand_->max_age_grace_timer_pending) { |
||||||
|
grpc_timer_cancel(&chand_->max_age_grace_timer); |
||||||
|
chand_->max_age_grace_timer_pending = false; |
||||||
|
} |
||||||
|
} |
||||||
|
/* If there are no active calls, this increasement will cancel
|
||||||
|
max_idle_timer, and prevent max_idle_timer from being started in the |
||||||
|
future. */ |
||||||
|
increase_call_count(chand_); |
||||||
|
if (gpr_atm_acq_load(&chand_->idle_state) == |
||||||
|
MAX_IDLE_STATE_SEEN_EXIT_IDLE) { |
||||||
|
grpc_timer_cancel(&chand_->max_idle_timer); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
channel_data* chand_; |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
static void start_max_age_timer_after_init(void* arg, |
||||||
|
grpc_error_handle /*error*/) { |
||||||
|
channel_data* chand = static_cast<channel_data*>(arg); |
||||||
|
{ |
||||||
|
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
||||||
|
chand->max_age_timer_pending = true; |
||||||
|
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); |
||||||
|
grpc_timer_init( |
||||||
|
&chand->max_age_timer, |
||||||
|
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age, |
||||||
|
&chand->close_max_age_channel); |
||||||
|
} |
||||||
|
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||||
|
op->start_connectivity_watch.reset(new grpc_core::ConnectivityWatcher(chand)); |
||||||
|
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; |
||||||
|
grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0), op); |
||||||
|
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, |
||||||
|
"max_age start_max_age_timer_after_init"); |
||||||
|
} |
||||||
|
|
||||||
|
static void start_max_age_grace_timer_after_goaway_op( |
||||||
|
void* arg, grpc_error_handle /*error*/) { |
||||||
|
channel_data* chand = static_cast<channel_data*>(arg); |
||||||
|
{ |
||||||
|
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
||||||
|
chand->max_age_grace_timer_pending = true; |
||||||
|
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); |
||||||
|
grpc_timer_init( |
||||||
|
&chand->max_age_grace_timer, |
||||||
|
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age_grace, |
||||||
|
&chand->force_close_max_age_channel); |
||||||
|
} |
||||||
|
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, |
||||||
|
"max_age start_max_age_grace_timer_after_goaway_op"); |
||||||
|
} |
||||||
|
|
||||||
|
static void close_max_idle_channel(channel_data* chand) { |
||||||
|
/* Prevent the max idle timer from being set again */ |
||||||
|
gpr_atm_no_barrier_fetch_add(&chand->call_count, 1); |
||||||
|
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||||
|
op->goaway_error = |
||||||
|
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_idle"), |
||||||
|
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); |
||||||
|
grpc_channel_element* elem = |
||||||
|
grpc_channel_stack_element(chand->channel_stack, 0); |
||||||
|
elem->filter->start_transport_op(elem, op); |
||||||
|
} |
||||||
|
|
||||||
|
static void max_idle_timer_cb(void* arg, grpc_error_handle error) { |
||||||
|
channel_data* chand = static_cast<channel_data*>(arg); |
||||||
|
if (error == GRPC_ERROR_NONE) { |
||||||
|
bool try_again = true; |
||||||
|
while (try_again) { |
||||||
|
gpr_atm idle_state = gpr_atm_acq_load(&chand->idle_state); |
||||||
|
switch (idle_state) { |
||||||
|
case MAX_IDLE_STATE_TIMER_SET: |
||||||
|
close_max_idle_channel(chand); |
||||||
|
/* This MAX_IDLE_STATE_INIT is a final state, we don't have to check
|
||||||
|
* if idle_state has been changed */ |
||||||
|
gpr_atm_rel_store(&chand->idle_state, MAX_IDLE_STATE_INIT); |
||||||
|
try_again = false; |
||||||
|
break; |
||||||
|
case MAX_IDLE_STATE_SEEN_EXIT_IDLE: |
||||||
|
if (gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_SEEN_EXIT_IDLE, |
||||||
|
MAX_IDLE_STATE_INIT)) { |
||||||
|
try_again = false; |
||||||
|
} |
||||||
|
break; |
||||||
|
case MAX_IDLE_STATE_SEEN_ENTER_IDLE: |
||||||
|
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
||||||
|
"max_age max_idle_timer"); |
||||||
|
grpc_timer_init( |
||||||
|
&chand->max_idle_timer, |
||||||
|
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
||||||
|
gpr_atm_no_barrier_load( |
||||||
|
&chand->last_enter_idle_time_millis)) + |
||||||
|
chand->max_connection_idle, |
||||||
|
&chand->max_idle_timer_cb); |
||||||
|
/* idle_state may have already been set to
|
||||||
|
MAX_IDLE_STATE_SEEN_EXIT_IDLE by increase_call_count(), in this |
||||||
|
case, we don't need to set it to MAX_IDLE_STATE_TIMER_SET */ |
||||||
|
gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_SEEN_ENTER_IDLE, |
||||||
|
MAX_IDLE_STATE_TIMER_SET); |
||||||
|
try_again = false; |
||||||
|
break; |
||||||
|
default: |
||||||
|
/* try again */ |
||||||
|
break; |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_idle_timer"); |
||||||
|
} |
||||||
|
|
||||||
|
static void close_max_age_channel(void* arg, grpc_error_handle error) { |
||||||
|
channel_data* chand = static_cast<channel_data*>(arg); |
||||||
|
{ |
||||||
|
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
||||||
|
chand->max_age_timer_pending = false; |
||||||
|
} |
||||||
|
if (error == GRPC_ERROR_NONE) { |
||||||
|
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
||||||
|
"max_age start_max_age_grace_timer_after_goaway_op"); |
||||||
|
grpc_transport_op* op = grpc_make_transport_op( |
||||||
|
&chand->start_max_age_grace_timer_after_goaway_op); |
||||||
|
op->goaway_error = |
||||||
|
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"), |
||||||
|
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); |
||||||
|
grpc_channel_element* elem = |
||||||
|
grpc_channel_stack_element(chand->channel_stack, 0); |
||||||
|
elem->filter->start_transport_op(elem, op); |
||||||
|
} else if (error != GRPC_ERROR_CANCELLED) { |
||||||
|
GRPC_LOG_IF_ERROR("close_max_age_channel", error); |
||||||
|
} |
||||||
|
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_age_timer"); |
||||||
|
} |
||||||
|
|
||||||
|
static void force_close_max_age_channel(void* arg, grpc_error_handle error) { |
||||||
|
channel_data* chand = static_cast<channel_data*>(arg); |
||||||
|
{ |
||||||
|
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
||||||
|
chand->max_age_grace_timer_pending = false; |
||||||
|
} |
||||||
|
if (error == GRPC_ERROR_NONE) { |
||||||
|
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||||
|
op->disconnect_with_error = |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel reaches max age"); |
||||||
|
grpc_channel_element* elem = |
||||||
|
grpc_channel_stack_element(chand->channel_stack, 0); |
||||||
|
elem->filter->start_transport_op(elem, op); |
||||||
|
} else if (error != GRPC_ERROR_CANCELLED) { |
||||||
|
GRPC_LOG_IF_ERROR("force_close_max_age_channel", error); |
||||||
|
} |
||||||
|
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_age_grace_timer"); |
||||||
|
} |
||||||
|
|
||||||
|
/* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out
|
||||||
|
connection storms. Note that the MAX_CONNECTION_AGE option without jitter |
||||||
|
would not create connection storms by itself, but if there happened to be a |
||||||
|
connection storm it could cause it to repeat at a fixed period. */ |
||||||
|
static grpc_core::Duration |
||||||
|
add_random_max_connection_age_jitter_and_convert_to_duration(int value) { |
||||||
|
/* generate a random number between 1 - MAX_CONNECTION_AGE_JITTER and
|
||||||
|
1 + MAX_CONNECTION_AGE_JITTER */ |
||||||
|
double multiplier = rand() * MAX_CONNECTION_AGE_JITTER * 2.0 / RAND_MAX + |
||||||
|
1.0 - MAX_CONNECTION_AGE_JITTER; |
||||||
|
double result = multiplier * value; |
||||||
|
/* INT_MAX - 0.5 converts the value to float, so that result will not be
|
||||||
|
cast to int implicitly before the comparison. */ |
||||||
|
return result > (static_cast<double>( |
||||||
|
grpc_core::Duration::Infinity().millis())) - |
||||||
|
0.5 |
||||||
|
? grpc_core::Duration::Infinity() |
||||||
|
: grpc_core::Duration::Milliseconds(result); |
||||||
|
} |
||||||
|
|
||||||
|
/* Constructor for call_data. */ |
||||||
|
static grpc_error_handle max_age_init_call_elem( |
||||||
|
grpc_call_element* elem, const grpc_call_element_args* /*args*/) { |
||||||
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
||||||
|
increase_call_count(chand); |
||||||
|
return GRPC_ERROR_NONE; |
||||||
|
} |
||||||
|
|
||||||
|
/* Destructor for call_data. */ |
||||||
|
static void max_age_destroy_call_elem( |
||||||
|
grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, |
||||||
|
grpc_closure* /*ignored*/) { |
||||||
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
||||||
|
decrease_call_count(chand); |
||||||
|
} |
||||||
|
|
||||||
|
/* Constructor for channel_data. */ |
||||||
|
static grpc_error_handle max_age_init_channel_elem( |
||||||
|
grpc_channel_element* elem, grpc_channel_element_args* args) { |
||||||
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
||||||
|
new (chand) channel_data(); |
||||||
|
chand->channel_stack = args->channel_stack; |
||||||
|
chand->max_connection_age = |
||||||
|
add_random_max_connection_age_jitter_and_convert_to_duration( |
||||||
|
DEFAULT_MAX_CONNECTION_AGE_MS); |
||||||
|
chand->max_connection_age_grace = |
||||||
|
DEFAULT_MAX_CONNECTION_AGE_GRACE_MS == INT_MAX |
||||||
|
? grpc_core::Duration::Infinity() |
||||||
|
: grpc_core::Duration::Milliseconds( |
||||||
|
DEFAULT_MAX_CONNECTION_AGE_GRACE_MS); |
||||||
|
chand->max_connection_idle = |
||||||
|
DEFAULT_MAX_CONNECTION_IDLE_MS == INT_MAX |
||||||
|
? grpc_core::Duration::Infinity() |
||||||
|
: grpc_core::Duration::Milliseconds(DEFAULT_MAX_CONNECTION_IDLE_MS); |
||||||
|
chand->idle_state = MAX_IDLE_STATE_INIT; |
||||||
|
gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, GPR_ATM_MIN); |
||||||
|
for (size_t i = 0; i < args->channel_args->num_args; ++i) { |
||||||
|
if (0 == strcmp(args->channel_args->args[i].key, |
||||||
|
GRPC_ARG_MAX_CONNECTION_AGE_MS)) { |
||||||
|
const int value = grpc_channel_arg_get_integer( |
||||||
|
&args->channel_args->args[i], MAX_CONNECTION_AGE_INTEGER_OPTIONS); |
||||||
|
chand->max_connection_age = |
||||||
|
add_random_max_connection_age_jitter_and_convert_to_duration(value); |
||||||
|
} else if (0 == strcmp(args->channel_args->args[i].key, |
||||||
|
GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)) { |
||||||
|
const int value = grpc_channel_arg_get_integer( |
||||||
|
&args->channel_args->args[i], |
||||||
|
{DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0, INT_MAX}); |
||||||
|
chand->max_connection_age_grace = |
||||||
|
value == INT_MAX ? grpc_core::Duration::Infinity() |
||||||
|
: grpc_core::Duration::Milliseconds(value); |
||||||
|
} else if (0 == strcmp(args->channel_args->args[i].key, |
||||||
|
GRPC_ARG_MAX_CONNECTION_IDLE_MS)) { |
||||||
|
const int value = grpc_channel_arg_get_integer( |
||||||
|
&args->channel_args->args[i], MAX_CONNECTION_IDLE_INTEGER_OPTIONS); |
||||||
|
chand->max_connection_idle = |
||||||
|
value == INT_MAX ? grpc_core::Duration::Infinity() |
||||||
|
: grpc_core::Duration::Milliseconds(value); |
||||||
|
} |
||||||
|
} |
||||||
|
GRPC_CLOSURE_INIT(&chand->max_idle_timer_cb, max_idle_timer_cb, chand, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
GRPC_CLOSURE_INIT(&chand->close_max_age_channel, close_max_age_channel, chand, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
GRPC_CLOSURE_INIT(&chand->force_close_max_age_channel, |
||||||
|
force_close_max_age_channel, chand, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
GRPC_CLOSURE_INIT(&chand->start_max_idle_timer_after_init, |
||||||
|
start_max_idle_timer_after_init, chand, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
GRPC_CLOSURE_INIT(&chand->start_max_age_timer_after_init, |
||||||
|
start_max_age_timer_after_init, chand, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
GRPC_CLOSURE_INIT(&chand->start_max_age_grace_timer_after_goaway_op, |
||||||
|
start_max_age_grace_timer_after_goaway_op, chand, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
|
||||||
|
if (chand->max_connection_age != grpc_core::Duration::Infinity()) { |
||||||
|
/* When the channel reaches its max age, we send down an op with
|
||||||
|
goaway_error set. However, we can't send down any ops until after the |
||||||
|
channel stack is fully initialized. If we start the timer here, we have |
||||||
|
no guarantee that the timer won't pop before channel stack initialization |
||||||
|
is finished. To avoid that problem, we create a closure to start the |
||||||
|
timer, and we schedule that closure to be run after call stack |
||||||
|
initialization is done. */ |
||||||
|
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
||||||
|
"max_age start_max_age_timer_after_init"); |
||||||
|
grpc_core::ExecCtx::Run(DEBUG_LOCATION, |
||||||
|
&chand->start_max_age_timer_after_init, |
||||||
|
GRPC_ERROR_NONE); |
||||||
|
} |
||||||
|
|
||||||
|
/* Initialize the number of calls as 1, so that the max_idle_timer will not
|
||||||
|
start until start_max_idle_timer_after_init is invoked. */ |
||||||
|
gpr_atm_rel_store(&chand->call_count, 1); |
||||||
|
if (chand->max_connection_idle != grpc_core::Duration::Infinity()) { |
||||||
|
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
||||||
|
"max_age start_max_idle_timer_after_init"); |
||||||
|
grpc_core::ExecCtx::Run(DEBUG_LOCATION, |
||||||
|
&chand->start_max_idle_timer_after_init, |
||||||
|
GRPC_ERROR_NONE); |
||||||
|
} |
||||||
|
return GRPC_ERROR_NONE; |
||||||
|
} |
||||||
|
|
||||||
|
/* Destructor for channel_data. */ |
||||||
|
static void max_age_destroy_channel_elem(grpc_channel_element* elem) { |
||||||
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
||||||
|
chand->~channel_data(); |
||||||
|
} |
||||||
|
|
||||||
|
const grpc_channel_filter grpc_max_age_filter = { |
||||||
|
grpc_call_next_op, |
||||||
|
nullptr, |
||||||
|
grpc_channel_next_op, |
||||||
|
0, /* sizeof_call_data */ |
||||||
|
max_age_init_call_elem, |
||||||
|
grpc_call_stack_ignore_set_pollset_or_pollset_set, |
||||||
|
max_age_destroy_call_elem, |
||||||
|
sizeof(channel_data), |
||||||
|
max_age_init_channel_elem, |
||||||
|
max_age_destroy_channel_elem, |
||||||
|
grpc_channel_next_get_info, |
||||||
|
"max_age"}; |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
void RegisterMaxAgeFilter(CoreConfiguration::Builder* builder) { |
||||||
|
builder->channel_init()->RegisterStage( |
||||||
|
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
||||||
|
[](ChannelStackBuilder* builder) { |
||||||
|
const grpc_channel_args* channel_args = builder->channel_args(); |
||||||
|
bool enable = grpc_channel_arg_get_integer( |
||||||
|
grpc_channel_args_find( |
||||||
|
channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS), |
||||||
|
MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX || |
||||||
|
grpc_channel_arg_get_integer( |
||||||
|
grpc_channel_args_find( |
||||||
|
channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), |
||||||
|
MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX; |
||||||
|
if (enable) { |
||||||
|
builder->PrependFilter(&grpc_max_age_filter, nullptr); |
||||||
|
} |
||||||
|
return true; |
||||||
|
}); |
||||||
|
} |
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,26 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2017 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H |
||||||
|
#define GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/lib/channel/channel_stack.h" |
||||||
|
|
||||||
|
extern const grpc_channel_filter grpc_max_age_filter; |
||||||
|
|
||||||
|
#endif /* GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H */ |
Loading…
Reference in new issue