New combiner

reviewable/pr20542/r1
Yash Tibrewal 6 years ago
parent fb8d6c2ee3
commit 3d363368ca
  1. 31
      src/core/ext/filters/client_channel/client_channel.cc
  2. 91
      src/core/lib/iomgr/combiner.cc
  3. 37
      src/core/lib/iomgr/combiner.h

@ -1035,10 +1035,10 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
: parent_(std::move(parent)), : parent_(std::move(parent)),
state_(new_state), state_(new_state),
connected_subchannel_(std::move(connected_subchannel)) { connected_subchannel_(std::move(connected_subchannel)) {
GRPC_CLOSURE_INIT( parent_->parent_->chand_->combiner_->Exec(
&closure_, ApplyUpdateInControlPlaneCombiner, this, GRPC_CLOSURE_INIT(&closure_, ApplyUpdateInControlPlaneCombiner,
grpc_combiner_scheduler(parent_->parent_->chand_->combiner_)); this, nullptr),
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
private: private:
@ -1131,9 +1131,8 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
grpc_polling_entity_add_to_pollset_set(&pollent_, grpc_polling_entity_add_to_pollset_set(&pollent_,
chand_->interested_parties_); chand_->interested_parties_);
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher"); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
GRPC_CLOSURE_SCHED( chand_->combiner_->Exec(
GRPC_CLOSURE_INIT(&add_closure_, AddWatcherLocked, this, GRPC_CLOSURE_INIT(&add_closure_, AddWatcherLocked, this, nullptr),
grpc_combiner_scheduler(chand_->combiner_)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
@ -1160,9 +1159,8 @@ void ChannelData::ExternalConnectivityWatcher::Notify(
// Not needed in state SHUTDOWN, because the tracker will // Not needed in state SHUTDOWN, because the tracker will
// automatically remove all watchers in that case. // automatically remove all watchers in that case.
if (state != GRPC_CHANNEL_SHUTDOWN) { if (state != GRPC_CHANNEL_SHUTDOWN) {
GRPC_CLOSURE_SCHED( chand_->combiner_->Exec(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr),
grpc_combiner_scheduler(chand_->combiner_)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
} }
@ -1175,9 +1173,8 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() {
} }
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED); GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED);
// Hop back into the combiner to clean up. // Hop back into the combiner to clean up.
GRPC_CLOSURE_SCHED( chand_->combiner_->Exec(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr),
grpc_combiner_scheduler(chand_->combiner_)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
@ -1811,10 +1808,9 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem,
// Pop into control plane combiner for remaining ops. // Pop into control plane combiner for remaining ops.
op->handler_private.extra_arg = elem; op->handler_private.extra_arg = elem;
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op"); GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
GRPC_CLOSURE_SCHED( chand->combiner_->Exec(
GRPC_CLOSURE_INIT(&op->handler_private.closure, GRPC_CLOSURE_INIT(&op->handler_private.closure,
ChannelData::StartTransportOpLocked, op, ChannelData::StartTransportOpLocked, op, nullptr),
grpc_combiner_scheduler(chand->combiner_)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
@ -1881,8 +1877,7 @@ grpc_connectivity_state ChannelData::CheckConnectivityState(
grpc_connectivity_state out = state_tracker_.state(); grpc_connectivity_state out = state_tracker_.state();
if (out == GRPC_CHANNEL_IDLE && try_to_connect) { if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect"); GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this, combiner_->Exec(GRPC_CLOSURE_CREATE(TryToConnectLocked, this, nullptr),
grpc_combiner_scheduler(combiner_)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
return out; return out;

@ -45,42 +45,19 @@ grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner");
#define STATE_UNORPHANED 1 #define STATE_UNORPHANED 1
#define STATE_ELEM_COUNT_LOW_BIT 2 #define STATE_ELEM_COUNT_LOW_BIT 2
struct grpc_combiner { namespace grpc_core {
grpc_combiner* next_combiner_on_this_exec_ctx = nullptr; static void combiner_run(Combiner* lock, grpc_closure* closure,
grpc_closure_scheduler scheduler; grpc_error* error);
grpc_closure_scheduler finally_scheduler; static void combiner_exec(Combiner* lock, grpc_closure* closure,
grpc_core::MultiProducerSingleConsumerQueue queue; grpc_error* error);
// either: static void combiner_finally_exec(Combiner* lock, grpc_closure* closure,
// a pointer to the initiating exec ctx if that is the only exec_ctx that has grpc_error* error);
// ever queued to this combiner, or NULL. If this is non-null, it's not
// dereferencable (since the initiating exec_ctx may have gone out of scope)
gpr_atm initiating_exec_ctx_or_null;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
bool time_to_execute_final_list = false;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
};
static void combiner_run(grpc_closure* closure, grpc_error* error);
static void combiner_exec(grpc_closure* closure, grpc_error* error);
static void combiner_finally_exec(grpc_closure* closure, grpc_error* error);
static const grpc_closure_scheduler_vtable scheduler = {
combiner_run, combiner_exec, "combiner:immediately"};
static const grpc_closure_scheduler_vtable finally_scheduler = {
combiner_finally_exec, combiner_finally_exec, "combiner:finally"};
static void offload(void* arg, grpc_error* error); static void offload(void* arg, grpc_error* error);
grpc_combiner* grpc_combiner_create(void) { Combiner* grpc_combiner_create(void) {
grpc_combiner* lock = grpc_core::New<grpc_combiner>(); Combiner* lock = grpc_core::New<Combiner>();
gpr_ref_init(&lock->refs, 1); gpr_ref_init(&lock->refs, 1);
lock->scheduler.vtable = &scheduler;
lock->finally_scheduler.vtable = &finally_scheduler;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
grpc_closure_list_init(&lock->final_list); grpc_closure_list_init(&lock->final_list);
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(
@ -90,13 +67,13 @@ grpc_combiner* grpc_combiner_create(void) {
return lock; return lock;
} }
static void really_destroy(grpc_combiner* lock) { static void really_destroy(Combiner* lock) {
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p really_destroy", lock)); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p really_destroy", lock));
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
grpc_core::Delete(lock); grpc_core::Delete(lock);
} }
static void start_destroy(grpc_combiner* lock) { static void start_destroy(Combiner* lock) {
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
GRPC_COMBINER_TRACE(gpr_log( GRPC_COMBINER_TRACE(gpr_log(
GPR_INFO, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); GPR_INFO, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
@ -117,20 +94,20 @@ static void start_destroy(grpc_combiner* lock) {
#define GRPC_COMBINER_DEBUG_SPAM(op, delta) #define GRPC_COMBINER_DEBUG_SPAM(op, delta)
#endif #endif
void grpc_combiner_unref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) { void grpc_combiner_unref(Combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM("UNREF", -1); GRPC_COMBINER_DEBUG_SPAM("UNREF", -1);
if (gpr_unref(&lock->refs)) { if (gpr_unref(&lock->refs)) {
start_destroy(lock); start_destroy(lock);
} }
} }
grpc_combiner* grpc_combiner_ref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) { Combiner* grpc_combiner_ref(Combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM(" REF", 1); GRPC_COMBINER_DEBUG_SPAM(" REF", 1);
gpr_ref(&lock->refs); gpr_ref(&lock->refs);
return lock; return lock;
} }
static void push_last_on_exec_ctx(grpc_combiner* lock) { static void push_last_on_exec_ctx(Combiner* lock) {
lock->next_combiner_on_this_exec_ctx = nullptr; lock->next_combiner_on_this_exec_ctx = nullptr;
if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) { if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = grpc_core::ExecCtx::Get()->combiner_data()->active_combiner =
@ -143,7 +120,7 @@ static void push_last_on_exec_ctx(grpc_combiner* lock) {
} }
} }
static void push_first_on_exec_ctx(grpc_combiner* lock) { static void push_first_on_exec_ctx(Combiner* lock) {
lock->next_combiner_on_this_exec_ctx = lock->next_combiner_on_this_exec_ctx =
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock; grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock;
@ -152,14 +129,9 @@ static void push_first_on_exec_ctx(grpc_combiner* lock) {
} }
} }
#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \ static void combiner_exec(Combiner* lock, grpc_closure* cl, grpc_error* error) {
((grpc_combiner*)(((char*)((closure)->scheduler)) - \
offsetof(grpc_combiner, scheduler_name)))
static void combiner_exec(grpc_closure* cl, grpc_error* error) {
GPR_TIMER_SCOPE("combiner.execute", 0); GPR_TIMER_SCOPE("combiner.execute", 0);
GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(); GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS();
grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, GRPC_COMBINER_TRACE(gpr_log(GPR_INFO,
"C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR,
@ -198,11 +170,11 @@ static void move_next() {
} }
static void offload(void* arg, grpc_error* error) { static void offload(void* arg, grpc_error* error) {
grpc_combiner* lock = static_cast<grpc_combiner*>(arg); Combiner* lock = static_cast<Combiner*>(arg);
push_last_on_exec_ctx(lock); push_last_on_exec_ctx(lock);
} }
static void queue_offload(grpc_combiner* lock) { static void queue_offload(Combiner* lock) {
GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(); GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED();
move_next(); move_next();
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock)); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock));
@ -211,8 +183,7 @@ static void queue_offload(grpc_combiner* lock) {
bool grpc_combiner_continue_exec_ctx() { bool grpc_combiner_continue_exec_ctx() {
GPR_TIMER_SCOPE("combiner.continue_exec_ctx", 0); GPR_TIMER_SCOPE("combiner.continue_exec_ctx", 0);
grpc_combiner* lock = Combiner* lock = grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
if (lock == nullptr) { if (lock == nullptr) {
return false; return false;
} }
@ -329,19 +300,17 @@ bool grpc_combiner_continue_exec_ctx() {
static void enqueue_finally(void* closure, grpc_error* error); static void enqueue_finally(void* closure, grpc_error* error);
static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { static void combiner_finally_exec(Combiner* lock, grpc_closure* closure,
grpc_error* error) {
GPR_TIMER_SCOPE("combiner.execute_finally", 0); GPR_TIMER_SCOPE("combiner.execute_finally", 0);
GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(); GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS();
grpc_combiner* lock =
COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler);
GRPC_COMBINER_TRACE(gpr_log( GRPC_COMBINER_TRACE(gpr_log(
GPR_INFO, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, closure, GPR_INFO, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, closure,
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner)); grpc_core::ExecCtx::Get()->combiner_data()->active_combiner));
if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) { if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0); GPR_TIMER_MARK("slowpath", 0);
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(enqueue_finally, closure, grpc_core::Combiner::Exec(
grpc_combiner_scheduler(lock)), lock, GRPC_CLOSURE_CREATE(enqueue_finally, closure, nullptr), error);
error);
return; return;
} }
@ -351,8 +320,8 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) {
grpc_closure_list_append(&lock->final_list, closure, error); grpc_closure_list_append(&lock->final_list, closure, error);
} }
static void combiner_run(grpc_closure* closure, grpc_error* error) { static void combiner_run(Combiner* lock, grpc_closure* closure,
grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler); grpc_error* error) {
#ifndef NDEBUG #ifndef NDEBUG
closure->scheduled = false; closure->scheduled = false;
GRPC_COMBINER_TRACE(gpr_log( GRPC_COMBINER_TRACE(gpr_log(
@ -372,11 +341,11 @@ static void enqueue_finally(void* closure, grpc_error* error) {
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
grpc_closure_scheduler* grpc_combiner_scheduler(grpc_combiner* combiner) { static void Combiner::Exec(grpc_closure* closure, grpc_error* error) {
return &combiner->scheduler; combiner_exec(combiner, closure, error);
} }
grpc_closure_scheduler* grpc_combiner_finally_scheduler( static void Combiner::FinallyExec(grpc_closure* closure, grpc_error* error) {
grpc_combiner* combiner) { combiner_finally_exec(combiner, closure, exec);
return &combiner->finally_scheduler;
} }
} // namespace grpc_core

@ -34,7 +34,7 @@
// Initialize the lock, with an optional workqueue to shift load to when // Initialize the lock, with an optional workqueue to shift load to when
// necessary // necessary
grpc_combiner* grpc_combiner_create(void); grpc_core::Combiner* grpc_combiner_create(void);
#ifndef NDEBUG #ifndef NDEBUG
#define GRPC_COMBINER_DEBUG_ARGS \ #define GRPC_COMBINER_DEBUG_ARGS \
@ -51,15 +51,38 @@ grpc_combiner* grpc_combiner_create(void);
// Ref/unref the lock, for when we're sharing the lock ownership // Ref/unref the lock, for when we're sharing the lock ownership
// Prefer to use the macros above // Prefer to use the macros above
grpc_combiner* grpc_combiner_ref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS); grpc_core::Combiner* grpc_combiner_ref(
void grpc_combiner_unref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS); grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS);
// Fetch a scheduler to schedule closures against void grpc_combiner_unref(grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS);
grpc_closure_scheduler* grpc_combiner_scheduler(grpc_combiner* lock);
// Scheduler to execute \a action within the lock just prior to unlocking.
grpc_closure_scheduler* grpc_combiner_finally_scheduler(grpc_combiner* lock);
bool grpc_combiner_continue_exec_ctx(); bool grpc_combiner_continue_exec_ctx();
extern grpc_core::DebugOnlyTraceFlag grpc_combiner_trace; extern grpc_core::DebugOnlyTraceFlag grpc_combiner_trace;
namespace grpc_core {
class Combiner {
public:
static void Exec(grpc_closure* closure, grpc_error* error);
static void FinallyExec(grpc_closure* closure, grpc_error* error);
Combiner* next_combiner_on_this_exec_ctx = nullptr;
grpc_closure_scheduler scheduler;
grpc_closure_scheduler finally_scheduler;
grpc_core::MultiProducerSingleConsumerQueue queue;
// either:
// a pointer to the initiating exec ctx if that is the only exec_ctx that has
// ever queued to this combiner, or NULL. If this is non-null, it's not
// dereferencable (since the initiating exec_ctx may have gone out of scope)
gpr_atm initiating_exec_ctx_or_null;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
bool time_to_execute_final_list = false;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ #endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */

Loading…
Cancel
Save