From e9a766593b33a18574cd3dfdaaea46a6dcdddc91 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 12 May 2017 13:17:14 -0700 Subject: [PATCH] Progress --- include/grpc/impl/codegen/grpc_types.h | 7 +++- .../chttp2/transport/chttp2_transport.c | 38 ++++++++++++------- .../ext/transport/chttp2/transport/internal.h | 6 +++ .../ext/transport/chttp2/transport/writing.c | 16 ++++++-- 4 files changed, 49 insertions(+), 18 deletions(-) diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 4738e02ecba..f03ec094307 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -350,8 +350,11 @@ typedef enum grpc_call_error { /** Force compression to be disabled for a particular write (start_write/add_metadata). Illegal on invoke/accept. */ #define GRPC_WRITE_NO_COMPRESS (0x00000002u) +/** Force this message to be written to the socket before completing it */ +#define GRPC_WRITE_THROUGH (0x00000004u) /** Mask of all valid flags. */ -#define GRPC_WRITE_USED_MASK (GRPC_WRITE_BUFFER_HINT | GRPC_WRITE_NO_COMPRESS) +#define GRPC_WRITE_USED_MASK \ + (GRPC_WRITE_BUFFER_HINT | GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_THROUGH) /* Initial metadata flags */ /** Signal that the call is idempotent */ @@ -372,7 +375,7 @@ typedef enum grpc_call_error { GRPC_INITIAL_METADATA_WAIT_FOR_READY | \ GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \ GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET | \ - GRPC_INITIAL_METADATA_CORKED) + GRPC_INITIAL_METADATA_CORKED | GRPC_WRITE_THROUGH) /** A single metadata element */ typedef struct grpc_metadata { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index e0aa3499c91..a9bc36b0d4e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -909,7 +909,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { grpc_chttp2_transport *t = gt; GPR_TIMER_BEGIN("write_action", 0); grpc_endpoint_write( - exec_ctx, t->ep, &t->outbuf, true, + exec_ctx, t->ep, &t->outbuf, t->write_is_covered, grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t, grpc_combiner_scheduler(t->combiner, false))); GPR_TIMER_END("write_action", 0); @@ -1192,8 +1192,12 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, cb->call_at_byte = notify_offset; cb->closure = s->fetching_send_message_finished; s->fetching_send_message_finished = NULL; - cb->next = s->on_write_finished_cbs; - s->on_write_finished_cbs = cb; + grpc_chttp2_write_cb **list = + s->fetching_send_message->flags & GRPC_WRITE_THROUGH + ? &s->on_write_finished_cbs + : &s->on_flow_controlled_cbs; + cb->next = *list; + *list = cb; } s->fetching_send_message = NULL; return; /* early out */ @@ -1873,6 +1877,21 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, return error; } +static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s, grpc_chttp2_write_cb **list, + grpc_error *error) { + while (*list) { + grpc_chttp2_write_cb *cb = *list; + *list = cb->next; + grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, + GRPC_ERROR_REF(error), + "on_write_finished_cb"); + cb->next = t->write_cb_pool; + t->write_cb_pool = cb; + } + GRPC_ERROR_UNREF(error); +} + void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { @@ -1892,16 +1911,9 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); - while (s->on_write_finished_cbs) { - grpc_chttp2_write_cb *cb = s->on_write_finished_cbs; - s->on_write_finished_cbs = cb->next; - grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, - GRPC_ERROR_REF(error), - "on_write_finished_cb"); - cb->next = t->write_cb_pool; - t->write_cb_pool = cb; - } - GRPC_ERROR_UNREF(error); + flush_write_list(exec_ctx, t, s, &s->on_write_finished_cbs, + GRPC_ERROR_REF(error)); + flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error); } void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8d66e396ee8..682c14be3a4 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -421,6 +421,10 @@ struct grpc_chttp2_transport { bool keepalive_permit_without_calls; /** keep-alive state machine state */ grpc_chttp2_keepalive_state keepalive_state; + + /** is the next write covered by a poller? set in grpc_chttp2_begin_write, + read in write_action */ + bool write_is_covered; }; typedef enum { @@ -458,6 +462,7 @@ struct grpc_chttp2_stream { grpc_slice fetching_slice; int64_t next_message_end_offset; int64_t flow_controlled_bytes_written; + int64_t flow_controlled_bytes_flowed; bool complete_fetch_covered_by_poller; grpc_closure complete_fetch_locked; grpc_closure *fetching_send_message_finished; @@ -531,6 +536,7 @@ struct grpc_chttp2_stream { uint32_t announce_window; grpc_slice_buffer flow_controlled_buffer; + grpc_chttp2_write_cb *on_flow_controlled_cbs; grpc_chttp2_write_cb *on_write_finished_cbs; grpc_chttp2_write_cb *finish_after_write; size_t sending_bytes; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 5be1092946a..92ee747e6be 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -138,10 +138,11 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, int64_t send_bytes, - grpc_chttp2_write_cb **list, grpc_error *error) { + grpc_chttp2_write_cb **list, int64_t *ctr, + grpc_error *error) { grpc_chttp2_write_cb *cb = *list; *list = NULL; - s->flow_controlled_bytes_written += send_bytes; + *ctr += send_bytes; while (cb) { grpc_chttp2_write_cb *next = cb->next; if (cb->call_at_byte <= s->flow_controlled_bytes_written) { @@ -228,6 +229,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( bool sent_initial_metadata = s->sent_initial_metadata; bool now_writing = false; + t->write_is_covered = false; GRPC_CHTTP2_IF_TRACING(gpr_log( GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t, @@ -259,6 +261,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( gpr_inf_past(GPR_CLOCK_MONOTONIC); t->ping_recv_state.ping_strikes = 0; } + t->write_is_covered = true; } /* send any window updates */ if (s->announce_window > 0) { @@ -320,11 +323,16 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } } s->sending_bytes += send_bytes; + update_list(exec_ctx, t, s, send_bytes, &s->on_flow_controlled_cbs, + &s->flow_controlled_bytes_flowed, GRPC_ERROR_NONE); now_writing = true; if (s->flow_controlled_buffer.length > 0) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } + if (s->on_write_finished_cbs != NULL) { + t->write_is_covered = true; + } } else if (t->outgoing_window == 0) { grpc_chttp2_list_add_stalled_by_transport(t, s); now_writing = true; @@ -364,6 +372,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); } now_writing = true; + t->write_is_covered = true; } } @@ -430,7 +439,8 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } if (s->sending_bytes != 0) { update_list(exec_ctx, t, s, (int64_t)s->sending_bytes, - &s->on_write_finished_cbs, GRPC_ERROR_REF(error)); + &s->on_write_finished_cbs, &s->flow_controlled_bytes_written, + GRPC_ERROR_REF(error)); s->sending_bytes = 0; } if (s->sent_trailing_metadata) {