Get write batching working again

reviewable/pr8008/r2
Craig Tiller 8 years ago
parent a81dac2888
commit 09b05fd3fd
5 files changed:
  1. src/core/ext/transport/chttp2/transport/chttp2_transport.c (34 lines)
  2. src/core/ext/transport/chttp2/transport/internal.h (3 lines)
  3. src/core/lib/iomgr/combiner.c (21 lines)
  4. src/core/lib/iomgr/combiner.h (9 lines)
  5. test/core/iomgr/combiner_test.c (5 lines)
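A reading aid, not part of the commit: the hunks below thread a covered_by_poller flag from the chttp2 write path into grpc_combiner_execute_finally, so the combiner knows whether the queued "finally" work is covered by a poller when it decides whether to offload to its workqueue. A minimal sketch of the updated call shape, reusing names that appear in the diff (t, exec_ctx, covered_by_poller):

    /* chttp2 forwards its poller hint when scheduling the write path on the
       combiner; the extra argument is the new covered_by_poller flag. */
    GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
    grpc_combiner_execute_finally(exec_ctx, t->combiner,
                                  &t->write_action_begin_locked,
                                  GRPC_ERROR_NONE, covered_by_poller);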

src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -542,8 +542,10 @@ static const char *write_state_name(grpc_chttp2_write_state st) {
       return "IDLE";
     case GRPC_CHTTP2_WRITE_STATE_WRITING:
       return "WRITING";
-    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME:
+    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
       return "WRITING+MORE";
+    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
+      return "WRITING+MORE+COVERED";
   }
   GPR_UNREACHABLE_CODE(return "UNKNOWN");
 }
@@ -568,12 +570,22 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
       grpc_combiner_execute_finally(exec_ctx, t->combiner,
                                     &t->write_action_begin_locked,
-                                    GRPC_ERROR_NONE);
+                                    GRPC_ERROR_NONE, covered_by_poller);
       break;
     case GRPC_CHTTP2_WRITE_STATE_WRITING:
-      set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME);
+      set_write_state(
+          t,
+          covered_by_poller
+              ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER
+              : GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE);
       break;
+    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
+      if (covered_by_poller) {
+        set_write_state(
+            t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER);
+      }
+      break;
-    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME:
+    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
       break;
   }
   GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
@@ -638,13 +650,21 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
       GPR_TIMER_MARK("state=writing", 0);
       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE);
       break;
-    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME:
+    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
       GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING);
       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
       grpc_combiner_execute_finally(exec_ctx, t->combiner,
                                     &t->write_action_begin_locked,
+                                    GRPC_ERROR_NONE, false);
+      break;
+    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
+      GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
+      set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING);
+      GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
+      grpc_combiner_execute_finally(exec_ctx, t->combiner,
+                                    &t->write_action_begin_locked,
-                                    GRPC_ERROR_NONE);
+                                    GRPC_ERROR_NONE, true);
       break;
   }
@@ -1861,7 +1881,7 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
                                    add_max_recv_bytes);
     GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window,
                                    add_max_recv_bytes);
-    grpc_chttp2_become_writable(exec_ctx, t, s, false, "read_incoming_stream");
+    grpc_chttp2_become_writable(exec_ctx, t, s, true, "read_incoming_stream");
   }
 }
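A rough sketch of the write-state machine after this change, pieced together from the grpc_chttp2_initiate_write and write_action_end_locked hunks above (GRPC_CHTTP2_WRITE_STATE_ prefixes dropped; a reading aid, not code from the commit):

    /* IDLE
     *   --initiate_write------------------------------> WRITING
     * WRITING
     *   --initiate_write(covered_by_poller=false)-----> WRITING_WITH_MORE
     *   --initiate_write(covered_by_poller=true)------> WRITING_WITH_MORE_AND_COVERED_BY_POLLER
     * WRITING_WITH_MORE
     *   --initiate_write(covered_by_poller=true)------> WRITING_WITH_MORE_AND_COVERED_BY_POLLER
     * WRITING_WITH_MORE[_AND_COVERED_BY_POLLER]
     *   --write_action_end_locked (reschedule write)--> WRITING
     * WRITING
     *   --write_action_end_locked (nothing pending)---> IDLE
     */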

src/core/ext/transport/chttp2/transport/internal.h
@@ -68,7 +68,8 @@ typedef enum {
 typedef enum {
   GRPC_CHTTP2_WRITE_STATE_IDLE,
   GRPC_CHTTP2_WRITE_STATE_WRITING,
-  GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME,
+  GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
+  GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
 } grpc_chttp2_write_state;
 /* deframer state for the overall http2 stream of bytes */

src/core/lib/iomgr/combiner.c
@@ -62,6 +62,7 @@ struct grpc_combiner {
   // offload safely
   gpr_atm covered_by_poller;
   bool time_to_execute_final_list;
+  bool final_list_covered_by_poller;
   grpc_closure_list final_list;
   grpc_closure offload;
 };
@@ -81,6 +82,11 @@ static error_data unpack_error_data(uintptr_t p) {
   return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1};
 }
+static bool is_covered_by_poller(grpc_combiner *lock) {
+  return lock->final_list_covered_by_poller ||
+         gpr_atm_acq_load(&lock->covered_by_poller) > 0;
+}
+
 grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
   grpc_combiner *lock = gpr_malloc(sizeof(*lock));
   lock->next_combiner_on_this_exec_ctx = NULL;
@@ -183,8 +189,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
   }
   if (lock->optional_workqueue != NULL &&
-      grpc_exec_ctx_ready_to_finish(exec_ctx) &&
-      gpr_atm_acq_load(&lock->covered_by_poller) > 0) {
+      grpc_exec_ctx_ready_to_finish(exec_ctx) && is_covered_by_poller(lock)) {
     GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
     // this execution context wants to move on, and we have a workqueue (and
     // so can help the execution context out): schedule remaining work to be
@@ -205,8 +210,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
     // queue is in an inconsistant state: use this as a cue that we should
     // go off and do something else for a while (and come back later)
     GPR_TIMER_MARK("delay_busy", 0);
-    if (lock->optional_workqueue != NULL &&
-        gpr_atm_acq_load(&lock->covered_by_poller) > 0) {
+    if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) {
       queue_offload(exec_ctx, lock);
     }
     GPR_TIMER_END("combiner.continue_exec_ctx", 0);
@@ -225,6 +229,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
     grpc_closure *c = lock->final_list.head;
     GPR_ASSERT(c != NULL);
     grpc_closure_list_init(&lock->final_list);
+    lock->final_list_covered_by_poller = false;
     int loops = 0;
     while (c != NULL) {
       GPR_TIMER_BEGIN("combiner.exec_1final", 0);
@@ -277,11 +282,12 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
 static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
                             grpc_error *error) {
   grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
-                                GRPC_ERROR_REF(error));
+                                GRPC_ERROR_REF(error), false);
 }
 void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
-                                   grpc_closure *closure, grpc_error *error) {
+                                   grpc_closure *closure, grpc_error *error,
+                                   bool covered_by_poller) {
   GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
                               "C:%p grpc_combiner_execute_finally c=%p; ac=%p",
                               lock, closure, exec_ctx->active_combiner));
@@ -298,6 +304,9 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
   if (grpc_closure_list_empty(lock->final_list)) {
     gpr_atm_full_fetch_add(&lock->state, 2);
   }
+  if (covered_by_poller) {
+    lock->final_list_covered_by_poller = true;
+  }
   grpc_closure_list_append(&lock->final_list, closure, error);
   GPR_TIMER_END("combiner.execute_finally", 0);
 }

src/core/lib/iomgr/combiner.h
@@ -55,14 +55,9 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
                            grpc_closure *closure, grpc_error *error,
                            bool covered_by_poller);
 // Execute \a action within the lock just prior to unlocking.
-// if \a hint_async_break is true, the combiner tries to hand execution to
-// another thread before finishing the primary queue of combined closures and
-// executing the finally list.
-// Deprecation warning: \a hint_async_break will be removed in a future version
-// Takes a very slow and round-about path if not called from a
-// grpc_combiner_execute closure.
 void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
-                                   grpc_closure *closure, grpc_error *error);
+                                   grpc_closure *closure, grpc_error *error,
+                                   bool covered_by_poller);
 bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx);
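With this change both combiner entry points carry the poller hint (grpc_combiner_execute already took it, per the context lines above; grpc_combiner_execute_finally now matches). A hedged usage sketch with hypothetical names cb, lock, and exec_ctx:

    /* Hypothetical caller sketch, not code from this commit. The flag feeds
       is_covered_by_poller(), which gates queue_offload() in
       grpc_combiner_continue_exec_ctx (see the combiner.c hunks above). */
    grpc_combiner_execute(exec_ctx, lock, cb, GRPC_ERROR_NONE,
                          true /* covered_by_poller */);
    grpc_combiner_execute_finally(exec_ctx, lock, cb, GRPC_ERROR_NONE,
                                  false /* covered_by_poller */);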

test/core/iomgr/combiner_test.c
@@ -134,8 +134,9 @@ static void in_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
 }
 static void add_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
-  grpc_combiner_execute_finally(
-      exec_ctx, arg, grpc_closure_create(in_finally, NULL), GRPC_ERROR_NONE);
+  grpc_combiner_execute_finally(exec_ctx, arg,
+                                grpc_closure_create(in_finally, NULL),
+                                GRPC_ERROR_NONE, false);
 }
 static void test_execute_finally(void) {
