Make transport-level errors be reflected in status messages on calls

Allows us to eliminate logging those errors by default (since they are explicitly passed up to the application).

Required plumbing errors through the stack a little more deeply than we
had previously.
pull/7025/head
Craig Tiller 9 years ago
parent d605b63383
commit f0f70a8a68
  1. 7
      src/core/ext/client_config/subchannel_call_holder.c
  2. 229
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  3. 3
      src/core/ext/transport/chttp2/transport/internal.h
  4. 2
      src/core/lib/channel/channel_stack.c
  5. 14
      src/core/lib/iomgr/error.c
  6. 16
      src/core/lib/iomgr/error.h
  7. 3
      src/core/lib/security/transport/client_auth_filter.c
  8. 105
      src/core/lib/surface/call.c
  9. 2
      src/core/lib/surface/completion_queue.c
  10. 65
      src/core/lib/transport/transport.c
  11. 9
      src/core/lib/transport/transport.h
  12. 15
      src/core/lib/transport/transport_op_string.c

@ -120,16 +120,13 @@ retry:
return;
}
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_with_status != GRPC_STATUS_OK) {
if (op->cancel_error != GRPC_ERROR_NONE) {
if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
goto retry;
} else {
switch (holder->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
fail_locked(exec_ctx, holder,
grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"),
GRPC_ERROR_INT_GRPC_STATUS,
op->cancel_with_status));
fail_locked(exec_ctx, holder, GRPC_ERROR_REF(op->cancel_error));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,

@ -106,14 +106,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status,
gpr_slice *optional_message);
grpc_error *error);
static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status,
gpr_slice *optional_message);
grpc_error *error);
/** Add endpoint from this transport to pollset */
static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
@ -163,8 +161,6 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(t->ep == NULL);
gpr_slice_unref(t->optional_drop_message);
gpr_slice_buffer_destroy(&t->global.qbuf);
gpr_slice_buffer_destroy(&t->writing.outbuf);
@ -266,7 +262,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->parsing.is_first_frame = true;
t->writing.is_client = is_client;
t->optional_drop_message = gpr_empty_slice();
grpc_connectivity_state_init(
&t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
is_client ? "client_transport" : "server_transport");
@ -876,7 +871,9 @@ static void maybe_start_some_streams(
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_UNAVAILABLE, NULL);
grpc_error_set_int(
GRPC_ERROR_CREATE("Stream IDs exhausted"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
}
@ -958,14 +955,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
}
if (op->cancel_with_status != GRPC_STATUS_OK) {
if (op->cancel_error != GRPC_ERROR_NONE) {
cancel_from_api(exec_ctx, transport_global, stream_global,
op->cancel_with_status, op->optional_close_message);
GRPC_ERROR_REF(op->cancel_error));
}
if (op->close_with_status != GRPC_STATUS_OK) {
if (op->close_error != GRPC_ERROR_NONE) {
close_from_api(exec_ctx, transport_global, stream_global,
op->close_with_status, op->optional_close_message);
GRPC_ERROR_REF(op->close_error));
}
if (op->send_initial_metadata != NULL) {
@ -979,12 +976,16 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
transport_global->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
if (metadata_size > metadata_peer_limit) {
gpr_log(GPR_DEBUG,
"to-be-sent initial metadata size exceeds peer limit "
"(%" PRIuPTR " vs. %" PRIuPTR ")",
metadata_size, metadata_peer_limit);
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
cancel_from_api(
exec_ctx, transport_global, stream_global,
grpc_error_set_int(
grpc_error_set_int(
grpc_error_set_int(
GRPC_ERROR_CREATE("to-be-sent initial metadata size "
"exceeds peer limit"),
GRPC_ERROR_INT_SIZE, (intptr_t)metadata_size),
GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
stream_global->seen_error = true;
@ -1038,12 +1039,16 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
transport_global->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
if (metadata_size > metadata_peer_limit) {
gpr_log(GPR_DEBUG,
"to-be-sent trailing metadata size exceeds peer limit "
"(%" PRIuPTR " vs. %" PRIuPTR ")",
metadata_size, metadata_peer_limit);
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
cancel_from_api(
exec_ctx, transport_global, stream_global,
grpc_error_set_int(
grpc_error_set_int(
grpc_error_set_int(
GRPC_ERROR_CREATE("to-be-sent trailing metadata size "
"exceeds peer limit"),
GRPC_ERROR_INT_SIZE, (intptr_t)metadata_size),
GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
if (contains_non_ok_status(transport_global,
op->send_trailing_metadata)) {
@ -1235,8 +1240,12 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
cancel_from_api(
exec_ctx, transport_global, stream_global,
grpc_error_set_int(
GRPC_ERROR_CREATE(
"received initial metadata size exceeds limit"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
@ -1275,8 +1284,12 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
cancel_from_api(
exec_ctx, transport_global, stream_global,
grpc_error_set_int(
GRPC_ERROR_CREATE(
"received trailing metadata size exceeds limit"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
}
}
if (stream_global->all_incoming_byte_streams_finished) {
@ -1340,35 +1353,67 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
static void status_codes_from_error(grpc_error *error,
grpc_chttp2_error_code *http2_error,
grpc_status_code *grpc_status) {
intptr_t ip_http;
intptr_t ip_grpc;
bool have_http =
grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &ip_http);
bool have_grpc =
grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &ip_grpc);
if (have_http) {
*http2_error = (grpc_chttp2_error_code)ip_http;
} else if (have_grpc) {
*http2_error =
grpc_chttp2_grpc_status_to_http2_error((grpc_status_code)ip_grpc);
} else {
*http2_error = GRPC_CHTTP2_INTERNAL_ERROR;
}
if (have_grpc) {
*grpc_status = (grpc_status_code)ip_grpc;
} else if (have_http) {
*grpc_status =
grpc_chttp2_http2_error_to_grpc_status((grpc_chttp2_error_code)ip_http);
} else {
*grpc_status = GRPC_STATUS_INTERNAL;
}
}
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status,
gpr_slice *optional_message) {
grpc_error *due_to_error) {
if (!stream_global->read_closed || !stream_global->write_closed) {
grpc_status_code grpc_status;
grpc_chttp2_error_code http_error;
status_codes_from_error(due_to_error, &http_error, &grpc_status);
if (stream_global->id != 0) {
gpr_slice_buffer_add(
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(
stream_global->id,
(uint32_t)grpc_chttp2_grpc_status_to_http2_error(status),
&stream_global->stats.outgoing));
grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error,
&stream_global->stats.outgoing));
}
if (optional_message) {
gpr_slice_ref(*optional_message);
const char *msg =
grpc_error_get_str(due_to_error, GRPC_ERROR_STR_GRPC_MESSAGE);
bool free_msg = false;
if (msg == NULL) {
free_msg = true;
msg = grpc_error_string(due_to_error);
}
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
optional_message);
gpr_slice msg_slice = gpr_slice_from_copied_string(msg);
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global,
grpc_status, &msg_slice);
if (free_msg) grpc_error_free_string(msg);
}
if (status != GRPC_STATUS_OK && !stream_global->seen_error) {
if (due_to_error != GRPC_ERROR_NONE && !stream_global->seen_error) {
stream_global->seen_error = true;
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
grpc_chttp2_mark_stream_closed(
exec_ctx, transport_global, stream_global, 1, 1,
grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"),
GRPC_ERROR_INT_GRPC_STATUS, status));
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1, due_to_error);
}
void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
@ -1469,15 +1514,17 @@ void grpc_chttp2_mark_stream_closed(
static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status,
gpr_slice *optional_message) {
grpc_error *error) {
gpr_slice hdr;
gpr_slice status_hdr;
gpr_slice message_pfx;
uint8_t *p;
uint32_t len = 0;
grpc_status_code grpc_status;
grpc_chttp2_error_code http_error;
status_codes_from_error(error, &http_error, &grpc_status);
GPR_ASSERT(status >= 0 && (int)status < 100);
GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
if (stream_global->id != 0 && !transport_global->is_client) {
/* Hand roll a header block.
@ -1487,7 +1534,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
time we got around to sending this, so instead we ignore HPACK
compression
and just write the uncompressed bytes onto the wire. */
status_hdr = gpr_slice_malloc(15 + (status >= 10));
status_hdr = gpr_slice_malloc(15 + (grpc_status >= 10));
p = GPR_SLICE_START_PTR(status_hdr);
*p++ = 0x40; /* literal header */
*p++ = 11; /* len(grpc-status) */
@ -1502,19 +1549,23 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
*p++ = 't';
*p++ = 'u';
*p++ = 's';
if (status < 10) {
if (grpc_status < 10) {
*p++ = 1;
*p++ = (uint8_t)('0' + status);
*p++ = (uint8_t)('0' + grpc_status);
} else {
*p++ = 2;
*p++ = (uint8_t)('0' + (status / 10));
*p++ = (uint8_t)('0' + (status % 10));
*p++ = (uint8_t)('0' + (grpc_status / 10));
*p++ = (uint8_t)('0' + (grpc_status % 10));
}
GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
len += (uint32_t)GPR_SLICE_LENGTH(status_hdr);
if (optional_message) {
GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
const char *optional_message =
grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
if (optional_message != NULL) {
size_t msg_len = strlen(optional_message);
GPR_ASSERT(msg_len < 127);
message_pfx = gpr_slice_malloc(15);
p = GPR_SLICE_START_PTR(message_pfx);
*p++ = 0x40;
@ -1531,10 +1582,10 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
*p++ = 'a';
*p++ = 'g';
*p++ = 'e';
*p++ = (uint8_t)GPR_SLICE_LENGTH(*optional_message);
*p++ = (uint8_t)msg_len;
GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
len += (uint32_t)GPR_SLICE_LENGTH(message_pfx);
len += (uint32_t)GPR_SLICE_LENGTH(*optional_message);
len += (uint32_t)msg_len;
}
hdr = gpr_slice_malloc(9);
@ -1555,53 +1606,54 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
if (optional_message) {
gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
gpr_slice_buffer_add(&transport_global->qbuf,
gpr_slice_ref(*optional_message));
gpr_slice_from_copied_string(optional_message));
}
gpr_slice_buffer_add(
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR,
&stream_global->stats.outgoing));
if (optional_message) {
gpr_slice_ref(*optional_message);
}
}
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
optional_message);
grpc_error *err = GRPC_ERROR_CREATE("Stream closed");
err = grpc_error_set_int(err, GRPC_ERROR_INT_GRPC_STATUS, status);
if (optional_message) {
char *str =
gpr_dump_slice(*optional_message, GPR_DUMP_HEX | GPR_DUMP_ASCII);
err = grpc_error_set_str(err, GRPC_ERROR_STR_GRPC_MESSAGE, str);
gpr_free(str);
const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
bool free_msg = false;
if (msg == NULL) {
free_msg = true;
msg = grpc_error_string(error);
}
gpr_slice msg_slice = gpr_slice_from_copied_string(msg);
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global,
grpc_status, &msg_slice);
if (free_msg) grpc_error_free_string(msg);
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1, err);
1, error);
}
typedef struct {
grpc_exec_ctx *exec_ctx;
grpc_error *error;
} cancel_stream_cb_args;
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
grpc_chttp2_transport *transport = TRANSPORT_FROM_GLOBAL(transport_global);
cancel_from_api(user_data, transport_global, stream_global,
GRPC_STATUS_UNAVAILABLE,
GPR_SLICE_IS_EMPTY(transport->optional_drop_message)
? NULL
: &transport->optional_drop_message);
cancel_stream_cb_args *args = user_data;
cancel_from_api(args->exec_ctx, transport_global, stream_global,
GRPC_ERROR_REF(args->error));
}
static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb);
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error) {
cancel_stream_cb_args args = {exec_ctx, error};
grpc_chttp2_for_all_streams(&t->global, &args, cancel_stream_cb);
GRPC_ERROR_UNREF(error);
}
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error) {
close_transport_locked(exec_ctx, t, error);
end_all_the_calls(exec_ctx, t);
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
end_all_the_calls(exec_ctx, t, error);
}
/** update window from a settings change */
@ -1708,15 +1760,7 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
t->read_buffer.slices[i]);
};
if (i != t->read_buffer.count) {
gpr_slice_unref(t->optional_drop_message);
errors[2] = try_http_parsing(exec_ctx, t);
if (errors[2] != GRPC_ERROR_NONE) {
t->optional_drop_message = gpr_slice_from_copied_string(
"Connection dropped: received http1.x response");
} else {
t->optional_drop_message = gpr_slice_from_copied_string(
"Connection dropped: received unparseable response");
}
}
grpc_error *err =
errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE &&
@ -1784,6 +1828,10 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
error = GRPC_ERROR_CREATE("Transport closed");
}
if (error != GRPC_ERROR_NONE) {
if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
}
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
if (!t->executor.writing_active && t->ep) {
@ -1798,6 +1846,7 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
GRPC_ERROR_UNREF(error);
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
@ -1806,8 +1855,6 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
} else {
UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
GRPC_LOG_IF_ERROR("close_transport", error);
}
/*******************************************************************************

@ -384,9 +384,6 @@ struct grpc_chttp2_transport {
/** Transport op to be applied post-parsing */
grpc_transport_op *post_parsing_op;
/** Message explaining the reason of dropping connection */
gpr_slice optional_drop_message;
};
typedef struct {

@ -263,6 +263,6 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem) {
grpc_transport_stream_op op;
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
op.cancel_error = GRPC_ERROR_CANCELLED;
grpc_call_next_op(exec_ctx, cur_elem, &op);
}

@ -37,6 +37,7 @@
#include <stdbool.h>
#include <string.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
#include <grpc/support/log.h>
@ -115,6 +116,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "wsa_error";
case GRPC_ERROR_INT_HTTP_STATUS:
return "http_status";
case GRPC_ERROR_INT_LIMIT:
return "limit";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@ -271,6 +274,12 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) {
void *pp;
if (is_special(err)) {
if (err == GRPC_ERROR_CANCELLED && which == GRPC_ERROR_INT_GRPC_STATUS) {
return GRPC_STATUS_CANCELLED;
}
return false;
}
if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) {
if (p != NULL) *p = (intptr_t)pp;
return true;
@ -286,6 +295,11 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
return new;
}
const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) {
if (is_special(err)) return NULL;
return gpr_avl_get(err->strs, (void *)(uintptr_t)which);
}
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
grpc_error *new = copy_error_and_unref(src);
new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child);

@ -92,6 +92,8 @@ typedef enum {
GRPC_ERROR_INT_FD,
/// HTTP status (i.e. 404)
GRPC_ERROR_INT_HTTP_STATUS,
/// context sensitive limit associated with the error
GRPC_ERROR_INT_LIMIT,
} grpc_error_ints;
typedef enum {
@ -163,23 +165,25 @@ void grpc_error_unref(grpc_error *err);
#endif
grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value);
intptr_t value) GRPC_MUST_USE_RESULT;
bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p);
grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which,
gpr_timespec value);
gpr_timespec value) GRPC_MUST_USE_RESULT;
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value);
const char *value) GRPC_MUST_USE_RESULT;
const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which);
/// Add a child error: an error that is believed to have contributed to this
/// error occurring. Allows root causing high level errors from lower level
/// errors that contributed to them.
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child);
grpc_error *grpc_error_add_child(grpc_error *src,
grpc_error *child) GRPC_MUST_USE_RESULT;
grpc_error *grpc_os_error(const char *file, int line, int err,
const char *call_name);
const char *call_name) GRPC_MUST_USE_RESULT;
/// create an error associated with errno!=0 (an 'operating system' error)
#define GRPC_OS_ERROR(err, call_name) \
grpc_os_error(__FILE__, __LINE__, err, call_name)
grpc_error *grpc_wsa_error(const char *file, int line, int err,
const char *call_name);
const char *call_name) GRPC_MUST_USE_RESULT;
/// windows only: create an error associated with WSAGetLastError()!=0
#define GRPC_WSA_ERROR(err, call_name) \
grpc_wsa_error(__FILE__, __LINE__, err, call_name)

@ -220,8 +220,7 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_linked_mdelem *l;
grpc_client_security_context *sec_ctx = NULL;
if (calld->security_context_set == 0 &&
op->cancel_with_status == GRPC_STATUS_OK) {
if (calld->security_context_set == 0 && op->cancel_error == GRPC_ERROR_NONE) {
calld->security_context_set = 1;
GPR_ASSERT(op->context);
if (op->context[GRPC_CONTEXT_SECURITY].value == NULL) {

@ -402,8 +402,50 @@ static void set_status_code(grpc_call *call, status_source source,
call->status[source].is_set = 1;
call->status[source].code = (grpc_status_code)status;
}
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
GRPC_MDSTR_UNREF(call->status[source].details);
}
call->status[source].details = status;
}
static void get_final_status(grpc_call *call,
void (*set_value)(grpc_status_code code,
void *user_data),
void *set_value_user_data) {
int i;
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].is_set) {
set_value(call->status[i].code, set_value_user_data);
return;
}
}
if (call->is_client) {
set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
} else {
set_value(GRPC_STATUS_OK, set_value_user_data);
}
}
/* TODO(ctiller): what to do about the flush that was previously here */
static void set_status_from_error(grpc_call *call, status_source source,
grpc_error *error) {
intptr_t status;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
set_status_code(call, source, (uint32_t)status);
} else {
set_status_code(call, source, GRPC_STATUS_INTERNAL);
}
const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
bool free_msg = false;
if (msg == NULL) {
free_msg = true;
msg = grpc_error_string(error);
}
set_status_details(call, source, grpc_mdstr_from_string(msg));
if (free_msg) grpc_error_free_string(msg);
}
static void set_incoming_compression_algorithm(
@ -492,32 +534,6 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
return encodings_accepted_by_peer;
}
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
GRPC_MDSTR_UNREF(call->status[source].details);
}
call->status[source].details = status;
}
static void get_final_status(grpc_call *call,
void (*set_value)(grpc_status_code code,
void *user_data),
void *set_value_user_data) {
int i;
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].is_set) {
set_value(call->status[i].code, set_value_user_data);
return;
}
}
if (call->is_client) {
set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
} else {
set_value(GRPC_STATUS_OK, set_value_user_data);
}
}
static void get_final_details(grpc_call *call, char **out_details,
size_t *out_details_capacity) {
int i;
@ -741,8 +757,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
grpc_status_code status;
gpr_slice optional_message;
grpc_error *error;
grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
} termination_closure;
@ -758,7 +773,7 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close");
break;
}
gpr_slice_unref(tc->optional_message);
GRPC_ERROR_UNREF(tc->error);
grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL);
gpr_free(tc);
}
@ -767,7 +782,7 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
grpc_transport_stream_op op;
termination_closure *tc = tcp;
memset(&op, 0, sizeof(op));
op.cancel_with_status = tc->status;
op.cancel_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
op.on_complete = &tc->closure;
@ -778,8 +793,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
grpc_transport_stream_op op;
termination_closure *tc = tcp;
memset(&op, 0, sizeof(op));
tc->optional_message = gpr_slice_ref(tc->optional_message);
grpc_transport_stream_op_add_close(&op, tc->status, &tc->optional_message);
op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
tc->op_closure = op.on_complete;
@ -789,14 +803,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
termination_closure *tc) {
grpc_mdstr *details = NULL;
if (GPR_SLICE_LENGTH(tc->optional_message) > 0) {
tc->optional_message = gpr_slice_ref(tc->optional_message);
details = grpc_mdstr_from_slice(tc->optional_message);
}
set_status_code(tc->call, STATUS_FROM_API_OVERRIDE, (uint32_t)tc->status);
set_status_details(tc->call, STATUS_FROM_API_OVERRIDE, details);
set_status_from_error(tc->call, STATUS_FROM_API_OVERRIDE, tc->error);
if (tc->type == TC_CANCEL) {
grpc_closure_init(&tc->closure, send_cancel, tc);
@ -812,13 +819,15 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
GPR_ASSERT(status != GRPC_STATUS_OK);
termination_closure *tc = gpr_malloc(sizeof(*tc));
memset(tc, 0, sizeof(termination_closure));
tc->type = TC_CANCEL;
tc->call = c;
tc->optional_message = gpr_slice_from_copied_string(description);
GPR_ASSERT(status != GRPC_STATUS_OK);
tc->status = status;
tc->error = grpc_error_set_int(
grpc_error_set_str(GRPC_ERROR_CREATE(description),
GRPC_ERROR_STR_GRPC_MESSAGE, description),
GRPC_ERROR_INT_GRPC_STATUS, status);
return terminate_with_status(exec_ctx, tc);
}
@ -826,13 +835,15 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
GPR_ASSERT(status != GRPC_STATUS_OK);
termination_closure *tc = gpr_malloc(sizeof(*tc));
memset(tc, 0, sizeof(termination_closure));
tc->type = TC_CLOSE;
tc->call = c;
tc->optional_message = gpr_slice_from_copied_string(description);
GPR_ASSERT(status != GRPC_STATUS_OK);
tc->status = status;
tc->error = grpc_error_set_int(
grpc_error_set_str(GRPC_ERROR_CREATE(description),
GRPC_ERROR_STR_GRPC_MESSAGE, description),
GRPC_ERROR_INT_GRPC_STATUS, status);
return terminate_with_status(exec_ctx, tc);
}

@ -240,7 +240,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
"grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, "
"done_arg=%p, storage=%p)",
7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
if (grpc_trace_operation_failures) {
if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
grpc_error_free_string(errmsg);

@ -36,6 +36,7 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport_impl.h"
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
@ -162,55 +163,63 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL);
}
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
grpc_status_code status) {
GPR_ASSERT(status != GRPC_STATUS_OK);
if (op->cancel_with_status == GRPC_STATUS_OK) {
op->cancel_with_status = status;
}
if (op->close_with_status != GRPC_STATUS_OK) {
op->close_with_status = GRPC_STATUS_OK;
if (op->optional_close_message != NULL) {
gpr_slice_unref(*op->optional_close_message);
op->optional_close_message = NULL;
}
}
}
typedef struct {
gpr_slice message;
grpc_error *error;
grpc_closure *then_call;
grpc_closure closure;
} close_message_data;
static void free_message(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) {
close_message_data *cmd = p;
gpr_slice_unref(cmd->message);
GRPC_ERROR_UNREF(cmd->error);
if (cmd->then_call != NULL) {
cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, GRPC_ERROR_REF(error));
}
gpr_free(cmd);
}
static void add_error(grpc_transport_stream_op *op, grpc_error **which,
grpc_error *error) {
close_message_data *cmd;
cmd = gpr_malloc(sizeof(*cmd));
cmd->error = error;
cmd->then_call = op->on_complete;
grpc_closure_init(&cmd->closure, free_message, cmd);
op->on_complete = &cmd->closure;
*which = error;
}
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
grpc_status_code status) {
GPR_ASSERT(status != GRPC_STATUS_OK);
if (op->cancel_error == GRPC_STATUS_OK) {
op->cancel_error = grpc_error_set_int(GRPC_ERROR_CANCELLED,
GRPC_ERROR_INT_GRPC_STATUS, status);
op->close_error = GRPC_ERROR_NONE;
}
}
void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
grpc_status_code status,
gpr_slice *optional_message) {
close_message_data *cmd;
GPR_ASSERT(status != GRPC_STATUS_OK);
if (op->cancel_with_status != GRPC_STATUS_OK ||
op->close_with_status != GRPC_STATUS_OK) {
if (op->cancel_error != GRPC_ERROR_NONE ||
op->close_error != GRPC_ERROR_NONE) {
if (optional_message) {
gpr_slice_unref(*optional_message);
}
return;
}
if (optional_message) {
cmd = gpr_malloc(sizeof(*cmd));
cmd->message = *optional_message;
cmd->then_call = op->on_complete;
grpc_closure_init(&cmd->closure, free_message, cmd);
op->on_complete = &cmd->closure;
op->optional_close_message = &cmd->message;
grpc_error *error;
if (optional_message != NULL) {
char *msg = gpr_dump_slice(*optional_message, GPR_DUMP_ASCII);
error = grpc_error_set_str(GRPC_ERROR_CREATE(msg),
GRPC_ERROR_STR_GRPC_MESSAGE, msg);
gpr_free(msg);
gpr_slice_unref(*optional_message);
} else {
error = GRPC_ERROR_CREATE("Call force closed");
}
op->close_with_status = status;
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
add_error(op, &op->close_error, error);
}

@ -135,13 +135,12 @@ typedef struct grpc_transport_stream_op {
/** Collect any stats into provided buffer, zero internal stat counters */
grpc_transport_stream_stats *collect_stats;
/** If != GRPC_STATUS_OK, cancel this stream */
grpc_status_code cancel_with_status;
/** If != GRPC_ERROR_NONE, cancel this stream */
grpc_error *cancel_error;
/** If != GRPC_STATUS_OK, send grpc-status, grpc-message, and close this
/** If != GRPC_ERROR, send grpc-status, grpc-message, and close this
stream for both reading and writing */
grpc_status_code close_with_status;
gpr_slice *optional_close_message;
grpc_error *close_error;
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;

@ -119,10 +119,21 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
gpr_strvec_add(&b, gpr_strdup("RECV_TRAILING_METADATA"));
}
if (op->cancel_with_status != GRPC_STATUS_OK) {
if (op->cancel_error != GRPC_STATUS_OK) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
const char *msg = grpc_error_string(op->cancel_error);
gpr_asprintf(&tmp, "CANCEL:%s", msg);
grpc_error_free_string(msg);
gpr_strvec_add(&b, tmp);
}
if (op->close_error != GRPC_STATUS_OK) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
const char *msg = grpc_error_string(op->close_error);
gpr_asprintf(&tmp, "CLOSE:%s", msg);
grpc_error_free_string(msg);
gpr_strvec_add(&b, tmp);
}

Loading…
Cancel
Save