From 9490389f868f6addf0e27f8c299f1e63c76dde1e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 11 Oct 2016 15:43:35 -0700 Subject: [PATCH] Refine error handling in call --- src/core/lib/iomgr/error.c | 1 + src/core/lib/surface/call.c | 145 ++++++++++++++++++------------------ 2 files changed, 72 insertions(+), 74 deletions(-) diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 82edcfd16c6..f6eb8e02c97 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -349,6 +349,7 @@ static grpc_error *recursively_find_error_with_status(grpc_error *error, if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, status)) { return error; } + if (is_special(error)) return NULL; // Otherwise, search through its children. intptr_t key = 0; while (true) { diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index ed186271e7c..04979ff4601 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -62,7 +62,7 @@ /** The maximum number of concurrent batches possible. Based upon the maximum number of individually queueable ops in the batch - api: + api: - initial metadata send - message send - status/close send (depending on client/server) @@ -98,13 +98,17 @@ typedef struct { grpc_slice details; } received_status; +#define MAX_ERRORS_PER_BATCH 3 + typedef struct batch_control { grpc_call *call; grpc_cq_completion cq_completion; grpc_closure finish_batch; void *notify_tag; gpr_refcount steps_to_complete; - grpc_error *error; + + grpc_error *errors[MAX_ERRORS_PER_BATCH]; + gpr_atm num_errors; uint8_t send_initial_metadata; uint8_t send_message; @@ -186,6 +190,7 @@ struct grpc_call { grpc_call *sibling_prev; grpc_slice_buffer_stream sending_stream; + grpc_byte_stream *receiving_stream; grpc_byte_buffer **receiving_buffer; grpc_slice receiving_slice; @@ -1000,14 +1005,74 @@ static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } +static grpc_error *consolidate_batch_errors(batch_control *bctl) { + size_t n = (size_t)gpr_atm_no_barrier_load(&bctl->num_errors); + if (n == 0) { + return GRPC_ERROR_NONE; + } else if (n == 1) { + return GRPC_ERROR_REF(bctl->errors[0]); + } else { + return GRPC_ERROR_CREATE_REFERENCING("Call batch failed", bctl->errors, n); + } +} + static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { + grpc_call *child_call; + grpc_call *next_child_call; grpc_call *call = bctl->call; - grpc_error *error = bctl->error; + grpc_error *error = consolidate_batch_errors(bctl); + + gpr_mu_lock(&call->mu); + + if (error != GRPC_ERROR_NONE) { + set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error); + } + + if (bctl->send_initial_metadata) { + grpc_metadata_batch_destroy( + exec_ctx, + &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); + } + if (bctl->send_final_op) { + grpc_metadata_batch_destroy( + exec_ctx, + &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); + } if (bctl->recv_final_op) { + grpc_metadata_batch *md = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + recv_trailing_filter(exec_ctx, call, md); + + call->received_final_op = true; + /* propagate cancellation to any interested children */ + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + grpc_call_cancel(child_call, NULL); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); + } + child_call = next_child_call; + } while (child_call != call->first_child); + } + + if (call->is_client) { + get_final_status(call, set_status_value_directly, + call->final_op.client.status); + get_final_details(call, call->final_op.client.status_details); + } else { + get_final_status(call, set_cancelled_value, + call->final_op.server.cancelled); + } + GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } + gpr_mu_unlock(&call->mu); + if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ grpc_closure_run(exec_ctx, bctl->notify_tag, error); @@ -1171,11 +1236,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, grpc_error *error) { if (error == GRPC_ERROR_NONE) return; - cancel_with_error(exec_ctx, bctl->call, GRPC_ERROR_REF(error)); - if (bctl->error == GRPC_ERROR_NONE) { - bctl->error = GRPC_ERROR_CREATE("Call batch operation failed"); - } - bctl->error = grpc_error_add_child(bctl->error, error); + int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1); + bctl->errors[idx] = error; } static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, @@ -1223,76 +1285,12 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; - grpc_call *call = bctl->call; - grpc_call *child_call; - grpc_call *next_child_call; - - GRPC_ERROR_REF(error); - - gpr_mu_lock(&call->mu); - - // If the error has an associated status code, set the call's status. - intptr_t status; - if (error != GRPC_ERROR_NONE && - grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { - set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error); - } - - if (bctl->send_initial_metadata) { - if (error != GRPC_ERROR_NONE) { - set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error); - } - grpc_metadata_batch_destroy( - exec_ctx, - &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); - } - if (bctl->send_message) { - call->sending_message = 0; - } - if (bctl->send_final_op) { - grpc_metadata_batch_destroy( - exec_ctx, - &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); - } - if (bctl->recv_final_op) { - grpc_metadata_batch *md = - &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(exec_ctx, call, md); - - call->received_final_op = true; - /* propagate cancellation to any interested children */ - child_call = call->first_child; - if (child_call != NULL) { - do { - next_child_call = child_call->sibling_next; - if (child_call->cancellation_is_inherited) { - GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - grpc_call_cancel(child_call, NULL); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); - } - child_call = next_child_call; - } while (child_call != call->first_child); - } - if (call->is_client) { - get_final_status(call, set_status_value_directly, - call->final_op.client.status); - get_final_details(call, call->final_op.client.status_details); - } else { - get_final_status(call, set_cancelled_value, - call->final_op.server.cancelled); - } - - GRPC_ERROR_UNREF(error); - error = GRPC_ERROR_NONE; - } add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error)); - gpr_mu_unlock(&call->mu); + if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); } - - GRPC_ERROR_UNREF(error); } static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, @@ -1326,7 +1324,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { GRPC_CALL_INTERNAL_REF(call, "completion"); - bctl->error = GRPC_ERROR_NONE; if (!is_notify_tag_closure) { grpc_cq_begin_op(call->cq, notify_tag); }