diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 023b7c2e954..8ab26e512d7 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -836,8 +836,8 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; } if (s->fetched_send_message_length == s->fetching_send_message->length) { - ssize_t notify_offset = s->fetching_slice_end_offset; - if (notify_offset <= 0) { + int64_t notify_offset = s->next_message_end_offset; + if (notify_offset <= s->flow_controlled_bytes_written) { grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, "fetching_send_message_finished"); @@ -848,7 +848,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, } else { t->write_cb_pool = cb->next; } - cb->call_at_byte = (size_t)notify_offset; + cb->call_at_byte = notify_offset; cb->closure = s->fetching_send_message_finished; s->fetching_send_message_finished = NULL; cb->next = s->on_write_finished_cbs; @@ -1005,13 +1005,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, frame_hdr[4] = (uint8_t)(len); s->fetching_send_message = op->send_message; s->fetched_send_message_length = 0; - s->fetching_slice_end_offset = - (ssize_t)s->flow_controlled_buffer.length + (ssize_t)len; + s->next_message_end_offset = s->flow_controlled_bytes_written + + (int64_t)s->flow_controlled_buffer.length + + (int64_t)len; s->complete_fetch_covered_by_poller = op->covered_by_poller; if (flags & GRPC_WRITE_BUFFER_HINT) { /* allow up to 64kb to be buffered */ /* TODO(ctiller): make this configurable */ - s->fetching_slice_end_offset -= 65536; + s->next_message_end_offset -= 65536; } continue_fetching_send_locked(exec_ctx, t, s); if (s->id != 0) { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 3263c99bde5..774fed0722d 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -147,7 +147,7 @@ typedef struct grpc_chttp2_outstanding_ping { } grpc_chttp2_outstanding_ping; typedef struct grpc_chttp2_write_cb { - size_t call_at_byte; + int64_t call_at_byte; grpc_closure *closure; struct grpc_chttp2_write_cb *next; } grpc_chttp2_write_cb; @@ -353,7 +353,8 @@ struct grpc_chttp2_stream { grpc_byte_stream *fetching_send_message; uint32_t fetched_send_message_length; gpr_slice fetching_slice; - int64_t fetching_slice_end_offset; + int64_t next_message_end_offset; + int64_t flow_controlled_bytes_written; bool complete_fetch_covered_by_poller; grpc_closure complete_fetch; grpc_closure complete_fetch_locked; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index ebdbce1bfde..d34a7918b5f 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -56,16 +56,16 @@ static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, size_t send_bytes, + grpc_chttp2_stream *s, int64_t send_bytes, grpc_chttp2_write_cb **list, grpc_error *error) { grpc_chttp2_write_cb *cb = *list; *list = NULL; + s->flow_controlled_bytes_written += send_bytes; while (cb) { grpc_chttp2_write_cb *next = cb->next; - if (cb->call_at_byte <= send_bytes) { + if (cb->call_at_byte <= s->flow_controlled_bytes_written) { finish_write_cb(exec_ctx, t, s, cb, GRPC_ERROR_REF(error)); } else { - cb->call_at_byte -= send_bytes; add_to_write_list(list, cb); } cb = next; @@ -236,8 +236,8 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_REF(error), "send_initial_metadata_finished"); } if (s->sending_bytes != 0) { - update_list(exec_ctx, t, s, s->sending_bytes, &s->on_write_finished_cbs, - GRPC_ERROR_REF(error)); + update_list(exec_ctx, t, s, (int64_t)s->sending_bytes, + &s->on_write_finished_cbs, GRPC_ERROR_REF(error)); s->sending_bytes = 0; } if (s->sent_trailing_metadata) {