From 7ee8aa58f6bef5cd61cb8eec2640364271667139 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Mon, 19 Mar 2018 12:08:49 -0700 Subject: [PATCH 1/2] Fix flow control bug --- .../chttp2/transport/chttp2_transport.cc | 20 +++++++++++++++---- .../ext/transport/chttp2/transport/internal.h | 5 +++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index e2fb155646e..eb952813cde 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -674,6 +674,7 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer); + s->unprocessed_incoming_frames_buffer_cached_length = 0; grpc_slice_buffer_init(&s->frame_storage); grpc_slice_buffer_init(&s->compressed_data_buffer); grpc_slice_buffer_init(&s->decompressed_data_buffer); @@ -1579,20 +1580,27 @@ static void perform_stream_op_locked(void* stream_op, if (op->recv_message) { GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(); - size_t already_received; + size_t before = 0; GPR_ASSERT(s->recv_message_ready == nullptr); GPR_ASSERT(!s->pending_byte_stream); s->recv_message_ready = op_payload->recv_message.recv_message_ready; s->recv_message = op_payload->recv_message.recv_message; if (s->id != 0) { if (!s->read_closed) { - already_received = s->frame_storage.length; + before = s->frame_storage.length + + s->unprocessed_incoming_frames_buffer.length; + } + } + grpc_chttp2_maybe_complete_recv_message(t, s); + if (s->id != 0) { + if (!s->read_closed && s->frame_storage.length == 0) { + size_t after = s->frame_storage.length + + s->unprocessed_incoming_frames_buffer_cached_length; s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES, - already_received); + before - after); grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s); } } - grpc_chttp2_maybe_complete_recv_message(t, s); } if (op->recv_trailing_metadata) { @@ -1870,6 +1878,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t, } } } + // save the length of the buffer before handing control back to application + // threads. Needed to support correct flow control bookkeeping + s->unprocessed_incoming_frames_buffer_cached_length = + s->unprocessed_incoming_frames_buffer.length; if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) { null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE); } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6b6c0b28e2a..be905e7f485 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -507,6 +507,11 @@ struct grpc_chttp2_stream { grpc_slice_buffer unprocessed_incoming_frames_buffer; grpc_closure* on_next; /* protected by t combiner */ bool pending_byte_stream; /* protected by t combiner */ + // cached length of buffer to be used by the transport thread in cases where + // stream->pending_byte_stream == true. The value is saved before + // application threads are allowed to modify + // unprocessed_incoming_frames_buffer + int32_t unprocessed_incoming_frames_buffer_cached_length; grpc_closure reset_byte_stream; grpc_error* byte_stream_error; /* protected by t combiner */ bool received_last_frame; /* protected by t combiner */ From acd8822a8630e31cd91b9e53c312a96e5a4ddb12 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Mon, 26 Mar 2018 20:12:10 -0700 Subject: [PATCH 2/2] Fix objc macos build --- src/core/ext/transport/chttp2/transport/internal.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index be905e7f485..9e3337536e9 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -511,7 +511,7 @@ struct grpc_chttp2_stream { // stream->pending_byte_stream == true. The value is saved before // application threads are allowed to modify // unprocessed_incoming_frames_buffer - int32_t unprocessed_incoming_frames_buffer_cached_length; + size_t unprocessed_incoming_frames_buffer_cached_length; grpc_closure reset_byte_stream; grpc_error* byte_stream_error; /* protected by t combiner */ bool received_last_frame; /* protected by t combiner */