Refine error handling in call

reviewable/pr8842/r2
Craig Tiller 8 years ago
parent 0d8e0b138d
commit 9490389f86
  1. 1
      src/core/lib/iomgr/error.c
  2. 143
      src/core/lib/surface/call.c

@ -349,6 +349,7 @@ static grpc_error *recursively_find_error_with_status(grpc_error *error,
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, status)) { if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, status)) {
return error; return error;
} }
if (is_special(error)) return NULL;
// Otherwise, search through its children. // Otherwise, search through its children.
intptr_t key = 0; intptr_t key = 0;
while (true) { while (true) {

@ -98,13 +98,17 @@ typedef struct {
grpc_slice details; grpc_slice details;
} received_status; } received_status;
#define MAX_ERRORS_PER_BATCH 3
typedef struct batch_control { typedef struct batch_control {
grpc_call *call; grpc_call *call;
grpc_cq_completion cq_completion; grpc_cq_completion cq_completion;
grpc_closure finish_batch; grpc_closure finish_batch;
void *notify_tag; void *notify_tag;
gpr_refcount steps_to_complete; gpr_refcount steps_to_complete;
grpc_error *error;
grpc_error *errors[MAX_ERRORS_PER_BATCH];
gpr_atm num_errors;
uint8_t send_initial_metadata; uint8_t send_initial_metadata;
uint8_t send_message; uint8_t send_message;
@ -186,6 +190,7 @@ struct grpc_call {
grpc_call *sibling_prev; grpc_call *sibling_prev;
grpc_slice_buffer_stream sending_stream; grpc_slice_buffer_stream sending_stream;
grpc_byte_stream *receiving_stream; grpc_byte_stream *receiving_stream;
grpc_byte_buffer **receiving_buffer; grpc_byte_buffer **receiving_buffer;
grpc_slice receiving_slice; grpc_slice receiving_slice;
@ -1000,14 +1005,74 @@ static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} }
static grpc_error *consolidate_batch_errors(batch_control *bctl) {
size_t n = (size_t)gpr_atm_no_barrier_load(&bctl->num_errors);
if (n == 0) {
return GRPC_ERROR_NONE;
} else if (n == 1) {
return GRPC_ERROR_REF(bctl->errors[0]);
} else {
return GRPC_ERROR_CREATE_REFERENCING("Call batch failed", bctl->errors, n);
}
}
static void post_batch_completion(grpc_exec_ctx *exec_ctx, static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) { batch_control *bctl) {
grpc_call *child_call;
grpc_call *next_child_call;
grpc_call *call = bctl->call; grpc_call *call = bctl->call;
grpc_error *error = bctl->error; grpc_error *error = consolidate_batch_errors(bctl);
gpr_mu_lock(&call->mu);
if (error != GRPC_ERROR_NONE) {
set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error);
}
if (bctl->send_initial_metadata) {
grpc_metadata_batch_destroy(
exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->send_final_op) {
grpc_metadata_batch_destroy(
exec_ctx,
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->recv_final_op) { if (bctl->recv_final_op) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(exec_ctx, call, md);
call->received_final_op = true;
/* propagate cancellation to any interested children */
child_call = call->first_child;
if (child_call != NULL) {
do {
next_child_call = child_call->sibling_next;
if (child_call->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
grpc_call_cancel(child_call, NULL);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
}
child_call = next_child_call;
} while (child_call != call->first_child);
}
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status);
get_final_details(call, call->final_op.client.status_details);
} else {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled);
}
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE; error = GRPC_ERROR_NONE;
} }
gpr_mu_unlock(&call->mu);
if (bctl->is_notify_tag_closure) { if (bctl->is_notify_tag_closure) {
/* unrefs bctl->error */ /* unrefs bctl->error */
grpc_closure_run(exec_ctx, bctl->notify_tag, error); grpc_closure_run(exec_ctx, bctl->notify_tag, error);
@ -1171,11 +1236,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_error *error) { grpc_error *error) {
if (error == GRPC_ERROR_NONE) return; if (error == GRPC_ERROR_NONE) return;
cancel_with_error(exec_ctx, bctl->call, GRPC_ERROR_REF(error)); int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
if (bctl->error == GRPC_ERROR_NONE) { bctl->errors[idx] = error;
bctl->error = GRPC_ERROR_CREATE("Call batch operation failed");
}
bctl->error = grpc_error_add_child(bctl->error, error);
} }
static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
@ -1223,76 +1285,12 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) { grpc_error *error) {
batch_control *bctl = bctlp; batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
grpc_call *child_call;
grpc_call *next_child_call;
GRPC_ERROR_REF(error);
gpr_mu_lock(&call->mu);
// If the error has an associated status code, set the call's status.
intptr_t status;
if (error != GRPC_ERROR_NONE &&
grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error);
}
if (bctl->send_initial_metadata) {
if (error != GRPC_ERROR_NONE) {
set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error);
}
grpc_metadata_batch_destroy(
exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->send_message) {
call->sending_message = 0;
}
if (bctl->send_final_op) {
grpc_metadata_batch_destroy(
exec_ctx,
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->recv_final_op) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(exec_ctx, call, md);
call->received_final_op = true;
/* propagate cancellation to any interested children */
child_call = call->first_child;
if (child_call != NULL) {
do {
next_child_call = child_call->sibling_next;
if (child_call->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
grpc_call_cancel(child_call, NULL);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
}
child_call = next_child_call;
} while (child_call != call->first_child);
}
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status);
get_final_details(call, call->final_op.client.status_details);
} else {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled);
}
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error)); add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
gpr_mu_unlock(&call->mu);
if (gpr_unref(&bctl->steps_to_complete)) { if (gpr_unref(&bctl->steps_to_complete)) {
post_batch_completion(exec_ctx, bctl); post_batch_completion(exec_ctx, bctl);
} }
GRPC_ERROR_UNREF(error);
} }
static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
@ -1326,7 +1324,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (nops == 0) { if (nops == 0) {
GRPC_CALL_INTERNAL_REF(call, "completion"); GRPC_CALL_INTERNAL_REF(call, "completion");
bctl->error = GRPC_ERROR_NONE;
if (!is_notify_tag_closure) { if (!is_notify_tag_closure) {
grpc_cq_begin_op(call->cq, notify_tag); grpc_cq_begin_op(call->cq, notify_tag);
} }

Loading…
Cancel
Save