pull/12895/head
Craig Tiller 8 years ago
parent 1ec8f8a920
commit 85516af26a
  1. 54
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 10
      src/core/ext/transport/chttp2/transport/internal.h
  3. 3
      src/core/ext/transport/chttp2/transport/writing.cc
  4. 6
      test/core/end2end/bad_server_response_test.c
  5. 3
      test/core/end2end/tests/bad_ping.c
  6. 2
      test/core/end2end/tests/keepalive_timeout.c
  7. 3
      test/core/end2end/tests/max_connection_age.c
  8. 2
      test/core/end2end/tests/shutdown_finishes_calls.c

@ -224,6 +224,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
t->flow_control.bdp_estimator.Destroy(); t->flow_control.bdp_estimator.Destroy();
GRPC_ERROR_UNREF(t->closed_with_error);
gpr_free(t->ping_acks); gpr_free(t->ping_acks);
gpr_free(t->peer_string); gpr_free(t->peer_string);
gpr_free(t); gpr_free(t);
@ -571,8 +572,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
} }
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); if (t->flow_control.enable_bdp_probe) {
schedule_bdp_ping_locked(exec_ctx, t); GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
schedule_bdp_ping_locked(exec_ctx, t);
}
grpc_chttp2_act_on_flowctl_action( grpc_chttp2_act_on_flowctl_action(
exec_ctx, exec_ctx,
@ -607,7 +610,9 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
static void close_transport_locked(grpc_exec_ctx *exec_ctx, static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_error *error) { grpc_error *error) {
if (!t->closed) { end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
if (t->closed_with_error == nullptr) {
if (!grpc_error_has_clear_grpc_status(error)) { if (!grpc_error_has_clear_grpc_status(error)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE); GRPC_STATUS_UNAVAILABLE);
@ -622,10 +627,10 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_error_add_child(t->close_transport_on_writes_finished, error); grpc_error_add_child(t->close_transport_on_writes_finished, error);
return; return;
} }
t->closed = 1; GPR_ASSERT(error != GRPC_ERROR_NONE);
t->closed_with_error = GRPC_ERROR_REF(error);
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport"); GRPC_ERROR_REF(error), "close_transport");
grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
if (t->ping_state.is_delayed_ping_timer_set) { if (t->ping_state.is_delayed_ping_timer_set) {
grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer); grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer);
} }
@ -651,8 +656,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
while (grpc_chttp2_list_pop_writable_stream(t, &s)) { while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
} }
end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); if (t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) {
cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
}
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
@ -848,6 +854,10 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->close_transport_on_writes_finished = NULL; t->close_transport_on_writes_finished = NULL;
close_transport_locked(exec_ctx, t, err); close_transport_locked(exec_ctx, t, err);
} }
if (t->closed_with_error != GRPC_ERROR_NONE) {
grpc_endpoint_shutdown(exec_ctx, t->ep,
GRPC_ERROR_REF(t->closed_with_error));
}
} }
} }
@ -955,7 +965,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s) { grpc_chttp2_stream *s) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { if (t->closed_with_error == GRPC_ERROR_NONE &&
grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
} }
} }
@ -1008,7 +1019,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_chttp2_begin_write_result r; grpc_chttp2_begin_write_result r;
if (t->closed) { if (t->closed_with_error != GRPC_ERROR_NONE) {
r.writing = false; r.writing = false;
} else { } else {
r = grpc_chttp2_begin_write(exec_ctx, t); r = grpc_chttp2_begin_write(exec_ctx, t);
@ -1471,7 +1482,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} }
if (!s->write_closed) { if (!s->write_closed) {
if (t->is_client) { if (t->is_client) {
if (!t->closed) { if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_ASSERT(s->id == 0); GPR_ASSERT(s->id == 0);
grpc_chttp2_list_add_waiting_for_concurrency(t, s); grpc_chttp2_list_add_waiting_for_concurrency(t, s);
maybe_start_some_streams(exec_ctx, t); maybe_start_some_streams(exec_ctx, t);
@ -1479,7 +1490,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_chttp2_cancel_stream( grpc_chttp2_cancel_stream(
exec_ctx, t, s, exec_ctx, t, s,
grpc_error_set_int( grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"), GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Transport closed", &t->closed_with_error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
} }
} else { } else {
@ -1768,7 +1780,10 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
/*The transport will be closed after the write is done */ /*The transport will be closed after the write is done */
close_transport_locked( close_transport_locked(
exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings")); exec_ctx, t,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
} }
} }
@ -2496,7 +2511,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
} }
GPR_SWAP(grpc_error *, err, error); GPR_SWAP(grpc_error *, err, error);
GRPC_ERROR_UNREF(err); GRPC_ERROR_UNREF(err);
if (!t->closed) { if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_TIMER_BEGIN("reading_action.parse", 0); GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0; size_t i = 0;
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
@ -2536,13 +2551,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("post_reading_action_locked", 0); GPR_TIMER_BEGIN("post_reading_action_locked", 0);
bool keep_reading = false; bool keep_reading = false;
if (error == GRPC_ERROR_NONE && t->closed) { if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"); error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Transport closed", &t->closed_with_error, 1);
} }
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0; t->endpoint_reading = 0;
} else if (!t->closed) { } else if (t->closed_with_error == GRPC_ERROR_NONE) {
keep_reading = true; keep_reading = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
} }
@ -2680,7 +2696,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
if (t->destroying || t->closed) { if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
} else if (error == GRPC_ERROR_NONE) { } else if (error == GRPC_ERROR_NONE) {
if (t->keepalive_permit_without_calls || if (t->keepalive_permit_without_calls ||
@ -2738,8 +2754,8 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING( close_transport_locked(exec_ctx, t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"keepalive watchdog timeout")); "keepalive watchdog timeout"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL));
} }
} else { } else {
/* The watchdog timer should have been cancelled by /* The watchdog timer should have been cancelled by

@ -298,7 +298,7 @@ struct grpc_chttp2_transport {
/** is the transport destroying itself? */ /** is the transport destroying itself? */
uint8_t destroying; uint8_t destroying;
/** has the upper layer closed the transport? */ /** has the upper layer closed the transport? */
uint8_t closed; grpc_error *closed_with_error;
/** is there a read request to the endpoint outstanding? */ /** is there a read request to the endpoint outstanding? */
uint8_t endpoint_reading; uint8_t endpoint_reading;
@ -340,7 +340,7 @@ struct grpc_chttp2_transport {
/** hpack encoding */ /** hpack encoding */
grpc_chttp2_hpack_compressor hpack_compressor; grpc_chttp2_hpack_compressor hpack_compressor;
/** is this a client? */ /** is this a client? */
uint8_t is_client; bool is_client;
/** data to write next write */ /** data to write next write */
grpc_slice_buffer qbuf; grpc_slice_buffer qbuf;
@ -350,14 +350,14 @@ struct grpc_chttp2_transport {
uint32_t write_buffer_size; uint32_t write_buffer_size;
/** have we seen a goaway */ /** have we seen a goaway */
uint8_t seen_goaway; bool seen_goaway;
/** have we sent a goaway */ /** have we sent a goaway */
grpc_chttp2_sent_goaway_state sent_goaway_state; grpc_chttp2_sent_goaway_state sent_goaway_state;
/** are the local settings dirty and need to be sent? */ /** are the local settings dirty and need to be sent? */
uint8_t dirtied_local_settings; bool dirtied_local_settings;
/** have local settings been sent? */ /** have local settings been sent? */
uint8_t sent_local_settings; bool sent_local_settings;
/** bitmask of setting indexes to send out */ /** bitmask of setting indexes to send out */
uint32_t force_send_settings; uint32_t force_send_settings;
/** settings values */ /** settings values */

@ -245,7 +245,8 @@ class WriteContext {
void UpdateStreamsNoLongerStalled() { void UpdateStreamsNoLongerStalled() {
grpc_chttp2_stream *s; grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) { while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
if (!t_->closed && grpc_chttp2_list_add_writable_stream(t_, s)) { if (t_->closed_with_error == GRPC_ERROR_NONE &&
grpc_chttp2_list_add_writable_stream(t_, s)) {
if (!stream_ref_if_not_destroyed(&s->refcount->refs)) { if (!stream_ref_if_not_destroyed(&s->refcount->refs)) {
grpc_chttp2_list_remove_writable_stream(t_, s); grpc_chttp2_list_remove_writable_stream(t_, s);
} }

@ -62,8 +62,6 @@
#define HTTP2_DETAIL_MSG(STATUS_CODE) \ #define HTTP2_DETAIL_MSG(STATUS_CODE) \
"Received http2 header with status: " #STATUS_CODE "Received http2 header with status: " #STATUS_CODE
#define UNPARSEABLE_DETAIL_MSG "Failed parsing HTTP/2"
#define HTTP1_DETAIL_MSG "Trying to connect an http1.x server" #define HTTP1_DETAIL_MSG "Trying to connect an http1.x server"
/* TODO(zyc) Check the content of incomming data instead of using this length */ /* TODO(zyc) Check the content of incomming data instead of using this length */
@ -208,8 +206,10 @@ static void start_rpc(int target_port, grpc_status_code expected_status,
cq_verify(cqv); cq_verify(cqv);
GPR_ASSERT(status == expected_status); GPR_ASSERT(status == expected_status);
if (expected_detail != NULL) {
GPR_ASSERT(-1 != grpc_slice_slice(details, grpc_slice_from_static_string( GPR_ASSERT(-1 != grpc_slice_slice(details, grpc_slice_from_static_string(
expected_detail))); expected_detail)));
}
grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv); grpc_metadata_array_destroy(&trailing_metadata_recv);
@ -331,7 +331,7 @@ int main(int argc, char **argv) {
/* unparseable response */ /* unparseable response */
run_test(UNPARSEABLE_RESP, sizeof(UNPARSEABLE_RESP) - 1, run_test(UNPARSEABLE_RESP, sizeof(UNPARSEABLE_RESP) - 1,
GRPC_STATUS_UNAVAILABLE, UNPARSEABLE_DETAIL_MSG); GRPC_STATUS_UNKNOWN, NULL);
/* http1 response */ /* http1 response */
run_test(HTTP1_RESP, sizeof(HTTP1_RESP) - 1, GRPC_STATUS_UNAVAILABLE, run_test(HTTP1_RESP, sizeof(HTTP1_RESP) - 1, GRPC_STATUS_UNAVAILABLE,

@ -202,8 +202,7 @@ static void test_bad_ping(grpc_end2end_test_config config) {
// The connection should be closed immediately after the misbehaved pings, // The connection should be closed immediately after the misbehaved pings,
// the in-progress RPC should fail. // the in-progress RPC should fail.
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE); GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "Endpoint read failed"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
validate_host_override_string("foo.test.google.fr:1234", call_details.host, validate_host_override_string("foo.test.google.fr:1234", call_details.host,
config); config);

@ -193,7 +193,7 @@ static void test_keepalive_timeout(grpc_end2end_test_config config) {
char *details_str = grpc_slice_to_c_string(details); char *details_str = grpc_slice_to_c_string(details);
char *method_str = grpc_slice_to_c_string(call_details.method); char *method_str = grpc_slice_to_c_string(call_details.method);
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE); GPR_ASSERT(status == GRPC_STATUS_INTERNAL);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "keepalive watchdog timeout")); GPR_ASSERT(0 == grpc_slice_str_cmp(details, "keepalive watchdog timeout"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
validate_host_override_string("foo.test.google.fr:1234", call_details.host, validate_host_override_string("foo.test.google.fr:1234", call_details.host,

@ -203,8 +203,7 @@ static void test_max_age_forcibly_close(grpc_end2end_test_config config) {
/* The connection should be closed immediately after the max age grace period, /* The connection should be closed immediately after the max age grace period,
the in-progress RPC should fail. */ the in-progress RPC should fail. */
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE); GPR_ASSERT(status == GRPC_STATUS_INTERNAL);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "Endpoint read failed"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
validate_host_override_string("foo.test.google.fr:1234", call_details.host, validate_host_override_string("foo.test.google.fr:1234", call_details.host,
config); config);

@ -159,7 +159,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
grpc_server_destroy(f.server); grpc_server_destroy(f.server);
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE); GPR_ASSERT(status == GRPC_STATUS_INTERNAL || status == GRPC_STATUS_UNAVAILABLE);
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
validate_host_override_string("foo.test.google.fr:1234", call_details.host, validate_host_override_string("foo.test.google.fr:1234", call_details.host,
config); config);

Loading…
Cancel
Save