mirror of https://github.com/grpc/grpc.git
* Revert "Revert "Convert max_age filter to promises (#28904)" (#28957)"
This reverts commit f18e6ede04
.
* start watch
* fix
* Fix cancellation on async server filters
* fix
* fix
pull/29095/head
parent
0c7b37a4a3
commit
d589f4e6ca
23 changed files with 480 additions and 900 deletions
@ -0,0 +1,402 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// TODO(ctiller): Add a unit test suite for these filters once it's practical to
|
||||
// mock transport operations.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <limits.h> |
||||
#include <stdlib.h> |
||||
|
||||
#include <atomic> |
||||
#include <limits> |
||||
|
||||
#include "src/core/ext/filters/channel_idle/idle_filter_state.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_stack_builder.h" |
||||
#include "src/core/lib/channel/promise_based_filter.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/gprpp/capture.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" |
||||
#include "src/core/lib/promise/loop.h" |
||||
#include "src/core/lib/promise/sleep.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
#include "src/core/lib/transport/http2_errors.h" |
||||
|
||||
// TODO(juanlishen): The idle filter was disabled in client channel by default
|
||||
// due to b/143502997. Now the bug is fixed enable the filter by default.
|
||||
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX |
||||
// The user input idle timeout smaller than this would be capped to it.
|
||||
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000) |
||||
|
||||
// If these settings change, make sure that we are not sending a GOAWAY for
|
||||
// inproc transport, since a GOAWAY to inproc ends up destroying the transport.
|
||||
#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX |
||||
#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX |
||||
#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX |
||||
#define MAX_CONNECTION_AGE_JITTER 0.1 |
||||
|
||||
#define MAX_CONNECTION_AGE_INTEGER_OPTIONS \ |
||||
{ DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX } |
||||
#define MAX_CONNECTION_IDLE_INTEGER_OPTIONS \ |
||||
{ DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX } |
||||
|
||||
namespace grpc_core { |
||||
|
||||
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); |
||||
|
||||
#define GRPC_IDLE_FILTER_LOG(format, ...) \ |
||||
do { \
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
|
||||
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
|
||||
} \
|
||||
} while (0) |
||||
|
||||
namespace { |
||||
|
||||
Duration GetClientIdleTimeout(const grpc_channel_args* args) { |
||||
int ms = std::max( |
||||
grpc_channel_arg_get_integer( |
||||
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS), |
||||
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}), |
||||
MIN_IDLE_TIMEOUT_MS); |
||||
return ms == INT_MAX ? Duration::Infinity() : Duration::Milliseconds(ms); |
||||
} |
||||
|
||||
struct MaxAgeConfig { |
||||
Duration max_connection_age; |
||||
Duration max_connection_idle; |
||||
Duration max_connection_age_grace; |
||||
|
||||
bool enable() const { |
||||
return max_connection_age != Duration::Infinity() || |
||||
max_connection_idle != Duration::Infinity(); |
||||
} |
||||
}; |
||||
|
||||
/* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out
|
||||
connection storms. Note that the MAX_CONNECTION_AGE option without jitter |
||||
would not create connection storms by itself, but if there happened to be a |
||||
connection storm it could cause it to repeat at a fixed period. */ |
||||
MaxAgeConfig GetMaxAgeConfig(const grpc_channel_args* args) { |
||||
const int args_max_age = grpc_channel_arg_get_integer( |
||||
grpc_channel_args_find(args, GRPC_ARG_MAX_CONNECTION_AGE_MS), |
||||
MAX_CONNECTION_AGE_INTEGER_OPTIONS); |
||||
const int args_max_idle = grpc_channel_arg_get_integer( |
||||
grpc_channel_args_find(args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), |
||||
MAX_CONNECTION_IDLE_INTEGER_OPTIONS); |
||||
const int args_max_age_grace = grpc_channel_arg_get_integer( |
||||
grpc_channel_args_find(args, GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS), |
||||
{DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0, INT_MAX}); |
||||
/* generate a random number between 1 - MAX_CONNECTION_AGE_JITTER and
|
||||
1 + MAX_CONNECTION_AGE_JITTER */ |
||||
const double multiplier = |
||||
rand() * MAX_CONNECTION_AGE_JITTER * 2.0 / RAND_MAX + 1.0 - |
||||
MAX_CONNECTION_AGE_JITTER; |
||||
/* GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result
|
||||
will not be cast to int implicitly before the comparison. */ |
||||
return MaxAgeConfig{ |
||||
args_max_age == INT_MAX |
||||
? Duration::Infinity() |
||||
: Duration::FromSecondsAsDouble(multiplier * args_max_age / 1000.0), |
||||
args_max_idle == INT_MAX ? Duration::Infinity() |
||||
: Duration::Milliseconds(args_max_idle), |
||||
Duration::Milliseconds(args_max_age_grace)}; |
||||
} |
||||
|
||||
class ChannelIdleFilter : public ChannelFilter { |
||||
public: |
||||
~ChannelIdleFilter() override = default; |
||||
|
||||
ChannelIdleFilter(const ChannelIdleFilter&) = delete; |
||||
ChannelIdleFilter& operator=(const ChannelIdleFilter&) = delete; |
||||
ChannelIdleFilter(ChannelIdleFilter&&) = default; |
||||
ChannelIdleFilter& operator=(ChannelIdleFilter&&) = default; |
||||
|
||||
// Construct a promise for one call.
|
||||
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
||||
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
||||
|
||||
bool StartTransportOp(grpc_transport_op* op) override; |
||||
|
||||
protected: |
||||
ChannelIdleFilter(grpc_channel_stack* channel_stack, |
||||
Duration client_idle_timeout) |
||||
: channel_stack_(channel_stack), |
||||
client_idle_timeout_(client_idle_timeout) {} |
||||
|
||||
grpc_channel_stack* channel_stack() { return channel_stack_; }; |
||||
|
||||
virtual void Shutdown(); |
||||
void CloseChannel(); |
||||
|
||||
void IncreaseCallCount(); |
||||
void DecreaseCallCount(); |
||||
|
||||
private: |
||||
void StartIdleTimer(); |
||||
|
||||
struct CallCountDecreaser { |
||||
void operator()(ChannelIdleFilter* filter) const { |
||||
filter->DecreaseCallCount(); |
||||
} |
||||
}; |
||||
|
||||
// The channel stack to which we take refs for pending callbacks.
|
||||
grpc_channel_stack* channel_stack_; |
||||
Duration client_idle_timeout_; |
||||
std::shared_ptr<IdleFilterState> idle_filter_state_{ |
||||
std::make_shared<IdleFilterState>(false)}; |
||||
|
||||
ActivityPtr activity_; |
||||
}; |
||||
|
||||
class ClientIdleFilter final : public ChannelIdleFilter { |
||||
public: |
||||
static absl::StatusOr<ClientIdleFilter> Create( |
||||
const grpc_channel_args* args, ChannelFilter::Args filter_args); |
||||
|
||||
private: |
||||
using ChannelIdleFilter::ChannelIdleFilter; |
||||
}; |
||||
|
||||
class MaxAgeFilter final : public ChannelIdleFilter { |
||||
public: |
||||
static absl::StatusOr<MaxAgeFilter> Create(const grpc_channel_args* args, |
||||
ChannelFilter::Args filter_args); |
||||
|
||||
void Start(); |
||||
|
||||
private: |
||||
class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { |
||||
public: |
||||
explicit ConnectivityWatcher(MaxAgeFilter* filter) |
||||
: channel_stack_(filter->channel_stack()->Ref()), filter_(filter) {} |
||||
~ConnectivityWatcher() override = default; |
||||
|
||||
void OnConnectivityStateChange(grpc_connectivity_state new_state, |
||||
const absl::Status&) override { |
||||
if (new_state == GRPC_CHANNEL_SHUTDOWN) filter_->Shutdown(); |
||||
} |
||||
|
||||
private: |
||||
RefCountedPtr<grpc_channel_stack> channel_stack_; |
||||
MaxAgeFilter* filter_; |
||||
}; |
||||
|
||||
MaxAgeFilter(grpc_channel_stack* channel_stack, |
||||
const MaxAgeConfig& max_age_config) |
||||
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle), |
||||
max_connection_age_(max_age_config.max_connection_age), |
||||
max_connection_age_grace_(max_age_config.max_connection_age_grace) {} |
||||
|
||||
void Shutdown() override; |
||||
|
||||
ActivityPtr max_age_activity_; |
||||
Duration max_connection_age_; |
||||
Duration max_connection_age_grace_; |
||||
}; |
||||
|
||||
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create( |
||||
const grpc_channel_args* args, ChannelFilter::Args filter_args) { |
||||
ClientIdleFilter filter(filter_args.channel_stack(), |
||||
GetClientIdleTimeout(args)); |
||||
return absl::StatusOr<ClientIdleFilter>(std::move(filter)); |
||||
} |
||||
|
||||
absl::StatusOr<MaxAgeFilter> MaxAgeFilter::Create( |
||||
const grpc_channel_args* args, ChannelFilter::Args filter_args) { |
||||
const auto config = GetMaxAgeConfig(args); |
||||
MaxAgeFilter filter(filter_args.channel_stack(), config); |
||||
return absl::StatusOr<MaxAgeFilter>(std::move(filter)); |
||||
} |
||||
|
||||
void MaxAgeFilter::Shutdown() { |
||||
max_age_activity_.reset(); |
||||
ChannelIdleFilter::Shutdown(); |
||||
} |
||||
|
||||
void MaxAgeFilter::Start() { |
||||
// Trigger idle timer immediately
|
||||
IncreaseCallCount(); |
||||
DecreaseCallCount(); |
||||
|
||||
struct StartupClosure { |
||||
RefCountedPtr<grpc_channel_stack> channel_stack; |
||||
MaxAgeFilter* filter; |
||||
grpc_closure closure; |
||||
}; |
||||
auto run_startup = [](void* p, grpc_error_handle) { |
||||
auto* startup = static_cast<StartupClosure*>(p); |
||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||
op->start_connectivity_watch.reset( |
||||
new ConnectivityWatcher(startup->filter)); |
||||
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; |
||||
grpc_channel_next_op( |
||||
grpc_channel_stack_element(startup->channel_stack.get(), 0), op); |
||||
delete startup; |
||||
}; |
||||
auto* startup = |
||||
new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}}; |
||||
GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr); |
||||
ExecCtx::Run(DEBUG_LOCATION, &startup->closure, GRPC_ERROR_NONE); |
||||
|
||||
auto channel_stack = this->channel_stack()->Ref(); |
||||
|
||||
// Start the max age timer
|
||||
if (max_connection_age_ != Duration::Infinity()) { |
||||
max_age_activity_ = MakeActivity( |
||||
TrySeq( |
||||
// First sleep until the max connection age
|
||||
Sleep(ExecCtx::Get()->Now() + max_connection_age_), |
||||
// Then send a goaway.
|
||||
[this] { |
||||
GRPC_CHANNEL_STACK_REF(this->channel_stack(), |
||||
"max_age send_goaway"); |
||||
// Jump out of the activity to send the goaway.
|
||||
auto fn = [](void* arg, grpc_error_handle) { |
||||
auto* channel_stack = static_cast<grpc_channel_stack*>(arg); |
||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||
op->goaway_error = grpc_error_set_int( |
||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"), |
||||
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); |
||||
grpc_channel_element* elem = |
||||
grpc_channel_stack_element(channel_stack, 0); |
||||
elem->filter->start_transport_op(elem, op); |
||||
GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway"); |
||||
}; |
||||
ExecCtx::Run( |
||||
DEBUG_LOCATION, |
||||
GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr), |
||||
GRPC_ERROR_NONE); |
||||
return Immediate(absl::OkStatus()); |
||||
}, |
||||
// Sleep for the grace period
|
||||
[this] { |
||||
return Sleep(ExecCtx::Get()->Now() + max_connection_age_grace_); |
||||
}), |
||||
ExecCtxWakeupScheduler(), [channel_stack, this](absl::Status status) { |
||||
// OnDone -- close the connection if the promise completed
|
||||
// successfully.
|
||||
// (if it did not, it was cancelled)
|
||||
if (status.ok()) CloseChannel(); |
||||
}); |
||||
} |
||||
} |
||||
|
||||
// Construct a promise for one call.
|
||||
ArenaPromise<ServerMetadataHandle> ChannelIdleFilter::MakeCallPromise( |
||||
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
||||
using Decrementer = std::unique_ptr<ChannelIdleFilter, CallCountDecreaser>; |
||||
IncreaseCallCount(); |
||||
return ArenaPromise<ServerMetadataHandle>( |
||||
Capture([](Decrementer*, ArenaPromise<ServerMetadataHandle>* next) |
||||
-> Poll<ServerMetadataHandle> { return (*next)(); }, |
||||
Decrementer(this), next_promise_factory(std::move(call_args)))); |
||||
} |
||||
|
||||
bool ChannelIdleFilter::StartTransportOp(grpc_transport_op* op) { |
||||
// Catch the disconnect_with_error transport op.
|
||||
if (op->disconnect_with_error != GRPC_ERROR_NONE) Shutdown(); |
||||
// Pass the op to the next filter.
|
||||
return false; |
||||
} |
||||
|
||||
void ChannelIdleFilter::Shutdown() { |
||||
// IncreaseCallCount() introduces a phony call and prevent the timer from
|
||||
// being reset by other threads.
|
||||
IncreaseCallCount(); |
||||
activity_.reset(); |
||||
} |
||||
|
||||
void ChannelIdleFilter::IncreaseCallCount() { |
||||
idle_filter_state_->IncreaseCallCount(); |
||||
} |
||||
|
||||
void ChannelIdleFilter::DecreaseCallCount() { |
||||
if (idle_filter_state_->DecreaseCallCount()) { |
||||
// If there are no more calls in progress, start the idle timer.
|
||||
StartIdleTimer(); |
||||
} |
||||
} |
||||
|
||||
void ChannelIdleFilter::StartIdleTimer() { |
||||
GRPC_IDLE_FILTER_LOG("timer has started"); |
||||
auto idle_filter_state = idle_filter_state_; |
||||
// Hold a ref to the channel stack for the timer callback.
|
||||
auto channel_stack = channel_stack_->Ref(); |
||||
auto timeout = client_idle_timeout_; |
||||
auto promise = Loop([timeout, idle_filter_state]() { |
||||
return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout), |
||||
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> { |
||||
if (idle_filter_state->CheckTimer()) { |
||||
return Continue{}; |
||||
} else { |
||||
return absl::OkStatus(); |
||||
} |
||||
}); |
||||
}); |
||||
activity_ = MakeActivity(std::move(promise), ExecCtxWakeupScheduler{}, |
||||
[channel_stack, this](absl::Status status) { |
||||
if (status.ok()) CloseChannel(); |
||||
}); |
||||
} |
||||
|
||||
void ChannelIdleFilter::CloseChannel() { |
||||
auto* op = grpc_make_transport_op(nullptr); |
||||
op->disconnect_with_error = grpc_error_set_int( |
||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"), |
||||
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE); |
||||
// Pass the transport op down to the channel stack.
|
||||
auto* elem = grpc_channel_stack_element(channel_stack_, 0); |
||||
elem->filter->start_transport_op(elem, op); |
||||
} |
||||
|
||||
const grpc_channel_filter grpc_client_idle_filter = |
||||
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>( |
||||
"client_idle"); |
||||
const grpc_channel_filter grpc_max_age_filter = |
||||
MakePromiseBasedFilter<MaxAgeFilter, FilterEndpoint::kServer>("max_age"); |
||||
|
||||
} // namespace
|
||||
|
||||
void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) { |
||||
builder->channel_init()->RegisterStage( |
||||
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
||||
[](ChannelStackBuilder* builder) { |
||||
const grpc_channel_args* channel_args = builder->channel_args(); |
||||
if (!grpc_channel_args_want_minimal_stack(channel_args) && |
||||
GetClientIdleTimeout(channel_args) != Duration::Infinity()) { |
||||
builder->PrependFilter(&grpc_client_idle_filter, nullptr); |
||||
} |
||||
return true; |
||||
}); |
||||
builder->channel_init()->RegisterStage( |
||||
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
||||
[](ChannelStackBuilder* builder) { |
||||
const grpc_channel_args* channel_args = builder->channel_args(); |
||||
if (!grpc_channel_args_want_minimal_stack(channel_args) && |
||||
GetMaxAgeConfig(channel_args).enable()) { |
||||
builder->PrependFilter( |
||||
&grpc_max_age_filter, |
||||
[](grpc_channel_stack*, grpc_channel_element* elem) { |
||||
static_cast<MaxAgeFilter*>(elem->channel_data)->Start(); |
||||
}); |
||||
} |
||||
return true; |
||||
}); |
||||
} |
||||
} // namespace grpc_core
|
@ -1,199 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2019 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <limits.h> |
||||
|
||||
#include <atomic> |
||||
|
||||
#include "src/core/ext/filters/client_idle/idle_filter_state.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_stack_builder.h" |
||||
#include "src/core/lib/channel/promise_based_filter.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/gprpp/capture.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" |
||||
#include "src/core/lib/promise/loop.h" |
||||
#include "src/core/lib/promise/sleep.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
#include "src/core/lib/transport/http2_errors.h" |
||||
|
||||
// TODO(juanlishen): The idle filter is disabled in client channel by default
|
||||
// due to b/143502997. Try to fix the bug and enable the filter by default.
|
||||
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX |
||||
// The user input idle timeout smaller than this would be capped to it.
|
||||
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000) |
||||
|
||||
namespace grpc_core { |
||||
|
||||
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); |
||||
|
||||
#define GRPC_IDLE_FILTER_LOG(format, ...) \ |
||||
do { \
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
|
||||
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
|
||||
} \
|
||||
} while (0) |
||||
|
||||
namespace { |
||||
|
||||
Duration GetClientIdleTimeout(const grpc_channel_args* args) { |
||||
auto millis = std::max( |
||||
grpc_channel_arg_get_integer( |
||||
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS), |
||||
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}), |
||||
MIN_IDLE_TIMEOUT_MS); |
||||
if (millis == INT_MAX) return Duration::Infinity(); |
||||
return Duration::Milliseconds(millis); |
||||
} |
||||
|
||||
class ClientIdleFilter : public ChannelFilter { |
||||
public: |
||||
static absl::StatusOr<ClientIdleFilter> Create( |
||||
const grpc_channel_args* args, ChannelFilter::Args filter_args); |
||||
~ClientIdleFilter() override = default; |
||||
|
||||
ClientIdleFilter(const ClientIdleFilter&) = delete; |
||||
ClientIdleFilter& operator=(const ClientIdleFilter&) = delete; |
||||
ClientIdleFilter(ClientIdleFilter&&) = default; |
||||
ClientIdleFilter& operator=(ClientIdleFilter&&) = default; |
||||
|
||||
// Construct a promise for one call.
|
||||
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
||||
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
||||
|
||||
bool StartTransportOp(grpc_transport_op* op) override; |
||||
|
||||
private: |
||||
ClientIdleFilter(grpc_channel_stack* channel_stack, |
||||
Duration client_idle_timeout) |
||||
: channel_stack_(channel_stack), |
||||
client_idle_timeout_(client_idle_timeout) {} |
||||
|
||||
void StartIdleTimer(); |
||||
|
||||
void IncreaseCallCount(); |
||||
void DecreaseCallCount(); |
||||
|
||||
struct CallCountDecreaser { |
||||
void operator()(ClientIdleFilter* filter) const { |
||||
filter->DecreaseCallCount(); |
||||
} |
||||
}; |
||||
|
||||
// The channel stack to which we take refs for pending callbacks.
|
||||
grpc_channel_stack* channel_stack_; |
||||
Duration client_idle_timeout_; |
||||
std::shared_ptr<IdleFilterState> idle_filter_state_{ |
||||
std::make_shared<IdleFilterState>(false)}; |
||||
|
||||
ActivityPtr activity_; |
||||
}; |
||||
|
||||
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create( |
||||
const grpc_channel_args* args, ChannelFilter::Args filter_args) { |
||||
ClientIdleFilter filter(filter_args.channel_stack(), |
||||
GetClientIdleTimeout(args)); |
||||
return absl::StatusOr<ClientIdleFilter>(std::move(filter)); |
||||
} |
||||
|
||||
// Construct a promise for one call.
|
||||
ArenaPromise<ServerMetadataHandle> ClientIdleFilter::MakeCallPromise( |
||||
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
||||
using Decrementer = std::unique_ptr<ClientIdleFilter, CallCountDecreaser>; |
||||
IncreaseCallCount(); |
||||
return ArenaPromise<ServerMetadataHandle>( |
||||
Capture([](Decrementer*, ArenaPromise<ServerMetadataHandle>* next) |
||||
-> Poll<ServerMetadataHandle> { return (*next)(); }, |
||||
Decrementer(this), next_promise_factory(std::move(call_args)))); |
||||
} |
||||
|
||||
bool ClientIdleFilter::StartTransportOp(grpc_transport_op* op) { |
||||
// Catch the disconnect_with_error transport op.
|
||||
if (op->disconnect_with_error != GRPC_ERROR_NONE) { |
||||
// IncreaseCallCount() introduces a phony call and prevent the timer from
|
||||
// being reset by other threads.
|
||||
IncreaseCallCount(); |
||||
activity_.reset(); |
||||
} |
||||
// Pass the op to the next filter.
|
||||
return false; |
||||
} |
||||
|
||||
void ClientIdleFilter::IncreaseCallCount() { |
||||
idle_filter_state_->IncreaseCallCount(); |
||||
} |
||||
|
||||
void ClientIdleFilter::DecreaseCallCount() { |
||||
if (idle_filter_state_->DecreaseCallCount()) { |
||||
// If there are no more calls in progress, start the idle timer.
|
||||
StartIdleTimer(); |
||||
} |
||||
} |
||||
|
||||
void ClientIdleFilter::StartIdleTimer() { |
||||
GRPC_IDLE_FILTER_LOG("timer has started"); |
||||
auto idle_filter_state = idle_filter_state_; |
||||
// Hold a ref to the channel stack for the timer callback.
|
||||
auto channel_stack = channel_stack_->Ref(); |
||||
auto timeout = client_idle_timeout_; |
||||
auto promise = Loop([timeout, idle_filter_state]() { |
||||
return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout), |
||||
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> { |
||||
if (idle_filter_state->CheckTimer()) { |
||||
return Continue{}; |
||||
} else { |
||||
return absl::OkStatus(); |
||||
} |
||||
}); |
||||
}); |
||||
activity_ = MakeActivity( |
||||
std::move(promise), ExecCtxWakeupScheduler{}, |
||||
[channel_stack](absl::Status status) { |
||||
if (!status.ok()) return; |
||||
auto* op = grpc_make_transport_op(nullptr); |
||||
op->disconnect_with_error = grpc_error_set_int( |
||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"), |
||||
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE); |
||||
// Pass the transport op down to the channel stack.
|
||||
auto* elem = grpc_channel_stack_element(channel_stack.get(), 0); |
||||
elem->filter->start_transport_op(elem, op); |
||||
}); |
||||
} |
||||
|
||||
const grpc_channel_filter grpc_client_idle_filter = |
||||
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>( |
||||
"client_idle"); |
||||
|
||||
} // namespace
|
||||
|
||||
void RegisterClientIdleFilter(CoreConfiguration::Builder* builder) { |
||||
builder->channel_init()->RegisterStage( |
||||
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
||||
[](ChannelStackBuilder* builder) { |
||||
const grpc_channel_args* channel_args = builder->channel_args(); |
||||
if (!grpc_channel_args_want_minimal_stack(channel_args) && |
||||
GetClientIdleTimeout(channel_args) != Duration::Infinity()) { |
||||
builder->PrependFilter(&grpc_client_idle_filter, nullptr); |
||||
} |
||||
return true; |
||||
}); |
||||
} |
||||
} // namespace grpc_core
|
@ -1,566 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/max_age/max_age_filter.h" |
||||
|
||||
#include <limits.h> |
||||
#include <string.h> |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_stack_builder.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
#include "src/core/lib/transport/http2_errors.h" |
||||
|
||||
/* If these settings change, make sure that we are not sending a GOAWAY for
|
||||
* inproc transport, since a GOAWAY to inproc ends up destroying the transport. |
||||
*/ |
||||
#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX |
||||
#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX |
||||
#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX |
||||
#define MAX_CONNECTION_AGE_JITTER 0.1 |
||||
|
||||
#define MAX_CONNECTION_AGE_INTEGER_OPTIONS \ |
||||
{ DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX } |
||||
#define MAX_CONNECTION_IDLE_INTEGER_OPTIONS \ |
||||
{ DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX } |
||||
|
||||
/* States for idle_state in channel_data */ |
||||
#define MAX_IDLE_STATE_INIT ((gpr_atm)0) |
||||
#define MAX_IDLE_STATE_SEEN_EXIT_IDLE ((gpr_atm)1) |
||||
#define MAX_IDLE_STATE_SEEN_ENTER_IDLE ((gpr_atm)2) |
||||
#define MAX_IDLE_STATE_TIMER_SET ((gpr_atm)3) |
||||
|
||||
namespace { |
||||
struct channel_data { |
||||
/* The channel stack to which we take refs for pending callbacks. */ |
||||
grpc_channel_stack* channel_stack; |
||||
/* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer
|
||||
and max_age_grace_timer_pending */ |
||||
grpc_core::Mutex max_age_timer_mu; |
||||
/* True if the max_age timer callback is currently pending */ |
||||
bool max_age_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false; |
||||
/* True if the max_age_grace timer callback is currently pending */ |
||||
bool max_age_grace_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false; |
||||
/* The timer for checking if the channel has reached its max age */ |
||||
grpc_timer max_age_timer ABSL_GUARDED_BY(max_age_timer_mu); |
||||
/* The timer for checking if the max-aged channel has uesed up the grace
|
||||
period */ |
||||
grpc_timer max_age_grace_timer ABSL_GUARDED_BY(max_age_timer_mu); |
||||
/* The timer for checking if the channel's idle duration reaches
|
||||
max_connection_idle */ |
||||
grpc_timer max_idle_timer; |
||||
/* Allowed max time a channel may have no outstanding rpcs */ |
||||
grpc_core::Duration max_connection_idle; |
||||
/* Allowed max time a channel may exist */ |
||||
grpc_core::Duration max_connection_age; |
||||
/* Allowed grace period after the channel reaches its max age */ |
||||
grpc_core::Duration max_connection_age_grace; |
||||
/* Closure to run when the channel's idle duration reaches max_connection_idle
|
||||
and should be closed gracefully */ |
||||
grpc_closure max_idle_timer_cb; |
||||
/* Closure to run when the channel reaches its max age and should be closed
|
||||
gracefully */ |
||||
grpc_closure close_max_age_channel; |
||||
/* Closure to run the channel uses up its max age grace time and should be
|
||||
closed forcibly */ |
||||
grpc_closure force_close_max_age_channel; |
||||
/* Closure to run when the init fo channel stack is done and the max_idle
|
||||
timer should be started */ |
||||
grpc_closure start_max_idle_timer_after_init; |
||||
/* Closure to run when the init fo channel stack is done and the max_age timer
|
||||
should be started */ |
||||
grpc_closure start_max_age_timer_after_init; |
||||
/* Closure to run when the goaway op is finished and the max_age_timer */ |
||||
grpc_closure start_max_age_grace_timer_after_goaway_op; |
||||
/* Number of active calls */ |
||||
gpr_atm call_count; |
||||
/* TODO(zyc): C++lize this state machine */ |
||||
/* 'idle_state' holds the states of max_idle_timer and channel idleness.
|
||||
It can contain one of the following values: |
||||
+--------------------------------+----------------+---------+ |
||||
| idle_state | max_idle_timer | channel | |
||||
+--------------------------------+----------------+---------+ |
||||
| MAX_IDLE_STATE_INIT | unset | busy | |
||||
| MAX_IDLE_STATE_TIMER_SET | set, valid | idle | |
||||
| MAX_IDLE_STATE_SEEN_EXIT_IDLE | set, invalid | busy | |
||||
| MAX_IDLE_STATE_SEEN_ENTER_IDLE | set, invalid | idle | |
||||
+--------------------------------+----------------+---------+ |
||||
|
||||
MAX_IDLE_STATE_INIT: The initial and final state of 'idle_state'. The |
||||
channel has 1 or 1+ active calls, and the timer is not set. Note that |
||||
we may put a virtual call to hold this state at channel initialization or |
||||
shutdown, so that the channel won't enter other states. |
||||
|
||||
MAX_IDLE_STATE_TIMER_SET: The state after the timer is set and no calls |
||||
have arrived after the timer is set. The channel must have 0 active call in |
||||
this state. If the timer is fired in this state, we will close the channel |
||||
due to idleness. |
||||
|
||||
MAX_IDLE_STATE_SEEN_EXIT_IDLE: The state after the timer is set and at |
||||
least one call has arrived after the timer is set. The channel must have 1 |
||||
or 1+ active calls in this state. If the timer is fired in this state, we |
||||
won't reschudle it. |
||||
|
||||
MAX_IDLE_STATE_SEEN_ENTER_IDLE: The state after the timer is set and the at |
||||
least one call has arrived after the timer is set, BUT the channel |
||||
currently has 0 active calls. If the timer is fired in this state, we will |
||||
reschudle it. |
||||
|
||||
max_idle_timer will not be cancelled (unless the channel is shutting down). |
||||
If the timer callback is called when the max_idle_timer is valid (i.e. |
||||
idle_state is MAX_IDLE_STATE_TIMER_SET), the channel will be closed due to |
||||
idleness, otherwise the channel won't be changed. |
||||
|
||||
State transitions: |
||||
MAX_IDLE_STATE_INIT <-------3------ MAX_IDLE_STATE_SEEN_EXIT_IDLE |
||||
^ | ^ ^ | |
||||
| | | | | |
||||
1 2 +-----------4------------+ 6 7 |
||||
| | | | | |
||||
| v | | v |
||||
MAX_IDLE_STATE_TIMER_SET <----5------ MAX_IDLE_STATE_SEEN_ENTER_IDLE |
||||
|
||||
For 1, 3, 5 : See max_idle_timer_cb() function |
||||
For 2, 7 : See decrease_call_count() function |
||||
For 4, 6 : See increase_call_count() function */ |
||||
gpr_atm idle_state; |
||||
/* Time when the channel finished its last outstanding call, in
|
||||
* grpc_core::Timestamp */ |
||||
gpr_atm last_enter_idle_time_millis; |
||||
}; |
||||
} // namespace
|
||||
|
||||
/* Increase the nubmer of active calls. Before the increasement, if there are no
|
||||
calls, the max_idle_timer should be cancelled. */ |
||||
static void increase_call_count(channel_data* chand) { |
||||
/* Exit idle */ |
||||
if (gpr_atm_full_fetch_add(&chand->call_count, 1) == 0) { |
||||
while (true) { |
||||
gpr_atm idle_state = gpr_atm_acq_load(&chand->idle_state); |
||||
switch (idle_state) { |
||||
case MAX_IDLE_STATE_TIMER_SET: |
||||
/* max_idle_timer_cb may have already set idle_state to
|
||||
MAX_IDLE_STATE_INIT, in this case, we don't need to set it to |
||||
MAX_IDLE_STATE_SEEN_EXIT_IDLE */ |
||||
gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_TIMER_SET, |
||||
MAX_IDLE_STATE_SEEN_EXIT_IDLE); |
||||
return; |
||||
case MAX_IDLE_STATE_SEEN_ENTER_IDLE: |
||||
gpr_atm_rel_store(&chand->idle_state, MAX_IDLE_STATE_SEEN_EXIT_IDLE); |
||||
return; |
||||
default: |
||||
/* try again */ |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
/* Decrease the nubmer of active calls. After the decrement, if there are no
|
||||
calls, the max_idle_timer should be started. */ |
||||
static void decrease_call_count(channel_data* chand) { |
||||
/* Enter idle */ |
||||
if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) { |
||||
gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, |
||||
(gpr_atm)grpc_core::ExecCtx::Get() |
||||
->Now() |
||||
.milliseconds_after_process_epoch()); |
||||
while (true) { |
||||
gpr_atm idle_state = gpr_atm_acq_load(&chand->idle_state); |
||||
switch (idle_state) { |
||||
case MAX_IDLE_STATE_INIT: |
||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
||||
"max_age max_idle_timer"); |
||||
grpc_timer_init( |
||||
&chand->max_idle_timer, |
||||
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_idle, |
||||
&chand->max_idle_timer_cb); |
||||
gpr_atm_rel_store(&chand->idle_state, MAX_IDLE_STATE_TIMER_SET); |
||||
return; |
||||
case MAX_IDLE_STATE_SEEN_EXIT_IDLE: |
||||
if (gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_SEEN_EXIT_IDLE, |
||||
MAX_IDLE_STATE_SEEN_ENTER_IDLE)) { |
||||
return; |
||||
} |
||||
break; |
||||
default: |
||||
/* try again */ |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
static void start_max_idle_timer_after_init(void* arg, |
||||
grpc_error_handle /*error*/) { |
||||
channel_data* chand = static_cast<channel_data*>(arg); |
||||
/* Decrease call_count. If there are no active calls at this time,
|
||||
max_idle_timer will start here. If the number of active calls is not 0, |
||||
max_idle_timer will start after all the active calls end. */ |
||||
decrease_call_count(chand); |
||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, |
||||
"max_age start_max_idle_timer_after_init"); |
||||
} |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { |
||||
public: |
||||
explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) { |
||||
GRPC_CHANNEL_STACK_REF(chand_->channel_stack, "max_age conn_watch"); |
||||
} |
||||
|
||||
~ConnectivityWatcher() override { |
||||
GRPC_CHANNEL_STACK_UNREF(chand_->channel_stack, "max_age conn_watch"); |
||||
} |
||||
|
||||
private: |
||||
void OnConnectivityStateChange(grpc_connectivity_state new_state, |
||||
const absl::Status& /* status */) override { |
||||
if (new_state != GRPC_CHANNEL_SHUTDOWN) return; |
||||
{ |
||||
MutexLock lock(&chand_->max_age_timer_mu); |
||||
if (chand_->max_age_timer_pending) { |
||||
grpc_timer_cancel(&chand_->max_age_timer); |
||||
chand_->max_age_timer_pending = false; |
||||
} |
||||
if (chand_->max_age_grace_timer_pending) { |
||||
grpc_timer_cancel(&chand_->max_age_grace_timer); |
||||
chand_->max_age_grace_timer_pending = false; |
||||
} |
||||
} |
||||
/* If there are no active calls, this increasement will cancel
|
||||
max_idle_timer, and prevent max_idle_timer from being started in the |
||||
future. */ |
||||
increase_call_count(chand_); |
||||
if (gpr_atm_acq_load(&chand_->idle_state) == |
||||
MAX_IDLE_STATE_SEEN_EXIT_IDLE) { |
||||
grpc_timer_cancel(&chand_->max_idle_timer); |
||||
} |
||||
} |
||||
|
||||
channel_data* chand_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
static void start_max_age_timer_after_init(void* arg, |
||||
grpc_error_handle /*error*/) { |
||||
channel_data* chand = static_cast<channel_data*>(arg); |
||||
{ |
||||
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
||||
chand->max_age_timer_pending = true; |
||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); |
||||
grpc_timer_init( |
||||
&chand->max_age_timer, |
||||
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age, |
||||
&chand->close_max_age_channel); |
||||
} |
||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||
op->start_connectivity_watch.reset(new grpc_core::ConnectivityWatcher(chand)); |
||||
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; |
||||
grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0), op); |
||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, |
||||
"max_age start_max_age_timer_after_init"); |
||||
} |
||||
|
||||
static void start_max_age_grace_timer_after_goaway_op( |
||||
void* arg, grpc_error_handle /*error*/) { |
||||
channel_data* chand = static_cast<channel_data*>(arg); |
||||
{ |
||||
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
||||
chand->max_age_grace_timer_pending = true; |
||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); |
||||
grpc_timer_init( |
||||
&chand->max_age_grace_timer, |
||||
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age_grace, |
||||
&chand->force_close_max_age_channel); |
||||
} |
||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, |
||||
"max_age start_max_age_grace_timer_after_goaway_op"); |
||||
} |
||||
|
||||
static void close_max_idle_channel(channel_data* chand) { |
||||
/* Prevent the max idle timer from being set again */ |
||||
gpr_atm_no_barrier_fetch_add(&chand->call_count, 1); |
||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||
op->goaway_error = |
||||
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_idle"), |
||||
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); |
||||
grpc_channel_element* elem = |
||||
grpc_channel_stack_element(chand->channel_stack, 0); |
||||
elem->filter->start_transport_op(elem, op); |
||||
} |
||||
|
||||
static void max_idle_timer_cb(void* arg, grpc_error_handle error) { |
||||
channel_data* chand = static_cast<channel_data*>(arg); |
||||
if (error == GRPC_ERROR_NONE) { |
||||
bool try_again = true; |
||||
while (try_again) { |
||||
gpr_atm idle_state = gpr_atm_acq_load(&chand->idle_state); |
||||
switch (idle_state) { |
||||
case MAX_IDLE_STATE_TIMER_SET: |
||||
close_max_idle_channel(chand); |
||||
/* This MAX_IDLE_STATE_INIT is a final state, we don't have to check
|
||||
* if idle_state has been changed */ |
||||
gpr_atm_rel_store(&chand->idle_state, MAX_IDLE_STATE_INIT); |
||||
try_again = false; |
||||
break; |
||||
case MAX_IDLE_STATE_SEEN_EXIT_IDLE: |
||||
if (gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_SEEN_EXIT_IDLE, |
||||
MAX_IDLE_STATE_INIT)) { |
||||
try_again = false; |
||||
} |
||||
break; |
||||
case MAX_IDLE_STATE_SEEN_ENTER_IDLE: |
||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
||||
"max_age max_idle_timer"); |
||||
grpc_timer_init( |
||||
&chand->max_idle_timer, |
||||
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch( |
||||
gpr_atm_no_barrier_load( |
||||
&chand->last_enter_idle_time_millis)) + |
||||
chand->max_connection_idle, |
||||
&chand->max_idle_timer_cb); |
||||
/* idle_state may have already been set to
|
||||
MAX_IDLE_STATE_SEEN_EXIT_IDLE by increase_call_count(), in this |
||||
case, we don't need to set it to MAX_IDLE_STATE_TIMER_SET */ |
||||
gpr_atm_rel_cas(&chand->idle_state, MAX_IDLE_STATE_SEEN_ENTER_IDLE, |
||||
MAX_IDLE_STATE_TIMER_SET); |
||||
try_again = false; |
||||
break; |
||||
default: |
||||
/* try again */ |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_idle_timer"); |
||||
} |
||||
|
||||
static void close_max_age_channel(void* arg, grpc_error_handle error) { |
||||
channel_data* chand = static_cast<channel_data*>(arg); |
||||
{ |
||||
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
||||
chand->max_age_timer_pending = false; |
||||
} |
||||
if (error == GRPC_ERROR_NONE) { |
||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
||||
"max_age start_max_age_grace_timer_after_goaway_op"); |
||||
grpc_transport_op* op = grpc_make_transport_op( |
||||
&chand->start_max_age_grace_timer_after_goaway_op); |
||||
op->goaway_error = |
||||
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"), |
||||
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR); |
||||
grpc_channel_element* elem = |
||||
grpc_channel_stack_element(chand->channel_stack, 0); |
||||
elem->filter->start_transport_op(elem, op); |
||||
} else if (error != GRPC_ERROR_CANCELLED) { |
||||
GRPC_LOG_IF_ERROR("close_max_age_channel", error); |
||||
} |
||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_age_timer"); |
||||
} |
||||
|
||||
static void force_close_max_age_channel(void* arg, grpc_error_handle error) { |
||||
channel_data* chand = static_cast<channel_data*>(arg); |
||||
{ |
||||
grpc_core::MutexLock lock(&chand->max_age_timer_mu); |
||||
chand->max_age_grace_timer_pending = false; |
||||
} |
||||
if (error == GRPC_ERROR_NONE) { |
||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||
op->disconnect_with_error = |
||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel reaches max age"); |
||||
grpc_channel_element* elem = |
||||
grpc_channel_stack_element(chand->channel_stack, 0); |
||||
elem->filter->start_transport_op(elem, op); |
||||
} else if (error != GRPC_ERROR_CANCELLED) { |
||||
GRPC_LOG_IF_ERROR("force_close_max_age_channel", error); |
||||
} |
||||
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_age_grace_timer"); |
||||
} |
||||
|
||||
/* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out
|
||||
connection storms. Note that the MAX_CONNECTION_AGE option without jitter |
||||
would not create connection storms by itself, but if there happened to be a |
||||
connection storm it could cause it to repeat at a fixed period. */ |
||||
static grpc_core::Duration |
||||
add_random_max_connection_age_jitter_and_convert_to_duration(int value) { |
||||
/* generate a random number between 1 - MAX_CONNECTION_AGE_JITTER and
|
||||
1 + MAX_CONNECTION_AGE_JITTER */ |
||||
double multiplier = rand() * MAX_CONNECTION_AGE_JITTER * 2.0 / RAND_MAX + |
||||
1.0 - MAX_CONNECTION_AGE_JITTER; |
||||
double result = multiplier * value; |
||||
/* INT_MAX - 0.5 converts the value to float, so that result will not be
|
||||
cast to int implicitly before the comparison. */ |
||||
return result > (static_cast<double>( |
||||
grpc_core::Duration::Infinity().millis())) - |
||||
0.5 |
||||
? grpc_core::Duration::Infinity() |
||||
: grpc_core::Duration::Milliseconds(result); |
||||
} |
||||
|
||||
/* Constructor for call_data. */ |
||||
static grpc_error_handle max_age_init_call_elem( |
||||
grpc_call_element* elem, const grpc_call_element_args* /*args*/) { |
||||
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
||||
increase_call_count(chand); |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
|
||||
/* Destructor for call_data. */ |
||||
static void max_age_destroy_call_elem( |
||||
grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, |
||||
grpc_closure* /*ignored*/) { |
||||
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
||||
decrease_call_count(chand); |
||||
} |
||||
|
||||
/* Constructor for channel_data. */ |
||||
static grpc_error_handle max_age_init_channel_elem( |
||||
grpc_channel_element* elem, grpc_channel_element_args* args) { |
||||
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
||||
new (chand) channel_data(); |
||||
chand->channel_stack = args->channel_stack; |
||||
chand->max_connection_age = |
||||
add_random_max_connection_age_jitter_and_convert_to_duration( |
||||
DEFAULT_MAX_CONNECTION_AGE_MS); |
||||
chand->max_connection_age_grace = |
||||
DEFAULT_MAX_CONNECTION_AGE_GRACE_MS == INT_MAX |
||||
? grpc_core::Duration::Infinity() |
||||
: grpc_core::Duration::Milliseconds( |
||||
DEFAULT_MAX_CONNECTION_AGE_GRACE_MS); |
||||
chand->max_connection_idle = |
||||
DEFAULT_MAX_CONNECTION_IDLE_MS == INT_MAX |
||||
? grpc_core::Duration::Infinity() |
||||
: grpc_core::Duration::Milliseconds(DEFAULT_MAX_CONNECTION_IDLE_MS); |
||||
chand->idle_state = MAX_IDLE_STATE_INIT; |
||||
gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, GPR_ATM_MIN); |
||||
for (size_t i = 0; i < args->channel_args->num_args; ++i) { |
||||
if (0 == strcmp(args->channel_args->args[i].key, |
||||
GRPC_ARG_MAX_CONNECTION_AGE_MS)) { |
||||
const int value = grpc_channel_arg_get_integer( |
||||
&args->channel_args->args[i], MAX_CONNECTION_AGE_INTEGER_OPTIONS); |
||||
chand->max_connection_age = |
||||
add_random_max_connection_age_jitter_and_convert_to_duration(value); |
||||
} else if (0 == strcmp(args->channel_args->args[i].key, |
||||
GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)) { |
||||
const int value = grpc_channel_arg_get_integer( |
||||
&args->channel_args->args[i], |
||||
{DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0, INT_MAX}); |
||||
chand->max_connection_age_grace = |
||||
value == INT_MAX ? grpc_core::Duration::Infinity() |
||||
: grpc_core::Duration::Milliseconds(value); |
||||
} else if (0 == strcmp(args->channel_args->args[i].key, |
||||
GRPC_ARG_MAX_CONNECTION_IDLE_MS)) { |
||||
const int value = grpc_channel_arg_get_integer( |
||||
&args->channel_args->args[i], MAX_CONNECTION_IDLE_INTEGER_OPTIONS); |
||||
chand->max_connection_idle = |
||||
value == INT_MAX ? grpc_core::Duration::Infinity() |
||||
: grpc_core::Duration::Milliseconds(value); |
||||
} |
||||
} |
||||
GRPC_CLOSURE_INIT(&chand->max_idle_timer_cb, max_idle_timer_cb, chand, |
||||
grpc_schedule_on_exec_ctx); |
||||
GRPC_CLOSURE_INIT(&chand->close_max_age_channel, close_max_age_channel, chand, |
||||
grpc_schedule_on_exec_ctx); |
||||
GRPC_CLOSURE_INIT(&chand->force_close_max_age_channel, |
||||
force_close_max_age_channel, chand, |
||||
grpc_schedule_on_exec_ctx); |
||||
GRPC_CLOSURE_INIT(&chand->start_max_idle_timer_after_init, |
||||
start_max_idle_timer_after_init, chand, |
||||
grpc_schedule_on_exec_ctx); |
||||
GRPC_CLOSURE_INIT(&chand->start_max_age_timer_after_init, |
||||
start_max_age_timer_after_init, chand, |
||||
grpc_schedule_on_exec_ctx); |
||||
GRPC_CLOSURE_INIT(&chand->start_max_age_grace_timer_after_goaway_op, |
||||
start_max_age_grace_timer_after_goaway_op, chand, |
||||
grpc_schedule_on_exec_ctx); |
||||
|
||||
if (chand->max_connection_age != grpc_core::Duration::Infinity()) { |
||||
/* When the channel reaches its max age, we send down an op with
|
||||
goaway_error set. However, we can't send down any ops until after the |
||||
channel stack is fully initialized. If we start the timer here, we have |
||||
no guarantee that the timer won't pop before channel stack initialization |
||||
is finished. To avoid that problem, we create a closure to start the |
||||
timer, and we schedule that closure to be run after call stack |
||||
initialization is done. */ |
||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
||||
"max_age start_max_age_timer_after_init"); |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, |
||||
&chand->start_max_age_timer_after_init, |
||||
GRPC_ERROR_NONE); |
||||
} |
||||
|
||||
/* Initialize the number of calls as 1, so that the max_idle_timer will not
|
||||
start until start_max_idle_timer_after_init is invoked. */ |
||||
gpr_atm_rel_store(&chand->call_count, 1); |
||||
if (chand->max_connection_idle != grpc_core::Duration::Infinity()) { |
||||
GRPC_CHANNEL_STACK_REF(chand->channel_stack, |
||||
"max_age start_max_idle_timer_after_init"); |
||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, |
||||
&chand->start_max_idle_timer_after_init, |
||||
GRPC_ERROR_NONE); |
||||
} |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
|
||||
/* Destructor for channel_data. */ |
||||
static void max_age_destroy_channel_elem(grpc_channel_element* elem) { |
||||
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
||||
chand->~channel_data(); |
||||
} |
||||
|
||||
const grpc_channel_filter grpc_max_age_filter = { |
||||
grpc_call_next_op, |
||||
nullptr, |
||||
grpc_channel_next_op, |
||||
0, /* sizeof_call_data */ |
||||
max_age_init_call_elem, |
||||
grpc_call_stack_ignore_set_pollset_or_pollset_set, |
||||
max_age_destroy_call_elem, |
||||
sizeof(channel_data), |
||||
max_age_init_channel_elem, |
||||
max_age_destroy_channel_elem, |
||||
grpc_channel_next_get_info, |
||||
"max_age"}; |
||||
|
||||
namespace grpc_core { |
||||
void RegisterMaxAgeFilter(CoreConfiguration::Builder* builder) { |
||||
builder->channel_init()->RegisterStage( |
||||
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, |
||||
[](ChannelStackBuilder* builder) { |
||||
const grpc_channel_args* channel_args = builder->channel_args(); |
||||
bool enable = grpc_channel_arg_get_integer( |
||||
grpc_channel_args_find( |
||||
channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS), |
||||
MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX || |
||||
grpc_channel_arg_get_integer( |
||||
grpc_channel_args_find( |
||||
channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), |
||||
MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX; |
||||
if (enable) { |
||||
builder->PrependFilter(&grpc_max_age_filter, nullptr); |
||||
} |
||||
return true; |
||||
}); |
||||
} |
||||
} // namespace grpc_core
|
@ -1,26 +0,0 @@ |
||||
//
|
||||
// Copyright 2017 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H |
||||
#define GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
|
||||
extern const grpc_channel_filter grpc_max_age_filter; |
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_MAX_AGE_MAX_AGE_FILTER_H */ |
Loading…
Reference in new issue