|
|
|
@ -86,8 +86,11 @@ typedef enum { |
|
|
|
|
/* Status came from 'the wire' - or somewhere below the surface
|
|
|
|
|
layer */ |
|
|
|
|
STATUS_FROM_WIRE, |
|
|
|
|
/* Status was created by some internal channel stack operation */ |
|
|
|
|
/* Status was created by some internal channel stack operation: must come via
|
|
|
|
|
add_batch_error */ |
|
|
|
|
STATUS_FROM_CORE, |
|
|
|
|
/* Status was created by some surface error */ |
|
|
|
|
STATUS_FROM_SURFACE, |
|
|
|
|
/* Status came from the server sending status */ |
|
|
|
|
STATUS_FROM_SERVER_STATUS, |
|
|
|
|
STATUS_SOURCE_COUNT |
|
|
|
@ -212,6 +215,8 @@ struct grpc_call { |
|
|
|
|
void *saved_receiving_stream_ready_bctlp; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
int grpc_call_error_trace = 0; |
|
|
|
|
|
|
|
|
|
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) |
|
|
|
|
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) |
|
|
|
|
#define CALL_ELEM_FROM_CALL(call, idx) \ |
|
|
|
@ -221,11 +226,11 @@ struct grpc_call { |
|
|
|
|
|
|
|
|
|
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, |
|
|
|
|
grpc_transport_stream_op *op); |
|
|
|
|
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, |
|
|
|
|
grpc_status_code status, |
|
|
|
|
const char *description); |
|
|
|
|
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, |
|
|
|
|
status_source source, grpc_status_code status, |
|
|
|
|
const char *description); |
|
|
|
|
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, |
|
|
|
|
grpc_error *error); |
|
|
|
|
status_source source, grpc_error *error); |
|
|
|
|
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, |
|
|
|
|
grpc_error *error); |
|
|
|
|
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, |
|
|
|
@ -242,10 +247,18 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl); |
|
|
|
|
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, |
|
|
|
|
grpc_error *error); |
|
|
|
|
|
|
|
|
|
static void add_init_error(grpc_error **composite, grpc_error *new) { |
|
|
|
|
if (new == GRPC_ERROR_NONE) return; |
|
|
|
|
if (*composite == GRPC_ERROR_NONE) |
|
|
|
|
*composite = GRPC_ERROR_CREATE("Call creation failed"); |
|
|
|
|
*composite = grpc_error_add_child(*composite, new); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
const grpc_call_create_args *args, |
|
|
|
|
grpc_call **out_call) { |
|
|
|
|
size_t i, j; |
|
|
|
|
grpc_error *error = GRPC_ERROR_NONE; |
|
|
|
|
grpc_channel_stack *channel_stack = |
|
|
|
|
grpc_channel_get_channel_stack(args->channel); |
|
|
|
|
grpc_call *call; |
|
|
|
@ -304,12 +317,18 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
/* TODO(ctiller): This should change to use the appropriate census start_op
|
|
|
|
|
* call. */ |
|
|
|
|
if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) { |
|
|
|
|
GPR_ASSERT(args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); |
|
|
|
|
if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) { |
|
|
|
|
add_init_error(&error, |
|
|
|
|
GRPC_ERROR_CREATE("Census tracing propagation requested " |
|
|
|
|
"without Census context propagation")); |
|
|
|
|
} |
|
|
|
|
grpc_call_context_set( |
|
|
|
|
call, GRPC_CONTEXT_TRACING, |
|
|
|
|
args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL); |
|
|
|
|
} else { |
|
|
|
|
GPR_ASSERT(args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); |
|
|
|
|
} else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) { |
|
|
|
|
add_init_error(&error, |
|
|
|
|
GRPC_ERROR_CREATE("Census context propagation requested " |
|
|
|
|
"without Census tracing propagation")); |
|
|
|
|
} |
|
|
|
|
if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { |
|
|
|
|
call->cancellation_is_inherited = 1; |
|
|
|
@ -332,12 +351,14 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); |
|
|
|
|
/* initial refcount dropped by grpc_call_destroy */ |
|
|
|
|
grpc_error *error = grpc_call_stack_init( |
|
|
|
|
exec_ctx, channel_stack, 1, destroy_call, call, call->context, |
|
|
|
|
args->server_transport_data, path, call->start_time, send_deadline, |
|
|
|
|
CALL_STACK_FROM_CALL(call)); |
|
|
|
|
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, |
|
|
|
|
destroy_call, call, call->context, |
|
|
|
|
args->server_transport_data, path, |
|
|
|
|
call->start_time, send_deadline, |
|
|
|
|
CALL_STACK_FROM_CALL(call))); |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error)); |
|
|
|
|
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
if (args->cq != NULL) { |
|
|
|
|
GPR_ASSERT( |
|
|
|
@ -512,7 +533,6 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, |
|
|
|
|
grpc_status_code status, |
|
|
|
|
const char *description, |
|
|
|
|
void *reserved) { |
|
|
|
|
grpc_call_error r; |
|
|
|
|
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
|
|
|
|
GRPC_API_TRACE( |
|
|
|
|
"grpc_call_cancel_with_status(" |
|
|
|
@ -520,16 +540,16 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, |
|
|
|
|
4, (c, (int)status, description, reserved)); |
|
|
|
|
GPR_ASSERT(reserved == NULL); |
|
|
|
|
gpr_mu_lock(&c->mu); |
|
|
|
|
r = cancel_with_status(&exec_ctx, c, status, description); |
|
|
|
|
cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status, |
|
|
|
|
description); |
|
|
|
|
gpr_mu_unlock(&c->mu); |
|
|
|
|
grpc_exec_ctx_finish(&exec_ctx); |
|
|
|
|
return r; |
|
|
|
|
return GRPC_CALL_OK; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
typedef struct termination_closure { |
|
|
|
|
grpc_closure closure; |
|
|
|
|
grpc_call *call; |
|
|
|
|
grpc_error *error; |
|
|
|
|
grpc_transport_stream_op op; |
|
|
|
|
} termination_closure; |
|
|
|
|
|
|
|
|
@ -544,36 +564,27 @@ static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
termination_closure *tc = tcp; |
|
|
|
|
memset(&tc->op, 0, sizeof(tc->op)); |
|
|
|
|
tc->op.cancel_error = tc->error; |
|
|
|
|
tc->op.cancel_error = GRPC_ERROR_REF(error); |
|
|
|
|
/* reuse closure to catch completion */ |
|
|
|
|
grpc_closure_init(&tc->closure, done_termination, tc, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
tc->op.on_complete = &tc->closure; |
|
|
|
|
tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
execute_op(exec_ctx, tc->call, &tc->op); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, |
|
|
|
|
termination_closure *tc) { |
|
|
|
|
set_status_from_error(exec_ctx, tc->call, STATUS_FROM_API_OVERRIDE, |
|
|
|
|
GRPC_ERROR_REF(tc->error)); |
|
|
|
|
grpc_closure_init(&tc->closure, send_termination, tc, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
GRPC_CALL_INTERNAL_REF(tc->call, "termination"); |
|
|
|
|
grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE); |
|
|
|
|
return GRPC_CALL_OK; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_call_error terminate_with_error(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_call *c, grpc_error *error) { |
|
|
|
|
static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
termination_closure *tc = gpr_malloc(sizeof(*tc)); |
|
|
|
|
memset(tc, 0, sizeof(*tc)); |
|
|
|
|
tc->call = c; |
|
|
|
|
tc->error = error; |
|
|
|
|
return terminate_with_status(exec_ctx, tc); |
|
|
|
|
GRPC_CALL_INTERNAL_REF(tc->call, "termination"); |
|
|
|
|
grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination, |
|
|
|
|
tc, grpc_schedule_on_exec_ctx), |
|
|
|
|
error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
status_source source, grpc_error *error) { |
|
|
|
|
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); |
|
|
|
|
terminate_with_error(exec_ctx, c, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -585,32 +596,35 @@ static grpc_error *error_from_status(grpc_status_code status, |
|
|
|
|
GRPC_ERROR_INT_GRPC_STATUS, status); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, |
|
|
|
|
grpc_status_code status, |
|
|
|
|
const char *description) { |
|
|
|
|
return terminate_with_error(exec_ctx, c, |
|
|
|
|
error_from_status(status, description)); |
|
|
|
|
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, |
|
|
|
|
status_source source, grpc_status_code status, |
|
|
|
|
const char *description) { |
|
|
|
|
cancel_with_error(exec_ctx, c, source, |
|
|
|
|
error_from_status(status, description)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* FINAL STATUS CODE MANIPULATION |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
static void get_final_status_from(grpc_call *call, status_source from_source, |
|
|
|
|
void (*set_value)(grpc_status_code code, |
|
|
|
|
void *user_data), |
|
|
|
|
void *set_value_user_data, |
|
|
|
|
grpc_slice *details) { |
|
|
|
|
static bool get_final_status_from( |
|
|
|
|
grpc_call *call, status_source from_source, bool allow_ok_status, |
|
|
|
|
void (*set_value)(grpc_status_code code, void *user_data), |
|
|
|
|
void *set_value_user_data, grpc_slice *details) { |
|
|
|
|
grpc_status_code code; |
|
|
|
|
const char *msg = NULL; |
|
|
|
|
grpc_error_get_status(call->status[from_source].error, call->send_deadline, |
|
|
|
|
&code, &msg, NULL); |
|
|
|
|
if (code == GRPC_STATUS_OK && !allow_ok_status) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
set_value(code, set_value_user_data); |
|
|
|
|
if (details != NULL) { |
|
|
|
|
*details = |
|
|
|
|
msg == NULL ? grpc_empty_slice() : grpc_slice_from_copied_string(msg); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void get_final_status(grpc_call *call, |
|
|
|
@ -618,22 +632,37 @@ static void get_final_status(grpc_call *call, |
|
|
|
|
void *user_data), |
|
|
|
|
void *set_value_user_data, grpc_slice *details) { |
|
|
|
|
int i; |
|
|
|
|
/* search for the best status we can present: ideally the error we use has a
|
|
|
|
|
clearly defined grpc-status, and we'll prefer that. */ |
|
|
|
|
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { |
|
|
|
|
if (call->status[i].is_set && |
|
|
|
|
grpc_error_has_clear_grpc_status(call->status[i].error)) { |
|
|
|
|
get_final_status_from(call, (status_source)i, set_value, |
|
|
|
|
set_value_user_data, details); |
|
|
|
|
return; |
|
|
|
|
if (grpc_call_error_trace) { |
|
|
|
|
gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR"); |
|
|
|
|
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { |
|
|
|
|
if (call->status[i].is_set) { |
|
|
|
|
gpr_log(GPR_DEBUG, " %d: %s", i, |
|
|
|
|
grpc_error_string(call->status[i].error)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* If no clearly defined status exists, search for 'anything' */ |
|
|
|
|
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { |
|
|
|
|
if (call->status[i].is_set) { |
|
|
|
|
get_final_status_from(call, (status_source)i, set_value, |
|
|
|
|
set_value_user_data, details); |
|
|
|
|
return; |
|
|
|
|
/* first search through ignoring "OK" statuses: if something went wrong,
|
|
|
|
|
* ensure we report it */ |
|
|
|
|
for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) { |
|
|
|
|
/* search for the best status we can present: ideally the error we use has a
|
|
|
|
|
clearly defined grpc-status, and we'll prefer that. */ |
|
|
|
|
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { |
|
|
|
|
if (call->status[i].is_set && |
|
|
|
|
grpc_error_has_clear_grpc_status(call->status[i].error)) { |
|
|
|
|
if (get_final_status_from(call, (status_source)i, allow_ok_status != 0, |
|
|
|
|
set_value, set_value_user_data, details)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* If no clearly defined status exists, search for 'anything' */ |
|
|
|
|
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { |
|
|
|
|
if (call->status[i].is_set) { |
|
|
|
|
if (get_final_status_from(call, (status_source)i, allow_ok_status != 0, |
|
|
|
|
set_value, set_value_user_data, details)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* If nothing exists, set some default */ |
|
|
|
@ -1014,11 +1043,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&call->mu); |
|
|
|
|
|
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (bctl->send_initial_metadata) { |
|
|
|
|
grpc_metadata_batch_destroy( |
|
|
|
|
exec_ctx, |
|
|
|
@ -1161,7 +1185,8 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, |
|
|
|
|
grpc_call *call = bctl->call; |
|
|
|
|
gpr_mu_lock(&bctl->call->mu); |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error)); |
|
|
|
|
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || |
|
|
|
|
call->receiving_stream == NULL) { |
|
|
|
@ -1188,7 +1213,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, |
|
|
|
|
gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", |
|
|
|
|
algo); |
|
|
|
|
gpr_log(GPR_ERROR, "%s", error_msg); |
|
|
|
|
cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); |
|
|
|
|
cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, |
|
|
|
|
GRPC_STATUS_UNIMPLEMENTED, error_msg); |
|
|
|
|
} else if (grpc_compression_options_is_algorithm_enabled( |
|
|
|
|
&compression_options, algo) == 0) { |
|
|
|
|
/* check if algorithm is supported by current channel config */ |
|
|
|
@ -1197,7 +1223,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, |
|
|
|
|
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", |
|
|
|
|
algo_name); |
|
|
|
|
gpr_log(GPR_ERROR, "%s", error_msg); |
|
|
|
|
cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); |
|
|
|
|
cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, |
|
|
|
|
GRPC_STATUS_UNIMPLEMENTED, error_msg); |
|
|
|
|
} else { |
|
|
|
|
call->incoming_compression_algorithm = algo; |
|
|
|
|
} |
|
|
|
@ -1227,7 +1254,10 @@ static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
if (error == GRPC_ERROR_NONE) return; |
|
|
|
|
int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1); |
|
|
|
|
if (idx == 0) cancel_with_error(exec_ctx, bctl->call, GRPC_ERROR_REF(error)); |
|
|
|
|
if (idx == 0) { |
|
|
|
|
cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
bctl->errors[idx] = error; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|