Merge combiner and exec_ctx execution better

Allows exec_ctx callbacks to be called while a combiner is executing.
Also allows guaranteeing direct execution of callbacks from combiners,
which should allow reducing cpu burn for up/down stack interactions in
the future.
pull/7856/head
Craig Tiller 9 years ago
parent 8ef0eda856
commit dfd3a8f7a5
  1. 59
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 19
      src/core/ext/transport/chttp2/transport/internal.h
  3. 2
      src/core/ext/transport/chttp2/transport/stream_lists.c
  4. 278
      src/core/lib/iomgr/combiner.c
  5. 2
      src/core/lib/iomgr/combiner.h
  6. 33
      src/core/lib/iomgr/exec_ctx.c
  7. 3
      src/core/lib/iomgr/exec_ctx.h
  8. 5
      src/core/lib/profiling/timers.h
  9. 25
      src/core/lib/transport/transport.c
  10. 4
      src/core/lib/transport/transport.h
  11. 7
      test/core/end2end/tests/filter_causes_close.c

@ -179,33 +179,30 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
gpr_free(t); gpr_free(t);
} }
/*#define REFCOUNTING_DEBUG 1*/ #ifdef GRPC_CHTTP2_REFCOUNTING_DEBUG
#ifdef REFCOUNTING_DEBUG void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
#define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) grpc_chttp2_transport *t, const char *reason,
#define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__) const char *file, int line) {
static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_DEBUG, "chttp2:unref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]", t,
const char *reason, const char *file, int line) { t->refs.count, t->refs.count - 1, reason, file, line);
gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count,
t->refs.count - 1, reason, file, line);
if (!gpr_unref(&t->refs)) return; if (!gpr_unref(&t->refs)) return;
destruct_transport(exec_ctx, t); destruct_transport(exec_ctx, t);
} }
static void ref_transport(grpc_chttp2_transport *t, const char *reason, void grpc_chttp2_ref_transport(grpc_chttp2_transport *t, const char *reason,
const char *file, int line) { const char *file, int line) {
gpr_log(GPR_DEBUG, "chttp2: ref:%p %d->%d %s [%s:%d]", t, t->refs.count, gpr_log(GPR_DEBUG, "chttp2: ref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]", t,
t->refs.count + 1, reason, file, line); t->refs.count, t->refs.count + 1, reason, file, line);
gpr_ref(&t->refs); gpr_ref(&t->refs);
} }
#else #else
#define REF_TRANSPORT(t, r) ref_transport(t) void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
#define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t) grpc_chttp2_transport *t) {
static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
if (!gpr_unref(&t->refs)) return; if (!gpr_unref(&t->refs)) return;
destruct_transport(exec_ctx, t); destruct_transport(exec_ctx, t);
} }
static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
#endif #endif
static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@ -392,7 +389,7 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_transport *t = tp; grpc_chttp2_transport *t = tp;
t->destroying = 1; t->destroying = 1;
drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
UNREF_TRANSPORT(exec_ctx, t, "destroy"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destroy");
} }
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
@ -482,7 +479,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_slice_buffer_init(&s->writing.flow_controlled_buffer); gpr_slice_buffer_init(&s->writing.flow_controlled_buffer);
s->global.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); s->global.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
REF_TRANSPORT(t, "stream"); GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
if (server_data) { if (server_data) {
s->global.id = (uint32_t)(uintptr_t)server_data; s->global.id = (uint32_t)(uintptr_t)server_data;
@ -547,7 +544,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->global.read_closed_error); GRPC_ERROR_UNREF(s->global.read_closed_error);
GRPC_ERROR_UNREF(s->global.write_closed_error); GRPC_ERROR_UNREF(s->global.write_closed_error);
UNREF_TRANSPORT(exec_ctx, t, "stream"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
GPR_TIMER_END("destroy_stream", 0); GPR_TIMER_END("destroy_stream", 0);
@ -632,6 +629,7 @@ static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_transport *t = tp; grpc_chttp2_transport *t = tp;
t->executor.check_read_ops_scheduled = false; t->executor.check_read_ops_scheduled = false;
check_read_ops(exec_ctx, &t->global); check_read_ops(exec_ctx, &t->global);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "initiate_read_flush_locked");
} }
/******************************************************************************* /*******************************************************************************
@ -667,7 +665,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
break; break;
case GRPC_CHTTP2_WRITING_INACTIVE: case GRPC_CHTTP2_WRITING_INACTIVE:
set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, reason); set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, reason);
REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
&t->initiate_writing, GRPC_ERROR_NONE, &t->initiate_writing, GRPC_ERROR_NONE,
covered_by_poller); covered_by_poller);
@ -714,7 +712,7 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
"start_writing:nothing_to_write"); "start_writing:nothing_to_write");
} }
end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE); end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE);
UNREF_TRANSPORT(exec_ctx, t, "writing"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
} }
GPR_TIMER_END("start_writing", 0); GPR_TIMER_END("start_writing", 0);
} }
@ -787,7 +785,7 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
GPR_TIMER_MARK("state=writing_stale_with_poller", 0); GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing"); set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
&t->initiate_writing, GRPC_ERROR_NONE, &t->initiate_writing, GRPC_ERROR_NONE,
true); true);
@ -795,14 +793,14 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
GPR_TIMER_MARK("state=writing_stale_no_poller", 0); GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing"); set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
&t->initiate_writing, GRPC_ERROR_NONE, &t->initiate_writing, GRPC_ERROR_NONE,
false); false);
break; break;
} }
UNREF_TRANSPORT(exec_ctx, t, "writing"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
GPR_TIMER_END("terminate_writing_with_lock", 0); GPR_TIMER_END("terminate_writing_with_lock", 0);
} }
@ -1261,7 +1259,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
UNREF_TRANSPORT(exec_ctx, t, "transport_op"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "transport_op");
} }
static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
@ -1270,7 +1268,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
op->transport_private.args[0] = gt; op->transport_private.args[0] = gt;
grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked, grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked,
op); op);
REF_TRANSPORT(t, "transport_op"); GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
grpc_combiner_execute(exec_ctx, t->executor.combiner, grpc_combiner_execute(exec_ctx, t->executor.combiner,
&op->transport_private.closure, GRPC_ERROR_NONE); &op->transport_private.closure, GRPC_ERROR_NONE);
} }
@ -1864,7 +1862,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
} }
} else if (!t->closed) { } else if (!t->closed) {
keep_reading = true; keep_reading = true;
REF_TRANSPORT(t, "keep_reading"); GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
prevent_endpoint_shutdown(t); prevent_endpoint_shutdown(t);
} }
gpr_slice_buffer_reset_and_unref(&t->read_buffer); gpr_slice_buffer_reset_and_unref(&t->read_buffer);
@ -1872,9 +1870,9 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (keep_reading) { if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action); grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
allow_endpoint_shutdown_locked(exec_ctx, t); allow_endpoint_shutdown_locked(exec_ctx, t);
UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else { } else {
UNREF_TRANSPORT(exec_ctx, t, "reading_action"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
} }
GPR_TIMER_END("post_reading_action_locked", 0); GPR_TIMER_END("post_reading_action_locked", 0);
@ -2247,7 +2245,8 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_transport *transport,
gpr_slice_buffer *read_buffer) { gpr_slice_buffer *read_buffer) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */ GRPC_CHTTP2_REF_TRANSPORT(
t, "reading_action"); /* matches unref inside reading_action */
if (read_buffer != NULL) { if (read_buffer != NULL) {
gpr_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer); gpr_free(read_buffer);

@ -726,6 +726,25 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global); grpc_chttp2_stream_global *stream_global);
#endif #endif
//#define GRPC_CHTTP2_REFCOUNTING_DEBUG 1
#ifdef GRPC_CHTTP2_REFCOUNTING_DEBUG
#define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
#define GRPC_CHTTP2_UNREF_TRANSPORT(cl, t, r) \
grpc_chttp2_unref_transport(cl, t, r, __FILE__, __LINE__)
void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, const char *reason,
const char *file, int line);
void grpc_chttp2_ref_transport(grpc_chttp2_transport *t, const char *reason,
const char *file, int line);
#else
#define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t)
#define GRPC_CHTTP2_UNREF_TRANSPORT(cl, t, r) grpc_chttp2_unref_transport(cl, t)
void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
void grpc_chttp2_ref_transport(grpc_chttp2_transport *t);
#endif
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, uint32_t frame_size, grpc_chttp2_stream_global *stream_global, uint32_t frame_size,

@ -245,6 +245,8 @@ void grpc_chttp2_list_add_check_read_ops(
grpc_chttp2_stream_global *stream_global) { grpc_chttp2_stream_global *stream_global) {
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
if (!t->executor.check_read_ops_scheduled) { if (!t->executor.check_read_ops_scheduled) {
GRPC_CHTTP2_REF_TRANSPORT(TRANSPORT_FROM_GLOBAL(transport_global),
"initiate_read_flush_locked");
grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
&t->initiate_read_flush_locked, &t->initiate_read_flush_locked,
GRPC_ERROR_NONE, false); GRPC_ERROR_NONE, false);

@ -51,6 +51,7 @@ int grpc_combiner_trace = 0;
} while (0) } while (0)
struct grpc_combiner { struct grpc_combiner {
grpc_combiner *next_combiner_on_this_exec_ctx;
grpc_workqueue *optional_workqueue; grpc_workqueue *optional_workqueue;
gpr_mpscq queue; gpr_mpscq queue;
// state is: // state is:
@ -58,17 +59,23 @@ struct grpc_combiner {
// other bits - number of items queued on the lock // other bits - number of items queued on the lock
gpr_atm state; gpr_atm state;
bool take_async_break_before_final_list; bool take_async_break_before_final_list;
bool time_to_execute_final_list;
grpc_closure_list final_list; grpc_closure_list final_list;
grpc_closure continue_finishing; grpc_closure offload;
}; };
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock)); grpc_combiner *lock = gpr_malloc(sizeof(*lock));
lock->next_combiner_on_this_exec_ctx = NULL;
lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue; lock->optional_workqueue = optional_workqueue;
gpr_atm_no_barrier_store(&lock->state, 1); gpr_atm_no_barrier_store(&lock->state, 1);
gpr_mpscq_init(&lock->queue); gpr_mpscq_init(&lock->queue);
lock->take_async_break_before_final_list = false; lock->take_async_break_before_final_list = false;
grpc_closure_list_init(&lock->final_list); grpc_closure_list_init(&lock->final_list);
grpc_closure_init(&lock->offload, offload, lock);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock; return lock;
} }
@ -90,177 +97,154 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
} }
} }
static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); static void queue_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); lock->next_combiner_on_this_exec_ctx = NULL;
if (exec_ctx->active_combiner == NULL) {
exec_ctx->active_combiner = exec_ctx->last_combiner = lock;
} else {
exec_ctx->last_combiner->next_combiner_on_this_exec_ctx = lock;
exec_ctx->last_combiner = lock;
}
}
static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg, void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_error *error) { grpc_closure *cl, grpc_error *error) {
GPR_TIMER_BEGIN("combiner.continue_executing_mainline", 0);
grpc_combiner *lock = arg;
GRPC_COMBINER_TRACE( GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p continue_finishing_mainline", lock)); gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl));
GPR_ASSERT(exec_ctx->active_combiner == NULL); GPR_TIMER_BEGIN("combiner.execute", 0);
exec_ctx->active_combiner = lock; gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock); GPR_ASSERT(last & 1); // ensure lock has not been destroyed
GPR_ASSERT(exec_ctx->active_combiner == lock); cl->error = error;
exec_ctx->active_combiner = NULL; gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
GPR_TIMER_END("combiner.continue_executing_mainline", 0); if (last == 1) {
// code will be written when the exec_ctx calls
// grpc_combiner_continue_exec_ctx
queue_on_exec_ctx(exec_ctx, lock);
}
GPR_TIMER_END("combiner.execute", 0);
} }
static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { static void move_next(grpc_exec_ctx *exec_ctx) {
GPR_TIMER_BEGIN("combiner.execute_final", 0); exec_ctx->active_combiner =
grpc_closure *c = lock->final_list.head; exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
GPR_ASSERT(c != NULL); if (exec_ctx->active_combiner == NULL) {
grpc_closure_list_init(&lock->final_list); exec_ctx->last_combiner = NULL;
lock->take_async_break_before_final_list = false;
int loops = 0;
while (c != NULL) {
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error;
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
c = next;
loops++;
} }
GPR_TIMER_END("combiner.execute_final", 0);
} }
static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg, static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_error *error) {
GPR_TIMER_BEGIN("combiner.continue_executing_final", 0);
grpc_combiner *lock = arg; grpc_combiner *lock = arg;
GRPC_COMBINER_TRACE( queue_on_exec_ctx(exec_ctx, lock);
gpr_log(GPR_DEBUG, "C:%p continue_executing_final", lock));
GPR_ASSERT(exec_ctx->active_combiner == NULL);
exec_ctx->active_combiner = lock;
// quick peek to see if new things have turned up on the queue: if so, go back
// to executing them before the final list
if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) {
if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
} else {
execute_final(exec_ctx, lock);
finish(exec_ctx, lock);
}
GPR_ASSERT(exec_ctx->active_combiner == lock);
exec_ctx->active_combiner = NULL;
GPR_TIMER_END("combiner.continue_executing_final", 0);
} }
static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GPR_TIMER_BEGIN("combiner.start_execute_final", 0); move_next(exec_ctx);
GPR_ASSERT(exec_ctx->active_combiner == lock); grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload,
GRPC_COMBINER_TRACE( GRPC_ERROR_NONE);
gpr_log(GPR_DEBUG, }
"C:%p start_execute_final take_async_break_before_final_list=%d",
lock, lock->take_async_break_before_final_list)); bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
if (lock->take_async_break_before_final_list) { GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0);
grpc_closure_init(&lock->continue_finishing, continue_executing_final, grpc_combiner *lock = exec_ctx->active_combiner;
lock); if (lock == NULL) {
grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, GPR_TIMER_END("combiner.continue_exec_ctx", 0);
GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
GPR_TIMER_END("combiner.start_execute_final", 0);
return false; return false;
} else {
execute_final(exec_ctx, lock);
GPR_TIMER_END("combiner.start_execute_final", 0);
return true;
} }
}
static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GPR_TIMER_BEGIN("combiner.maybe_finish_one", 0);
GPR_ASSERT(exec_ctx->active_combiner == lock);
if (lock->optional_workqueue != NULL && if (lock->optional_workqueue != NULL &&
grpc_exec_ctx_ready_to_finish(exec_ctx)) { grpc_exec_ctx_ready_to_finish(exec_ctx)) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on, and we have a workqueue (and so // this execution context wants to move on, and we have a workqueue (and so
// can help the execution context out): schedule remaining work to be picked // can help the execution context out): schedule remaining work to be picked
// up on the workqueue // up on the workqueue
grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline, queue_offload(exec_ctx, lock);
lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0);
grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, return true;
&lock->continue_finishing, GRPC_ERROR_NONE);
GPR_TIMER_END("combiner.maybe_finish_one", 0);
return false;
}
gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
if (n == NULL) {
// queue is in an inconsistant state: use this as a cue that we should
// go off and do something else for a while (and come back later)
grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline,
lock);
grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
GPR_TIMER_END("combiner.maybe_finish_one", 0);
return false;
} }
grpc_closure *cl = (grpc_closure *)n;
grpc_error *error = cl->error;
cl->cb(exec_ctx, cl->cb_arg, error);
GRPC_ERROR_UNREF(error);
GPR_TIMER_END("combiner.maybe_finish_one", 0);
return true;
}
static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { if (!lock->time_to_execute_final_list ||
bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock); // peek to see if something new has shown up, and execute that with
GPR_TIMER_BEGIN("combiner.finish", 0); // priority
int loops = 0; (gpr_atm_acq_load(&lock->state) >> 1) > 1) {
do { gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
executor = maybe_finish_one; GRPC_COMBINER_TRACE(
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2); gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, if (n == NULL) {
"C:%p finish[%d] old_state=%" PRIdPTR, lock, // queue is in an inconsistant state: use this as a cue that we should
loops, old_state)); // go off and do something else for a while (and come back later)
switch (old_state) { GPR_TIMER_MARK("delay_busy", 0);
case 5: // we're down to one queued item: if it's the final list we if (lock->optional_workqueue != NULL) {
case 4: // should do that queue_offload(exec_ctx, lock);
if (!grpc_closure_list_empty(lock->final_list)) { }
executor = start_execute_final; GPR_TIMER_END("combiner.continue_exec_ctx", 0);
} return true;
break;
case 3: // had one count, one unorphaned --> unlocked unorphaned
GPR_TIMER_END("combiner.finish", 0);
return;
case 2: // and one count, one orphaned --> unlocked and orphaned
really_destroy(exec_ctx, lock);
GPR_TIMER_END("combiner.finish", 0);
return;
case 1:
case 0:
// these values are illegal - representing an already unlocked or
// deleted lock
GPR_UNREACHABLE_CODE(return );
} }
loops++; GPR_TIMER_BEGIN("combiner.exec1", 0);
} while (executor(exec_ctx, lock)); grpc_closure *cl = (grpc_closure *)n;
GPR_TIMER_END("combiner.finish", 0); grpc_error *error = cl->error;
}
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *cl, grpc_error *error) {
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl));
GPR_TIMER_BEGIN("combiner.execute", 0);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
GPR_ASSERT(last & 1); // ensure lock has not been destroyed
if (last == 1) {
exec_ctx->active_combiner = lock;
GPR_TIMER_BEGIN("combiner.execute_first_cb", 0);
cl->cb(exec_ctx, cl->cb_arg, error); cl->cb(exec_ctx, cl->cb_arg, error);
GPR_TIMER_END("combiner.execute_first_cb", 0);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
finish(exec_ctx, lock); GPR_TIMER_END("combiner.exec1", 0);
GPR_ASSERT(exec_ctx->active_combiner == lock);
exec_ctx->active_combiner = NULL;
} else { } else {
cl->error = error; if (lock->take_async_break_before_final_list) {
gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); GPR_TIMER_MARK("async_break", 0);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p take async break", lock));
lock->take_async_break_before_final_list = false;
if (lock->optional_workqueue != NULL) {
queue_offload(exec_ctx, lock);
}
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
} else {
grpc_closure *c = lock->final_list.head;
GPR_ASSERT(c != NULL);
grpc_closure_list_init(&lock->final_list);
lock->take_async_break_before_final_list = false;
int loops = 0;
while (c != NULL) {
GPR_TIMER_BEGIN("combiner.exec_1final", 0);
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error;
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
c = next;
GPR_TIMER_END("combiner.exec_1final", 0);
}
}
} }
GPR_TIMER_END("combiner.execute", 0);
GPR_TIMER_MARK("unref", 0);
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2);
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state));
lock->time_to_execute_final_list = false;
switch (old_state) {
case 5: // we're down to one queued item: if it's the final list we
case 4: // should do that
if (!grpc_closure_list_empty(lock->final_list)) {
lock->time_to_execute_final_list = true;
}
break;
case 3: // had one count, one unorphaned --> unlocked unorphaned
move_next(exec_ctx);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
case 2: // and one count, one orphaned --> unlocked and orphaned
move_next(exec_ctx);
really_destroy(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
case 1:
case 0:
// these values are illegal - representing an already unlocked or
// deleted lock
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
GPR_UNREACHABLE_CODE(return true);
}
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
} }
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,

@ -64,6 +64,8 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
bool hint_async_break); bool hint_async_break);
void grpc_combiner_force_async_finally(grpc_combiner *lock); void grpc_combiner_force_async_finally(grpc_combiner *lock);
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx);
extern int grpc_combiner_trace; extern int grpc_combiner_trace;
#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ #endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */

@ -37,6 +37,7 @@
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
@ -60,20 +61,28 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0; bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
while (!grpc_closure_list_empty(exec_ctx->closure_list)) { for (;;) {
grpc_closure *c = exec_ctx->closure_list.head; if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; grpc_closure *c = exec_ctx->closure_list.head;
while (c != NULL) { exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
grpc_closure *next = c->next_data.next; while (c != NULL) {
grpc_error *error = c->error; grpc_closure *next = c->next_data.next;
did_something = true; grpc_error *error = c->error;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); did_something = true;
c->cb(exec_ctx, c->cb_arg, error); GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
GRPC_ERROR_UNREF(error); c->cb(exec_ctx, c->cb_arg, error);
GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); GRPC_ERROR_UNREF(error);
c = next; GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
c = next;
}
continue;
}
if (grpc_combiner_continue_exec_ctx(exec_ctx)) {
continue;
} }
break;
} }
GPR_ASSERT(exec_ctx->active_combiner == NULL);
if (exec_ctx->stealing_from_workqueue != NULL) { if (exec_ctx->stealing_from_workqueue != NULL) {
if (grpc_exec_ctx_ready_to_finish(exec_ctx)) { if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue, grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue,

@ -70,6 +70,7 @@ struct grpc_exec_ctx {
grpc_closure *stolen_closure; grpc_closure *stolen_closure;
/** currently active combiner: updated only via combiner.c */ /** currently active combiner: updated only via combiner.c */
grpc_combiner *active_combiner; grpc_combiner *active_combiner;
grpc_combiner *last_combiner;
bool cached_ready_to_finish; bool cached_ready_to_finish;
void *check_ready_to_finish_arg; void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
@ -79,7 +80,7 @@ struct grpc_exec_ctx {
prefer to use GRPC_EXEC_CTX_INIT whenever possible */ prefer to use GRPC_EXEC_CTX_INIT whenever possible */
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ #define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
{ \ { \
GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, false, finish_check_arg, \ GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, NULL, false, finish_check_arg, \
finish_check \ finish_check \
} }
#else #else

@ -34,6 +34,8 @@
#ifndef GRPC_CORE_LIB_PROFILING_TIMERS_H #ifndef GRPC_CORE_LIB_PROFILING_TIMERS_H
#define GRPC_CORE_LIB_PROFILING_TIMERS_H #define GRPC_CORE_LIB_PROFILING_TIMERS_H
#include <stdio.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -56,14 +58,17 @@ void gpr_timer_set_enabled(int enabled);
/* No profiling. No-op all the things. */ /* No profiling. No-op all the things. */
#define GPR_TIMER_MARK(tag, important) \ #define GPR_TIMER_MARK(tag, important) \
do { \ do { \
/*printf("- %s\n", tag);*/ \
} while (0) } while (0)
#define GPR_TIMER_BEGIN(tag, important) \ #define GPR_TIMER_BEGIN(tag, important) \
do { \ do { \
/*printf("%s {\n", tag);*/ \
} while (0) } while (0)
#define GPR_TIMER_END(tag, important) \ #define GPR_TIMER_END(tag, important) \
do { \ do { \
/*printf("} // %s\n", tag);*/ \
} while (0) } while (0)
#else /* at least one profiler requested... */ #else /* at least one profiler requested... */

@ -276,3 +276,28 @@ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
op->op.on_consumed = &op->outer_on_complete; op->op.on_consumed = &op->outer_on_complete;
return &op->op; return &op->op;
} }
typedef struct {
grpc_closure outer_on_complete;
grpc_closure *inner_on_complete;
grpc_transport_stream_op op;
} made_transport_stream_op;
static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_stream_op *op = arg;
grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
NULL);
gpr_free(op);
}
grpc_transport_stream_op *grpc_make_transport_stream_op(
grpc_closure *on_complete) {
made_transport_stream_op *op = gpr_malloc(sizeof(*op));
grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op,
op);
op->inner_on_complete = on_complete;
memset(&op->op, 0, sizeof(op->op));
op->op.on_complete = &op->outer_on_complete;
return &op->op;
}

@ -292,6 +292,10 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to /* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
\a on_consumed and then delete the returned transport op */ \a on_consumed and then delete the returned transport op */
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed); grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed);
/* Allocate a grpc_transport_stream_op, and preconfigure the on_consumed closure
to \a on_consumed and then delete the returned transport op */
grpc_transport_stream_op *grpc_make_transport_stream_op(
grpc_closure *on_consumed);
#ifdef __cplusplus #ifdef __cplusplus
} }

@ -211,11 +211,10 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg,
// close the stream with an error. // close the stream with an error.
gpr_slice message = gpr_slice message =
gpr_slice_from_copied_string("Failure that's not preventable."); gpr_slice_from_copied_string("Failure that's not preventable.");
grpc_transport_stream_op op; grpc_transport_stream_op *op = grpc_make_transport_stream_op(NULL);
memset(&op, 0, sizeof(op)); grpc_transport_stream_op_add_close(op, GRPC_STATUS_PERMISSION_DENIED,
grpc_transport_stream_op_add_close(&op, GRPC_STATUS_PERMISSION_DENIED,
&message); &message);
grpc_call_next_op(exec_ctx, elem, &op); grpc_call_next_op(exec_ctx, elem, op);
} }
grpc_exec_ctx_sched( grpc_exec_ctx_sched(
exec_ctx, calld->recv_im_ready, exec_ctx, calld->recv_im_ready,

Loading…
Cancel
Save