pull/36732/head
Craig Tiller 11 months ago
parent 393745938e
commit 8cfce95667
  1. 2
      BUILD
  2. 131
      src/core/client_channel/client_channel.cc
  3. 4
      src/core/client_channel/client_channel.h
  4. 2
      src/core/client_channel/subchannel.h
  5. 316
      src/core/ext/filters/channel_idle/channel_idle_filter.cc
  6. 156
      src/core/ext/filters/channel_idle/channel_idle_filter.h
  7. 4
      src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc
  8. 2
      src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h
  9. 12
      src/core/lib/channel/channel_args.h
  10. 4
      src/core/lib/surface/call.h
  11. 2
      src/core/lib/surface/channel.h
  12. 103
      src/core/lib/transport/call_spine.h
  13. 1
      tools/distrib/fix_build_deps.py

@ -1779,10 +1779,10 @@ grpc_cc_library(
"gpr",
"grpc_public_hdrs",
"grpc_trace",
"orphanable",
"ref_counted_ptr",
"stats",
"//src/core:arena",
"//src/core:call_arena_allocator",
"//src/core:channel_args",
"//src/core:channel_stack_type",
"//src/core:compression",

@ -48,9 +48,9 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpc/support/metrics.h>
#include "src/core/client_channel/backup_poller.h"
#include "src/core/client_channel/client_channel_channelz.h"
#include "src/core/client_channel/client_channel_internal.h"
#include "src/core/client_channel/client_channel_service_config.h"
#include "src/core/client_channel/config_selector.h"
@ -60,10 +60,8 @@
#include "src/core/client_channel/retry_filter.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_interface_internal.h"
#include "src/core/ext/filters/channel_idle/channel_idle_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/metrics.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/channel/status_util.h"
@ -78,7 +76,6 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/handshaker/proxy_mapper_registry.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
@ -113,6 +110,7 @@
#include "src/core/resolver/resolver_registry.h"
#include "src/core/service_config/service_config_call_data.h"
#include "src/core/service_config/service_config_impl.h"
#include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h"
namespace grpc_core {
@ -207,7 +205,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
}
}
void Orphan() override {
void Orphaned() override {
// Make sure we clean up the channel's subchannel maps inside the
// WorkSerializer.
WeakRefAsSubclass<SubchannelWrapper>(DEBUG_LOCATION,
@ -647,7 +645,7 @@ ClientCallTracer::CallAttemptTracer* GetCallAttemptTracerFromContext() {
// call tracker inside the LB call.
// FIXME: move this to its own file, register only when call v3
// experiment is enabled
class LbCallTracingFilter : public ImplementChannelFilter<LbCallTracingFilter> {
class LbCallTracingFilter {
public:
static absl::StatusOr<LbCallTracingFilter> Create(const ChannelArgs&,
ChannelFilter::Args) {
@ -674,13 +672,12 @@ class LbCallTracingFilter : public ImplementChannelFilter<LbCallTracingFilter> {
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
// FIXME(ctiller): Add this hook to the L1 filter API
void OnClientToServerMessagesClosed() {
void OnClientToServerHalfClose() {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
// TODO(roth): Change CallTracer API to not pass metadata
// batch to this method, since the batch is always empty.
grpc_metadata_batch metadata(GetContext<Arena>());
grpc_metadata_batch metadata;
tracer->RecordSendTrailingMetadata(&metadata);
}
@ -781,18 +778,18 @@ const NoInterceptor LbCallTracingFilter::Call::OnServerToClientMessage;
} // namespace
class ClientChannel::LoadBalancedCallDestination : public CallDestination {
class ClientChannel::LoadBalancedCallDestination : public UnstartedCallDestination {
public:
explicit LoadBalancedCallDestination(
RefCountedPtr<ClientChannel> client_channel)
: client_channel_(std::move(client_channel)) {}
void Orphan() override {}
void Orphaned() override {}
void StartCall(UnstartedCallHandler unstarted_handler) override {
// If there is a call tracer, create a call attempt tracer.
bool* is_transparent_retry_metadata =
unstarted_handler.UnprocessedClientInitialMetadata()->get_pointer(
unstarted_handler.UnprocessedClientInitialMetadata().get_pointer(
IsTransparentRetry());
bool is_transparent_retry = is_transparent_retry_metadata != nullptr
? *is_transparent_retry_metadata
@ -874,76 +871,6 @@ ClientChannelServiceConfigCallData* GetServiceConfigCallDataFromContext() {
legacy_context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
}
// A call destination that does not support retries.
// To be used as an L2 filter.
class NoRetryCallDestination : public DelegatingCallDestination {
public:
NoRetryCallDestination(RefCountedPtr<CallDestination> next,
RefCountedPtr<CallFilters::Stack> filter_stack,
const ChannelArgs& channel_args)
: DelegatingCallDestination(std::move(next)),
filter_stack_(std::move(filter_stack)),
call_size_estimator_(1024),
allocator_(channel_args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryOwner()) {}
void Orphan() override {}
void StartCall(UnstartedCallHandler unstarted_handler) override {
// Start the parent call. We take ownership of the handler.
CallHandler handler = unstarted_handler.StartCall(filter_stack_);
// Start a promise to drain the client initial metadata from the
// parent call, create a new child call, and forward between them.
handler.SpawnGuarded(
"drain_send_initial_metadata",
[self = RefAsSubclass<NoRetryCallDestination>(), handler]() mutable {
return Map(
handler.PullClientInitialMetadata(),
[self = std::move(self),
handler](ValueOrFailure<ClientMetadataHandle>
client_initial_metadata) mutable -> StatusFlag {
if (!client_initial_metadata.ok()) return Failure{};
// Indicate that this is not a transparent retry.
*(*client_initial_metadata)
->GetOrCreatePointer(IsTransparentRetry()) = false;
// Set on_commit callback in context.
handler.SetContext<LbOnCommit>(
[]() { GetServiceConfigCallDataFromContext()->Commit(); });
// Create an arena for the child call.
const size_t initial_size =
self->call_size_estimator_.CallSizeEstimate();
// FIXME: do we want to do this for LB calls, or do we want a
// separate stat for this?
// global_stats().IncrementCallInitialSize(initial_size);
Arena* arena = Arena::Create(initial_size, &self->allocator_);
// Create an initiator/unstarted-handler pair using the arena.
// FIXME: pass in a callback that the CallSpine will use to
// destroy the arena:
// [](Arena* arena) {
// call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
// arena->Destroy();
// }
auto child_call = MakeCall(std::move(*client_initial_metadata),
GetContext<EventEngine>(), arena);
// Pass the child call's unstarted handler to the next
// destination.
self->wrapped_destination()->StartCall(
std::move(child_call.unstarted_handler));
// Forward everything from the parent call to the child call.
ForwardCall(std::move(handler),
std::move(child_call.initiator));
return Success{};
});
});
}
private:
RefCountedPtr<CallFilters::Stack> filter_stack_;
CallSizeEstimator call_size_estimator_;
MemoryAllocator allocator_;
};
} // namespace
//
@ -1049,7 +976,7 @@ ClientChannel::ClientChannel(
default_authority_ = std::move(*default_authority);
}
// Get stats plugins for channel.
StatsPlugin::ChannelScope scope(this->target(), default_authority_);
experimental:: StatsPluginChannelScope scope(this->target(), default_authority_);
stats_plugin_group_ =
GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
}
@ -1268,18 +1195,18 @@ CallInitiator ClientChannel::CreateCall(
// Exit IDLE if needed.
CheckConnectivityState(/*try_to_connect=*/true);
// Create an initiator/unstarted-handler pair.
auto call = MakeCall(std::move(client_initial_metadata),
GetContext<EventEngine>(), arena);
auto call = MakeCallPair(std::move(client_initial_metadata),
GetContext<EventEngine>(), arena, nullptr, GetContext<grpc_call_context_element>());
// Spawn a promise to wait for the resolver result.
// This will eventually start the call.
call.initiator.SpawnGuarded(
"wait-for-name-resolution",
[self = RefAsSubclass<ClientChannel>(),
unstarted_handler = std::move(call.unstarted_handler),
unstarted_handler = std::move(call.handler),
was_queued = false]() mutable {
const bool wait_for_ready =
unstarted_handler.UnprocessedClientInitialMetadata()
->GetOrCreatePointer(WaitForReady())
.GetOrCreatePointer(WaitForReady())
->value;
return Map(
// Wait for the resolver result.
@ -1695,7 +1622,8 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
}
// Construct filter stack.
CallFilters::StackBuilder builder;
// TODO(roth): Add service_config to channel_args_.
InterceptionChainBuilder builder(channel_args_.SetObject(this));
if (idle_timeout_ != Duration::Zero()) {
builder.AddOnServerTrailingMetadata([this](ServerMetadata&) {
if (idle_state_.DecreaseCallCount()) StartIdleTimer();
@ -1712,20 +1640,19 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
DynamicFilters::Create(new_args, std::move(filters));
GPR_ASSERT(dynamic_filters != nullptr);
#endif
auto filter_stack = builder.Build();
// Create call destination.
RefCountedPtr<CallDestination> call_destination;
RefCountedPtr<UnstartedCallDestination> call_destination;
const bool enable_retries =
!channel_args_.WantMinimalStack() &&
channel_args_.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
if (enable_retries) {
Crash("call v3 stack does not yet support retries");
} else {
call_destination = MakeRefCounted<NoRetryCallDestination>(
call_destination =
MakeRefCounted<LoadBalancedCallDestination>(
RefAsSubclass<ClientChannel>()),
std::move(filter_stack), channel_args_);
RefAsSubclass<ClientChannel>());
}
auto filter_stack = builder.Build(call_destination);
// Send result to data plane.
resolver_data_for_calls_.Set(ResolverDataForCalls{
std::move(config_selector), std::move(call_destination)});
@ -1794,7 +1721,7 @@ void ClientChannel::StartIdleTimer() {
absl::Status ClientChannel::ApplyServiceConfigToCall(
ConfigSelector& config_selector,
ClientMetadataHandle& client_initial_metadata) const {
ClientMetadata& client_initial_metadata) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "client_channel=%p: %sapplying service config to call",
this, GetContext<Activity>()->DebugTag().c_str());
@ -1809,7 +1736,7 @@ absl::Status ClientChannel::ApplyServiceConfigToCall(
GetContext<Arena>(), GetContext<grpc_call_context_element>());
// Use the ConfigSelector to determine the config for the call.
absl::Status call_config_status = config_selector.GetCallConfig(
{client_initial_metadata.get(), GetContext<Arena>(),
{&client_initial_metadata, GetContext<Arena>(),
service_config_call_data});
if (!call_config_status.ok()) {
return MaybeRewriteIllegalStatusCode(call_config_status, "ConfigSelector");
@ -1822,16 +1749,16 @@ absl::Status ClientChannel::ApplyServiceConfigToCall(
// If the service config specifies a deadline, update the call's
// deadline timer.
if (method_params->timeout() != Duration::Zero()) {
CallContext* call_context = GetContext<CallContext>();
Call* call = GetContext<Call>();
const Timestamp per_method_deadline =
Timestamp::FromCycleCounterRoundUp(call_context->call_start_time()) +
Timestamp::FromCycleCounterRoundUp(call->start_time()) +
method_params->timeout();
call_context->UpdateDeadline(per_method_deadline);
call->UpdateDeadline(per_method_deadline);
}
// If the service config set wait_for_ready and the application
// did not explicitly set it, use the value from the service config.
auto* wait_for_ready =
client_initial_metadata->GetOrCreatePointer(WaitForReady());
client_initial_metadata.GetOrCreatePointer(WaitForReady());
if (method_params->wait_for_ready().has_value() &&
!wait_for_ready->explicitly_set) {
wait_for_ready->value = method_params->wait_for_ready().value();
@ -1870,12 +1797,12 @@ ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker,
auto& client_initial_metadata =
unstarted_handler.UnprocessedClientInitialMetadata();
LoadBalancingPolicy::PickArgs pick_args;
Slice* path = client_initial_metadata->get_pointer(HttpPathMetadata());
Slice* path = client_initial_metadata.get_pointer(HttpPathMetadata());
GPR_ASSERT(path != nullptr);
pick_args.path = path->as_string_view();
LbCallState lb_call_state;
pick_args.call_state = &lb_call_state;
LbMetadata initial_metadata(client_initial_metadata.get());
LbMetadata initial_metadata(&client_initial_metadata);
pick_args.initial_metadata = &initial_metadata;
auto result = picker.Pick(pick_args);
// Handle result.
@ -1940,7 +1867,7 @@ ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker,
// If wait_for_ready is false, then the error indicates the RPC
// attempt's final status.
if (!unstarted_handler.UnprocessedClientInitialMetadata()
->GetOrCreatePointer(WaitForReady())
.GetOrCreatePointer(WaitForReady())
->value) {
return MaybeRewriteIllegalStatusCode(std::move(fail_pick->status),
"LB pick");

@ -154,7 +154,7 @@ class ClientChannel : public Channel {
// May modify call context and client_initial_metadata.
absl::Status ApplyServiceConfigToCall(
ConfigSelector& config_selector,
ClientMetadataHandle& client_initial_metadata) const;
ClientMetadata& client_initial_metadata) const;
// Does an LB pick for a call. Returns one of the following things:
// - Continue{}, meaning to queue the pick
@ -199,7 +199,7 @@ class ClientChannel : public Channel {
//
struct ResolverDataForCalls {
RefCountedPtr<ConfigSelector> config_selector;
RefCountedPtr<CallDestination> call_destination;
RefCountedPtr<UnstartedCallDestination> call_destination;
};
Observable<absl::StatusOr<ResolverDataForCalls>> resolver_data_for_calls_;

@ -64,7 +64,7 @@ namespace grpc_core {
class SubchannelCall;
class ConnectedSubchannel final : public RefCounted<ConnectedSubchannel> {
class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
public:
const ChannelArgs& args() const { return args_; }
channelz::SubchannelNode* channelz_subchannel() const {

@ -1,316 +0,0 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(ctiller): Add a unit test suite for these filters once it's practical to
// mock transport operations.
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/channel_idle/channel_idle_filter.h"
#include <functional>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/meta/type_traits.h"
#include "absl/random/random.h"
#include "absl/types/optional.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/per_cpu.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc_core {
const NoInterceptor ChannelIdleFilter::Call::OnClientInitialMetadata;
const NoInterceptor ChannelIdleFilter::Call::OnServerInitialMetadata;
const NoInterceptor ChannelIdleFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ChannelIdleFilter::Call::OnClientToServerMessage;
const NoInterceptor ChannelIdleFilter::Call::OnServerToClientMessage;
namespace {
// TODO(roth): This can go back to being a constant when the experiment
// is removed.
Duration DefaultIdleTimeout() {
if (IsClientIdlenessEnabled()) return Duration::Minutes(30);
return Duration::Infinity();
}
// If these settings change, make sure that we are not sending a GOAWAY for
// inproc transport, since a GOAWAY to inproc ends up destroying the transport.
const auto kDefaultMaxConnectionAge = Duration::Infinity();
const auto kDefaultMaxConnectionAgeGrace = Duration::Infinity();
const auto kDefaultMaxConnectionIdle = Duration::Infinity();
const auto kMaxConnectionAgeJitter = 0.1;
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");
} // namespace
#define GRPC_IDLE_FILTER_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
} \
} while (0)
Duration GetClientIdleTimeout(const ChannelArgs& args) {
return args.GetDurationFromIntMillis(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS)
.value_or(DefaultIdleTimeout());
}
struct MaxAgeFilter::Config {
Duration max_connection_age;
Duration max_connection_idle;
Duration max_connection_age_grace;
bool enable() const {
return max_connection_age != Duration::Infinity() ||
max_connection_idle != Duration::Infinity();
}
// A random jitter of +/-10% will be added to MAX_CONNECTION_AGE and
// MAX_CONNECTION_IDLE to spread out reconnection storms.
static Config FromChannelArgs(const ChannelArgs& args) {
const Duration args_max_age =
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_MS)
.value_or(kDefaultMaxConnectionAge);
const Duration args_max_idle =
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
.value_or(kDefaultMaxConnectionIdle);
const Duration args_max_age_grace =
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)
.value_or(kDefaultMaxConnectionAgeGrace);
// generate a random number between 1 - kMaxConnectionAgeJitter and
// 1 + kMaxConnectionAgeJitter
struct BitGen {
Mutex mu;
absl::BitGen bit_gen ABSL_GUARDED_BY(mu);
double MakeUniformDouble(double min, double max) {
MutexLock lock(&mu);
return absl::Uniform(bit_gen, min, max);
}
};
static NoDestruct<PerCpu<BitGen>> bit_gen(PerCpuOptions().SetMaxShards(8));
const double multiplier = bit_gen->this_cpu().MakeUniformDouble(
1.0 - kMaxConnectionAgeJitter, 1.0 + kMaxConnectionAgeJitter);
// GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result
// will not be cast to int implicitly before the comparison.
return Config{args_max_age * multiplier, args_max_idle * multiplier,
args_max_age_grace};
}
};
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) {
ClientIdleFilter filter(filter_args.channel_stack(),
GetClientIdleTimeout(args));
return absl::StatusOr<ClientIdleFilter>(std::move(filter));
}
absl::StatusOr<MaxAgeFilter> MaxAgeFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) {
MaxAgeFilter filter(filter_args.channel_stack(),
Config::FromChannelArgs(args));
return absl::StatusOr<MaxAgeFilter>(std::move(filter));
}
void MaxAgeFilter::Shutdown() {
max_age_activity_.Reset();
ChannelIdleFilter::Shutdown();
}
void MaxAgeFilter::PostInit() {
struct StartupClosure {
RefCountedPtr<grpc_channel_stack> channel_stack;
MaxAgeFilter* filter;
grpc_closure closure;
};
auto run_startup = [](void* p, grpc_error_handle) {
auto* startup = static_cast<StartupClosure*>(p);
// Trigger idle timer
startup->filter->IncreaseCallCount();
startup->filter->DecreaseCallCount();
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch.reset(
new ConnectivityWatcher(startup->filter));
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
grpc_channel_next_op(
grpc_channel_stack_element(startup->channel_stack.get(), 0), op);
delete startup;
};
auto* startup =
new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}};
GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &startup->closure, absl::OkStatus());
auto channel_stack = this->channel_stack()->Ref();
// Start the max age timer
if (max_connection_age_ != Duration::Infinity()) {
max_age_activity_.Set(MakeActivity(
TrySeq(
// First sleep until the max connection age
Sleep(Timestamp::Now() + max_connection_age_),
// Then send a goaway.
[this] {
GRPC_CHANNEL_STACK_REF(this->channel_stack(),
"max_age send_goaway");
// Jump out of the activity to send the goaway.
auto fn = [](void* arg, grpc_error_handle) {
auto* channel_stack = static_cast<grpc_channel_stack*>(arg);
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->goaway_error = grpc_error_set_int(
GRPC_ERROR_CREATE("max_age"),
StatusIntProperty::kHttp2Error, GRPC_HTTP2_NO_ERROR);
grpc_channel_element* elem =
grpc_channel_stack_element(channel_stack, 0);
elem->filter->start_transport_op(elem, op);
GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway");
};
ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr),
absl::OkStatus());
return Immediate(absl::OkStatus());
},
// Sleep for the grace period
[this] {
return Sleep(Timestamp::Now() + max_connection_age_grace_);
}),
ExecCtxWakeupScheduler(),
[channel_stack, this](absl::Status status) {
// OnDone -- close the connection if the promise completed
// successfully.
// (if it did not, it was cancelled)
if (status.ok()) CloseChannel();
},
channel_stack->EventEngine()));
}
}
bool ChannelIdleFilter::StartTransportOp(grpc_transport_op* op) {
// Catch the disconnect_with_error transport op.
if (!op->disconnect_with_error.ok()) Shutdown();
// Pass the op to the next filter.
return false;
}
void ChannelIdleFilter::Shutdown() {
// IncreaseCallCount() introduces a phony call and prevent the timer from
// being reset by other threads.
IncreaseCallCount();
activity_.Reset();
}
void ChannelIdleFilter::IncreaseCallCount() {
idle_filter_state_->IncreaseCallCount();
}
void ChannelIdleFilter::DecreaseCallCount() {
if (idle_filter_state_->DecreaseCallCount()) {
// If there are no more calls in progress, start the idle timer.
StartIdleTimer();
}
}
void ChannelIdleFilter::StartIdleTimer() {
GRPC_IDLE_FILTER_LOG("timer has started");
auto idle_filter_state = idle_filter_state_;
// Hold a ref to the channel stack for the timer callback.
auto channel_stack = channel_stack_->Ref();
auto timeout = client_idle_timeout_;
auto promise = Loop([timeout, idle_filter_state]() {
return TrySeq(Sleep(Timestamp::Now() + timeout),
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> {
if (idle_filter_state->CheckTimer()) {
return Continue{};
} else {
return absl::OkStatus();
}
});
});
activity_.Set(MakeActivity(
std::move(promise), ExecCtxWakeupScheduler{},
[channel_stack, this](absl::Status status) {
if (status.ok()) CloseChannel();
},
channel_stack->EventEngine()));
}
void ChannelIdleFilter::CloseChannel() {
auto* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = grpc_error_set_int(
GRPC_ERROR_CREATE("enter idle"),
StatusIntProperty::ChannelConnectivityState, GRPC_CHANNEL_IDLE);
// Pass the transport op down to the channel stack.
auto* elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
const grpc_channel_filter ClientIdleFilter::kFilter =
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>(
"client_idle");
const grpc_channel_filter MaxAgeFilter::kFilter =
MakePromiseBasedFilter<MaxAgeFilter, FilterEndpoint::kServer>("max_age");
void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
GPR_ASSERT(MaxAgeFilter::kFilter.init_call != nullptr);
if (!IsV3ChannelIdleFiltersEnabled()) return;
if (!IsCallV3Enabled()) {
builder->channel_init()
->RegisterFilter<ClientIdleFilter>(GRPC_CLIENT_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return GetClientIdleTimeout(channel_args) != Duration::Infinity();
});
}
builder->channel_init()
->RegisterFilter<MaxAgeFilter>(GRPC_SERVER_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return MaxAgeFilter::Config::FromChannelArgs(channel_args).enable();
});
}
MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack,
const Config& max_age_config)
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle),
max_connection_age_(max_age_config.max_connection_age),
max_connection_age_grace_(max_age_config.max_connection_age_grace) {}
} // namespace grpc_core

@ -1,156 +0,0 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_CHANNEL_IDLE_FILTER_H
#define GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_CHANNEL_IDLE_FILTER_H
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include <grpc/impl/connectivity_state.h>
#include "src/core/ext/filters/channel_idle/idle_filter_state.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/single_set_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
Duration GetClientIdleTimeout(const ChannelArgs& args);
class ChannelIdleFilter : public ImplementChannelFilter<ChannelIdleFilter> {
public:
~ChannelIdleFilter() override = default;
ChannelIdleFilter(const ChannelIdleFilter&) = delete;
ChannelIdleFilter& operator=(const ChannelIdleFilter&) = delete;
ChannelIdleFilter(ChannelIdleFilter&&) = default;
ChannelIdleFilter& operator=(ChannelIdleFilter&&) = default;
class Call {
public:
explicit Call(ChannelIdleFilter* filter) : filter_(filter) {
filter_->IncreaseCallCount();
}
~Call() { MaybeDecrement(); }
static const NoInterceptor OnClientInitialMetadata;
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
void OnFinalize(const grpc_call_final_info*) { MaybeDecrement(); }
private:
void MaybeDecrement() {
auto* filter = std::exchange(filter_, nullptr);
if (filter != nullptr) filter->DecreaseCallCount();
}
ChannelIdleFilter* filter_;
};
bool StartTransportOp(grpc_transport_op* op) override;
protected:
using SingleSetActivityPtr =
SingleSetPtr<Activity, typename ActivityPtr::deleter_type>;
ChannelIdleFilter(grpc_channel_stack* channel_stack,
Duration client_idle_timeout)
: channel_stack_(channel_stack),
client_idle_timeout_(client_idle_timeout) {}
grpc_channel_stack* channel_stack() { return channel_stack_; };
virtual void Shutdown();
void CloseChannel();
void IncreaseCallCount();
void DecreaseCallCount();
private:
void StartIdleTimer();
// The channel stack to which we take refs for pending callbacks.
grpc_channel_stack* channel_stack_;
Duration client_idle_timeout_;
std::shared_ptr<IdleFilterState> idle_filter_state_{
std::make_shared<IdleFilterState>(false)};
SingleSetActivityPtr activity_;
};
class ClientIdleFilter final : public ChannelIdleFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ClientIdleFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
private:
using ChannelIdleFilter::ChannelIdleFilter;
};
class MaxAgeFilter final : public ChannelIdleFilter {
public:
static const grpc_channel_filter kFilter;
struct Config;
static absl::StatusOr<MaxAgeFilter> Create(const ChannelArgs& args,
ChannelFilter::Args filter_args);
void PostInit() override;
private:
class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface {
public:
explicit ConnectivityWatcher(MaxAgeFilter* filter)
: channel_stack_(filter->channel_stack()->Ref()), filter_(filter) {}
~ConnectivityWatcher() override = default;
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status&) override {
if (new_state == GRPC_CHANNEL_SHUTDOWN) filter_->Shutdown();
}
private:
RefCountedPtr<grpc_channel_stack> channel_stack_;
MaxAgeFilter* filter_;
};
MaxAgeFilter(grpc_channel_stack* channel_stack, const Config& max_age_config);
void Shutdown() override;
SingleSetActivityPtr max_age_activity_;
Duration max_connection_age_;
Duration max_connection_age_grace_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_CHANNEL_IDLE_FILTER_H

@ -78,15 +78,11 @@ TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");
} \
} while (0)
namespace {
Duration GetClientIdleTimeout(const ChannelArgs& args) {
return args.GetDurationFromIntMillis(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS)
.value_or(kDefaultIdleTimeout);
}
} // namespace
struct LegacyMaxAgeFilter::Config {
Duration max_connection_age;
Duration max_connection_idle;

@ -40,6 +40,8 @@
namespace grpc_core {
Duration GetClientIdleTimeout(const ChannelArgs& args) ;
class LegacyChannelIdleFilter : public ChannelFilter {
public:
LegacyChannelIdleFilter(grpc_channel_stack* channel_stack,

@ -83,6 +83,15 @@ inline int PointerCompare(void* a_ptr, const grpc_arg_pointer_vtable* a_vtable,
// before the crt refcount base class.
template <typename T>
using RefType = absl::remove_cvref_t<decltype(*std::declval<T>().Ref())>;
template <typename T, typename Ignored = void /* for SFINAE */>
struct IsRawPointerTagged {
static constexpr bool kValue = false;
};
template <typename T>
struct IsRawPointerTagged<T, absl::void_t<typename T::RawPointerChannelArgTag>> {
static constexpr bool kValue = true;
};
} // namespace channel_args_detail
// Specialization for ref-counted pointers.
@ -91,13 +100,14 @@ using RefType = absl::remove_cvref_t<decltype(*std::declval<T>().Ref())>;
template <typename T>
struct ChannelArgTypeTraits<
T, absl::enable_if_t<
!channel_args_detail::IsRawPointerTagged<T>::kValue &&(
std::is_base_of<RefCounted<channel_args_detail::RefType<T>>,
channel_args_detail::RefType<T>>::value ||
std::is_base_of<RefCounted<channel_args_detail::RefType<T>,
NonPolymorphicRefCount>,
channel_args_detail::RefType<T>>::value ||
std::is_base_of<DualRefCounted<channel_args_detail::RefType<T>>,
channel_args_detail::RefType<T>>::value,
channel_args_detail::RefType<T>>::value),
void>> {
static const grpc_arg_pointer_vtable* VTable() {
static const grpc_arg_pointer_vtable tbl = {

@ -130,6 +130,8 @@ class Call : public CppImplOf<Call, grpc_call>,
// Implementation of EventEngine::Closure, called when deadline expires
void Run() final;
gpr_cycle_counter start_time() const { return start_time_; }
protected:
// The maximum number of concurrent batches possible.
// Based upon the maximum number of individually queueable ops in the batch
@ -209,8 +211,6 @@ class Call : public CppImplOf<Call, grpc_call>,
void HandleCompressionAlgorithmNotAccepted(
grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
gpr_cycle_counter start_time() const { return start_time_; }
virtual grpc_compression_options compression_options() = 0;
private:

@ -43,8 +43,8 @@
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/call_size_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/call_arena_allocator.h"
// Forward declaration to avoid dependency loop.
struct grpc_channel_stack;

@ -616,109 +616,6 @@ class UnstartedCallHandler {
RefCountedPtr<CallSpineInterface> spine_;
};
class UnstartedCallHandler;
// CallDestination is responsible for starting an UnstartedCallHandler
// and then processing operations on the resulting CallHandler.
//
// Examples of CallDestinations include:
// - a client transport
// - the server API
// - a load-balanced call in the client channel
// - a hijacking filter (see DelegatingCallDestination below)
//
// FIXME: do we want this to be ref-counted? that might not be
// desirable for the hijacking filter case, where we want the filter stack
// to own the filter rather than having every call take its own ref to every
// hijacking filter.
class CallDestination : public DualRefCounted<CallDestination> {
public:
virtual void StartCall(UnstartedCallHandler unstarted_call_handler) = 0;
};
// A delegating CallDestination for use as a hijacking filter.
// Implementations may look at the unprocessed initial metadata
// and decide to do one of two things:
//
// 1. It can be a no-op. In this case, it will simply pass the
// unstarted_call_handler to the wrapped CallDestination.
//
// 2. It can hijack the call by doing the following:
// - Start unstarted_call_handler and take ownership of the
// resulting handler.
// - Create a new CallInitiator/UnstartedCallHandler pair, and pass
// that new UnstartedCallHandler down to the wrapped CallDestination.
// - The implementation is then responsible for forwarding between
// the started handler and the new initiator. Note that in
// simple cases, this can be done via ForwardCall().
class DelegatingCallDestination : public CallDestination {
protected:
explicit DelegatingCallDestination(
RefCountedPtr<CallDestination> wrapped_destination)
: wrapped_destination_(std::move(wrapped_destination)) {}
CallDestination* wrapped_destination() const {
return wrapped_destination_.get();
}
private:
RefCountedPtr<CallDestination> wrapped_destination_;
};
class UnstartedCallHandler {
public:
UnstartedCallHandler(RefCountedPtr<CallSpineInterface> spine,
ClientMetadataHandle client_initial_metadata)
: spine_(std::move(spine)) {
spine_->SpawnGuarded(
"send_initial_metadata",
[client_initial_metadata = std::move(client_initial_metadata),
spine = spine_]() mutable {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine->party());
return Map(spine->client_initial_metadata().sender.Push(
std::move(client_initial_metadata)),
[](bool ok) { return StatusFlag(ok); });
});
}
// Returns the client initial metadata, which has not yet been
// processed by the stack that will ultimately be used for this call.
ClientMetadataHandle& UnprocessedClientInitialMetadata();
// Starts the call using the specified stack.
// This must be called only once, and the UnstartedCallHandler object
// may not be used after this is called.
CallHandler StartCall(RefCountedPtr<CallFilters::Stack> stack);
template <typename ContextType>
void SetContext(ContextType context) {
// FIXME: implement
}
template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
}
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnGuarded(name, std::move(promise_factory));
}
template <typename PromiseFactory>
void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnInfallible(name, std::move(promise_factory));
}
template <typename PromiseFactory>
auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) {
return spine_->party().SpawnWaitable(name, std::move(promise_factory));
}
private:
RefCountedPtr<CallSpineInterface> spine_;
};
struct CallInitiatorAndHandler {
CallInitiator initiator;
UnstartedCallHandler handler;

@ -64,6 +64,7 @@ EXTERNAL_DEPS = {
"absl/functional/bind_front.h": "absl/functional:bind_front",
"absl/functional/function_ref.h": "absl/functional:function_ref",
"absl/hash/hash.h": "absl/hash",
"absl/log/check.h": "absl/log:check",
"absl/log/log.h": "absl/log",
"absl/memory/memory.h": "absl/memory",
"absl/meta/type_traits.h": "absl/meta:type_traits",

Loading…
Cancel
Save