From 4efb6966bdfb62c725c6614b0d85ea374250bb51 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 3 Jun 2015 09:32:41 -0700 Subject: [PATCH 1/4] Transport/call flow control interface Allow call objects to advertise how many bytes they are currently willing to receive. Update the transport to utilize this data to update flow control windows. --- src/core/surface/byte_buffer_queue.c | 8 ++++- src/core/surface/byte_buffer_queue.h | 2 ++ src/core/surface/call.c | 13 +++++++ src/core/transport/chttp2_transport.c | 43 +++++++++++++++++------- src/core/transport/transport.h | 6 ++++ src/core/transport/transport_op_string.c | 3 +- 6 files changed, 61 insertions(+), 14 deletions(-) diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index 7c31bfe5da2..e47dc4f4ce6 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) { } void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) { + q->bytes += grpc_byte_buffer_length(buffer); bba_push(&q->filling, buffer); } @@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) { } } +size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; } + grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { grpc_bbq_array temp_array; + grpc_byte_buffer *out; if (q->drain_pos == q->draining.count) { if (q->filling.count == 0) { @@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { q->draining = temp_array; } - return q->draining.data[q->drain_pos++]; + out = q->draining.data[q->drain_pos++]; + q->bytes -= grpc_byte_buffer_length(out); + return out; } diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h index 32c57f87563..f01958984f9 100644 --- a/src/core/surface/byte_buffer_queue.h +++ b/src/core/surface/byte_buffer_queue.h @@ -49,6 +49,7 @@ typedef struct { size_t drain_pos; grpc_bbq_array filling; grpc_bbq_array draining; + size_t bytes; } grpc_byte_buffer_queue; void grpc_bbq_destroy(grpc_byte_buffer_queue *q); @@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); void grpc_bbq_flush(grpc_byte_buffer_queue *q); int grpc_bbq_empty(grpc_byte_buffer_queue *q); void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); +size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q); #endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 2d651be6a5c..eea02211ae2 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -444,6 +444,8 @@ static void unlock(grpc_call *call) { int completing_requests = 0; int start_op = 0; int i; + const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536; + size_t buffered_bytes; memset(&op, 0, sizeof(op)); @@ -456,6 +458,17 @@ static void unlock(grpc_call *call) { op.recv_state = &call->recv_state; op.on_done_recv = call_on_done_recv; op.recv_user_data = call; + if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) { + op.max_recv_bytes = call->incoming_message_length - + call->incoming_message.length + MAX_RECV_PEEK_AHEAD; + } else { + buffered_bytes = grpc_bbq_bytes(&call->incoming_queue); + if (buffered_bytes > MAX_RECV_PEEK_AHEAD) { + op.max_recv_bytes = 0; + } else { + op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes; + } + } call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); start_op = 1; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index d9c712cc633..c85eb96a6f3 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -314,6 +314,18 @@ struct transport { struct stream { gpr_uint32 id; + /** The number of bytes the upper layers have offered to receive. + As the upper layer offers more bytes, this value increases. + As bytes are read, this value decreases. */ + gpr_uint32 max_recv_bytes; + /** The number of bytes the upper layer has offered to read but we have + not yet announced to HTTP2 flow control. + As the upper layers offer to read more bytes, this value increases. + As we advertise incoming flow control window, this value decreases. */ + gpr_uint32 unannounced_incoming_window; + /** The number of bytes of HTTP2 flow control we have advertised. + As we advertise incoming flow control window, this value increases. + As bytes are read, this value decreases. */ gpr_uint32 incoming_window; gpr_int64 outgoing_window; /* when the application requests writes be closed, the write_closed is @@ -659,7 +671,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->id = (gpr_uint32)(gpr_uintptr)server_data; s->outgoing_window = t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->incoming_window = + s->max_recv_bytes = s->incoming_window = t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; t->incoming_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); @@ -970,14 +982,13 @@ static int prepare_write(transport *t) { /* for each stream that wants to update its window, add that window here */ while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { - window_delta = - t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - s->incoming_window; - if (!s->read_closed && window_delta) { - gpr_slice_buffer_add( - &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); - FLOWCTL_TRACE(t, s, incoming, s->id, window_delta); - s->incoming_window += window_delta; + if (!s->read_closed && s->unannounced_incoming_window > 0) { + gpr_slice_buffer_add(&t->outbuf, + grpc_chttp2_window_update_create( + s->id, s->unannounced_incoming_window)); + FLOWCTL_TRACE(t, s, incoming, s->id, s->unannounced_incoming_window); + s->incoming_window += s->unannounced_incoming_window; + s->unannounced_incoming_window = 0; } } @@ -1101,8 +1112,10 @@ static void maybe_start_some_streams(transport *t) { t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->incoming_window = t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + s->max_recv_bytes = GPR_MAX(s->incoming_window, s->max_recv_bytes); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); stream_list_join(t, s, WRITABLE); + maybe_join_window_updates(t, s); } /* cancel out streams that will never be started */ while (t->next_stream_id > MAX_CLIENT_STREAM_ID) { @@ -1153,6 +1166,10 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { s->incoming_sopb = op->recv_ops; s->incoming_sopb->nops = 0; s->publish_state = op->recv_state; + if (s->max_recv_bytes < op->max_recv_bytes) { + s->unannounced_incoming_window += op->max_recv_bytes - s->max_recv_bytes; + s->max_recv_bytes = op->max_recv_bytes; + } gpr_free(s->old_incoming_metadata); s->old_incoming_metadata = NULL; maybe_finish_read(t, s); @@ -1337,10 +1354,10 @@ static void maybe_finish_read(transport *t, stream *s) { static void maybe_join_window_updates(transport *t, stream *s) { if (s->incoming_sopb != NULL && - s->incoming_window < + s->unannounced_incoming_window > t->settings[LOCAL_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] * - 3 / 4) { + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] / + 4) { stream_list_join(t, s, WINDOW_UPDATE); } } @@ -1362,6 +1379,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size); t->incoming_window -= t->incoming_frame_size; s->incoming_window -= t->incoming_frame_size; + GPR_ASSERT(s->max_recv_bytes > t->incoming_frame_size); + s->max_recv_bytes -= t->incoming_frame_size; /* if the stream incoming window is getting low, schedule an update */ maybe_join_window_updates(t, s); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 7f60fdc0374..98fcbe752e9 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -74,6 +74,12 @@ typedef struct grpc_transport_op { grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; + /** The number of bytes this peer is currently prepared to receive. + + Bytes offered are used to replenish per-stream flow control windows. + Offers are not retractable: if 5 bytes are offered and no bytes are read, + a later offer of 3 bytes still implies that 5 have been offered. */ + gpr_uint32 max_recv_bytes; void (*on_done_recv)(void *user_data, int success); void *recv_user_data; diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 7bbe8276c38..11d5a9d780c 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -129,7 +129,8 @@ char *grpc_transport_op_string(grpc_transport_op *op) { if (op->recv_ops) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_strvec_add(&b, gpr_strdup("RECV")); + gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes); + gpr_strvec_add(&b, tmp); } if (op->bind_pollset) { From 8631652e15382db204b0e7b3a1b1ef1711a45405 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Jul 2015 11:35:07 -0700 Subject: [PATCH 2/4] Ensure streams are sent in-order --- src/core/transport/chttp2/internal.h | 6 ++++- src/core/transport/chttp2/stream_lists.c | 7 ++++- src/core/transport/chttp2/writing.c | 33 ++++++++++++++---------- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 9d4bfc2651b..e5e6f445b70 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -403,6 +403,8 @@ typedef struct { grpc_stream_op_buffer sopb; /** how strongly should we indicate closure with the next write */ grpc_chttp2_send_closed send_closed; + /** how much window should we announce? */ + gpr_uint32 announce_window; } grpc_chttp2_stream_writing; struct grpc_chttp2_stream_parsing { @@ -513,7 +515,9 @@ void grpc_chttp2_list_add_writable_window_update_stream( grpc_chttp2_stream_global *stream_global); int grpc_chttp2_list_pop_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global); + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_writing **stream_writing); void grpc_chttp2_list_remove_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 4fea058c193..590f6abfbc3 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -139,6 +139,7 @@ static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); } @@ -204,6 +205,7 @@ int grpc_chttp2_list_pop_written_stream( void grpc_chttp2_list_add_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); @@ -211,11 +213,14 @@ void grpc_chttp2_list_add_writable_window_update_stream( int grpc_chttp2_list_pop_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global) { + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_writing **stream_writing) { grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); *stream_global = &stream->global; + *stream_writing = &stream->writing; return r; } diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index d49caa48701..e4a949076ee 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -66,11 +66,9 @@ int grpc_chttp2_unlocking_check_writes( /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ - while (transport_global->outgoing_window && - grpc_chttp2_list_pop_writable_stream(transport_global, + while (grpc_chttp2_list_pop_writable_stream(transport_global, transport_writing, &stream_global, - &stream_writing) && - stream_global->outgoing_window > 0) { + &stream_writing)) { stream_writing->id = stream_global->id; window_delta = grpc_chttp2_preencode( stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, @@ -106,12 +104,11 @@ int grpc_chttp2_unlocking_check_writes( /* for each grpc_chttp2_stream that wants to update its window, add that * window here */ while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, - &stream_global)) { + transport_writing, + &stream_global, + &stream_writing)) { if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) { - gpr_slice_buffer_add( - &transport_writing->outbuf, - grpc_chttp2_window_update_create( - stream_global->id, stream_global->unannounced_incoming_window)); + stream_writing->announce_window = stream_global->unannounced_incoming_window; GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, incoming_window, stream_global->unannounced_incoming_window); GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, @@ -120,6 +117,7 @@ int grpc_chttp2_unlocking_check_writes( stream_global->unannounced_incoming_window = 0; grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } } @@ -169,10 +167,19 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { while ( grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { - grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, - stream_writing->id, &transport_writing->hpack_compressor, - &transport_writing->outbuf); + if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { + grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, + stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, + stream_writing->id, &transport_writing->hpack_compressor, + &transport_writing->outbuf); + } + if (stream_writing->announce_window > 0) { + gpr_slice_buffer_add( + &transport_writing->outbuf, + grpc_chttp2_window_update_create( + stream_writing->id, stream_writing->announce_window)); + stream_writing->announce_window = 0; + } stream_writing->sopb.nops = 0; if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { gpr_slice_buffer_add(&transport_writing->outbuf, From 5c517b1f5d2a71b7a9562a5f468917b8655f861b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Jul 2015 13:28:10 -0700 Subject: [PATCH 3/4] Fix integration bugs --- src/core/transport/chttp2/frame_window_update.c | 4 ++-- src/core/transport/chttp2/writing.c | 4 +++- src/core/transport/chttp2_transport.c | 17 ++++++++++------- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index b817df77459..d624298ad2a 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -94,8 +94,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( } GPR_ASSERT(is_last); - if (transport_parsing->incoming_stream_id) { - if (stream_parsing) { + if (transport_parsing->incoming_stream_id != 0) { + if (stream_parsing != NULL) { GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing, stream_parsing, outgoing_window_update, p->amount); diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index e4a949076ee..d8ec117aa5d 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -107,6 +107,7 @@ int grpc_chttp2_unlocking_check_writes( transport_writing, &stream_global, &stream_writing)) { + stream_writing->id = stream_global->id; if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) { stream_writing->announce_window = stream_global->unannounced_incoming_window; GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, @@ -204,7 +205,8 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - if (stream_global->outgoing_sopb->nops == 0) { + if (stream_global->outgoing_sopb != NULL && + stream_global->outgoing_sopb->nops == 0) { stream_global->outgoing_sopb = NULL; grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 1); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 0b529ca5847..c923d5e42fb 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -628,19 +628,22 @@ static void perform_stream_op_locked( stream_global->publish_sopb->nops = 0; stream_global->publish_state = op->recv_state; if (stream_global->max_recv_bytes < op->max_recv_bytes) { - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global, - unannounced_incoming_window, op->max_recv_bytes - stream_global->max_recv_bytes); GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global, max_recv_bytes, op->max_recv_bytes - stream_global->max_recv_bytes); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "op", transport_global, stream_global, unannounced_incoming_window, + op->max_recv_bytes - stream_global->max_recv_bytes); stream_global->unannounced_incoming_window += op->max_recv_bytes - stream_global->max_recv_bytes; stream_global->max_recv_bytes = op->max_recv_bytes; } grpc_chttp2_incoming_metadata_live_op_buffer_end( &stream_global->outstanding_metadata); - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); - grpc_chttp2_list_add_writable_window_update_stream(transport_global, - stream_global); + if (stream_global->id != 0) { + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); + grpc_chttp2_list_add_writable_window_update_stream(transport_global, + stream_global); + } } if (op->bind_pollset) { @@ -1053,7 +1056,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, identifier = gpr_strdup(context_scope); } gpr_log(GPR_INFO, - "FLOWCTL: %s %-10s %8s %-23s %8lld %c %8lld = %8lld %-10s [%s:%d]", + "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]", is_client ? "client" : "server", identifier, context_thread, var, current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta, current_value + delta, reason, file, line); From 5065d72e4306488088d8d5dbb513758a58c41b42 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Jul 2015 16:58:59 -0700 Subject: [PATCH 4/4] Update comment --- src/core/transport/transport.h | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index d5bec63f66e..64503604ee1 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -73,10 +73,8 @@ typedef struct grpc_transport_stream_op { grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; /** The number of bytes this peer is currently prepared to receive. - - Bytes offered are used to replenish per-stream flow control windows. - Offers are not retractable: if 5 bytes are offered and no bytes are read, - a later offer of 3 bytes still implies that 5 have been offered. */ + These bytes will be eventually used to replenish per-stream flow control + windows. */ gpr_uint32 max_recv_bytes; grpc_iomgr_closure *on_done_recv;