From ea8b74a33a2cc3a5bad074fdb7381dd4b8296062 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 1 Feb 2024 16:35:26 +0000 Subject: [PATCH] [client channel] rename ClientChannel to ClientChannelFilter --- .../client_channel/channel_connectivity.cc | 20 +- .../filters/client_channel/client_channel.cc | 428 +++++++++--------- .../filters/client_channel/client_channel.h | 60 +-- .../client_channel/client_channel_internal.h | 2 +- .../client_channel/client_channel_plugin.cc | 4 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 8 +- .../client_channel/lb_policy/rls/rls.cc | 8 +- .../filters/client_channel/retry_filter.cc | 2 +- .../ext/filters/client_channel/retry_filter.h | 4 +- .../retry_filter_legacy_call_data.cc | 7 +- .../retry_filter_legacy_call_data.h | 7 +- src/core/ext/xds/xds_transport_grpc.cc | 8 +- .../client_channel/client_channel_test.cc | 12 +- 13 files changed, 294 insertions(+), 276 deletions(-) diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc index cc5068ec656..d216c98b1fb 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.cc +++ b/src/core/ext/filters/client_channel/channel_connectivity.cc @@ -67,8 +67,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( (c_channel, try_to_connect)); grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel); // Forward through to the underlying client channel. - grpc_core::ClientChannel* client_channel = - grpc_core::ClientChannel::GetFromChannel(channel); + grpc_core::ClientChannelFilter* client_channel = + grpc_core::ClientChannelFilter::GetFromChannel(channel); if (GPR_UNLIKELY(client_channel == nullptr)) { if (grpc_core::IsLameChannel(channel)) { return GRPC_CHANNEL_TRANSIENT_FAILURE; @@ -83,8 +83,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) { grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel); - grpc_core::ClientChannel* client_channel = - grpc_core::ClientChannel::GetFromChannel(channel); + grpc_core::ClientChannelFilter* client_channel = + grpc_core::ClientChannelFilter::GetFromChannel(channel); if (client_channel == nullptr) { if (!grpc_core::IsLameChannel(channel)) { gpr_log(GPR_ERROR, @@ -97,7 +97,7 @@ int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) { } int grpc_channel_support_connectivity_watcher(grpc_channel* channel) { - return grpc_core::ClientChannel::GetFromChannel( + return grpc_core::ClientChannelFilter::GetFromChannel( grpc_core::Channel::FromC(channel)) != nullptr; } @@ -115,8 +115,8 @@ class StateWatcher : public DualRefCounted { state_(last_observed_state) { GPR_ASSERT(grpc_cq_begin_op(cq, tag)); GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr); - ClientChannel* client_channel = - ClientChannel::GetFromChannel(channel_.get()); + ClientChannelFilter* client_channel = + ClientChannelFilter::GetFromChannel(channel_.get()); if (client_channel == nullptr) { // If the target URI used to create the channel was invalid, channel // stack initialization failed, and that caused us to create a lame @@ -145,7 +145,7 @@ class StateWatcher : public DualRefCounted { private: // A fire-and-forget object used to delay starting the timer until the - // ClientChannel actually starts the watch. + // ClientChannelFilter actually starts the watch. class WatcherTimerInitState { public: WatcherTimerInitState(StateWatcher* state_watcher, Timestamp deadline) @@ -201,8 +201,8 @@ class StateWatcher : public DualRefCounted { void TimeoutComplete() { timer_fired_ = true; // If this is a client channel (not a lame channel), cancel the watch. - ClientChannel* client_channel = - ClientChannel::GetFromChannel(channel_.get()); + ClientChannelFilter* client_channel = + ClientChannelFilter::GetFromChannel(channel_.get()); if (client_channel != nullptr) { client_channel->CancelExternalConnectivityWatcher(&on_complete_); } diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 29b5d7ec4d1..c5391aff84e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -120,20 +120,20 @@ TraceFlag grpc_client_channel_call_trace(false, "client_channel_call"); TraceFlag grpc_client_channel_lb_call_trace(false, "client_channel_lb_call"); // -// ClientChannel::CallData definition +// ClientChannelFilter::CallData definition // -class ClientChannel::CallData { +class ClientChannelFilter::CallData { public: // Removes the call from the channel's list of calls queued // for name resolution. void RemoveCallFromResolverQueuedCallsLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_); // Called by the channel for each queued call when a new resolution // result becomes available. virtual void RetryCheckResolutionLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) = 0; + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) = 0; RefCountedPtr dynamic_filters() const { return dynamic_filters_; @@ -157,7 +157,7 @@ class ClientChannel::CallData { private: // Accessors for data stored in the subclass. - virtual ClientChannel* chand() const = 0; + virtual ClientChannelFilter* chand() const = 0; virtual Arena* arena() const = 0; virtual grpc_polling_entity* pollent() = 0; virtual grpc_metadata_batch* send_initial_metadata() = 0; @@ -168,15 +168,15 @@ class ClientChannel::CallData { // an invalid resolution result but the call is not wait_for_ready). bool CheckResolutionLocked( absl::StatusOr>* config_selector) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_); // Adds the call to the channel's list of calls queued for name resolution. void AddCallToResolverQueuedCallsLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_); // Called when adding the call to the resolver queue. virtual void OnAddToQueueLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) {} + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) {} // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. @@ -192,7 +192,8 @@ class ClientChannel::CallData { RefCountedPtr dynamic_filters_; }; -class ClientChannel::FilterBasedCallData : public ClientChannel::CallData { +class ClientChannelFilter::FilterBasedCallData + : public ClientChannelFilter::CallData { public: static grpc_error_handle Init(grpc_call_element* elem, const grpc_call_element_args* args); @@ -214,8 +215,8 @@ class ClientChannel::FilterBasedCallData : public ClientChannel::CallData { grpc_call_stack* owning_call() const { return deadline_state_.call_stack; } CallCombiner* call_combiner() const { return deadline_state_.call_combiner; } - ClientChannel* chand() const override { - return static_cast(elem()->channel_data); + ClientChannelFilter* chand() const override { + return static_cast(elem()->channel_data); } Arena* arena() const override { return deadline_state_.arena; } grpc_polling_entity* pollent() override { return pollent_; } @@ -262,10 +263,10 @@ class ClientChannel::FilterBasedCallData : public ClientChannel::CallData { void TryCheckResolution(bool was_queued); void OnAddToQueueLocked() override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_); void RetryCheckResolutionLocked() override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_); + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_); void ResetDeadline(Duration timeout) override { const Timestamp per_method_deadline = @@ -291,9 +292,9 @@ class ClientChannel::FilterBasedCallData : public ClientChannel::CallData { grpc_polling_entity* pollent_ = nullptr; - // Accessed while holding ClientChannel::resolution_mu_. + // Accessed while holding ClientChannelFilter::resolution_mu_. ResolverQueuedCallCanceller* resolver_call_canceller_ - ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = nullptr; + ABSL_GUARDED_BY(&ClientChannelFilter::resolution_mu_) = nullptr; grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; grpc_closure recv_trailing_metadata_ready_; @@ -311,9 +312,10 @@ class ClientChannel::FilterBasedCallData : public ClientChannel::CallData { grpc_error_handle cancel_error_; }; -class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { +class ClientChannelFilter::PromiseBasedCallData + : public ClientChannelFilter::CallData { public: - explicit PromiseBasedCallData(ClientChannel* chand) : chand_(chand) {} + explicit PromiseBasedCallData(ClientChannelFilter* chand) : chand_(chand) {} ~PromiseBasedCallData() override { if (was_queued_ && client_initial_metadata_ != nullptr) { @@ -360,7 +362,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { } private: - ClientChannel* chand() const override { return chand_; } + ClientChannelFilter* chand() const override { return chand_; } Arena* arena() const override { return GetContext(); } grpc_polling_entity* pollent() override { return &pollent_; } grpc_metadata_batch* send_initial_metadata() override { @@ -371,13 +373,13 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { } void OnAddToQueueLocked() override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) { + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) { waker_ = GetContext()->MakeNonOwningWaker(); was_queued_ = true; } - void RetryCheckResolutionLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) override { + void RetryCheckResolutionLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED( + &ClientChannelFilter::resolution_mu_) override { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: RetryCheckResolutionLocked(): %s", chand_, this, waker_.ActivityDebugTag().c_str()); @@ -393,48 +395,48 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { call_context->UpdateDeadline(per_method_deadline); } - ClientChannel* chand_; + ClientChannelFilter* chand_; grpc_polling_entity pollent_; ClientMetadataHandle client_initial_metadata_; bool was_queued_ = false; - Waker waker_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_); + Waker waker_ ABSL_GUARDED_BY(&ClientChannelFilter::resolution_mu_); }; // // Filter vtable // -const grpc_channel_filter ClientChannel::kFilterVtableWithPromises = { - ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch, - ClientChannel::MakeCallPromise, +const grpc_channel_filter ClientChannelFilter::kFilterVtableWithPromises = { + ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch, + ClientChannelFilter::MakeCallPromise, /* init_call: */ nullptr, - ClientChannel::StartTransportOp, - sizeof(ClientChannel::FilterBasedCallData), - ClientChannel::FilterBasedCallData::Init, - ClientChannel::FilterBasedCallData::SetPollent, - ClientChannel::FilterBasedCallData::Destroy, - sizeof(ClientChannel), - ClientChannel::Init, + ClientChannelFilter::StartTransportOp, + sizeof(ClientChannelFilter::FilterBasedCallData), + ClientChannelFilter::FilterBasedCallData::Init, + ClientChannelFilter::FilterBasedCallData::SetPollent, + ClientChannelFilter::FilterBasedCallData::Destroy, + sizeof(ClientChannelFilter), + ClientChannelFilter::Init, grpc_channel_stack_no_post_init, - ClientChannel::Destroy, - ClientChannel::GetChannelInfo, + ClientChannelFilter::Destroy, + ClientChannelFilter::GetChannelInfo, "client-channel", }; -const grpc_channel_filter ClientChannel::kFilterVtableWithoutPromises = { - ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch, +const grpc_channel_filter ClientChannelFilter::kFilterVtableWithoutPromises = { + ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch, nullptr, /* init_call: */ nullptr, - ClientChannel::StartTransportOp, - sizeof(ClientChannel::FilterBasedCallData), - ClientChannel::FilterBasedCallData::Init, - ClientChannel::FilterBasedCallData::SetPollent, - ClientChannel::FilterBasedCallData::Destroy, - sizeof(ClientChannel), - ClientChannel::Init, + ClientChannelFilter::StartTransportOp, + sizeof(ClientChannelFilter::FilterBasedCallData), + ClientChannelFilter::FilterBasedCallData::Init, + ClientChannelFilter::FilterBasedCallData::SetPollent, + ClientChannelFilter::FilterBasedCallData::Destroy, + sizeof(ClientChannelFilter), + ClientChannelFilter::Init, grpc_channel_stack_no_post_init, - ClientChannel::Destroy, - ClientChannel::GetChannelInfo, + ClientChannelFilter::Destroy, + ClientChannelFilter::GetChannelInfo, "client-channel", }; @@ -490,9 +492,9 @@ class DynamicTerminationFilter { private: explicit DynamicTerminationFilter(const ChannelArgs& args) - : chand_(args.GetObject()) {} + : chand_(args.GetObject()) {} - ClientChannel* chand_; + ClientChannelFilter* chand_; }; class DynamicTerminationFilter::CallData { @@ -530,7 +532,7 @@ class DynamicTerminationFilter::CallData { grpc_polling_entity* pollent) { auto* calld = static_cast(elem->call_data); auto* chand = static_cast(elem->channel_data); - ClientChannel* client_channel = chand->chand_; + ClientChannelFilter* client_channel = chand->chand_; grpc_call_element_args args = {calld->owning_call_, nullptr, calld->call_context_, calld->path_, /*start_time=*/0, calld->deadline_, @@ -566,7 +568,7 @@ class DynamicTerminationFilter::CallData { CallCombiner* call_combiner_; grpc_call_context_element* call_context_; - OrphanablePtr lb_call_; + OrphanablePtr lb_call_; }; const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = { @@ -589,12 +591,13 @@ const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = { } // namespace // -// ClientChannel::ResolverResultHandler +// ClientChannelFilter::ResolverResultHandler // -class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler { +class ClientChannelFilter::ResolverResultHandler + : public Resolver::ResultHandler { public: - explicit ResolverResultHandler(ClientChannel* chand) : chand_(chand) { + explicit ResolverResultHandler(ClientChannelFilter* chand) : chand_(chand) { GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler"); } @@ -611,11 +614,11 @@ class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler { } private: - ClientChannel* chand_; + ClientChannelFilter* chand_; }; // -// ClientChannel::SubchannelWrapper +// ClientChannelFilter::SubchannelWrapper // // This class is a wrapper for Subchannel that hides details of the @@ -626,9 +629,10 @@ class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler { // underlying subchannel is shared between channels, this wrapper will only // be used within one channel, so it will always be synchronized by the // control plane work_serializer. -class ClientChannel::SubchannelWrapper : public SubchannelInterface { +class ClientChannelFilter::SubchannelWrapper : public SubchannelInterface { public: - SubchannelWrapper(ClientChannel* chand, RefCountedPtr subchannel) + SubchannelWrapper(ClientChannelFilter* chand, + RefCountedPtr subchannel) : SubchannelInterface(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace) ? "SubchannelWrapper" : nullptr), @@ -884,7 +888,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { } }; - ClientChannel* chand_; + ClientChannelFilter* chand_; RefCountedPtr subchannel_; // Maps from the address of the watcher passed to us by the LB policy // to the address of the WrapperWatcher that we passed to the underlying @@ -898,11 +902,11 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { }; // -// ClientChannel::ExternalConnectivityWatcher +// ClientChannelFilter::ExternalConnectivityWatcher // -ClientChannel::ExternalConnectivityWatcher::ExternalConnectivityWatcher( - ClientChannel* chand, grpc_polling_entity pollent, +ClientChannelFilter::ExternalConnectivityWatcher::ExternalConnectivityWatcher( + ClientChannelFilter* chand, grpc_polling_entity pollent, grpc_connectivity_state* state, grpc_closure* on_complete, grpc_closure* watcher_timer_init) : chand_(chand), @@ -932,15 +936,16 @@ ClientChannel::ExternalConnectivityWatcher::ExternalConnectivityWatcher( DEBUG_LOCATION); } -ClientChannel::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { +ClientChannelFilter::ExternalConnectivityWatcher:: + ~ExternalConnectivityWatcher() { grpc_polling_entity_del_from_pollset_set(&pollent_, chand_->interested_parties_); GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ExternalConnectivityWatcher"); } -void ClientChannel::ExternalConnectivityWatcher:: - RemoveWatcherFromExternalWatchersMap(ClientChannel* chand, +void ClientChannelFilter::ExternalConnectivityWatcher:: + RemoveWatcherFromExternalWatchersMap(ClientChannelFilter* chand, grpc_closure* on_complete, bool cancel) { RefCountedPtr watcher; @@ -957,7 +962,7 @@ void ClientChannel::ExternalConnectivityWatcher:: if (watcher != nullptr && cancel) watcher->Cancel(); } -void ClientChannel::ExternalConnectivityWatcher::Notify( +void ClientChannelFilter::ExternalConnectivityWatcher::Notify( grpc_connectivity_state state, const absl::Status& /* status */) { bool done = false; if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed, @@ -986,7 +991,7 @@ void ClientChannel::ExternalConnectivityWatcher::Notify( } } -void ClientChannel::ExternalConnectivityWatcher::Cancel() { +void ClientChannelFilter::ExternalConnectivityWatcher::Cancel() { bool done = false; if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed, std::memory_order_relaxed)) { @@ -1005,25 +1010,25 @@ void ClientChannel::ExternalConnectivityWatcher::Cancel() { DEBUG_LOCATION); } -void ClientChannel::ExternalConnectivityWatcher::AddWatcherLocked() { +void ClientChannelFilter::ExternalConnectivityWatcher::AddWatcherLocked() { Closure::Run(DEBUG_LOCATION, watcher_timer_init_, absl::OkStatus()); // Add new watcher. Pass the ref of the object from creation to OrphanablePtr. chand_->state_tracker_.AddWatcher( initial_state_, OrphanablePtr(this)); } -void ClientChannel::ExternalConnectivityWatcher::RemoveWatcherLocked() { +void ClientChannelFilter::ExternalConnectivityWatcher::RemoveWatcherLocked() { chand_->state_tracker_.RemoveWatcher(this); } // -// ClientChannel::ConnectivityWatcherAdder +// ClientChannelFilter::ConnectivityWatcherAdder // -class ClientChannel::ConnectivityWatcherAdder { +class ClientChannelFilter::ConnectivityWatcherAdder { public: ConnectivityWatcherAdder( - ClientChannel* chand, grpc_connectivity_state initial_state, + ClientChannelFilter* chand, grpc_connectivity_state initial_state, OrphanablePtr watcher) : chand_(chand), initial_state_(initial_state), @@ -1044,18 +1049,18 @@ class ClientChannel::ConnectivityWatcherAdder { delete this; } - ClientChannel* chand_; + ClientChannelFilter* chand_; grpc_connectivity_state initial_state_; OrphanablePtr watcher_; }; // -// ClientChannel::ConnectivityWatcherRemover +// ClientChannelFilter::ConnectivityWatcherRemover // -class ClientChannel::ConnectivityWatcherRemover { +class ClientChannelFilter::ConnectivityWatcherRemover { public: - ConnectivityWatcherRemover(ClientChannel* chand, + ConnectivityWatcherRemover(ClientChannelFilter* chand, AsyncConnectivityStateWatcherInterface* watcher) : chand_(chand), watcher_(watcher) { GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover"); @@ -1075,18 +1080,19 @@ class ClientChannel::ConnectivityWatcherRemover { delete this; } - ClientChannel* chand_; + ClientChannelFilter* chand_; AsyncConnectivityStateWatcherInterface* watcher_; }; // -// ClientChannel::ClientChannelControlHelper +// ClientChannelFilter::ClientChannelControlHelper // -class ClientChannel::ClientChannelControlHelper +class ClientChannelFilter::ClientChannelControlHelper : public LoadBalancingPolicy::ChannelControlHelper { public: - explicit ClientChannelControlHelper(ClientChannel* chand) : chand_(chand) { + explicit ClientChannelControlHelper(ClientChannelFilter* chand) + : chand_(chand) { GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper"); } @@ -1100,7 +1106,7 @@ class ClientChannel::ClientChannelControlHelper const ChannelArgs& args) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { if (chand_->resolver_ == nullptr) return nullptr; // Shutting down. - ChannelArgs subchannel_args = ClientChannel::MakeSubchannelArgs( + ChannelArgs subchannel_args = ClientChannelFilter::MakeSubchannelArgs( args, per_address_args, chand_->subchannel_pool_, chand_->default_authority_); // Create subchannel. @@ -1178,36 +1184,36 @@ class ClientChannel::ClientChannelControlHelper return channelz::ChannelTrace::Error; } - ClientChannel* chand_; + ClientChannelFilter* chand_; }; // -// ClientChannel implementation +// ClientChannelFilter implementation // -ClientChannel* ClientChannel::GetFromChannel(Channel* channel) { +ClientChannelFilter* ClientChannelFilter::GetFromChannel(Channel* channel) { grpc_channel_element* elem = grpc_channel_stack_last_element(channel->channel_stack()); if (elem->filter != &kFilterVtableWithPromises && elem->filter != &kFilterVtableWithoutPromises) { return nullptr; } - return static_cast(elem->channel_data); + return static_cast(elem->channel_data); } -grpc_error_handle ClientChannel::Init(grpc_channel_element* elem, - grpc_channel_element_args* args) { +grpc_error_handle ClientChannelFilter::Init(grpc_channel_element* elem, + grpc_channel_element_args* args) { GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &kFilterVtableWithPromises || elem->filter == &kFilterVtableWithoutPromises); grpc_error_handle error; - new (elem->channel_data) ClientChannel(args, &error); + new (elem->channel_data) ClientChannelFilter(args, &error); return error; } -void ClientChannel::Destroy(grpc_channel_element* elem) { - ClientChannel* chand = static_cast(elem->channel_data); - chand->~ClientChannel(); +void ClientChannelFilter::Destroy(grpc_channel_element* elem) { + auto* chand = static_cast(elem->channel_data); + chand->~ClientChannelFilter(); } namespace { @@ -1222,8 +1228,8 @@ RefCountedPtr GetSubchannelPool( } // namespace -ClientChannel::ClientChannel(grpc_channel_element_args* args, - grpc_error_handle* error) +ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args, + grpc_error_handle* error) : channel_args_(args->channel_args), deadline_checking_enabled_( channel_args_.GetBool(GRPC_ARG_ENABLE_DEADLINE_CHECKS) @@ -1308,7 +1314,7 @@ ClientChannel::ClientChannel(grpc_channel_element_args* args, *error = absl::OkStatus(); } -ClientChannel::~ClientChannel() { +ClientChannelFilter::~ClientChannelFilter() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { gpr_log(GPR_INFO, "chand=%p: destroying channel", this); } @@ -1318,9 +1324,9 @@ ClientChannel::~ClientChannel() { grpc_pollset_set_destroy(interested_parties_); } -ArenaPromise ClientChannel::MakeCallPromise( +ArenaPromise ClientChannelFilter::MakeCallPromise( grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) { - auto* chand = static_cast(elem->channel_data); + auto* chand = static_cast(elem->channel_data); // TODO(roth): Is this the right lifetime story for calld? auto* calld = GetContext()->ManagedNew(chand); return TrySeq( @@ -1333,8 +1339,8 @@ ArenaPromise ClientChannel::MakeCallPromise( }); } -OrphanablePtr -ClientChannel::CreateLoadBalancedCall( +OrphanablePtr +ClientChannelFilter::CreateLoadBalancedCall( const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, absl::AnyInvocable on_commit, bool is_transparent_retry) { @@ -1344,7 +1350,8 @@ ClientChannel::CreateLoadBalancedCall( std::move(on_commit), is_transparent_retry)); } -ArenaPromise ClientChannel::CreateLoadBalancedCallPromise( +ArenaPromise +ClientChannelFilter::CreateLoadBalancedCallPromise( CallArgs call_args, absl::AnyInvocable on_commit, bool is_transparent_retry) { OrphanablePtr lb_call( @@ -1354,7 +1361,7 @@ ArenaPromise ClientChannel::CreateLoadBalancedCallPromise( return call_ptr->MakeCallPromise(std::move(call_args), std::move(lb_call)); } -ChannelArgs ClientChannel::MakeSubchannelArgs( +ChannelArgs ClientChannelFilter::MakeSubchannelArgs( const ChannelArgs& channel_args, const ChannelArgs& address_args, const RefCountedPtr& subchannel_pool, const std::string& channel_default_authority) { @@ -1379,7 +1386,7 @@ ChannelArgs ClientChannel::MakeSubchannelArgs( .RemoveAllKeysWithPrefix(GRPC_ARG_NO_SUBCHANNEL_PREFIX); } -void ClientChannel::ReprocessQueuedResolverCalls() { +void ClientChannelFilter::ReprocessQueuedResolverCalls() { for (CallData* calld : resolver_queued_calls_) { calld->RemoveCallFromResolverQueuedCallsLocked(); calld->RetryCheckResolutionLocked(); @@ -1448,7 +1455,8 @@ RefCountedPtr ChooseLbPolicy( } // namespace -void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { +void ClientChannelFilter::OnResolverResultChangedLocked( + Resolver::Result result) { // Handle race conditions. if (resolver_ == nullptr) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { @@ -1583,7 +1591,7 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) { } } -void ClientChannel::OnResolverErrorLocked(absl::Status status) { +void ClientChannelFilter::OnResolverErrorLocked(absl::Status status) { if (resolver_ == nullptr) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this, @@ -1606,7 +1614,7 @@ void ClientChannel::OnResolverErrorLocked(absl::Status status) { } } -absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked( +absl::Status ClientChannelFilter::CreateOrUpdateLbPolicyLocked( RefCountedPtr lb_policy_config, const absl::optional& health_check_service_name, Resolver::Result result) { @@ -1642,7 +1650,7 @@ absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked( } // Creates a new LB policy. -OrphanablePtr ClientChannel::CreateLbPolicyLocked( +OrphanablePtr ClientChannelFilter::CreateLbPolicyLocked( const ChannelArgs& args) { // The LB policy will start in state CONNECTING but will not // necessarily send us an update synchronously, so set state to @@ -1669,7 +1677,7 @@ OrphanablePtr ClientChannel::CreateLbPolicyLocked( return lb_policy; } -void ClientChannel::UpdateServiceConfigInControlPlaneLocked( +void ClientChannelFilter::UpdateServiceConfigInControlPlaneLocked( RefCountedPtr service_config, RefCountedPtr config_selector, std::string lb_policy_name) { std::string service_config_json(service_config->json_string()); @@ -1693,7 +1701,7 @@ void ClientChannel::UpdateServiceConfigInControlPlaneLocked( } } -void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { +void ClientChannelFilter::UpdateServiceConfigInDataPlaneLocked() { // Grab ref to service config. RefCountedPtr service_config = saved_service_config_; // Grab ref to config selector. Use default if resolver didn't supply one. @@ -1742,7 +1750,7 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { // of scope. } -void ClientChannel::CreateResolverLocked() { +void ClientChannelFilter::CreateResolverLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { gpr_log(GPR_INFO, "chand=%p: starting name resolution for %s", this, uri_to_resolve_.c_str()); @@ -1761,7 +1769,7 @@ void ClientChannel::CreateResolverLocked() { } } -void ClientChannel::DestroyResolverAndLbPolicyLocked() { +void ClientChannelFilter::DestroyResolverAndLbPolicyLocked() { if (resolver_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this, @@ -1797,9 +1805,9 @@ void ClientChannel::DestroyResolverAndLbPolicyLocked() { } } -void ClientChannel::UpdateStateLocked(grpc_connectivity_state state, - const absl::Status& status, - const char* reason) { +void ClientChannelFilter::UpdateStateLocked(grpc_connectivity_state state, + const absl::Status& status, + const char* reason) { if (state != GRPC_CHANNEL_SHUTDOWN && state_tracker_.state() == GRPC_CHANNEL_SHUTDOWN) { Crash("Illegal transition SHUTDOWN -> anything"); @@ -1815,7 +1823,7 @@ void ClientChannel::UpdateStateLocked(grpc_connectivity_state state, } } -void ClientChannel::UpdateStateAndPickerLocked( +void ClientChannelFilter::UpdateStateAndPickerLocked( grpc_connectivity_state state, const absl::Status& status, const char* reason, RefCountedPtr picker) { @@ -1867,7 +1875,7 @@ T HandlePickResult( } // namespace -grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) { +grpc_error_handle ClientChannelFilter::DoPingLocked(grpc_transport_op* op) { if (state_tracker_.state() != GRPC_CHANNEL_READY) { return GRPC_ERROR_CREATE("channel not connected"); } @@ -1880,7 +1888,8 @@ grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) { &result, // Complete pick. [op](LoadBalancingPolicy::PickResult::Complete* complete_pick) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(*ClientChannel::work_serializer_) { + ABSL_EXCLUSIVE_LOCKS_REQUIRED( + *ClientChannelFilter::work_serializer_) { SubchannelWrapper* subchannel = static_cast( complete_pick->subchannel.get()); RefCountedPtr connected_subchannel = @@ -1906,7 +1915,7 @@ grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) { }); } -void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) { +void ClientChannelFilter::StartTransportOpLocked(grpc_transport_op* op) { // Connectivity watch. if (op->start_connectivity_watch != nullptr) { state_tracker_.AddWatcher(op->start_connectivity_watch_state, @@ -1969,9 +1978,9 @@ void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) { ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus()); } -void ClientChannel::StartTransportOp(grpc_channel_element* elem, - grpc_transport_op* op) { - ClientChannel* chand = static_cast(elem->channel_data); +void ClientChannelFilter::StartTransportOp(grpc_channel_element* elem, + grpc_transport_op* op) { + auto* chand = static_cast(elem->channel_data); GPR_ASSERT(op->set_accept_stream == false); // Handle bind_pollset. if (op->bind_pollset != nullptr) { @@ -1986,9 +1995,9 @@ void ClientChannel::StartTransportOp(grpc_channel_element* elem, DEBUG_LOCATION); } -void ClientChannel::GetChannelInfo(grpc_channel_element* elem, - const grpc_channel_info* info) { - ClientChannel* chand = static_cast(elem->channel_data); +void ClientChannelFilter::GetChannelInfo(grpc_channel_element* elem, + const grpc_channel_info* info) { + auto* chand = static_cast(elem->channel_data); MutexLock lock(&chand->info_mu_); if (info->lb_policy_name != nullptr) { *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.c_str()); @@ -1999,7 +2008,7 @@ void ClientChannel::GetChannelInfo(grpc_channel_element* elem, } } -void ClientChannel::TryToConnectLocked() { +void ClientChannelFilter::TryToConnectLocked() { if (disconnect_error_.ok()) { if (lb_policy_ != nullptr) { lb_policy_->ExitIdleLocked(); @@ -2010,7 +2019,7 @@ void ClientChannel::TryToConnectLocked() { GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect"); } -grpc_connectivity_state ClientChannel::CheckConnectivityState( +grpc_connectivity_state ClientChannelFilter::CheckConnectivityState( bool try_to_connect) { // state_tracker_ is guarded by work_serializer_, which we're not // holding here. But the one method of state_tracker_ that *is* @@ -2026,13 +2035,13 @@ grpc_connectivity_state ClientChannel::CheckConnectivityState( return out; } -void ClientChannel::AddConnectivityWatcher( +void ClientChannelFilter::AddConnectivityWatcher( grpc_connectivity_state initial_state, OrphanablePtr watcher) { new ConnectivityWatcherAdder(this, initial_state, std::move(watcher)); } -void ClientChannel::RemoveConnectivityWatcher( +void ClientChannelFilter::RemoveConnectivityWatcher( AsyncConnectivityStateWatcherInterface* watcher) { new ConnectivityWatcherRemover(this, watcher); } @@ -2041,7 +2050,7 @@ void ClientChannel::RemoveConnectivityWatcher( // CallData implementation // -void ClientChannel::CallData::RemoveCallFromResolverQueuedCallsLocked() { +void ClientChannelFilter::CallData::RemoveCallFromResolverQueuedCallsLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: removing from resolver queued picks list", @@ -2053,10 +2062,10 @@ void ClientChannel::CallData::RemoveCallFromResolverQueuedCallsLocked() { // Note: There's no need to actually remove the call from the queue // here, because that will be done in // ResolverQueuedCallCanceller::CancelLocked() or - // ClientChannel::ReprocessQueuedResolverCalls(). + // ClientChannelFilter::ReprocessQueuedResolverCalls(). } -void ClientChannel::CallData::AddCallToResolverQueuedCallsLocked() { +void ClientChannelFilter::CallData::AddCallToResolverQueuedCallsLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log( GPR_INFO, @@ -2072,7 +2081,7 @@ void ClientChannel::CallData::AddCallToResolverQueuedCallsLocked() { OnAddToQueueLocked(); } -grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked( +grpc_error_handle ClientChannelFilter::CallData::ApplyServiceConfigToCallLocked( const absl::StatusOr>& config_selector) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call", @@ -2118,7 +2127,7 @@ grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked( return absl::OkStatus(); } -absl::optional ClientChannel::CallData::CheckResolution( +absl::optional ClientChannelFilter::CallData::CheckResolution( bool was_queued) { // Check if we have a resolver result to use. absl::StatusOr> config_selector; @@ -2161,7 +2170,7 @@ absl::optional ClientChannel::CallData::CheckResolution( return absl::OkStatus(); } -bool ClientChannel::CallData::CheckResolutionLocked( +bool ClientChannelFilter::CallData::CheckResolutionLocked( absl::StatusOr>* config_selector) { // If we don't yet have a resolver result, we need to queue the call // until we get one. @@ -2197,23 +2206,24 @@ bool ClientChannel::CallData::CheckResolutionLocked( // FilterBasedCallData implementation // -ClientChannel::FilterBasedCallData::FilterBasedCallData( +ClientChannelFilter::FilterBasedCallData::FilterBasedCallData( grpc_call_element* elem, const grpc_call_element_args& args) : path_(CSliceRef(args.path)), call_context_(args.context), call_start_time_(args.start_time), deadline_(args.deadline), - deadline_state_(elem, args, - GPR_LIKELY(static_cast(elem->channel_data) - ->deadline_checking_enabled_) - ? args.deadline - : Timestamp::InfFuture()) { + deadline_state_( + elem, args, + GPR_LIKELY(static_cast(elem->channel_data) + ->deadline_checking_enabled_) + ? args.deadline + : Timestamp::InfFuture()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand(), this); } } -ClientChannel::FilterBasedCallData::~FilterBasedCallData() { +ClientChannelFilter::FilterBasedCallData::~FilterBasedCallData() { CSliceUnref(path_); // Make sure there are no remaining pending batches. for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { @@ -2221,13 +2231,13 @@ ClientChannel::FilterBasedCallData::~FilterBasedCallData() { } } -grpc_error_handle ClientChannel::FilterBasedCallData::Init( +grpc_error_handle ClientChannelFilter::FilterBasedCallData::Init( grpc_call_element* elem, const grpc_call_element_args* args) { new (elem->call_data) FilterBasedCallData(elem, *args); return absl::OkStatus(); } -void ClientChannel::FilterBasedCallData::Destroy( +void ClientChannelFilter::FilterBasedCallData::Destroy( grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, grpc_closure* then_schedule_closure) { auto* calld = static_cast(elem->call_data); @@ -2242,10 +2252,10 @@ void ClientChannel::FilterBasedCallData::Destroy( } } -void ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch( +void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { auto* calld = static_cast(elem->call_data); - ClientChannel* chand = static_cast(elem->channel_data); + auto* chand = static_cast(elem->channel_data); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace) && !GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) { gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from above: %s", chand, @@ -2350,13 +2360,13 @@ void ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch( } } -void ClientChannel::FilterBasedCallData::SetPollent( +void ClientChannelFilter::FilterBasedCallData::SetPollent( grpc_call_element* elem, grpc_polling_entity* pollent) { auto* calld = static_cast(elem->call_data); calld->pollent_ = pollent; } -size_t ClientChannel::FilterBasedCallData::GetBatchIndex( +size_t ClientChannelFilter::FilterBasedCallData::GetBatchIndex( grpc_transport_stream_op_batch* batch) { // Note: It is important the send_initial_metadata be the first entry // here, since the code in CheckResolution() assumes it will be. @@ -2370,7 +2380,7 @@ size_t ClientChannel::FilterBasedCallData::GetBatchIndex( } // This is called via the call combiner, so access to calld is synchronized. -void ClientChannel::FilterBasedCallData::PendingBatchesAdd( +void ClientChannelFilter::FilterBasedCallData::PendingBatchesAdd( grpc_transport_stream_op_batch* batch) { const size_t idx = GetBatchIndex(batch); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { @@ -2384,7 +2394,7 @@ void ClientChannel::FilterBasedCallData::PendingBatchesAdd( } // This is called via the call combiner, so access to calld is synchronized. -void ClientChannel::FilterBasedCallData::FailPendingBatchInCallCombiner( +void ClientChannelFilter::FilterBasedCallData::FailPendingBatchInCallCombiner( void* arg, grpc_error_handle error) { grpc_transport_stream_op_batch* batch = static_cast(arg); @@ -2396,7 +2406,7 @@ void ClientChannel::FilterBasedCallData::FailPendingBatchInCallCombiner( } // This is called via the call combiner, so access to calld is synchronized. -void ClientChannel::FilterBasedCallData::PendingBatchesFail( +void ClientChannelFilter::FilterBasedCallData::PendingBatchesFail( grpc_error_handle error, YieldCallCombinerPredicate yield_call_combiner_predicate) { GPR_ASSERT(!error.ok()); @@ -2430,7 +2440,7 @@ void ClientChannel::FilterBasedCallData::PendingBatchesFail( } // This is called via the call combiner, so access to calld is synchronized. -void ClientChannel::FilterBasedCallData::ResumePendingBatchInCallCombiner( +void ClientChannelFilter::FilterBasedCallData::ResumePendingBatchInCallCombiner( void* arg, grpc_error_handle /*ignored*/) { grpc_transport_stream_op_batch* batch = static_cast(arg); @@ -2441,7 +2451,7 @@ void ClientChannel::FilterBasedCallData::ResumePendingBatchInCallCombiner( } // This is called via the call combiner, so access to calld is synchronized. -void ClientChannel::FilterBasedCallData::PendingBatchesResume() { +void ClientChannelFilter::FilterBasedCallData::PendingBatchesResume() { // Retries not enabled; send down batches as-is. if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { size_t num_batches = 0; @@ -2471,7 +2481,7 @@ void ClientChannel::FilterBasedCallData::PendingBatchesResume() { // A class to handle the call combiner cancellation callback for a // queued pick. -class ClientChannel::FilterBasedCallData::ResolverQueuedCallCanceller { +class ClientChannelFilter::FilterBasedCallData::ResolverQueuedCallCanceller { public: explicit ResolverQueuedCallCanceller(FilterBasedCallData* calld) : calld_(calld) { @@ -2512,7 +2522,8 @@ class ClientChannel::FilterBasedCallData::ResolverQueuedCallCanceller { grpc_closure closure_; }; -void ClientChannel::FilterBasedCallData::TryCheckResolution(bool was_queued) { +void ClientChannelFilter::FilterBasedCallData::TryCheckResolution( + bool was_queued) { auto result = CheckResolution(was_queued); if (result.has_value()) { if (!result->ok()) { @@ -2523,12 +2534,12 @@ void ClientChannel::FilterBasedCallData::TryCheckResolution(bool was_queued) { } } -void ClientChannel::FilterBasedCallData::OnAddToQueueLocked() { +void ClientChannelFilter::FilterBasedCallData::OnAddToQueueLocked() { // Register call combiner cancellation callback. resolver_call_canceller_ = new ResolverQueuedCallCanceller(this); } -void ClientChannel::FilterBasedCallData::RetryCheckResolutionLocked() { +void ClientChannelFilter::FilterBasedCallData::RetryCheckResolutionLocked() { // Lame the call combiner canceller. resolver_call_canceller_ = nullptr; // Do an async callback to resume call processing, so that we're not @@ -2540,7 +2551,7 @@ void ClientChannel::FilterBasedCallData::RetryCheckResolutionLocked() { }); } -void ClientChannel::FilterBasedCallData::CreateDynamicCall() { +void ClientChannelFilter::FilterBasedCallData::CreateDynamicCall() { DynamicFilters::Call::Args args = {dynamic_filters(), pollent_, path_, call_start_time_, deadline_, arena(), call_context_, call_combiner()}; @@ -2565,7 +2576,7 @@ void ClientChannel::FilterBasedCallData::CreateDynamicCall() { PendingBatchesResume(); } -void ClientChannel::FilterBasedCallData:: +void ClientChannelFilter::FilterBasedCallData:: RecvTrailingMetadataReadyForConfigSelectorCommitCallback( void* arg, grpc_error_handle error) { auto* calld = static_cast(arg); @@ -2588,10 +2599,10 @@ void ClientChannel::FilterBasedCallData:: } // -// ClientChannel::LoadBalancedCall::LbCallState +// ClientChannelFilter::LoadBalancedCall::LbCallState // -class ClientChannel::LoadBalancedCall::LbCallState +class ClientChannelFilter::LoadBalancedCall::LbCallState : public ClientChannelLbCallState { public: explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {} @@ -2610,10 +2621,10 @@ class ClientChannel::LoadBalancedCall::LbCallState }; // -// ClientChannel::LoadBalancedCall::Metadata +// ClientChannelFilter::LoadBalancedCall::Metadata // -class ClientChannel::LoadBalancedCall::Metadata +class ClientChannelFilter::LoadBalancedCall::Metadata : public LoadBalancingPolicy::MetadataInterface { public: explicit Metadata(grpc_metadata_batch* batch) : batch_(batch) {} @@ -2685,11 +2696,11 @@ class ClientChannel::LoadBalancedCall::Metadata }; // -// ClientChannel::LoadBalancedCall::LbCallState +// ClientChannelFilter::LoadBalancedCall::LbCallState // ServiceConfigCallData::CallAttributeInterface* -ClientChannel::LoadBalancedCall::LbCallState::GetCallAttribute( +ClientChannelFilter::LoadBalancedCall::LbCallState::GetCallAttribute( UniqueTypeName type) const { auto* service_config_call_data = GetServiceConfigCallData(lb_call_->call_context_); @@ -2697,15 +2708,16 @@ ClientChannel::LoadBalancedCall::LbCallState::GetCallAttribute( } ClientCallTracer::CallAttemptTracer* -ClientChannel::LoadBalancedCall::LbCallState::GetCallAttemptTracer() const { +ClientChannelFilter::LoadBalancedCall::LbCallState::GetCallAttemptTracer() + const { return lb_call_->call_attempt_tracer(); } // -// ClientChannel::LoadBalancedCall::BackendMetricAccessor +// ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor // -class ClientChannel::LoadBalancedCall::BackendMetricAccessor +class ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor : public LoadBalancingPolicy::BackendMetricAccessor { public: BackendMetricAccessor(LoadBalancedCall* lb_call, @@ -2747,7 +2759,7 @@ class ClientChannel::LoadBalancedCall::BackendMetricAccessor }; // -// ClientChannel::LoadBalancedCall +// ClientChannelFilter::LoadBalancedCall // namespace { @@ -2763,8 +2775,8 @@ void CreateCallAttemptTracer(grpc_call_context_element* context, } // namespace -ClientChannel::LoadBalancedCall::LoadBalancedCall( - ClientChannel* chand, grpc_call_context_element* call_context, +ClientChannelFilter::LoadBalancedCall::LoadBalancedCall( + ClientChannelFilter* chand, grpc_call_context_element* call_context, absl::AnyInvocable on_commit, bool is_transparent_retry) : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace) @@ -2779,13 +2791,13 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall( } } -ClientChannel::LoadBalancedCall::~LoadBalancedCall() { +ClientChannelFilter::LoadBalancedCall::~LoadBalancedCall() { if (backend_metric_data_ != nullptr) { backend_metric_data_->BackendMetricData::~BackendMetricData(); } } -void ClientChannel::LoadBalancedCall::RecordCallCompletion( +void ClientChannelFilter::LoadBalancedCall::RecordCallCompletion( absl::Status status, grpc_metadata_batch* recv_trailing_metadata, grpc_transport_stream_stats* transport_stream_stats, absl::string_view peer_address) { @@ -2806,7 +2818,7 @@ void ClientChannel::LoadBalancedCall::RecordCallCompletion( } } -void ClientChannel::LoadBalancedCall::RecordLatency() { +void ClientChannelFilter::LoadBalancedCall::RecordLatency() { // Compute latency and report it to the tracer. if (call_attempt_tracer() != nullptr) { gpr_timespec latency = @@ -2815,7 +2827,8 @@ void ClientChannel::LoadBalancedCall::RecordLatency() { } } -void ClientChannel::LoadBalancedCall::RemoveCallFromLbQueuedCallsLocked() { +void ClientChannelFilter::LoadBalancedCall:: + RemoveCallFromLbQueuedCallsLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list", chand_, this); @@ -2826,10 +2839,10 @@ void ClientChannel::LoadBalancedCall::RemoveCallFromLbQueuedCallsLocked() { // Note: There's no need to actually remove the call from the queue // here, beacuse that will be done in either // LbQueuedCallCanceller::CancelLocked() or - // in ClientChannel::UpdateStateAndPickerLocked(). + // in ClientChannelFilter::UpdateStateAndPickerLocked(). } -void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() { +void ClientChannelFilter::LoadBalancedCall::AddCallToLbQueuedCallsLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list", chand_, this); @@ -2843,8 +2856,8 @@ void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() { OnAddToQueueLocked(); } -absl::optional ClientChannel::LoadBalancedCall::PickSubchannel( - bool was_queued) { +absl::optional +ClientChannelFilter::LoadBalancedCall::PickSubchannel(bool was_queued) { // We may accumulate multiple pickers here, because if a picker says // to queue the call, we check again to see if the picker has been // updated before we queue it. @@ -2933,7 +2946,7 @@ absl::optional ClientChannel::LoadBalancedCall::PickSubchannel( } } -bool ClientChannel::LoadBalancedCall::PickSubchannelImpl( +bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl( LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) { GPR_ASSERT(connected_subchannel_ == nullptr); // Perform LB pick. @@ -3023,11 +3036,11 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelImpl( } // -// ClientChannel::FilterBasedLoadBalancedCall +// ClientChannelFilter::FilterBasedLoadBalancedCall // -ClientChannel::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall( - ClientChannel* chand, const grpc_call_element_args& args, +ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall( + ClientChannelFilter* chand, const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, absl::AnyInvocable on_commit, bool is_transparent_retry) : LoadBalancedCall(chand, args.context, std::move(on_commit), @@ -3039,7 +3052,8 @@ ClientChannel::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall( pollent_(pollent), on_call_destruction_complete_(on_call_destruction_complete) {} -ClientChannel::FilterBasedLoadBalancedCall::~FilterBasedLoadBalancedCall() { +ClientChannelFilter::FilterBasedLoadBalancedCall:: + ~FilterBasedLoadBalancedCall() { // Make sure there are no remaining pending batches. for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { GPR_ASSERT(pending_batches_[i] == nullptr); @@ -3050,7 +3064,7 @@ ClientChannel::FilterBasedLoadBalancedCall::~FilterBasedLoadBalancedCall() { } } -void ClientChannel::FilterBasedLoadBalancedCall::Orphan() { +void ClientChannelFilter::FilterBasedLoadBalancedCall::Orphan() { // If the recv_trailing_metadata op was never started, then notify // about call completion here, as best we can. We assume status // CANCELLED in this case. @@ -3063,7 +3077,7 @@ void ClientChannel::FilterBasedLoadBalancedCall::Orphan() { LoadBalancedCall::Orphan(); } -size_t ClientChannel::FilterBasedLoadBalancedCall::GetBatchIndex( +size_t ClientChannelFilter::FilterBasedLoadBalancedCall::GetBatchIndex( grpc_transport_stream_op_batch* batch) { // Note: It is important the send_initial_metadata be the first entry // here, since the code in PickSubchannelImpl() assumes it will be. @@ -3077,7 +3091,7 @@ size_t ClientChannel::FilterBasedLoadBalancedCall::GetBatchIndex( } // This is called via the call combiner, so access to calld is synchronized. -void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesAdd( +void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesAdd( grpc_transport_stream_op_batch* batch) { const size_t idx = GetBatchIndex(batch); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { @@ -3090,8 +3104,8 @@ void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesAdd( } // This is called via the call combiner, so access to calld is synchronized. -void ClientChannel::FilterBasedLoadBalancedCall::FailPendingBatchInCallCombiner( - void* arg, grpc_error_handle error) { +void ClientChannelFilter::FilterBasedLoadBalancedCall:: + FailPendingBatchInCallCombiner(void* arg, grpc_error_handle error) { grpc_transport_stream_op_batch* batch = static_cast(arg); auto* self = static_cast( @@ -3102,7 +3116,7 @@ void ClientChannel::FilterBasedLoadBalancedCall::FailPendingBatchInCallCombiner( } // This is called via the call combiner, so access to calld is synchronized. -void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesFail( +void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesFail( grpc_error_handle error, YieldCallCombinerPredicate yield_call_combiner_predicate) { GPR_ASSERT(!error.ok()); @@ -3137,7 +3151,7 @@ void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesFail( } // This is called via the call combiner, so access to calld is synchronized. -void ClientChannel::FilterBasedLoadBalancedCall:: +void ClientChannelFilter::FilterBasedLoadBalancedCall:: ResumePendingBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) { grpc_transport_stream_op_batch* batch = static_cast(arg); @@ -3148,7 +3162,7 @@ void ClientChannel::FilterBasedLoadBalancedCall:: } // This is called via the call combiner, so access to calld is synchronized. -void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesResume() { +void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesResume() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { size_t num_batches = 0; for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { @@ -3176,8 +3190,8 @@ void ClientChannel::FilterBasedLoadBalancedCall::PendingBatchesResume() { closures.RunClosures(call_combiner_); } -void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch( - grpc_transport_stream_op_batch* batch) { +void ClientChannelFilter::FilterBasedLoadBalancedCall:: + StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace) || GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) { gpr_log(GPR_INFO, @@ -3291,7 +3305,7 @@ void ClientChannel::FilterBasedLoadBalancedCall::StartTransportStreamOpBatch( } } -void ClientChannel::FilterBasedLoadBalancedCall::RecvInitialMetadataReady( +void ClientChannelFilter::FilterBasedLoadBalancedCall::RecvInitialMetadataReady( void* arg, grpc_error_handle error) { auto* self = static_cast(arg); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { @@ -3310,8 +3324,8 @@ void ClientChannel::FilterBasedLoadBalancedCall::RecvInitialMetadataReady( error); } -void ClientChannel::FilterBasedLoadBalancedCall::RecvTrailingMetadataReady( - void* arg, grpc_error_handle error) { +void ClientChannelFilter::FilterBasedLoadBalancedCall:: + RecvTrailingMetadataReady(void* arg, grpc_error_handle error) { auto* self = static_cast(arg); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, @@ -3370,7 +3384,7 @@ void ClientChannel::FilterBasedLoadBalancedCall::RecvTrailingMetadataReady( // because there may be multiple LB picks happening in parallel. // Instead, we will probably need to maintain a list in the CallData // object of pending LB picks to be cancelled when the closure runs. -class ClientChannel::FilterBasedLoadBalancedCall::LbQueuedCallCanceller { +class ClientChannelFilter::FilterBasedLoadBalancedCall::LbQueuedCallCanceller { public: explicit LbQueuedCallCanceller( RefCountedPtr lb_call) @@ -3417,7 +3431,8 @@ class ClientChannel::FilterBasedLoadBalancedCall::LbQueuedCallCanceller { grpc_closure closure_; }; -void ClientChannel::FilterBasedLoadBalancedCall::TryPick(bool was_queued) { +void ClientChannelFilter::FilterBasedLoadBalancedCall::TryPick( + bool was_queued) { auto result = PickSubchannel(was_queued); if (result.has_value()) { if (!result->ok()) { @@ -3428,13 +3443,13 @@ void ClientChannel::FilterBasedLoadBalancedCall::TryPick(bool was_queued) { } } -void ClientChannel::FilterBasedLoadBalancedCall::OnAddToQueueLocked() { +void ClientChannelFilter::FilterBasedLoadBalancedCall::OnAddToQueueLocked() { // Register call combiner cancellation callback. lb_call_canceller_ = new LbQueuedCallCanceller(RefAsSubclass()); } -void ClientChannel::FilterBasedLoadBalancedCall::RetryPickLocked() { +void ClientChannelFilter::FilterBasedLoadBalancedCall::RetryPickLocked() { // Lame the call combiner canceller. lb_call_canceller_ = nullptr; // Do an async callback to resume call processing, so that we're not @@ -3463,7 +3478,7 @@ void ClientChannel::FilterBasedLoadBalancedCall::RetryPickLocked() { absl::OkStatus()); } -void ClientChannel::FilterBasedLoadBalancedCall::CreateSubchannelCall() { +void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() { Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata()); GPR_ASSERT(path != nullptr); SubchannelCall::Args call_args = { @@ -3491,17 +3506,17 @@ void ClientChannel::FilterBasedLoadBalancedCall::CreateSubchannelCall() { } // -// ClientChannel::PromiseBasedLoadBalancedCall +// ClientChannelFilter::PromiseBasedLoadBalancedCall // -ClientChannel::PromiseBasedLoadBalancedCall::PromiseBasedLoadBalancedCall( - ClientChannel* chand, absl::AnyInvocable on_commit, +ClientChannelFilter::PromiseBasedLoadBalancedCall::PromiseBasedLoadBalancedCall( + ClientChannelFilter* chand, absl::AnyInvocable on_commit, bool is_transparent_retry) : LoadBalancedCall(chand, GetContext(), std::move(on_commit), is_transparent_retry) {} ArenaPromise -ClientChannel::PromiseBasedLoadBalancedCall::MakeCallPromise( +ClientChannelFilter::PromiseBasedLoadBalancedCall::MakeCallPromise( CallArgs call_args, OrphanablePtr lb_call) { pollent_ = NowOrNever(call_args.polling_entity->WaitAndCopy()).value(); // Record ops in tracer. @@ -3614,21 +3629,22 @@ ClientChannel::PromiseBasedLoadBalancedCall::MakeCallPromise( }); } -Arena* ClientChannel::PromiseBasedLoadBalancedCall::arena() const { +Arena* ClientChannelFilter::PromiseBasedLoadBalancedCall::arena() const { return GetContext(); } grpc_metadata_batch* -ClientChannel::PromiseBasedLoadBalancedCall::send_initial_metadata() const { +ClientChannelFilter::PromiseBasedLoadBalancedCall::send_initial_metadata() + const { return client_initial_metadata_.get(); } -void ClientChannel::PromiseBasedLoadBalancedCall::OnAddToQueueLocked() { +void ClientChannelFilter::PromiseBasedLoadBalancedCall::OnAddToQueueLocked() { waker_ = GetContext()->MakeNonOwningWaker(); was_queued_ = true; } -void ClientChannel::PromiseBasedLoadBalancedCall::RetryPickLocked() { +void ClientChannelFilter::PromiseBasedLoadBalancedCall::RetryPickLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: RetryPickLocked()", chand(), this); } diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index f54c5db4669..2e3b96af1d4 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -87,8 +87,8 @@ // Channel arg key for server URI string. #define GRPC_ARG_SERVER_URI "grpc.server_uri" -// Channel arg containing a pointer to the ClientChannel object. -#define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel" +// Channel arg containing a pointer to the ClientChannelFilter object. +#define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel_filter" // Max number of batches that can be pending on a call at any given // time. This includes one batch for each of the following ops: @@ -102,7 +102,7 @@ namespace grpc_core { -class ClientChannel { +class ClientChannelFilter { public: static const grpc_channel_filter kFilterVtableWithPromises; static const grpc_channel_filter kFilterVtableWithoutPromises; @@ -115,9 +115,9 @@ class ClientChannel { struct RawPointerChannelArgTag {}; static absl::string_view ChannelArgName() { return GRPC_ARG_CLIENT_CHANNEL; } - // Returns the ClientChannel object from channel, or null if channel + // Returns the ClientChannelFilter object from channel, or null if channel // is not a client channel. - static ClientChannel* GetFromChannel(Channel* channel); + static ClientChannelFilter* GetFromChannel(Channel* channel); static ArenaPromise MakeCallPromise( grpc_channel_element* elem, CallArgs call_args, @@ -196,7 +196,7 @@ class ClientChannel { // via grpc_client_channel_watch_connectivity_state(). class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface { public: - ExternalConnectivityWatcher(ClientChannel* chand, + ExternalConnectivityWatcher(ClientChannelFilter* chand, grpc_polling_entity pollent, grpc_connectivity_state* state, grpc_closure* on_complete, @@ -205,7 +205,7 @@ class ClientChannel { ~ExternalConnectivityWatcher() override; // Removes the watcher from the external_watchers_ map. - static void RemoveWatcherFromExternalWatchersMap(ClientChannel* chand, + static void RemoveWatcherFromExternalWatchersMap(ClientChannelFilter* chand, grpc_closure* on_complete, bool cancel); @@ -222,7 +222,7 @@ class ClientChannel { void RemoveWatcherLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_); - ClientChannel* chand_; + ClientChannelFilter* chand_; grpc_polling_entity pollent_; grpc_connectivity_state initial_state_; grpc_connectivity_state* state_; @@ -231,8 +231,9 @@ class ClientChannel { std::atomic done_{false}; }; - ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error); - ~ClientChannel(); + ClientChannelFilter(grpc_channel_element_args* args, + grpc_error_handle* error); + ~ClientChannelFilter(); // Filter vtable functions. static grpc_error_handle Init(grpc_channel_element* elem, @@ -378,15 +379,15 @@ class ClientChannel { }; // -// ClientChannel::LoadBalancedCall +// ClientChannelFilter::LoadBalancedCall // // TODO(roth): As part of simplifying cancellation in the filter stack, // this should no longer need to be ref-counted. -class ClientChannel::LoadBalancedCall +class ClientChannelFilter::LoadBalancedCall : public InternallyRefCounted { public: - LoadBalancedCall(ClientChannel* chand, + LoadBalancedCall(ClientChannelFilter* chand, grpc_call_context_element* call_context, absl::AnyInvocable on_commit, bool is_transparent_retry); @@ -396,15 +397,15 @@ class ClientChannel::LoadBalancedCall // Called by channel when removing a call from the list of queued calls. void RemoveCallFromLbQueuedCallsLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_); // Called by the channel for each queued call when a new picker // becomes available. virtual void RetryPickLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_) = 0; + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_) = 0; protected: - ClientChannel* chand() const { return chand_; } + ClientChannelFilter* chand() const { return chand_; } ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const { return static_cast( call_context_[GRPC_CONTEXT_CALL_TRACER].value); @@ -458,13 +459,13 @@ class ClientChannel::LoadBalancedCall grpc_error_handle* error); // Adds the call to the channel's list of queued picks if not already present. void AddCallToLbQueuedCallsLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_); // Called when adding the call to the LB queue. virtual void OnAddToQueueLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_) = 0; + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_) = 0; - ClientChannel* chand_; + ClientChannelFilter* chand_; absl::AnyInvocable on_commit_; @@ -477,8 +478,8 @@ class ClientChannel::LoadBalancedCall grpc_call_context_element* const call_context_; }; -class ClientChannel::FilterBasedLoadBalancedCall - : public ClientChannel::LoadBalancedCall { +class ClientChannelFilter::FilterBasedLoadBalancedCall + : public ClientChannelFilter::LoadBalancedCall { public: // If on_call_destruction_complete is non-null, then it will be // invoked once the LoadBalancedCall is completely destroyed. @@ -486,7 +487,7 @@ class ClientChannel::FilterBasedLoadBalancedCall // the LB call has a subchannel call and ensuring that the // on_call_destruction_complete closure passed down from the surface // is not invoked until after the subchannel call stack is destroyed. - FilterBasedLoadBalancedCall(ClientChannel* chand, + FilterBasedLoadBalancedCall(ClientChannelFilter* chand, const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, @@ -555,10 +556,10 @@ class ClientChannel::FilterBasedLoadBalancedCall void TryPick(bool was_queued); void OnAddToQueueLocked() override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_); void RetryPickLocked() override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_); void CreateSubchannelCall(); @@ -579,9 +580,8 @@ class ClientChannel::FilterBasedLoadBalancedCall // Set when we fail inside the LB call. grpc_error_handle failure_error_; - // Accessed while holding ClientChannel::lb_mu_. LbQueuedCallCanceller* lb_call_canceller_ - ABSL_GUARDED_BY(&ClientChannel::lb_mu_) = nullptr; + ABSL_GUARDED_BY(&ClientChannelFilter::lb_mu_) = nullptr; RefCountedPtr subchannel_call_; @@ -604,10 +604,10 @@ class ClientChannel::FilterBasedLoadBalancedCall grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {}; }; -class ClientChannel::PromiseBasedLoadBalancedCall - : public ClientChannel::LoadBalancedCall { +class ClientChannelFilter::PromiseBasedLoadBalancedCall + : public ClientChannelFilter::LoadBalancedCall { public: - PromiseBasedLoadBalancedCall(ClientChannel* chand, + PromiseBasedLoadBalancedCall(ClientChannelFilter* chand, absl::AnyInvocable on_commit, bool is_transparent_retry); @@ -622,7 +622,7 @@ class ClientChannel::PromiseBasedLoadBalancedCall void RetryPickLocked() override; void OnAddToQueueLocked() override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_); grpc_polling_entity pollent_; ClientMetadataHandle client_initial_metadata_; diff --git a/src/core/ext/filters/client_channel/client_channel_internal.h b/src/core/ext/filters/client_channel/client_channel_internal.h index 24cded22538..c0985c9ac28 100644 --- a/src/core/ext/filters/client_channel/client_channel_internal.h +++ b/src/core/ext/filters/client_channel/client_channel_internal.h @@ -35,7 +35,7 @@ // // This file contains internal interfaces used to allow various plugins // (filters, LB policies, etc) to access internal data provided by the -// ClientChannel that is not normally accessible via external APIs. +// ClientChannelFilter that is not normally accessible via external APIs. // // Channel arg key for health check service name. diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.cc b/src/core/ext/filters/client_channel/client_channel_plugin.cc index 25d269de09b..3dc57fe3e04 100644 --- a/src/core/ext/filters/client_channel/client_channel_plugin.cc +++ b/src/core/ext/filters/client_channel/client_channel_plugin.cc @@ -42,12 +42,12 @@ void BuildClientChannelConfiguration(CoreConfiguration::Builder* builder) { internal::RetryServiceConfigParser::Register(builder); builder->channel_init() ->RegisterFilter(GRPC_CLIENT_CHANNEL, - &ClientChannel::kFilterVtableWithPromises) + &ClientChannelFilter::kFilterVtableWithPromises) .If(IsEverythingBelowClientChannelPromiseSafe) .Terminal(); builder->channel_init() ->RegisterFilter(GRPC_CLIENT_CHANNEL, - &ClientChannel::kFilterVtableWithoutPromises) + &ClientChannelFilter::kFilterVtableWithoutPromises) .IfNot(IsEverythingBelowClientChannelPromiseSafe) .Terminal(); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 1ae17ff0620..a6ff725a39c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1592,8 +1592,8 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { // Start watching the channel's connectivity state. If the channel // goes into state TRANSIENT_FAILURE before the timer fires, we go into // fallback mode even if the fallback timeout has not elapsed. - ClientChannel* client_channel = - ClientChannel::GetFromChannel(Channel::FromC(lb_channel_)); + ClientChannelFilter* client_channel = + ClientChannelFilter::GetFromChannel(Channel::FromC(lb_channel_)); GPR_ASSERT(client_channel != nullptr); // Ref held by callback. watcher_ = @@ -1659,8 +1659,8 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked() { } void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() { - ClientChannel* client_channel = - ClientChannel::GetFromChannel(Channel::FromC(lb_channel_)); + ClientChannelFilter* client_channel = + ClientChannelFilter::GetFromChannel(Channel::FromC(lb_channel_)); GPR_ASSERT(client_channel != nullptr); client_channel->RemoveConnectivityWatcher(watcher_); } diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc index 382c579488e..66758a47e41 100644 --- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc +++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc @@ -1557,8 +1557,8 @@ RlsLb::RlsChannel::RlsChannel(RefCountedPtr lb_policy) parent_channelz_node_ = std::move(parent_channelz_node); } // Start connectivity watch. - ClientChannel* client_channel = - ClientChannel::GetFromChannel(Channel::FromC(channel_)); + ClientChannelFilter* client_channel = + ClientChannelFilter::GetFromChannel(Channel::FromC(channel_)); GPR_ASSERT(client_channel != nullptr); watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher")); client_channel->AddConnectivityWatcher( @@ -1583,8 +1583,8 @@ void RlsLb::RlsChannel::Orphan() { } // Stop connectivity watch. if (watcher_ != nullptr) { - ClientChannel* client_channel = - ClientChannel::GetFromChannel(Channel::FromC(channel_)); + ClientChannelFilter* client_channel = + ClientChannelFilter::GetFromChannel(Channel::FromC(channel_)); GPR_ASSERT(client_channel != nullptr); client_channel->RemoveConnectivityWatcher(watcher_); watcher_ = nullptr; diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index f2d393f9484..b48bc0b3d9d 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -98,7 +98,7 @@ namespace grpc_core { // RetryFilter::RetryFilter(const ChannelArgs& args, grpc_error_handle* error) - : client_channel_(args.GetObject()), + : client_channel_(args.GetObject()), event_engine_(args.GetObject()), per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)), service_config_parser_index_( diff --git a/src/core/ext/filters/client_channel/retry_filter.h b/src/core/ext/filters/client_channel/retry_filter.h index c5d713f5fa9..189b3eac874 100644 --- a/src/core/ext/filters/client_channel/retry_filter.h +++ b/src/core/ext/filters/client_channel/retry_filter.h @@ -72,7 +72,7 @@ class RetryFilter { return retry_throttle_data_; } - ClientChannel* client_channel() const { return client_channel_; } + ClientChannelFilter* client_channel() const { return client_channel_; } size_t per_rpc_retry_buffer_size() const { return per_rpc_retry_buffer_size_; @@ -110,7 +110,7 @@ class RetryFilter { static void GetChannelInfo(grpc_channel_element* /*elem*/, const grpc_channel_info* /*info*/) {} - ClientChannel* client_channel_; + ClientChannelFilter* client_channel_; grpc_event_engine::experimental::EventEngine* const event_engine_; size_t per_rpc_retry_buffer_size_; RefCountedPtr retry_throttle_data_; diff --git a/src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc b/src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc index 67736895be6..6f1d4ffd184 100644 --- a/src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc +++ b/src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc @@ -310,8 +310,9 @@ namespace { void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) { grpc_transport_stream_op_batch* batch = static_cast(arg); - auto* lb_call = static_cast( - batch->handler_private.extra_arg); + auto* lb_call = + static_cast( + batch->handler_private.extra_arg); // Note: This will release the call combiner. lb_call->StartTransportStreamOpBatch(batch); } @@ -1710,7 +1711,7 @@ void RetryFilter::LegacyCallData::StartTransportStreamOpBatch( call_attempt_->StartRetriableBatches(); } -OrphanablePtr +OrphanablePtr RetryFilter::LegacyCallData::CreateLoadBalancedCall( absl::AnyInvocable on_commit, bool is_transparent_retry) { grpc_call_element_args args = {owning_call_, nullptr, call_context_, diff --git a/src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h b/src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h index ea02f97f87c..1e5ec3253fc 100644 --- a/src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h +++ b/src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h @@ -256,7 +256,7 @@ class RetryFilter::LegacyCallData { void MaybeCancelPerAttemptRecvTimer(); LegacyCallData* calld_; - OrphanablePtr lb_call_; + OrphanablePtr lb_call_; bool lb_call_committed_ = false; grpc_closure on_per_attempt_recv_timer_; @@ -363,7 +363,7 @@ class RetryFilter::LegacyCallData { void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures); static void StartTransparentRetry(void* arg, grpc_error_handle error); - OrphanablePtr + OrphanablePtr CreateLoadBalancedCall(absl::AnyInvocable on_commit, bool is_transparent_retry); @@ -394,7 +394,8 @@ class RetryFilter::LegacyCallData { // LB call used when we've committed to a call attempt and the retry // state for that attempt is no longer needed. This provides a fast // path for long-running streaming calls that minimizes overhead. - OrphanablePtr committed_call_; + OrphanablePtr + committed_call_; // When are are not yet fully committed to a particular call (i.e., // either we might still retry or we have committed to the call but diff --git a/src/core/ext/xds/xds_transport_grpc.cc b/src/core/ext/xds/xds_transport_grpc.cc index 1deff78f2a3..3cc054cfa2b 100644 --- a/src/core/ext/xds/xds_transport_grpc.cc +++ b/src/core/ext/xds/xds_transport_grpc.cc @@ -281,8 +281,8 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport( if (IsLameChannel(channel_)) { *status = absl::UnavailableError("xds client has a lame channel"); } else { - ClientChannel* client_channel = - ClientChannel::GetFromChannel(Channel::FromC(channel_)); + ClientChannelFilter* client_channel = + ClientChannelFilter::GetFromChannel(Channel::FromC(channel_)); GPR_ASSERT(client_channel != nullptr); watcher_ = new StateWatcher(std::move(on_connectivity_failure)); client_channel->AddConnectivityWatcher( @@ -297,8 +297,8 @@ GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() { void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() { if (!IsLameChannel(channel_)) { - ClientChannel* client_channel = - ClientChannel::GetFromChannel(Channel::FromC(channel_)); + ClientChannelFilter* client_channel = + ClientChannelFilter::GetFromChannel(Channel::FromC(channel_)); GPR_ASSERT(client_channel != nullptr); client_channel->RemoveConnectivityWatcher(watcher_); } diff --git a/test/core/client_channel/client_channel_test.cc b/test/core/client_channel/client_channel_test.cc index 400f8509a95..c012dd46396 100644 --- a/test/core/client_channel/client_channel_test.cc +++ b/test/core/client_channel/client_channel_test.cc @@ -35,20 +35,20 @@ namespace testing { namespace { TEST(MakeSubchannelArgs, UsesChannelDefaultAuthorityByDefault) { - ChannelArgs args = ClientChannel::MakeSubchannelArgs( + ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs( ChannelArgs(), ChannelArgs(), nullptr, "foo.example.com"); EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "foo.example.com"); } TEST(MakeSubchannelArgs, DefaultAuthorityFromChannelArgs) { - ChannelArgs args = ClientChannel::MakeSubchannelArgs( + ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs( ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), ChannelArgs(), nullptr, "foo.example.com"); EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "bar.example.com"); } TEST(MakeSubchannelArgs, DefaultAuthorityFromResolver) { - ChannelArgs args = ClientChannel::MakeSubchannelArgs( + ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs( ChannelArgs(), ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), nullptr, "foo.example.com"); @@ -57,7 +57,7 @@ TEST(MakeSubchannelArgs, DefaultAuthorityFromResolver) { TEST(MakeSubchannelArgs, DefaultAuthorityFromChannelArgsOverridesValueFromResolver) { - ChannelArgs args = ClientChannel::MakeSubchannelArgs( + ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs( ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "baz.example.com"), nullptr, "foo.example.com"); @@ -65,14 +65,14 @@ TEST(MakeSubchannelArgs, } TEST(MakeSubchannelArgs, ArgsFromChannelTrumpPerAddressArgs) { - ChannelArgs args = ClientChannel::MakeSubchannelArgs( + ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs( ChannelArgs().Set("foo", 1), ChannelArgs().Set("foo", 2), nullptr, "foo.example.com"); EXPECT_EQ(args.GetInt("foo"), 1); } TEST(MakeSubchannelArgs, StripsOutNoSubchannelArgs) { - ChannelArgs args = ClientChannel::MakeSubchannelArgs( + ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs( ChannelArgs().Set(GRPC_ARG_NO_SUBCHANNEL_PREFIX "foo", 1), ChannelArgs().Set(GRPC_ARG_NO_SUBCHANNEL_PREFIX "bar", 1), nullptr, "foo.example.com");