New combiner

reviewable/pr20542/r1
Yash Tibrewal 5 years ago
parent fb8d6c2ee3
commit 3d363368ca
  1. 31
      src/core/ext/filters/client_channel/client_channel.cc
  2. 91
      src/core/lib/iomgr/combiner.cc
  3. 37
      src/core/lib/iomgr/combiner.h

@ -1035,10 +1035,10 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
: parent_(std::move(parent)),
state_(new_state),
connected_subchannel_(std::move(connected_subchannel)) {
GRPC_CLOSURE_INIT(
&closure_, ApplyUpdateInControlPlaneCombiner, this,
grpc_combiner_scheduler(parent_->parent_->chand_->combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
parent_->parent_->chand_->combiner_->Exec(
GRPC_CLOSURE_INIT(&closure_, ApplyUpdateInControlPlaneCombiner,
this, nullptr),
GRPC_ERROR_NONE);
}
private:
@ -1131,9 +1131,8 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
grpc_polling_entity_add_to_pollset_set(&pollent_,
chand_->interested_parties_);
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&add_closure_, AddWatcherLocked, this,
grpc_combiner_scheduler(chand_->combiner_)),
chand_->combiner_->Exec(
GRPC_CLOSURE_INIT(&add_closure_, AddWatcherLocked, this, nullptr),
GRPC_ERROR_NONE);
}
@ -1160,9 +1159,8 @@ void ChannelData::ExternalConnectivityWatcher::Notify(
// Not needed in state SHUTDOWN, because the tracker will
// automatically remove all watchers in that case.
if (state != GRPC_CHANNEL_SHUTDOWN) {
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this,
grpc_combiner_scheduler(chand_->combiner_)),
chand_->combiner_->Exec(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr),
GRPC_ERROR_NONE);
}
}
@ -1175,9 +1173,8 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() {
}
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED);
// Hop back into the combiner to clean up.
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this,
grpc_combiner_scheduler(chand_->combiner_)),
chand_->combiner_->Exec(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr),
GRPC_ERROR_NONE);
}
@ -1811,10 +1808,9 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem,
// Pop into control plane combiner for remaining ops.
op->handler_private.extra_arg = elem;
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
GRPC_CLOSURE_SCHED(
chand->combiner_->Exec(
GRPC_CLOSURE_INIT(&op->handler_private.closure,
ChannelData::StartTransportOpLocked, op,
grpc_combiner_scheduler(chand->combiner_)),
ChannelData::StartTransportOpLocked, op, nullptr),
GRPC_ERROR_NONE);
}
@ -1881,8 +1877,7 @@ grpc_connectivity_state ChannelData::CheckConnectivityState(
grpc_connectivity_state out = state_tracker_.state();
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
grpc_combiner_scheduler(combiner_)),
combiner_->Exec(GRPC_CLOSURE_CREATE(TryToConnectLocked, this, nullptr),
GRPC_ERROR_NONE);
}
return out;

@ -45,42 +45,19 @@ grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner");
#define STATE_UNORPHANED 1
#define STATE_ELEM_COUNT_LOW_BIT 2
struct grpc_combiner {
grpc_combiner* next_combiner_on_this_exec_ctx = nullptr;
grpc_closure_scheduler scheduler;
grpc_closure_scheduler finally_scheduler;
grpc_core::MultiProducerSingleConsumerQueue queue;
// either:
// a pointer to the initiating exec ctx if that is the only exec_ctx that has
// ever queued to this combiner, or NULL. If this is non-null, it's not
// dereferencable (since the initiating exec_ctx may have gone out of scope)
gpr_atm initiating_exec_ctx_or_null;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
bool time_to_execute_final_list = false;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
};
static void combiner_run(grpc_closure* closure, grpc_error* error);
static void combiner_exec(grpc_closure* closure, grpc_error* error);
static void combiner_finally_exec(grpc_closure* closure, grpc_error* error);
static const grpc_closure_scheduler_vtable scheduler = {
combiner_run, combiner_exec, "combiner:immediately"};
static const grpc_closure_scheduler_vtable finally_scheduler = {
combiner_finally_exec, combiner_finally_exec, "combiner:finally"};
namespace grpc_core {
static void combiner_run(Combiner* lock, grpc_closure* closure,
grpc_error* error);
static void combiner_exec(Combiner* lock, grpc_closure* closure,
grpc_error* error);
static void combiner_finally_exec(Combiner* lock, grpc_closure* closure,
grpc_error* error);
static void offload(void* arg, grpc_error* error);
grpc_combiner* grpc_combiner_create(void) {
grpc_combiner* lock = grpc_core::New<grpc_combiner>();
Combiner* grpc_combiner_create(void) {
Combiner* lock = grpc_core::New<Combiner>();
gpr_ref_init(&lock->refs, 1);
lock->scheduler.vtable = &scheduler;
lock->finally_scheduler.vtable = &finally_scheduler;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
grpc_closure_list_init(&lock->final_list);
GRPC_CLOSURE_INIT(
@ -90,13 +67,13 @@ grpc_combiner* grpc_combiner_create(void) {
return lock;
}
static void really_destroy(grpc_combiner* lock) {
static void really_destroy(Combiner* lock) {
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p really_destroy", lock));
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
grpc_core::Delete(lock);
}
static void start_destroy(grpc_combiner* lock) {
static void start_destroy(Combiner* lock) {
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
GRPC_COMBINER_TRACE(gpr_log(
GPR_INFO, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
@ -117,20 +94,20 @@ static void start_destroy(grpc_combiner* lock) {
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)
#endif
void grpc_combiner_unref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
void grpc_combiner_unref(Combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM("UNREF", -1);
if (gpr_unref(&lock->refs)) {
start_destroy(lock);
}
}
grpc_combiner* grpc_combiner_ref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
Combiner* grpc_combiner_ref(Combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM(" REF", 1);
gpr_ref(&lock->refs);
return lock;
}
static void push_last_on_exec_ctx(grpc_combiner* lock) {
static void push_last_on_exec_ctx(Combiner* lock) {
lock->next_combiner_on_this_exec_ctx = nullptr;
if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner =
@ -143,7 +120,7 @@ static void push_last_on_exec_ctx(grpc_combiner* lock) {
}
}
static void push_first_on_exec_ctx(grpc_combiner* lock) {
static void push_first_on_exec_ctx(Combiner* lock) {
lock->next_combiner_on_this_exec_ctx =
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock;
@ -152,14 +129,9 @@ static void push_first_on_exec_ctx(grpc_combiner* lock) {
}
}
#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \
((grpc_combiner*)(((char*)((closure)->scheduler)) - \
offsetof(grpc_combiner, scheduler_name)))
static void combiner_exec(grpc_closure* cl, grpc_error* error) {
static void combiner_exec(Combiner* lock, grpc_closure* cl, grpc_error* error) {
GPR_TIMER_SCOPE("combiner.execute", 0);
GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS();
grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO,
"C:%p grpc_combiner_execute c=%p last=%" PRIdPTR,
@ -198,11 +170,11 @@ static void move_next() {
}
static void offload(void* arg, grpc_error* error) {
grpc_combiner* lock = static_cast<grpc_combiner*>(arg);
Combiner* lock = static_cast<Combiner*>(arg);
push_last_on_exec_ctx(lock);
}
static void queue_offload(grpc_combiner* lock) {
static void queue_offload(Combiner* lock) {
GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED();
move_next();
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock));
@ -211,8 +183,7 @@ static void queue_offload(grpc_combiner* lock) {
bool grpc_combiner_continue_exec_ctx() {
GPR_TIMER_SCOPE("combiner.continue_exec_ctx", 0);
grpc_combiner* lock =
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
Combiner* lock = grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
if (lock == nullptr) {
return false;
}
@ -329,19 +300,17 @@ bool grpc_combiner_continue_exec_ctx() {
static void enqueue_finally(void* closure, grpc_error* error);
static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) {
static void combiner_finally_exec(Combiner* lock, grpc_closure* closure,
grpc_error* error) {
GPR_TIMER_SCOPE("combiner.execute_finally", 0);
GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS();
grpc_combiner* lock =
COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler);
GRPC_COMBINER_TRACE(gpr_log(
GPR_INFO, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, closure,
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner));
if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(enqueue_finally, closure,
grpc_combiner_scheduler(lock)),
error);
grpc_core::Combiner::Exec(
lock, GRPC_CLOSURE_CREATE(enqueue_finally, closure, nullptr), error);
return;
}
@ -351,8 +320,8 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) {
grpc_closure_list_append(&lock->final_list, closure, error);
}
static void combiner_run(grpc_closure* closure, grpc_error* error) {
grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler);
static void combiner_run(Combiner* lock, grpc_closure* closure,
grpc_error* error) {
#ifndef NDEBUG
closure->scheduled = false;
GRPC_COMBINER_TRACE(gpr_log(
@ -372,11 +341,11 @@ static void enqueue_finally(void* closure, grpc_error* error) {
GRPC_ERROR_REF(error));
}
grpc_closure_scheduler* grpc_combiner_scheduler(grpc_combiner* combiner) {
return &combiner->scheduler;
static void Combiner::Exec(grpc_closure* closure, grpc_error* error) {
combiner_exec(combiner, closure, error);
}
grpc_closure_scheduler* grpc_combiner_finally_scheduler(
grpc_combiner* combiner) {
return &combiner->finally_scheduler;
static void Combiner::FinallyExec(grpc_closure* closure, grpc_error* error) {
combiner_finally_exec(combiner, closure, exec);
}
} // namespace grpc_core

@ -34,7 +34,7 @@
// Initialize the lock, with an optional workqueue to shift load to when
// necessary
grpc_combiner* grpc_combiner_create(void);
grpc_core::Combiner* grpc_combiner_create(void);
#ifndef NDEBUG
#define GRPC_COMBINER_DEBUG_ARGS \
@ -51,15 +51,38 @@ grpc_combiner* grpc_combiner_create(void);
// Ref/unref the lock, for when we're sharing the lock ownership
// Prefer to use the macros above
grpc_combiner* grpc_combiner_ref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS);
void grpc_combiner_unref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS);
// Fetch a scheduler to schedule closures against
grpc_closure_scheduler* grpc_combiner_scheduler(grpc_combiner* lock);
// Scheduler to execute \a action within the lock just prior to unlocking.
grpc_closure_scheduler* grpc_combiner_finally_scheduler(grpc_combiner* lock);
grpc_core::Combiner* grpc_combiner_ref(
grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS);
void grpc_combiner_unref(grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS);
bool grpc_combiner_continue_exec_ctx();
extern grpc_core::DebugOnlyTraceFlag grpc_combiner_trace;
namespace grpc_core {
class Combiner {
public:
static void Exec(grpc_closure* closure, grpc_error* error);
static void FinallyExec(grpc_closure* closure, grpc_error* error);
Combiner* next_combiner_on_this_exec_ctx = nullptr;
grpc_closure_scheduler scheduler;
grpc_closure_scheduler finally_scheduler;
grpc_core::MultiProducerSingleConsumerQueue queue;
// either:
// a pointer to the initiating exec ctx if that is the only exec_ctx that has
// ever queued to this combiner, or NULL. If this is non-null, it's not
// dereferencable (since the initiating exec_ctx may have gone out of scope)
gpr_atm initiating_exec_ctx_or_null;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
bool time_to_execute_final_list = false;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */

Loading…
Cancel
Save