Cancel pending write closures on stream cancellation

pull/15983/head
kpayson64 6 years ago
parent e36ca9ee1e
commit 5562dd514b
  1. 23
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 13
      src/core/ext/transport/chttp2/transport/internal.h
  3. 17
      src/core/ext/transport/chttp2/transport/stream_lists.cc
  4. 4
      src/core/ext/transport/chttp2/transport/writing.cc

@ -813,7 +813,6 @@ static void set_write_state(grpc_chttp2_transport* t,
write_state_name(st), reason));
t->write_state = st;
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
if (t->close_transport_on_writes_finished != nullptr) {
grpc_error* err = t->close_transport_on_writes_finished;
t->close_transport_on_writes_finished = nullptr;
@ -1205,10 +1204,16 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE) ||
closure->error_data.error != GRPC_ERROR_NONE || s->seen_error) {
// If the stream has failed, or this closure will fail, ignore
// CLOSURE_BARRIER_MAY_COVER_WRITE and run the callback immediately
GRPC_CLOSURE_RUN(closure, closure->error_data.error);
} else {
grpc_closure_list_append(&t->run_after_write, closure,
if (grpc_chttp2_list_add_waiting_for_write_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2:pending_write_closure");
}
grpc_closure_list_append(&s->run_after_write, closure,
closure->error_data.error);
}
}
@ -1989,7 +1994,9 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
s->byte_stream_error = GRPC_ERROR_REF(error);
}
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
}
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
post_benign_reclaimer(t);
if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
@ -1998,10 +2005,6 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
"Last stream closed after sending GOAWAY", &error, 1));
}
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
}
GRPC_ERROR_UNREF(error);
maybe_start_some_streams(t);
@ -2009,6 +2012,10 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error* due_to_error) {
GRPC_CLOSURE_LIST_SCHED(&s->run_after_write);
if (grpc_chttp2_list_remove_waiting_for_write_stream(t, s)) {
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2:pending_write_closure");
}
if (!t->is_client && !s->sent_trailing_metadata &&
grpc_error_has_clear_grpc_status(due_to_error)) {
close_from_api(t, s, due_to_error);

@ -54,6 +54,8 @@ typedef enum {
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
/** streams with closures waiting to be run on a write **/
GRPC_CHTTP2_LIST_WAITING_FOR_WRITE,
STREAM_LIST_COUNT /* must be last */
} grpc_chttp2_stream_list_id;
@ -431,9 +433,6 @@ struct grpc_chttp2_transport {
*/
grpc_error* close_transport_on_writes_finished;
/* a list of closures to run after writes are finished */
grpc_closure_list run_after_write;
/* buffer pool state */
/** have we scheduled a benign cleanup? */
bool benign_reclaimer_registered;
@ -584,6 +583,7 @@ struct grpc_chttp2_stream {
grpc_slice_buffer flow_controlled_buffer;
grpc_closure_list run_after_write;
grpc_chttp2_write_cb* on_flow_controlled_cbs;
grpc_chttp2_write_cb* on_write_finished_cbs;
grpc_chttp2_write_cb* finish_after_write;
@ -686,6 +686,13 @@ bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s);
bool grpc_chttp2_list_add_waiting_for_write_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s);
bool grpc_chttp2_list_pop_waiting_for_write_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream** s);
bool grpc_chttp2_list_remove_waiting_for_write_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s);
/********* Flow Control ***************/
// Takes in a flow control action and performs all the needed operations.

@ -35,6 +35,8 @@ static const char* stream_list_id_string(grpc_chttp2_stream_list_id id) {
return "stalled_by_stream";
case GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY:
return "waiting_for_concurrency";
case GRPC_CHTTP2_LIST_WAITING_FOR_WRITE:
return "waiting_for_write";
case STREAM_LIST_COUNT:
GPR_UNREACHABLE_CODE(return "unknown");
}
@ -214,3 +216,18 @@ bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
}
bool grpc_chttp2_list_add_waiting_for_write_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
return stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE);
}
bool grpc_chttp2_list_pop_waiting_for_write_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream** s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE);
}
bool grpc_chttp2_list_remove_waiting_for_write_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_WRITE);
}

@ -641,6 +641,10 @@ void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) {
}
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
}
while (grpc_chttp2_list_pop_waiting_for_write_stream(t, &s)) {
GRPC_CLOSURE_LIST_SCHED(&s->run_after_write);
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2:write_closure_sched");
}
grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
GRPC_ERROR_UNREF(error);
}

Loading…
Cancel
Save