mirror of https://github.com/grpc/grpc.git
Reland max_age filter -> promises (#29127)
* TrySeq fix
* Revert "Revert "Revert "Revert "Convert max_age filter to promises (#28904)" (#28957)" (#28958)" (#29105)"
This reverts commit ff14d1d7f3
.
* Single set pointer
* Update single_set_ptr.h
* fix
* fix
* build
* fix race
* fix race
* Automated change: Fix sanity tests
Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/29147/head^2
parent
a87173e87e
commit
ba41407ea6
23 changed files with 492 additions and 909 deletions
@ -0,0 +1,406 @@ |
|||||||
|
// Copyright 2022 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// TODO(ctiller): Add a unit test suite for these filters once it's practical to
|
||||||
|
// mock transport operations.
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <limits.h> |
||||||
|
#include <stdlib.h> |
||||||
|
|
||||||
|
#include <atomic> |
||||||
|
#include <limits> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/channel_idle/idle_filter_state.h" |
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/channel/channel_stack_builder.h" |
||||||
|
#include "src/core/lib/channel/promise_based_filter.h" |
||||||
|
#include "src/core/lib/config/core_configuration.h" |
||||||
|
#include "src/core/lib/gprpp/capture.h" |
||||||
|
#include "src/core/lib/gprpp/single_set_ptr.h" |
||||||
|
#include "src/core/lib/iomgr/timer.h" |
||||||
|
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" |
||||||
|
#include "src/core/lib/promise/loop.h" |
||||||
|
#include "src/core/lib/promise/sleep.h" |
||||||
|
#include "src/core/lib/promise/try_seq.h" |
||||||
|
#include "src/core/lib/transport/http2_errors.h" |
||||||
|
|
||||||
|
// TODO(juanlishen): The idle filter was disabled in client channel by default
|
||||||
|
// due to b/143502997. Now the bug is fixed enable the filter by default.
|
||||||
|
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX |
||||||
|
// The user input idle timeout smaller than this would be capped to it.
|
||||||
|
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000) |
||||||
|
|
||||||
|
// If these settings change, make sure that we are not sending a GOAWAY for
|
||||||
|
// inproc transport, since a GOAWAY to inproc ends up destroying the transport.
|
||||||
|
#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX |
||||||
|
#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX |
||||||
|
#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX |
||||||
|
#define MAX_CONNECTION_AGE_JITTER 0.1 |
||||||
|
|
||||||
|
#define MAX_CONNECTION_AGE_INTEGER_OPTIONS \ |
||||||
|
{ DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX } |
||||||
|
#define MAX_CONNECTION_IDLE_INTEGER_OPTIONS \ |
||||||
|
{ DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX } |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); |
||||||
|
|
||||||
|
#define GRPC_IDLE_FILTER_LOG(format, ...) \ |
||||||
|
do { \
|
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
|
||||||
|
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
|
||||||
|
} \
|
||||||
|
} while (0) |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
using SingleSetActivityPtr = |
||||||
|
SingleSetPtr<Activity, typename ActivityPtr::deleter_type>; |
||||||
|
|
||||||
|
Duration GetClientIdleTimeout(const grpc_channel_args* args) { |
||||||
|
int ms = std::max( |
||||||
|
grpc_channel_arg_get_integer( |
||||||
|
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS), |
||||||
|
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}), |
||||||
|
MIN_IDLE_TIMEOUT_MS); |
||||||
|
return ms == INT_MAX ? Duration::Infinity() : Duration::Milliseconds(ms); |
||||||
|
} |
||||||
|
|
||||||
|
struct MaxAgeConfig { |
||||||
|
Duration max_connection_age; |
||||||
|
Duration max_connection_idle; |
||||||
|
Duration max_connection_age_grace; |
||||||
|
|
||||||
|
bool enable() const { |
||||||
|
return max_connection_age != Duration::Infinity() || |
||||||
|
max_connection_idle != Duration::Infinity(); |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
/* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out
|
||||||
|
connection storms. Note that the MAX_CONNECTION_AGE option without jitter |
||||||
|
would not create connection storms by itself, but if there happened to be a |
||||||
|
connection storm it could cause it to repeat at a fixed period. */ |
||||||
|
MaxAgeConfig GetMaxAgeConfig(const grpc_channel_args* args) { |
||||||
|
const int args_max_age = grpc_channel_arg_get_integer( |
||||||
|
grpc_channel_args_find(args, GRPC_ARG_MAX_CONNECTION_AGE_MS), |
||||||
|
MAX_CONNECTION_AGE_INTEGER_OPTIONS); |
||||||
|
const int args_max_idle = grpc_channel_arg_get_integer( |
||||||
|
grpc_channel_args_find(args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), |
||||||
|
MAX_CONNECTION_IDLE_INTEGER_OPTIONS); |
||||||
|
const int args_max_age_grace = grpc_channel_arg_get_integer( |
||||||
|
grpc_channel_args_find(args, GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS), |
||||||
|
{DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0, INT_MAX}); |
||||||
|
/* generate a random number between 1 - MAX_CONNECTION_AGE_JITTER and
|
||||||
|
1 + MAX_CONNECTION_AGE_JITTER */ |
||||||
|
const double multiplier = |
||||||
|
rand() * MAX_CONNECTION_AGE_JITTER * 2.0 / RAND_MAX + 1.0 - |
||||||
|
MAX_CONNECTION_AGE_JITTER; |
||||||
|
/* GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result
|
||||||
|
will not be cast to int implicitly before the comparison. */ |
||||||
|
return MaxAgeConfig{ |
||||||
|
args_max_age == INT_MAX |
||||||
|
? Duration::Infinity() |
||||||
|
: Duration::FromSecondsAsDouble(multiplier * args_max_age / 1000.0), |
||||||
|
args_max_idle == INT_MAX ? Duration::Infinity() |
||||||
|
: Duration::Milliseconds(args_max_idle), |
||||||
|
Duration::Milliseconds(args_max_age_grace)}; |
||||||
|
} |
||||||
|
|
||||||
|
class ChannelIdleFilter : public ChannelFilter { |
||||||
|
public: |
||||||
|
~ChannelIdleFilter() override = default; |
||||||
|
|
||||||
|
ChannelIdleFilter(const ChannelIdleFilter&) = delete; |
||||||
|
ChannelIdleFilter& operator=(const ChannelIdleFilter&) = delete; |
||||||
|
ChannelIdleFilter(ChannelIdleFilter&&) = default; |
||||||
|
ChannelIdleFilter& operator=(ChannelIdleFilter&&) = default; |
||||||
|
|
||||||
|
// Construct a promise for one call.
|
||||||
|
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
||||||
|
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
||||||
|
|
||||||
|
bool StartTransportOp(grpc_transport_op* op) override; |
||||||
|
|
||||||
|
protected: |
||||||
|
ChannelIdleFilter(grpc_channel_stack* channel_stack, |
||||||
|
Duration client_idle_timeout) |
||||||
|
: channel_stack_(channel_stack), |
||||||
|
client_idle_timeout_(client_idle_timeout) {} |
||||||
|
|
||||||
|
grpc_channel_stack* channel_stack() { return channel_stack_; }; |
||||||
|
|
||||||
|
virtual void Shutdown(); |
||||||
|
void CloseChannel(); |
||||||
|
|
||||||
|
void IncreaseCallCount(); |
||||||
|
void DecreaseCallCount(); |
||||||
|
|
||||||
|
private: |
||||||
|
void StartIdleTimer(); |
||||||
|
|
||||||
|
struct CallCountDecreaser { |
||||||
|
void operator()(ChannelIdleFilter* filter) const { |
||||||
|
filter->DecreaseCallCount(); |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
// The channel stack to which we take refs for pending callbacks.
|
||||||
|
grpc_channel_stack* channel_stack_; |
||||||
|
Duration client_idle_timeout_; |
||||||
|
std::shared_ptr<IdleFilterState> idle_filter_state_{ |
||||||
|
std::make_shared<IdleFilterState>(false)}; |
||||||
|
|
||||||
|
SingleSetActivityPtr activity_; |
||||||
|
}; |
||||||
|
|
||||||
|
class ClientIdleFilter final : public ChannelIdleFilter { |
||||||
|
public: |
||||||
|
static absl::StatusOr<ClientIdleFilter> Create( |
||||||
|
const grpc_channel_args* args, ChannelFilter::Args filter_args); |
||||||
|
|
||||||
|
private: |
||||||
|
using ChannelIdleFilter::ChannelIdleFilter; |
||||||
|
}; |
||||||
|
|
||||||
|
class MaxAgeFilter final : public ChannelIdleFilter { |
||||||
|
public: |
||||||
|
static absl::StatusOr<MaxAgeFilter> Create(const grpc_channel_args* args, |
||||||
|
ChannelFilter::Args filter_args); |
||||||
|
|
||||||
|
void Start(); |
||||||
|
|
||||||
|
private: |
||||||
|
class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { |
||||||
|
public: |
||||||
|
explicit ConnectivityWatcher(MaxAgeFilter* filter) |
||||||
|
: channel_stack_(filter->channel_stack()->Ref()), filter_(filter) {} |
||||||
|
~ConnectivityWatcher() override = default; |
||||||
|
|
||||||
|
void OnConnectivityStateChange(grpc_connectivity_state new_state, |
||||||
|
const absl::Status&) override { |
||||||
|
if (new_state == GRPC_CHANNEL_SHUTDOWN) filter_->Shutdown(); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
RefCountedPtr<grpc_channel_stack> channel_stack_; |
||||||
|
MaxAgeFilter* filter_; |
||||||
|
}; |
||||||
|
|
||||||
|
MaxAgeFilter(grpc_channel_stack* channel_stack, |
||||||
|
const MaxAgeConfig& max_age_config) |
||||||
|
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle), |
||||||
|
max_connection_age_(max_age_config.max_connection_age), |
||||||
|
max_connection_age_grace_(max_age_config.max_connection_age_grace) {} |
||||||
|
|
||||||
|
void Shutdown() override; |
||||||
|
|
||||||
|
SingleSetActivityPtr max_age_activity_; |
||||||
|
Duration max_connection_age_; |
||||||
|
Duration max_connection_age_grace_; |
||||||
|
}; |
||||||
|
|
||||||
|
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create( |
||||||
|
const grpc_channel_args* args, ChannelFilter::Args filter_args) { |
||||||
|
ClientIdleFilter filter(filter_args.channel_stack(), |
||||||
|
GetClientIdleTimeout(args)); |
||||||
|
return absl::StatusOr<ClientIdleFilter>(std::move(filter)); |
||||||
|
} |
||||||
|
|
||||||
|
absl::StatusOr<MaxAgeFilter> MaxAgeFilter::Create( |
||||||
|
const grpc_channel_args* args, ChannelFilter::Args filter_args) { |
||||||
|
const auto config = GetMaxAgeConfig(args); |
||||||
|
MaxAgeFilter filter(filter_args.channel_stack(), config); |
||||||
|
return absl::StatusOr<MaxAgeFilter>(std::move(filter)); |
||||||
|
} |
||||||
|
|
||||||
|
void MaxAgeFilter::Shutdown() { |
||||||
|
max_age_activity_.Reset(); |
||||||
|
ChannelIdleFilter::Shutdown(); |
||||||
|
} |
||||||
|
|
||||||
|
void MaxAgeFilter::Start() { |
||||||
|
// Trigger idle timer immediately
|
||||||
|
IncreaseCallCount(); |
||||||
|
DecreaseCallCount(); |
||||||
|
|
||||||
|
struct StartupClosure { |
||||||
|
RefCountedPtr<grpc_channel_stack> channel_stack; |
||||||
|
MaxAgeFilter* filter; |
||||||
|
grpc_closure closure; |
||||||
|
}; |
||||||
|
auto run_startup = [](void* p, grpc_error_handle) { |
||||||
|
auto* startup = static_cast<StartupClosure*>(p); |
||||||
|
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||||
|
op->start_connectivity_watch.reset( |
||||||
|
new ConnectivityWatcher(startup->filter)); |
||||||
|
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; |
||||||
|
grpc_channel_next_op( |
||||||
|
grpc_channel_stack_element(startup->channel_stack.get(), 0), op); |
||||||
|
delete startup; |
||||||
|
}; |
||||||
|
auto* startup = |
||||||
|
new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}}; |
||||||
|
GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr); |
||||||
|
ExecCtx::Run(DEBUG_LOCATION, &startup->closure, GRPC_ERROR_NONE); |
||||||
|
|
||||||
|
auto channel_stack = this->channel_stack()->Ref(); |
||||||
|
|
||||||
|
// Start the max age timer
|
||||||
|
if (max_connection_age_ != Duration::Infinity()) { |
||||||
|
max_age_activity_.Set(MakeActivity( |
||||||
|
TrySeq( |
||||||
|
// First sleep until the max connection age
|
||||||
|
Sleep(ExecCtx::Get()->Now() + max_connection_age_), |
||||||
|
// Then send a goaway.
|
||||||
|
[this] { |
||||||
|
GRPC_CHANNEL_STACK_REF(this->channel_stack(), |
||||||
|
"max_age send_goaway"); |
||||||
|
// Jump out of the activity to send the goaway.
|
||||||
|
auto fn = [](void* arg, grpc_error_handle) { |
||||||
|
auto* channel_stack = static_cast<grpc_channel_stack*>(arg); |
||||||
|
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||||
|
op->goaway_error = grpc_error_set_int( |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"), |
||||||
|
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); |
||||||
|
grpc_channel_element* elem = |
||||||
|
grpc_channel_stack_element(channel_stack, 0); |
||||||
|
elem->filter->start_transport_op(elem, op); |
||||||
|
GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway"); |
||||||
|
}; |
||||||
|
ExecCtx::Run( |
||||||
|
DEBUG_LOCATION, |
||||||
|
GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr), |
||||||
|
GRPC_ERROR_NONE); |
||||||
|
return Immediate(absl::OkStatus()); |
||||||
|
}, |
||||||
|
// Sleep for the grace period
|
||||||
|
[this] { |
||||||
|
return Sleep(ExecCtx::Get()->Now() + max_connection_age_grace_); |
||||||
|
}), |
||||||
|
ExecCtxWakeupScheduler(), [channel_stack, this](absl::Status status) { |
||||||
|
// OnDone -- close the connection if the promise completed
|
||||||
|
// successfully.
|
||||||
|
// (if it did not, it was cancelled)
|
||||||
|
if (status.ok()) CloseChannel(); |
||||||
|
})); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Construct a promise for one call.
|
||||||
|
ArenaPromise<ServerMetadataHandle> ChannelIdleFilter::MakeCallPromise( |
||||||
|
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
||||||
|
using Decrementer = std::unique_ptr<ChannelIdleFilter, CallCountDecreaser>; |
||||||
|
IncreaseCallCount(); |
||||||
|
return ArenaPromise<ServerMetadataHandle>( |
||||||
|
Capture([](Decrementer*, ArenaPromise<ServerMetadataHandle>* next) |
||||||
|
-> Poll<ServerMetadataHandle> { return (*next)(); }, |
||||||
|
Decrementer(this), next_promise_factory(std::move(call_args)))); |
||||||
|
} |
||||||
|
|
||||||
|
bool ChannelIdleFilter::StartTransportOp(grpc_transport_op* op) { |
||||||
|
// Catch the disconnect_with_error transport op.
|
||||||
|
if (op->disconnect_with_error != GRPC_ERROR_NONE) Shutdown(); |
||||||
|
// Pass the op to the next filter.
|
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
void ChannelIdleFilter::Shutdown() { |
||||||
|
// IncreaseCallCount() introduces a phony call and prevent the timer from
|
||||||
|
// being reset by other threads.
|
||||||
|
IncreaseCallCount(); |
||||||
|
activity_.Reset(); |
||||||
|
} |
||||||
|
|
||||||
|
void ChannelIdleFilter::IncreaseCallCount() { |
||||||
|
idle_filter_state_->IncreaseCallCount(); |
||||||
|
} |
||||||
|
|
||||||
|
void ChannelIdleFilter::DecreaseCallCount() { |
||||||
|
if (idle_filter_state_->DecreaseCallCount()) { |
||||||
|
// If there are no more calls in progress, start the idle timer.
|
||||||
|
StartIdleTimer(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void ChannelIdleFilter::StartIdleTimer() { |
||||||
|
GRPC_IDLE_FILTER_LOG("timer has started"); |
||||||
|
auto idle_filter_state = idle_filter_state_; |
||||||
|
// Hold a ref to the channel stack for the timer callback.
|
||||||
|
auto channel_stack = channel_stack_->Ref(); |
||||||
|
auto timeout = client_idle_timeout_; |
||||||
|
auto promise = Loop([timeout, idle_filter_state]() { |
||||||
|
return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout), |
||||||
|
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> { |
||||||
|
if (idle_filter_state->CheckTimer()) { |
||||||
|
return Continue{}; |
||||||
|
} else { |
||||||
|
return absl::OkStatus(); |
||||||
|
} |
||||||
|
}); |
||||||
|
}); |
||||||
|
activity_.Set(MakeActivity(std::move(promise), ExecCtxWakeupScheduler{}, |
||||||
|
[channel_stack, this](absl::Status status) { |
||||||
|
if (status.ok()) CloseChannel(); |
||||||
|
})); |
||||||
|
} |
||||||
|
|
||||||
|
void ChannelIdleFilter::CloseChannel() { |
||||||
|
auto* op = grpc_make_transport_op(nullptr); |
||||||
|
op->disconnect_with_error = grpc_error_set_int( |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"), |
||||||
|
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE); |
||||||
|
// Pass the transport op down to the channel stack.
|
||||||
|
auto* elem = grpc_channel_stack_element(channel_stack_, 0); |
||||||
|
elem->filter->start_transport_op(elem, op); |
||||||
|
} |
||||||
|
|
||||||
|
const grpc_channel_filter grpc_client_idle_filter = |
||||||
|
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>( |
||||||
|
"client_idle"); |
||||||
|
const grpc_channel_filter grpc_max_age_filter = |
||||||
|
MakePromiseBasedFilter<MaxAgeFilter, FilterEndpoint::kServer>("max_age"); |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) { |
||||||
|
builder->channel_init()->RegisterStage( |
||||||
|
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
||||||
|
[](ChannelStackBuilder* builder) { |
||||||
|
const grpc_channel_args* channel_args = builder->channel_args(); |
||||||
|
if (!grpc_channel_args_want_minimal_stack(channel_args) && |
||||||
|
GetClientIdleTimeout(channel_args) != Duration::Infinity()) { |
||||||
|
builder->PrependFilter(&grpc_client_idle_filter, nullptr); |
||||||
|
} |
||||||
|
return true; |
||||||
|
}); |
||||||
|
builder->channel_init()->RegisterStage( |
||||||
|
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
||||||
|
[](ChannelStackBuilder* builder) { |
||||||
|
const grpc_channel_args* channel_args = builder->channel_args(); |
||||||
|
if (!grpc_channel_args_want_minimal_stack(channel_args) && |
||||||
|
GetMaxAgeConfig(channel_args).enable()) { |
||||||
|
builder->PrependFilter( |
||||||
|
&grpc_max_age_filter, |
||||||
|
[](grpc_channel_stack*, grpc_channel_element* elem) { |
||||||
|
static_cast<MaxAgeFilter*>(elem->channel_data)->Start(); |
||||||
|
}); |
||||||
|
} |
||||||
|
return true; |
||||||
|
}); |
||||||
|
} |
||||||
|
} // namespace grpc_core
|
@ -1,199 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2019 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <limits.h> |
|
||||||
|
|
||||||
#include <atomic> |
|
||||||
|
|
||||||
#include "src/core/ext/filters/client_idle/idle_filter_state.h" |
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/channel/channel_stack_builder.h" |
|
||||||
#include "src/core/lib/channel/promise_based_filter.h" |
|
||||||
#include "src/core/lib/config/core_configuration.h" |
|
||||||
#include "src/core/lib/gprpp/capture.h" |
|
||||||
#include "src/core/lib/iomgr/timer.h" |
|
||||||
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" |
|
||||||
#include "src/core/lib/promise/loop.h" |
|
||||||
#include "src/core/lib/promise/sleep.h" |
|
||||||
#include "src/core/lib/promise/try_seq.h" |
|
||||||
#include "src/core/lib/transport/http2_errors.h" |
|
||||||
|
|
||||||
// TODO(juanlishen): The idle filter is disabled in client channel by default
|
|
||||||
// due to b/143502997. Try to fix the bug and enable the filter by default.
|
|
||||||
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX |
|
||||||
// The user input idle timeout smaller than this would be capped to it.
|
|
||||||
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000) |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); |
|
||||||
|
|
||||||
#define GRPC_IDLE_FILTER_LOG(format, ...) \ |
|
||||||
do { \
|
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
|
|
||||||
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
} while (0) |
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
Duration GetClientIdleTimeout(const grpc_channel_args* args) { |
|
||||||
auto millis = std::max( |
|
||||||
grpc_channel_arg_get_integer( |
|
||||||
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS), |
|
||||||
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}), |
|
||||||
MIN_IDLE_TIMEOUT_MS); |
|
||||||
if (millis == INT_MAX) return Duration::Infinity(); |
|
||||||
return Duration::Milliseconds(millis); |
|
||||||
} |
|
||||||
|
|
||||||
class ClientIdleFilter : public ChannelFilter { |
|
||||||
public: |
|
||||||
static absl::StatusOr<ClientIdleFilter> Create( |
|
||||||
const grpc_channel_args* args, ChannelFilter::Args filter_args); |
|
||||||
~ClientIdleFilter() override = default; |
|
||||||
|
|
||||||
ClientIdleFilter(const ClientIdleFilter&) = delete; |
|
||||||
ClientIdleFilter& operator=(const ClientIdleFilter&) = delete; |
|
||||||
ClientIdleFilter(ClientIdleFilter&&) = default; |
|
||||||
ClientIdleFilter& operator=(ClientIdleFilter&&) = default; |
|
||||||
|
|
||||||
// Construct a promise for one call.
|
|
||||||
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
|
||||||
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
|
||||||
|
|
||||||
bool StartTransportOp(grpc_transport_op* op) override; |
|
||||||
|
|
||||||
private: |
|
||||||
ClientIdleFilter(grpc_channel_stack* channel_stack, |
|
||||||
Duration client_idle_timeout) |
|
||||||
: channel_stack_(channel_stack), |
|
||||||
client_idle_timeout_(client_idle_timeout) {} |
|
||||||
|
|
||||||
void StartIdleTimer(); |
|
||||||
|
|
||||||
void IncreaseCallCount(); |
|
||||||
void DecreaseCallCount(); |
|
||||||
|
|
||||||
struct CallCountDecreaser { |
|
||||||
void operator()(ClientIdleFilter* filter) const { |
|
||||||
filter->DecreaseCallCount(); |
|
||||||
} |
|
||||||
}; |
|
||||||
|
|
||||||
// The channel stack to which we take refs for pending callbacks.
|
|
||||||
grpc_channel_stack* channel_stack_; |
|
||||||
Duration client_idle_timeout_; |
|
||||||
std::shared_ptr<IdleFilterState> idle_filter_state_{ |
|
||||||
std::make_shared<IdleFilterState>(false)}; |
|
||||||
|
|
||||||
ActivityPtr activity_; |
|
||||||
}; |
|
||||||
|
|
||||||
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create( |
|
||||||
const grpc_channel_args* args, ChannelFilter::Args filter_args) { |
|
||||||
ClientIdleFilter filter(filter_args.channel_stack(), |
|
||||||
GetClientIdleTimeout(args)); |
|
||||||
return absl::StatusOr<ClientIdleFilter>(std::move(filter)); |
|
||||||
} |
|
||||||
|
|
||||||
// Construct a promise for one call.
|
|
||||||
ArenaPromise<ServerMetadataHandle> ClientIdleFilter::MakeCallPromise( |
|
||||||
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
|
||||||
using Decrementer = std::unique_ptr<ClientIdleFilter, CallCountDecreaser>; |
|
||||||
IncreaseCallCount(); |
|
||||||
return ArenaPromise<ServerMetadataHandle>( |
|
||||||
Capture([](Decrementer*, ArenaPromise<ServerMetadataHandle>* next) |
|
||||||
-> Poll<ServerMetadataHandle> { return (*next)(); }, |
|
||||||
Decrementer(this), next_promise_factory(std::move(call_args)))); |
|
||||||
} |
|
||||||
|
|
||||||
bool ClientIdleFilter::StartTransportOp(grpc_transport_op* op) { |
|
||||||
// Catch the disconnect_with_error transport op.
|
|
||||||
if (op->disconnect_with_error != GRPC_ERROR_NONE) { |
|
||||||
// IncreaseCallCount() introduces a phony call and prevent the timer from
|
|
||||||
// being reset by other threads.
|
|
||||||
IncreaseCallCount(); |
|
||||||
activity_.reset(); |
|
||||||
} |
|
||||||
// Pass the op to the next filter.
|
|
||||||
return false; |
|
||||||
} |
|
||||||
|
|
||||||
void ClientIdleFilter::IncreaseCallCount() { |
|
||||||
idle_filter_state_->IncreaseCallCount(); |
|
||||||
} |
|
||||||
|
|
||||||
void ClientIdleFilter::DecreaseCallCount() { |
|
||||||
if (idle_filter_state_->DecreaseCallCount()) { |
|
||||||
// If there are no more calls in progress, start the idle timer.
|
|
||||||
StartIdleTimer(); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void ClientIdleFilter::StartIdleTimer() { |
|
||||||
GRPC_IDLE_FILTER_LOG("timer has started"); |
|
||||||
auto idle_filter_state = idle_filter_state_; |
|
||||||
// Hold a ref to the channel stack for the timer callback.
|
|
||||||
auto channel_stack = channel_stack_->Ref(); |
|
||||||
auto timeout = client_idle_timeout_; |
|
||||||
auto promise = Loop([timeout, idle_filter_state]() { |
|
||||||
return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout), |
|
||||||
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> { |
|
||||||
if (idle_filter_state->CheckTimer()) { |
|
||||||
return Continue{}; |
|
||||||
} else { |
|
||||||
return absl::OkStatus(); |
|
||||||
} |
|
||||||
}); |
|
||||||
}); |
|
||||||
activity_ = MakeActivity( |
|
||||||
std::move(promise), ExecCtxWakeupScheduler{}, |
|
||||||
[channel_stack](absl::Status status) { |
|
||||||
if (!status.ok()) return; |
|
||||||
auto* op = grpc_make_transport_op(nullptr); |
|
||||||
op->disconnect_with_error = grpc_error_set_int( |
|
||||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"), |
|
||||||
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE); |
|
||||||
// Pass the transport op down to the channel stack.
|
|
||||||
auto* elem = grpc_channel_stack_element(channel_stack.get(), 0); |
|
||||||
elem->filter->start_transport_op(elem, op); |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
const grpc_channel_filter grpc_client_idle_filter = |
|
||||||
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>( |
|
||||||
"client_idle"); |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
void RegisterClientIdleFilter(CoreConfiguration::Builder* builder) { |
|
||||||
builder->channel_init()->RegisterStage( |
|
||||||
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
|
||||||
[](ChannelStackBuilder* builder) { |
|
||||||
const grpc_channel_args* channel_args = builder->channel_args(); |
|
||||||
if (!grpc_channel_args_want_minimal_stack(channel_args) && |
|
||||||
GetClientIdleTimeout(channel_args) != Duration::Infinity()) { |
|
||||||
builder->PrependFilter(&grpc_client_idle_filter, nullptr); |
|
||||||
} |
|
||||||
return true; |
|
||||||
}); |
|
||||||
} |
|
||||||
} // namespace grpc_core
|
|
@ -1,566 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2017 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/ext/filters/max_age/max_age_filter.h" |
|
||||||
|
|
||||||
#include <limits.h> |
|
||||||
#include <string.h> |
|
||||||
|
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/channel/channel_stack_builder.h" |
|
||||||
#include "src/core/lib/config/core_configuration.h" |
|
||||||
#include "src/core/lib/iomgr/timer.h" |
|
||||||
#include "src/core/lib/transport/http2_errors.h" |
|
||||||
|
|
||||||
/* If these settings change, make sure that we are not sending a GOAWAY for
|
|
||||||
* inproc transport, since a GOAWAY to inproc ends up destroying the transport. |
|
||||||
*/ |
|
||||||
#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX |
|
||||||
#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX |
|
||||||
#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX |
|
||||||
#define MAX_CONNECTION_AGE_JITTER 0.1 |
|
||||||
|
|
||||||
#define MAX_CONNECTION_AGE_INTEGER_OPTIONS \ |
|
||||||
{ DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX } |
|
||||||
#define MAX_CONNECTION_IDLE_INTEGER_OPTIONS \ |
|
||||||
{ DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX } |
|
||||||
|
|
||||||
/* States for idle_state in channel_data */ |
|
||||||
#define MAX_IDLE_STATE_INIT ((gpr_atm)0) |
|
||||||
#define MAX_IDLE_STATE_SEEN_EXIT_IDLE ((gpr_atm)1) |
|
||||||
#define MAX_IDLE_STATE_SEEN_ENTER_IDLE ((gpr_atm)2) |
|
||||||
#define MAX_IDLE_STATE_TIMER_SET ((gpr_atm)3) |
|
||||||
|
|
||||||
namespace { |
|
||||||
struct channel_data { |
|
||||||
/* The channel stack to which we take refs for pending callbacks. */ |
|
||||||
grpc_channel_stack* channel_stack; |
|
||||||
/* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer
|
|
||||||
and max_age_grace_timer_pending */ |
|
||||||
grpc_core::Mutex max_age_timer_mu; |
|
||||||
/* True if the max_age timer callback is currently pending */ |
|
||||||
bool max_age_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false; |
|
||||||
/* True if the max_age_grace timer callback is currently pending */ |
|
||||||
bool max_age_grace_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false; |
|
||||||
/* The timer for checking if the channel has reached its max age */ |
|
||||||
grpc_timer max_age_timer ABSL_GUARDED_BY(max_age_timer_mu); |
|
||||||
/* The timer for checking if the max-aged channel has uesed up the grace
|
|
||||||
period */ |
|
||||||
grpc_timer max_age_grace_timer ABSL_GUARDED_BY(max_age_timer_mu); |
|
||||||
/* The timer for checking if the channel's idle duration reaches
|
|
||||||
max_connection_idle */ |
|
||||||
grpc_timer max_idle_timer; |
|
||||||
/* Allowed max time a channel may have no outstanding rpcs */ |
|
||||||
grpc_core::Duration max_connection_idle; |
|
||||||
/* Allowed max time a channel may exist */ |
|
||||||
grpc_core::Duration max_connection_age; |
|
||||||
/* Allowed grace period after the channel reaches its max age */ |
|
||||||
grpc_core::Duration max_connection_age_grace; |
|
||||||
/* Closure to run when the channel's idle duration reaches max_connection_idle
|
|
||||||
and should be closed gracefully */ |
|
||||||
grpc_closure max_idle_timer_cb; |
|
||||||
/* Closure to run when the channel reaches its max age and should be closed
|
|
||||||
gracefully */ |
|
||||||
grpc_closure close_max_age_channel; |
|
||||||
/* Closure to run the channel uses up its max age grace time and should be
|
|
||||||
closed forcibly */ |
|
||||||
grpc_closure force_close_max_age_channel; |
|
||||||
/* Closure to run when the init fo channel stack is done and the max_idle
|
|
||||||
timer should be started */ |
|
||||||
grpc_closure start_max_idle_timer_after_init; |
|
||||||
/* Closure to run when the init fo channel stack is done and the max_age timer
|
|
||||||
should be started */ |
|
||||||
grpc_closure start_max_age_timer_after_init; |
|
||||||
/* Closure to run when the goaway op is finished and the max_age_timer */ |
|
||||||
grpc_closure start_max_age_grace_timer_after_goaway_op; |
|
||||||
/* Number of active calls */ |
|
||||||
gpr_atm call_count; |
|
||||||
/* TODO(zyc): C++lize this state machine */ |
|
||||||
/* 'idle_state' holds the states of max_idle_timer and channel idleness.
|
|
||||||
It can contain one of the following values: |
|
||||||
+--------------------------------+----------------+---------+ |
|
||||||
| idle_state | max_idle_timer | channel | |
|
||||||
+--------------------------------+----------------+---------+ |
|
||||||
| MAX_IDLE_STATE_INIT | unset | busy | |
|
||||||
| MAX_IDLE_STATE_TIMER_SET | set, valid | idle | |
|
||||||
| MAX_IDLE_STATE_SEEN_EXIT_IDLE | set, invalid | busy | |
|
||||||
| MAX_IDLE_STATE_SEEN_ENTER_IDLE | set, invalid | idle | |
|
||||||
+--------------------------------+----------------+---------+ |
|
||||||
|
|
||||||
MAX_IDLE_STATE_INIT: The initial and final state of 'idle_state'. The |
|
||||||
channel has 1 or 1+ active calls, and the timer is not set. Note that |
|
||||||
we may put a virtual call to hold this state at channel initialization or |
|
||||||
shutdown, so that the channel won't enter other states. |
|
||||||
|
|
||||||
MAX_IDLE_STATE_TIMER_SET: The state after the timer is set and no calls |
|
||||||
have arrived after the timer is set. The channel must have 0 active call in |
|
||||||
this state. If the timer is fired in this state, we will close the channel |
|
||||||
due to idleness. |
|
||||||
|
|
||||||
MAX_IDLE_STATE_SEEN_EXIT_IDLE: The state after the timer is set and at |
|
||||||
least one call has arrived after the timer is set. The channel must have 1 |
|
||||||
or 1+ active calls in this state. If the timer is fired in this state, we |
|
||||||
won't reschudle it. |
|
||||||
|
|
||||||
MAX_IDLE_STATE_SEEN_ENTER_IDLE: The state after the timer is set and the at |
|
||||||
least one call has arrived after the timer is set, BUT the channel |
|
||||||
currently has 0 active calls. If the timer is fired in this state, we will |
|
||||||
reschudle it. |
|
||||||
|
|
||||||
max_idle_timer will not be cancelled (unless the channel is shutting down). |
|
||||||
If the timer callback is called when the max_idle_timer is valid (i.e. |
|
||||||
idle_state is MAX_IDLE_STATE_TIMER_SET), the channel will be closed due to |
|
||||||
idleness, otherwise the channel won't be changed. |
|
||||||
|
|
||||||
State transitions: |
|
||||||
MAX_IDLE_STATE_INIT <-------3------ MAX_IDLE_STATE_SEEN_EXIT_IDLE |
|
||||||
^ | ^ ^ | |
|
||||||
| | | | | |
|
||||||
1 2 +-----------4------------+ 6 7 |
|
||||||
| | | | | |
|
||||||
| v | | v |
|
||||||
MAX_IDLE_STATE_TIMER_SET <----5------ MAX_IDLE_STATE_SEEN_ENTER_IDLE |
|
||||||
|
|
||||||
For 1, 3, 5 : See max_idle_timer_cb() function |
|
||||||
For 2, 7 : See decrease_call_count() function |
|
||||||
For 4, 6 : See increase_call_count() function */ |
|
||||||
gpr_atm idle_state; |
|
||||||
/* Time when the channel finished its last outstanding call, in
|
|
||||||
* grpc_core::Timestamp */ |
|
||||||
gpr_atm last_enter_idle_time_millis; |
|
||||||
}; |
|
||||||
} // namespace
|
|
||||||
|
|
||||||
/* Increase the nubmer of active calls. Before the increasement, if there are no
|
|
||||||
calls, the max_idle_timer should be cancelled. */ |
|
||||||
static void increase_call_count(channel_data* chand) { |
|
||||||
/* Exit idle */ |
|
||||||
if (gpr_atm_full_fetch_add(&chand->call_count, 1) == 0) { |
|
||||||
while (true) { |
|
||||||
gpr_atm idle_state = gpr_atm_acq_load(&chand->idle_state); |
|
||||||
switch (idle_state) { |
|
||||||
case MAX_IDLE_STATE_TIMER_SET: |
|
||||||
/* max_idle_timer_cb may have already set idle_state to
|
|
||||||
MAX_IDLE_STATE_INIT, in this case, we don't need to set it to |
|
||||||
MAX_IDLE_STATE_SEEN_EXIT_IDLE */ |
|
||||||
gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_TIMER_SET, |
|
||||||
MAX_IDLE_STATE_SEEN_EXIT_IDLE); |
|
||||||
return; |
|
||||||
case MAX_IDLE_STATE_SEEN_ENTER_IDLE: |
|
||||||
gpr_atm_rel_store(&chand->idle_state, MAX_IDLE_STATE_SEEN_EXIT_IDLE); |
|
||||||
return; |
|
||||||
default: |
|
||||||
/* try again */ |
|
||||||
break; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
/* Decrease the nubmer of active calls. After the decrement, if there are no
|
|
||||||
calls, the max_idle_timer should be started. */ |
|
||||||
static void decrease_call_count(channel_data* chand) { |
|
||||||
/* Enter idle */ |
|
||||||
if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) { |
|
||||||
gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, |
|
||||||
(gpr_atm)grpc_core::ExecCtx::Get() |
|
||||||
->Now() |
|
||||||
.milliseconds_after_process_epoch()); |
|
||||||
while (true) { |
|
||||||
gpr_atm idle_state = gpr_atm_acq_load(&chand->idle_state); |
|
||||||
switch (idle_state) { |
|
||||||
case MAX_IDLE_STATE_INIT: |
|
||||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
|
||||||
"max_age max_idle_timer"); |
|
||||||
grpc_timer_init( |
|
||||||
&chand->max_idle_timer, |
|
||||||
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_idle, |
|
||||||
&chand->max_idle_timer_cb); |
|
||||||
gpr_atm_rel_store(&chand->idle_state, MAX_IDLE_STATE_TIMER_SET); |
|
||||||
return; |
|
||||||
case MAX_IDLE_STATE_SEEN_EXIT_IDLE: |
|
||||||
if (gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_SEEN_EXIT_IDLE, |
|
||||||
MAX_IDLE_STATE_SEEN_ENTER_IDLE)) { |
|
||||||
return; |
|
||||||
} |
|
||||||
break; |
|
||||||
default: |
|
||||||
/* try again */ |
|
||||||
break; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
static void start_max_idle_timer_after_init(void* arg, |
|
||||||
grpc_error_handle /*error*/) { |
|
||||||
channel_data* chand = static_cast<channel_data*>(arg); |
|
||||||
/* Decrease call_count. If there are no active calls at this time,
|
|
||||||
max_idle_timer will start here. If the number of active calls is not 0, |
|
||||||
max_idle_timer will start after all the active calls end. */ |
|
||||||
decrease_call_count(chand); |
|
||||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, |
|
||||||
"max_age start_max_idle_timer_after_init"); |
|
||||||
} |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { |
|
||||||
public: |
|
||||||
explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) { |
|
||||||
GRPC_CHANNEL_STACK_REF(chand_->channel_stack, "max_age conn_watch"); |
|
||||||
} |
|
||||||
|
|
||||||
~ConnectivityWatcher() override { |
|
||||||
GRPC_CHANNEL_STACK_UNREF(chand_->channel_stack, "max_age conn_watch"); |
|
||||||
} |
|
||||||
|
|
||||||
private: |
|
||||||
void OnConnectivityStateChange(grpc_connectivity_state new_state, |
|
||||||
const absl::Status& /* status */) override { |
|
||||||
if (new_state != GRPC_CHANNEL_SHUTDOWN) return; |
|
||||||
{ |
|
||||||
MutexLock lock(&chand_->max_age_timer_mu); |
|
||||||
if (chand_->max_age_timer_pending) { |
|
||||||
grpc_timer_cancel(&chand_->max_age_timer); |
|
||||||
chand_->max_age_timer_pending = false; |
|
||||||
} |
|
||||||
if (chand_->max_age_grace_timer_pending) { |
|
||||||
grpc_timer_cancel(&chand_->max_age_grace_timer); |
|
||||||
chand_->max_age_grace_timer_pending = false; |
|
||||||
} |
|
||||||
} |
|
||||||
/* If there are no active calls, this increasement will cancel
|
|
||||||
max_idle_timer, and prevent max_idle_timer from being started in the |
|
||||||
future. */ |
|
||||||
increase_call_count(chand_); |
|
||||||
if (gpr_atm_acq_load(&chand_->idle_state) == |
|
||||||
MAX_IDLE_STATE_SEEN_EXIT_IDLE) { |
|
||||||
grpc_timer_cancel(&chand_->max_idle_timer); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
channel_data* chand_; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
||||||
|
|
||||||
static void start_max_age_timer_after_init(void* arg, |
|
||||||
grpc_error_handle /*error*/) { |
|
||||||
channel_data* chand = static_cast<channel_data*>(arg); |
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
|
||||||
chand->max_age_timer_pending = true; |
|
||||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); |
|
||||||
grpc_timer_init( |
|
||||||
&chand->max_age_timer, |
|
||||||
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age, |
|
||||||
&chand->close_max_age_channel); |
|
||||||
} |
|
||||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
|
||||||
op->start_connectivity_watch.reset(new grpc_core::ConnectivityWatcher(chand)); |
|
||||||
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; |
|
||||||
grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0), op); |
|
||||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, |
|
||||||
"max_age start_max_age_timer_after_init"); |
|
||||||
} |
|
||||||
|
|
||||||
static void start_max_age_grace_timer_after_goaway_op( |
|
||||||
void* arg, grpc_error_handle /*error*/) { |
|
||||||
channel_data* chand = static_cast<channel_data*>(arg); |
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
|
||||||
chand->max_age_grace_timer_pending = true; |
|
||||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); |
|
||||||
grpc_timer_init( |
|
||||||
&chand->max_age_grace_timer, |
|
||||||
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age_grace, |
|
||||||
&chand->force_close_max_age_channel); |
|
||||||
} |
|
||||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, |
|
||||||
"max_age start_max_age_grace_timer_after_goaway_op"); |
|
||||||
} |
|
||||||
|
|
||||||
static void close_max_idle_channel(channel_data* chand) { |
|
||||||
/* Prevent the max idle timer from being set again */ |
|
||||||
gpr_atm_no_barrier_fetch_add(&chand->call_count, 1); |
|
||||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
|
||||||
op->goaway_error = |
|
||||||
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_idle"), |
|
||||||
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); |
|
||||||
grpc_channel_element* elem = |
|
||||||
grpc_channel_stack_element(chand->channel_stack, 0); |
|
||||||
elem->filter->start_transport_op(elem, op); |
|
||||||
} |
|
||||||
|
|
||||||
static void max_idle_timer_cb(void* arg, grpc_error_handle error) { |
|
||||||
channel_data* chand = static_cast<channel_data*>(arg); |
|
||||||
if (error == GRPC_ERROR_NONE) { |
|
||||||
bool try_again = true; |
|
||||||
while (try_again) { |
|
||||||
gpr_atm idle_state = gpr_atm_acq_load(&chand->idle_state); |
|
||||||
switch (idle_state) { |
|
||||||
case MAX_IDLE_STATE_TIMER_SET: |
|
||||||
close_max_idle_channel(chand); |
|
||||||
/* This MAX_IDLE_STATE_INIT is a final state, we don't have to check
|
|
||||||
* if idle_state has been changed */ |
|
||||||
gpr_atm_rel_store(&chand->idle_state, MAX_IDLE_STATE_INIT); |
|
||||||
try_again = false; |
|
||||||
break; |
|
||||||
case MAX_IDLE_STATE_SEEN_EXIT_IDLE: |
|
||||||
if (gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_SEEN_EXIT_IDLE, |
|
||||||
MAX_IDLE_STATE_INIT)) { |
|
||||||
try_again = false; |
|
||||||
} |
|
||||||
break; |
|
||||||
case MAX_IDLE_STATE_SEEN_ENTER_IDLE: |
|
||||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
|
||||||
"max_age max_idle_timer"); |
|
||||||
grpc_timer_init( |
|
||||||
&chand->max_idle_timer, |
|
||||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
|
||||||
gpr_atm_no_barrier_load( |
|
||||||
&chand->last_enter_idle_time_millis)) + |
|
||||||
chand->max_connection_idle, |
|
||||||
&chand->max_idle_timer_cb); |
|
||||||
/* idle_state may have already been set to
|
|
||||||
MAX_IDLE_STATE_SEEN_EXIT_IDLE by increase_call_count(), in this |
|
||||||
case, we don't need to set it to MAX_IDLE_STATE_TIMER_SET */ |
|
||||||
gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_SEEN_ENTER_IDLE, |
|
||||||
MAX_IDLE_STATE_TIMER_SET); |
|
||||||
try_again = false; |
|
||||||
break; |
|
||||||
default: |
|
||||||
/* try again */ |
|
||||||
break; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_idle_timer"); |
|
||||||
} |
|
||||||
|
|
||||||
static void close_max_age_channel(void* arg, grpc_error_handle error) { |
|
||||||
channel_data* chand = static_cast<channel_data*>(arg); |
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
|
||||||
chand->max_age_timer_pending = false; |
|
||||||
} |
|
||||||
if (error == GRPC_ERROR_NONE) { |
|
||||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
|
||||||
"max_age start_max_age_grace_timer_after_goaway_op"); |
|
||||||
grpc_transport_op* op = grpc_make_transport_op( |
|
||||||
&chand->start_max_age_grace_timer_after_goaway_op); |
|
||||||
op->goaway_error = |
|
||||||
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"), |
|
||||||
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); |
|
||||||
grpc_channel_element* elem = |
|
||||||
grpc_channel_stack_element(chand->channel_stack, 0); |
|
||||||
elem->filter->start_transport_op(elem, op); |
|
||||||
} else if (error != GRPC_ERROR_CANCELLED) { |
|
||||||
GRPC_LOG_IF_ERROR("close_max_age_channel", error); |
|
||||||
} |
|
||||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_age_timer"); |
|
||||||
} |
|
||||||
|
|
||||||
static void force_close_max_age_channel(void* arg, grpc_error_handle error) { |
|
||||||
channel_data* chand = static_cast<channel_data*>(arg); |
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
|
||||||
chand->max_age_grace_timer_pending = false; |
|
||||||
} |
|
||||||
if (error == GRPC_ERROR_NONE) { |
|
||||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
|
||||||
op->disconnect_with_error = |
|
||||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel reaches max age"); |
|
||||||
grpc_channel_element* elem = |
|
||||||
grpc_channel_stack_element(chand->channel_stack, 0); |
|
||||||
elem->filter->start_transport_op(elem, op); |
|
||||||
} else if (error != GRPC_ERROR_CANCELLED) { |
|
||||||
GRPC_LOG_IF_ERROR("force_close_max_age_channel", error); |
|
||||||
} |
|
||||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_age_grace_timer"); |
|
||||||
} |
|
||||||
|
|
||||||
/* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out
|
|
||||||
connection storms. Note that the MAX_CONNECTION_AGE option without jitter |
|
||||||
would not create connection storms by itself, but if there happened to be a |
|
||||||
connection storm it could cause it to repeat at a fixed period. */ |
|
||||||
static grpc_core::Duration |
|
||||||
add_random_max_connection_age_jitter_and_convert_to_duration(int value) { |
|
||||||
/* generate a random number between 1 - MAX_CONNECTION_AGE_JITTER and
|
|
||||||
1 + MAX_CONNECTION_AGE_JITTER */ |
|
||||||
double multiplier = rand() * MAX_CONNECTION_AGE_JITTER * 2.0 / RAND_MAX + |
|
||||||
1.0 - MAX_CONNECTION_AGE_JITTER; |
|
||||||
double result = multiplier * value; |
|
||||||
/* INT_MAX - 0.5 converts the value to float, so that result will not be
|
|
||||||
cast to int implicitly before the comparison. */ |
|
||||||
return result > (static_cast<double>( |
|
||||||
grpc_core::Duration::Infinity().millis())) - |
|
||||||
0.5 |
|
||||||
? grpc_core::Duration::Infinity() |
|
||||||
: grpc_core::Duration::Milliseconds(result); |
|
||||||
} |
|
||||||
|
|
||||||
/* Constructor for call_data. */ |
|
||||||
static grpc_error_handle max_age_init_call_elem( |
|
||||||
grpc_call_element* elem, const grpc_call_element_args* /*args*/) { |
|
||||||
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
||||||
increase_call_count(chand); |
|
||||||
return GRPC_ERROR_NONE; |
|
||||||
} |
|
||||||
|
|
||||||
/* Destructor for call_data. */ |
|
||||||
static void max_age_destroy_call_elem( |
|
||||||
grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, |
|
||||||
grpc_closure* /*ignored*/) { |
|
||||||
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
||||||
decrease_call_count(chand); |
|
||||||
} |
|
||||||
|
|
||||||
/* Constructor for channel_data. */ |
|
||||||
static grpc_error_handle max_age_init_channel_elem( |
|
||||||
grpc_channel_element* elem, grpc_channel_element_args* args) { |
|
||||||
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
||||||
new (chand) channel_data(); |
|
||||||
chand->channel_stack = args->channel_stack; |
|
||||||
chand->max_connection_age = |
|
||||||
add_random_max_connection_age_jitter_and_convert_to_duration( |
|
||||||
DEFAULT_MAX_CONNECTION_AGE_MS); |
|
||||||
chand->max_connection_age_grace = |
|
||||||
DEFAULT_MAX_CONNECTION_AGE_GRACE_MS == INT_MAX |
|
||||||
? grpc_core::Duration::Infinity() |
|
||||||
: grpc_core::Duration::Milliseconds( |
|
||||||
DEFAULT_MAX_CONNECTION_AGE_GRACE_MS); |
|
||||||
chand->max_connection_idle = |
|
||||||
DEFAULT_MAX_CONNECTION_IDLE_MS == INT_MAX |
|
||||||
? grpc_core::Duration::Infinity() |
|
||||||
: grpc_core::Duration::Milliseconds(DEFAULT_MAX_CONNECTION_IDLE_MS); |
|
||||||
chand->idle_state = MAX_IDLE_STATE_INIT; |
|
||||||
gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, GPR_ATM_MIN); |
|
||||||
for (size_t i = 0; i < args->channel_args->num_args; ++i) { |
|
||||||
if (0 == strcmp(args->channel_args->args[i].key, |
|
||||||
GRPC_ARG_MAX_CONNECTION_AGE_MS)) { |
|
||||||
const int value = grpc_channel_arg_get_integer( |
|
||||||
&args->channel_args->args[i], MAX_CONNECTION_AGE_INTEGER_OPTIONS); |
|
||||||
chand->max_connection_age = |
|
||||||
add_random_max_connection_age_jitter_and_convert_to_duration(value); |
|
||||||
} else if (0 == strcmp(args->channel_args->args[i].key, |
|
||||||
GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)) { |
|
||||||
const int value = grpc_channel_arg_get_integer( |
|
||||||
&args->channel_args->args[i], |
|
||||||
{DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0, INT_MAX}); |
|
||||||
chand->max_connection_age_grace = |
|
||||||
value == INT_MAX ? grpc_core::Duration::Infinity() |
|
||||||
: grpc_core::Duration::Milliseconds(value); |
|
||||||
} else if (0 == strcmp(args->channel_args->args[i].key, |
|
||||||
GRPC_ARG_MAX_CONNECTION_IDLE_MS)) { |
|
||||||
const int value = grpc_channel_arg_get_integer( |
|
||||||
&args->channel_args->args[i], MAX_CONNECTION_IDLE_INTEGER_OPTIONS); |
|
||||||
chand->max_connection_idle = |
|
||||||
value == INT_MAX ? grpc_core::Duration::Infinity() |
|
||||||
: grpc_core::Duration::Milliseconds(value); |
|
||||||
} |
|
||||||
} |
|
||||||
GRPC_CLOSURE_INIT(&chand->max_idle_timer_cb, max_idle_timer_cb, chand, |
|
||||||
grpc_schedule_on_exec_ctx); |
|
||||||
GRPC_CLOSURE_INIT(&chand->close_max_age_channel, close_max_age_channel, chand, |
|
||||||
grpc_schedule_on_exec_ctx); |
|
||||||
GRPC_CLOSURE_INIT(&chand->force_close_max_age_channel, |
|
||||||
force_close_max_age_channel, chand, |
|
||||||
grpc_schedule_on_exec_ctx); |
|
||||||
GRPC_CLOSURE_INIT(&chand->start_max_idle_timer_after_init, |
|
||||||
start_max_idle_timer_after_init, chand, |
|
||||||
grpc_schedule_on_exec_ctx); |
|
||||||
GRPC_CLOSURE_INIT(&chand->start_max_age_timer_after_init, |
|
||||||
start_max_age_timer_after_init, chand, |
|
||||||
grpc_schedule_on_exec_ctx); |
|
||||||
GRPC_CLOSURE_INIT(&chand->start_max_age_grace_timer_after_goaway_op, |
|
||||||
start_max_age_grace_timer_after_goaway_op, chand, |
|
||||||
grpc_schedule_on_exec_ctx); |
|
||||||
|
|
||||||
if (chand->max_connection_age != grpc_core::Duration::Infinity()) { |
|
||||||
/* When the channel reaches its max age, we send down an op with
|
|
||||||
goaway_error set. However, we can't send down any ops until after the |
|
||||||
channel stack is fully initialized. If we start the timer here, we have |
|
||||||
no guarantee that the timer won't pop before channel stack initialization |
|
||||||
is finished. To avoid that problem, we create a closure to start the |
|
||||||
timer, and we schedule that closure to be run after call stack |
|
||||||
initialization is done. */ |
|
||||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
|
||||||
"max_age start_max_age_timer_after_init"); |
|
||||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, |
|
||||||
&chand->start_max_age_timer_after_init, |
|
||||||
GRPC_ERROR_NONE); |
|
||||||
} |
|
||||||
|
|
||||||
/* Initialize the number of calls as 1, so that the max_idle_timer will not
|
|
||||||
start until start_max_idle_timer_after_init is invoked. */ |
|
||||||
gpr_atm_rel_store(&chand->call_count, 1); |
|
||||||
if (chand->max_connection_idle != grpc_core::Duration::Infinity()) { |
|
||||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
|
||||||
"max_age start_max_idle_timer_after_init"); |
|
||||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, |
|
||||||
&chand->start_max_idle_timer_after_init, |
|
||||||
GRPC_ERROR_NONE); |
|
||||||
} |
|
||||||
return GRPC_ERROR_NONE; |
|
||||||
} |
|
||||||
|
|
||||||
/* Destructor for channel_data. */ |
|
||||||
static void max_age_destroy_channel_elem(grpc_channel_element* elem) { |
|
||||||
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
||||||
chand->~channel_data(); |
|
||||||
} |
|
||||||
|
|
||||||
const grpc_channel_filter grpc_max_age_filter = { |
|
||||||
grpc_call_next_op, |
|
||||||
nullptr, |
|
||||||
grpc_channel_next_op, |
|
||||||
0, /* sizeof_call_data */ |
|
||||||
max_age_init_call_elem, |
|
||||||
grpc_call_stack_ignore_set_pollset_or_pollset_set, |
|
||||||
max_age_destroy_call_elem, |
|
||||||
sizeof(channel_data), |
|
||||||
max_age_init_channel_elem, |
|
||||||
max_age_destroy_channel_elem, |
|
||||||
grpc_channel_next_get_info, |
|
||||||
"max_age"}; |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
void RegisterMaxAgeFilter(CoreConfiguration::Builder* builder) { |
|
||||||
builder->channel_init()->RegisterStage( |
|
||||||
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
|
||||||
[](ChannelStackBuilder* builder) { |
|
||||||
const grpc_channel_args* channel_args = builder->channel_args(); |
|
||||||
bool enable = grpc_channel_arg_get_integer( |
|
||||||
grpc_channel_args_find( |
|
||||||
channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS), |
|
||||||
MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX || |
|
||||||
grpc_channel_arg_get_integer( |
|
||||||
grpc_channel_args_find( |
|
||||||
channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), |
|
||||||
MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX; |
|
||||||
if (enable) { |
|
||||||
builder->PrependFilter(&grpc_max_age_filter, nullptr); |
|
||||||
} |
|
||||||
return true; |
|
||||||
}); |
|
||||||
} |
|
||||||
} // namespace grpc_core
|
|
@ -1,26 +0,0 @@ |
|||||||
//
|
|
||||||
// Copyright 2017 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
|
|
||||||
#ifndef GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H |
|
||||||
#define GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/lib/channel/channel_stack.h" |
|
||||||
|
|
||||||
extern const grpc_channel_filter grpc_max_age_filter; |
|
||||||
|
|
||||||
#endif /* GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H */ |
|
Loading…
Reference in new issue