Use a separate list for streams stalled by transport in writing path

pull/4933/head
yang-g 9 years ago
parent 1b3b1ee05a
commit 348f3a234f
  1. 12
      src/core/transport/chttp2/internal.h
  2. 18
      src/core/transport/chttp2/stream_lists.c
  3. 14
      src/core/transport/chttp2/writing.c

@ -67,6 +67,9 @@ typedef enum {
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING,
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
/* streams waiting for the outgoing window in the writing path, they will be
* merged to the stalled list or writable list under transport lock. */
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
@ -504,11 +507,11 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
/** Get a writable stream
returns non-zero if there was a stream available */
void grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
/** Get a writable stream
returns non-zero if there was a stream available */
int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing,
@ -560,9 +563,12 @@ int grpc_chttp2_list_pop_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_stalled_by_transport(
void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
void grpc_chttp2_list_flush_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);

@ -313,12 +313,26 @@ int grpc_chttp2_list_pop_check_read_ops(
return r;
}
void grpc_chttp2_list_add_stalled_by_transport(
void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
}
void grpc_chttp2_list_flush_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream *stream;
grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
while (stream_list_pop(transport, &stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
if (transport_writing->outgoing_window > 0) {
grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global);
} else {
stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
}
}
int grpc_chttp2_list_pop_stalled_by_transport(

@ -130,8 +130,8 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
} else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
}
}
if (stream_global->send_trailing_metadata) {
@ -273,8 +273,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
stream_writing->sent_message = 1;
}
} else if (transport_writing->outgoing_window == 0) {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
}
@ -312,8 +312,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
/* do nothing - already reffed */
}
} else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
} else {
@ -330,6 +330,8 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing);
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {

Loading…
Cancel
Save