From 09b05fd3fd91a473c42b99cc9636c1634eeb327e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 7 Sep 2016 13:02:05 -0700 Subject: [PATCH] Get write batching working again --- .../chttp2/transport/chttp2_transport.c | 34 +++++++++++++++---- .../ext/transport/chttp2/transport/internal.h | 3 +- src/core/lib/iomgr/combiner.c | 21 ++++++++---- src/core/lib/iomgr/combiner.h | 9 ++--- test/core/iomgr/combiner_test.c | 5 +-- 5 files changed, 49 insertions(+), 23 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 7adbd669236..986f0893977 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -542,8 +542,10 @@ static const char *write_state_name(grpc_chttp2_write_state st) { return "IDLE"; case GRPC_CHTTP2_WRITE_STATE_WRITING: return "WRITING"; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME: + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: return "WRITING+MORE"; + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: + return "WRITING+MORE+COVERED"; } GPR_UNREACHABLE_CODE(return "UNKNOWN"); } @@ -568,12 +570,22 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->write_action_begin_locked, - GRPC_ERROR_NONE); + GRPC_ERROR_NONE, covered_by_poller); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME); + set_write_state( + t, + covered_by_poller + ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER + : GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE); break; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME: + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: + if (covered_by_poller) { + set_write_state( + t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER); + } + break; + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: break; } GPR_TIMER_END("grpc_chttp2_initiate_write", 0); @@ -638,13 +650,21 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_MARK("state=writing", 0); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE); break; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME: + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: + GPR_TIMER_MARK("state=writing_stale_no_poller", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING); + GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); + grpc_combiner_execute_finally(exec_ctx, t->combiner, + &t->write_action_begin_locked, + GRPC_ERROR_NONE, false); + break; + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: GPR_TIMER_MARK("state=writing_stale_with_poller", 0); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->write_action_begin_locked, - GRPC_ERROR_NONE); + GRPC_ERROR_NONE, true); break; } @@ -1861,7 +1881,7 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window, add_max_recv_bytes); - grpc_chttp2_become_writable(exec_ctx, t, s, false, "read_incoming_stream"); + grpc_chttp2_become_writable(exec_ctx, t, s, true, "read_incoming_stream"); } } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 27acf6321b8..6b3e2edd541 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -68,7 +68,8 @@ typedef enum { typedef enum { GRPC_CHTTP2_WRITE_STATE_IDLE, GRPC_CHTTP2_WRITE_STATE_WRITING, - GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_TO_COME, + GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, + GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER, } grpc_chttp2_write_state; /* deframer state for the overall http2 stream of bytes */ diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 721db6337e4..b2d65597514 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -62,6 +62,7 @@ struct grpc_combiner { // offload safely gpr_atm covered_by_poller; bool time_to_execute_final_list; + bool final_list_covered_by_poller; grpc_closure_list final_list; grpc_closure offload; }; @@ -81,6 +82,11 @@ static error_data unpack_error_data(uintptr_t p) { return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1}; } +static bool is_covered_by_poller(grpc_combiner *lock) { + return lock->final_list_covered_by_poller || + gpr_atm_acq_load(&lock->covered_by_poller) > 0; +} + grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); lock->next_combiner_on_this_exec_ctx = NULL; @@ -183,8 +189,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } if (lock->optional_workqueue != NULL && - grpc_exec_ctx_ready_to_finish(exec_ctx) && - gpr_atm_acq_load(&lock->covered_by_poller) > 0) { + grpc_exec_ctx_ready_to_finish(exec_ctx) && is_covered_by_poller(lock)) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on, and we have a workqueue (and // so can help the execution context out): schedule remaining work to be @@ -205,8 +210,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { // queue is in an inconsistant state: use this as a cue that we should // go off and do something else for a while (and come back later) GPR_TIMER_MARK("delay_busy", 0); - if (lock->optional_workqueue != NULL && - gpr_atm_acq_load(&lock->covered_by_poller) > 0) { + if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) { queue_offload(exec_ctx, lock); } GPR_TIMER_END("combiner.continue_exec_ctx", 0); @@ -225,6 +229,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { grpc_closure *c = lock->final_list.head; GPR_ASSERT(c != NULL); grpc_closure_list_init(&lock->final_list); + lock->final_list_covered_by_poller = false; int loops = 0; while (c != NULL) { GPR_TIMER_BEGIN("combiner.exec_1final", 0); @@ -277,11 +282,12 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, grpc_error *error) { grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, - GRPC_ERROR_REF(error)); + GRPC_ERROR_REF(error), false); } void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error) { + grpc_closure *closure, grpc_error *error, + bool covered_by_poller) { GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, closure, exec_ctx->active_combiner)); @@ -298,6 +304,9 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, if (grpc_closure_list_empty(lock->final_list)) { gpr_atm_full_fetch_add(&lock->state, 2); } + if (covered_by_poller) { + lock->final_list_covered_by_poller = true; + } grpc_closure_list_append(&lock->final_list, closure, error); GPR_TIMER_END("combiner.execute_finally", 0); } diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index fa9c143d3ca..d04eeed83a0 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -55,14 +55,9 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *closure, grpc_error *error, bool covered_by_poller); // Execute \a action within the lock just prior to unlocking. -// if \a hint_async_break is true, the combiner tries to hand execution to -// another thread before finishing the primary queue of combined closures and -// executing the finally list. -// Deprecation warning: \a hint_async_break will be removed in a future version -// Takes a very slow and round-about path if not called from a -// grpc_combiner_execute closure. void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error); + grpc_closure *closure, grpc_error *error, + bool covered_by_poller); bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx); diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c index 2ea4e5dd14a..f7d5809be74 100644 --- a/test/core/iomgr/combiner_test.c +++ b/test/core/iomgr/combiner_test.c @@ -134,8 +134,9 @@ static void in_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } static void add_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_combiner_execute_finally( - exec_ctx, arg, grpc_closure_create(in_finally, NULL), GRPC_ERROR_NONE); + grpc_combiner_execute_finally(exec_ctx, arg, + grpc_closure_create(in_finally, NULL), + GRPC_ERROR_NONE, false); } static void test_execute_finally(void) {