Remove exec_ctx dependency from logical thread

reviewable/pr21361/r3
Yash Tibrewal 5 years ago
parent 2575e8a24e
commit e36ede8c67
  1. 184
      src/core/ext/filters/client_channel/client_channel.cc
  2. 24
      src/core/ext/filters/client_channel/lb_policy.cc
  3. 22
      src/core/ext/filters/client_channel/lb_policy.h
  4. 26
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 2
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  6. 27
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  7. 2
      src/core/ext/filters/client_channel/resolver.h
  8. 26
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  9. 97
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  10. 12
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  11. 2
      src/core/ext/filters/client_channel/resolver_factory.h
  12. 4
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  13. 20
      src/core/lib/iomgr/logical_thread.cc
  14. 1
      src/core/lib/iomgr/logical_thread.h
  15. 2
      test/core/util/test_lb_policies.cc

@ -209,8 +209,8 @@ class ChannelData {
void Cancel(); void Cancel();
private: private:
static void AddWatcherLocked(ExternalConnectivityWatcher* arg); void AddWatcherLocked();
static void RemoveWatcherLocked(ExternalConnectivityWatcher* arg); void RemoveWatcherLocked();
ChannelData* chand_; ChannelData* chand_;
grpc_polling_entity pollent_; grpc_polling_entity pollent_;
@ -243,9 +243,9 @@ class ChannelData {
grpc_error* DoPingLocked(grpc_transport_op* op); grpc_error* DoPingLocked(grpc_transport_op* op);
static void StartTransportOpLocked(void* arg, grpc_error* ignored); static void StartTransportOpLocked(grpc_transport_op* op);
static void TryToConnectLocked(ChannelData* arg); void TryToConnectLocked();
void ProcessLbPolicy( void ProcessLbPolicy(
const Resolver::Result& resolver_result, const Resolver::Result& resolver_result,
@ -279,9 +279,9 @@ class ChannelData {
RefCountedPtr<ServiceConfig> service_config_; RefCountedPtr<ServiceConfig> service_config_;
// //
// Fields used in the control plane. Guarded by combiner. // Fields used in the control plane. Guarded by logical_thread.
// //
RefCountedPtr<LogicalThread> combiner_; RefCountedPtr<LogicalThread> logical_thread_;
grpc_pollset_set* interested_parties_; grpc_pollset_set* interested_parties_;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_; RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_; OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
@ -293,17 +293,18 @@ class ChannelData {
std::map<Subchannel*, int> subchannel_refcount_map_; std::map<Subchannel*, int> subchannel_refcount_map_;
// The set of SubchannelWrappers that currently exist. // The set of SubchannelWrappers that currently exist.
// No need to hold a ref, since the map is updated in the control-plane // No need to hold a ref, since the map is updated in the control-plane
// combiner when the SubchannelWrappers are created and destroyed. // logical_thread when the SubchannelWrappers are created and destroyed.
std::set<SubchannelWrapper*> subchannel_wrappers_; std::set<SubchannelWrapper*> subchannel_wrappers_;
// Pending ConnectedSubchannel updates for each SubchannelWrapper. // Pending ConnectedSubchannel updates for each SubchannelWrapper.
// Updates are queued here in the control plane combiner and then applied // Updates are queued here in the control plane logical_thread and then
// in the data plane mutex when the picker is updated. // applied in the data plane mutex when the picker is updated.
std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>, std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>,
RefCountedPtrLess<SubchannelWrapper>> RefCountedPtrLess<SubchannelWrapper>>
pending_subchannel_updates_; pending_subchannel_updates_;
// //
// Fields accessed from both data plane mutex and control plane combiner. // Fields accessed from both data plane mutex and control plane
// logical_thread.
// //
Atomic<grpc_error*> disconnect_error_; Atomic<grpc_error*> disconnect_error_;
@ -837,7 +838,7 @@ class CallData {
// Note that no synchronization is needed here, because even if the // Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only // underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the // be used within one channel, so it will always be synchronized by the
// control plane combiner. // control plane logical_thread.
class ChannelData::SubchannelWrapper : public SubchannelInterface { class ChannelData::SubchannelWrapper : public SubchannelInterface {
public: public:
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
@ -963,7 +964,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
health_check_service_name_ = std::move(health_check_service_name); health_check_service_name_ = std::move(health_check_service_name);
} }
// Caller must be holding the control-plane combiner. // Caller must be holding the control-plane logical_thread.
ConnectedSubchannel* connected_subchannel() const { ConnectedSubchannel* connected_subchannel() const {
return connected_subchannel_.get(); return connected_subchannel_.get();
} }
@ -1014,7 +1015,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p: connectivity change for subchannel wrapper %p " "chand=%p: connectivity change for subchannel wrapper %p "
"subchannel %p (connected_subchannel=%p state=%s); " "subchannel %p (connected_subchannel=%p state=%s); "
"hopping into combiner", "hopping into logical_thread",
parent_->chand_, parent_.get(), parent_->subchannel_, parent_->chand_, parent_.get(), parent_->subchannel_,
connected_subchannel.get(), ConnectivityStateName(new_state)); connected_subchannel.get(), ConnectivityStateName(new_state));
} }
@ -1047,41 +1048,39 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
: parent_(std::move(parent)), : parent_(std::move(parent)),
state_(new_state), state_(new_state),
connected_subchannel_(std::move(connected_subchannel)) { connected_subchannel_(std::move(connected_subchannel)) {
ExecCtx::Run( ExecCtx::Run(DEBUG_LOCATION,
DEBUG_LOCATION, GRPC_CLOSURE_CREATE(
GRPC_CLOSURE_CREATE( [](void* arg, grpc_error* /*error*/) {
[](void* arg, grpc_error* /*error*/) { Updater* self = static_cast<Updater*>(arg);
Updater* self = static_cast<Updater*>(arg); self->parent_->parent_->chand_->logical_thread_->Run(
self->parent_->parent_->chand_->combiner_->Run( [self]() {
[self]() { ApplyUpdateInControlPlaneCombiner(self); }, self->ApplyUpdateInControlPlaneLogicalThread();
DEBUG_LOCATION); },
}, DEBUG_LOCATION);
this, nullptr), },
GRPC_ERROR_NONE); this, nullptr),
GRPC_ERROR_NONE);
} }
private: private:
static void ApplyUpdateInControlPlaneCombiner(void* arg) { void ApplyUpdateInControlPlaneLogicalThread() {
Updater* self = static_cast<Updater*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p: processing connectivity change in combiner " "chand=%p: processing connectivity change in logical thread "
"for subchannel wrapper %p subchannel %p " "for subchannel wrapper %p subchannel %p "
"(connected_subchannel=%p state=%s): watcher=%p", "(connected_subchannel=%p state=%s): watcher=%p",
self->parent_->parent_->chand_, self->parent_->parent_.get(), parent_->parent_->chand_, parent_->parent_.get(),
self->parent_->parent_->subchannel_, parent_->parent_->subchannel_, connected_subchannel_.get(),
self->connected_subchannel_.get(), ConnectivityStateName(state_), parent_->watcher_.get());
ConnectivityStateName(self->state_),
self->parent_->watcher_.get());
} }
// Ignore update if the parent WatcherWrapper has been replaced // Ignore update if the parent WatcherWrapper has been replaced
// since this callback was scheduled. // since this callback was scheduled.
if (self->parent_->watcher_ == nullptr) return; if (parent_->watcher_ == nullptr) return;
self->parent_->last_seen_state_ = self->state_; parent_->last_seen_state_ = state_;
self->parent_->parent_->MaybeUpdateConnectedSubchannel( parent_->parent_->MaybeUpdateConnectedSubchannel(
std::move(self->connected_subchannel_)); std::move(connected_subchannel_));
self->parent_->watcher_->OnConnectivityStateChange(self->state_); parent_->watcher_->OnConnectivityStateChange(state_);
delete self; delete this;
} }
RefCountedPtr<WatcherWrapper> parent_; RefCountedPtr<WatcherWrapper> parent_;
@ -1126,7 +1125,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
// CancelConnectivityStateWatch() with its watcher, we know the // CancelConnectivityStateWatch() with its watcher, we know the
// corresponding WrapperWatcher to cancel on the underlying subchannel. // corresponding WrapperWatcher to cancel on the underlying subchannel.
std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_; std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
// To be accessed only in the control plane combiner. // To be accessed only in the control plane logical_thread.
RefCountedPtr<ConnectedSubchannel> connected_subchannel_; RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
// To be accessed only in the data plane mutex. // To be accessed only in the data plane mutex.
RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_; RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
@ -1149,16 +1148,16 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
grpc_polling_entity_add_to_pollset_set(&pollent_, grpc_polling_entity_add_to_pollset_set(&pollent_,
chand_->interested_parties_); chand_->interested_parties_);
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher"); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
ExecCtx::Run(DEBUG_LOCATION, ExecCtx::Run(
GRPC_CLOSURE_CREATE( DEBUG_LOCATION,
[](void* arg, grpc_error* /*error*/) { GRPC_CLOSURE_CREATE(
auto* self = [](void* arg, grpc_error* /*error*/) {
static_cast<ExternalConnectivityWatcher*>(arg); auto* self = static_cast<ExternalConnectivityWatcher*>(arg);
self->chand_->combiner_->Run( self->chand_->logical_thread_->Run(
[self]() { AddWatcherLocked(self); }, DEBUG_LOCATION); [self]() { self->AddWatcherLocked(); }, DEBUG_LOCATION);
}, },
this, nullptr), this, nullptr),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
@ -1180,12 +1179,12 @@ void ChannelData::ExternalConnectivityWatcher::Notify(
// Report new state to the user. // Report new state to the user.
*state_ = state; *state_ = state;
ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE); ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE);
// Hop back into the combiner to clean up. // Hop back into the logical_thread to clean up.
// Not needed in state SHUTDOWN, because the tracker will // Not needed in state SHUTDOWN, because the tracker will
// automatically remove all watchers in that case. // automatically remove all watchers in that case.
if (state != GRPC_CHANNEL_SHUTDOWN) { if (state != GRPC_CHANNEL_SHUTDOWN) {
chand_->combiner_->Run([this]() { RemoveWatcherLocked(this); }, chand_->logical_thread_->Run([this]() { RemoveWatcherLocked(); },
DEBUG_LOCATION); DEBUG_LOCATION);
} }
} }
@ -1196,23 +1195,20 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() {
return; // Already done. return; // Already done.
} }
ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED); ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
// Hop back into the combiner to clean up. // Hop back into the logical_thread to clean up.
chand_->combiner_->Run([this]() { RemoveWatcherLocked(this); }, chand_->logical_thread_->Run([this]() { RemoveWatcherLocked(); },
DEBUG_LOCATION); DEBUG_LOCATION);
} }
void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked( void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() {
ExternalConnectivityWatcher* self) { Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE);
Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, GRPC_ERROR_NONE);
// Add new watcher. // Add new watcher.
self->chand_->state_tracker_.AddWatcher( chand_->state_tracker_.AddWatcher(
self->initial_state_, initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
OrphanablePtr<ConnectivityStateWatcherInterface>(self));
} }
void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked( void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked() {
ExternalConnectivityWatcher* self) { chand_->state_tracker_.RemoveWatcher(this);
self->chand_->state_tracker_.RemoveWatcher(self);
} }
// //
@ -1228,17 +1224,15 @@ class ChannelData::ConnectivityWatcherAdder {
initial_state_(initial_state), initial_state_(initial_state),
watcher_(std::move(watcher)) { watcher_(std::move(watcher)) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder"); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
chand_->combiner_->Run([this]() { AddWatcherLocked(this); }, chand_->logical_thread_->Run([this]() { AddWatcherLocked(); },
DEBUG_LOCATION); DEBUG_LOCATION);
} }
private: private:
static void AddWatcherLocked(ConnectivityWatcherAdder* self) { void AddWatcherLocked() {
self->chand_->state_tracker_.AddWatcher(self->initial_state_, chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
std::move(self->watcher_)); GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, delete this;
"ConnectivityWatcherAdder");
delete self;
} }
ChannelData* chand_; ChannelData* chand_;
@ -1256,16 +1250,16 @@ class ChannelData::ConnectivityWatcherRemover {
AsyncConnectivityStateWatcherInterface* watcher) AsyncConnectivityStateWatcherInterface* watcher)
: chand_(chand), watcher_(watcher) { : chand_(chand), watcher_(watcher) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover"); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
chand_->combiner_->Run([this]() { RemoveWatcherLocked(this); }, chand_->logical_thread_->Run([this]() { RemoveWatcherLocked(); },
DEBUG_LOCATION); DEBUG_LOCATION);
} }
private: private:
static void RemoveWatcherLocked(ConnectivityWatcherRemover* self) { void RemoveWatcherLocked() {
self->chand_->state_tracker_.RemoveWatcher(self->watcher_); chand_->state_tracker_.RemoveWatcher(watcher_);
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
"ConnectivityWatcherRemover"); "ConnectivityWatcherRemover");
delete self; delete this;
} }
ChannelData* chand_; ChannelData* chand_;
@ -1410,7 +1404,7 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
client_channel_factory_( client_channel_factory_(
ClientChannelFactory::GetFromChannelArgs(args->channel_args)), ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
channelz_node_(GetChannelzNode(args->channel_args)), channelz_node_(GetChannelzNode(args->channel_args)),
combiner_(MakeRefCounted<LogicalThread>()), logical_thread_(MakeRefCounted<LogicalThread>()),
interested_parties_(grpc_pollset_set_create()), interested_parties_(grpc_pollset_set_create()),
subchannel_pool_(GetSubchannelPool(args->channel_args)), subchannel_pool_(GetSubchannelPool(args->channel_args)),
state_tracker_("client_channel", GRPC_CHANNEL_IDLE), state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
@ -1584,7 +1578,7 @@ void ChannelData::UpdateServiceConfigLocked(
void ChannelData::CreateResolvingLoadBalancingPolicyLocked() { void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
// Instantiate resolving LB policy. // Instantiate resolving LB policy.
LoadBalancingPolicy::Args lb_args; LoadBalancingPolicy::Args lb_args;
lb_args.combiner = combiner_; lb_args.logical_thread = logical_thread_;
lb_args.channel_control_helper = MakeUnique<ClientChannelControlHelper>(this); lb_args.channel_control_helper = MakeUnique<ClientChannelControlHelper>(this);
lb_args.args = channel_args_; lb_args.args = channel_args_;
grpc_core::UniquePtr<char> target_uri(gpr_strdup(target_uri_.get())); grpc_core::UniquePtr<char> target_uri(gpr_strdup(target_uri_.get()));
@ -1805,8 +1799,7 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
return result.error; return result.error;
} }
void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) { void ChannelData::StartTransportOpLocked(grpc_transport_op* op) {
grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
grpc_channel_element* elem = grpc_channel_element* elem =
static_cast<grpc_channel_element*>(op->handler_private.extra_arg); static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
@ -1877,15 +1870,11 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem,
if (op->bind_pollset != nullptr) { if (op->bind_pollset != nullptr) {
grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset); grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
} }
// Pop into control plane combiner for remaining ops. // Pop into control plane logical_thread for remaining ops.
op->handler_private.extra_arg = elem; op->handler_private.extra_arg = elem;
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op"); GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
chand->combiner_->Run( chand->logical_thread_->Run(
Closure::ToFunction( [op]() { ChannelData::StartTransportOpLocked(op); }, DEBUG_LOCATION);
GRPC_CLOSURE_INIT(&op->handler_private.closure,
ChannelData::StartTransportOpLocked, op, nullptr),
GRPC_ERROR_NONE),
DEBUG_LOCATION);
} }
void ChannelData::GetChannelInfo(grpc_channel_element* elem, void ChannelData::GetChannelInfo(grpc_channel_element* elem,
@ -1936,14 +1925,13 @@ ChannelData::GetConnectedSubchannelInDataPlane(
return connected_subchannel->Ref(); return connected_subchannel->Ref();
} }
void ChannelData::TryToConnectLocked(ChannelData* arg) { void ChannelData::TryToConnectLocked() {
auto* chand = static_cast<ChannelData*>(arg); if (resolving_lb_policy_ != nullptr) {
if (chand->resolving_lb_policy_ != nullptr) { resolving_lb_policy_->ExitIdleLocked();
chand->resolving_lb_policy_->ExitIdleLocked();
} else { } else {
chand->CreateResolvingLoadBalancingPolicyLocked(); CreateResolvingLoadBalancingPolicyLocked();
} }
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect"); GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
} }
grpc_connectivity_state ChannelData::CheckConnectivityState( grpc_connectivity_state ChannelData::CheckConnectivityState(
@ -1955,8 +1943,8 @@ grpc_connectivity_state ChannelData::CheckConnectivityState(
GRPC_CLOSURE_CREATE( GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error* /*error*/) { [](void* arg, grpc_error* /*error*/) {
auto* chand = static_cast<ChannelData*>(arg); auto* chand = static_cast<ChannelData*>(arg);
chand->combiner_->Run( chand->logical_thread_->Run(
[chand]() { TryToConnectLocked(chand); }, [chand]() { chand->TryToConnectLocked(); },
DEBUG_LOCATION); DEBUG_LOCATION);
}, },
this, nullptr), this, nullptr),
@ -3872,7 +3860,7 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
// The picker being null means that the channel is currently in IDLE state. // The picker being null means that the channel is currently in IDLE state.
// The incoming call will make the channel exit IDLE. // The incoming call will make the channel exit IDLE.
if (chand->picker() == nullptr) { if (chand->picker() == nullptr) {
// Bounce into the control plane combiner to exit IDLE. // Bounce into the control plane logical thread to exit IDLE.
chand->CheckConnectivityState(/*try_to_connect=*/true); chand->CheckConnectivityState(/*try_to_connect=*/true);
// Queue the pick, so that it will be attempted once the channel // Queue the pick, so that it will be attempted once the channel
// becomes connected. // becomes connected.

@ -33,7 +33,7 @@ DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(false, "lb_policy_refcount");
LoadBalancingPolicy::LoadBalancingPolicy(Args args, intptr_t initial_refcount) LoadBalancingPolicy::LoadBalancingPolicy(Args args, intptr_t initial_refcount)
: InternallyRefCounted(&grpc_trace_lb_policy_refcount, initial_refcount), : InternallyRefCounted(&grpc_trace_lb_policy_refcount, initial_refcount),
combiner_(std::move(args.combiner)), logical_thread_(std::move(args.logical_thread)),
interested_parties_(grpc_pollset_set_create()), interested_parties_(grpc_pollset_set_create()),
channel_control_helper_(std::move(args.channel_control_helper)) {} channel_control_helper_(std::move(args.channel_control_helper)) {}
@ -98,16 +98,17 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
// the time this function returns, the pick will already have // the time this function returns, the pick will already have
// been processed, and we'll be trying to re-process the same // been processed, and we'll be trying to re-process the same
// pick again, leading to a crash. // pick again, leading to a crash.
// 2. We are currently running in the data plane combiner, but we // 2. We are currently running in the data plane logical_thread, but we
// need to bounce into the control plane combiner to call // need to bounce into the control plane logical_thread to call
// ExitIdleLocked(). // ExitIdleLocked().
if (!exit_idle_called_) { if (!exit_idle_called_) {
exit_idle_called_ = true; exit_idle_called_ = true;
parent_->Ref().release(); // ref held by closure. auto* parent = parent_->Ref().release(); // ref held by lambda.
parent_->combiner()->Run( parent_->logical_thread()->Run(
Closure::ToFunction( [parent]() {
GRPC_CLOSURE_CREATE(&CallExitIdle, parent_.get(), nullptr), parent->ExitIdleLocked();
GRPC_ERROR_NONE), parent->Unref();
},
DEBUG_LOCATION); DEBUG_LOCATION);
} }
PickResult result; PickResult result;
@ -115,13 +116,6 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
return result; return result;
} }
void LoadBalancingPolicy::QueuePicker::CallExitIdle(void* arg,
grpc_error* /*error*/) {
LoadBalancingPolicy* parent = static_cast<LoadBalancingPolicy*>(arg);
parent->ExitIdleLocked();
parent->Unref();
}
// //
// LoadBalancingPolicy::TransientFailurePicker // LoadBalancingPolicy::TransientFailurePicker
// //

@ -72,7 +72,7 @@ extern DebugOnlyTraceFlag grpc_trace_lb_policy_refcount;
/// LoadBalacingPolicy API. /// LoadBalacingPolicy API.
/// ///
/// Note: All methods with a "Locked" suffix must be called from the /// Note: All methods with a "Locked" suffix must be called from the
/// combiner passed to the constructor. /// logical_thread passed to the constructor.
/// ///
/// Any I/O done by the LB policy should be done under the pollset_set /// Any I/O done by the LB policy should be done under the pollset_set
/// returned by \a interested_parties(). /// returned by \a interested_parties().
@ -242,7 +242,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// live in the LB policy object itself. /// live in the LB policy object itself.
/// ///
/// Currently, pickers are always accessed from within the /// Currently, pickers are always accessed from within the
/// client_channel data plane combiner, so they do not have to be /// client_channel data plane logical_thread, so they do not have to be
/// thread-safe. /// thread-safe.
class SubchannelPicker { class SubchannelPicker {
public: public:
@ -309,9 +309,9 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Args used to instantiate an LB policy. /// Args used to instantiate an LB policy.
struct Args { struct Args {
/// The combiner under which all LB policy calls will be run. /// The logical_thread under which all LB policy calls will be run.
/// Policy does NOT take ownership of the reference to the combiner. /// Policy does NOT take ownership of the reference to the logical_thread.
RefCountedPtr<LogicalThread> combiner; RefCountedPtr<LogicalThread> logical_thread;
/// Channel control helper. /// Channel control helper.
/// Note: LB policies MUST NOT call any method on the helper from /// Note: LB policies MUST NOT call any method on the helper from
/// their constructor. /// their constructor.
@ -349,7 +349,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
grpc_pollset_set* interested_parties() const { return interested_parties_; } grpc_pollset_set* interested_parties() const { return interested_parties_; }
// Note: This must be invoked while holding the combiner. // Note: This must be invoked while holding the logical_thread.
void Orphan() override; void Orphan() override;
// A picker that returns PICK_QUEUE for all picks. // A picker that returns PICK_QUEUE for all picks.
@ -365,8 +365,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
PickResult Pick(PickArgs args) override; PickResult Pick(PickArgs args) override;
private: private:
static void CallExitIdle(void* arg, grpc_error* error);
RefCountedPtr<LoadBalancingPolicy> parent_; RefCountedPtr<LoadBalancingPolicy> parent_;
bool exit_idle_called_ = false; bool exit_idle_called_ = false;
}; };
@ -384,7 +382,9 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
}; };
protected: protected:
RefCountedPtr<LogicalThread> combiner() const { return combiner_; } RefCountedPtr<LogicalThread> logical_thread() const {
return logical_thread_;
}
// Note: LB policies MUST NOT call any method on the helper from their // Note: LB policies MUST NOT call any method on the helper from their
// constructor. // constructor.
@ -396,8 +396,8 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
virtual void ShutdownLocked() = 0; virtual void ShutdownLocked() = 0;
private: private:
/// Combiner under which LB policy actions take place. /// Logical Thread under which LB policy actions take place.
RefCountedPtr<LogicalThread> combiner_; RefCountedPtr<LogicalThread> logical_thread_;
/// Owned pointer to interested parties in load balancing decisions. /// Owned pointer to interested parties in load balancing decisions.
grpc_pollset_set* interested_parties_; grpc_pollset_set* interested_parties_;
/// Channel control helper. /// Channel control helper.

@ -251,16 +251,16 @@ class GrpcLb : public LoadBalancingPolicy {
// should not be dropped. // should not be dropped.
// //
// Note: This is called from the picker, so it will be invoked in // Note: This is called from the picker, so it will be invoked in
// the channel's data plane combiner, NOT the control plane // the channel's data plane logical_thread, NOT the control plane
// combiner. It should not be accessed by any other part of the LB // logical_thread. It should not be accessed by any other part of the LB
// policy. // policy.
const char* ShouldDrop(); const char* ShouldDrop();
private: private:
grpc_grpclb_serverlist* serverlist_; grpc_grpclb_serverlist* serverlist_;
// Guarded by the channel's data plane combiner, NOT the control // Guarded by the channel's data plane logical_thread, NOT the control
// plane combiner. It should not be accessed by anything but the // plane logical_thread. It should not be accessed by anything but the
// picker via the ShouldDrop() method. // picker via the ShouldDrop() method.
size_t drop_index_ = 0; size_t drop_index_ = 0;
}; };
@ -911,7 +911,7 @@ void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(void* arg, void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(void* arg,
grpc_error* error) { grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run( lb_calld->grpclb_policy()->logical_thread()->Run(
Closure::ToFunction( Closure::ToFunction(
GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_, GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_,
MaybeSendClientLoadReportLocked, lb_calld, nullptr), MaybeSendClientLoadReportLocked, lb_calld, nullptr),
@ -998,7 +998,7 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg, void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
grpc_error* error) { grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run( lb_calld->grpclb_policy()->logical_thread()->Run(
Closure::ToFunction( Closure::ToFunction(
GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_, GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_,
ClientLoadReportDoneLocked, lb_calld, nullptr), ClientLoadReportDoneLocked, lb_calld, nullptr),
@ -1022,7 +1022,7 @@ void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
void GrpcLb::BalancerCallState::OnInitialRequestSent(void* arg, void GrpcLb::BalancerCallState::OnInitialRequestSent(void* arg,
grpc_error* error) { grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run( lb_calld->grpclb_policy()->logical_thread()->Run(
Closure::ToFunction( Closure::ToFunction(
GRPC_CLOSURE_INIT(&lb_calld->lb_on_initial_request_sent_, GRPC_CLOSURE_INIT(&lb_calld->lb_on_initial_request_sent_,
OnInitialRequestSentLocked, lb_calld, nullptr), OnInitialRequestSentLocked, lb_calld, nullptr),
@ -1048,7 +1048,7 @@ void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(
void GrpcLb::BalancerCallState::OnBalancerMessageReceived(void* arg, void GrpcLb::BalancerCallState::OnBalancerMessageReceived(void* arg,
grpc_error* error) { grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run( lb_calld->grpclb_policy()->logical_thread()->Run(
Closure::ToFunction( Closure::ToFunction(
GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_message_received_, GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_message_received_,
OnBalancerMessageReceivedLocked, lb_calld, nullptr), OnBalancerMessageReceivedLocked, lb_calld, nullptr),
@ -1206,7 +1206,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
void GrpcLb::BalancerCallState::OnBalancerStatusReceived(void* arg, void GrpcLb::BalancerCallState::OnBalancerStatusReceived(void* arg,
grpc_error* error) { grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run( lb_calld->grpclb_policy()->logical_thread()->Run(
Closure::ToFunction( Closure::ToFunction(
GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_status_received_, GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_status_received_,
OnBalancerStatusReceivedLocked, lb_calld, nullptr), OnBalancerStatusReceivedLocked, lb_calld, nullptr),
@ -1548,7 +1548,7 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked(
void GrpcLb::OnBalancerChannelConnectivityChanged(void* arg, void GrpcLb::OnBalancerChannelConnectivityChanged(void* arg,
grpc_error* error) { grpc_error* error) {
GrpcLb* self = static_cast<GrpcLb*>(arg); GrpcLb* self = static_cast<GrpcLb*>(arg);
self->combiner()->Run( self->logical_thread()->Run(
Closure::ToFunction( Closure::ToFunction(
GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_, GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked, &GrpcLb::OnBalancerChannelConnectivityChangedLocked,
@ -1647,7 +1647,7 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) { void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->combiner()->Run( grpclb_policy->logical_thread()->Run(
Closure::ToFunction( Closure::ToFunction(
GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_call_retry_, GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_call_retry_,
&GrpcLb::OnBalancerCallRetryTimerLocked, &GrpcLb::OnBalancerCallRetryTimerLocked,
@ -1694,7 +1694,7 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) { void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->combiner()->Run( grpclb_policy->logical_thread()->Run(
Closure::ToFunction(GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_fallback_, Closure::ToFunction(GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_fallback_,
&GrpcLb::OnFallbackTimerLocked, &GrpcLb::OnFallbackTimerLocked,
grpclb_policy, nullptr), grpclb_policy, nullptr),
@ -1742,7 +1742,7 @@ OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) { const char* name, const grpc_channel_args* args) {
Helper* helper = new Helper(Ref()); Helper* helper = new Helper(Ref());
LoadBalancingPolicy::Args lb_policy_args; LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner(); lb_policy_args.logical_thread = logical_thread();
lb_policy_args.args = args; lb_policy_args.args = args;
lb_policy_args.channel_control_helper = lb_policy_args.channel_control_helper =
std::unique_ptr<ChannelControlHelper>(helper); std::unique_ptr<ChannelControlHelper>(helper);

@ -160,7 +160,7 @@ void CdsLb::ClusterWatcher::OnClusterChanged(CdsUpdate cluster_data) {
// Create child policy if not already present. // Create child policy if not already present.
if (parent_->child_policy_ == nullptr) { if (parent_->child_policy_ == nullptr) {
LoadBalancingPolicy::Args args; LoadBalancingPolicy::Args args;
args.combiner = parent_->combiner(); args.logical_thread = parent_->logical_thread();
args.args = parent_->args_; args.args = parent_->args_;
args.channel_control_helper = MakeUnique<Helper>(parent_->Ref()); args.channel_control_helper = MakeUnique<Helper>(parent_->Ref());
parent_->child_policy_ = parent_->child_policy_ =

@ -746,11 +746,11 @@ void XdsLb::ShutdownLocked() {
if (xds_client_from_channel_ != nullptr) { if (xds_client_from_channel_ != nullptr) {
xds_client()->CancelEndpointDataWatch(StringView(eds_service_name()), xds_client()->CancelEndpointDataWatch(StringView(eds_service_name()),
endpoint_watcher_); endpoint_watcher_);
} if (config_->lrs_load_reporting_server_name() != nullptr) {
if (config_->lrs_load_reporting_server_name() != nullptr) { xds_client()->RemoveClientStats(
xds_client()->RemoveClientStats( StringView(config_->lrs_load_reporting_server_name()),
StringView(config_->lrs_load_reporting_server_name()), StringView(eds_service_name()), &client_stats_);
StringView(eds_service_name()), &client_stats_); }
} }
xds_client_from_channel_.reset(); xds_client_from_channel_.reset();
xds_client_.reset(); xds_client_.reset();
@ -799,8 +799,9 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
if (xds_client_from_channel_ == nullptr) { if (xds_client_from_channel_ == nullptr) {
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>( xds_client_ = MakeOrphanable<XdsClient>(
combiner(), interested_parties(), StringView(eds_service_name()), logical_thread(), interested_parties(),
nullptr /* service config watcher */, *args_, &error); StringView(eds_service_name()), nullptr /* service config watcher */,
*args_, &error);
// TODO(roth): If we decide that we care about fallback mode, add // TODO(roth): If we decide that we care about fallback mode, add
// proper error handling here. // proper error handling here.
GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(error == GRPC_ERROR_NONE);
@ -870,7 +871,7 @@ void XdsLb::MaybeCancelFallbackAtStartupChecks() {
void XdsLb::OnFallbackTimer(void* arg, grpc_error* error) { void XdsLb::OnFallbackTimer(void* arg, grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
xdslb_policy->combiner()->Run( xdslb_policy->logical_thread()->Run(
Closure::ToFunction(GRPC_CLOSURE_INIT(&xdslb_policy->lb_on_fallback_, Closure::ToFunction(GRPC_CLOSURE_INIT(&xdslb_policy->lb_on_fallback_,
&XdsLb::OnFallbackTimerLocked, &XdsLb::OnFallbackTimerLocked,
xdslb_policy, nullptr), xdslb_policy, nullptr),
@ -1001,7 +1002,7 @@ OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateFallbackPolicyLocked(
FallbackHelper* helper = FallbackHelper* helper =
new FallbackHelper(Ref(DEBUG_LOCATION, "FallbackHelper")); new FallbackHelper(Ref(DEBUG_LOCATION, "FallbackHelper"));
LoadBalancingPolicy::Args lb_policy_args; LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner(); lb_policy_args.logical_thread = logical_thread();
lb_policy_args.args = args; lb_policy_args.args = args;
lb_policy_args.channel_control_helper = lb_policy_args.channel_control_helper =
std::unique_ptr<ChannelControlHelper>(helper); std::unique_ptr<ChannelControlHelper>(helper);
@ -1409,7 +1410,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() {
void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer( void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer(
void* arg, grpc_error* error) { void* arg, grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg); LocalityMap* self = static_cast<LocalityMap*>(arg);
self->xds_policy_->combiner()->Run( self->xds_policy_->logical_thread()->Run(
Closure::ToFunction( Closure::ToFunction(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
OnDelayedRemovalTimerLocked, self, nullptr), OnDelayedRemovalTimerLocked, self, nullptr),
@ -1450,7 +1451,7 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked(
void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg, void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg,
grpc_error* error) { grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg); LocalityMap* self = static_cast<LocalityMap*>(arg);
self->xds_policy_->combiner()->Run( self->xds_policy_->logical_thread()->Run(
Closure::ToFunction( Closure::ToFunction(
GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked, GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked,
self, nullptr), self, nullptr),
@ -1513,7 +1514,7 @@ XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) { const char* name, const grpc_channel_args* args) {
Helper* helper = new Helper(this->Ref(DEBUG_LOCATION, "Helper")); Helper* helper = new Helper(this->Ref(DEBUG_LOCATION, "Helper"));
LoadBalancingPolicy::Args lb_policy_args; LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = xds_policy()->combiner(); lb_policy_args.logical_thread = xds_policy()->logical_thread();
lb_policy_args.args = args; lb_policy_args.args = args;
lb_policy_args.channel_control_helper = lb_policy_args.channel_control_helper =
std::unique_ptr<ChannelControlHelper>(helper); std::unique_ptr<ChannelControlHelper>(helper);
@ -1708,7 +1709,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() {
void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer( void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer(
void* arg, grpc_error* error) { void* arg, grpc_error* error) {
Locality* self = static_cast<Locality*>(arg); Locality* self = static_cast<Locality*>(arg);
self->xds_policy()->combiner()->Run( self->xds_policy()->logical_thread()->Run(
Closure::ToFunction( Closure::ToFunction(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
OnDelayedRemovalTimerLocked, self, nullptr), OnDelayedRemovalTimerLocked, self, nullptr),

@ -87,7 +87,7 @@ class Resolver : public InternallyRefCounted<Resolver> {
// Not copyable nor movable. // Not copyable nor movable.
Resolver(const Resolver&) = delete; Resolver(const Resolver&) = delete;
Resolver& operator=(const Resolver&) = delete; Resolver& operator=(const Resolver&) = delete;
virtual ~Resolver() {} virtual ~Resolver() = default;
/// Starts resolving. /// Starts resolving.
virtual void StartLocked() = 0; virtual void StartLocked() = 0;

@ -104,10 +104,10 @@ class GrpcPolledFdWindows {
tcp_write_state_(WRITE_IDLE), tcp_write_state_(WRITE_IDLE),
gotten_into_driver_list_(false), gotten_into_driver_list_(false),
address_family_(address_family), address_family_(address_family),
socket_type_(socket_type) { socket_type_(socket_type),
combiner_(std::move(combiner)) {
gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as); gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as);
winsocket_ = grpc_winsocket_create(as, name_); winsocket_ = grpc_winsocket_create(as, name_);
combiner_ = std::move(combiner);
} }
~GrpcPolledFdWindows() { ~GrpcPolledFdWindows() {
@ -137,7 +137,7 @@ class GrpcPolledFdWindows {
GPR_ASSERT(!read_buf_has_data_); GPR_ASSERT(!read_buf_has_data_);
read_buf_ = GRPC_SLICE_MALLOC(4192); read_buf_ = GRPC_SLICE_MALLOC(4192);
if (connect_done_) { if (connect_done_) {
combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(this); }, combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(); },
DEBUG_LOCATION); DEBUG_LOCATION);
} else { } else {
GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false); GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false);
@ -145,12 +145,7 @@ class GrpcPolledFdWindows {
} }
} }
static void ContinueRegisterForOnReadableLocked( void ContinueRegisterForOnReadableLocked() {
GrpcPolledFdWindows* grpc_polled_fd) {
grpc_polled_fd->InnerContinueRegisterForOnReadableLocked(GRPC_ERROR_NONE);
}
void InnerContinueRegisterForOnReadableLocked(grpc_error* unused_error) {
GRPC_CARES_TRACE_LOG( GRPC_CARES_TRACE_LOG(
"fd:|%s| InnerContinueRegisterForOnReadableLocked " "fd:|%s| InnerContinueRegisterForOnReadableLocked "
"wsa_connect_error_:%d", "wsa_connect_error_:%d",
@ -202,7 +197,7 @@ class GrpcPolledFdWindows {
GPR_ASSERT(write_closure_ == nullptr); GPR_ASSERT(write_closure_ == nullptr);
write_closure_ = write_closure; write_closure_ = write_closure;
if (connect_done_) { if (connect_done_) {
combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(this); }, combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(); },
DEBUG_LOCATION); DEBUG_LOCATION);
} else { } else {
GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == false); GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == false);
@ -210,12 +205,7 @@ class GrpcPolledFdWindows {
} }
} }
static void ContinueRegisterForOnWriteableLocked( void ContinueRegisterForOnWriteableLocked() {
GrpcPolledFdWindows* grpc_polled_fd) {
grpc_polled_fd->InnerContinueRegisterForOnWriteableLocked(GRPC_ERROR_NONE);
}
void InnerContinueRegisterForOnWriteableLocked(grpc_error* unused_error) {
GRPC_CARES_TRACE_LOG( GRPC_CARES_TRACE_LOG(
"fd:|%s| InnerContinueRegisterForOnWriteableLocked " "fd:|%s| InnerContinueRegisterForOnWriteableLocked "
"wsa_connect_error_:%d", "wsa_connect_error_:%d",
@ -475,11 +465,11 @@ class GrpcPolledFdWindows {
wsa_connect_error_ = WSA_OPERATION_ABORTED; wsa_connect_error_ = WSA_OPERATION_ABORTED;
} }
if (pending_continue_register_for_on_readable_locked_) { if (pending_continue_register_for_on_readable_locked_) {
combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(this); }, combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(); },
DEBUG_LOCATION); DEBUG_LOCATION);
} }
if (pending_continue_register_for_on_writeable_locked_) { if (pending_continue_register_for_on_writeable_locked_) {
combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(this); }, combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(); },
DEBUG_LOCATION); DEBUG_LOCATION);
} }
} }

@ -172,22 +172,14 @@ FakeResolverResponseGenerator::FakeResolverResponseGenerator() {}
FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {} FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {}
struct SetResponseClosureArg { void FakeResolverResponseGenerator::SetResponseLocked(SetResponseArg* arg) {
RefCountedPtr<FakeResolver> resolver; auto& resolver = arg->resolver;
Resolver::Result result;
bool has_result = false;
bool immediate = true;
};
void FakeResolverResponseGenerator::SetResponseLocked(void* arg) {
SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
auto& resolver = closure_arg->resolver;
if (!resolver->shutdown_) { if (!resolver->shutdown_) {
resolver->next_result_ = std::move(closure_arg->result); resolver->next_result_ = std::move(arg->result);
resolver->has_next_result_ = true; resolver->has_next_result_ = true;
resolver->MaybeSendResultLocked(); resolver->MaybeSendResultLocked();
} }
delete closure_arg; delete arg;
} }
void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) { void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
@ -201,21 +193,21 @@ void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
} }
resolver = resolver_->Ref(); resolver = resolver_->Ref();
} }
SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); SetResponseArg* arg = new SetResponseArg();
closure_arg->resolver = std::move(resolver); arg->resolver = std::move(resolver);
closure_arg->result = std::move(result); arg->result = std::move(result);
closure_arg->resolver->combiner()->Run( arg->resolver->combiner()->Run([arg]() { SetResponseLocked(arg); },
[closure_arg]() { SetResponseLocked(closure_arg); }, DEBUG_LOCATION); DEBUG_LOCATION);
} }
void FakeResolverResponseGenerator::SetReresolutionResponseLocked(void* arg) { void FakeResolverResponseGenerator::SetReresolutionResponseLocked(
SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg); SetResponseArg* arg) {
auto& resolver = closure_arg->resolver; auto& resolver = arg->resolver;
if (!resolver->shutdown_) { if (!resolver->shutdown_) {
resolver->reresolution_result_ = std::move(closure_arg->result); resolver->reresolution_result_ = std::move(arg->result);
resolver->has_reresolution_result_ = closure_arg->has_result; resolver->has_reresolution_result_ = arg->has_result;
} }
delete closure_arg; delete arg;
} }
void FakeResolverResponseGenerator::SetReresolutionResponse( void FakeResolverResponseGenerator::SetReresolutionResponse(
@ -226,13 +218,12 @@ void FakeResolverResponseGenerator::SetReresolutionResponse(
GPR_ASSERT(resolver_ != nullptr); GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref(); resolver = resolver_->Ref();
} }
SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); SetResponseArg* arg = new SetResponseArg();
closure_arg->resolver = std::move(resolver); arg->resolver = std::move(resolver);
closure_arg->result = std::move(result); arg->result = std::move(result);
closure_arg->has_result = true; arg->has_result = true;
closure_arg->resolver->combiner()->Run( arg->resolver->combiner()->Run(
[closure_arg]() { SetReresolutionResponseLocked(closure_arg); }, [arg]() { SetReresolutionResponseLocked(arg); }, DEBUG_LOCATION);
DEBUG_LOCATION);
} }
void FakeResolverResponseGenerator::UnsetReresolutionResponse() { void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
@ -242,21 +233,19 @@ void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
GPR_ASSERT(resolver_ != nullptr); GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref(); resolver = resolver_->Ref();
} }
SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); SetResponseArg* arg = new SetResponseArg();
closure_arg->resolver = std::move(resolver); arg->resolver = std::move(resolver);
closure_arg->resolver->combiner()->Run( arg->resolver->combiner()->Run(
[closure_arg]() { SetReresolutionResponseLocked(closure_arg); }, [arg]() { SetReresolutionResponseLocked(arg); }, DEBUG_LOCATION);
DEBUG_LOCATION);
} }
void FakeResolverResponseGenerator::SetFailureLocked(void* arg) { void FakeResolverResponseGenerator::SetFailureLocked(SetResponseArg* arg) {
SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg); auto& resolver = arg->resolver;
auto& resolver = closure_arg->resolver;
if (!resolver->shutdown_) { if (!resolver->shutdown_) {
resolver->return_failure_ = true; resolver->return_failure_ = true;
if (closure_arg->immediate) resolver->MaybeSendResultLocked(); if (arg->immediate) resolver->MaybeSendResultLocked();
} }
delete closure_arg; delete arg;
} }
void FakeResolverResponseGenerator::SetFailure() { void FakeResolverResponseGenerator::SetFailure() {
@ -266,10 +255,10 @@ void FakeResolverResponseGenerator::SetFailure() {
GPR_ASSERT(resolver_ != nullptr); GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref(); resolver = resolver_->Ref();
} }
SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); SetResponseArg* arg = new SetResponseArg();
closure_arg->resolver = std::move(resolver); arg->resolver = std::move(resolver);
closure_arg->resolver->combiner()->Run( arg->resolver->combiner()->Run([arg]() { SetFailureLocked(arg); },
[closure_arg]() { SetFailureLocked(closure_arg); }, DEBUG_LOCATION); DEBUG_LOCATION);
} }
void FakeResolverResponseGenerator::SetFailureOnReresolution() { void FakeResolverResponseGenerator::SetFailureOnReresolution() {
@ -279,11 +268,11 @@ void FakeResolverResponseGenerator::SetFailureOnReresolution() {
GPR_ASSERT(resolver_ != nullptr); GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref(); resolver = resolver_->Ref();
} }
SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); SetResponseArg* arg = new SetResponseArg();
closure_arg->resolver = std::move(resolver); arg->resolver = std::move(resolver);
closure_arg->immediate = false; arg->immediate = false;
closure_arg->resolver->combiner()->Run( arg->resolver->combiner()->Run([arg]() { SetFailureLocked(arg); },
[closure_arg]() { SetFailureLocked(closure_arg); }, DEBUG_LOCATION); DEBUG_LOCATION);
} }
void FakeResolverResponseGenerator::SetFakeResolver( void FakeResolverResponseGenerator::SetFakeResolver(
@ -292,11 +281,11 @@ void FakeResolverResponseGenerator::SetFakeResolver(
resolver_ = std::move(resolver); resolver_ = std::move(resolver);
if (resolver_ == nullptr) return; if (resolver_ == nullptr) return;
if (has_result_) { if (has_result_) {
SetResponseClosureArg* closure_arg = new SetResponseClosureArg(); SetResponseArg* arg = new SetResponseArg();
closure_arg->resolver = resolver_->Ref(); arg->resolver = resolver_->Ref();
closure_arg->result = std::move(result_); arg->result = std::move(result_);
resolver_->combiner()->Run( resolver_->combiner()->Run([arg]() { SetResponseLocked(arg); },
[closure_arg]() { SetResponseLocked(closure_arg); }, DEBUG_LOCATION); DEBUG_LOCATION);
has_result_ = false; has_result_ = false;
} }
} }

@ -80,9 +80,15 @@ class FakeResolverResponseGenerator
// Set the corresponding FakeResolver to this generator. // Set the corresponding FakeResolver to this generator.
void SetFakeResolver(RefCountedPtr<FakeResolver> resolver); void SetFakeResolver(RefCountedPtr<FakeResolver> resolver);
static void SetResponseLocked(void* arg); struct SetResponseArg {
static void SetReresolutionResponseLocked(void* arg); RefCountedPtr<FakeResolver> resolver;
static void SetFailureLocked(void* arg); Resolver::Result result;
bool has_result = false;
bool immediate = true;
};
static void SetResponseLocked(SetResponseArg* arg);
static void SetReresolutionResponseLocked(SetResponseArg* arg);
static void SetFailureLocked(SetResponseArg* arg);
// Mutex protecting the members below. // Mutex protecting the members below.
Mutex mu_; Mutex mu_;

@ -39,7 +39,7 @@ struct ResolverArgs {
/// Used to drive I/O in the name resolution process. /// Used to drive I/O in the name resolution process.
grpc_pollset_set* pollset_set = nullptr; grpc_pollset_set* pollset_set = nullptr;
/// The combiner under which all resolver calls will be run. /// The combiner under which all resolver calls will be run.
RefCountedPtr<LogicalThread> combiner = nullptr; RefCountedPtr<LogicalThread> combiner;
/// The result handler to be used by the resolver. /// The result handler to be used by the resolver.
std::unique_ptr<Resolver::ResultHandler> result_handler; std::unique_ptr<Resolver::ResultHandler> result_handler;
}; };

@ -187,7 +187,7 @@ ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
process_resolver_result_user_data_(process_resolver_result_user_data) { process_resolver_result_user_data_(process_resolver_result_user_data) {
GPR_ASSERT(process_resolver_result != nullptr); GPR_ASSERT(process_resolver_result != nullptr);
resolver_ = ResolverRegistry::CreateResolver( resolver_ = ResolverRegistry::CreateResolver(
target_uri_.get(), args.args, interested_parties(), combiner(), target_uri_.get(), args.args, interested_parties(), logical_thread(),
MakeUnique<ResolverResultHandler>(Ref())); MakeUnique<ResolverResultHandler>(Ref()));
// Since the validity of args has been checked when create the channel, // Since the validity of args has been checked when create the channel,
// CreateResolver() must return a non-null result. // CreateResolver() must return a non-null result.
@ -373,7 +373,7 @@ ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
TraceStringVector* trace_strings) { TraceStringVector* trace_strings) {
ResolvingControlHelper* helper = new ResolvingControlHelper(Ref()); ResolvingControlHelper* helper = new ResolvingControlHelper(Ref());
LoadBalancingPolicy::Args lb_policy_args; LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner(); lb_policy_args.logical_thread = logical_thread();
lb_policy_args.channel_control_helper = lb_policy_args.channel_control_helper =
std::unique_ptr<ChannelControlHelper>(helper); std::unique_ptr<ChannelControlHelper>(helper);
lb_policy_args.args = &args; lb_policy_args.args = &args;

@ -49,6 +49,11 @@ void LogicalThreadImpl::Run(std::function<void()> callback,
callback(); callback();
// Loan this thread to the logical thread and drain the queue. // Loan this thread to the logical thread and drain the queue.
DrainQueue(); DrainQueue();
// It is possible that while draining the queue, one of the callbacks ended
// up orphaning the logical thread. In that case, delete the object.
if (orphaned_.Load(MemoryOrder::ACQUIRE)) {
delete this;
}
} else { } else {
CallbackWrapper* cb_wrapper = CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location); new CallbackWrapper(std::move(callback), location);
@ -62,15 +67,11 @@ void LogicalThreadImpl::Run(std::function<void()> callback,
} }
void LogicalThreadImpl::Orphan() { void LogicalThreadImpl::Orphan() {
ExecCtx::Run(DEBUG_LOCATION, if (size_.Load(MemoryOrder::ACQUIRE) == 0) {
GRPC_CLOSURE_CREATE( delete this;
[](void* arg, grpc_error* /*error*/) { } else {
LogicalThreadImpl* self = orphaned_.Store(true, MemoryOrder::RELEASE);
static_cast<LogicalThreadImpl*>(arg); }
delete self;
},
this, nullptr),
GRPC_ERROR_NONE);
} }
// The thread that calls this loans itself to the logical thread so as to // The thread that calls this loans itself to the logical thread so as to
@ -83,7 +84,6 @@ void LogicalThreadImpl::DrainQueue() {
gpr_log(GPR_INFO, "LogicalThread::DrainQueue() %p", this); gpr_log(GPR_INFO, "LogicalThread::DrainQueue() %p", this);
} }
size_t prev_size = size_.FetchSub(1); size_t prev_size = size_.FetchSub(1);
// prev_size should be atleast 1 since
GPR_DEBUG_ASSERT(prev_size >= 1); GPR_DEBUG_ASSERT(prev_size >= 1);
if (prev_size == 1) { if (prev_size == 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {

@ -45,6 +45,7 @@ class LogicalThreadImpl : public Orphanable {
void DrainQueue(); void DrainQueue();
Atomic<size_t> size_{0}; Atomic<size_t> size_{0};
Atomic<bool> orphaned_{false};
MultiProducerSingleConsumerQueue queue_; MultiProducerSingleConsumerQueue queue_;
}; };

@ -55,7 +55,7 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
const std::string& delegate_policy_name, intptr_t initial_refcount = 1) const std::string& delegate_policy_name, intptr_t initial_refcount = 1)
: LoadBalancingPolicy(std::move(args), initial_refcount) { : LoadBalancingPolicy(std::move(args), initial_refcount) {
Args delegate_args; Args delegate_args;
delegate_args.combiner = combiner(); delegate_args.logical_thread = logical_thread();
delegate_args.channel_control_helper = std::move(delegating_helper); delegate_args.channel_control_helper = std::move(delegating_helper);
delegate_args.args = args.args; delegate_args.args = args.args;
delegate_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( delegate_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(

Loading…
Cancel
Save