From e36ede8c67e4816e2b657211509ad41c3288c033 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 8 Jan 2020 19:48:47 -0800 Subject: [PATCH] Remove exec_ctx dependency from logical thread --- .../filters/client_channel/client_channel.cc | 184 ++++++++---------- .../ext/filters/client_channel/lb_policy.cc | 24 +-- .../ext/filters/client_channel/lb_policy.h | 22 +-- .../client_channel/lb_policy/grpclb/grpclb.cc | 26 +-- .../client_channel/lb_policy/xds/cds.cc | 2 +- .../client_channel/lb_policy/xds/xds.cc | 27 +-- .../ext/filters/client_channel/resolver.h | 2 +- .../dns/c_ares/grpc_ares_ev_driver_windows.cc | 26 +-- .../resolver/fake/fake_resolver.cc | 97 ++++----- .../resolver/fake/fake_resolver.h | 12 +- .../filters/client_channel/resolver_factory.h | 2 +- .../client_channel/resolving_lb_policy.cc | 4 +- src/core/lib/iomgr/logical_thread.cc | 20 +- src/core/lib/iomgr/logical_thread.h | 1 + test/core/util/test_lb_policies.cc | 2 +- 15 files changed, 210 insertions(+), 241 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index c0255a9230c..988761a1dc5 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -209,8 +209,8 @@ class ChannelData { void Cancel(); private: - static void AddWatcherLocked(ExternalConnectivityWatcher* arg); - static void RemoveWatcherLocked(ExternalConnectivityWatcher* arg); + void AddWatcherLocked(); + void RemoveWatcherLocked(); ChannelData* chand_; grpc_polling_entity pollent_; @@ -243,9 +243,9 @@ class ChannelData { grpc_error* DoPingLocked(grpc_transport_op* op); - static void StartTransportOpLocked(void* arg, grpc_error* ignored); + static void StartTransportOpLocked(grpc_transport_op* op); - static void TryToConnectLocked(ChannelData* arg); + void TryToConnectLocked(); void ProcessLbPolicy( const Resolver::Result& resolver_result, @@ -279,9 +279,9 @@ class ChannelData { RefCountedPtr service_config_; // - // Fields used in the control plane. Guarded by combiner. + // Fields used in the control plane. Guarded by logical_thread. // - RefCountedPtr combiner_; + RefCountedPtr logical_thread_; grpc_pollset_set* interested_parties_; RefCountedPtr subchannel_pool_; OrphanablePtr resolving_lb_policy_; @@ -293,17 +293,18 @@ class ChannelData { std::map subchannel_refcount_map_; // The set of SubchannelWrappers that currently exist. // No need to hold a ref, since the map is updated in the control-plane - // combiner when the SubchannelWrappers are created and destroyed. + // logical_thread when the SubchannelWrappers are created and destroyed. std::set subchannel_wrappers_; // Pending ConnectedSubchannel updates for each SubchannelWrapper. - // Updates are queued here in the control plane combiner and then applied - // in the data plane mutex when the picker is updated. + // Updates are queued here in the control plane logical_thread and then + // applied in the data plane mutex when the picker is updated. std::map, RefCountedPtr, RefCountedPtrLess> pending_subchannel_updates_; // - // Fields accessed from both data plane mutex and control plane combiner. + // Fields accessed from both data plane mutex and control plane + // logical_thread. // Atomic disconnect_error_; @@ -837,7 +838,7 @@ class CallData { // Note that no synchronization is needed here, because even if the // underlying subchannel is shared between channels, this wrapper will only // be used within one channel, so it will always be synchronized by the -// control plane combiner. +// control plane logical_thread. class ChannelData::SubchannelWrapper : public SubchannelInterface { public: SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, @@ -963,7 +964,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { health_check_service_name_ = std::move(health_check_service_name); } - // Caller must be holding the control-plane combiner. + // Caller must be holding the control-plane logical_thread. ConnectedSubchannel* connected_subchannel() const { return connected_subchannel_.get(); } @@ -1014,7 +1015,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { gpr_log(GPR_INFO, "chand=%p: connectivity change for subchannel wrapper %p " "subchannel %p (connected_subchannel=%p state=%s); " - "hopping into combiner", + "hopping into logical_thread", parent_->chand_, parent_.get(), parent_->subchannel_, connected_subchannel.get(), ConnectivityStateName(new_state)); } @@ -1047,41 +1048,39 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { : parent_(std::move(parent)), state_(new_state), connected_subchannel_(std::move(connected_subchannel)) { - ExecCtx::Run( - DEBUG_LOCATION, - GRPC_CLOSURE_CREATE( - [](void* arg, grpc_error* /*error*/) { - Updater* self = static_cast(arg); - self->parent_->parent_->chand_->combiner_->Run( - [self]() { ApplyUpdateInControlPlaneCombiner(self); }, - DEBUG_LOCATION); - }, - this, nullptr), - GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, + GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error* /*error*/) { + Updater* self = static_cast(arg); + self->parent_->parent_->chand_->logical_thread_->Run( + [self]() { + self->ApplyUpdateInControlPlaneLogicalThread(); + }, + DEBUG_LOCATION); + }, + this, nullptr), + GRPC_ERROR_NONE); } private: - static void ApplyUpdateInControlPlaneCombiner(void* arg) { - Updater* self = static_cast(arg); + void ApplyUpdateInControlPlaneLogicalThread() { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, - "chand=%p: processing connectivity change in combiner " + "chand=%p: processing connectivity change in logical thread " "for subchannel wrapper %p subchannel %p " "(connected_subchannel=%p state=%s): watcher=%p", - self->parent_->parent_->chand_, self->parent_->parent_.get(), - self->parent_->parent_->subchannel_, - self->connected_subchannel_.get(), - ConnectivityStateName(self->state_), - self->parent_->watcher_.get()); + parent_->parent_->chand_, parent_->parent_.get(), + parent_->parent_->subchannel_, connected_subchannel_.get(), + ConnectivityStateName(state_), parent_->watcher_.get()); } // Ignore update if the parent WatcherWrapper has been replaced // since this callback was scheduled. - if (self->parent_->watcher_ == nullptr) return; - self->parent_->last_seen_state_ = self->state_; - self->parent_->parent_->MaybeUpdateConnectedSubchannel( - std::move(self->connected_subchannel_)); - self->parent_->watcher_->OnConnectivityStateChange(self->state_); - delete self; + if (parent_->watcher_ == nullptr) return; + parent_->last_seen_state_ = state_; + parent_->parent_->MaybeUpdateConnectedSubchannel( + std::move(connected_subchannel_)); + parent_->watcher_->OnConnectivityStateChange(state_); + delete this; } RefCountedPtr parent_; @@ -1126,7 +1125,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { // CancelConnectivityStateWatch() with its watcher, we know the // corresponding WrapperWatcher to cancel on the underlying subchannel. std::map watcher_map_; - // To be accessed only in the control plane combiner. + // To be accessed only in the control plane logical_thread. RefCountedPtr connected_subchannel_; // To be accessed only in the data plane mutex. RefCountedPtr connected_subchannel_in_data_plane_; @@ -1149,16 +1148,16 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher( grpc_polling_entity_add_to_pollset_set(&pollent_, chand_->interested_parties_); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher"); - ExecCtx::Run(DEBUG_LOCATION, - GRPC_CLOSURE_CREATE( - [](void* arg, grpc_error* /*error*/) { - auto* self = - static_cast(arg); - self->chand_->combiner_->Run( - [self]() { AddWatcherLocked(self); }, DEBUG_LOCATION); - }, - this, nullptr), - GRPC_ERROR_NONE); + ExecCtx::Run( + DEBUG_LOCATION, + GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error* /*error*/) { + auto* self = static_cast(arg); + self->chand_->logical_thread_->Run( + [self]() { self->AddWatcherLocked(); }, DEBUG_LOCATION); + }, + this, nullptr), + GRPC_ERROR_NONE); } ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { @@ -1180,12 +1179,12 @@ void ChannelData::ExternalConnectivityWatcher::Notify( // Report new state to the user. *state_ = state; ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE); - // Hop back into the combiner to clean up. + // Hop back into the logical_thread to clean up. // Not needed in state SHUTDOWN, because the tracker will // automatically remove all watchers in that case. if (state != GRPC_CHANNEL_SHUTDOWN) { - chand_->combiner_->Run([this]() { RemoveWatcherLocked(this); }, - DEBUG_LOCATION); + chand_->logical_thread_->Run([this]() { RemoveWatcherLocked(); }, + DEBUG_LOCATION); } } @@ -1196,23 +1195,20 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() { return; // Already done. } ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED); - // Hop back into the combiner to clean up. - chand_->combiner_->Run([this]() { RemoveWatcherLocked(this); }, - DEBUG_LOCATION); + // Hop back into the logical_thread to clean up. + chand_->logical_thread_->Run([this]() { RemoveWatcherLocked(); }, + DEBUG_LOCATION); } -void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked( - ExternalConnectivityWatcher* self) { - Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, GRPC_ERROR_NONE); +void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() { + Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE); // Add new watcher. - self->chand_->state_tracker_.AddWatcher( - self->initial_state_, - OrphanablePtr(self)); + chand_->state_tracker_.AddWatcher( + initial_state_, OrphanablePtr(this)); } -void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked( - ExternalConnectivityWatcher* self) { - self->chand_->state_tracker_.RemoveWatcher(self); +void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked() { + chand_->state_tracker_.RemoveWatcher(this); } // @@ -1228,17 +1224,15 @@ class ChannelData::ConnectivityWatcherAdder { initial_state_(initial_state), watcher_(std::move(watcher)) { GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder"); - chand_->combiner_->Run([this]() { AddWatcherLocked(this); }, - DEBUG_LOCATION); + chand_->logical_thread_->Run([this]() { AddWatcherLocked(); }, + DEBUG_LOCATION); } private: - static void AddWatcherLocked(ConnectivityWatcherAdder* self) { - self->chand_->state_tracker_.AddWatcher(self->initial_state_, - std::move(self->watcher_)); - GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, - "ConnectivityWatcherAdder"); - delete self; + void AddWatcherLocked() { + chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_)); + GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder"); + delete this; } ChannelData* chand_; @@ -1256,16 +1250,16 @@ class ChannelData::ConnectivityWatcherRemover { AsyncConnectivityStateWatcherInterface* watcher) : chand_(chand), watcher_(watcher) { GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover"); - chand_->combiner_->Run([this]() { RemoveWatcherLocked(this); }, - DEBUG_LOCATION); + chand_->logical_thread_->Run([this]() { RemoveWatcherLocked(); }, + DEBUG_LOCATION); } private: - static void RemoveWatcherLocked(ConnectivityWatcherRemover* self) { - self->chand_->state_tracker_.RemoveWatcher(self->watcher_); - GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, + void RemoveWatcherLocked() { + chand_->state_tracker_.RemoveWatcher(watcher_); + GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherRemover"); - delete self; + delete this; } ChannelData* chand_; @@ -1410,7 +1404,7 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) client_channel_factory_( ClientChannelFactory::GetFromChannelArgs(args->channel_args)), channelz_node_(GetChannelzNode(args->channel_args)), - combiner_(MakeRefCounted()), + logical_thread_(MakeRefCounted()), interested_parties_(grpc_pollset_set_create()), subchannel_pool_(GetSubchannelPool(args->channel_args)), state_tracker_("client_channel", GRPC_CHANNEL_IDLE), @@ -1584,7 +1578,7 @@ void ChannelData::UpdateServiceConfigLocked( void ChannelData::CreateResolvingLoadBalancingPolicyLocked() { // Instantiate resolving LB policy. LoadBalancingPolicy::Args lb_args; - lb_args.combiner = combiner_; + lb_args.logical_thread = logical_thread_; lb_args.channel_control_helper = MakeUnique(this); lb_args.args = channel_args_; grpc_core::UniquePtr target_uri(gpr_strdup(target_uri_.get())); @@ -1805,8 +1799,7 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { return result.error; } -void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) { - grpc_transport_op* op = static_cast(arg); +void ChannelData::StartTransportOpLocked(grpc_transport_op* op) { grpc_channel_element* elem = static_cast(op->handler_private.extra_arg); ChannelData* chand = static_cast(elem->channel_data); @@ -1877,15 +1870,11 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem, if (op->bind_pollset != nullptr) { grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset); } - // Pop into control plane combiner for remaining ops. + // Pop into control plane logical_thread for remaining ops. op->handler_private.extra_arg = elem; GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op"); - chand->combiner_->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&op->handler_private.closure, - ChannelData::StartTransportOpLocked, op, nullptr), - GRPC_ERROR_NONE), - DEBUG_LOCATION); + chand->logical_thread_->Run( + [op]() { ChannelData::StartTransportOpLocked(op); }, DEBUG_LOCATION); } void ChannelData::GetChannelInfo(grpc_channel_element* elem, @@ -1936,14 +1925,13 @@ ChannelData::GetConnectedSubchannelInDataPlane( return connected_subchannel->Ref(); } -void ChannelData::TryToConnectLocked(ChannelData* arg) { - auto* chand = static_cast(arg); - if (chand->resolving_lb_policy_ != nullptr) { - chand->resolving_lb_policy_->ExitIdleLocked(); +void ChannelData::TryToConnectLocked() { + if (resolving_lb_policy_ != nullptr) { + resolving_lb_policy_->ExitIdleLocked(); } else { - chand->CreateResolvingLoadBalancingPolicyLocked(); + CreateResolvingLoadBalancingPolicyLocked(); } - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect"); + GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect"); } grpc_connectivity_state ChannelData::CheckConnectivityState( @@ -1955,8 +1943,8 @@ grpc_connectivity_state ChannelData::CheckConnectivityState( GRPC_CLOSURE_CREATE( [](void* arg, grpc_error* /*error*/) { auto* chand = static_cast(arg); - chand->combiner_->Run( - [chand]() { TryToConnectLocked(chand); }, + chand->logical_thread_->Run( + [chand]() { chand->TryToConnectLocked(); }, DEBUG_LOCATION); }, this, nullptr), @@ -3872,7 +3860,7 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, // The picker being null means that the channel is currently in IDLE state. // The incoming call will make the channel exit IDLE. if (chand->picker() == nullptr) { - // Bounce into the control plane combiner to exit IDLE. + // Bounce into the control plane logical thread to exit IDLE. chand->CheckConnectivityState(/*try_to_connect=*/true); // Queue the pick, so that it will be attempted once the channel // becomes connected. diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index d4e36c45a1f..5c6bc386618 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -33,7 +33,7 @@ DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(false, "lb_policy_refcount"); LoadBalancingPolicy::LoadBalancingPolicy(Args args, intptr_t initial_refcount) : InternallyRefCounted(&grpc_trace_lb_policy_refcount, initial_refcount), - combiner_(std::move(args.combiner)), + logical_thread_(std::move(args.logical_thread)), interested_parties_(grpc_pollset_set_create()), channel_control_helper_(std::move(args.channel_control_helper)) {} @@ -98,16 +98,17 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick( // the time this function returns, the pick will already have // been processed, and we'll be trying to re-process the same // pick again, leading to a crash. - // 2. We are currently running in the data plane combiner, but we - // need to bounce into the control plane combiner to call + // 2. We are currently running in the data plane logical_thread, but we + // need to bounce into the control plane logical_thread to call // ExitIdleLocked(). if (!exit_idle_called_) { exit_idle_called_ = true; - parent_->Ref().release(); // ref held by closure. - parent_->combiner()->Run( - Closure::ToFunction( - GRPC_CLOSURE_CREATE(&CallExitIdle, parent_.get(), nullptr), - GRPC_ERROR_NONE), + auto* parent = parent_->Ref().release(); // ref held by lambda. + parent_->logical_thread()->Run( + [parent]() { + parent->ExitIdleLocked(); + parent->Unref(); + }, DEBUG_LOCATION); } PickResult result; @@ -115,13 +116,6 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick( return result; } -void LoadBalancingPolicy::QueuePicker::CallExitIdle(void* arg, - grpc_error* /*error*/) { - LoadBalancingPolicy* parent = static_cast(arg); - parent->ExitIdleLocked(); - parent->Unref(); -} - // // LoadBalancingPolicy::TransientFailurePicker // diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index c0d375a4351..fa6dcb7ffee 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -72,7 +72,7 @@ extern DebugOnlyTraceFlag grpc_trace_lb_policy_refcount; /// LoadBalacingPolicy API. /// /// Note: All methods with a "Locked" suffix must be called from the -/// combiner passed to the constructor. +/// logical_thread passed to the constructor. /// /// Any I/O done by the LB policy should be done under the pollset_set /// returned by \a interested_parties(). @@ -242,7 +242,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// live in the LB policy object itself. /// /// Currently, pickers are always accessed from within the - /// client_channel data plane combiner, so they do not have to be + /// client_channel data plane logical_thread, so they do not have to be /// thread-safe. class SubchannelPicker { public: @@ -309,9 +309,9 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Args used to instantiate an LB policy. struct Args { - /// The combiner under which all LB policy calls will be run. - /// Policy does NOT take ownership of the reference to the combiner. - RefCountedPtr combiner; + /// The logical_thread under which all LB policy calls will be run. + /// Policy does NOT take ownership of the reference to the logical_thread. + RefCountedPtr logical_thread; /// Channel control helper. /// Note: LB policies MUST NOT call any method on the helper from /// their constructor. @@ -349,7 +349,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { grpc_pollset_set* interested_parties() const { return interested_parties_; } - // Note: This must be invoked while holding the combiner. + // Note: This must be invoked while holding the logical_thread. void Orphan() override; // A picker that returns PICK_QUEUE for all picks. @@ -365,8 +365,6 @@ class LoadBalancingPolicy : public InternallyRefCounted { PickResult Pick(PickArgs args) override; private: - static void CallExitIdle(void* arg, grpc_error* error); - RefCountedPtr parent_; bool exit_idle_called_ = false; }; @@ -384,7 +382,9 @@ class LoadBalancingPolicy : public InternallyRefCounted { }; protected: - RefCountedPtr combiner() const { return combiner_; } + RefCountedPtr logical_thread() const { + return logical_thread_; + } // Note: LB policies MUST NOT call any method on the helper from their // constructor. @@ -396,8 +396,8 @@ class LoadBalancingPolicy : public InternallyRefCounted { virtual void ShutdownLocked() = 0; private: - /// Combiner under which LB policy actions take place. - RefCountedPtr combiner_; + /// Logical Thread under which LB policy actions take place. + RefCountedPtr logical_thread_; /// Owned pointer to interested parties in load balancing decisions. grpc_pollset_set* interested_parties_; /// Channel control helper. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index e985e495403..c1059d6e574 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -251,16 +251,16 @@ class GrpcLb : public LoadBalancingPolicy { // should not be dropped. // // Note: This is called from the picker, so it will be invoked in - // the channel's data plane combiner, NOT the control plane - // combiner. It should not be accessed by any other part of the LB + // the channel's data plane logical_thread, NOT the control plane + // logical_thread. It should not be accessed by any other part of the LB // policy. const char* ShouldDrop(); private: grpc_grpclb_serverlist* serverlist_; - // Guarded by the channel's data plane combiner, NOT the control - // plane combiner. It should not be accessed by anything but the + // Guarded by the channel's data plane logical_thread, NOT the control + // plane logical_thread. It should not be accessed by anything but the // picker via the ShouldDrop() method. size_t drop_index_ = 0; }; @@ -911,7 +911,7 @@ void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); - lb_calld->grpclb_policy()->combiner()->Run( + lb_calld->grpclb_policy()->logical_thread()->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_, MaybeSendClientLoadReportLocked, lb_calld, nullptr), @@ -998,7 +998,7 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); - lb_calld->grpclb_policy()->combiner()->Run( + lb_calld->grpclb_policy()->logical_thread()->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_, ClientLoadReportDoneLocked, lb_calld, nullptr), @@ -1022,7 +1022,7 @@ void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg, void GrpcLb::BalancerCallState::OnInitialRequestSent(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); - lb_calld->grpclb_policy()->combiner()->Run( + lb_calld->grpclb_policy()->logical_thread()->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&lb_calld->lb_on_initial_request_sent_, OnInitialRequestSentLocked, lb_calld, nullptr), @@ -1048,7 +1048,7 @@ void GrpcLb::BalancerCallState::OnInitialRequestSentLocked( void GrpcLb::BalancerCallState::OnBalancerMessageReceived(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); - lb_calld->grpclb_policy()->combiner()->Run( + lb_calld->grpclb_policy()->logical_thread()->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_message_received_, OnBalancerMessageReceivedLocked, lb_calld, nullptr), @@ -1206,7 +1206,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( void GrpcLb::BalancerCallState::OnBalancerStatusReceived(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); - lb_calld->grpclb_policy()->combiner()->Run( + lb_calld->grpclb_policy()->logical_thread()->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_status_received_, OnBalancerStatusReceivedLocked, lb_calld, nullptr), @@ -1548,7 +1548,7 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked( void GrpcLb::OnBalancerChannelConnectivityChanged(void* arg, grpc_error* error) { GrpcLb* self = static_cast(arg); - self->combiner()->Run( + self->logical_thread()->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_, &GrpcLb::OnBalancerChannelConnectivityChangedLocked, @@ -1647,7 +1647,7 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() { void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) { GrpcLb* grpclb_policy = static_cast(arg); - grpclb_policy->combiner()->Run( + grpclb_policy->logical_thread()->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked, @@ -1694,7 +1694,7 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() { void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) { GrpcLb* grpclb_policy = static_cast(arg); - grpclb_policy->combiner()->Run( + grpclb_policy->logical_thread()->Run( Closure::ToFunction(GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, grpclb_policy, nullptr), @@ -1742,7 +1742,7 @@ OrphanablePtr GrpcLb::CreateChildPolicyLocked( const char* name, const grpc_channel_args* args) { Helper* helper = new Helper(Ref()); LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = combiner(); + lb_policy_args.logical_thread = logical_thread(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = std::unique_ptr(helper); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index ae8c5f05571..c7b97fdaf18 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -160,7 +160,7 @@ void CdsLb::ClusterWatcher::OnClusterChanged(CdsUpdate cluster_data) { // Create child policy if not already present. if (parent_->child_policy_ == nullptr) { LoadBalancingPolicy::Args args; - args.combiner = parent_->combiner(); + args.logical_thread = parent_->logical_thread(); args.args = parent_->args_; args.channel_control_helper = MakeUnique(parent_->Ref()); parent_->child_policy_ = diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index bfe3a3c63c6..35912521591 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -746,11 +746,11 @@ void XdsLb::ShutdownLocked() { if (xds_client_from_channel_ != nullptr) { xds_client()->CancelEndpointDataWatch(StringView(eds_service_name()), endpoint_watcher_); - } - if (config_->lrs_load_reporting_server_name() != nullptr) { - xds_client()->RemoveClientStats( - StringView(config_->lrs_load_reporting_server_name()), - StringView(eds_service_name()), &client_stats_); + if (config_->lrs_load_reporting_server_name() != nullptr) { + xds_client()->RemoveClientStats( + StringView(config_->lrs_load_reporting_server_name()), + StringView(eds_service_name()), &client_stats_); + } } xds_client_from_channel_.reset(); xds_client_.reset(); @@ -799,8 +799,9 @@ void XdsLb::UpdateLocked(UpdateArgs args) { if (xds_client_from_channel_ == nullptr) { grpc_error* error = GRPC_ERROR_NONE; xds_client_ = MakeOrphanable( - combiner(), interested_parties(), StringView(eds_service_name()), - nullptr /* service config watcher */, *args_, &error); + logical_thread(), interested_parties(), + StringView(eds_service_name()), nullptr /* service config watcher */, + *args_, &error); // TODO(roth): If we decide that we care about fallback mode, add // proper error handling here. GPR_ASSERT(error == GRPC_ERROR_NONE); @@ -870,7 +871,7 @@ void XdsLb::MaybeCancelFallbackAtStartupChecks() { void XdsLb::OnFallbackTimer(void* arg, grpc_error* error) { XdsLb* xdslb_policy = static_cast(arg); - xdslb_policy->combiner()->Run( + xdslb_policy->logical_thread()->Run( Closure::ToFunction(GRPC_CLOSURE_INIT(&xdslb_policy->lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, xdslb_policy, nullptr), @@ -1001,7 +1002,7 @@ OrphanablePtr XdsLb::CreateFallbackPolicyLocked( FallbackHelper* helper = new FallbackHelper(Ref(DEBUG_LOCATION, "FallbackHelper")); LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = combiner(); + lb_policy_args.logical_thread = logical_thread(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = std::unique_ptr(helper); @@ -1409,7 +1410,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() { void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer( void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); - self->xds_policy_->combiner()->Run( + self->xds_policy_->logical_thread()->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, OnDelayedRemovalTimerLocked, self, nullptr), @@ -1450,7 +1451,7 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); - self->xds_policy_->combiner()->Run( + self->xds_policy_->logical_thread()->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked, self, nullptr), @@ -1513,7 +1514,7 @@ XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyLocked( const char* name, const grpc_channel_args* args) { Helper* helper = new Helper(this->Ref(DEBUG_LOCATION, "Helper")); LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = xds_policy()->combiner(); + lb_policy_args.logical_thread = xds_policy()->logical_thread(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = std::unique_ptr(helper); @@ -1708,7 +1709,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer( void* arg, grpc_error* error) { Locality* self = static_cast(arg); - self->xds_policy()->combiner()->Run( + self->xds_policy()->logical_thread()->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, OnDelayedRemovalTimerLocked, self, nullptr), diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index fca67c82775..2842f24142a 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -87,7 +87,7 @@ class Resolver : public InternallyRefCounted { // Not copyable nor movable. Resolver(const Resolver&) = delete; Resolver& operator=(const Resolver&) = delete; - virtual ~Resolver() {} + virtual ~Resolver() = default; /// Starts resolving. virtual void StartLocked() = 0; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index cd1a1eaeac9..3f8f34df056 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -104,10 +104,10 @@ class GrpcPolledFdWindows { tcp_write_state_(WRITE_IDLE), gotten_into_driver_list_(false), address_family_(address_family), - socket_type_(socket_type) { + socket_type_(socket_type), + combiner_(std::move(combiner)) { gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as); winsocket_ = grpc_winsocket_create(as, name_); - combiner_ = std::move(combiner); } ~GrpcPolledFdWindows() { @@ -137,7 +137,7 @@ class GrpcPolledFdWindows { GPR_ASSERT(!read_buf_has_data_); read_buf_ = GRPC_SLICE_MALLOC(4192); if (connect_done_) { - combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(this); }, + combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(); }, DEBUG_LOCATION); } else { GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false); @@ -145,12 +145,7 @@ class GrpcPolledFdWindows { } } - static void ContinueRegisterForOnReadableLocked( - GrpcPolledFdWindows* grpc_polled_fd) { - grpc_polled_fd->InnerContinueRegisterForOnReadableLocked(GRPC_ERROR_NONE); - } - - void InnerContinueRegisterForOnReadableLocked(grpc_error* unused_error) { + void ContinueRegisterForOnReadableLocked() { GRPC_CARES_TRACE_LOG( "fd:|%s| InnerContinueRegisterForOnReadableLocked " "wsa_connect_error_:%d", @@ -202,7 +197,7 @@ class GrpcPolledFdWindows { GPR_ASSERT(write_closure_ == nullptr); write_closure_ = write_closure; if (connect_done_) { - combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(this); }, + combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(); }, DEBUG_LOCATION); } else { GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == false); @@ -210,12 +205,7 @@ class GrpcPolledFdWindows { } } - static void ContinueRegisterForOnWriteableLocked( - GrpcPolledFdWindows* grpc_polled_fd) { - grpc_polled_fd->InnerContinueRegisterForOnWriteableLocked(GRPC_ERROR_NONE); - } - - void InnerContinueRegisterForOnWriteableLocked(grpc_error* unused_error) { + void ContinueRegisterForOnWriteableLocked() { GRPC_CARES_TRACE_LOG( "fd:|%s| InnerContinueRegisterForOnWriteableLocked " "wsa_connect_error_:%d", @@ -475,11 +465,11 @@ class GrpcPolledFdWindows { wsa_connect_error_ = WSA_OPERATION_ABORTED; } if (pending_continue_register_for_on_readable_locked_) { - combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(this); }, + combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(); }, DEBUG_LOCATION); } if (pending_continue_register_for_on_writeable_locked_) { - combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(this); }, + combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(); }, DEBUG_LOCATION); } } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index d956d40cfd9..b75af860420 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -172,22 +172,14 @@ FakeResolverResponseGenerator::FakeResolverResponseGenerator() {} FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {} -struct SetResponseClosureArg { - RefCountedPtr resolver; - Resolver::Result result; - bool has_result = false; - bool immediate = true; -}; - -void FakeResolverResponseGenerator::SetResponseLocked(void* arg) { - SetResponseClosureArg* closure_arg = static_cast(arg); - auto& resolver = closure_arg->resolver; +void FakeResolverResponseGenerator::SetResponseLocked(SetResponseArg* arg) { + auto& resolver = arg->resolver; if (!resolver->shutdown_) { - resolver->next_result_ = std::move(closure_arg->result); + resolver->next_result_ = std::move(arg->result); resolver->has_next_result_ = true; resolver->MaybeSendResultLocked(); } - delete closure_arg; + delete arg; } void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) { @@ -201,21 +193,21 @@ void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) { } resolver = resolver_->Ref(); } - SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); - closure_arg->resolver = std::move(resolver); - closure_arg->result = std::move(result); - closure_arg->resolver->combiner()->Run( - [closure_arg]() { SetResponseLocked(closure_arg); }, DEBUG_LOCATION); + SetResponseArg* arg = new SetResponseArg(); + arg->resolver = std::move(resolver); + arg->result = std::move(result); + arg->resolver->combiner()->Run([arg]() { SetResponseLocked(arg); }, + DEBUG_LOCATION); } -void FakeResolverResponseGenerator::SetReresolutionResponseLocked(void* arg) { - SetResponseClosureArg* closure_arg = static_cast(arg); - auto& resolver = closure_arg->resolver; +void FakeResolverResponseGenerator::SetReresolutionResponseLocked( + SetResponseArg* arg) { + auto& resolver = arg->resolver; if (!resolver->shutdown_) { - resolver->reresolution_result_ = std::move(closure_arg->result); - resolver->has_reresolution_result_ = closure_arg->has_result; + resolver->reresolution_result_ = std::move(arg->result); + resolver->has_reresolution_result_ = arg->has_result; } - delete closure_arg; + delete arg; } void FakeResolverResponseGenerator::SetReresolutionResponse( @@ -226,13 +218,12 @@ void FakeResolverResponseGenerator::SetReresolutionResponse( GPR_ASSERT(resolver_ != nullptr); resolver = resolver_->Ref(); } - SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); - closure_arg->resolver = std::move(resolver); - closure_arg->result = std::move(result); - closure_arg->has_result = true; - closure_arg->resolver->combiner()->Run( - [closure_arg]() { SetReresolutionResponseLocked(closure_arg); }, - DEBUG_LOCATION); + SetResponseArg* arg = new SetResponseArg(); + arg->resolver = std::move(resolver); + arg->result = std::move(result); + arg->has_result = true; + arg->resolver->combiner()->Run( + [arg]() { SetReresolutionResponseLocked(arg); }, DEBUG_LOCATION); } void FakeResolverResponseGenerator::UnsetReresolutionResponse() { @@ -242,21 +233,19 @@ void FakeResolverResponseGenerator::UnsetReresolutionResponse() { GPR_ASSERT(resolver_ != nullptr); resolver = resolver_->Ref(); } - SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); - closure_arg->resolver = std::move(resolver); - closure_arg->resolver->combiner()->Run( - [closure_arg]() { SetReresolutionResponseLocked(closure_arg); }, - DEBUG_LOCATION); + SetResponseArg* arg = new SetResponseArg(); + arg->resolver = std::move(resolver); + arg->resolver->combiner()->Run( + [arg]() { SetReresolutionResponseLocked(arg); }, DEBUG_LOCATION); } -void FakeResolverResponseGenerator::SetFailureLocked(void* arg) { - SetResponseClosureArg* closure_arg = static_cast(arg); - auto& resolver = closure_arg->resolver; +void FakeResolverResponseGenerator::SetFailureLocked(SetResponseArg* arg) { + auto& resolver = arg->resolver; if (!resolver->shutdown_) { resolver->return_failure_ = true; - if (closure_arg->immediate) resolver->MaybeSendResultLocked(); + if (arg->immediate) resolver->MaybeSendResultLocked(); } - delete closure_arg; + delete arg; } void FakeResolverResponseGenerator::SetFailure() { @@ -266,10 +255,10 @@ void FakeResolverResponseGenerator::SetFailure() { GPR_ASSERT(resolver_ != nullptr); resolver = resolver_->Ref(); } - SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); - closure_arg->resolver = std::move(resolver); - closure_arg->resolver->combiner()->Run( - [closure_arg]() { SetFailureLocked(closure_arg); }, DEBUG_LOCATION); + SetResponseArg* arg = new SetResponseArg(); + arg->resolver = std::move(resolver); + arg->resolver->combiner()->Run([arg]() { SetFailureLocked(arg); }, + DEBUG_LOCATION); } void FakeResolverResponseGenerator::SetFailureOnReresolution() { @@ -279,11 +268,11 @@ void FakeResolverResponseGenerator::SetFailureOnReresolution() { GPR_ASSERT(resolver_ != nullptr); resolver = resolver_->Ref(); } - SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); - closure_arg->resolver = std::move(resolver); - closure_arg->immediate = false; - closure_arg->resolver->combiner()->Run( - [closure_arg]() { SetFailureLocked(closure_arg); }, DEBUG_LOCATION); + SetResponseArg* arg = new SetResponseArg(); + arg->resolver = std::move(resolver); + arg->immediate = false; + arg->resolver->combiner()->Run([arg]() { SetFailureLocked(arg); }, + DEBUG_LOCATION); } void FakeResolverResponseGenerator::SetFakeResolver( @@ -292,11 +281,11 @@ void FakeResolverResponseGenerator::SetFakeResolver( resolver_ = std::move(resolver); if (resolver_ == nullptr) return; if (has_result_) { - SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); - closure_arg->resolver = resolver_->Ref(); - closure_arg->result = std::move(result_); - resolver_->combiner()->Run( - [closure_arg]() { SetResponseLocked(closure_arg); }, DEBUG_LOCATION); + SetResponseArg* arg = new SetResponseArg(); + arg->resolver = resolver_->Ref(); + arg->result = std::move(result_); + resolver_->combiner()->Run([arg]() { SetResponseLocked(arg); }, + DEBUG_LOCATION); has_result_ = false; } } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index fb107e73215..730cda822e5 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -80,9 +80,15 @@ class FakeResolverResponseGenerator // Set the corresponding FakeResolver to this generator. void SetFakeResolver(RefCountedPtr resolver); - static void SetResponseLocked(void* arg); - static void SetReresolutionResponseLocked(void* arg); - static void SetFailureLocked(void* arg); + struct SetResponseArg { + RefCountedPtr resolver; + Resolver::Result result; + bool has_result = false; + bool immediate = true; + }; + static void SetResponseLocked(SetResponseArg* arg); + static void SetReresolutionResponseLocked(SetResponseArg* arg); + static void SetFailureLocked(SetResponseArg* arg); // Mutex protecting the members below. Mutex mu_; diff --git a/src/core/ext/filters/client_channel/resolver_factory.h b/src/core/ext/filters/client_channel/resolver_factory.h index e638adb6a5f..4e17d37d944 100644 --- a/src/core/ext/filters/client_channel/resolver_factory.h +++ b/src/core/ext/filters/client_channel/resolver_factory.h @@ -39,7 +39,7 @@ struct ResolverArgs { /// Used to drive I/O in the name resolution process. grpc_pollset_set* pollset_set = nullptr; /// The combiner under which all resolver calls will be run. - RefCountedPtr combiner = nullptr; + RefCountedPtr combiner; /// The result handler to be used by the resolver. std::unique_ptr result_handler; }; diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index fc5a398da84..bba8dd14c7d 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -187,7 +187,7 @@ ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy( process_resolver_result_user_data_(process_resolver_result_user_data) { GPR_ASSERT(process_resolver_result != nullptr); resolver_ = ResolverRegistry::CreateResolver( - target_uri_.get(), args.args, interested_parties(), combiner(), + target_uri_.get(), args.args, interested_parties(), logical_thread(), MakeUnique(Ref())); // Since the validity of args has been checked when create the channel, // CreateResolver() must return a non-null result. @@ -373,7 +373,7 @@ ResolvingLoadBalancingPolicy::CreateLbPolicyLocked( TraceStringVector* trace_strings) { ResolvingControlHelper* helper = new ResolvingControlHelper(Ref()); LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = combiner(); + lb_policy_args.logical_thread = logical_thread(); lb_policy_args.channel_control_helper = std::unique_ptr(helper); lb_policy_args.args = &args; diff --git a/src/core/lib/iomgr/logical_thread.cc b/src/core/lib/iomgr/logical_thread.cc index 5eb7b58923b..aa608cf8891 100644 --- a/src/core/lib/iomgr/logical_thread.cc +++ b/src/core/lib/iomgr/logical_thread.cc @@ -49,6 +49,11 @@ void LogicalThreadImpl::Run(std::function callback, callback(); // Loan this thread to the logical thread and drain the queue. DrainQueue(); + // It is possible that while draining the queue, one of the callbacks ended + // up orphaning the logical thread. In that case, delete the object. + if (orphaned_.Load(MemoryOrder::ACQUIRE)) { + delete this; + } } else { CallbackWrapper* cb_wrapper = new CallbackWrapper(std::move(callback), location); @@ -62,15 +67,11 @@ void LogicalThreadImpl::Run(std::function callback, } void LogicalThreadImpl::Orphan() { - ExecCtx::Run(DEBUG_LOCATION, - GRPC_CLOSURE_CREATE( - [](void* arg, grpc_error* /*error*/) { - LogicalThreadImpl* self = - static_cast(arg); - delete self; - }, - this, nullptr), - GRPC_ERROR_NONE); + if (size_.Load(MemoryOrder::ACQUIRE) == 0) { + delete this; + } else { + orphaned_.Store(true, MemoryOrder::RELEASE); + } } // The thread that calls this loans itself to the logical thread so as to @@ -83,7 +84,6 @@ void LogicalThreadImpl::DrainQueue() { gpr_log(GPR_INFO, "LogicalThread::DrainQueue() %p", this); } size_t prev_size = size_.FetchSub(1); - // prev_size should be atleast 1 since GPR_DEBUG_ASSERT(prev_size >= 1); if (prev_size == 1) { if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { diff --git a/src/core/lib/iomgr/logical_thread.h b/src/core/lib/iomgr/logical_thread.h index 7935679edf3..b3294a6a527 100644 --- a/src/core/lib/iomgr/logical_thread.h +++ b/src/core/lib/iomgr/logical_thread.h @@ -45,6 +45,7 @@ class LogicalThreadImpl : public Orphanable { void DrainQueue(); Atomic size_{0}; + Atomic orphaned_{false}; MultiProducerSingleConsumerQueue queue_; }; diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index 150a196ae45..d2d7718d2bb 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -55,7 +55,7 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy { const std::string& delegate_policy_name, intptr_t initial_refcount = 1) : LoadBalancingPolicy(std::move(args), initial_refcount) { Args delegate_args; - delegate_args.combiner = combiner(); + delegate_args.logical_thread = logical_thread(); delegate_args.channel_control_helper = std::move(delegating_helper); delegate_args.args = args.args; delegate_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(