Ensure streams are sent in-order

pull/1888/head
Craig Tiller 10 years ago
parent 4ed8e401cf
commit 8631652e15
  1. 6
      src/core/transport/chttp2/internal.h
  2. 7
      src/core/transport/chttp2/stream_lists.c
  3. 33
      src/core/transport/chttp2/writing.c

@ -403,6 +403,8 @@ typedef struct {
grpc_stream_op_buffer sopb; grpc_stream_op_buffer sopb;
/** how strongly should we indicate closure with the next write */ /** how strongly should we indicate closure with the next write */
grpc_chttp2_send_closed send_closed; grpc_chttp2_send_closed send_closed;
/** how much window should we announce? */
gpr_uint32 announce_window;
} grpc_chttp2_stream_writing; } grpc_chttp2_stream_writing;
struct grpc_chttp2_stream_parsing { struct grpc_chttp2_stream_parsing {
@ -513,7 +515,9 @@ void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_stream_global *stream_global); grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_writable_window_update_stream( int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global); grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_remove_writable_window_update_stream( void grpc_chttp2_list_remove_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global); grpc_chttp2_stream_global *stream_global);

@ -139,6 +139,7 @@ static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
void grpc_chttp2_list_add_writable_stream( void grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) { grpc_chttp2_stream_global *stream_global) {
GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE);
} }
@ -204,6 +205,7 @@ int grpc_chttp2_list_pop_written_stream(
void grpc_chttp2_list_add_writable_window_update_stream( void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) { grpc_chttp2_stream_global *stream_global) {
GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
@ -211,11 +213,14 @@ void grpc_chttp2_list_add_writable_window_update_stream(
int grpc_chttp2_list_pop_writable_window_update_stream( int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) { grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream; grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
*stream_global = &stream->global; *stream_global = &stream->global;
*stream_writing = &stream->writing;
return r; return r;
} }

@ -66,11 +66,9 @@ int grpc_chttp2_unlocking_check_writes(
/* for each grpc_chttp2_stream that's become writable, frame it's data /* for each grpc_chttp2_stream that's become writable, frame it's data
(according to (according to
available window sizes) and add to the output buffer */ available window sizes) and add to the output buffer */
while (transport_global->outgoing_window && while (grpc_chttp2_list_pop_writable_stream(transport_global,
grpc_chttp2_list_pop_writable_stream(transport_global,
transport_writing, &stream_global, transport_writing, &stream_global,
&stream_writing) && &stream_writing)) {
stream_global->outgoing_window > 0) {
stream_writing->id = stream_global->id; stream_writing->id = stream_global->id;
window_delta = grpc_chttp2_preencode( window_delta = grpc_chttp2_preencode(
stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops,
@ -106,12 +104,11 @@ int grpc_chttp2_unlocking_check_writes(
/* for each grpc_chttp2_stream that wants to update its window, add that /* for each grpc_chttp2_stream that wants to update its window, add that
* window here */ * window here */
while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global,
&stream_global)) { transport_writing,
&stream_global,
&stream_writing)) {
if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) { if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) {
gpr_slice_buffer_add( stream_writing->announce_window = stream_global->unannounced_incoming_window;
&transport_writing->outbuf,
grpc_chttp2_window_update_create(
stream_global->id, stream_global->unannounced_incoming_window));
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
incoming_window, stream_global->unannounced_incoming_window); incoming_window, stream_global->unannounced_incoming_window);
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
@ -120,6 +117,7 @@ int grpc_chttp2_unlocking_check_writes(
stream_global->unannounced_incoming_window = 0; stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated(transport_global, grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global); stream_global);
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} }
} }
@ -169,10 +167,19 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
while ( while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
stream_writing->id, &transport_writing->hpack_compressor, stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
&transport_writing->outbuf); stream_writing->id, &transport_writing->hpack_compressor,
&transport_writing->outbuf);
}
if (stream_writing->announce_window > 0) {
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_window_update_create(
stream_writing->id, stream_writing->announce_window));
stream_writing->announce_window = 0;
}
stream_writing->sopb.nops = 0; stream_writing->sopb.nops = 0;
if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&transport_writing->outbuf, gpr_slice_buffer_add(&transport_writing->outbuf,

Loading…
Cancel
Save