Provide reasons in error messages when streams are canceled with a connection drop

pull/6773/head
Yuchen Zeng 9 years ago
parent 0d6196025e
commit a9d8c5564e
  1. 74
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 3
      src/core/ext/transport/chttp2/transport/internal.h

@ -47,6 +47,7 @@
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@ -107,7 +108,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status);
grpc_status_code status,
gpr_slice *optional_message);
static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
@ -161,6 +163,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(t->ep == NULL);
gpr_slice_unref(t->optional_drop_message);
gpr_slice_buffer_destroy(&t->global.qbuf);
gpr_slice_buffer_destroy(&t->writing.outbuf);
@ -260,6 +264,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
t->optional_drop_message = gpr_empty_slice();
grpc_connectivity_state_init(
&t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
is_client ? "client_transport" : "server_transport");
@ -859,7 +864,7 @@ static void maybe_start_some_streams(
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_UNAVAILABLE);
GRPC_STATUS_UNAVAILABLE, NULL);
}
}
@ -936,7 +941,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_from_api(exec_ctx, transport_global, stream_global,
op->cancel_with_status);
op->cancel_with_status, op->optional_close_message);
}
if (op->close_with_status != GRPC_STATUS_OK) {
@ -960,7 +965,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
"(%lu vs. %lu)",
metadata_size, metadata_peer_limit);
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED);
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
} else {
if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
stream_global->seen_error = true;
@ -1015,7 +1020,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
"(%lu vs. %lu)",
metadata_size, metadata_peer_limit);
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED);
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
} else {
if (contains_non_ok_status(transport_global,
op->send_trailing_metadata)) {
@ -1201,7 +1206,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED);
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
@ -1240,7 +1245,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED);
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
}
}
if (stream_global->all_incoming_byte_streams_finished) {
@ -1303,7 +1308,8 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status) {
grpc_status_code status,
gpr_slice *optional_message) {
if (!stream_global->read_closed || !stream_global->write_closed) {
if (stream_global->id != 0) {
gpr_slice_buffer_add(
@ -1313,8 +1319,12 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
(uint32_t)grpc_chttp2_grpc_status_to_http2_error(status),
&stream_global->stats.outgoing));
}
if (optional_message) {
gpr_slice_ref(*optional_message);
}
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
NULL);
optional_message);
}
if (status != GRPC_STATUS_OK && !stream_global->seen_error) {
stream_global->seen_error = true;
@ -1519,16 +1529,23 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
1);
}
typedef struct {
grpc_exec_ctx *exec_ctx;
gpr_slice optional_drop_message;
} cancel_stream_cb_arg;
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
cancel_from_api(user_data, transport_global, stream_global,
GRPC_STATUS_UNAVAILABLE);
cancel_stream_cb_arg *arg = user_data;
cancel_from_api(arg->exec_ctx, transport_global, stream_global,
GRPC_STATUS_UNAVAILABLE, &arg->optional_drop_message);
}
static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb);
cancel_stream_cb_arg arg = {exec_ctx, t->optional_drop_message};
grpc_chttp2_for_all_streams(&t->global, &arg, cancel_stream_cb);
}
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
@ -1599,6 +1616,31 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx,
}
}
static bool try_http_parsing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
return false;
grpc_http_parser parser;
size_t i = 0;
bool success = false;
grpc_http_parser_init(&parser);
grpc_http1_trace = 1;
for (; i < t->read_buffer.count &&
grpc_http_parser_parse(&parser, t->read_buffer.slices[i]);
i++)
;
if (grpc_http_parser_eof(&parser) && parser.type == GRPC_HTTP_RESPONSE) {
success = true;
GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_DEBUG, "Trying to connect an http1.x server, received status:%d",
parser.http.response.status));
}
grpc_http_parser_destroy(&parser);
return success;
}
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_chttp2_transport *t = arg;
GPR_TIMER_BEGIN("reading_action.parse", 0);
@ -1610,6 +1652,14 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
;
if (i != t->read_buffer.count) {
success = false;
gpr_slice_unref(t->optional_drop_message);
if (try_http_parsing(exec_ctx, t)) {
t->optional_drop_message = gpr_slice_from_copied_string(
"Connection dropped: received http1.x response");
} else {
t->optional_drop_message = gpr_slice_from_copied_string(
"Connection dropped: received unparseable response");
}
}
GPR_TIMER_END("reading_action.parse", 0);
grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked,

@ -383,6 +383,9 @@ struct grpc_chttp2_transport {
/** Transport op to be applied post-parsing */
grpc_transport_op *post_parsing_op;
/** Message explaining the reason of dropping connection */
gpr_slice optional_drop_message;
};
typedef struct {

Loading…
Cancel
Save