Merge pull request #8775 from ctiller/hansel

Fix logic race in chttp2 write path
pull/8828/head
Craig Tiller 8 years ago committed by GitHub
commit b79af5b542
  1. 53
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 10
      src/core/ext/transport/chttp2/transport/internal.h
  3. 2
      src/core/ext/transport/chttp2/transport/writing.c
  4. 11
      src/core/lib/iomgr/combiner.c

@ -111,9 +111,6 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream, void *byte_stream,
grpc_error *error_ignored); grpc_error *error_ignored);
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_error *error);
static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t, static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error); grpc_error *error);
@ -428,6 +425,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
/* flush writable stream list to avoid dangling references */ /* flush writable stream list to avoid dangling references */
grpc_chttp2_stream *s; grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_writable_stream(t, &s)) { while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
} }
end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
@ -523,6 +521,10 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
} }
} }
if (s->fail_pending_writes_on_writes_finished_error != NULL) {
GRPC_ERROR_UNREF(s->fail_pending_writes_on_writes_finished_error);
}
GPR_ASSERT(s->send_initial_metadata_finished == NULL); GPR_ASSERT(s->send_initial_metadata_finished == NULL);
GPR_ASSERT(s->fetching_send_message == NULL); GPR_ASSERT(s->fetching_send_message == NULL);
GPR_ASSERT(s->send_trailing_metadata_finished == NULL); GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
@ -704,8 +706,6 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
} }
} }
grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
switch (t->write_state) { switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE: case GRPC_CHTTP2_WRITE_STATE_IDLE:
GPR_UNREACHABLE_CODE(break); GPR_UNREACHABLE_CODE(break);
@ -734,6 +734,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
break; break;
} }
grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
GPR_TIMER_END("terminate_writing_with_lock", 0); GPR_TIMER_END("terminate_writing_with_lock", 0);
} }
@ -1404,6 +1406,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
} }
} }
if (grpc_chttp2_list_remove_writable_stream(t, s)) { if (grpc_chttp2_list_remove_writable_stream(t, s)) {
grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream"); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
} }
@ -1534,9 +1537,41 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error; return error;
} }
static void fail_pending_writes(grpc_exec_ctx *exec_ctx, void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_transport *t,
grpc_error *error) { grpc_chttp2_stream *s) {
if (s->need_fail_pending_writes_on_writes_finished) {
grpc_error *error = s->fail_pending_writes_on_writes_finished_error;
s->fail_pending_writes_on_writes_finished_error = NULL;
s->need_fail_pending_writes_on_writes_finished = false;
grpc_chttp2_fail_pending_writes(exec_ctx, t, s, error);
}
}
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error) {
if (s->need_fail_pending_writes_on_writes_finished ||
(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE &&
(s->included[GRPC_CHTTP2_LIST_WRITABLE] ||
s->included[GRPC_CHTTP2_LIST_WRITING]))) {
/* If a write is in progress, and it involves this stream, wait for the
* write to complete before cancelling things out. If we don't do this, then
* our combiner lock might think that some operation on its queue might be
* covering a completion even though there is none, in which case we might
* offload to another thread, which isn't guarateed to exist */
if (error != GRPC_ERROR_NONE) {
if (s->fail_pending_writes_on_writes_finished_error == GRPC_ERROR_NONE) {
s->fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE(
"Post-poned fail writes due to in-progress write");
}
s->fail_pending_writes_on_writes_finished_error = grpc_error_add_child(
s->fail_pending_writes_on_writes_finished_error, error);
}
s->need_fail_pending_writes_on_writes_finished = true;
return; /* early out */
}
error = error =
removal_error(error, s, "Pending writes failed due to stream closure"); removal_error(error, s, "Pending writes failed due to stream closure");
s->send_initial_metadata = NULL; s->send_initial_metadata = NULL;
@ -1590,7 +1625,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
if (close_writes && !s->write_closed) { if (close_writes && !s->write_closed) {
s->write_closed_error = GRPC_ERROR_REF(error); s->write_closed_error = GRPC_ERROR_REF(error);
s->write_closed = true; s->write_closed = true;
fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error)); grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
} }
if (s->read_closed && s->write_closed) { if (s->read_closed && s->write_closed) {

@ -409,6 +409,9 @@ struct grpc_chttp2_stream {
grpc_error *read_closed_error; grpc_error *read_closed_error;
/** the error that resulted in this stream being write-closed */ /** the error that resulted in this stream being write-closed */
grpc_error *write_closed_error; grpc_error *write_closed_error;
/** should any writes be cleared once this stream becomes non-writable */
bool need_fail_pending_writes_on_writes_finished;
grpc_error *fail_pending_writes_on_writes_finished_error;
grpc_published_metadata_method published_metadata[2]; grpc_published_metadata_method published_metadata[2];
bool final_metadata_requested; bool final_metadata_requested;
@ -689,4 +692,11 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s); grpc_chttp2_stream *s);
void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */

@ -208,6 +208,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing"); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing");
} }
} else { } else {
grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write"); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write");
} }
} }
@ -252,6 +253,7 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1, grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end"); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end");
} }
grpc_slice_buffer_reset_and_unref(&t->outbuf); grpc_slice_buffer_reset_and_unref(&t->outbuf);

@ -90,6 +90,12 @@ static bool is_covered_by_poller(grpc_combiner *lock) {
gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0; gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
} }
#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d"
#define IS_COVERED_BY_POLLER_ARGS(lock) \
(lock)->final_list_covered_by_poller, \
gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \
is_covered_by_poller((lock))
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock)); grpc_combiner *lock = gpr_malloc(sizeof(*lock));
lock->next_combiner_on_this_exec_ctx = NULL; lock->next_combiner_on_this_exec_ctx = NULL;
@ -197,9 +203,10 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
GRPC_COMBINER_TRACE( GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"C:%p grpc_combiner_continue_exec_ctx workqueue=%p " "C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
"is_covered_by_poller=%d exec_ctx_ready_to_finish=%d " "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT
" exec_ctx_ready_to_finish=%d "
"time_to_execute_final_list=%d", "time_to_execute_final_list=%d",
lock, lock->optional_workqueue, is_covered_by_poller(lock), lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock),
grpc_exec_ctx_ready_to_finish(exec_ctx), grpc_exec_ctx_ready_to_finish(exec_ctx),
lock->time_to_execute_final_list)); lock->time_to_execute_final_list));

Loading…
Cancel
Save