From 936f1ea9ac37965a6cbe22a04f4fedc9d7facc01 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 14 Oct 2016 15:15:19 -0700 Subject: [PATCH] Fix some test failures --- .../chttp2/transport/chttp2_transport.c | 90 ++++++++++++------- .../ext/transport/chttp2/transport/internal.h | 6 +- src/core/lib/iomgr/error.c | 16 +++- src/core/lib/iomgr/error.h | 4 + 4 files changed, 79 insertions(+), 37 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8ab26e512d7..b1dd9740111 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -90,10 +90,6 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs, static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value); -/** Start disconnection chain */ -static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error); - static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error); @@ -118,6 +114,11 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error); +static void close_transport_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_error *error); +static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_error *error); + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -367,7 +368,10 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = tp; t->destroying = 1; - drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); + close_transport_locked( + exec_ctx, t, + grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"), + GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destroy"); } @@ -382,6 +386,19 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { if (!t->closed) { + if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) { + if (t->close_transport_on_writes_finished == NULL) { + t->close_transport_on_writes_finished = + GRPC_ERROR_CREATE("Delayed close due to in-progress write"); + } + t->close_transport_on_writes_finished = + grpc_error_add_child(t->close_transport_on_writes_finished, error); + return; + } + if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } t->closed = 1; connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); @@ -392,6 +409,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, while (grpc_chttp2_list_pop_writable_stream(t, &s)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } + end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } @@ -555,13 +573,19 @@ static const char *write_state_name(grpc_chttp2_write_state st) { GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -static void set_write_state(grpc_chttp2_transport *t, +static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_write_state st, const char *reason) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s [%s]", t, t->is_client ? "CLIENT" : "SERVER", write_state_name(t->write_state), write_state_name(st), reason)); t->write_state = st; + if (st == GRPC_CHTTP2_WRITE_STATE_IDLE && + t->close_transport_on_writes_finished != NULL) { + grpc_error *err = t->close_transport_on_writes_finished; + t->close_transport_on_writes_finished = NULL; + close_transport_locked(exec_ctx, t, err); + } } void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, @@ -571,7 +595,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_combiner_execute_finally(exec_ctx, t->combiner, &t->write_action_begin_locked, @@ -579,7 +603,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, break; case GRPC_CHTTP2_WRITE_STATE_WRITING: set_write_state( - t, + exec_ctx, t, covered_by_poller ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER : GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, @@ -588,7 +612,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: if (covered_by_poller) { set_write_state( - t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER, + exec_ctx, t, + GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER, reason); } break; @@ -614,10 +639,12 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, grpc_chttp2_transport *t = gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) { - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "begin writing"); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, + "begin writing"); grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL); } else { - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, + "begin writing nothing"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); } GPR_TIMER_END("write_action_begin_locked", 0); @@ -645,7 +672,7 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_chttp2_transport *t = tp; if (error != GRPC_ERROR_NONE) { - drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); + close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); } grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); @@ -655,11 +682,12 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_UNREACHABLE_CODE(break); case GRPC_CHTTP2_WRITE_STATE_WRITING: GPR_TIMER_MARK("state=writing", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, + "finish writing"); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: GPR_TIMER_MARK("state=writing_stale_no_poller", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing [!covered]"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_combiner_execute_finally(exec_ctx, t->combiner, @@ -668,7 +696,7 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: GPR_TIMER_MARK("state=writing_stale_with_poller", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing [covered]"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_combiner_execute_finally(exec_ctx, t->combiner, @@ -1434,8 +1462,8 @@ static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) { ++*nrefs; } -static grpc_error *removal_error(grpc_error *extra_error, - grpc_chttp2_stream *s) { +static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, + const char *master_error_msg) { grpc_error *refs[3]; size_t nrefs = 0; add_error(s->read_closed_error, refs, &nrefs); @@ -1443,8 +1471,7 @@ static grpc_error *removal_error(grpc_error *extra_error, add_error(extra_error, refs, &nrefs); grpc_error *error = GRPC_ERROR_NONE; if (nrefs > 0) { - error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs, - nrefs); + error = GRPC_ERROR_CREATE_REFERENCING(master_error_msg, refs, nrefs); } GRPC_ERROR_UNREF(extra_error); return error; @@ -1453,7 +1480,8 @@ static grpc_error *removal_error(grpc_error *extra_error, static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { - error = removal_error(error, s); + error = + removal_error(error, s, "Pending writes failed due to stream closure"); s->fetching_send_message = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error), @@ -1507,7 +1535,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, if (s->read_closed && s->write_closed) { if (s->id != 0) { remove_stream(exec_ctx, t, s->id, - removal_error(GRPC_ERROR_REF(error), s)); + removal_error(GRPC_ERROR_REF(error), s, "Stream removed")); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2"); } @@ -1650,16 +1678,6 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error) { - if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { - error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE); - } - close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); - end_all_the_calls(exec_ctx, t, error); -} - /** update window from a settings change */ typedef struct { grpc_chttp2_transport *t; @@ -1743,6 +1761,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_ERROR_REF(error); + grpc_error *err = error; + if (err != GRPC_ERROR_NONE) { + err = grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING("Endpoint read failed", &err, 1), + GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state); + } + GPR_SWAP(grpc_error *, err, error); + GRPC_ERROR_UNREF(err); if (!t->closed) { GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; @@ -1789,7 +1815,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, error = GRPC_ERROR_CREATE("Transport closed"); } if (error != GRPC_ERROR_NONE) { - drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); + close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; } else if (!t->closed) { keep_reading = true; diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 774fed0722d..008dda80437 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -316,6 +316,10 @@ struct grpc_chttp2_transport { gpr_slice goaway_text; grpc_chttp2_write_cb *write_cb_pool; + + /* if non-NULL, close the transport with this error when writes are finished + */ + grpc_error *close_transport_on_writes_finished; }; typedef enum { @@ -509,7 +513,7 @@ extern int grpc_flowctl_trace; if (!(grpc_http_trace)) \ ; \ else \ - stmt + stmt typedef enum { GRPC_CHTTP2_FLOWCTL_MOVE, diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 38fd1e09605..f6bb3a04779 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -120,6 +120,8 @@ static const char *error_int_name(grpc_error_ints key) { return "http_status"; case GRPC_ERROR_INT_LIMIT: return "limit"; + case GRPC_ERROR_INT_OCCURRED_DURING_WRITE: + return "occurred_during_write"; } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -144,6 +146,8 @@ static const char *error_str_name(grpc_error_strs key) { return "tsi_error"; case GRPC_ERROR_STR_FILENAME: return "filename"; + case GRPC_ERROR_STR_QUEUED_BUFFERS: + return "queued_buffers"; } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -523,21 +527,25 @@ static char *fmt_time(void *p) { return out; } -static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) { +static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap, + bool *first) { if (n == NULL) return; - add_errs(n->left, s, sz, cap); + add_errs(n->left, s, sz, cap, first); + if (!*first) append_chr(',', s, sz, cap); + *first = false; const char *e = grpc_error_string(n->value); append_str(e, s, sz, cap); grpc_error_free_string(e); - add_errs(n->right, s, sz, cap); + add_errs(n->right, s, sz, cap, first); } static char *errs_string(grpc_error *err) { char *s = NULL; size_t sz = 0; size_t cap = 0; + bool first = true; append_chr('[', &s, &sz, &cap); - add_errs(err->errs.root, &s, &sz, &cap); + add_errs(err->errs.root, &s, &sz, &cap, &first); append_chr(']', &s, &sz, &cap); append_chr(0, &s, &sz, &cap); return s; diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index ae6a6cb35e2..f3f3b80a092 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -100,6 +100,8 @@ typedef enum { GRPC_ERROR_INT_HTTP_STATUS, /// context sensitive limit associated with the error GRPC_ERROR_INT_LIMIT, + /// chttp2: did the error occur while a write was in progress + GRPC_ERROR_INT_OCCURRED_DURING_WRITE, } grpc_error_ints; typedef enum { @@ -121,6 +123,8 @@ typedef enum { GRPC_ERROR_STR_TSI_ERROR, /// filename that we were trying to read/write when this error occurred GRPC_ERROR_STR_FILENAME, + /// which data was queued for writing when the error occurred + GRPC_ERROR_STR_QUEUED_BUFFERS } grpc_error_strs; typedef enum {