Merge pull request #5350 from yang-g/stalled_by_transport_race

Fix race between add_writing_stalled and destroy stream
pull/5213/head^2
Craig Tiller 9 years ago
commit 914a2e7217
  1. 9
      src/core/transport/chttp2/internal.h
  2. 22
      src/core/transport/chttp2/stream_lists.c
  3. 10
      src/core/transport/chttp2/writing.c
  4. 2
      src/core/transport/chttp2_transport.c

@ -485,7 +485,8 @@ struct grpc_chttp2_stream {
/** Someone is unlocking the transport mutex: check to see if writes /** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */ are required, and schedule them if so */
int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, int grpc_chttp2_unlocking_check_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing, grpc_chttp2_transport_writing *writing,
int is_parsing); int is_parsing);
void grpc_chttp2_perform_writes( void grpc_chttp2_perform_writes(
@ -568,8 +569,12 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing); grpc_chttp2_stream_writing *stream_writing);
void grpc_chttp2_list_flush_writing_stalled_by_transport( void grpc_chttp2_list_flush_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing, bool is_window_available); grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
bool is_window_available);
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_pop_stalled_by_transport( int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global); grpc_chttp2_stream_global **stream_global);

@ -316,13 +316,16 @@ int grpc_chttp2_list_pop_check_read_ops(
void grpc_chttp2_list_add_writing_stalled_by_transport( void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) { grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing);
STREAM_FROM_WRITING(stream_writing), if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) {
GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled");
}
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
} }
void grpc_chttp2_list_flush_writing_stalled_by_transport( void grpc_chttp2_list_flush_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing, grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
bool is_window_available) { bool is_window_available) {
grpc_chttp2_stream *stream; grpc_chttp2_stream *stream;
grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
@ -331,11 +334,22 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport(
if (is_window_available) { if (is_window_available) {
grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global); grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global);
} else { } else {
stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); grpc_chttp2_list_add_stalled_by_transport(transport_writing,
&stream->writing);
} }
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global,
"chttp2_writing_stalled");
} }
} }
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
int grpc_chttp2_list_pop_stalled_by_transport( int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) { grpc_chttp2_stream_global **stream_global) {

@ -44,7 +44,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_writing *transport_writing); grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_unlocking_check_writes( int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_global *transport_global, grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing, int is_parsing) { grpc_chttp2_transport_writing *transport_writing, int is_parsing) {
grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_writing *stream_writing;
@ -76,8 +76,8 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window); transport_global, outgoing_window);
bool is_window_available = transport_writing->outgoing_window > 0; bool is_window_available = transport_writing->outgoing_window > 0;
grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing, grpc_chttp2_list_flush_writing_stalled_by_transport(
is_window_available); exec_ctx, transport_writing, is_window_available);
/* for each grpc_chttp2_stream that's become writable, frame it's data /* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */ (according to available window sizes) and add to the output buffer */
@ -133,8 +133,8 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
} }
} else { } else {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, grpc_chttp2_list_add_stalled_by_transport(transport_writing,
stream_writing); stream_writing);
} }
} }
if (stream_global->send_trailing_metadata) { if (stream_global->send_trailing_metadata) {

@ -598,7 +598,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_BEGIN("unlock", 0); GPR_TIMER_BEGIN("unlock", 0);
if (!t->writing_active && !t->closed && if (!t->writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing, grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing,
t->parsing_active)) { t->parsing_active)) {
t->writing_active = 1; t->writing_active = 1;
REF_TRANSPORT(t, "writing"); REF_TRANSPORT(t, "writing");

Loading…
Cancel
Save