Fix some test failures

reviewable/pr8239/r2
Craig Tiller 8 years ago
parent fd77de6a8a
commit 936f1ea9ac
  1. 90
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 6
      src/core/ext/transport/chttp2/transport/internal.h
  3. 16
      src/core/lib/iomgr/error.c
  4. 4
      src/core/lib/iomgr/error.h

@ -90,10 +90,6 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_setting_id id, uint32_t value); grpc_chttp2_setting_id id, uint32_t value);
/** Start disconnection chain */
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error); grpc_chttp2_stream *s, grpc_error *error);
@ -118,6 +114,11 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_error *error); grpc_error *error);
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_error *error);
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
/******************************************************************************* /*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING * CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/ */
@ -367,7 +368,10 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) { grpc_error *error) {
grpc_chttp2_transport *t = tp; grpc_chttp2_transport *t = tp;
t->destroying = 1; t->destroying = 1;
drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); close_transport_locked(
exec_ctx, t,
grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"),
GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destroy"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destroy");
} }
@ -382,6 +386,19 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_error *error) { grpc_error *error) {
if (!t->closed) { if (!t->closed) {
if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
if (t->close_transport_on_writes_finished == NULL) {
t->close_transport_on_writes_finished =
GRPC_ERROR_CREATE("Delayed close due to in-progress write");
}
t->close_transport_on_writes_finished =
grpc_error_add_child(t->close_transport_on_writes_finished, error);
return;
}
if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
}
t->closed = 1; t->closed = 1;
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport"); GRPC_ERROR_REF(error), "close_transport");
@ -392,6 +409,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
while (grpc_chttp2_list_pop_writable_stream(t, &s)) { while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
} }
end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
@ -555,13 +573,19 @@ static const char *write_state_name(grpc_chttp2_write_state st) {
GPR_UNREACHABLE_CODE(return "UNKNOWN"); GPR_UNREACHABLE_CODE(return "UNKNOWN");
} }
static void set_write_state(grpc_chttp2_transport *t, static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_write_state st, const char *reason) { grpc_chttp2_write_state st, const char *reason) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s [%s]", t, GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s [%s]", t,
t->is_client ? "CLIENT" : "SERVER", t->is_client ? "CLIENT" : "SERVER",
write_state_name(t->write_state), write_state_name(t->write_state),
write_state_name(st), reason)); write_state_name(st), reason));
t->write_state = st; t->write_state = st;
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE &&
t->close_transport_on_writes_finished != NULL) {
grpc_error *err = t->close_transport_on_writes_finished;
t->close_transport_on_writes_finished = NULL;
close_transport_locked(exec_ctx, t, err);
}
} }
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
@ -571,7 +595,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
switch (t->write_state) { switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE: case GRPC_CHTTP2_WRITE_STATE_IDLE:
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason); set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner, grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->write_action_begin_locked, &t->write_action_begin_locked,
@ -579,7 +603,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
break; break;
case GRPC_CHTTP2_WRITE_STATE_WRITING: case GRPC_CHTTP2_WRITE_STATE_WRITING:
set_write_state( set_write_state(
t, exec_ctx, t,
covered_by_poller covered_by_poller
? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER
: GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, : GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
@ -588,7 +612,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
if (covered_by_poller) { if (covered_by_poller) {
set_write_state( set_write_state(
t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER, exec_ctx, t,
GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
reason); reason);
} }
break; break;
@ -614,10 +639,12 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_chttp2_transport *t = gt; grpc_chttp2_transport *t = gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) { if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) {
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "begin writing"); set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"begin writing");
grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL);
} else { } else {
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
"begin writing nothing");
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
} }
GPR_TIMER_END("write_action_begin_locked", 0); GPR_TIMER_END("write_action_begin_locked", 0);
@ -645,7 +672,7 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_transport *t = tp; grpc_chttp2_transport *t = tp;
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
} }
grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
@ -655,11 +682,12 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_UNREACHABLE_CODE(break); GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITE_STATE_WRITING: case GRPC_CHTTP2_WRITE_STATE_WRITING:
GPR_TIMER_MARK("state=writing", 0); GPR_TIMER_MARK("state=writing", 0);
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
"finish writing");
break; break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
GPR_TIMER_MARK("state=writing_stale_no_poller", 0); GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [!covered]"); "continue writing [!covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner, grpc_combiner_execute_finally(exec_ctx, t->combiner,
@ -668,7 +696,7 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
break; break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
GPR_TIMER_MARK("state=writing_stale_with_poller", 0); GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [covered]"); "continue writing [covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner, grpc_combiner_execute_finally(exec_ctx, t->combiner,
@ -1434,8 +1462,8 @@ static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) {
++*nrefs; ++*nrefs;
} }
static grpc_error *removal_error(grpc_error *extra_error, static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
grpc_chttp2_stream *s) { const char *master_error_msg) {
grpc_error *refs[3]; grpc_error *refs[3];
size_t nrefs = 0; size_t nrefs = 0;
add_error(s->read_closed_error, refs, &nrefs); add_error(s->read_closed_error, refs, &nrefs);
@ -1443,8 +1471,7 @@ static grpc_error *removal_error(grpc_error *extra_error,
add_error(extra_error, refs, &nrefs); add_error(extra_error, refs, &nrefs);
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
if (nrefs > 0) { if (nrefs > 0) {
error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs, error = GRPC_ERROR_CREATE_REFERENCING(master_error_msg, refs, nrefs);
nrefs);
} }
GRPC_ERROR_UNREF(extra_error); GRPC_ERROR_UNREF(extra_error);
return error; return error;
@ -1453,7 +1480,8 @@ static grpc_error *removal_error(grpc_error *extra_error,
static void fail_pending_writes(grpc_exec_ctx *exec_ctx, static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_error *error) { grpc_error *error) {
error = removal_error(error, s); error =
removal_error(error, s, "Pending writes failed due to stream closure");
s->fetching_send_message = NULL; s->fetching_send_message = NULL;
grpc_chttp2_complete_closure_step( grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error), exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error),
@ -1507,7 +1535,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
if (s->read_closed && s->write_closed) { if (s->read_closed && s->write_closed) {
if (s->id != 0) { if (s->id != 0) {
remove_stream(exec_ctx, t, s->id, remove_stream(exec_ctx, t, s->id,
removal_error(GRPC_ERROR_REF(error), s)); removal_error(GRPC_ERROR_REF(error), s, "Stream removed"));
} }
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2"); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2");
} }
@ -1650,16 +1678,6 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error) {
if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
}
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
end_all_the_calls(exec_ctx, t, error);
}
/** update window from a settings change */ /** update window from a settings change */
typedef struct { typedef struct {
grpc_chttp2_transport *t; grpc_chttp2_transport *t;
@ -1743,6 +1761,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_ERROR_REF(error); GRPC_ERROR_REF(error);
grpc_error *err = error;
if (err != GRPC_ERROR_NONE) {
err = grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Endpoint read failed", &err, 1),
GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state);
}
GPR_SWAP(grpc_error *, err, error);
GRPC_ERROR_UNREF(err);
if (!t->closed) { if (!t->closed) {
GPR_TIMER_BEGIN("reading_action.parse", 0); GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0; size_t i = 0;
@ -1789,7 +1815,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
error = GRPC_ERROR_CREATE("Transport closed"); error = GRPC_ERROR_CREATE("Transport closed");
} }
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0; t->endpoint_reading = 0;
} else if (!t->closed) { } else if (!t->closed) {
keep_reading = true; keep_reading = true;

@ -316,6 +316,10 @@ struct grpc_chttp2_transport {
gpr_slice goaway_text; gpr_slice goaway_text;
grpc_chttp2_write_cb *write_cb_pool; grpc_chttp2_write_cb *write_cb_pool;
/* if non-NULL, close the transport with this error when writes are finished
*/
grpc_error *close_transport_on_writes_finished;
}; };
typedef enum { typedef enum {
@ -509,7 +513,7 @@ extern int grpc_flowctl_trace;
if (!(grpc_http_trace)) \ if (!(grpc_http_trace)) \
; \ ; \
else \ else \
stmt stmt
typedef enum { typedef enum {
GRPC_CHTTP2_FLOWCTL_MOVE, GRPC_CHTTP2_FLOWCTL_MOVE,

@ -120,6 +120,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "http_status"; return "http_status";
case GRPC_ERROR_INT_LIMIT: case GRPC_ERROR_INT_LIMIT:
return "limit"; return "limit";
case GRPC_ERROR_INT_OCCURRED_DURING_WRITE:
return "occurred_during_write";
} }
GPR_UNREACHABLE_CODE(return "unknown"); GPR_UNREACHABLE_CODE(return "unknown");
} }
@ -144,6 +146,8 @@ static const char *error_str_name(grpc_error_strs key) {
return "tsi_error"; return "tsi_error";
case GRPC_ERROR_STR_FILENAME: case GRPC_ERROR_STR_FILENAME:
return "filename"; return "filename";
case GRPC_ERROR_STR_QUEUED_BUFFERS:
return "queued_buffers";
} }
GPR_UNREACHABLE_CODE(return "unknown"); GPR_UNREACHABLE_CODE(return "unknown");
} }
@ -523,21 +527,25 @@ static char *fmt_time(void *p) {
return out; return out;
} }
static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) { static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap,
bool *first) {
if (n == NULL) return; if (n == NULL) return;
add_errs(n->left, s, sz, cap); add_errs(n->left, s, sz, cap, first);
if (!*first) append_chr(',', s, sz, cap);
*first = false;
const char *e = grpc_error_string(n->value); const char *e = grpc_error_string(n->value);
append_str(e, s, sz, cap); append_str(e, s, sz, cap);
grpc_error_free_string(e); grpc_error_free_string(e);
add_errs(n->right, s, sz, cap); add_errs(n->right, s, sz, cap, first);
} }
static char *errs_string(grpc_error *err) { static char *errs_string(grpc_error *err) {
char *s = NULL; char *s = NULL;
size_t sz = 0; size_t sz = 0;
size_t cap = 0; size_t cap = 0;
bool first = true;
append_chr('[', &s, &sz, &cap); append_chr('[', &s, &sz, &cap);
add_errs(err->errs.root, &s, &sz, &cap); add_errs(err->errs.root, &s, &sz, &cap, &first);
append_chr(']', &s, &sz, &cap); append_chr(']', &s, &sz, &cap);
append_chr(0, &s, &sz, &cap); append_chr(0, &s, &sz, &cap);
return s; return s;

@ -100,6 +100,8 @@ typedef enum {
GRPC_ERROR_INT_HTTP_STATUS, GRPC_ERROR_INT_HTTP_STATUS,
/// context sensitive limit associated with the error /// context sensitive limit associated with the error
GRPC_ERROR_INT_LIMIT, GRPC_ERROR_INT_LIMIT,
/// chttp2: did the error occur while a write was in progress
GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
} grpc_error_ints; } grpc_error_ints;
typedef enum { typedef enum {
@ -121,6 +123,8 @@ typedef enum {
GRPC_ERROR_STR_TSI_ERROR, GRPC_ERROR_STR_TSI_ERROR,
/// filename that we were trying to read/write when this error occurred /// filename that we were trying to read/write when this error occurred
GRPC_ERROR_STR_FILENAME, GRPC_ERROR_STR_FILENAME,
/// which data was queued for writing when the error occurred
GRPC_ERROR_STR_QUEUED_BUFFERS
} grpc_error_strs; } grpc_error_strs;
typedef enum { typedef enum {

Loading…
Cancel
Save