diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 9acaddfb382..db464bbc6b5 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -679,13 +679,21 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } -void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, bool covered_by_poller, - const char *reason) { +void grpc_chttp2_become_writable( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_chttp2_stream_write_type stream_write_type, const char *reason) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); - grpc_chttp2_initiate_write(exec_ctx, t, covered_by_poller, reason); + } + switch (stream_write_type) { + case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK: + break; + case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED: + grpc_chttp2_initiate_write(exec_ctx, t, true, reason); + break; + case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED: + grpc_chttp2_initiate_write(exec_ctx, t, false, reason); + break; } } @@ -837,7 +845,9 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); post_destructive_reclaimer(exec_ctx, t); - grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + "new_stream"); } /* cancel out streams that will never be started */ while (t->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -932,7 +942,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > t->write_buffer_size)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + "op.send_message"); } } @@ -1094,7 +1106,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } else { GPR_ASSERT(s->id != 0); - grpc_chttp2_become_writable(exec_ctx, t, s, true, + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, "op.send_initial_metadata"); } } else { @@ -1185,7 +1198,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else if (s->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(exec_ctx, t, s, true, + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, "op.send_trailing_metadata"); } } @@ -1866,7 +1880,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (t->initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { - grpc_chttp2_become_writable(exec_ctx, t, s, false, "unstalled"); + grpc_chttp2_become_writable( + exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, + "unstalled"); } } t->initial_window_update = 0; @@ -2024,27 +2040,22 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, if (s->incoming_window_delta < max_recv_bytes) { uint32_t add_max_recv_bytes = (uint32_t)(max_recv_bytes - s->incoming_window_delta); - bool new_window_write_is_covered_by_poller = - s->incoming_window_delta + initial_window_size < (int64_t)have_already; - /* gpr_log(GPR_DEBUG, "%d %d %d", - (int)(s->incoming_window_delta - s->announce_window), - (int)(-(int64_t)initial_window_size / 2), force_send); */ + grpc_chttp2_stream_write_type write_type = + GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED; + if (s->incoming_window_delta + initial_window_size < + (int64_t)have_already) { + write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; + } + if (s->incoming_window_delta - s->announce_window < + -(int64_t)initial_window_size / 2) { + write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK; + } GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta, add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window, add_max_recv_bytes); - bool force_send = (s->incoming_window_delta - s->announce_window <= - -(int64_t)initial_window_size / 2) || - s->announce_window > initial_window_size / 2; - /* gpr_log(GPR_DEBUG, "%s:%d: iwd=%d ann=%d iws=%d force=%d", - t->peer_string, - s->id, (int)s->incoming_window_delta, (int)s->announce_window, - initial_window_size, force_send); */ - if (force_send) { - grpc_chttp2_become_writable(exec_ctx, t, s, - new_window_write_is_covered_by_poller, - "read_incoming_stream"); - } + grpc_chttp2_become_writable(exec_ctx, t, s, write_type, + "read_incoming_stream"); } } diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index 83901f8d958..8fa0bb471ae 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -113,8 +113,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse( GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window_delta, received_update); if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { - grpc_chttp2_become_writable(exec_ctx, t, s, false, - "stream.read_flow_control"); + grpc_chttp2_become_writable( + exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, + "stream.read_flow_control"); } } } else { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index fca63023928..1dabf9edba2 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -714,11 +714,21 @@ void grpc_chttp2_incoming_byte_stream_finished( void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint64_t id); +typedef enum { + /* don't initiate a transport write, but piggyback on the next one */ + GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK, + /* initiate a covered write */ + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + /* initiate an uncovered write */ + GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED +} grpc_chttp2_stream_write_type; + /** add a ref to the stream and add it to the writable list; ref will be dropped in writing.c */ void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, bool covered_by_poller, + grpc_chttp2_stream *s, + grpc_chttp2_stream_write_type type, const char *reason); void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index ca9b931564c..a882a0bba19 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -406,7 +406,8 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, incoming_frame_size); if (s->incoming_window_delta - s->announce_window <= -(int64_t)target_incoming_window / 2) { - grpc_chttp2_become_writable(exec_ctx, t, s, false, + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, "window-update-required"); } s->received_bytes += incoming_frame_size;