diff --git a/BUILD b/BUILD index 136396788ec..669c3b5ef3b 100644 --- a/BUILD +++ b/BUILD @@ -1779,10 +1779,10 @@ grpc_cc_library( "gpr", "grpc_public_hdrs", "grpc_trace", - "orphanable", "ref_counted_ptr", "stats", "//src/core:arena", + "//src/core:call_arena_allocator", "//src/core:channel_args", "//src/core:channel_stack_type", "//src/core:compression", diff --git a/src/core/client_channel/client_channel.cc b/src/core/client_channel/client_channel.cc index 5e6ed05c008..bab61b78891 100644 --- a/src/core/client_channel/client_channel.cc +++ b/src/core/client_channel/client_channel.cc @@ -48,9 +48,9 @@ #include #include #include +#include #include "src/core/client_channel/backup_poller.h" -#include "src/core/client_channel/client_channel_channelz.h" #include "src/core/client_channel/client_channel_internal.h" #include "src/core/client_channel/client_channel_service_config.h" #include "src/core/client_channel/config_selector.h" @@ -60,10 +60,8 @@ #include "src/core/client_channel/retry_filter.h" #include "src/core/client_channel/subchannel.h" #include "src/core/client_channel/subchannel_interface_internal.h" -#include "src/core/ext/filters/channel_idle/channel_idle_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/metrics.h" #include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/channel/status_util.h" @@ -78,7 +76,6 @@ #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/gprpp/work_serializer.h" -#include "src/core/lib/handshaker/proxy_mapper_registry.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/json/json.h" @@ -113,6 +110,7 @@ #include "src/core/resolver/resolver_registry.h" #include "src/core/service_config/service_config_call_data.h" #include "src/core/service_config/service_config_impl.h" +#include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h" namespace grpc_core { @@ -207,7 +205,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { } } - void Orphan() override { + void Orphaned() override { // Make sure we clean up the channel's subchannel maps inside the // WorkSerializer. WeakRefAsSubclass(DEBUG_LOCATION, @@ -647,7 +645,7 @@ ClientCallTracer::CallAttemptTracer* GetCallAttemptTracerFromContext() { // call tracker inside the LB call. // FIXME: move this to its own file, register only when call v3 // experiment is enabled -class LbCallTracingFilter : public ImplementChannelFilter { +class LbCallTracingFilter { public: static absl::StatusOr Create(const ChannelArgs&, ChannelFilter::Args) { @@ -674,13 +672,12 @@ class LbCallTracingFilter : public ImplementChannelFilter { static const NoInterceptor OnClientToServerMessage; static const NoInterceptor OnServerToClientMessage; - // FIXME(ctiller): Add this hook to the L1 filter API - void OnClientToServerMessagesClosed() { + void OnClientToServerHalfClose() { auto* tracer = GetCallAttemptTracerFromContext(); if (tracer == nullptr) return; // TODO(roth): Change CallTracer API to not pass metadata // batch to this method, since the batch is always empty. - grpc_metadata_batch metadata(GetContext()); + grpc_metadata_batch metadata; tracer->RecordSendTrailingMetadata(&metadata); } @@ -781,18 +778,18 @@ const NoInterceptor LbCallTracingFilter::Call::OnServerToClientMessage; } // namespace -class ClientChannel::LoadBalancedCallDestination : public CallDestination { +class ClientChannel::LoadBalancedCallDestination : public UnstartedCallDestination { public: explicit LoadBalancedCallDestination( RefCountedPtr client_channel) : client_channel_(std::move(client_channel)) {} - void Orphan() override {} + void Orphaned() override {} void StartCall(UnstartedCallHandler unstarted_handler) override { // If there is a call tracer, create a call attempt tracer. bool* is_transparent_retry_metadata = - unstarted_handler.UnprocessedClientInitialMetadata()->get_pointer( + unstarted_handler.UnprocessedClientInitialMetadata().get_pointer( IsTransparentRetry()); bool is_transparent_retry = is_transparent_retry_metadata != nullptr ? *is_transparent_retry_metadata @@ -874,76 +871,6 @@ ClientChannelServiceConfigCallData* GetServiceConfigCallDataFromContext() { legacy_context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); } -// A call destination that does not support retries. -// To be used as an L2 filter. -class NoRetryCallDestination : public DelegatingCallDestination { - public: - NoRetryCallDestination(RefCountedPtr next, - RefCountedPtr filter_stack, - const ChannelArgs& channel_args) - : DelegatingCallDestination(std::move(next)), - filter_stack_(std::move(filter_stack)), - call_size_estimator_(1024), - allocator_(channel_args.GetObject() - ->memory_quota() - ->CreateMemoryOwner()) {} - - void Orphan() override {} - - void StartCall(UnstartedCallHandler unstarted_handler) override { - // Start the parent call. We take ownership of the handler. - CallHandler handler = unstarted_handler.StartCall(filter_stack_); - // Start a promise to drain the client initial metadata from the - // parent call, create a new child call, and forward between them. - handler.SpawnGuarded( - "drain_send_initial_metadata", - [self = RefAsSubclass(), handler]() mutable { - return Map( - handler.PullClientInitialMetadata(), - [self = std::move(self), - handler](ValueOrFailure - client_initial_metadata) mutable -> StatusFlag { - if (!client_initial_metadata.ok()) return Failure{}; - // Indicate that this is not a transparent retry. - *(*client_initial_metadata) - ->GetOrCreatePointer(IsTransparentRetry()) = false; - // Set on_commit callback in context. - handler.SetContext( - []() { GetServiceConfigCallDataFromContext()->Commit(); }); - // Create an arena for the child call. - const size_t initial_size = - self->call_size_estimator_.CallSizeEstimate(); - // FIXME: do we want to do this for LB calls, or do we want a - // separate stat for this? - // global_stats().IncrementCallInitialSize(initial_size); - Arena* arena = Arena::Create(initial_size, &self->allocator_); - // Create an initiator/unstarted-handler pair using the arena. - // FIXME: pass in a callback that the CallSpine will use to - // destroy the arena: - // [](Arena* arena) { - // call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes()); - // arena->Destroy(); - // } - auto child_call = MakeCall(std::move(*client_initial_metadata), - GetContext(), arena); - // Pass the child call's unstarted handler to the next - // destination. - self->wrapped_destination()->StartCall( - std::move(child_call.unstarted_handler)); - // Forward everything from the parent call to the child call. - ForwardCall(std::move(handler), - std::move(child_call.initiator)); - return Success{}; - }); - }); - } - - private: - RefCountedPtr filter_stack_; - CallSizeEstimator call_size_estimator_; - MemoryAllocator allocator_; -}; - } // namespace // @@ -1049,7 +976,7 @@ ClientChannel::ClientChannel( default_authority_ = std::move(*default_authority); } // Get stats plugins for channel. - StatsPlugin::ChannelScope scope(this->target(), default_authority_); +experimental:: StatsPluginChannelScope scope(this->target(), default_authority_); stats_plugin_group_ = GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope); } @@ -1268,18 +1195,18 @@ CallInitiator ClientChannel::CreateCall( // Exit IDLE if needed. CheckConnectivityState(/*try_to_connect=*/true); // Create an initiator/unstarted-handler pair. - auto call = MakeCall(std::move(client_initial_metadata), - GetContext(), arena); + auto call = MakeCallPair(std::move(client_initial_metadata), + GetContext(), arena, nullptr, GetContext()); // Spawn a promise to wait for the resolver result. // This will eventually start the call. call.initiator.SpawnGuarded( "wait-for-name-resolution", [self = RefAsSubclass(), - unstarted_handler = std::move(call.unstarted_handler), + unstarted_handler = std::move(call.handler), was_queued = false]() mutable { const bool wait_for_ready = unstarted_handler.UnprocessedClientInitialMetadata() - ->GetOrCreatePointer(WaitForReady()) + .GetOrCreatePointer(WaitForReady()) ->value; return Map( // Wait for the resolver result. @@ -1695,7 +1622,8 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { MakeRefCounted(saved_service_config_); } // Construct filter stack. - CallFilters::StackBuilder builder; + // TODO(roth): Add service_config to channel_args_. + InterceptionChainBuilder builder(channel_args_.SetObject(this)); if (idle_timeout_ != Duration::Zero()) { builder.AddOnServerTrailingMetadata([this](ServerMetadata&) { if (idle_state_.DecreaseCallCount()) StartIdleTimer(); @@ -1712,20 +1640,19 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { DynamicFilters::Create(new_args, std::move(filters)); GPR_ASSERT(dynamic_filters != nullptr); #endif - auto filter_stack = builder.Build(); // Create call destination. - RefCountedPtr call_destination; + RefCountedPtr call_destination; const bool enable_retries = !channel_args_.WantMinimalStack() && channel_args_.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true); if (enable_retries) { Crash("call v3 stack does not yet support retries"); } else { - call_destination = MakeRefCounted( + call_destination = MakeRefCounted( - RefAsSubclass()), - std::move(filter_stack), channel_args_); + RefAsSubclass()); } + auto filter_stack = builder.Build(call_destination); // Send result to data plane. resolver_data_for_calls_.Set(ResolverDataForCalls{ std::move(config_selector), std::move(call_destination)}); @@ -1794,7 +1721,7 @@ void ClientChannel::StartIdleTimer() { absl::Status ClientChannel::ApplyServiceConfigToCall( ConfigSelector& config_selector, - ClientMetadataHandle& client_initial_metadata) const { + ClientMetadata& client_initial_metadata) const { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "client_channel=%p: %sapplying service config to call", this, GetContext()->DebugTag().c_str()); @@ -1809,7 +1736,7 @@ absl::Status ClientChannel::ApplyServiceConfigToCall( GetContext(), GetContext()); // Use the ConfigSelector to determine the config for the call. absl::Status call_config_status = config_selector.GetCallConfig( - {client_initial_metadata.get(), GetContext(), + {&client_initial_metadata, GetContext(), service_config_call_data}); if (!call_config_status.ok()) { return MaybeRewriteIllegalStatusCode(call_config_status, "ConfigSelector"); @@ -1822,16 +1749,16 @@ absl::Status ClientChannel::ApplyServiceConfigToCall( // If the service config specifies a deadline, update the call's // deadline timer. if (method_params->timeout() != Duration::Zero()) { - CallContext* call_context = GetContext(); + Call* call = GetContext(); const Timestamp per_method_deadline = - Timestamp::FromCycleCounterRoundUp(call_context->call_start_time()) + + Timestamp::FromCycleCounterRoundUp(call->start_time()) + method_params->timeout(); - call_context->UpdateDeadline(per_method_deadline); + call->UpdateDeadline(per_method_deadline); } // If the service config set wait_for_ready and the application // did not explicitly set it, use the value from the service config. auto* wait_for_ready = - client_initial_metadata->GetOrCreatePointer(WaitForReady()); + client_initial_metadata.GetOrCreatePointer(WaitForReady()); if (method_params->wait_for_ready().has_value() && !wait_for_ready->explicitly_set) { wait_for_ready->value = method_params->wait_for_ready().value(); @@ -1870,12 +1797,12 @@ ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker, auto& client_initial_metadata = unstarted_handler.UnprocessedClientInitialMetadata(); LoadBalancingPolicy::PickArgs pick_args; - Slice* path = client_initial_metadata->get_pointer(HttpPathMetadata()); + Slice* path = client_initial_metadata.get_pointer(HttpPathMetadata()); GPR_ASSERT(path != nullptr); pick_args.path = path->as_string_view(); LbCallState lb_call_state; pick_args.call_state = &lb_call_state; - LbMetadata initial_metadata(client_initial_metadata.get()); + LbMetadata initial_metadata(&client_initial_metadata); pick_args.initial_metadata = &initial_metadata; auto result = picker.Pick(pick_args); // Handle result. @@ -1940,7 +1867,7 @@ ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker, // If wait_for_ready is false, then the error indicates the RPC // attempt's final status. if (!unstarted_handler.UnprocessedClientInitialMetadata() - ->GetOrCreatePointer(WaitForReady()) + .GetOrCreatePointer(WaitForReady()) ->value) { return MaybeRewriteIllegalStatusCode(std::move(fail_pick->status), "LB pick"); diff --git a/src/core/client_channel/client_channel.h b/src/core/client_channel/client_channel.h index 01f37c1e38b..900f0fa807a 100644 --- a/src/core/client_channel/client_channel.h +++ b/src/core/client_channel/client_channel.h @@ -154,7 +154,7 @@ class ClientChannel : public Channel { // May modify call context and client_initial_metadata. absl::Status ApplyServiceConfigToCall( ConfigSelector& config_selector, - ClientMetadataHandle& client_initial_metadata) const; + ClientMetadata& client_initial_metadata) const; // Does an LB pick for a call. Returns one of the following things: // - Continue{}, meaning to queue the pick @@ -199,7 +199,7 @@ class ClientChannel : public Channel { // struct ResolverDataForCalls { RefCountedPtr config_selector; - RefCountedPtr call_destination; + RefCountedPtr call_destination; }; Observable> resolver_data_for_calls_; diff --git a/src/core/client_channel/subchannel.h b/src/core/client_channel/subchannel.h index 35247d3af2f..832179c6005 100644 --- a/src/core/client_channel/subchannel.h +++ b/src/core/client_channel/subchannel.h @@ -64,7 +64,7 @@ namespace grpc_core { class SubchannelCall; -class ConnectedSubchannel final : public RefCounted { +class ConnectedSubchannel : public RefCounted { public: const ChannelArgs& args() const { return args_; } channelz::SubchannelNode* channelz_subchannel() const { diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.cc b/src/core/ext/filters/channel_idle/channel_idle_filter.cc deleted file mode 100644 index 6f90704a04f..00000000000 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.cc +++ /dev/null @@ -1,316 +0,0 @@ -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// TODO(ctiller): Add a unit test suite for these filters once it's practical to -// mock transport operations. - -#include - -#include "src/core/ext/filters/channel_idle/channel_idle_filter.h" - -#include -#include - -#include "absl/base/thread_annotations.h" -#include "absl/meta/type_traits.h" -#include "absl/random/random.h" -#include "absl/types/optional.h" - -#include -#include - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/promise_based_filter.h" -#include "src/core/lib/config/core_configuration.h" -#include "src/core/lib/debug/trace.h" -#include "src/core/lib/experiments/experiments.h" -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/no_destruct.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/per_cpu.h" -#include "src/core/lib/gprpp/status_helper.h" -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h" -#include "src/core/lib/promise/loop.h" -#include "src/core/lib/promise/poll.h" -#include "src/core/lib/promise/promise.h" -#include "src/core/lib/promise/sleep.h" -#include "src/core/lib/promise/try_seq.h" -#include "src/core/lib/surface/channel_stack_type.h" -#include "src/core/lib/transport/http2_errors.h" -#include "src/core/lib/transport/metadata_batch.h" - -namespace grpc_core { - -const NoInterceptor ChannelIdleFilter::Call::OnClientInitialMetadata; -const NoInterceptor ChannelIdleFilter::Call::OnServerInitialMetadata; -const NoInterceptor ChannelIdleFilter::Call::OnServerTrailingMetadata; -const NoInterceptor ChannelIdleFilter::Call::OnClientToServerMessage; -const NoInterceptor ChannelIdleFilter::Call::OnServerToClientMessage; - -namespace { - -// TODO(roth): This can go back to being a constant when the experiment -// is removed. -Duration DefaultIdleTimeout() { - if (IsClientIdlenessEnabled()) return Duration::Minutes(30); - return Duration::Infinity(); -} - -// If these settings change, make sure that we are not sending a GOAWAY for -// inproc transport, since a GOAWAY to inproc ends up destroying the transport. -const auto kDefaultMaxConnectionAge = Duration::Infinity(); -const auto kDefaultMaxConnectionAgeGrace = Duration::Infinity(); -const auto kDefaultMaxConnectionIdle = Duration::Infinity(); -const auto kMaxConnectionAgeJitter = 0.1; - -TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); -} // namespace - -#define GRPC_IDLE_FILTER_LOG(format, ...) \ - do { \ - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \ - gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \ - } \ - } while (0) - -Duration GetClientIdleTimeout(const ChannelArgs& args) { - return args.GetDurationFromIntMillis(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS) - .value_or(DefaultIdleTimeout()); -} - -struct MaxAgeFilter::Config { - Duration max_connection_age; - Duration max_connection_idle; - Duration max_connection_age_grace; - - bool enable() const { - return max_connection_age != Duration::Infinity() || - max_connection_idle != Duration::Infinity(); - } - - // A random jitter of +/-10% will be added to MAX_CONNECTION_AGE and - // MAX_CONNECTION_IDLE to spread out reconnection storms. - static Config FromChannelArgs(const ChannelArgs& args) { - const Duration args_max_age = - args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_MS) - .value_or(kDefaultMaxConnectionAge); - const Duration args_max_idle = - args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_IDLE_MS) - .value_or(kDefaultMaxConnectionIdle); - const Duration args_max_age_grace = - args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS) - .value_or(kDefaultMaxConnectionAgeGrace); - // generate a random number between 1 - kMaxConnectionAgeJitter and - // 1 + kMaxConnectionAgeJitter - struct BitGen { - Mutex mu; - absl::BitGen bit_gen ABSL_GUARDED_BY(mu); - double MakeUniformDouble(double min, double max) { - MutexLock lock(&mu); - return absl::Uniform(bit_gen, min, max); - } - }; - static NoDestruct> bit_gen(PerCpuOptions().SetMaxShards(8)); - const double multiplier = bit_gen->this_cpu().MakeUniformDouble( - 1.0 - kMaxConnectionAgeJitter, 1.0 + kMaxConnectionAgeJitter); - // GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result - // will not be cast to int implicitly before the comparison. - return Config{args_max_age * multiplier, args_max_idle * multiplier, - args_max_age_grace}; - } -}; - -absl::StatusOr ClientIdleFilter::Create( - const ChannelArgs& args, ChannelFilter::Args filter_args) { - ClientIdleFilter filter(filter_args.channel_stack(), - GetClientIdleTimeout(args)); - return absl::StatusOr(std::move(filter)); -} - -absl::StatusOr MaxAgeFilter::Create( - const ChannelArgs& args, ChannelFilter::Args filter_args) { - MaxAgeFilter filter(filter_args.channel_stack(), - Config::FromChannelArgs(args)); - return absl::StatusOr(std::move(filter)); -} - -void MaxAgeFilter::Shutdown() { - max_age_activity_.Reset(); - ChannelIdleFilter::Shutdown(); -} - -void MaxAgeFilter::PostInit() { - struct StartupClosure { - RefCountedPtr channel_stack; - MaxAgeFilter* filter; - grpc_closure closure; - }; - auto run_startup = [](void* p, grpc_error_handle) { - auto* startup = static_cast(p); - // Trigger idle timer - startup->filter->IncreaseCallCount(); - startup->filter->DecreaseCallCount(); - grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->start_connectivity_watch.reset( - new ConnectivityWatcher(startup->filter)); - op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; - grpc_channel_next_op( - grpc_channel_stack_element(startup->channel_stack.get(), 0), op); - delete startup; - }; - auto* startup = - new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}}; - GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr); - ExecCtx::Run(DEBUG_LOCATION, &startup->closure, absl::OkStatus()); - - auto channel_stack = this->channel_stack()->Ref(); - - // Start the max age timer - if (max_connection_age_ != Duration::Infinity()) { - max_age_activity_.Set(MakeActivity( - TrySeq( - // First sleep until the max connection age - Sleep(Timestamp::Now() + max_connection_age_), - // Then send a goaway. - [this] { - GRPC_CHANNEL_STACK_REF(this->channel_stack(), - "max_age send_goaway"); - // Jump out of the activity to send the goaway. - auto fn = [](void* arg, grpc_error_handle) { - auto* channel_stack = static_cast(arg); - grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->goaway_error = grpc_error_set_int( - GRPC_ERROR_CREATE("max_age"), - StatusIntProperty::kHttp2Error, GRPC_HTTP2_NO_ERROR); - grpc_channel_element* elem = - grpc_channel_stack_element(channel_stack, 0); - elem->filter->start_transport_op(elem, op); - GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway"); - }; - ExecCtx::Run( - DEBUG_LOCATION, - GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr), - absl::OkStatus()); - return Immediate(absl::OkStatus()); - }, - // Sleep for the grace period - [this] { - return Sleep(Timestamp::Now() + max_connection_age_grace_); - }), - ExecCtxWakeupScheduler(), - [channel_stack, this](absl::Status status) { - // OnDone -- close the connection if the promise completed - // successfully. - // (if it did not, it was cancelled) - if (status.ok()) CloseChannel(); - }, - channel_stack->EventEngine())); - } -} - -bool ChannelIdleFilter::StartTransportOp(grpc_transport_op* op) { - // Catch the disconnect_with_error transport op. - if (!op->disconnect_with_error.ok()) Shutdown(); - // Pass the op to the next filter. - return false; -} - -void ChannelIdleFilter::Shutdown() { - // IncreaseCallCount() introduces a phony call and prevent the timer from - // being reset by other threads. - IncreaseCallCount(); - activity_.Reset(); -} - -void ChannelIdleFilter::IncreaseCallCount() { - idle_filter_state_->IncreaseCallCount(); -} - -void ChannelIdleFilter::DecreaseCallCount() { - if (idle_filter_state_->DecreaseCallCount()) { - // If there are no more calls in progress, start the idle timer. - StartIdleTimer(); - } -} - -void ChannelIdleFilter::StartIdleTimer() { - GRPC_IDLE_FILTER_LOG("timer has started"); - auto idle_filter_state = idle_filter_state_; - // Hold a ref to the channel stack for the timer callback. - auto channel_stack = channel_stack_->Ref(); - auto timeout = client_idle_timeout_; - auto promise = Loop([timeout, idle_filter_state]() { - return TrySeq(Sleep(Timestamp::Now() + timeout), - [idle_filter_state]() -> Poll> { - if (idle_filter_state->CheckTimer()) { - return Continue{}; - } else { - return absl::OkStatus(); - } - }); - }); - activity_.Set(MakeActivity( - std::move(promise), ExecCtxWakeupScheduler{}, - [channel_stack, this](absl::Status status) { - if (status.ok()) CloseChannel(); - }, - channel_stack->EventEngine())); -} - -void ChannelIdleFilter::CloseChannel() { - auto* op = grpc_make_transport_op(nullptr); - op->disconnect_with_error = grpc_error_set_int( - GRPC_ERROR_CREATE("enter idle"), - StatusIntProperty::ChannelConnectivityState, GRPC_CHANNEL_IDLE); - // Pass the transport op down to the channel stack. - auto* elem = grpc_channel_stack_element(channel_stack_, 0); - elem->filter->start_transport_op(elem, op); -} - -const grpc_channel_filter ClientIdleFilter::kFilter = - MakePromiseBasedFilter( - "client_idle"); -const grpc_channel_filter MaxAgeFilter::kFilter = - MakePromiseBasedFilter("max_age"); - -void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) { - GPR_ASSERT(MaxAgeFilter::kFilter.init_call != nullptr); - if (!IsV3ChannelIdleFiltersEnabled()) return; - if (!IsCallV3Enabled()) { - builder->channel_init() - ->RegisterFilter(GRPC_CLIENT_CHANNEL) - .ExcludeFromMinimalStack() - .If([](const ChannelArgs& channel_args) { - return GetClientIdleTimeout(channel_args) != Duration::Infinity(); - }); - } - builder->channel_init() - ->RegisterFilter(GRPC_SERVER_CHANNEL) - .ExcludeFromMinimalStack() - .If([](const ChannelArgs& channel_args) { - return MaxAgeFilter::Config::FromChannelArgs(channel_args).enable(); - }); -} - -MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack, - const Config& max_age_config) - : ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle), - max_connection_age_(max_age_config.max_connection_age), - max_connection_age_grace_(max_age_config.max_connection_age_grace) {} - -} // namespace grpc_core diff --git a/src/core/ext/filters/channel_idle/channel_idle_filter.h b/src/core/ext/filters/channel_idle/channel_idle_filter.h deleted file mode 100644 index f4f02f71107..00000000000 --- a/src/core/ext/filters/channel_idle/channel_idle_filter.h +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_CHANNEL_IDLE_FILTER_H -#define GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_CHANNEL_IDLE_FILTER_H - -#include - -#include - -#include "absl/status/status.h" -#include "absl/status/statusor.h" - -#include - -#include "src/core/ext/filters/channel_idle/idle_filter_state.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/channel_fwd.h" -#include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/channel/promise_based_filter.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/single_set_ptr.h" -#include "src/core/lib/gprpp/time.h" -#include "src/core/lib/promise/activity.h" -#include "src/core/lib/promise/arena_promise.h" -#include "src/core/lib/transport/connectivity_state.h" -#include "src/core/lib/transport/transport.h" - -namespace grpc_core { - -Duration GetClientIdleTimeout(const ChannelArgs& args); - -class ChannelIdleFilter : public ImplementChannelFilter { - public: - ~ChannelIdleFilter() override = default; - - ChannelIdleFilter(const ChannelIdleFilter&) = delete; - ChannelIdleFilter& operator=(const ChannelIdleFilter&) = delete; - ChannelIdleFilter(ChannelIdleFilter&&) = default; - ChannelIdleFilter& operator=(ChannelIdleFilter&&) = default; - - class Call { - public: - explicit Call(ChannelIdleFilter* filter) : filter_(filter) { - filter_->IncreaseCallCount(); - } - ~Call() { MaybeDecrement(); } - - static const NoInterceptor OnClientInitialMetadata; - static const NoInterceptor OnServerInitialMetadata; - static const NoInterceptor OnServerTrailingMetadata; - static const NoInterceptor OnClientToServerMessage; - static const NoInterceptor OnServerToClientMessage; - void OnFinalize(const grpc_call_final_info*) { MaybeDecrement(); } - - private: - void MaybeDecrement() { - auto* filter = std::exchange(filter_, nullptr); - if (filter != nullptr) filter->DecreaseCallCount(); - } - ChannelIdleFilter* filter_; - }; - - bool StartTransportOp(grpc_transport_op* op) override; - - protected: - using SingleSetActivityPtr = - SingleSetPtr; - - ChannelIdleFilter(grpc_channel_stack* channel_stack, - Duration client_idle_timeout) - : channel_stack_(channel_stack), - client_idle_timeout_(client_idle_timeout) {} - - grpc_channel_stack* channel_stack() { return channel_stack_; }; - - virtual void Shutdown(); - void CloseChannel(); - - void IncreaseCallCount(); - void DecreaseCallCount(); - - private: - void StartIdleTimer(); - - // The channel stack to which we take refs for pending callbacks. - grpc_channel_stack* channel_stack_; - Duration client_idle_timeout_; - std::shared_ptr idle_filter_state_{ - std::make_shared(false)}; - - SingleSetActivityPtr activity_; -}; - -class ClientIdleFilter final : public ChannelIdleFilter { - public: - static const grpc_channel_filter kFilter; - - static absl::StatusOr Create( - const ChannelArgs& args, ChannelFilter::Args filter_args); - - private: - using ChannelIdleFilter::ChannelIdleFilter; -}; - -class MaxAgeFilter final : public ChannelIdleFilter { - public: - static const grpc_channel_filter kFilter; - struct Config; - - static absl::StatusOr Create(const ChannelArgs& args, - ChannelFilter::Args filter_args); - - void PostInit() override; - - private: - class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { - public: - explicit ConnectivityWatcher(MaxAgeFilter* filter) - : channel_stack_(filter->channel_stack()->Ref()), filter_(filter) {} - ~ConnectivityWatcher() override = default; - - void OnConnectivityStateChange(grpc_connectivity_state new_state, - const absl::Status&) override { - if (new_state == GRPC_CHANNEL_SHUTDOWN) filter_->Shutdown(); - } - - private: - RefCountedPtr channel_stack_; - MaxAgeFilter* filter_; - }; - - MaxAgeFilter(grpc_channel_stack* channel_stack, const Config& max_age_config); - - void Shutdown() override; - - SingleSetActivityPtr max_age_activity_; - Duration max_connection_age_; - Duration max_connection_age_grace_; -}; - -} // namespace grpc_core - -#endif // GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_CHANNEL_IDLE_FILTER_H diff --git a/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc b/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc index fd48d034d38..63a886bc991 100644 --- a/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc +++ b/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc @@ -78,15 +78,11 @@ TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter"); } \ } while (0) -namespace { - Duration GetClientIdleTimeout(const ChannelArgs& args) { return args.GetDurationFromIntMillis(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS) .value_or(kDefaultIdleTimeout); } -} // namespace - struct LegacyMaxAgeFilter::Config { Duration max_connection_age; Duration max_connection_idle; diff --git a/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h b/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h index 8e6215cebba..6254e5f2f75 100644 --- a/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h +++ b/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h @@ -40,6 +40,8 @@ namespace grpc_core { + Duration GetClientIdleTimeout(const ChannelArgs& args) ; + class LegacyChannelIdleFilter : public ChannelFilter { public: LegacyChannelIdleFilter(grpc_channel_stack* channel_stack, diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 97197f43715..501e3362950 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -83,6 +83,15 @@ inline int PointerCompare(void* a_ptr, const grpc_arg_pointer_vtable* a_vtable, // before the crt refcount base class. template using RefType = absl::remove_cvref_t().Ref())>; + +template +struct IsRawPointerTagged { + static constexpr bool kValue = false; +}; +template +struct IsRawPointerTagged> { + static constexpr bool kValue = true; +}; } // namespace channel_args_detail // Specialization for ref-counted pointers. @@ -91,13 +100,14 @@ using RefType = absl::remove_cvref_t().Ref())>; template struct ChannelArgTypeTraits< T, absl::enable_if_t< + !channel_args_detail::IsRawPointerTagged::kValue &&( std::is_base_of>, channel_args_detail::RefType>::value || std::is_base_of, NonPolymorphicRefCount>, channel_args_detail::RefType>::value || std::is_base_of>, - channel_args_detail::RefType>::value, + channel_args_detail::RefType>::value), void>> { static const grpc_arg_pointer_vtable* VTable() { static const grpc_arg_pointer_vtable tbl = { diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index 9388c142f67..84bd13b37d6 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -130,6 +130,8 @@ class Call : public CppImplOf, // Implementation of EventEngine::Closure, called when deadline expires void Run() final; + gpr_cycle_counter start_time() const { return start_time_; } + protected: // The maximum number of concurrent batches possible. // Based upon the maximum number of individually queueable ops in the batch @@ -209,8 +211,6 @@ class Call : public CppImplOf, void HandleCompressionAlgorithmNotAccepted( grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; - gpr_cycle_counter start_time() const { return start_time_; } - virtual grpc_compression_options compression_options() = 0; private: diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 0a5e7b1c199..4c003259201 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -43,8 +43,8 @@ #include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/surface/channel_stack_type.h" -#include "src/core/lib/transport/call_size_estimator.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/call_arena_allocator.h" // Forward declaration to avoid dependency loop. struct grpc_channel_stack; diff --git a/src/core/lib/transport/call_spine.h b/src/core/lib/transport/call_spine.h index 555068d041f..6b53bf7e2f9 100644 --- a/src/core/lib/transport/call_spine.h +++ b/src/core/lib/transport/call_spine.h @@ -616,109 +616,6 @@ class UnstartedCallHandler { RefCountedPtr spine_; }; -class UnstartedCallHandler; - -// CallDestination is responsible for starting an UnstartedCallHandler -// and then processing operations on the resulting CallHandler. -// -// Examples of CallDestinations include: -// - a client transport -// - the server API -// - a load-balanced call in the client channel -// - a hijacking filter (see DelegatingCallDestination below) -// -// FIXME: do we want this to be ref-counted? that might not be -// desirable for the hijacking filter case, where we want the filter stack -// to own the filter rather than having every call take its own ref to every -// hijacking filter. -class CallDestination : public DualRefCounted { - public: - virtual void StartCall(UnstartedCallHandler unstarted_call_handler) = 0; -}; - -// A delegating CallDestination for use as a hijacking filter. -// Implementations may look at the unprocessed initial metadata -// and decide to do one of two things: -// -// 1. It can be a no-op. In this case, it will simply pass the -// unstarted_call_handler to the wrapped CallDestination. -// -// 2. It can hijack the call by doing the following: -// - Start unstarted_call_handler and take ownership of the -// resulting handler. -// - Create a new CallInitiator/UnstartedCallHandler pair, and pass -// that new UnstartedCallHandler down to the wrapped CallDestination. -// - The implementation is then responsible for forwarding between -// the started handler and the new initiator. Note that in -// simple cases, this can be done via ForwardCall(). -class DelegatingCallDestination : public CallDestination { - protected: - explicit DelegatingCallDestination( - RefCountedPtr wrapped_destination) - : wrapped_destination_(std::move(wrapped_destination)) {} - - CallDestination* wrapped_destination() const { - return wrapped_destination_.get(); - } - - private: - RefCountedPtr wrapped_destination_; -}; - -class UnstartedCallHandler { - public: - UnstartedCallHandler(RefCountedPtr spine, - ClientMetadataHandle client_initial_metadata) - : spine_(std::move(spine)) { - spine_->SpawnGuarded( - "send_initial_metadata", - [client_initial_metadata = std::move(client_initial_metadata), - spine = spine_]() mutable { - GPR_DEBUG_ASSERT(GetContext() == &spine->party()); - return Map(spine->client_initial_metadata().sender.Push( - std::move(client_initial_metadata)), - [](bool ok) { return StatusFlag(ok); }); - }); - } - - // Returns the client initial metadata, which has not yet been - // processed by the stack that will ultimately be used for this call. - ClientMetadataHandle& UnprocessedClientInitialMetadata(); - - // Starts the call using the specified stack. - // This must be called only once, and the UnstartedCallHandler object - // may not be used after this is called. - CallHandler StartCall(RefCountedPtr stack); - - template - void SetContext(ContextType context) { - // FIXME: implement - } - - template - auto CancelIfFails(Promise promise) { - return spine_->CancelIfFails(std::move(promise)); - } - - template - void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { - spine_->SpawnGuarded(name, std::move(promise_factory)); - } - - template - void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) { - spine_->SpawnInfallible(name, std::move(promise_factory)); - } - - template - auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) { - return spine_->party().SpawnWaitable(name, std::move(promise_factory)); - } - - private: - RefCountedPtr spine_; -}; - struct CallInitiatorAndHandler { CallInitiator initiator; UnstartedCallHandler handler; diff --git a/tools/distrib/fix_build_deps.py b/tools/distrib/fix_build_deps.py index 763ab361efb..d34a0c03f03 100755 --- a/tools/distrib/fix_build_deps.py +++ b/tools/distrib/fix_build_deps.py @@ -64,6 +64,7 @@ EXTERNAL_DEPS = { "absl/functional/bind_front.h": "absl/functional:bind_front", "absl/functional/function_ref.h": "absl/functional:function_ref", "absl/hash/hash.h": "absl/hash", + "absl/log/check.h": "absl/log:check", "absl/log/log.h": "absl/log", "absl/memory/memory.h": "absl/memory", "absl/meta/type_traits.h": "absl/meta:type_traits",