From a5cbcd8c7ef2df9664d8908f523795ccf127e830 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 8 May 2024 12:11:32 -0700 Subject: [PATCH] x --- src/core/client_channel/client_channel.cc | 68 +- src/core/client_channel/client_channel.h | 25 +- src/core/client_channel/subchannel.cc | 1541 ++++++++--------- .../channel_idle/legacy_channel_idle_filter.h | 2 +- src/core/lib/channel/channel_args.h | 19 +- src/core/lib/channel/context.h | 3 +- src/core/lib/surface/channel.h | 2 +- src/core/load_balancing/lb_policy.h | 5 +- 8 files changed, 824 insertions(+), 841 deletions(-) diff --git a/src/core/client_channel/client_channel.cc b/src/core/client_channel/client_channel.cc index c8bf4add3de..4e31418b386 100644 --- a/src/core/client_channel/client_channel.cc +++ b/src/core/client_channel/client_channel.cc @@ -46,9 +46,9 @@ #include #include #include +#include #include #include -#include #include "src/core/client_channel/backup_poller.h" #include "src/core/client_channel/client_channel_internal.h" @@ -60,6 +60,7 @@ #include "src/core/client_channel/retry_filter.h" #include "src/core/client_channel/subchannel.h" #include "src/core/client_channel/subchannel_interface_internal.h" +#include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/metrics.h" @@ -110,7 +111,6 @@ #include "src/core/resolver/resolver_registry.h" #include "src/core/service_config/service_config_call_data.h" #include "src/core/service_config/service_config_impl.h" -#include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h" namespace grpc_core { @@ -328,9 +328,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { DEBUG_LOCATION); } - grpc_pollset_set* interested_parties() override { - return watcher_->interested_parties(); - } + grpc_pollset_set* interested_parties() override { return nullptr; } private: void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state, @@ -778,7 +776,8 @@ const NoInterceptor LbCallTracingFilter::Call::OnServerToClientMessage; } // namespace -class ClientChannel::LoadBalancedCallDestination : public UnstartedCallDestination { +class ClientChannel::LoadBalancedCallDestination + : public UnstartedCallDestination { public: explicit LoadBalancedCallDestination( RefCountedPtr client_channel) @@ -931,6 +930,20 @@ absl::StatusOr> ClientChannel::Create( std::move(*default_service_config), client_channel_factory); } +namespace { +std::string GetDefaultAuthorityFromChannelArgs(const ChannelArgs& channel_args, + absl::string_view target) { + absl::optional default_authority = + channel_args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY); + if (!default_authority.has_value()) { + return CoreConfiguration::Get().resolver_registry().GetDefaultAuthority( + target); + } else { + return std::move(*default_authority); + } +} +} // namespace + ClientChannel::ClientChannel( std::string target, ChannelArgs channel_args, std::string uri_to_resolve, RefCountedPtr default_service_config, @@ -943,12 +956,9 @@ ClientChannel::ClientChannel( internal::ClientChannelServiceConfigParser::ParserIndex()), default_service_config_(std::move(default_service_config)), client_channel_factory_(client_channel_factory), + default_authority_( + GetDefaultAuthorityFromChannelArgs(channel_args_, this->target())), channelz_node_(channel_args_.GetObject()), - interested_parties_(grpc_pollset_set_create()), - lb_call_size_estimator_(1024), - lb_call_allocator_(channel_args_.GetObject() - ->memory_quota() - ->CreateMemoryOwner()), idle_timeout_(GetClientIdleTimeout(channel_args_)), resolver_data_for_calls_(ResolverDataForCalls{}), picker_(nullptr), @@ -965,18 +975,9 @@ ClientChannel::ClientChannel( } else { keepalive_time_ = -1; // unset } - // Set default authority. - absl::optional default_authority = - channel_args_.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY); - if (!default_authority.has_value()) { - default_authority_ = - CoreConfiguration::Get().resolver_registry().GetDefaultAuthority( - target); - } else { - default_authority_ = std::move(*default_authority); - } // Get stats plugins for channel. -experimental:: StatsPluginChannelScope scope(this->target(), default_authority_); + experimental::StatsPluginChannelScope scope(this->target(), + default_authority_); stats_plugin_group_ = GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope); } @@ -985,7 +986,6 @@ ClientChannel::~ClientChannel() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { gpr_log(GPR_INFO, "client_channel=%p: destroying", this); } - grpc_pollset_set_destroy(interested_parties_); } void ClientChannel::Orphan() { @@ -1196,14 +1196,14 @@ CallInitiator ClientChannel::CreateCall( CheckConnectivityState(/*try_to_connect=*/true); // Create an initiator/unstarted-handler pair. auto call = MakeCallPair(std::move(client_initial_metadata), - GetContext(), arena, nullptr, GetContext()); + GetContext(), arena, nullptr, + GetContext()); // Spawn a promise to wait for the resolver result. // This will eventually start the call. call.initiator.SpawnGuarded( - "wait-for-name-resolution", - [self = RefAsSubclass(), - unstarted_handler = std::move(call.handler), - was_queued = false]() mutable { + "wait-for-name-resolution", [self = RefAsSubclass(), + unstarted_handler = std::move(call.handler), + was_queued = false]() mutable { const bool wait_for_ready = unstarted_handler.UnprocessedClientInitialMetadata() .GetOrCreatePointer(WaitForReady()) @@ -1260,9 +1260,7 @@ void ClientChannel::CreateResolverLocked() { this, uri_to_resolve_.c_str()); } resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver( - uri_to_resolve_, channel_args_, - interested_parties_, // FIXME: remove somehow - work_serializer_, + uri_to_resolve_, channel_args_, nullptr, work_serializer_, std::make_unique(RefAsSubclass())); // Since the validity of the args was checked when the channel was created, // CreateResolver() must return a non-null result. @@ -1629,7 +1627,8 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { if (idle_state_.DecreaseCallCount()) StartIdleTimer(); }); } - CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(GRPC_CLIENT_CHANNEL, builder); + CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder( + GRPC_CLIENT_CHANNEL, builder); // FIXME: add filters returned by config selector #if 0 std::vector filters = @@ -1648,9 +1647,8 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { if (enable_retries) { Crash("call v3 stack does not yet support retries"); } else { - call_destination = - MakeRefCounted( - RefAsSubclass()); + call_destination = MakeRefCounted( + RefAsSubclass()); } auto filter_stack = builder.Build(call_destination); // Send result to data plane. diff --git a/src/core/client_channel/client_channel.h b/src/core/client_channel/client_channel.h index 900f0fa807a..e8d525248b0 100644 --- a/src/core/client_channel/client_channel.h +++ b/src/core/client_channel/client_channel.h @@ -167,25 +167,16 @@ class ClientChannel : public Channel { LoadBalancingPolicy::SubchannelPicker& picker, UnstartedCallHandler& unstarted_handler); - // - // Fields set at construction and never modified. - // - ChannelArgs channel_args_; - std::shared_ptr event_engine_; - std::string uri_to_resolve_; + const ChannelArgs channel_args_; + const std::shared_ptr + event_engine_; + const std::string uri_to_resolve_; const size_t service_config_parser_index_; - RefCountedPtr default_service_config_; - ClientChannelFactory* client_channel_factory_; - std::string default_authority_; - channelz::ChannelNode* channelz_node_; + const RefCountedPtr default_service_config_; + ClientChannelFactory* const client_channel_factory_; + const std::string default_authority_; + channelz::ChannelNode* const channelz_node_; GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_; - grpc_pollset_set* interested_parties_; - - // - // State for LB calls. - // - CallSizeEstimator lb_call_size_estimator_; - MemoryAllocator lb_call_allocator_; // // Idleness state. diff --git a/src/core/client_channel/subchannel.cc b/src/core/client_channel/subchannel.cc index 0d1e1dde483..a17c8c7c32d 100644 --- a/src/core/client_channel/subchannel.cc +++ b/src/core/client_channel/subchannel.cc @@ -86,848 +86,839 @@ (SubchannelCall*)(((char*)(call_stack)) - \ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) - namespace grpc_core { +namespace grpc_core { - using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::EventEngine; - TraceFlag grpc_trace_subchannel(false, "subchannel"); - DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, - "subchannel_refcount"); +TraceFlag grpc_trace_subchannel(false, "subchannel"); +DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); - // - // ConnectedSubchannel - // - - ConnectedSubchannel::ConnectedSubchannel( - const ChannelArgs& args, - RefCountedPtr channelz_subchannel) - : RefCounted( - GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) - ? "ConnectedSubchannel" - : nullptr), - args_(args), - channelz_subchannel_(std::move(channelz_subchannel)) {} - - // - // LegacyConnectedSubchannel - // - - class LegacyConnectedSubchannel : public ConnectedSubchannel { - public: - LegacyConnectedSubchannel( - RefCountedPtr channel_stack, - const ChannelArgs& args, - RefCountedPtr channelz_subchannel) - : ConnectedSubchannel(args, std::move(channelz_subchannel)), - channel_stack_(std::move(channel_stack)) {} - - ~LegacyConnectedSubchannel() override { - channel_stack_.reset(DEBUG_LOCATION, "ConnectedSubchannel"); - } - - void StartWatch( - grpc_pollset_set* interested_parties, - OrphanablePtr watcher) override { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->start_connectivity_watch = std::move(watcher); - op->start_connectivity_watch_state = GRPC_CHANNEL_READY; - op->bind_pollset_set = interested_parties; - grpc_channel_element* elem = - grpc_channel_stack_element(channel_stack_.get(), 0); - elem->filter->start_transport_op(elem, op); - } - - void Ping(absl::AnyInvocable on_ack) override { - Crash("call v3 ping method called in legacy impl"); - } - - void StartCall(UnstartedCallHandler) override { - Crash("call v3 StartCall() method called in legacy impl"); - } - - grpc_channel_stack* channel_stack() const override { - return channel_stack_.get(); - } - - size_t GetInitialCallSizeEstimate() const override { - return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) + - channel_stack_->call_stack_size; - } - - ArenaPromise MakeCallPromise( - CallArgs call_args) override { - // If not using channelz, we just need to call the channel stack. - if (channelz_subchannel() == nullptr) { - return channel_stack_->MakeClientCallPromise(std::move(call_args)); - } - // Otherwise, we need to wrap the channel stack promise with code that - // handles the channelz updates. - return OnCancel( - Seq(channel_stack_->MakeClientCallPromise(std::move(call_args)), - [self = Ref()](ServerMetadataHandle metadata) { - channelz::SubchannelNode* channelz_subchannel = - self->channelz_subchannel(); - GPR_ASSERT(channelz_subchannel != nullptr); - if (metadata->get(GrpcStatusMetadata()) - .value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) { - channelz_subchannel->RecordCallFailed(); - } else { - channelz_subchannel->RecordCallSucceeded(); - } - return metadata; - }), - [self = Ref()]() { - channelz::SubchannelNode* channelz_subchannel = - self->channelz_subchannel(); - GPR_ASSERT(channelz_subchannel != nullptr); - channelz_subchannel->RecordCallFailed(); - }); - } - - void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) override { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->send_ping.on_initiate = on_initiate; - op->send_ping.on_ack = on_ack; - grpc_channel_element* elem = - grpc_channel_stack_element(channel_stack_.get(), 0); - elem->filter->start_transport_op(elem, op); - } - - private: - RefCountedPtr channel_stack_; - }; - - // - // NewConnectedSubchannel - // - - class NewConnectedSubchannel : public ConnectedSubchannel { - public: - NewConnectedSubchannel( - RefCountedPtr filter_stack, - OrphanablePtr transport, const ChannelArgs& args, - RefCountedPtr channelz_subchannel) - : ConnectedSubchannel(args, std::move(channelz_subchannel)), - filter_stack_(std::move(filter_stack)), - transport_(std::move(transport)) {} - - void StartWatch( - grpc_pollset_set* interested_parties, - OrphanablePtr watcher) override { - // FIXME: add new transport API for this in v3 stack - } - - void Ping(absl::AnyInvocable on_ack) override { - // FIXME: add new transport API for this in v3 stack - } - - void StartCall(UnstartedCallHandler unstarted_handler) override { - auto handler = unstarted_handler.StartCall(filter_stack_); - transport_->client_transport()->StartCall(std::move(handler)); - } - - grpc_channel_stack* channel_stack() const override { return nullptr; } - - size_t GetInitialCallSizeEstimate() const override { return 0; } +// +// ConnectedSubchannel +// - ArenaPromise MakeCallPromise( - CallArgs call_args) override { - Crash("legacy MakeCallPromise() method called in call v3 impl"); - } +ConnectedSubchannel::ConnectedSubchannel( + const ChannelArgs& args, + RefCountedPtr channelz_subchannel) + : RefCounted( + GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) + ? "ConnectedSubchannel" + : nullptr), + args_(args), + channelz_subchannel_(std::move(channelz_subchannel)) {} - void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) override { - Crash("legacy ping method called in call v3 impl"); - } +// +// LegacyConnectedSubchannel +// - private: - RefCountedPtr filter_stack_; - OrphanablePtr transport_; - }; +class LegacyConnectedSubchannel : public ConnectedSubchannel { + public: + LegacyConnectedSubchannel( + RefCountedPtr channel_stack, const ChannelArgs& args, + RefCountedPtr channelz_subchannel) + : ConnectedSubchannel(args, std::move(channelz_subchannel)), + channel_stack_(std::move(channel_stack)) {} - // - // SubchannelCall - // - - RefCountedPtr - SubchannelCall::Create(Args args, grpc_error_handle * error) { - const size_t allocation_size = - args.connected_subchannel->GetInitialCallSizeEstimate(); - Arena* arena = args.arena; - return RefCountedPtr(new ( - arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error)); - } - - SubchannelCall::SubchannelCall(Args args, grpc_error_handle * error) - : connected_subchannel_(std::move(args.connected_subchannel)), - deadline_(args.deadline) { - grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this); - const grpc_call_element_args call_args = { - callstk, // call_stack - nullptr, // server_transport_data - args.context, // context - args.path.c_slice(), // path - args.start_time, // start_time - args.deadline, // deadline - args.arena, // arena - args.call_combiner // call_combiner - }; - *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1, - SubchannelCall::Destroy, this, &call_args); - if (GPR_UNLIKELY(!error->ok())) { - gpr_log(GPR_ERROR, "error: %s", StatusToString(*error).c_str()); - return; - } - grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); - auto* channelz_node = connected_subchannel_->channelz_subchannel(); - if (channelz_node != nullptr) { - channelz_node->RecordCallStarted(); - } + ~LegacyConnectedSubchannel() override { + channel_stack_.reset(DEBUG_LOCATION, "ConnectedSubchannel"); } - void SubchannelCall::StartTransportStreamOpBatch( - grpc_transport_stream_op_batch * batch) { - MaybeInterceptRecvTrailingMetadata(batch); - grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this); - grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); - GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); - top_elem->filter->start_transport_stream_op_batch(top_elem, batch); + void StartWatch( + grpc_pollset_set* interested_parties, + OrphanablePtr watcher) override { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->start_connectivity_watch = std::move(watcher); + op->start_connectivity_watch_state = GRPC_CHANNEL_READY; + op->bind_pollset_set = interested_parties; + grpc_channel_element* elem = + grpc_channel_stack_element(channel_stack_.get(), 0); + elem->filter->start_transport_op(elem, op); } - grpc_call_stack* SubchannelCall::GetCallStack() { - return SUBCHANNEL_CALL_TO_CALL_STACK(this); + void Ping(absl::AnyInvocable on_ack) override { + Crash("call v3 ping method called in legacy impl"); } - void SubchannelCall::SetAfterCallStackDestroy(grpc_closure * closure) { - CHECK_EQ(after_call_stack_destroy_, nullptr); - CHECK_NE(closure, nullptr); - after_call_stack_destroy_ = closure; + void StartCall(UnstartedCallHandler) override { + Crash("call v3 StartCall() method called in legacy impl"); } - RefCountedPtr SubchannelCall::Ref() { - IncrementRefCount(); - return RefCountedPtr(this); + grpc_channel_stack* channel_stack() const override { + return channel_stack_.get(); } - RefCountedPtr SubchannelCall::Ref( - const DebugLocation& location, const char* reason) { - IncrementRefCount(location, reason); - return RefCountedPtr(this); + size_t GetInitialCallSizeEstimate() const override { + return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) + + channel_stack_->call_stack_size; } - void SubchannelCall::Unref() { - GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); + ArenaPromise MakeCallPromise( + CallArgs call_args) override { + // If not using channelz, we just need to call the channel stack. + if (channelz_subchannel() == nullptr) { + return channel_stack_->MakeClientCallPromise(std::move(call_args)); + } + // Otherwise, we need to wrap the channel stack promise with code that + // handles the channelz updates. + return OnCancel( + Seq(channel_stack_->MakeClientCallPromise(std::move(call_args)), + [self = Ref()](ServerMetadataHandle metadata) { + channelz::SubchannelNode* channelz_subchannel = + self->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (metadata->get(GrpcStatusMetadata()) + .value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) { + channelz_subchannel->RecordCallFailed(); + } else { + channelz_subchannel->RecordCallSucceeded(); + } + return metadata; + }), + [self = Ref()]() { + channelz::SubchannelNode* channelz_subchannel = + self->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + channelz_subchannel->RecordCallFailed(); + }); } - void SubchannelCall::Unref(const DebugLocation& /*location*/, - const char* reason) { - GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); + void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) override { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; + grpc_channel_element* elem = + grpc_channel_stack_element(channel_stack_.get(), 0); + elem->filter->start_transport_op(elem, op); } - void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) { - SubchannelCall* self = static_cast(arg); - // Keep some members before destroying the subchannel call. - grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_; - RefCountedPtr connected_subchannel = - std::move(self->connected_subchannel_); - // Destroy the subchannel call. - self->~SubchannelCall(); - // Destroy the call stack. This should be after destroying the subchannel - // call, because call->after_call_stack_destroy(), if not null, will free - // the call arena. - grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr, - after_call_stack_destroy); - // Automatically reset connected_subchannel. This should be after destroying - // the call stack, because destroying call stack needs access to the channel - // stack. - } + private: + RefCountedPtr channel_stack_; +}; - void SubchannelCall::MaybeInterceptRecvTrailingMetadata( - grpc_transport_stream_op_batch * batch) { - // only intercept payloads with recv trailing. - if (!batch->recv_trailing_metadata) { - return; - } - // only add interceptor is channelz is enabled. - if (connected_subchannel_->channelz_subchannel() == nullptr) { - return; - } - GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, - this, grpc_schedule_on_exec_ctx); - // save some state needed for the interception callback. - CHECK_EQ(recv_trailing_metadata_, nullptr); - recv_trailing_metadata_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata; - original_recv_trailing_metadata_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &recv_trailing_metadata_ready_; - } - - namespace { - - // Sets *status based on the rest of the parameters. - void GetCallStatus(grpc_status_code* status, Timestamp deadline, - grpc_metadata_batch* md_batch, grpc_error_handle error) { - if (!error.ok()) { - grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr); - } else { - *status = - md_batch->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN); - } - } +// +// NewConnectedSubchannel +// - } // namespace - - void SubchannelCall::RecvTrailingMetadataReady(void* arg, - grpc_error_handle error) { - SubchannelCall* call = static_cast(arg); - CHECK_NE(call->recv_trailing_metadata_, nullptr); - grpc_status_code status = GRPC_STATUS_OK; - GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, - error); - channelz::SubchannelNode* channelz_subchannel = - call->connected_subchannel_->channelz_subchannel(); - CHECK_NE(channelz_subchannel, nullptr); - if (status == GRPC_STATUS_OK) { - channelz_subchannel->RecordCallSucceeded(); - } else { - channelz_subchannel->RecordCallFailed(); - } - Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, error); - } +class NewConnectedSubchannel : public ConnectedSubchannel { + public: + NewConnectedSubchannel( + RefCountedPtr filter_stack, + OrphanablePtr transport, const ChannelArgs& args, + RefCountedPtr channelz_subchannel) + : ConnectedSubchannel(args, std::move(channelz_subchannel)), + filter_stack_(std::move(filter_stack)), + transport_(std::move(transport)) {} - void SubchannelCall::IncrementRefCount() { - GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); + void StartWatch( + grpc_pollset_set* interested_parties, + OrphanablePtr watcher) override { + // FIXME: add new transport API for this in v3 stack } - void SubchannelCall::IncrementRefCount(const DebugLocation& /*location*/, - const char* reason) { - GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); + void Ping(absl::AnyInvocable on_ack) override { + // FIXME: add new transport API for this in v3 stack } - // - // Subchannel::ConnectedSubchannelStateWatcher - // - - class Subchannel::ConnectedSubchannelStateWatcher final - : public AsyncConnectivityStateWatcherInterface { - public: - // Must be instantiated while holding c->mu. - explicit ConnectedSubchannelStateWatcher(WeakRefCountedPtr c) - : subchannel_(std::move(c)) {} - - ~ConnectedSubchannelStateWatcher() override { - subchannel_.reset(DEBUG_LOCATION, "state_watcher"); - } - - private: - void OnConnectivityStateChange(grpc_connectivity_state new_state, - const absl::Status& status) override { - Subchannel* c = subchannel_.get(); - { - MutexLock lock(&c->mu_); - // If we're either shutting down or have already seen this connection - // failure (i.e., c->connected_subchannel_ is null), do nothing. - // - // The transport reports TRANSIENT_FAILURE upon GOAWAY but SHUTDOWN - // upon connection close. So if the server gracefully shuts down, - // we will see TRANSIENT_FAILURE followed by SHUTDOWN, but if not, we - // will see only SHUTDOWN. Either way, we react to the first one we - // see, ignoring anything that happens after that. - if (c->connected_subchannel_ == nullptr) return; - if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE || - new_state == GRPC_CHANNEL_SHUTDOWN) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { - gpr_log( - GPR_INFO, - "subchannel %p %s: Connected subchannel %p reports %s: %s", c, - c->key_.ToString().c_str(), c->connected_subchannel_.get(), - ConnectivityStateName(new_state), status.ToString().c_str()); - } - c->connected_subchannel_.reset(); - if (c->channelz_node() != nullptr) { - c->channelz_node()->SetChildSocket(nullptr); - } - // Even though we're reporting IDLE instead of TRANSIENT_FAILURE here, - // pass along the status from the transport, since it may have - // keepalive info attached to it that the channel needs. - // TODO(roth): Consider whether there's a cleaner way to do this. - c->SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, status); - c->backoff_.Reset(); - } - } - // Drain any connectivity state notifications after releasing the mutex. - c->work_serializer_.DrainQueue(); - } + void StartCall(UnstartedCallHandler unstarted_handler) override { + auto handler = unstarted_handler.StartCall(filter_stack_); + transport_->client_transport()->StartCall(std::move(handler)); + } - WeakRefCountedPtr subchannel_; - }; + grpc_channel_stack* channel_stack() const override { return nullptr; } - // - // Subchannel::ConnectivityStateWatcherList - // + size_t GetInitialCallSizeEstimate() const override { return 0; } - void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked( - RefCountedPtr watcher) { - watchers_.insert(std::make_pair(watcher.get(), std::move(watcher))); + ArenaPromise MakeCallPromise( + CallArgs call_args) override { + Crash("legacy MakeCallPromise() method called in call v3 impl"); } - void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked( - ConnectivityStateWatcherInterface * watcher) { - watchers_.erase(watcher); + void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) override { + Crash("legacy ping method called in call v3 impl"); } - void Subchannel::ConnectivityStateWatcherList::NotifyLocked( - grpc_connectivity_state state, const absl::Status& status) { - for (const auto& p : watchers_) { - subchannel_->work_serializer_.Schedule( - [watcher = p.second->Ref(), state, status]() mutable { - auto* watcher_ptr = watcher.get(); - watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, - status); - }, - DEBUG_LOCATION); - } - } + private: + RefCountedPtr filter_stack_; + OrphanablePtr transport_; +}; - // - // Subchannel - // - - namespace { - - BackOff::Options ParseArgsForBackoffValues(const ChannelArgs& args, - Duration* min_connect_timeout) { - const absl::optional fixed_reconnect_backoff = - args.GetDurationFromIntMillis( - "grpc.testing.fixed_reconnect_backoff_ms"); - if (fixed_reconnect_backoff.has_value()) { - const Duration backoff = - std::max(Duration::Milliseconds(100), *fixed_reconnect_backoff); - *min_connect_timeout = backoff; - return BackOff::Options() - .set_initial_backoff(backoff) - .set_multiplier(1.0) - .set_jitter(0.0) - .set_max_backoff(backoff); - } - const Duration initial_backoff = std::max( - Duration::Milliseconds(100), - args.GetDurationFromIntMillis(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS) - .value_or(Duration::Seconds( - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS))); - *min_connect_timeout = std::max( - Duration::Milliseconds(100), - args.GetDurationFromIntMillis(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS) - .value_or(Duration::Seconds( - GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS))); - const Duration max_backoff = std::max( - Duration::Milliseconds(100), - args.GetDurationFromIntMillis(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS) - .value_or(Duration::Seconds( - GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS))); - return BackOff::Options() - .set_initial_backoff(initial_backoff) - .set_multiplier(GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) - .set_jitter(GRPC_SUBCHANNEL_RECONNECT_JITTER) - .set_max_backoff(max_backoff); - } - - } // namespace - - Subchannel::Subchannel(SubchannelKey key, - OrphanablePtr connector, - const ChannelArgs& args) - : DualRefCounted( - GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) - ? "Subchannel" - : nullptr), - key_(std::move(key)), - args_(args), - pollset_set_(grpc_pollset_set_create()), - connector_(std::move(connector)), - watcher_list_(this), - work_serializer_(args_.GetObjectRef()), - backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)), - event_engine_(args_.GetObjectRef()) { - // A grpc_init is added here to ensure that grpc_shutdown does not happen - // until the subchannel is destroyed. Subchannels can persist longer than - // channels because they maybe reused/shared among multiple channels. As a - // result the subchannel destruction happens asynchronously to channel - // destruction. If the last channel destruction triggers a grpc_shutdown - // before the last subchannel destruction, then there maybe race conditions - // triggering segmentation faults. To prevent this issue, we call a - // grpc_init here and a grpc_shutdown in the subchannel destructor. - InitInternally(); - global_stats().IncrementClientSubchannelsCreated(); - GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this, - grpc_schedule_on_exec_ctx); - // Check proxy mapper to determine address to connect to and channel - // args to use. - address_for_connect_ = CoreConfiguration::Get() - .proxy_mapper_registry() - .MapAddress(key_.address(), &args_) - .value_or(key_.address()); - // Initialize channelz. - const bool channelz_enabled = args_.GetBool(GRPC_ARG_ENABLE_CHANNELZ) - .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT); - if (channelz_enabled) { - const size_t channel_tracer_max_memory = Clamp( - args_.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE) - .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT), - 0, INT_MAX); - channelz_node_ = MakeRefCounted( - grpc_sockaddr_to_uri(&key_.address()) - .value_or(""), - channel_tracer_max_memory); - channelz_node_->AddTraceEvent( - channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string("subchannel created")); - } - } +// +// SubchannelCall +// - Subchannel::~Subchannel() { - if (channelz_node_ != nullptr) { - channelz_node_->AddTraceEvent( - channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string("Subchannel destroyed")); - channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN); - } - connector_.reset(); - grpc_pollset_set_destroy(pollset_set_); - // grpc_shutdown is called here because grpc_init is called in the ctor. - ShutdownInternally(); - } - - RefCountedPtr Subchannel::Create( - OrphanablePtr connector, - const grpc_resolved_address& address, const ChannelArgs& args) { - SubchannelKey key(address, args); - auto* subchannel_pool = args.GetObject(); - CHECK_NE(subchannel_pool, nullptr); - RefCountedPtr c = subchannel_pool->FindSubchannel(key); - if (c != nullptr) { - return c; - } - c = MakeRefCounted(std::move(key), std::move(connector), args); - // Try to register the subchannel before setting the subchannel pool. - // Otherwise, in case of a registration race, unreffing c in - // RegisterSubchannel() will cause c to be tried to be unregistered, while - // its key maps to a different subchannel. - RefCountedPtr registered = - subchannel_pool->RegisterSubchannel(c->key_, c); - if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref(); - return registered; - } - - void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) { - MutexLock lock(&mu_); - // Only update the value if the new keepalive time is larger. - if (new_keepalive_time > keepalive_time_) { - keepalive_time_ = new_keepalive_time; - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { - gpr_log(GPR_INFO, "subchannel %p %s: throttling keepalive time to %d", - this, key_.ToString().c_str(), new_keepalive_time); - } - args_ = args_.Set(GRPC_ARG_KEEPALIVE_TIME_MS, new_keepalive_time); - } - } +RefCountedPtr SubchannelCall::Create(Args args, + grpc_error_handle* error) { + const size_t allocation_size = + args.connected_subchannel->GetInitialCallSizeEstimate(); + Arena* arena = args.arena; + return RefCountedPtr(new ( + arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error)); +} + +SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error) + : connected_subchannel_(std::move(args.connected_subchannel)), + deadline_(args.deadline) { + grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this); + const grpc_call_element_args call_args = { + callstk, // call_stack + nullptr, // server_transport_data + args.context, // context + args.path.c_slice(), // path + args.start_time, // start_time + args.deadline, // deadline + args.arena, // arena + args.call_combiner // call_combiner + }; + *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1, + SubchannelCall::Destroy, this, &call_args); + if (GPR_UNLIKELY(!error->ok())) { + gpr_log(GPR_ERROR, "error: %s", StatusToString(*error).c_str()); + return; + } + grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); + auto* channelz_node = connected_subchannel_->channelz_subchannel(); + if (channelz_node != nullptr) { + channelz_node->RecordCallStarted(); + } +} + +void SubchannelCall::StartTransportStreamOpBatch( + grpc_transport_stream_op_batch* batch) { + MaybeInterceptRecvTrailingMetadata(batch); + grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this); + grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); + GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); + top_elem->filter->start_transport_stream_op_batch(top_elem, batch); +} + +grpc_call_stack* SubchannelCall::GetCallStack() { + return SUBCHANNEL_CALL_TO_CALL_STACK(this); +} + +void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) { + CHECK_EQ(after_call_stack_destroy_, nullptr); + CHECK_NE(closure, nullptr); + after_call_stack_destroy_ = closure; +} + +RefCountedPtr SubchannelCall::Ref() { + IncrementRefCount(); + return RefCountedPtr(this); +} + +RefCountedPtr SubchannelCall::Ref(const DebugLocation& location, + const char* reason) { + IncrementRefCount(location, reason); + return RefCountedPtr(this); +} + +void SubchannelCall::Unref() { + GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); +} + +void SubchannelCall::Unref(const DebugLocation& /*location*/, + const char* reason) { + GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); +} + +void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) { + SubchannelCall* self = static_cast(arg); + // Keep some members before destroying the subchannel call. + grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_; + RefCountedPtr connected_subchannel = + std::move(self->connected_subchannel_); + // Destroy the subchannel call. + self->~SubchannelCall(); + // Destroy the call stack. This should be after destroying the subchannel + // call, because call->after_call_stack_destroy(), if not null, will free + // the call arena. + grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr, + after_call_stack_destroy); + // Automatically reset connected_subchannel. This should be after destroying + // the call stack, because destroying call stack needs access to the channel + // stack. +} + +void SubchannelCall::MaybeInterceptRecvTrailingMetadata( + grpc_transport_stream_op_batch* batch) { + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor is channelz is enabled. + if (connected_subchannel_->channelz_subchannel() == nullptr) { + return; + } + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, + this, grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + CHECK_EQ(recv_trailing_metadata_, nullptr); + recv_trailing_metadata_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + original_recv_trailing_metadata_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &recv_trailing_metadata_ready_; +} + +namespace { + +// Sets *status based on the rest of the parameters. +void GetCallStatus(grpc_status_code* status, Timestamp deadline, + grpc_metadata_batch* md_batch, grpc_error_handle error) { + if (!error.ok()) { + grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr); + } else { + *status = md_batch->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN); + } +} + +} // namespace + +void SubchannelCall::RecvTrailingMetadataReady(void* arg, + grpc_error_handle error) { + SubchannelCall* call = static_cast(arg); + CHECK_NE(call->recv_trailing_metadata_, nullptr); + grpc_status_code status = GRPC_STATUS_OK; + GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, error); + channelz::SubchannelNode* channelz_subchannel = + call->connected_subchannel_->channelz_subchannel(); + CHECK_NE(channelz_subchannel, nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, error); +} + +void SubchannelCall::IncrementRefCount() { + GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); +} + +void SubchannelCall::IncrementRefCount(const DebugLocation& /*location*/, + const char* reason) { + GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); +} - channelz::SubchannelNode* Subchannel::channelz_node() { - return channelz_node_.get(); - } +// +// Subchannel::ConnectedSubchannelStateWatcher +// - void Subchannel::WatchConnectivityState( - RefCountedPtr watcher) { - { - MutexLock lock(&mu_); - grpc_pollset_set* interested_parties = watcher->interested_parties(); - if (interested_parties != nullptr) { - grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties); - } - work_serializer_.Schedule( - [watcher = watcher->Ref(), state = state_, - status = status_]() mutable { - auto* watcher_ptr = watcher.get(); - watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, - status); - }, - DEBUG_LOCATION); - watcher_list_.AddWatcherLocked(std::move(watcher)); - } - // Drain any connectivity state notifications after releasing the mutex. - work_serializer_.DrainQueue(); - } +class Subchannel::ConnectedSubchannelStateWatcher final + : public AsyncConnectivityStateWatcherInterface { + public: + // Must be instantiated while holding c->mu. + explicit ConnectedSubchannelStateWatcher(WeakRefCountedPtr c) + : subchannel_(std::move(c)) {} - void Subchannel::CancelConnectivityStateWatch( - ConnectivityStateWatcherInterface * watcher) { - { - MutexLock lock(&mu_); - grpc_pollset_set* interested_parties = watcher->interested_parties(); - if (interested_parties != nullptr) { - grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties); - } - watcher_list_.RemoveWatcherLocked(watcher); - } - // Drain any connectivity state notifications after releasing the mutex. - // (Shouldn't actually be necessary in this case, but better safe than - // sorry.) - work_serializer_.DrainQueue(); + ~ConnectedSubchannelStateWatcher() override { + subchannel_.reset(DEBUG_LOCATION, "state_watcher"); } - void Subchannel::RequestConnection() { + private: + void OnConnectivityStateChange(grpc_connectivity_state new_state, + const absl::Status& status) override { + Subchannel* c = subchannel_.get(); { - MutexLock lock(&mu_); - if (state_ == GRPC_CHANNEL_IDLE) { - StartConnectingLocked(); + MutexLock lock(&c->mu_); + // If we're either shutting down or have already seen this connection + // failure (i.e., c->connected_subchannel_ is null), do nothing. + // + // The transport reports TRANSIENT_FAILURE upon GOAWAY but SHUTDOWN + // upon connection close. So if the server gracefully shuts down, + // we will see TRANSIENT_FAILURE followed by SHUTDOWN, but if not, we + // will see only SHUTDOWN. Either way, we react to the first one we + // see, ignoring anything that happens after that. + if (c->connected_subchannel_ == nullptr) return; + if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE || + new_state == GRPC_CHANNEL_SHUTDOWN) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { + gpr_log(GPR_INFO, + "subchannel %p %s: Connected subchannel %p reports %s: %s", c, + c->key_.ToString().c_str(), c->connected_subchannel_.get(), + ConnectivityStateName(new_state), status.ToString().c_str()); + } + c->connected_subchannel_.reset(); + if (c->channelz_node() != nullptr) { + c->channelz_node()->SetChildSocket(nullptr); + } + // Even though we're reporting IDLE instead of TRANSIENT_FAILURE here, + // pass along the status from the transport, since it may have + // keepalive info attached to it that the channel needs. + // TODO(roth): Consider whether there's a cleaner way to do this. + c->SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, status); + c->backoff_.Reset(); } } // Drain any connectivity state notifications after releasing the mutex. - work_serializer_.DrainQueue(); + c->work_serializer_.DrainQueue(); } - void Subchannel::ResetBackoff() { - // Hold a ref to ensure cancellation and subsequent deletion of the closure - // does not eliminate the last ref and destroy the Subchannel before the - // method returns. - auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff"); - { - MutexLock lock(&mu_); - backoff_.Reset(); - if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && - event_engine_->Cancel(retry_timer_handle_)) { - OnRetryTimerLocked(); - } else if (state_ == GRPC_CHANNEL_CONNECTING) { - next_attempt_time_ = Timestamp::Now(); - } - } - // Drain any connectivity state notifications after releasing the mutex. - work_serializer_.DrainQueue(); - } + WeakRefCountedPtr subchannel_; +}; - void Subchannel::Orphaned() { - // The subchannel_pool is only used once here in this subchannel, so the - // access can be outside of the lock. - if (subchannel_pool_ != nullptr) { - subchannel_pool_->UnregisterSubchannel(key_, this); - subchannel_pool_.reset(); - } - { - MutexLock lock(&mu_); - CHECK(!shutdown_); - shutdown_ = true; - connector_.reset(); - connected_subchannel_.reset(); +// +// Subchannel::ConnectivityStateWatcherList +// + +void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked( + RefCountedPtr watcher) { + watchers_.insert(std::make_pair(watcher.get(), std::move(watcher))); +} + +void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked( + ConnectivityStateWatcherInterface* watcher) { + watchers_.erase(watcher); +} + +void Subchannel::ConnectivityStateWatcherList::NotifyLocked( + grpc_connectivity_state state, const absl::Status& status) { + for (const auto& p : watchers_) { + subchannel_->work_serializer_.Schedule( + [watcher = p.second->Ref(), state, status]() mutable { + auto* watcher_ptr = watcher.get(); + watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, + status); + }, + DEBUG_LOCATION); + } +} + +// +// Subchannel +// + +namespace { + +BackOff::Options ParseArgsForBackoffValues(const ChannelArgs& args, + Duration* min_connect_timeout) { + const absl::optional fixed_reconnect_backoff = + args.GetDurationFromIntMillis("grpc.testing.fixed_reconnect_backoff_ms"); + if (fixed_reconnect_backoff.has_value()) { + const Duration backoff = + std::max(Duration::Milliseconds(100), *fixed_reconnect_backoff); + *min_connect_timeout = backoff; + return BackOff::Options() + .set_initial_backoff(backoff) + .set_multiplier(1.0) + .set_jitter(0.0) + .set_max_backoff(backoff); + } + const Duration initial_backoff = std::max( + Duration::Milliseconds(100), + args.GetDurationFromIntMillis(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS) + .value_or(Duration::Seconds( + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS))); + *min_connect_timeout = + std::max(Duration::Milliseconds(100), + args.GetDurationFromIntMillis(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS) + .value_or(Duration::Seconds( + GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS))); + const Duration max_backoff = + std::max(Duration::Milliseconds(100), + args.GetDurationFromIntMillis(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS) + .value_or(Duration::Seconds( + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS))); + return BackOff::Options() + .set_initial_backoff(initial_backoff) + .set_multiplier(GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_SUBCHANNEL_RECONNECT_JITTER) + .set_max_backoff(max_backoff); +} + +} // namespace + +Subchannel::Subchannel(SubchannelKey key, + OrphanablePtr connector, + const ChannelArgs& args) + : DualRefCounted( + GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) ? "Subchannel" + : nullptr), + key_(std::move(key)), + args_(args), + pollset_set_(grpc_pollset_set_create()), + connector_(std::move(connector)), + watcher_list_(this), + work_serializer_(args_.GetObjectRef()), + backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)), + event_engine_(args_.GetObjectRef()) { + // A grpc_init is added here to ensure that grpc_shutdown does not happen + // until the subchannel is destroyed. Subchannels can persist longer than + // channels because they maybe reused/shared among multiple channels. As a + // result the subchannel destruction happens asynchronously to channel + // destruction. If the last channel destruction triggers a grpc_shutdown + // before the last subchannel destruction, then there maybe race conditions + // triggering segmentation faults. To prevent this issue, we call a + // grpc_init here and a grpc_shutdown in the subchannel destructor. + InitInternally(); + global_stats().IncrementClientSubchannelsCreated(); + GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this, + grpc_schedule_on_exec_ctx); + // Check proxy mapper to determine address to connect to and channel + // args to use. + address_for_connect_ = CoreConfiguration::Get() + .proxy_mapper_registry() + .MapAddress(key_.address(), &args_) + .value_or(key_.address()); + // Initialize channelz. + const bool channelz_enabled = args_.GetBool(GRPC_ARG_ENABLE_CHANNELZ) + .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT); + if (channelz_enabled) { + const size_t channel_tracer_max_memory = Clamp( + args_.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE) + .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT), + 0, INT_MAX); + channelz_node_ = MakeRefCounted( + grpc_sockaddr_to_uri(&key_.address()) + .value_or(""), + channel_tracer_max_memory); + channelz_node_->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("subchannel created")); + } +} + +Subchannel::~Subchannel() { + if (channelz_node_ != nullptr) { + channelz_node_->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel destroyed")); + channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN); + } + connector_.reset(); + grpc_pollset_set_destroy(pollset_set_); + // grpc_shutdown is called here because grpc_init is called in the ctor. + ShutdownInternally(); +} + +RefCountedPtr Subchannel::Create( + OrphanablePtr connector, + const grpc_resolved_address& address, const ChannelArgs& args) { + SubchannelKey key(address, args); + auto* subchannel_pool = args.GetObject(); + CHECK_NE(subchannel_pool, nullptr); + RefCountedPtr c = subchannel_pool->FindSubchannel(key); + if (c != nullptr) { + return c; + } + c = MakeRefCounted(std::move(key), std::move(connector), args); + // Try to register the subchannel before setting the subchannel pool. + // Otherwise, in case of a registration race, unreffing c in + // RegisterSubchannel() will cause c to be tried to be unregistered, while + // its key maps to a different subchannel. + RefCountedPtr registered = + subchannel_pool->RegisterSubchannel(c->key_, c); + if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref(); + return registered; +} + +void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) { + MutexLock lock(&mu_); + // Only update the value if the new keepalive time is larger. + if (new_keepalive_time > keepalive_time_) { + keepalive_time_ = new_keepalive_time; + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { + gpr_log(GPR_INFO, "subchannel %p %s: throttling keepalive time to %d", + this, key_.ToString().c_str(), new_keepalive_time); } - // Drain any connectivity state notifications after releasing the mutex. - work_serializer_.DrainQueue(); + args_ = args_.Set(GRPC_ARG_KEEPALIVE_TIME_MS, new_keepalive_time); } +} - void Subchannel::GetOrAddDataProducer( - UniqueTypeName type, - std::function get_or_add) { - MutexLock lock(&mu_); - auto it = data_producer_map_.emplace(type, nullptr).first; - get_or_add(&it->second); - } +channelz::SubchannelNode* Subchannel::channelz_node() { + return channelz_node_.get(); +} - void Subchannel::RemoveDataProducer(DataProducerInterface * data_producer) { +void Subchannel::WatchConnectivityState( + RefCountedPtr watcher) { + { MutexLock lock(&mu_); - auto it = data_producer_map_.find(data_producer->type()); - if (it != data_producer_map_.end() && it->second == data_producer) { - data_producer_map_.erase(it); + grpc_pollset_set* interested_parties = watcher->interested_parties(); + if (interested_parties != nullptr) { + grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties); + } + work_serializer_.Schedule( + [watcher = watcher->Ref(), state = state_, status = status_]() mutable { + auto* watcher_ptr = watcher.get(); + watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, + status); + }, + DEBUG_LOCATION); + watcher_list_.AddWatcherLocked(std::move(watcher)); + } + // Drain any connectivity state notifications after releasing the mutex. + work_serializer_.DrainQueue(); +} + +void Subchannel::CancelConnectivityStateWatch( + ConnectivityStateWatcherInterface* watcher) { + { + MutexLock lock(&mu_); + grpc_pollset_set* interested_parties = watcher->interested_parties(); + if (interested_parties != nullptr) { + grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties); } + watcher_list_.RemoveWatcherLocked(watcher); } + // Drain any connectivity state notifications after releasing the mutex. + // (Shouldn't actually be necessary in this case, but better safe than + // sorry.) + work_serializer_.DrainQueue(); +} - // Note: Must be called with a state that is different from the current state. - void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state, - const absl::Status& status) { - state_ = state; - if (status.ok()) { - status_ = status; - } else { - // Augment status message to include IP address. - status_ = absl::Status( - status.code(), absl::StrCat(grpc_sockaddr_to_uri(&key_.address()) - .value_or(""), - ": ", status.message())); - status.ForEachPayload( - [this](absl::string_view key, const absl::Cord& value) - // Want to use ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) here, - // but that won't work, because we can't pass the lock - // annotation through absl::Status::ForEachPayload(). - ABSL_NO_THREAD_SAFETY_ANALYSIS { status_.SetPayload(key, value); }); - } - if (channelz_node_ != nullptr) { - channelz_node_->UpdateConnectivityState(state); - channelz_node_->AddTraceEvent( - channelz::ChannelTrace::Severity::Info, - grpc_slice_from_cpp_string(absl::StrCat( - "Subchannel connectivity state changed to ", - ConnectivityStateName(state), - status.ok() ? "" : absl::StrCat(": ", status_.ToString())))); +void Subchannel::RequestConnection() { + { + MutexLock lock(&mu_); + if (state_ == GRPC_CHANNEL_IDLE) { + StartConnectingLocked(); } - // Notify watchers. - watcher_list_.NotifyLocked(state, status_); } + // Drain any connectivity state notifications after releasing the mutex. + work_serializer_.DrainQueue(); +} - void Subchannel::OnRetryTimer() { - { - MutexLock lock(&mu_); +void Subchannel::ResetBackoff() { + // Hold a ref to ensure cancellation and subsequent deletion of the closure + // does not eliminate the last ref and destroy the Subchannel before the + // method returns. + auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff"); + { + MutexLock lock(&mu_); + backoff_.Reset(); + if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && + event_engine_->Cancel(retry_timer_handle_)) { OnRetryTimerLocked(); + } else if (state_ == GRPC_CHANNEL_CONNECTING) { + next_attempt_time_ = Timestamp::Now(); } - // Drain any connectivity state notifications after releasing the mutex. - work_serializer_.DrainQueue(); - } - - void Subchannel::OnRetryTimerLocked() { - if (shutdown_) return; - gpr_log(GPR_INFO, "subchannel %p %s: backoff delay elapsed, reporting IDLE", - this, key_.ToString().c_str()); - SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, absl::OkStatus()); - } - - void Subchannel::StartConnectingLocked() { - // Set next attempt time. - const Timestamp min_deadline = min_connect_timeout_ + Timestamp::Now(); - next_attempt_time_ = backoff_.NextAttemptTime(); - // Report CONNECTING. - SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus()); - // Start connection attempt. - SubchannelConnector::Args args; - args.address = &address_for_connect_; - args.interested_parties = pollset_set_; - args.deadline = std::max(next_attempt_time_, min_deadline); - args.channel_args = args_; - WeakRef(DEBUG_LOCATION, "Connect").release(); // Ref held by callback. - connector_->Connect(args, &connecting_result_, &on_connecting_finished_); - } - - void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) { - WeakRefCountedPtr c(static_cast(arg)); - { - MutexLock lock(&c->mu_); - c->OnConnectingFinishedLocked(error); - } - // Drain any connectivity state notifications after releasing the mutex. - c->work_serializer_.DrainQueue(); - c.reset(DEBUG_LOCATION, "Connect"); } + // Drain any connectivity state notifications after releasing the mutex. + work_serializer_.DrainQueue(); +} - void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { - if (shutdown_) { - connecting_result_.Reset(); - return; - } - // If we didn't get a transport or we fail to publish it, report - // TRANSIENT_FAILURE and start the retry timer. - // Note that if the connection attempt took longer than the backoff - // time, then the timer will fire immediately, and we will quickly - // transition back to IDLE. - if (connecting_result_.transport == nullptr || !PublishTransportLocked()) { - const Duration time_until_next_attempt = - next_attempt_time_ - Timestamp::Now(); - gpr_log(GPR_INFO, - "subchannel %p %s: connect failed (%s), backing off for %" PRId64 - " ms", - this, key_.ToString().c_str(), StatusToString(error).c_str(), - time_until_next_attempt.millis()); - SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, - grpc_error_to_absl_status(error)); - retry_timer_handle_ = event_engine_->RunAfter( - time_until_next_attempt, - [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { - { - ApplicationCallbackExecCtx callback_exec_ctx; - ExecCtx exec_ctx; - self->OnRetryTimer(); - // Subchannel deletion might require an active ExecCtx. So if - // self.reset() is not called here, the WeakRefCountedPtr - // destructor may run after the ExecCtx declared in the callback - // is destroyed. Since subchannel may get destroyed when the - // WeakRefCountedPtr destructor runs, it may not have an active - // ExecCtx - thus leading to crashes. - self.reset(); - } - }); - } +void Subchannel::Orphaned() { + // The subchannel_pool is only used once here in this subchannel, so the + // access can be outside of the lock. + if (subchannel_pool_ != nullptr) { + subchannel_pool_->UnregisterSubchannel(key_, this); + subchannel_pool_.reset(); } - - bool Subchannel::PublishTransportLocked() { - auto socket_node = std::move(connecting_result_.socket_node); - if (!IsCallV3Enabled()) { - // Construct channel stack. - // Builder takes ownership of transport. - ChannelStackBuilderImpl builder( - "subchannel", GRPC_CLIENT_SUBCHANNEL, - connecting_result_.channel_args.SetObject( - std::exchange(connecting_result_.transport, nullptr))); - if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) { - return false; - } - absl::StatusOr> stack = builder.Build(); - if (!stack.ok()) { - connecting_result_.Reset(); - gpr_log(GPR_ERROR, - "subchannel %p %s: error initializing subchannel stack: %s", - this, key_.ToString().c_str(), - stack.status().ToString().c_str()); - return false; - } - connected_subchannel_ = MakeRefCounted( - std::move(*stack), args_, channelz_node_); - } else { - // Call v3 stack. - CallFilters::StackBuilder builder; - // FIXME: add filters registered for CLIENT_SUBCHANNEL - auto filter_stack = builder.Build(); - connected_subchannel_ = MakeRefCounted( - std::move(filter_stack), - OrphanablePtr( - std::exchange(connecting_result_.transport, nullptr)), - args_, channelz_node_); - } + { + MutexLock lock(&mu_); + CHECK(!shutdown_); + shutdown_ = true; + connector_.reset(); + connected_subchannel_.reset(); + } + // Drain any connectivity state notifications after releasing the mutex. + work_serializer_.DrainQueue(); +} + +void Subchannel::GetOrAddDataProducer( + UniqueTypeName type, + std::function get_or_add) { + MutexLock lock(&mu_); + auto it = data_producer_map_.emplace(type, nullptr).first; + get_or_add(&it->second); +} + +void Subchannel::RemoveDataProducer(DataProducerInterface* data_producer) { + MutexLock lock(&mu_); + auto it = data_producer_map_.find(data_producer->type()); + if (it != data_producer_map_.end() && it->second == data_producer) { + data_producer_map_.erase(it); + } +} + +// Note: Must be called with a state that is different from the current state. +void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state, + const absl::Status& status) { + state_ = state; + if (status.ok()) { + status_ = status; + } else { + // Augment status message to include IP address. + status_ = absl::Status(status.code(), + absl::StrCat(grpc_sockaddr_to_uri(&key_.address()) + .value_or(""), + ": ", status.message())); + status.ForEachPayload( + [this](absl::string_view key, const absl::Cord& value) + // Want to use ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) here, + // but that won't work, because we can't pass the lock + // annotation through absl::Status::ForEachPayload(). + ABSL_NO_THREAD_SAFETY_ANALYSIS { status_.SetPayload(key, value); }); + } + if (channelz_node_ != nullptr) { + channelz_node_->UpdateConnectivityState(state); + channelz_node_->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_cpp_string(absl::StrCat( + "Subchannel connectivity state changed to ", + ConnectivityStateName(state), + status.ok() ? "" : absl::StrCat(": ", status_.ToString())))); + } + // Notify watchers. + watcher_list_.NotifyLocked(state, status_); +} + +void Subchannel::OnRetryTimer() { + { + MutexLock lock(&mu_); + OnRetryTimerLocked(); + } + // Drain any connectivity state notifications after releasing the mutex. + work_serializer_.DrainQueue(); +} + +void Subchannel::OnRetryTimerLocked() { + if (shutdown_) return; + gpr_log(GPR_INFO, "subchannel %p %s: backoff delay elapsed, reporting IDLE", + this, key_.ToString().c_str()); + SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, absl::OkStatus()); +} + +void Subchannel::StartConnectingLocked() { + // Set next attempt time. + const Timestamp min_deadline = min_connect_timeout_ + Timestamp::Now(); + next_attempt_time_ = backoff_.NextAttemptTime(); + // Report CONNECTING. + SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus()); + // Start connection attempt. + SubchannelConnector::Args args; + args.address = &address_for_connect_; + args.interested_parties = pollset_set_; + args.deadline = std::max(next_attempt_time_, min_deadline); + args.channel_args = args_; + WeakRef(DEBUG_LOCATION, "Connect").release(); // Ref held by callback. + connector_->Connect(args, &connecting_result_, &on_connecting_finished_); +} + +void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) { + WeakRefCountedPtr c(static_cast(arg)); + { + MutexLock lock(&c->mu_); + c->OnConnectingFinishedLocked(error); + } + // Drain any connectivity state notifications after releasing the mutex. + c->work_serializer_.DrainQueue(); + c.reset(DEBUG_LOCATION, "Connect"); +} + +void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { + if (shutdown_) { connecting_result_.Reset(); - // Publish. - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { - gpr_log(GPR_INFO, "subchannel %p %s: new connected subchannel at %p", - this, key_.ToString().c_str(), connected_subchannel_.get()); - } - if (channelz_node_ != nullptr) { - channelz_node_->SetChildSocket(std::move(socket_node)); - } - // Start watching connected subchannel. - connected_subchannel_->StartWatch( - pollset_set_, MakeOrphanable( - WeakRef(DEBUG_LOCATION, "state_watcher"))); - // Report initial state. - SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status()); - return true; - } - - ChannelArgs Subchannel::MakeSubchannelArgs( - const ChannelArgs& channel_args, const ChannelArgs& address_args, - const RefCountedPtr& subchannel_pool, - const std::string& channel_default_authority) { - // Note that we start with the channel-level args and then apply the - // per-address args, so that if a value is present in both, the one - // in the channel-level args is used. This is particularly important - // for the GRPC_ARG_DEFAULT_AUTHORITY arg, which we want to allow - // resolvers to set on a per-address basis only if the application - // did not explicitly set it at the channel level. - return channel_args.UnionWith(address_args) - .SetObject(subchannel_pool) - // If we haven't already set the default authority arg (i.e., it - // was not explicitly set by the application nor overridden by - // the resolver), add it from the channel's default. - .SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, channel_default_authority) - // Remove channel args that should not affect subchannel - // uniqueness. - .Remove(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME) - .Remove(GRPC_ARG_INHIBIT_HEALTH_CHECKING) - .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE) - // Remove all keys with the no-subchannel prefix. - .RemoveAllKeysWithPrefix(GRPC_ARG_NO_SUBCHANNEL_PREFIX); - } + return; + } + // If we didn't get a transport or we fail to publish it, report + // TRANSIENT_FAILURE and start the retry timer. + // Note that if the connection attempt took longer than the backoff + // time, then the timer will fire immediately, and we will quickly + // transition back to IDLE. + if (connecting_result_.transport == nullptr || !PublishTransportLocked()) { + const Duration time_until_next_attempt = + next_attempt_time_ - Timestamp::Now(); + gpr_log(GPR_INFO, + "subchannel %p %s: connect failed (%s), backing off for %" PRId64 + " ms", + this, key_.ToString().c_str(), StatusToString(error).c_str(), + time_until_next_attempt.millis()); + SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, + grpc_error_to_absl_status(error)); + retry_timer_handle_ = event_engine_->RunAfter( + time_until_next_attempt, + [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { + { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + self->OnRetryTimer(); + // Subchannel deletion might require an active ExecCtx. So if + // self.reset() is not called here, the WeakRefCountedPtr + // destructor may run after the ExecCtx declared in the callback + // is destroyed. Since subchannel may get destroyed when the + // WeakRefCountedPtr destructor runs, it may not have an active + // ExecCtx - thus leading to crashes. + self.reset(); + } + }); + } +} + +bool Subchannel::PublishTransportLocked() { + auto socket_node = std::move(connecting_result_.socket_node); + if (!IsCallV3Enabled()) { + // Construct channel stack. + // Builder takes ownership of transport. + ChannelStackBuilderImpl builder( + "subchannel", GRPC_CLIENT_SUBCHANNEL, + connecting_result_.channel_args.SetObject( + std::exchange(connecting_result_.transport, nullptr))); + if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) { + return false; + } + absl::StatusOr> stack = builder.Build(); + if (!stack.ok()) { + connecting_result_.Reset(); + gpr_log(GPR_ERROR, + "subchannel %p %s: error initializing subchannel stack: %s", this, + key_.ToString().c_str(), stack.status().ToString().c_str()); + return false; + } + connected_subchannel_ = MakeRefCounted( + std::move(*stack), args_, channelz_node_); + } else { + // Call v3 stack. + CallFilters::StackBuilder builder; + // FIXME: add filters registered for CLIENT_SUBCHANNEL + auto filter_stack = builder.Build(); + connected_subchannel_ = MakeRefCounted( + std::move(filter_stack), + OrphanablePtr( + std::exchange(connecting_result_.transport, nullptr)), + args_, channelz_node_); + } + connecting_result_.Reset(); + // Publish. + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { + gpr_log(GPR_INFO, "subchannel %p %s: new connected subchannel at %p", this, + key_.ToString().c_str(), connected_subchannel_.get()); + } + if (channelz_node_ != nullptr) { + channelz_node_->SetChildSocket(std::move(socket_node)); + } + // Start watching connected subchannel. + connected_subchannel_->StartWatch( + pollset_set_, MakeOrphanable( + WeakRef(DEBUG_LOCATION, "state_watcher"))); + // Report initial state. + SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status()); + return true; +} + +ChannelArgs Subchannel::MakeSubchannelArgs( + const ChannelArgs& channel_args, const ChannelArgs& address_args, + const RefCountedPtr& subchannel_pool, + const std::string& channel_default_authority) { + // Note that we start with the channel-level args and then apply the + // per-address args, so that if a value is present in both, the one + // in the channel-level args is used. This is particularly important + // for the GRPC_ARG_DEFAULT_AUTHORITY arg, which we want to allow + // resolvers to set on a per-address basis only if the application + // did not explicitly set it at the channel level. + return channel_args.UnionWith(address_args) + .SetObject(subchannel_pool) + // If we haven't already set the default authority arg (i.e., it + // was not explicitly set by the application nor overridden by + // the resolver), add it from the channel's default. + .SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, channel_default_authority) + // Remove channel args that should not affect subchannel + // uniqueness. + .Remove(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME) + .Remove(GRPC_ARG_INHIBIT_HEALTH_CHECKING) + .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE) + // Remove all keys with the no-subchannel prefix. + .RemoveAllKeysWithPrefix(GRPC_ARG_NO_SUBCHANNEL_PREFIX); +} } // namespace grpc_core diff --git a/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h b/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h index 6254e5f2f75..001282276f1 100644 --- a/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h +++ b/src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h @@ -40,7 +40,7 @@ namespace grpc_core { - Duration GetClientIdleTimeout(const ChannelArgs& args) ; +Duration GetClientIdleTimeout(const ChannelArgs& args); class LegacyChannelIdleFilter : public ChannelFilter { public: diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 501e3362950..e8d38de6976 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -89,7 +89,8 @@ struct IsRawPointerTagged { static constexpr bool kValue = false; }; template -struct IsRawPointerTagged> { +struct IsRawPointerTagged> { static constexpr bool kValue = true; }; } // namespace channel_args_detail @@ -100,14 +101,14 @@ struct IsRawPointerTagged> template struct ChannelArgTypeTraits< T, absl::enable_if_t< - !channel_args_detail::IsRawPointerTagged::kValue &&( - std::is_base_of>, - channel_args_detail::RefType>::value || - std::is_base_of, - NonPolymorphicRefCount>, - channel_args_detail::RefType>::value || - std::is_base_of>, - channel_args_detail::RefType>::value), + !channel_args_detail::IsRawPointerTagged::kValue && + (std::is_base_of>, + channel_args_detail::RefType>::value || + std::is_base_of, + NonPolymorphicRefCount>, + channel_args_detail::RefType>::value || + std::is_base_of>, + channel_args_detail::RefType>::value), void>> { static const grpc_arg_pointer_vtable* VTable() { static const grpc_arg_pointer_vtable tbl = { diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index 1d37b464e45..c8be3af5bdf 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -108,7 +108,8 @@ class Context::kIndex)>> { .value); } static void set(T* value) { - auto& elem = GetContext()[OldStyleContext::kIndex]; + auto& elem = + GetContext()[OldStyleContext::kIndex]; if (elem.destroy != nullptr) { elem.destroy(elem.value); elem.destroy = nullptr; diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 4c003259201..41e82821f34 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -43,8 +43,8 @@ #include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/surface/channel_stack_type.h" -#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/call_arena_allocator.h" +#include "src/core/lib/transport/connectivity_state.h" // Forward declaration to avoid dependency loop. struct grpc_channel_stack; diff --git a/src/core/load_balancing/lb_policy.h b/src/core/load_balancing/lb_policy.h index a9c6ae95ce9..906a3f00d3b 100644 --- a/src/core/load_balancing/lb_policy.h +++ b/src/core/load_balancing/lb_policy.h @@ -488,10 +488,11 @@ namespace promise_detail { template <> struct OldStyleContext { - static constexpr grpc_context_index kIndex = GRPC_SUBCHANNEL_CALL_TRACKER_INTERFACE; + static constexpr grpc_context_index kIndex = + GRPC_SUBCHANNEL_CALL_TRACKER_INTERFACE; }; -} +} // namespace promise_detail } // namespace grpc_core