mirror of https://github.com/grpc/grpc.git
[call-v3] Idle/max-age filters (#35270)
Closes #35270
COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35270 from ctiller:cg-idle 5312003ca6
PiperOrigin-RevId: 589913523
pull/35278/head
parent
a5c71e132e
commit
57dacad8c7
24 changed files with 558 additions and 21 deletions
@ -0,0 +1,326 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// TODO(ctiller): Add a unit test suite for these filters once it's practical to
|
||||
// mock transport operations.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h" |
||||
|
||||
#include <functional> |
||||
#include <utility> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/meta/type_traits.h" |
||||
#include "absl/random/random.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/impl/channel_arg_names.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/promise_based_filter.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/experiments/experiments.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/lib/gprpp/no_destruct.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/per_cpu.h" |
||||
#include "src/core/lib/gprpp/status_helper.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" |
||||
#include "src/core/lib/promise/loop.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/promise/promise.h" |
||||
#include "src/core/lib/promise/sleep.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
#include "src/core/lib/surface/channel_stack_type.h" |
||||
#include "src/core/lib/transport/http2_errors.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
// TODO(roth): This can go back to being a constant when the experiment
|
||||
// is removed.
|
||||
Duration DefaultIdleTimeout() { |
||||
if (IsClientIdlenessEnabled()) return Duration::Minutes(30); |
||||
return Duration::Infinity(); |
||||
} |
||||
|
||||
// If these settings change, make sure that we are not sending a GOAWAY for
|
||||
// inproc transport, since a GOAWAY to inproc ends up destroying the transport.
|
||||
const auto kDefaultMaxConnectionAge = Duration::Infinity(); |
||||
const auto kDefaultMaxConnectionAgeGrace = Duration::Infinity(); |
||||
const auto kDefaultMaxConnectionIdle = Duration::Infinity(); |
||||
const auto kMaxConnectionAgeJitter = 0.1; |
||||
|
||||
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); |
||||
} // namespace
|
||||
|
||||
#define GRPC_IDLE_FILTER_LOG(format, ...) \ |
||||
do { \
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
|
||||
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
|
||||
} \
|
||||
} while (0) |
||||
|
||||
namespace { |
||||
|
||||
Duration GetClientIdleTimeout(const ChannelArgs& args) { |
||||
return args.GetDurationFromIntMillis(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS) |
||||
.value_or(DefaultIdleTimeout()); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
struct LegacyMaxAgeFilter::Config { |
||||
Duration max_connection_age; |
||||
Duration max_connection_idle; |
||||
Duration max_connection_age_grace; |
||||
|
||||
bool enable() const { |
||||
return max_connection_age != Duration::Infinity() || |
||||
max_connection_idle != Duration::Infinity(); |
||||
} |
||||
|
||||
// A random jitter of +/-10% will be added to MAX_CONNECTION_AGE and
|
||||
// MAX_CONNECTION_IDLE to spread out reconnection storms.
|
||||
static Config FromChannelArgs(const ChannelArgs& args) { |
||||
const Duration args_max_age = |
||||
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_MS) |
||||
.value_or(kDefaultMaxConnectionAge); |
||||
const Duration args_max_idle = |
||||
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_IDLE_MS) |
||||
.value_or(kDefaultMaxConnectionIdle); |
||||
const Duration args_max_age_grace = |
||||
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS) |
||||
.value_or(kDefaultMaxConnectionAgeGrace); |
||||
// generate a random number between 1 - kMaxConnectionAgeJitter and
|
||||
// 1 + kMaxConnectionAgeJitter
|
||||
struct BitGen { |
||||
Mutex mu; |
||||
absl::BitGen bit_gen ABSL_GUARDED_BY(mu); |
||||
double MakeUniformDouble(double min, double max) { |
||||
MutexLock lock(&mu); |
||||
return absl::Uniform(bit_gen, min, max); |
||||
} |
||||
}; |
||||
static NoDestruct<PerCpu<BitGen>> bit_gen(PerCpuOptions().SetMaxShards(8)); |
||||
const double multiplier = bit_gen->this_cpu().MakeUniformDouble( |
||||
1.0 - kMaxConnectionAgeJitter, 1.0 + kMaxConnectionAgeJitter); |
||||
// GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result
|
||||
// will not be cast to int implicitly before the comparison.
|
||||
return Config{args_max_age * multiplier, args_max_idle * multiplier, |
||||
args_max_age_grace}; |
||||
} |
||||
}; |
||||
|
||||
absl::StatusOr<LegacyClientIdleFilter> LegacyClientIdleFilter::Create( |
||||
const ChannelArgs& args, ChannelFilter::Args filter_args) { |
||||
LegacyClientIdleFilter filter(filter_args.channel_stack(), |
||||
GetClientIdleTimeout(args)); |
||||
return absl::StatusOr<LegacyClientIdleFilter>(std::move(filter)); |
||||
} |
||||
|
||||
absl::StatusOr<LegacyMaxAgeFilter> LegacyMaxAgeFilter::Create( |
||||
const ChannelArgs& args, ChannelFilter::Args filter_args) { |
||||
LegacyMaxAgeFilter filter(filter_args.channel_stack(), |
||||
Config::FromChannelArgs(args)); |
||||
return absl::StatusOr<LegacyMaxAgeFilter>(std::move(filter)); |
||||
} |
||||
|
||||
void LegacyMaxAgeFilter::Shutdown() { |
||||
max_age_activity_.Reset(); |
||||
LegacyChannelIdleFilter::Shutdown(); |
||||
} |
||||
|
||||
void LegacyMaxAgeFilter::PostInit() { |
||||
struct StartupClosure { |
||||
RefCountedPtr<grpc_channel_stack> channel_stack; |
||||
LegacyMaxAgeFilter* filter; |
||||
grpc_closure closure; |
||||
}; |
||||
auto run_startup = [](void* p, grpc_error_handle) { |
||||
auto* startup = static_cast<StartupClosure*>(p); |
||||
// Trigger idle timer
|
||||
startup->filter->IncreaseCallCount(); |
||||
startup->filter->DecreaseCallCount(); |
||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||
op->start_connectivity_watch.reset( |
||||
new ConnectivityWatcher(startup->filter)); |
||||
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; |
||||
grpc_channel_next_op( |
||||
grpc_channel_stack_element(startup->channel_stack.get(), 0), op); |
||||
delete startup; |
||||
}; |
||||
auto* startup = |
||||
new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}}; |
||||
GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr); |
||||
ExecCtx::Run(DEBUG_LOCATION, &startup->closure, absl::OkStatus()); |
||||
|
||||
auto channel_stack = this->channel_stack()->Ref(); |
||||
|
||||
// Start the max age timer
|
||||
if (max_connection_age_ != Duration::Infinity()) { |
||||
max_age_activity_.Set(MakeActivity( |
||||
TrySeq( |
||||
// First sleep until the max connection age
|
||||
Sleep(Timestamp::Now() + max_connection_age_), |
||||
// Then send a goaway.
|
||||
[this] { |
||||
GRPC_CHANNEL_STACK_REF(this->channel_stack(), |
||||
"max_age send_goaway"); |
||||
// Jump out of the activity to send the goaway.
|
||||
auto fn = [](void* arg, grpc_error_handle) { |
||||
auto* channel_stack = static_cast<grpc_channel_stack*>(arg); |
||||
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
||||
op->goaway_error = grpc_error_set_int( |
||||
GRPC_ERROR_CREATE("max_age"), |
||||
StatusIntProperty::kHttp2Error, GRPC_HTTP2_NO_ERROR); |
||||
grpc_channel_element* elem = |
||||
grpc_channel_stack_element(channel_stack, 0); |
||||
elem->filter->start_transport_op(elem, op); |
||||
GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway"); |
||||
}; |
||||
ExecCtx::Run( |
||||
DEBUG_LOCATION, |
||||
GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr), |
||||
absl::OkStatus()); |
||||
return Immediate(absl::OkStatus()); |
||||
}, |
||||
// Sleep for the grace period
|
||||
[this] { |
||||
return Sleep(Timestamp::Now() + max_connection_age_grace_); |
||||
}), |
||||
ExecCtxWakeupScheduler(), |
||||
[channel_stack, this](absl::Status status) { |
||||
// OnDone -- close the connection if the promise completed
|
||||
// successfully.
|
||||
// (if it did not, it was cancelled)
|
||||
if (status.ok()) CloseChannel(); |
||||
}, |
||||
channel_stack->EventEngine())); |
||||
} |
||||
} |
||||
|
||||
// Construct a promise for one call.
|
||||
ArenaPromise<ServerMetadataHandle> LegacyChannelIdleFilter::MakeCallPromise( |
||||
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
||||
using Decrementer = |
||||
std::unique_ptr<LegacyChannelIdleFilter, CallCountDecreaser>; |
||||
IncreaseCallCount(); |
||||
return ArenaPromise<ServerMetadataHandle>( |
||||
[decrementer = Decrementer(this), |
||||
next = next_promise_factory(std::move(call_args))]() mutable |
||||
-> Poll<ServerMetadataHandle> { return next(); }); |
||||
} |
||||
|
||||
bool LegacyChannelIdleFilter::StartTransportOp(grpc_transport_op* op) { |
||||
// Catch the disconnect_with_error transport op.
|
||||
if (!op->disconnect_with_error.ok()) Shutdown(); |
||||
// Pass the op to the next filter.
|
||||
return false; |
||||
} |
||||
|
||||
void LegacyChannelIdleFilter::Shutdown() { |
||||
// IncreaseCallCount() introduces a phony call and prevent the timer from
|
||||
// being reset by other threads.
|
||||
IncreaseCallCount(); |
||||
activity_.Reset(); |
||||
} |
||||
|
||||
void LegacyChannelIdleFilter::IncreaseCallCount() { |
||||
idle_filter_state_->IncreaseCallCount(); |
||||
} |
||||
|
||||
void LegacyChannelIdleFilter::DecreaseCallCount() { |
||||
if (idle_filter_state_->DecreaseCallCount()) { |
||||
// If there are no more calls in progress, start the idle timer.
|
||||
StartIdleTimer(); |
||||
} |
||||
} |
||||
|
||||
void LegacyChannelIdleFilter::StartIdleTimer() { |
||||
GRPC_IDLE_FILTER_LOG("timer has started"); |
||||
auto idle_filter_state = idle_filter_state_; |
||||
// Hold a ref to the channel stack for the timer callback.
|
||||
auto channel_stack = channel_stack_->Ref(); |
||||
auto timeout = client_idle_timeout_; |
||||
auto promise = Loop([timeout, idle_filter_state]() { |
||||
return TrySeq(Sleep(Timestamp::Now() + timeout), |
||||
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> { |
||||
if (idle_filter_state->CheckTimer()) { |
||||
return Continue{}; |
||||
} else { |
||||
return absl::OkStatus(); |
||||
} |
||||
}); |
||||
}); |
||||
activity_.Set(MakeActivity( |
||||
std::move(promise), ExecCtxWakeupScheduler{}, |
||||
[channel_stack, this](absl::Status status) { |
||||
if (status.ok()) CloseChannel(); |
||||
}, |
||||
channel_stack->EventEngine())); |
||||
} |
||||
|
||||
void LegacyChannelIdleFilter::CloseChannel() { |
||||
auto* op = grpc_make_transport_op(nullptr); |
||||
op->disconnect_with_error = grpc_error_set_int( |
||||
GRPC_ERROR_CREATE("enter idle"), |
||||
StatusIntProperty::ChannelConnectivityState, GRPC_CHANNEL_IDLE); |
||||
// Pass the transport op down to the channel stack.
|
||||
auto* elem = grpc_channel_stack_element(channel_stack_, 0); |
||||
elem->filter->start_transport_op(elem, op); |
||||
} |
||||
|
||||
const grpc_channel_filter LegacyClientIdleFilter::kFilter = |
||||
MakePromiseBasedFilter<LegacyClientIdleFilter, FilterEndpoint::kClient>( |
||||
"client_idle"); |
||||
const grpc_channel_filter LegacyMaxAgeFilter::kFilter = |
||||
MakePromiseBasedFilter<LegacyMaxAgeFilter, FilterEndpoint::kServer>( |
||||
"max_age"); |
||||
|
||||
void RegisterLegacyChannelIdleFilters(CoreConfiguration::Builder* builder) { |
||||
if (IsV3ChannelIdleFiltersEnabled()) return; |
||||
builder->channel_init() |
||||
->RegisterFilter(GRPC_CLIENT_CHANNEL, &LegacyClientIdleFilter::kFilter) |
||||
.ExcludeFromMinimalStack() |
||||
.If([](const ChannelArgs& channel_args) { |
||||
return GetClientIdleTimeout(channel_args) != Duration::Infinity(); |
||||
}); |
||||
builder->channel_init() |
||||
->RegisterFilter(GRPC_SERVER_CHANNEL, &LegacyMaxAgeFilter::kFilter) |
||||
.ExcludeFromMinimalStack() |
||||
.If([](const ChannelArgs& channel_args) { |
||||
return LegacyMaxAgeFilter::Config::FromChannelArgs(channel_args) |
||||
.enable(); |
||||
}); |
||||
} |
||||
|
||||
LegacyMaxAgeFilter::LegacyMaxAgeFilter(grpc_channel_stack* channel_stack, |
||||
const Config& max_age_config) |
||||
: LegacyChannelIdleFilter(channel_stack, |
||||
max_age_config.max_connection_idle), |
||||
max_connection_age_(max_age_config.max_connection_age), |
||||
max_connection_age_grace_(max_age_config.max_connection_age_grace) {} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,143 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_LEGACY_CHANNEL_IDLE_FILTER_H |
||||
#define GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_LEGACY_CHANNEL_IDLE_FILTER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <memory> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
|
||||
#include <grpc/impl/connectivity_state.h> |
||||
|
||||
#include "src/core/ext/filters/channel_idle/idle_filter_state.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_fwd.h" |
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/channel/promise_based_filter.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/single_set_ptr.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/arena_promise.h" |
||||
#include "src/core/lib/transport/connectivity_state.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class LegacyChannelIdleFilter : public ChannelFilter { |
||||
public: |
||||
~LegacyChannelIdleFilter() override = default; |
||||
|
||||
LegacyChannelIdleFilter(const LegacyChannelIdleFilter&) = delete; |
||||
LegacyChannelIdleFilter& operator=(const LegacyChannelIdleFilter&) = delete; |
||||
LegacyChannelIdleFilter(LegacyChannelIdleFilter&&) = default; |
||||
LegacyChannelIdleFilter& operator=(LegacyChannelIdleFilter&&) = default; |
||||
|
||||
// Construct a promise for one call.
|
||||
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
||||
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
||||
|
||||
bool StartTransportOp(grpc_transport_op* op) override; |
||||
|
||||
protected: |
||||
using SingleSetActivityPtr = |
||||
SingleSetPtr<Activity, typename ActivityPtr::deleter_type>; |
||||
|
||||
LegacyChannelIdleFilter(grpc_channel_stack* channel_stack, |
||||
Duration client_idle_timeout) |
||||
: channel_stack_(channel_stack), |
||||
client_idle_timeout_(client_idle_timeout) {} |
||||
|
||||
grpc_channel_stack* channel_stack() { return channel_stack_; }; |
||||
|
||||
virtual void Shutdown(); |
||||
void CloseChannel(); |
||||
|
||||
void IncreaseCallCount(); |
||||
void DecreaseCallCount(); |
||||
|
||||
private: |
||||
void StartIdleTimer(); |
||||
|
||||
struct CallCountDecreaser { |
||||
void operator()(LegacyChannelIdleFilter* filter) const { |
||||
filter->DecreaseCallCount(); |
||||
} |
||||
}; |
||||
|
||||
// The channel stack to which we take refs for pending callbacks.
|
||||
grpc_channel_stack* channel_stack_; |
||||
Duration client_idle_timeout_; |
||||
std::shared_ptr<IdleFilterState> idle_filter_state_{ |
||||
std::make_shared<IdleFilterState>(false)}; |
||||
|
||||
SingleSetActivityPtr activity_; |
||||
}; |
||||
|
||||
class LegacyClientIdleFilter final : public LegacyChannelIdleFilter { |
||||
public: |
||||
static const grpc_channel_filter kFilter; |
||||
|
||||
static absl::StatusOr<LegacyClientIdleFilter> Create( |
||||
const ChannelArgs& args, ChannelFilter::Args filter_args); |
||||
|
||||
private: |
||||
using LegacyChannelIdleFilter::LegacyChannelIdleFilter; |
||||
}; |
||||
|
||||
class LegacyMaxAgeFilter final : public LegacyChannelIdleFilter { |
||||
public: |
||||
static const grpc_channel_filter kFilter; |
||||
struct Config; |
||||
|
||||
static absl::StatusOr<LegacyMaxAgeFilter> Create( |
||||
const ChannelArgs& args, ChannelFilter::Args filter_args); |
||||
|
||||
void PostInit() override; |
||||
|
||||
private: |
||||
class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { |
||||
public: |
||||
explicit ConnectivityWatcher(LegacyMaxAgeFilter* filter) |
||||
: channel_stack_(filter->channel_stack()->Ref()), filter_(filter) {} |
||||
~ConnectivityWatcher() override = default; |
||||
|
||||
void OnConnectivityStateChange(grpc_connectivity_state new_state, |
||||
const absl::Status&) override { |
||||
if (new_state == GRPC_CHANNEL_SHUTDOWN) filter_->Shutdown(); |
||||
} |
||||
|
||||
private: |
||||
RefCountedPtr<grpc_channel_stack> channel_stack_; |
||||
LegacyMaxAgeFilter* filter_; |
||||
}; |
||||
|
||||
LegacyMaxAgeFilter(grpc_channel_stack* channel_stack, |
||||
const Config& max_age_config); |
||||
|
||||
void Shutdown() override; |
||||
|
||||
SingleSetActivityPtr max_age_activity_; |
||||
Duration max_connection_age_; |
||||
Duration max_connection_age_grace_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_LEGACY_CHANNEL_IDLE_FILTER_H
|
Loading…
Reference in new issue