diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index b11e9685297..e507aa0c2cb 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -57,8 +57,8 @@ #include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/iomgr/logical_thread.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -149,8 +149,8 @@ class ChannelData { RefCountedPtr service_config() const { return service_config_; } - RefCountedPtr logical_thread() const { - return logical_thread_; + WorkSerializer* work_serializer() const { + return work_serializer_.get(); } RefCountedPtr GetConnectedSubchannelInDataPlane( @@ -164,10 +164,13 @@ class ChannelData { grpc_closure* watcher_timer_init) { auto* watcher = new ExternalConnectivityWatcher( this, pollent, state, on_complete, watcher_timer_init); - MutexLock lock(&external_watchers_mu_); - // Will be deleted when the watch is complete. - GPR_ASSERT(external_watchers_[on_complete] == nullptr); - external_watchers_[on_complete] = watcher; + { + MutexLock lock(&external_watchers_mu_); + // Will be deleted when the watch is complete. + GPR_ASSERT(external_watchers_[on_complete] == nullptr); + external_watchers_[on_complete] = watcher; + } + watcher->Start(); } void RemoveExternalConnectivityWatcher(grpc_closure* on_complete, @@ -208,6 +211,8 @@ class ChannelData { ~ExternalConnectivityWatcher(); + void Start(); + void Notify(grpc_connectivity_state state) override; void Cancel(); @@ -283,9 +288,9 @@ class ChannelData { RefCountedPtr service_config_; // - // Fields used in the control plane. Guarded by logical_thread. + // Fields used in the control plane. Guarded by work_serializer. // - RefCountedPtr logical_thread_; + std::shared_ptr work_serializer_; grpc_pollset_set* interested_parties_; RefCountedPtr subchannel_pool_; OrphanablePtr resolving_lb_policy_; @@ -297,10 +302,10 @@ class ChannelData { std::map subchannel_refcount_map_; // The set of SubchannelWrappers that currently exist. // No need to hold a ref, since the map is updated in the control-plane - // logical_thread when the SubchannelWrappers are created and destroyed. + // work_serializer when the SubchannelWrappers are created and destroyed. std::set subchannel_wrappers_; // Pending ConnectedSubchannel updates for each SubchannelWrapper. - // Updates are queued here in the control plane logical_thread and then + // Updates are queued here in the control plane work_serializer and then // applied in the data plane mutex when the picker is updated. std::map, RefCountedPtr, RefCountedPtrLess> @@ -308,7 +313,7 @@ class ChannelData { // // Fields accessed from both data plane mutex and control plane - // logical_thread. + // work_serializer. // Atomic disconnect_error_; @@ -842,7 +847,7 @@ class CallData { // Note that no synchronization is needed here, because even if the // underlying subchannel is shared between channels, this wrapper will only // be used within one channel, so it will always be synchronized by the -// control plane logical_thread. +// control plane work_serializer. class ChannelData::SubchannelWrapper : public SubchannelInterface { public: SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, @@ -968,7 +973,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { health_check_service_name_ = std::move(health_check_service_name); } - // Caller must be holding the control-plane logical_thread. + // Caller must be holding the control-plane work_serializer. ConnectedSubchannel* connected_subchannel() const { return connected_subchannel_.get(); } @@ -1017,7 +1022,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { gpr_log(GPR_INFO, "chand=%p: connectivity change for subchannel wrapper %p " "subchannel %p (connected_subchannel=%p state=%s); " - "hopping into logical_thread", + "hopping into work_serializer", parent_->chand_, parent_.get(), parent_->subchannel_, connected_subchannel.get(), ConnectivityStateName(new_state)); } @@ -1050,7 +1055,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { : parent_(std::move(parent)), state_(new_state), connected_subchannel_(std::move(connected_subchannel)) { - parent_->parent_->chand_->logical_thread_->Run( + parent_->parent_->chand_->work_serializer_->Run( [this]() { ApplyUpdateInControlPlaneLogicalThread(); }, DEBUG_LOCATION); } @@ -1118,7 +1123,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { // CancelConnectivityStateWatch() with its watcher, we know the // corresponding WrapperWatcher to cancel on the underlying subchannel. std::map watcher_map_; - // To be accessed only in the control plane logical_thread. + // To be accessed only in the control plane work_serializer. RefCountedPtr connected_subchannel_; // To be accessed only in the data plane mutex. RefCountedPtr connected_subchannel_in_data_plane_; @@ -1141,8 +1146,6 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher( grpc_polling_entity_add_to_pollset_set(&pollent_, chand_->interested_parties_); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher"); - chand_->logical_thread_->Run([this]() { AddWatcherLocked(); }, - DEBUG_LOCATION); } ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { @@ -1152,6 +1155,11 @@ ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { "ExternalConnectivityWatcher"); } +void ChannelData::ExternalConnectivityWatcher::Start() { + chand_->work_serializer_->Run([this]() { AddWatcherLocked(); }, + DEBUG_LOCATION); +} + void ChannelData::ExternalConnectivityWatcher::Notify( grpc_connectivity_state state) { bool done = false; @@ -1164,12 +1172,12 @@ void ChannelData::ExternalConnectivityWatcher::Notify( // Report new state to the user. *state_ = state; ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE); - // Hop back into the logical_thread to clean up. + // Hop back into the work_serializer to clean up. // Not needed in state SHUTDOWN, because the tracker will // automatically remove all watchers in that case. if (state != GRPC_CHANNEL_SHUTDOWN) { - chand_->logical_thread_->Run([this]() { RemoveWatcherLocked(); }, - DEBUG_LOCATION); + chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); }, + DEBUG_LOCATION); } } @@ -1180,9 +1188,9 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() { return; // Already done. } ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED); - // Hop back into the logical_thread to clean up. - chand_->logical_thread_->Run([this]() { RemoveWatcherLocked(); }, - DEBUG_LOCATION); + // Hop back into the work_serializer to clean up. + chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); }, + DEBUG_LOCATION); } void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() { @@ -1209,8 +1217,8 @@ class ChannelData::ConnectivityWatcherAdder { initial_state_(initial_state), watcher_(std::move(watcher)) { GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder"); - chand_->logical_thread_->Run([this]() { AddWatcherLocked(); }, - DEBUG_LOCATION); + chand_->work_serializer_->Run([this]() { AddWatcherLocked(); }, + DEBUG_LOCATION); } private: @@ -1235,8 +1243,8 @@ class ChannelData::ConnectivityWatcherRemover { AsyncConnectivityStateWatcherInterface* watcher) : chand_(chand), watcher_(watcher) { GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover"); - chand_->logical_thread_->Run([this]() { RemoveWatcherLocked(); }, - DEBUG_LOCATION); + chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); }, + DEBUG_LOCATION); } private: @@ -1389,7 +1397,7 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) client_channel_factory_( ClientChannelFactory::GetFromChannelArgs(args->channel_args)), channelz_node_(GetChannelzNode(args->channel_args)), - logical_thread_(MakeRefCounted()), + work_serializer_(std::make_shared()), interested_parties_(grpc_pollset_set_create()), subchannel_pool_(GetSubchannelPool(args->channel_args)), state_tracker_("client_channel", GRPC_CHANNEL_IDLE), @@ -1563,7 +1571,7 @@ void ChannelData::UpdateServiceConfigLocked( void ChannelData::CreateResolvingLoadBalancingPolicyLocked() { // Instantiate resolving LB policy. LoadBalancingPolicy::Args lb_args; - lb_args.combiner = combiner_; + lb_args.work_serializer = work_serializer_; lb_args.channel_control_helper = grpc_core::MakeUnique(this); lb_args.args = channel_args_; @@ -1856,10 +1864,10 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem, if (op->bind_pollset != nullptr) { grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset); } - // Pop into control plane logical_thread for remaining ops. + // Pop into control plane work_serializer for remaining ops. op->handler_private.extra_arg = elem; GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op"); - chand->logical_thread_->Run( + chand->work_serializer_->Run( [op]() { ChannelData::StartTransportOpLocked(op); }, DEBUG_LOCATION); } @@ -1925,7 +1933,7 @@ grpc_connectivity_state ChannelData::CheckConnectivityState( grpc_connectivity_state out = state_tracker_.state(); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect"); - logical_thread_->Run([this]() { TryToConnectLocked(); }, DEBUG_LOCATION); + work_serializer_->Run([this]() { TryToConnectLocked(); }, DEBUG_LOCATION); } return out; } @@ -3843,7 +3851,7 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem, GRPC_CLOSURE_CREATE( [](void* arg, grpc_error* /*error*/) { auto* chand = static_cast(arg); - chand->logical_thread()->Run( + chand->work_serializer()->Run( [chand]() { chand->CheckConnectivityState(/*try_to_connect=*/true); }, diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 97827143f65..0ab171fa9aa 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -33,7 +33,7 @@ DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(false, "lb_policy_refcount"); LoadBalancingPolicy::LoadBalancingPolicy(Args args, intptr_t initial_refcount) : InternallyRefCounted(&grpc_trace_lb_policy_refcount, initial_refcount), - logical_thread_(std::move(args.logical_thread)), + work_serializer_(std::move(args.work_serializer)), interested_parties_(grpc_pollset_set_create()), channel_control_helper_(std::move(args.channel_control_helper)) {} @@ -99,28 +99,30 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick( // been processed, and we'll be trying to re-process the same // pick again, leading to a crash. // 2. We are currently running in the data plane mutex, but we - // need to bounce into the control plane logical_thread to call + // need to bounce into the control plane work_serializer to call // ExitIdleLocked(). if (!exit_idle_called_) { exit_idle_called_ = true; - // Ref held by closure. - parent_->Ref(DEBUG_LOCATION, "QueuePicker::CallExitIdle").release(); - parent_->combiner()->Run( - GRPC_CLOSURE_CREATE(&CallExitIdle, parent_.get(), nullptr), - GRPC_ERROR_NONE); + auto* parent = parent_->Ref().release(); // ref held by lambda. + ExecCtx::Run(DEBUG_LOCATION, + GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error* /*error*/) { + auto* parent = static_cast(arg); + parent->work_serializer()->Run( + [parent]() { + parent->ExitIdleLocked(); + parent->Unref(); + }, + DEBUG_LOCATION); + }, + parent, nullptr), + GRPC_ERROR_NONE); } PickResult result; result.type = PickResult::PICK_QUEUE; return result; } -void LoadBalancingPolicy::QueuePicker::CallExitIdle(void* arg, - grpc_error* /*error*/) { - LoadBalancingPolicy* parent = static_cast(arg); - parent->ExitIdleLocked(); - parent->Unref(DEBUG_LOCATION, "QueuePicker::CallExitIdle"); -} - // // LoadBalancingPolicy::TransientFailurePicker // diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 3779a17c5d5..da29efa80b6 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -31,8 +31,8 @@ #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/string_view.h" -#include "src/core/lib/iomgr/logical_thread.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/transport/connectivity_state.h" namespace grpc_core { @@ -72,7 +72,7 @@ extern DebugOnlyTraceFlag grpc_trace_lb_policy_refcount; /// LoadBalacingPolicy API. /// /// Note: All methods with a "Locked" suffix must be called from the -/// logical_thread passed to the constructor. +/// work_serializer passed to the constructor. /// /// Any I/O done by the LB policy should be done under the pollset_set /// returned by \a interested_parties(). @@ -309,9 +309,9 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Args used to instantiate an LB policy. struct Args { - /// The logical_thread under which all LB policy calls will be run. - /// Policy does NOT take ownership of the reference to the logical_thread. - RefCountedPtr logical_thread; + /// The work_serializer under which all LB policy calls will be run. + /// Policy does NOT take ownership of the reference to the work_serializer. + std::shared_ptr work_serializer; /// Channel control helper. /// Note: LB policies MUST NOT call any method on the helper from /// their constructor. @@ -349,7 +349,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { grpc_pollset_set* interested_parties() const { return interested_parties_; } - // Note: This must be invoked while holding the logical_thread. + // Note: This must be invoked while holding the work_serializer. void Orphan() override; // A picker that returns PICK_QUEUE for all picks. @@ -382,8 +382,8 @@ class LoadBalancingPolicy : public InternallyRefCounted { }; protected: - RefCountedPtr logical_thread() const { - return logical_thread_; + std::shared_ptr work_serializer() const { + return work_serializer_; } // Note: LB policies MUST NOT call any method on the helper from their @@ -397,7 +397,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { private: /// Logical Thread under which LB policy actions take place. - RefCountedPtr logical_thread_; + std::shared_ptr work_serializer_; /// Owned pointer to interested parties in load balancing decisions. grpc_pollset_set* interested_parties_; /// Channel control helper. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 5497b39a325..170039ba189 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -246,7 +246,7 @@ class GrpcLb : public LoadBalancingPolicy { // // Note: This is called from the picker, so it will be invoked in // the channel's data plane mutex, NOT the control plane - // logical_thread. It should not be accessed by any other part of the LB + // work_serializer. It should not be accessed by any other part of the LB // policy. const char* ShouldDrop(); @@ -254,7 +254,7 @@ class GrpcLb : public LoadBalancingPolicy { std::vector serverlist_; // Guarded by the channel's data plane mutex, NOT the control - // plane logical_thread. It should not be accessed by anything but the + // plane work_serializer. It should not be accessed by anything but the // picker via the ShouldDrop() method. size_t drop_index_ = 0; }; @@ -904,7 +904,7 @@ void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - lb_calld->grpclb_policy()->logical_thread()->Run( + lb_calld->grpclb_policy()->work_serializer()->Run( [lb_calld, error]() { lb_calld->MaybeSendClientLoadReportLocked(error); }, DEBUG_LOCATION); } @@ -983,7 +983,7 @@ void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - lb_calld->grpclb_policy()->logical_thread()->Run( + lb_calld->grpclb_policy()->work_serializer()->Run( [lb_calld, error]() { lb_calld->ClientLoadReportDoneLocked(error); }, DEBUG_LOCATION); } @@ -1002,7 +1002,7 @@ void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(grpc_error* error) { void GrpcLb::BalancerCallState::OnInitialRequestSent(void* arg, grpc_error* /*error*/) { BalancerCallState* lb_calld = static_cast(arg); - lb_calld->grpclb_policy()->logical_thread()->Run( + lb_calld->grpclb_policy()->work_serializer()->Run( [lb_calld]() { lb_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION); } @@ -1021,7 +1021,7 @@ void GrpcLb::BalancerCallState::OnInitialRequestSentLocked() { void GrpcLb::BalancerCallState::OnBalancerMessageReceived( void* arg, grpc_error* /*error*/) { BalancerCallState* lb_calld = static_cast(arg); - lb_calld->grpclb_policy()->logical_thread()->Run( + lb_calld->grpclb_policy()->work_serializer()->Run( [lb_calld]() { lb_calld->OnBalancerMessageReceivedLocked(); }, DEBUG_LOCATION); } @@ -1187,7 +1187,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceived(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); GRPC_ERROR_REF(error); // owned by lambda - lb_calld->grpclb_policy()->logical_thread()->Run( + lb_calld->grpclb_policy()->work_serializer()->Run( [lb_calld, error]() { lb_calld->OnBalancerStatusReceivedLocked(error); }, DEBUG_LOCATION); } @@ -1526,7 +1526,7 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked( void GrpcLb::OnBalancerChannelConnectivityChanged(void* arg, grpc_error* /*error*/) { GrpcLb* self = static_cast(arg); - self->logical_thread()->Run( + self->work_serializer()->Run( [self]() { self->OnBalancerChannelConnectivityChangedLocked(); }, DEBUG_LOCATION); } @@ -1614,7 +1614,7 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() { void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) { GrpcLb* grpclb_policy = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - grpclb_policy->logical_thread()->Run( + grpclb_policy->work_serializer()->Run( [grpclb_policy, error]() { grpclb_policy->OnBalancerCallRetryTimerLocked(error); }, @@ -1658,7 +1658,7 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() { void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) { GrpcLb* grpclb_policy = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - grpclb_policy->logical_thread()->Run( + grpclb_policy->work_serializer()->Run( [grpclb_policy, error]() { grpclb_policy->OnFallbackTimerLocked(error); }, DEBUG_LOCATION); } @@ -1703,7 +1703,7 @@ OrphanablePtr GrpcLb::CreateChildPolicyLocked( const char* name, const grpc_channel_args* args) { Helper* helper = new Helper(Ref()); LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.logical_thread = logical_thread(); + lb_policy_args.work_serializer = work_serializer(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = std::unique_ptr(helper); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index d657267ecef..834b3ee771d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -146,7 +146,7 @@ void CdsLb::ClusterWatcher::OnClusterChanged(CdsUpdate cluster_data) { // Create child policy if not already present. if (parent_->child_policy_ == nullptr) { LoadBalancingPolicy::Args args; - args.logical_thread = parent_->logical_thread(); + args.work_serializer = parent_->work_serializer(); args.args = parent_->args_; args.channel_control_helper = grpc_core::MakeUnique(parent_->Ref()); parent_->child_policy_ = diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index c87109428ea..9f6733f1564 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -259,7 +259,7 @@ class XdsLb : public LoadBalancingPolicy { const grpc_channel_args* args); static void OnDelayedRemovalTimer(void* arg, grpc_error* error); - static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); + void OnDelayedRemovalTimerLocked(grpc_error* error); XdsLb* xds_policy() const { return locality_map_->xds_policy(); } @@ -312,8 +312,8 @@ class XdsLb : public LoadBalancingPolicy { void UpdateConnectivityStateLocked(); static void OnDelayedRemovalTimer(void* arg, grpc_error* error); static void OnFailoverTimer(void* arg, grpc_error* error); - static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); - static void OnFailoverTimerLocked(void* arg, grpc_error* error); + void OnDelayedRemovalTimerLocked(grpc_error* error); + void OnFailoverTimerLocked(grpc_error* error); const XdsPriorityListUpdate& priority_list_update() const { return xds_policy_->priority_list_update_; @@ -375,7 +375,7 @@ class XdsLb : public LoadBalancingPolicy { // Methods for dealing with fallback state. void MaybeCancelFallbackAtStartupChecks(); static void OnFallbackTimer(void* arg, grpc_error* error); - static void OnFallbackTimerLocked(void* arg, grpc_error* error); + void OnFallbackTimerLocked(grpc_error* error); void UpdateFallbackPolicyLocked(); OrphanablePtr CreateFallbackPolicyLocked( const char* name, const grpc_channel_args* args); @@ -786,7 +786,7 @@ void XdsLb::UpdateLocked(UpdateArgs args) { if (xds_client_from_channel_ == nullptr) { grpc_error* error = GRPC_ERROR_NONE; xds_client_ = MakeOrphanable( - logical_thread(), interested_parties(), + work_serializer(), interested_parties(), StringView(eds_service_name()), nullptr /* service config watcher */, *args_, &error); // TODO(roth): If we decide that we care about fallback mode, add @@ -862,7 +862,7 @@ void XdsLb::MaybeCancelFallbackAtStartupChecks() { void XdsLb::OnFallbackTimer(void* arg, grpc_error* error) { XdsLb* xdslb_policy = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - xdslb_policy->logical_thread()->Run( + xdslb_policy->work_serializer()->Run( [xdslb_policy, error]() { xdslb_policy->OnFallbackTimerLocked(error); }, DEBUG_LOCATION); } @@ -990,7 +990,7 @@ OrphanablePtr XdsLb::CreateFallbackPolicyLocked( FallbackHelper* helper = new FallbackHelper(Ref(DEBUG_LOCATION, "FallbackHelper")); LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.logical_thread = logical_thread(); + lb_policy_args.work_serializer = work_serializer(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = std::unique_ptr(helper); @@ -1136,6 +1136,9 @@ XdsLb::LocalityMap::LocalityMap(RefCountedPtr xds_policy, gpr_log(GPR_INFO, "[xdslb %p] Creating priority %" PRIu32, xds_policy_.get(), priority_); } + // Closure Initialization + GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this, grpc_schedule_on_exec_ctx); // Start the failover timer. @@ -1386,18 +1389,16 @@ void XdsLb::LocalityMap::UpdateConnectivityStateLocked() { void XdsLb::LocalityMap::OnDelayedRemovalTimer(void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - self->xds_policy_->logical_thread()->Run( + self->xds_policy_->work_serializer()->Run( [self, error]() { self->OnDelayedRemovalTimerLocked(error); }, DEBUG_LOCATION); } -void XdsLb::LocalityMap::OnDelayedRemovalTimerLocked(void* arg, - grpc_error* error) { - LocalityMap* self = static_cast(arg); - self->delayed_removal_timer_callback_pending_ = false; - if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { - const bool keep = self->priority_list_update().Contains(self->priority_) && - self->priority_ <= self->xds_policy_->current_priority_; +void XdsLb::LocalityMap::OnDelayedRemovalTimerLocked(grpc_error* error) { + delayed_removal_timer_callback_pending_ = false; + if (error == GRPC_ERROR_NONE && !xds_policy_->shutting_down_) { + const bool keep = priority_list_update().Contains(priority_) && + priority_ <= xds_policy_->current_priority_; if (!keep) { // This check is to make sure we always delete the locality maps from // the lowest priority even if the closures of the back-to-back timers @@ -1406,8 +1407,8 @@ void XdsLb::LocalityMap::OnDelayedRemovalTimerLocked(void* arg, // deactivated locality maps when out-of-order closures are run. // TODO(juanlishen): Check the timer implementation to see if this // defense is necessary. - if (self->priority_ == self->xds_policy_->LowestPriority()) { - self->xds_policy_->priorities_.pop_back(); + if (priority_ == xds_policy_->LowestPriority()) { + xds_policy_->priorities_.pop_back(); } else { gpr_log(GPR_ERROR, "[xdslb %p] Priority %" PRIu32 @@ -1424,15 +1425,14 @@ void XdsLb::LocalityMap::OnDelayedRemovalTimerLocked(void* arg, void XdsLb::LocalityMap::OnFailoverTimer(void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - self->xds_policy_->logical_thread()->Run( + self->xds_policy_->work_serializer()->Run( [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION); } -void XdsLb::LocalityMap::OnFailoverTimerLocked(void* arg, grpc_error* error) { - LocalityMap* self = static_cast(arg); - self->failover_timer_callback_pending_ = false; - if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { - self->xds_policy_->FailoverOnConnectionFailureLocked(); +void XdsLb::LocalityMap::OnFailoverTimerLocked(grpc_error* error) { + failover_timer_callback_pending_ = false; + if (error == GRPC_ERROR_NONE && !xds_policy_->shutting_down_) { + xds_policy_->FailoverOnConnectionFailureLocked(); } Unref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked"); GRPC_ERROR_UNREF(error); @@ -1484,7 +1484,7 @@ XdsLb::LocalityMap::Locality::CreateChildPolicyLocked( const char* name, const grpc_channel_args* args) { Helper* helper = new Helper(this->Ref(DEBUG_LOCATION, "Helper")); LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.logical_thread = xds_policy()->logical_thread(); + lb_policy_args.work_serializer = xds_policy()->work_serializer(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = std::unique_ptr(helper); @@ -1678,17 +1678,16 @@ void XdsLb::LocalityMap::Locality::OnDelayedRemovalTimer(void* arg, grpc_error* error) { Locality* self = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - self->xds_policy()->logical_thread()->Run( + self->xds_policy()->work_serializer()->Run( [self, error]() { self->OnDelayedRemovalTimerLocked(error); }, DEBUG_LOCATION); } void XdsLb::LocalityMap::Locality::OnDelayedRemovalTimerLocked( - void* arg, grpc_error* error) { - Locality* self = static_cast(arg); - self->delayed_removal_timer_callback_pending_ = false; - if (error == GRPC_ERROR_NONE && !self->shutdown_ && self->weight_ == 0) { - self->locality_map_->localities_.erase(self->name_); + grpc_error* error) { + delayed_removal_timer_callback_pending_ = false; + if (error == GRPC_ERROR_NONE && !shutdown_ && weight_ == 0) { + locality_map_->localities_.erase(name_); } Unref(DEBUG_LOCATION, "Locality+timer"); GRPC_ERROR_UNREF(error); diff --git a/src/core/ext/filters/client_channel/resolver.cc b/src/core/ext/filters/client_channel/resolver.cc index fd7d6322ec2..f56fd9bd6c6 100644 --- a/src/core/ext/filters/client_channel/resolver.cc +++ b/src/core/ext/filters/client_channel/resolver.cc @@ -29,11 +29,11 @@ namespace grpc_core { // Resolver // -Resolver::Resolver(RefCountedPtr logical_thread, +Resolver::Resolver(std::shared_ptr work_serializer, std::unique_ptr result_handler) : InternallyRefCounted(&grpc_trace_resolver_refcount), result_handler_(std::move(result_handler)), - logical_thread_(std::move(logical_thread)) {} + work_serializer_(std::move(work_serializer)) {} // // Resolver::Result diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index 470901e4303..6beef968efc 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -28,7 +28,7 @@ #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" extern grpc_core::DebugOnlyTraceFlag grpc_trace_resolver_refcount; @@ -45,7 +45,7 @@ namespace grpc_core { /// DNS). /// /// Note: All methods with a "Locked" suffix must be called from the -/// logical_thread passed to the constructor. +/// work_serializer passed to the constructor. class Resolver : public InternallyRefCounted { public: /// Results returned by the resolver. @@ -115,29 +115,28 @@ class Resolver : public InternallyRefCounted { /// implementations. At that point, this method can go away. virtual void ResetBackoffLocked() {} - // Note: This must be invoked while holding the logical_thread. + // Note: This must be invoked while holding the work_serializer. void Orphan() override { ShutdownLocked(); Unref(); } protected: - /// Does NOT take ownership of the reference to \a logical_thread. - explicit Resolver(RefCountedPtr logical_thread, + explicit Resolver(std::shared_ptr work_serializer, std::unique_ptr result_handler); /// Shuts down the resolver. virtual void ShutdownLocked() = 0; - RefCountedPtr logical_thread() const { - return logical_thread_; + std::shared_ptr work_serializer() const { + return work_serializer_; } ResultHandler* result_handler() const { return result_handler_.get(); } private: std::unique_ptr result_handler_; - RefCountedPtr logical_thread_; + std::shared_ptr work_serializer_; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index b78bcbe0006..2d5ff217e6d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -42,9 +42,9 @@ #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/gethostname.h" #include "src/core/lib/iomgr/iomgr_custom.h" -#include "src/core/lib/iomgr/logical_thread.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/json/json.h" #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 @@ -91,7 +91,7 @@ class AresDnsResolver : public Resolver { bool request_service_config_; /// pollset_set to drive the name resolution process grpc_pollset_set* interested_parties_; - /// closures used by the logical_thread + /// closures used by the work_serializer grpc_closure on_next_resolution_; grpc_closure on_resolved_; /// are we currently resolving? @@ -120,7 +120,7 @@ class AresDnsResolver : public Resolver { }; AresDnsResolver::AresDnsResolver(ResolverArgs args) - : Resolver(args.logical_thread, std::move(args.result_handler)), + : Resolver(args.work_serializer, std::move(args.result_handler)), backoff_( BackOff::Options() .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * @@ -206,8 +206,8 @@ void AresDnsResolver::ShutdownLocked() { void AresDnsResolver::OnNextResolution(void* arg, grpc_error* error) { AresDnsResolver* r = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - r->logical_thread()->Run([r, error]() { r->OnNextResolutionLocked(error); }, - DEBUG_LOCATION); + r->work_serializer()->Run([r, error]() { r->OnNextResolutionLocked(error); }, + DEBUG_LOCATION); } void AresDnsResolver::OnNextResolutionLocked(grpc_error* error) { @@ -318,8 +318,8 @@ std::string ChooseServiceConfig(char* service_config_choice_json, void AresDnsResolver::OnResolved(void* arg, grpc_error* error) { AresDnsResolver* r = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - r->logical_thread()->Run([r, error]() { r->OnResolvedLocked(error); }, - DEBUG_LOCATION); + r->work_serializer()->Run([r, error]() { r->OnResolvedLocked(error); }, + DEBUG_LOCATION); } void AresDnsResolver::OnResolvedLocked(grpc_error* error) { @@ -334,15 +334,15 @@ void AresDnsResolver::OnResolvedLocked(grpc_error* error) { } if (addresses_ != nullptr) { Result result; - result.addresses = std::move(*r->addresses_); - if (r->service_config_json_ != nullptr) { + result.addresses = std::move(*addresses_); + if (service_config_json_ != nullptr) { std::string service_config_string = ChooseServiceConfig( - r->service_config_json_, &result.service_config_error); - gpr_free(r->service_config_json_); + service_config_json_, &result.service_config_error); + gpr_free(service_config_json_); if (result.service_config_error == GRPC_ERROR_NONE && !service_config_string.empty()) { GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s", - r, service_config_string.c_str()); + this, service_config_string.c_str()); result.service_config = ServiceConfig::Create( service_config_string, &result.service_config_error); } @@ -425,7 +425,7 @@ void AresDnsResolver::StartResolvingLocked() { dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, &on_resolved_, &addresses_, enable_srv_queries_ /* check_grpclb */, request_service_config_ ? &service_config_json_ : nullptr, - query_timeout_ms_, logical_thread()); + query_timeout_ms_, work_serializer()); last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); GRPC_CARES_TRACE_LOG("resolver:%p Started resolving. pending_request_:%p", this, pending_request_); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc index 57d66934702..780b3735da8 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -65,8 +65,8 @@ struct grpc_ares_ev_driver { /** refcount of the event driver */ gpr_refcount refs; - /** logical_thread to synchronize c-ares and I/O callbacks on */ - grpc_core::RefCountedPtr logical_thread; + /** work_serializer to synchronize c-ares and I/O callbacks on */ + std::shared_ptr work_serializer; /** a list of grpc_fd that this event driver is currently using. */ fd_node* fds; /** is this event driver currently working? */ @@ -145,7 +145,7 @@ void (*grpc_ares_test_only_inject_config)(ares_channel channel) = grpc_error* grpc_ares_ev_driver_create_locked( grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, int query_timeout_ms, - grpc_core::RefCountedPtr logical_thread, + std::shared_ptr work_serializer, grpc_ares_request* request) { *ev_driver = new grpc_ares_ev_driver(); ares_options opts; @@ -163,7 +163,7 @@ grpc_error* grpc_ares_ev_driver_create_locked( gpr_free(*ev_driver); return err; } - (*ev_driver)->logical_thread = std::move(logical_thread); + (*ev_driver)->work_serializer = std::move(work_serializer); gpr_ref_init(&(*ev_driver)->refs, 1); (*ev_driver)->pollset_set = pollset_set; (*ev_driver)->fds = nullptr; @@ -171,7 +171,7 @@ grpc_error* grpc_ares_ev_driver_create_locked( (*ev_driver)->shutting_down = false; (*ev_driver)->request = request; (*ev_driver)->polled_fd_factory = - grpc_core::NewGrpcPolledFdFactory((*ev_driver)->logical_thread); + grpc_core::NewGrpcPolledFdFactory((*ev_driver)->work_serializer); (*ev_driver) ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel); (*ev_driver)->query_timeout_ms = query_timeout_ms; @@ -234,7 +234,7 @@ static grpc_millis calculate_next_ares_backup_poll_alarm_ms( static void on_timeout(void* arg, grpc_error* error) { grpc_ares_ev_driver* driver = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - driver->logical_thread->Run( + driver->work_serializer->Run( [driver, error]() { on_timeout_locked(driver, error); }, DEBUG_LOCATION); } @@ -253,7 +253,7 @@ static void on_timeout_locked(grpc_ares_ev_driver* driver, grpc_error* error) { static void on_ares_backup_poll_alarm(void* arg, grpc_error* error) { grpc_ares_ev_driver* driver = static_cast(arg); GRPC_ERROR_REF(error); - driver->logical_thread->Run( + driver->work_serializer->Run( [driver, error]() { on_ares_backup_poll_alarm_locked(driver, error); }, DEBUG_LOCATION); } @@ -330,7 +330,7 @@ static void on_readable_locked(fd_node* fdn, grpc_error* error) { static void on_readable(void* arg, grpc_error* error) { fd_node* fdn = static_cast(arg); - fdn->ev_driver->logical_thread->Run( + fdn->ev_driver->work_serializer->Run( [fdn, error]() { on_readable_locked(fdn, error); }, DEBUG_LOCATION); } @@ -360,7 +360,7 @@ static void on_writable_locked(fd_node* fdn, grpc_error* error) { static void on_writable(void* arg, grpc_error* error) { fd_node* fdn = static_cast(arg); GRPC_ERROR_REF(error); - fdn->ev_driver->logical_thread->Run( + fdn->ev_driver->work_serializer->Run( [fdn, error]() { on_writable_locked(fdn, error); }, DEBUG_LOCATION); } @@ -386,7 +386,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { fdn = static_cast(gpr_malloc(sizeof(fd_node))); fdn->grpc_polled_fd = ev_driver->polled_fd_factory->NewGrpcPolledFdLocked( - socks[i], ev_driver->pollset_set, ev_driver->logical_thread); + socks[i], ev_driver->pollset_set, ev_driver->work_serializer); GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request, fdn->grpc_polled_fd->GetName()); fdn->ev_driver = ev_driver; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index 93aba9bb9b1..cedf0c39709 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -43,7 +43,7 @@ ares_channel* grpc_ares_ev_driver_get_channel_locked( grpc_error* grpc_ares_ev_driver_create_locked( grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, int query_timeout_ms, - grpc_core::RefCountedPtr logical_thread, + std::shared_ptr work_serializer, grpc_ares_request* request); /* Called back when all DNS lookups have completed. */ @@ -90,13 +90,13 @@ class GrpcPolledFdFactory { /* Creates a new wrapped fd for the current platform */ virtual GrpcPolledFd* NewGrpcPolledFdLocked( ares_socket_t as, grpc_pollset_set* driver_pollset_set, - RefCountedPtr logical_thread) = 0; + std::shared_ptr work_serializer) = 0; /* Optionally configures the ares channel after creation */ virtual void ConfigureAresChannelLocked(ares_channel channel) = 0; }; std::unique_ptr NewGrpcPolledFdFactory( - RefCountedPtr logical_thread); + std::shared_ptr work_serializer); } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc index 7aa28801943..947c3657833 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc @@ -31,7 +31,7 @@ #include #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" namespace grpc_core { @@ -42,8 +42,8 @@ void ares_uv_poll_close_cb(uv_handle_t* handle) { delete handle; } class GrpcPolledFdLibuv : public GrpcPolledFd { public: GrpcPolledFdLibuv(ares_socket_t as, - RefCountedPtr logical_thread) - : as_(as), logical_thread_(std::move(logical_thread)) { + std::shared_ptr work_serializer) + : as_(as), work_serializer_(std::move(work_serializer)) { gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, (intptr_t)as); handle_ = new uv_poll_t(); uv_poll_init_socket(uv_default_loop(), handle_, as); @@ -106,7 +106,7 @@ class GrpcPolledFdLibuv : public GrpcPolledFd { grpc_closure* read_closure_ = nullptr; grpc_closure* write_closure_ = nullptr; int poll_events_ = 0; - RefCountedPtr logical_thread_; + std::shared_ptr work_serializer_; }; struct AresUvPollCbArg { @@ -152,23 +152,23 @@ void ares_uv_poll_cb(uv_poll_t* handle, int status, int events) { GrpcPolledFdLibuv* polled_fd = reinterpret_cast(handle->data); AresUvPollCbArg* arg = new AresUvPollCbArg(handle, status, events); - polled_fd->logical_thread_->Run([arg]() { ares_uv_poll_cb_locked(arg); }, - DEBUG_LOCATION); + polled_fd->work_serializer_->Run([arg]() { ares_uv_poll_cb_locked(arg); }, + DEBUG_LOCATION); } class GrpcPolledFdFactoryLibuv : public GrpcPolledFdFactory { public: GrpcPolledFd* NewGrpcPolledFdLocked( ares_socket_t as, grpc_pollset_set* driver_pollset_set, - RefCountedPtr logical_thread) override { - return new GrpcPolledFdLibuv(as, logical_thread); + std::shared_ptr work_serializer) override { + return new GrpcPolledFdLibuv(as, work_serializer); } void ConfigureAresChannelLocked(ares_channel channel) override {} }; std::unique_ptr NewGrpcPolledFdFactory( - RefCountedPtr /*logical_thread*/) { + std::shared_ptr /*work_serializer*/) { return MakeUnique(); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index ebe17fd46d9..e1d3884677e 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -90,7 +90,7 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory { public: GrpcPolledFd* NewGrpcPolledFdLocked( ares_socket_t as, grpc_pollset_set* driver_pollset_set, - RefCountedPtr /*logical_thread*/) override { + std::shared_ptr /*work_serializer*/) override { return new GrpcPolledFdPosix(as, driver_pollset_set); } @@ -98,7 +98,7 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory { }; std::unique_ptr NewGrpcPolledFdFactory( - RefCountedPtr /*logical_thread*/) { + std::shared_ptr /*work_serializer*/) { return MakeUnique(); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index ae0d75b78fb..1be5906a7ba 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -31,11 +31,11 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/iocp_windows.h" -#include "src/core/lib/iomgr/logical_thread.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/sockaddr_windows.h" #include "src/core/lib/iomgr/socket_windows.h" #include "src/core/lib/iomgr/tcp_windows.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" @@ -98,7 +98,7 @@ class GrpcPolledFdWindows { }; GrpcPolledFdWindows(ares_socket_t as, - RefCountedPtr logical_thread, + std::shared_ptr work_serializer, int address_family, int socket_type) : read_buf_(grpc_empty_slice()), write_buf_(grpc_empty_slice()), @@ -106,7 +106,7 @@ class GrpcPolledFdWindows { gotten_into_driver_list_(false), address_family_(address_family), socket_type_(socket_type), - logical_thread_(std::move(logical_thread)) { + work_serializer_(std::move(work_serializer)) { gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as); // Closure Initialization GRPC_CLOSURE_INIT(&outer_read_closure_, @@ -148,8 +148,8 @@ class GrpcPolledFdWindows { GPR_ASSERT(!read_buf_has_data_); read_buf_ = GRPC_SLICE_MALLOC(4192); if (connect_done_) { - logical_thread_->Run([this]() { ContinueRegisterForOnReadableLocked(); }, - DEBUG_LOCATION); + work_serializer_->Run([this]() { ContinueRegisterForOnReadableLocked(); }, + DEBUG_LOCATION); } else { GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false); pending_continue_register_for_on_readable_locked_ = true; @@ -205,8 +205,8 @@ class GrpcPolledFdWindows { GPR_ASSERT(write_closure_ == nullptr); write_closure_ = write_closure; if (connect_done_) { - logical_thread_->Run([this]() { ContinueRegisterForOnWriteableLocked(); }, - DEBUG_LOCATION); + work_serializer_->Run( + [this]() { ContinueRegisterForOnWriteableLocked(); }, DEBUG_LOCATION); } else { GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == false); pending_continue_register_for_on_writeable_locked_ = true; @@ -421,7 +421,7 @@ class GrpcPolledFdWindows { GrpcPolledFdWindows* grpc_polled_fd = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - grpc_polled_fd->logical_thread_->Run( + grpc_polled_fd->work_serializer_->Run( [grpc_polled_fd, error]() { grpc_polled_fd->OnTcpConnectLocked(error); }, @@ -462,12 +462,12 @@ class GrpcPolledFdWindows { wsa_connect_error_ = WSA_OPERATION_ABORTED; } if (pending_continue_register_for_on_readable_locked_) { - logical_thread_->Run([this]() { ContinueRegisterForOnReadableLocked(); }, - DEBUG_LOCATION); + work_serializer_->Run([this]() { ContinueRegisterForOnReadableLocked(); }, + DEBUG_LOCATION); } if (pending_continue_register_for_on_writeable_locked_) { - logical_thread_->Run([this]() { ContinueRegisterForOnWriteableLocked(); }, - DEBUG_LOCATION); + work_serializer_->Run( + [this]() { ContinueRegisterForOnWriteableLocked(); }, DEBUG_LOCATION); } GRPC_ERROR_UNREF(error); } @@ -576,7 +576,7 @@ class GrpcPolledFdWindows { static void OnIocpReadable(void* arg, grpc_error* error) { GrpcPolledFdWindows* polled_fd = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - polled_fd->logical_thread_->Run( + polled_fd->work_serializer_->Run( [polled_fd, error]() { OnIocpReadableLocked(error); }, DEBUG_LOCATION); } @@ -622,7 +622,7 @@ class GrpcPolledFdWindows { static void OnIocpWriteable(void* arg, grpc_error* error) { GrpcPolledFdWindows* polled_fd = static_cast(arg); GRPC_ERROR_REF(error); // error owned by lambda - polled_fd->logical_thread_->Run( + polled_fd->work_serializer_->Run( [polled_fd, error]() { polled_fd->OnIocpWriteableLocked(error); }, DEBUG_LOCATION); } @@ -660,7 +660,7 @@ class GrpcPolledFdWindows { bool gotten_into_driver_list() const { return gotten_into_driver_list_; } void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; } - RefCountedPtr logical_thread_; + std::shared_ptr work_serializer_; char recv_from_source_addr_[200]; ares_socklen_t recv_from_source_addr_len_; grpc_slice read_buf_; @@ -702,8 +702,8 @@ struct SockToPolledFdEntry { * with a GrpcPolledFdWindows factory and event driver */ class SockToPolledFdMap { public: - SockToPolledFdMap(RefCountedPtr logical_thread) - : logical_thread_(std::move(logical_thread)) {} + SockToPolledFdMap(std::shared_ptr work_serializer) + : work_serializer_(std::move(work_serializer)) {} ~SockToPolledFdMap() { GPR_ASSERT(head_ == nullptr); } @@ -761,7 +761,7 @@ class SockToPolledFdMap { } grpc_tcp_set_non_block(s); GrpcPolledFdWindows* polled_fd = - new GrpcPolledFdWindows(s, map->logical_thread_, af, type); + new GrpcPolledFdWindows(s, map->work_serializer_, af, type); GRPC_CARES_TRACE_LOG( "fd:|%s| created with params af:%d type:%d protocol:%d", polled_fd->GetName(), af, type, protocol); @@ -817,7 +817,7 @@ class SockToPolledFdMap { private: SockToPolledFdEntry* head_ = nullptr; - RefCountedPtr logical_thread_; + std::shared_ptr work_serializer_; }; const struct ares_socket_functions custom_ares_sock_funcs = { @@ -866,12 +866,12 @@ class GrpcPolledFdWindowsWrapper : public GrpcPolledFd { class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory { public: - GrpcPolledFdFactoryWindows(RefCountedPtr logical_thread) - : sock_to_polled_fd_map_(logical_thread) {} + GrpcPolledFdFactoryWindows(std::shared_ptr work_serializer) + : sock_to_polled_fd_map_(work_serializer) {} GrpcPolledFd* NewGrpcPolledFdLocked( ares_socket_t as, grpc_pollset_set* driver_pollset_set, - RefCountedPtr logical_thread) override { + std::shared_ptr work_serializer) override { GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as); // Set a flag so that the virtual socket "close" method knows it // doesn't need to call ShutdownLocked, since now the driver will. @@ -889,8 +889,8 @@ class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory { }; std::unique_ptr NewGrpcPolledFdFactory( - RefCountedPtr logical_thread) { - return MakeUnique(std::move(logical_thread)); + std::shared_ptr work_serializer) { + return MakeUnique(std::move(work_serializer)); } } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index a562e2a4d85..a74ea201094 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -350,7 +350,7 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( grpc_ares_request* r, const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, bool check_grpclb, int query_timeout_ms, - grpc_core::RefCountedPtr logical_thread) { + std::shared_ptr work_serializer) { grpc_error* error = GRPC_ERROR_NONE; grpc_ares_hostbyname_request* hr = nullptr; ares_channel* channel = nullptr; @@ -373,7 +373,7 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( port.reset(gpr_strdup(default_port)); } error = grpc_ares_ev_driver_create_locked( - &r->ev_driver, interested_parties, query_timeout_ms, logical_thread, r); + &r->ev_driver, interested_parties, query_timeout_ms, work_serializer, r); if (error != GRPC_ERROR_NONE) goto error_cleanup; channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver); // If dns_server is specified, use it. @@ -590,7 +590,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addrs, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_core::RefCountedPtr logical_thread) { + std::shared_ptr work_serializer) { grpc_ares_request* r = static_cast(gpr_zalloc(sizeof(grpc_ares_request))); r->ev_driver = nullptr; @@ -624,7 +624,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( // Look up name using c-ares lib. grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( r, dns_server, name, default_port, interested_parties, check_grpclb, - query_timeout_ms, logical_thread); + query_timeout_ms, work_serializer); return r; } @@ -633,7 +633,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)( grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addrs, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_core::RefCountedPtr logical_thread) = + std::shared_ptr work_serializer) = grpc_dns_lookup_ares_locked_impl; static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) { @@ -674,8 +674,8 @@ void grpc_ares_cleanup(void) {} */ typedef struct grpc_resolve_address_ares_request { - /* logical_thread that queries and related callbacks run under */ - grpc_core::RefCountedPtr logical_thread; + /* work_serializer that queries and related callbacks run under */ + std::shared_ptr work_serializer; /** the pointer to receive the resolved addresses */ grpc_resolved_addresses** addrs_out; /** currently resolving addresses */ @@ -724,8 +724,8 @@ static void on_dns_lookup_done(void* arg, grpc_error* error) { grpc_resolve_address_ares_request* r = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - r->logical_thread->Run([r, error]() { on_dns_lookup_done_locked(r, error); }, - DEBUG_LOCATION); + r->work_serializer->Run([r, error]() { on_dns_lookup_done_locked(r, error); }, + DEBUG_LOCATION); } static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) { @@ -737,7 +737,7 @@ static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) { nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, &r->on_dns_lookup_done_locked, &r->addresses, false /* check_grpclb */, nullptr /* service_config_json */, GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, - r->logical_thread); + r->work_serializer); } static void grpc_resolve_address_ares_impl(const char* name, @@ -747,13 +747,13 @@ static void grpc_resolve_address_ares_impl(const char* name, grpc_resolved_addresses** addrs) { grpc_resolve_address_ares_request* r = new grpc_resolve_address_ares_request(); - r->logical_thread = grpc_core::MakeRefCounted(); + r->work_serializer = std::make_shared(); r->addrs_out = addrs; r->on_resolve_address_done = on_done; r->name = name; r->default_port = default_port; r->interested_parties = interested_parties; - r->logical_thread->Run( + r->work_serializer->Run( [r]() { grpc_resolve_address_invoke_dns_lookup_ares_locked(r); }, DEBUG_LOCATION); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 8c63590c993..07bd88fb413 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -23,9 +23,9 @@ #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/iomgr/logical_thread.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/work_serializer.h" #define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 120000 @@ -66,7 +66,7 @@ extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_core::RefCountedPtr logical_thread); + std::shared_ptr work_serializer); /* Cancel the pending grpc_ares_request \a request */ extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc index 6e76a408bdb..d464403bc43 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc @@ -31,7 +31,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addrs, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_core::RefCountedPtr logical_thread) { + std::shared_ptr work_serializer) { return NULL; } @@ -40,7 +40,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)( grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addrs, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_core::RefCountedPtr logical_thread) = + std::shared_ptr work_serializer) = grpc_dns_lookup_ares_locked_impl; static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {} diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 94e3ffa80cb..f838601ffd2 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -33,9 +33,9 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/manual_constructor.h" -#include "src/core/lib/iomgr/logical_thread.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/iomgr/work_serializer.h" #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 @@ -97,7 +97,7 @@ class NativeDnsResolver : public Resolver { }; NativeDnsResolver::NativeDnsResolver(ResolverArgs args) - : Resolver(args.logical_thread, std::move(args.result_handler)), + : Resolver(args.work_serializer, std::move(args.result_handler)), backoff_( BackOff::Options() .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * @@ -150,8 +150,8 @@ void NativeDnsResolver::ShutdownLocked() { void NativeDnsResolver::OnNextResolution(void* arg, grpc_error* error) { NativeDnsResolver* r = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - r->logical_thread()->Run([r, error]() { r->OnNextResolutionLocked(error); }, - DEBUG_LOCATION); + r->work_serializer()->Run([r, error]() { r->OnNextResolutionLocked(error); }, + DEBUG_LOCATION); } void NativeDnsResolver::OnNextResolutionLocked(grpc_error* error) { @@ -166,8 +166,8 @@ void NativeDnsResolver::OnNextResolutionLocked(grpc_error* error) { void NativeDnsResolver::OnResolved(void* arg, grpc_error* error) { NativeDnsResolver* r = static_cast(arg); GRPC_ERROR_REF(error); // owned by lambda - r->logical_thread()->Run([r, error]() { r->OnResolvedLocked(error); }, - DEBUG_LOCATION); + r->work_serializer()->Run([r, error]() { r->OnResolvedLocked(error); }, + DEBUG_LOCATION); } void NativeDnsResolver::OnResolvedLocked(grpc_error* error) { diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 14fc0304705..252c8e9d126 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -35,9 +35,9 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/logical_thread.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -87,7 +87,7 @@ class FakeResolver : public Resolver { }; FakeResolver::FakeResolver(ResolverArgs args) - : Resolver(args.logical_thread, std::move(args.result_handler)), + : Resolver(args.work_serializer, std::move(args.result_handler)), response_generator_( FakeResolverResponseGenerator::GetFromArgs(args.args)) { // Channels sharing the same subchannels may have different resolver response @@ -119,8 +119,8 @@ void FakeResolver::RequestReresolutionLocked() { if (!reresolution_closure_pending_) { reresolution_closure_pending_ = true; Ref().release(); // ref held by closure - logical_thread()->Run([this]() { ReturnReresolutionResult(); }, - DEBUG_LOCATION); + work_serializer()->Run([this]() { ReturnReresolutionResult(); }, + DEBUG_LOCATION); } } } @@ -228,7 +228,7 @@ void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) { } FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(resolver, std::move(result)); - resolver->logical_thread()->Run( + resolver->work_serializer()->Run( [arg]() { arg->SetResponseLocked(); delete arg; @@ -246,7 +246,7 @@ void FakeResolverResponseGenerator::SetReresolutionResponse( } FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(resolver, std::move(result), true); - resolver->logical_thread()->Run( + resolver->work_serializer()->Run( [arg]() { arg->SetReresolutionResponseLocked(); delete arg; @@ -263,7 +263,7 @@ void FakeResolverResponseGenerator::UnsetReresolutionResponse() { } FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(resolver, Resolver::Result()); - resolver->logical_thread()->Run( + resolver->work_serializer()->Run( [arg]() { arg->SetReresolutionResponseLocked(); delete arg; @@ -280,7 +280,7 @@ void FakeResolverResponseGenerator::SetFailure() { } FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(resolver, Resolver::Result()); - resolver->logical_thread()->Run( + resolver->work_serializer()->Run( [arg]() { arg->SetFailureLocked(); delete arg; @@ -297,7 +297,7 @@ void FakeResolverResponseGenerator::SetFailureOnReresolution() { } FakeResolverResponseSetter* arg = new FakeResolverResponseSetter( resolver, Resolver::Result(), false, false); - resolver->logical_thread()->Run( + resolver->work_serializer()->Run( [arg]() { arg->SetFailureLocked(); delete arg; @@ -313,7 +313,7 @@ void FakeResolverResponseGenerator::SetFakeResolver( if (has_result_) { FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(resolver_, std::move(result_)); - resolver_->logical_thread()->Run( + resolver_->work_serializer()->Run( [arg]() { arg->SetResponseLocked(); delete arg; diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc index 671b72deb8b..47aeac6ca64 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc @@ -31,9 +31,9 @@ #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/iomgr/logical_thread.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -57,7 +57,7 @@ class SockaddrResolver : public Resolver { SockaddrResolver::SockaddrResolver(ServerAddressList addresses, ResolverArgs args) - : Resolver(args.logical_thread, std::move(args.result_handler)), + : Resolver(args.work_serializer, std::move(args.result_handler)), addresses_(std::move(addresses)), channel_args_(grpc_channel_args_copy(args.args)) {} diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index 9767a875e3f..e9f1f4a33fd 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -33,7 +33,7 @@ namespace { class XdsResolver : public Resolver { public: explicit XdsResolver(ResolverArgs args) - : Resolver(args.logical_thread, std::move(args.result_handler)), + : Resolver(args.work_serializer, std::move(args.result_handler)), args_(grpc_channel_args_copy(args.args)), interested_parties_(args.pollset_set) { char* path = args.uri->path; @@ -90,7 +90,7 @@ void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) { void XdsResolver::StartLocked() { grpc_error* error = GRPC_ERROR_NONE; xds_client_ = MakeOrphanable( - combiner(), interested_parties_, StringView(server_name_.get()), + work_serializer(), interested_parties_, StringView(server_name_.get()), grpc_core::MakeUnique(Ref()), *args_, &error); if (error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, diff --git a/src/core/ext/filters/client_channel/resolver_factory.h b/src/core/ext/filters/client_channel/resolver_factory.h index 0852640c8a0..d8dd801ae09 100644 --- a/src/core/ext/filters/client_channel/resolver_factory.h +++ b/src/core/ext/filters/client_channel/resolver_factory.h @@ -38,8 +38,8 @@ struct ResolverArgs { const grpc_channel_args* args = nullptr; /// Used to drive I/O in the name resolution process. grpc_pollset_set* pollset_set = nullptr; - /// The logical_thread under which all resolver calls will be run. - RefCountedPtr logical_thread; + /// The work_serializer under which all resolver calls will be run. + std::shared_ptr work_serializer; /// The result handler to be used by the resolver. std::unique_ptr result_handler; }; diff --git a/src/core/ext/filters/client_channel/resolver_registry.cc b/src/core/ext/filters/client_channel/resolver_registry.cc index 5d2393f7801..718556a4c2a 100644 --- a/src/core/ext/filters/client_channel/resolver_registry.cc +++ b/src/core/ext/filters/client_channel/resolver_registry.cc @@ -145,7 +145,8 @@ bool ResolverRegistry::IsValidTarget(const char* target) { OrphanablePtr ResolverRegistry::CreateResolver( const char* target, const grpc_channel_args* args, - grpc_pollset_set* pollset_set, RefCountedPtr logical_thread, + grpc_pollset_set* pollset_set, + std::shared_ptr work_serializer, std::unique_ptr result_handler) { GPR_ASSERT(g_state != nullptr); grpc_uri* uri = nullptr; @@ -156,7 +157,7 @@ OrphanablePtr ResolverRegistry::CreateResolver( resolver_args.uri = uri; resolver_args.args = args; resolver_args.pollset_set = pollset_set; - resolver_args.logical_thread = std::move(logical_thread); + resolver_args.work_serializer = std::move(work_serializer); resolver_args.result_handler = std::move(result_handler); OrphanablePtr resolver = factory == nullptr ? nullptr diff --git a/src/core/ext/filters/client_channel/resolver_registry.h b/src/core/ext/filters/client_channel/resolver_registry.h index a0844036229..82025065494 100644 --- a/src/core/ext/filters/client_channel/resolver_registry.h +++ b/src/core/ext/filters/client_channel/resolver_registry.h @@ -61,17 +61,16 @@ class ResolverRegistry { /// prepends default_prefix to target and tries again. /// If a resolver factory is found, uses it to instantiate a resolver and /// returns it; otherwise, returns nullptr. - /// \a args, \a pollset_set, and \a logical_thread are passed to the factory's - /// \a CreateResolver() method. - /// \a args are the channel args to be included in resolver results. - /// \a pollset_set is used to drive I/O in the name resolution process. - /// \a logical_thread is the logical_thread under which all resolver calls - /// will be run. \a result_handler is used to return results from the - /// resolver. + /// \a args, \a pollset_set, and \a work_serializer are passed to the + /// factory's \a CreateResolver() method. \a args are the channel args to be + /// included in resolver results. \a pollset_set is used to drive I/O in the + /// name resolution process. \a work_serializer is the work_serializer under + /// which all resolver calls will be run. \a result_handler is used to return + /// results from the resolver. static OrphanablePtr CreateResolver( const char* target, const grpc_channel_args* args, grpc_pollset_set* pollset_set, - RefCountedPtr logical_thread, + std::shared_ptr work_serializer, std::unique_ptr result_handler); /// Returns the default authority to pass from a client for \a target. diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index a5f1cbd34be..b6d6bfcfd54 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -186,7 +186,7 @@ ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy( process_resolver_result_user_data_(process_resolver_result_user_data) { GPR_ASSERT(process_resolver_result != nullptr); resolver_ = ResolverRegistry::CreateResolver( - target_uri_.get(), args.args, interested_parties(), combiner(), + target_uri_.get(), args.args, interested_parties(), work_serializer(), grpc_core::MakeUnique(Ref())); // Since the validity of args has been checked when create the channel, // CreateResolver() must return a non-null result. @@ -372,7 +372,7 @@ ResolvingLoadBalancingPolicy::CreateLbPolicyLocked( TraceStringVector* trace_strings) { ResolvingControlHelper* helper = new ResolvingControlHelper(Ref()); LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.logical_thread = logical_thread(); + lb_policy_args.work_serializer = work_serializer(); lb_policy_args.channel_control_helper = std::unique_ptr(helper); lb_policy_args.args = &args; diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index a961f91420f..6738052d53d 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -130,7 +130,7 @@ class XdsPriorityListUpdate { }; // There are two phases of accessing this class's content: -// 1. to initialize in the control plane logical_thread; +// 1. to initialize in the control plane work_serializer; // 2. to use in the data plane mutex. // So no additional synchronization is needed. class XdsDropConfig : public RefCounted { diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index e6d7e985f56..8c40b013e2f 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -46,10 +46,10 @@ #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/logical_thread.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -166,40 +166,35 @@ class XdsClient::ChannelState::AdsCallState private: static void OnTimer(void* arg, grpc_error* error) { ResourceState* self = static_cast(arg); - self->ads_calld_->xds_client()->combiner_->Run( - GRPC_CLOSURE_INIT(&self->timer_callback_, OnTimerLocked, self, - nullptr), - GRPC_ERROR_REF(error)); + GRPC_ERROR_REF(error); // ref owned by lambda + self->ads_calld_->xds_client()->work_serializer_->Run( + [self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION); } - static void OnTimerLocked(void* arg, grpc_error* error) { - ResourceState* self = static_cast(arg); - if (error == GRPC_ERROR_NONE && self->timer_pending_) { - self->timer_pending_ = false; + void OnTimerLocked(grpc_error* error) { + if (error == GRPC_ERROR_NONE && timer_pending_) { + timer_pending_ = false; char* msg; gpr_asprintf( &msg, "timeout obtaining resource {type=%s name=%s} from xds server", - self->type_url_.c_str(), self->name_.c_str()); + type_url_.c_str(), name_.c_str()); grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] %s", - self->ads_calld_->xds_client(), grpc_error_string(error)); + gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(), + grpc_error_string(error)); } - if (self->type_url_ == kLdsTypeUrl || self->type_url_ == kRdsTypeUrl) { - self->ads_calld_->xds_client()->service_config_watcher_->OnError( - error); - } else if (self->type_url_ == kCdsTypeUrl) { - ClusterState& state = - self->ads_calld_->xds_client()->cluster_map_[self->name_]; + if (type_url_ == kLdsTypeUrl || type_url_ == kRdsTypeUrl) { + ads_calld_->xds_client()->service_config_watcher_->OnError(error); + } else if (type_url_ == kCdsTypeUrl) { + ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_]; for (const auto& p : state.watchers) { p.first->OnError(GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); - } else if (self->type_url_ == kEdsTypeUrl) { - EndpointState& state = - self->ads_calld_->xds_client()->endpoint_map_[self->name_]; + } else if (type_url_ == kEdsTypeUrl) { + EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_]; for (const auto& p : state.watchers) { p.first->OnError(GRPC_ERROR_REF(error)); } @@ -208,8 +203,9 @@ class XdsClient::ChannelState::AdsCallState GPR_UNREACHABLE_CODE(return ); } } - self->ads_calld_.reset(); - self->Unref(); + ads_calld_.reset(); + Unref(); + GRPC_ERROR_UNREF(error); } const std::string type_url_; @@ -243,7 +239,7 @@ class XdsClient::ChannelState::AdsCallState void AcceptEdsUpdate(EdsUpdateMap eds_update_map); static void OnRequestSent(void* arg, grpc_error* error); - static void OnRequestSentLocked(void* arg, grpc_error* error); + void OnRequestSentLocked(grpc_error* error); static void OnResponseReceived(void* arg, grpc_error* error); static void OnStatusReceived(void* arg, grpc_error* error); void OnResponseReceivedLocked(); @@ -393,7 +389,7 @@ class XdsClient::ChannelState::StateWatcher public: explicit StateWatcher(RefCountedPtr parent) : AsyncConnectivityStateWatcherInterface( - parent->xds_client()->logical_thread_), + parent->xds_client()->work_serializer_), parent_(std::move(parent)) {} private: @@ -646,7 +642,7 @@ void XdsClient::ChannelState::RetryableCall::OnRetryTimer( void* arg, grpc_error* error) { RetryableCall* calld = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - calld->chand_->xds_client()->logical_thread_->Run( + calld->chand_->xds_client()->work_serializer_->Run( [calld, error]() { calld->OnRetryTimerLocked(error); }, DEBUG_LOCATION); } @@ -1077,19 +1073,18 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate( void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg, grpc_error* error) { AdsCallState* ads_calld = static_cast(arg); - ads_calld->xds_client()->combiner_->Run( - GRPC_CLOSURE_INIT(&ads_calld->on_request_sent_, OnRequestSentLocked, - ads_calld, nullptr), - GRPC_ERROR_REF(error)); + GRPC_ERROR_REF(error); // ref owned by lambda + ads_calld->xds_client()->work_serializer_->Run( + [ads_calld, error]() { ads_calld->OnRequestSentLocked(error); }, + DEBUG_LOCATION); } void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked( - void* arg, grpc_error* error) { - AdsCallState* self = static_cast(arg); - if (self->IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) { + grpc_error* error) { + if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) { // Clean up the sent message. - grpc_byte_buffer_destroy(self->send_message_payload_); - self->send_message_payload_ = nullptr; + grpc_byte_buffer_destroy(send_message_payload_); + send_message_payload_ = nullptr; // Continue to send another pending message if any. // TODO(roth): The current code to handle buffered messages has the // advantage of sending only the most recent list of resource names for @@ -1099,41 +1094,36 @@ void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked( // order of resource types. We need to fix this if we are seeing some // resource type(s) starved due to frequent requests of other resource // type(s). - auto it = self->buffered_requests_.begin(); - if (it != self->buffered_requests_.end()) { - self->SendMessageLocked(*it); - self->buffered_requests_.erase(it); + auto it = buffered_requests_.begin(); + if (it != buffered_requests_.end()) { + SendMessageLocked(*it); + buffered_requests_.erase(it); } } - self->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked"); + Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked"); + GRPC_ERROR_UNREF(error); } void XdsClient::ChannelState::AdsCallState::OnResponseReceived( - void* arg, grpc_error* error) { + void* arg, grpc_error* /* error */) { AdsCallState* ads_calld = static_cast(arg); - ads_calld->xds_client()->combiner_->Run( - GRPC_CLOSURE_INIT(&ads_calld->on_response_received_, - OnResponseReceivedLocked, ads_calld, nullptr), - GRPC_ERROR_REF(error)); + ads_calld->xds_client()->work_serializer_->Run( + [ads_calld]() { ads_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION); } -void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( - void* arg, grpc_error* /*error*/) { - AdsCallState* ads_calld = static_cast(arg); - XdsClient* xds_client = ads_calld->xds_client(); +void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { // Empty payload means the call was cancelled. - if (!ads_calld->IsCurrentCallOnChannel() || - ads_calld->recv_message_payload_ == nullptr) { - ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); + if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) { + Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); return; } // Read the response. grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, ads_calld->recv_message_payload_); + grpc_byte_buffer_reader_init(&bbr, recv_message_payload_); grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); grpc_byte_buffer_reader_destroy(&bbr); - grpc_byte_buffer_destroy(ads_calld->recv_message_payload_); - ads_calld->recv_message_payload_ = nullptr; + grpc_byte_buffer_destroy(recv_message_payload_); + recv_message_payload_ = nullptr; // TODO(juanlishen): When we convert this to use the xds protocol, the // balancer will send us a fallback timeout such that we should go into // fallback mode if we have lost contact with the balancer after a certain @@ -1152,18 +1142,19 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( std::string type_url; // Note that XdsAdsResponseDecodeAndParse() also validate the response. grpc_error* parse_error = XdsAdsResponseDecodeAndParse( - response_slice, xds_client->server_name_, xds_client->route_config_name_, - ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update, - &cds_update_map, &eds_update_map, &version, &nonce, &type_url); + response_slice, xds_client()->server_name_, + xds_client()->route_config_name_, EdsServiceNamesForRequest(), + &lds_update, &rds_update, &cds_update_map, &eds_update_map, &version, + &nonce, &type_url); grpc_slice_unref_internal(response_slice); if (type_url.empty()) { // Ignore unparsable response. gpr_log(GPR_ERROR, "[xds_client %p] No type_url found. error=%s", - xds_client, grpc_error_string(parse_error)); + xds_client(), grpc_error_string(parse_error)); GRPC_ERROR_UNREF(parse_error); } else { // Update nonce. - auto& state = ads_calld->state_map_[type_url]; + auto& state = state_map_[type_url]; state.nonce = std::move(nonce); // NACK or ACK the response. if (parse_error != GRPC_ERROR_NONE) { @@ -1173,34 +1164,33 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( gpr_log( GPR_ERROR, "[xds_client %p] ADS response can't be accepted, NACKing. error=%s", - xds_client, grpc_error_string(parse_error)); - ads_calld->SendMessageLocked(type_url); + xds_client(), grpc_error_string(parse_error)); + SendMessageLocked(type_url); } else { - ads_calld->seen_response_ = true; + seen_response_ = true; // Accept the ADS response according to the type_url. if (type_url == kLdsTypeUrl) { - ads_calld->AcceptLdsUpdate(std::move(lds_update)); + AcceptLdsUpdate(std::move(lds_update)); } else if (type_url == kRdsTypeUrl) { - ads_calld->AcceptRdsUpdate(std::move(rds_update)); + AcceptRdsUpdate(std::move(rds_update)); } else if (type_url == kCdsTypeUrl) { - ads_calld->AcceptCdsUpdate(std::move(cds_update_map)); + AcceptCdsUpdate(std::move(cds_update_map)); } else if (type_url == kEdsTypeUrl) { - ads_calld->AcceptEdsUpdate(std::move(eds_update_map)); + AcceptEdsUpdate(std::move(eds_update_map)); } state.version = std::move(version); // ACK the update. - ads_calld->SendMessageLocked(type_url); + SendMessageLocked(type_url); // Start load reporting if needed. - auto& lrs_call = ads_calld->chand()->lrs_calld_; + auto& lrs_call = chand()->lrs_calld_; if (lrs_call != nullptr) { LrsCallState* lrs_calld = lrs_call->calld(); if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); } } } - if (xds_client->shutting_down_) { - ads_calld->Unref(DEBUG_LOCATION, - "ADS+OnResponseReceivedLocked+xds_shutdown"); + if (xds_client()->shutting_down_) { + Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked+xds_shutdown"); return; } // Keep listening for updates. @@ -1221,7 +1211,7 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived( void* arg, grpc_error* error) { AdsCallState* ads_calld = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - ads_calld->xds_client()->logical_thread_->Run( + ads_calld->xds_client()->work_serializer_->Run( [ads_calld, error]() { ads_calld->OnStatusReceivedLocked(error); }, DEBUG_LOCATION); } @@ -1300,7 +1290,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer( void* arg, grpc_error* error) { Reporter* self = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - self->xds_client()->logical_thread_->Run( + self->xds_client()->work_serializer_->Run( [self, error]() { self->OnNextReportTimerLocked(error); }, DEBUG_LOCATION); } @@ -1352,7 +1342,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone( void* arg, grpc_error* error) { Reporter* self = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - self->xds_client()->logical_thread_->Run( + self->xds_client()->work_serializer_->Run( [self, error]() { self->OnReportDoneLocked(error); }, DEBUG_LOCATION); } @@ -1528,7 +1518,7 @@ bool XdsClient::ChannelState::LrsCallState::ShouldSendLoadReports( void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent( void* arg, grpc_error* /*error*/) { LrsCallState* lrs_calld = static_cast(arg); - lrs_calld->xds_client()->logical_thread_->Run( + lrs_calld->xds_client()->work_serializer_->Run( [lrs_calld]() { lrs_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION); } @@ -1544,7 +1534,7 @@ void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() { void XdsClient::ChannelState::LrsCallState::OnResponseReceived( void* arg, grpc_error* /*error*/) { LrsCallState* lrs_calld = static_cast(arg); - lrs_calld->xds_client()->logical_thread_->Run( + lrs_calld->xds_client()->work_serializer_->Run( [lrs_calld]() { lrs_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION); } @@ -1580,12 +1570,12 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() { gpr_log(GPR_INFO, "[xds_client %p] LRS response received, %" PRIuPTR " cluster names, load_report_interval=%" PRId64 "ms", - xds_client, new_cluster_names.size(), + xds_client(), new_cluster_names.size(), new_load_reporting_interval); size_t i = 0; for (const auto& name : new_cluster_names) { gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s", - xds_client, i++, name.c_str()); + xds_client(), i++, name.c_str()); } } if (new_load_reporting_interval < @@ -1600,8 +1590,8 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() { } } // Ignore identical update. - if (lrs_calld->cluster_names_ == new_cluster_names && - lrs_calld->load_reporting_interval_ == new_load_reporting_interval) { + if (cluster_names_ == new_cluster_names && + load_reporting_interval_ == new_load_reporting_interval) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] Incoming LRS response identical to current, " @@ -1613,8 +1603,8 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() { // Stop current load reporting (if any) to adopt the new config. reporter_.reset(); // Record the new config. - lrs_calld->cluster_names_ = std::move(new_cluster_names); - lrs_calld->load_reporting_interval_ = new_load_reporting_interval; + cluster_names_ = std::move(new_cluster_names); + load_reporting_interval_ = new_load_reporting_interval; // Try starting sending load report. MaybeStartReportingLocked(); }(); @@ -1641,7 +1631,7 @@ void XdsClient::ChannelState::LrsCallState::OnStatusReceived( void* arg, grpc_error* error) { LrsCallState* lrs_calld = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda - lrs_calld->xds_client()->logical_thread_->Run( + lrs_calld->xds_client()->work_serializer_->Run( [lrs_calld, error]() { lrs_calld->OnStatusReceivedLocked(error); }, DEBUG_LOCATION); } @@ -1696,7 +1686,7 @@ UniquePtr GenerateBuildVersionString() { } // namespace -XdsClient::XdsClient(RefCountedPtr logical_thread, +XdsClient::XdsClient(std::shared_ptr work_serializer, grpc_pollset_set* interested_parties, StringView server_name, std::unique_ptr watcher, @@ -1704,7 +1694,7 @@ XdsClient::XdsClient(RefCountedPtr logical_thread, : InternallyRefCounted(&grpc_xds_client_trace), request_timeout_(GetRequestTimeout(channel_args)), build_version_(GenerateBuildVersionString()), - combiner_(GRPC_COMBINER_REF(combiner, "xds_client")), + work_serializer_(std::move(work_serializer)), interested_parties_(interested_parties), bootstrap_(XdsBootstrap::ReadFromFile(error)), server_name_(server_name), diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index 3abed152e74..3f458893bc8 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -32,7 +32,7 @@ #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/string_view.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" namespace grpc_core { @@ -73,7 +73,7 @@ class XdsClient : public InternallyRefCounted { // If *error is not GRPC_ERROR_NONE after construction, then there was // an error initializing the client. - XdsClient(RefCountedPtr logical_thread, + XdsClient(std::shared_ptr work_serializer, grpc_pollset_set* interested_parties, StringView server_name, std::unique_ptr watcher, const grpc_channel_args& channel_args, grpc_error** error); @@ -202,12 +202,12 @@ class XdsClient : public InternallyRefCounted { static int ChannelArgCmp(void* p, void* q); static const grpc_arg_pointer_vtable kXdsClientVtable; - + const grpc_millis request_timeout_; grpc_core::UniquePtr build_version_; - RefCountedPtr logical_thread_; + std::shared_ptr work_serializer_; grpc_pollset_set* interested_parties_; std::unique_ptr bootstrap_; diff --git a/src/core/ext/filters/client_channel/xds/xds_client_stats.h b/src/core/ext/filters/client_channel/xds/xds_client_stats.h index a896c4e6877..0b399654134 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client_stats.h +++ b/src/core/ext/filters/client_channel/xds/xds_client_stats.h @@ -132,20 +132,20 @@ class XdsClientStats { // If the refcount is 0, there won't be new calls recorded to the // LocalityStats, so the LocalityStats can be safely deleted when all the // in-progress calls have finished. - // Only be called from the control plane logical_thread. + // Only be called from the control plane work_serializer. void RefByPicker() { picker_refcount_.FetchAdd(1, MemoryOrder::ACQ_REL); } - // Might be called from the control plane logical_thread or the data plane + // Might be called from the control plane work_serializer or the data plane // mutex. // TODO(juanlishen): Once https://github.com/grpc/grpc/pull/19390 is merged, // this method will also only be invoked in the control plane - // logical_thread. We may then be able to simplify the LocalityStats' + // work_serializer. We may then be able to simplify the LocalityStats' // lifetime by making it RefCounted<> and populating the protobuf in its // dtor. void UnrefByPicker() { picker_refcount_.FetchSub(1, MemoryOrder::ACQ_REL); } - // Only be called from the control plane logical_thread. + // Only be called from the control plane work_serializer. // The only place where the picker_refcount_ can be increased is // RefByPicker(), which also can only be called from the control plane - // logical_thread. Also, if the picker_refcount_ is 0, + // work_serializer. Also, if the picker_refcount_ is 0, // total_requests_in_progress_ can't be increased from 0. So it's safe to // delete the LocalityStats right after this method returns true. bool IsSafeToDelete() { @@ -164,12 +164,12 @@ class XdsClientStats { Atomic total_issued_requests_{0}; // Protects load_metric_stats_. A mutex is necessary because the length of // load_metric_stats_ can be accessed by both the callback intercepting the - // call's recv_trailing_metadata (not from any logical_thread) and the load - // reporting thread (from the control plane logical_thread). + // call's recv_trailing_metadata (not from any work_serializer) and the load + // reporting thread (from the control plane work_serializer). Mutex load_metric_stats_mu_; LoadMetricMap load_metric_stats_; - // Can be accessed from either the control plane logical_thread or the data - // plane logical_thread. + // Can be accessed from either the control plane work_serializer or the data + // plane work_serializer. Atomic picker_refcount_{0}; }; @@ -215,7 +215,7 @@ class XdsClientStats { // Protects dropped_requests_. A mutex is necessary because the length of // dropped_requests_ can be accessed by both the picker (from data plane // mutex) and the load reporting thread (from the control plane - // logical_thread). + // work_serializer). Mutex dropped_requests_mu_; DroppedRequestsMap dropped_requests_; // The timestamp of last reporting. For the LB-policy-wide first report, the diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 6234f65c685..a90b694ee0b 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -251,11 +251,6 @@ class Closure { #endif GRPC_ERROR_UNREF(error); } - - static std::function ToFunction(grpc_closure* closure, - grpc_error* error) { - return [closure, error] { Run(DEBUG_LOCATION, closure, error); }; - } }; } // namespace grpc_core diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index b1507391a06..2d34f52e635 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -59,11 +59,12 @@ class AsyncConnectivityStateWatcherInterface::Notifier { public: Notifier(RefCountedPtr watcher, grpc_connectivity_state state, - const RefCountedPtr& logical_thread) + const std::shared_ptr& work_serializer) : watcher_(std::move(watcher)), state_(state) { - if (logical_thread != nullptr) { - logical_thread->Run([this]() { SendNotification(this, GRPC_ERROR_NONE); }, - DEBUG_LOCATION); + if (work_serializer != nullptr) { + work_serializer->Run( + [this]() { SendNotification(this, GRPC_ERROR_NONE); }, + DEBUG_LOCATION); } else { GRPC_CLOSURE_INIT(&closure_, SendNotification, this, grpc_schedule_on_exec_ctx); @@ -89,7 +90,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier { void AsyncConnectivityStateWatcherInterface::Notify( grpc_connectivity_state state) { - new Notifier(Ref(), state, logical_thread_); // Deletes itself when done. + new Notifier(Ref(), state, work_serializer_); // Deletes itself when done. } // diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 4eb053eb27e..5ab62bed40c 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -29,7 +29,7 @@ #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" namespace grpc_core { @@ -72,14 +72,14 @@ class AsyncConnectivityStateWatcherInterface // If \a combiner is nullptr, then the notification will be scheduled on the // ExecCtx. explicit AsyncConnectivityStateWatcherInterface( - RefCountedPtr logical_thread = nullptr) - : logical_thread_(std::move(logical_thread)) {} + std::shared_ptr work_serializer = nullptr) + : work_serializer_(std::move(work_serializer)) {} // Invoked asynchronously when Notify() is called. virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0; private: - RefCountedPtr logical_thread_; + std::shared_ptr work_serializer_; }; // Tracks connectivity state. Maintains a list of watchers that are diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc index 1064653c1d0..0e9a110a9f1 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc @@ -26,14 +26,14 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "test/core/util/test_config.h" static gpr_mu g_mu; static bool g_fail_resolution = true; -static grpc_core::RefCountedPtr* g_logical_thread; +static std::shared_ptr* g_work_serializer; static void my_resolve_address(const char* addr, const char* /*default_port*/, grpc_pollset_set* /*interested_parties*/, @@ -66,7 +66,7 @@ static grpc_ares_request* my_dns_lookup_ares_locked( std::unique_ptr* addresses, bool /*check_grpclb*/, char** /*service_config_json*/, int /*query_timeout_ms*/, - grpc_core::RefCountedPtr /*logical_thread*/) { + std::shared_ptr /*work_serializer*/) { gpr_mu_lock(&g_mu); GPR_ASSERT(0 == strcmp("test", addr)); grpc_error* error = GRPC_ERROR_NONE; @@ -99,7 +99,7 @@ static grpc_core::OrphanablePtr create_resolver( GPR_ASSERT(uri); grpc_core::ResolverArgs args; args.uri = uri; - args.logical_thread = *g_logical_thread; + args.work_serializer = *g_work_serializer; args.result_handler = std::move(result_handler); grpc_core::OrphanablePtr resolver = factory->CreateResolver(std::move(args)); @@ -163,8 +163,8 @@ int main(int argc, char** argv) { gpr_mu_init(&g_mu); { grpc_core::ExecCtx exec_ctx; - auto logical_thread = grpc_core::MakeRefCounted(); - g_logical_thread = &logical_thread; + auto work_serializer = grpc_core::MakeRefCounted(); + g_work_serializer = &work_serializer; grpc_set_resolver_impl(&test_resolver); grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; grpc_cancel_ares_request_locked = my_cancel_ares_request_locked; diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index 9bf218c8ad8..1bc1c89c97f 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -26,7 +26,7 @@ #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "test/core/util/test_config.h" @@ -37,14 +37,14 @@ constexpr int kMinResolutionPeriodForCheckMs = 900; extern grpc_address_resolver_vtable* grpc_resolve_address_impl; static grpc_address_resolver_vtable* default_resolve_address; -static grpc_core::RefCountedPtr* g_logical_thread; +static std::shared_ptr* g_work_serializer; static grpc_ares_request* (*g_default_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_core::RefCountedPtr logical_thread); + std::shared_ptr work_serializer); // Counter incremented by test_resolve_address_impl indicating the number of // times a system-level resolution has happened. @@ -95,11 +95,11 @@ static grpc_ares_request* test_dns_lookup_ares_locked( grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done, std::unique_ptr* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_core::RefCountedPtr logical_thread) { + std::shared_ptr work_serializer) { grpc_ares_request* result = g_default_dns_lookup_ares_locked( dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, addresses, check_grpclb, service_config_json, query_timeout_ms, - std::move(logical_thread)); + std::move(work_serializer)); ++g_resolution_count; static grpc_millis last_resolution_time = 0; grpc_millis now = @@ -272,7 +272,7 @@ static void on_first_resolution(OnResolutionCallbackArg* cb_arg) { gpr_mu_unlock(g_iomgr_args.mu); } -static void start_test_under_logical_thread(void* arg) { +static void start_test_under_work_serializer(void* arg) { OnResolutionCallbackArg* res_cb_arg = static_cast(arg); res_cb_arg->result_handler = new ResultHandler(); @@ -284,7 +284,7 @@ static void start_test_under_logical_thread(void* arg) { GPR_ASSERT(uri != nullptr); grpc_core::ResolverArgs args; args.uri = uri; - args.logical_thread = *g_logical_thread; + args.work_serializer = *g_work_serializer; args.result_handler = std::unique_ptr( res_cb_arg->result_handler); g_resolution_count = 0; @@ -308,8 +308,8 @@ static void test_cooldown() { OnResolutionCallbackArg* res_cb_arg = new OnResolutionCallbackArg(); res_cb_arg->uri_str = "dns:127.0.0.1"; - (*g_logical_thread) - ->Run([res_cb_arg]() { start_test_under_logical_thread(res_cb_arg); }, + (*g_work_serializer) + ->Run([res_cb_arg]() { start_test_under_work_serializer(res_cb_arg); }, DEBUG_LOCATION); grpc_core::ExecCtx::Get()->Flush(); poll_pollset_until_request_done(&g_iomgr_args); @@ -320,8 +320,8 @@ int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init(); - auto logical_thread = grpc_core::MakeRefCounted(); - g_logical_thread = &logical_thread; + auto work_serializer = grpc_core::MakeRefCounted(); + g_work_serializer = &work_serializer; g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked; diff --git a/test/core/client_channel/resolvers/dns_resolver_test.cc b/test/core/client_channel/resolvers/dns_resolver_test.cc index d2b01bf435e..cb701ca0b52 100644 --- a/test/core/client_channel/resolvers/dns_resolver_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_test.cc @@ -25,10 +25,10 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "test/core/util/test_config.h" -static grpc_core::RefCountedPtr* g_logical_thread; +static std::shared_ptr* g_work_serializer; class TestResultHandler : public grpc_core::Resolver::ResultHandler { void ReturnResult(grpc_core::Resolver::Result /*result*/) override {} @@ -44,7 +44,7 @@ static void test_succeeds(grpc_core::ResolverFactory* factory, GPR_ASSERT(uri); grpc_core::ResolverArgs args; args.uri = uri; - args.logical_thread = *g_logical_thread; + args.work_serializer = *g_work_serializer; args.result_handler = grpc_core::MakeUnique(); grpc_core::OrphanablePtr resolver = factory->CreateResolver(std::move(args)); @@ -61,7 +61,7 @@ static void test_fails(grpc_core::ResolverFactory* factory, GPR_ASSERT(uri); grpc_core::ResolverArgs args; args.uri = uri; - args.logical_thread = *g_logical_thread; + args.work_serializer = *g_work_serializer; args.result_handler = grpc_core::MakeUnique(); grpc_core::OrphanablePtr resolver = factory->CreateResolver(std::move(args)); @@ -73,8 +73,8 @@ int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init(); { - auto logical_thread = grpc_core::MakeRefCounted(); - g_logical_thread = &logical_thread; + auto work_serializer = grpc_core::MakeRefCounted(); + g_work_serializer = &work_serializer; grpc_core::ResolverFactory* dns = grpc_core::ResolverRegistry::LookupResolverFactory("dns"); diff --git a/test/core/client_channel/resolvers/fake_resolver_test.cc b/test/core/client_channel/resolvers/fake_resolver_test.cc index cbd746c8f6b..9eb4364cc89 100644 --- a/test/core/client_channel/resolvers/fake_resolver_test.cc +++ b/test/core/client_channel/resolvers/fake_resolver_test.cc @@ -28,7 +28,7 @@ #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/security/credentials/fake/fake_credentials.h" #include "test/core/util/test_config.h" @@ -63,7 +63,7 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { }; static grpc_core::OrphanablePtr build_fake_resolver( - grpc_core::RefCountedPtr logical_thread, + std::shared_ptr work_serializer, grpc_core::FakeResolverResponseGenerator* response_generator, std::unique_ptr result_handler) { grpc_core::ResolverFactory* factory = @@ -74,7 +74,7 @@ static grpc_core::OrphanablePtr build_fake_resolver( grpc_channel_args channel_args = {1, &generator_arg}; grpc_core::ResolverArgs args; args.args = &channel_args; - args.logical_thread = std::move(logical_thread); + args.work_serializer = std::move(work_serializer); args.result_handler = std::move(result_handler); grpc_core::OrphanablePtr resolver = factory->CreateResolver(std::move(args)); @@ -118,7 +118,7 @@ static grpc_core::Resolver::Result create_new_resolver_result() { static void test_fake_resolver() { grpc_core::ExecCtx exec_ctx; - grpc_core::RefCountedPtr logical_thread = + std::shared_ptr work_serializer = grpc_core::MakeRefCounted(); // Create resolver. ResultHandler* result_handler = new ResultHandler(); @@ -126,7 +126,7 @@ static void test_fake_resolver() { response_generator = grpc_core::MakeRefCounted(); grpc_core::OrphanablePtr resolver = build_fake_resolver( - logical_thread, response_generator.get(), + work_serializer, response_generator.get(), std::unique_ptr(result_handler)); GPR_ASSERT(resolver.get() != nullptr); resolver->StartLocked(); diff --git a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc index 7ce2ad9c247..1249b2ffeb7 100644 --- a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc +++ b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc @@ -24,11 +24,11 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "test/core/util/test_config.h" -static grpc_core::RefCountedPtr* g_logical_thread; +static std::shared_ptr* g_work_serializer; class ResultHandler : public grpc_core::Resolver::ResultHandler { public: @@ -46,7 +46,7 @@ static void test_succeeds(grpc_core::ResolverFactory* factory, GPR_ASSERT(uri); grpc_core::ResolverArgs args; args.uri = uri; - args.logical_thread = *g_logical_thread; + args.work_serializer = *g_work_serializer; args.result_handler = grpc_core::MakeUnique(); grpc_core::OrphanablePtr resolver = factory->CreateResolver(std::move(args)); @@ -67,7 +67,7 @@ static void test_fails(grpc_core::ResolverFactory* factory, GPR_ASSERT(uri); grpc_core::ResolverArgs args; args.uri = uri; - args.logical_thread = *g_logical_thread; + args.work_serializer = *g_work_serializer; args.result_handler = grpc_core::MakeUnique(); grpc_core::OrphanablePtr resolver = factory->CreateResolver(std::move(args)); @@ -79,8 +79,8 @@ int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init(); - auto logical_thread = grpc_core::MakeRefCounted(); - g_logical_thread = &logical_thread; + auto work_serializer = grpc_core::MakeRefCounted(); + g_work_serializer = &work_serializer; grpc_core::ResolverFactory* ipv4 = grpc_core::ResolverRegistry::LookupResolverFactory("ipv4"); diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index e29401dbf5a..c565acc72b3 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -380,7 +380,7 @@ grpc_ares_request* my_dns_lookup_ares_locked( std::unique_ptr* addresses, bool /*check_grpclb*/, char** /*service_config_json*/, int /*query_timeout*/, - grpc_core::RefCountedPtr /*combiner*/) { + std::shared_ptr /*combiner*/) { addr_req* r = static_cast(gpr_malloc(sizeof(*r))); r->addr = gpr_strdup(addr); r->on_done = on_done; diff --git a/test/core/end2end/goaway_server_test.cc b/test/core/end2end/goaway_server_test.cc index 3ebbbbdbbfd..61165f01c79 100644 --- a/test/core/end2end/goaway_server_test.cc +++ b/test/core/end2end/goaway_server_test.cc @@ -49,7 +49,7 @@ static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)( grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_core::RefCountedPtr combiner); + std::shared_ptr combiner); static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request); @@ -106,7 +106,7 @@ static grpc_ares_request* my_dns_lookup_ares_locked( grpc_pollset_set* interested_parties, grpc_closure* on_done, std::unique_ptr* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_core::RefCountedPtr combiner) { + std::shared_ptr combiner) { if (0 != strcmp(addr, "test")) { return iomgr_dns_lookup_ares_locked(dns_server, addr, default_port, interested_parties, on_done, addresses, diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index ea1a6c2830c..1624ec45402 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -55,7 +55,7 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy { const std::string& delegate_policy_name, intptr_t initial_refcount = 1) : LoadBalancingPolicy(std::move(args), initial_refcount) { Args delegate_args; - delegate_args.logical_thread = logical_thread(); + delegate_args.work_serializer = work_serializer(); delegate_args.channel_control_helper = std::move(delegating_helper); delegate_args.args = args.args; delegate_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index 726f7eff765..7cb568e591e 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -36,7 +36,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/thd.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" #include "test/core/end2end/cq_verifier.h" @@ -81,7 +81,7 @@ struct ArgsStruct { gpr_mu* mu; grpc_pollset* pollset; grpc_pollset_set* pollset_set; - grpc_core::RefCountedPtr lock; + std::shared_ptr lock; grpc_channel_args* channel_args; }; diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index 22389f15b1a..d3d30daa07f 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -51,7 +51,7 @@ #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/iomgr/logical_thread.h" +#include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils.h" @@ -192,7 +192,7 @@ struct ArgsStruct { gpr_mu* mu; grpc_pollset* pollset; grpc_pollset_set* pollset_set; - grpc_core::RefCountedPtr lock; + std::shared_ptr lock; grpc_channel_args* channel_args; vector expected_addrs; std::string expected_service_config_string;