Merge pull request #9529 from ctiller/racey

Fix race detected by TSAN
pull/9487/head^2
Craig Tiller 8 years ago committed by GitHub
commit d505b53e4c
  1. 1
      doc/environment_variables.md
  2. 113
      src/core/lib/surface/call.c
  3. 2
      src/core/lib/surface/call.h
  4. 1
      src/core/lib/surface/init.c

@ -35,6 +35,7 @@ some configuration as environment variables that can be set.
A comma separated list of tracers that provide additional insight into how A comma separated list of tracers that provide additional insight into how
gRPC C core is processing requests via debug logs. Available tracers include: gRPC C core is processing requests via debug logs. Available tracers include:
- api - traces api calls to the C core - api - traces api calls to the C core
- call_error - traces the possible errors contributing to final call status
- channel - traces operations on the C core channel stack - channel - traces operations on the C core channel stack
- combiner - traces combiner lock state - combiner - traces combiner lock state
- compression - traces compression operations - compression - traces compression operations

@ -86,8 +86,11 @@ typedef enum {
/* Status came from 'the wire' - or somewhere below the surface /* Status came from 'the wire' - or somewhere below the surface
layer */ layer */
STATUS_FROM_WIRE, STATUS_FROM_WIRE,
/* Status was created by some internal channel stack operation */ /* Status was created by some internal channel stack operation: must come via
add_batch_error */
STATUS_FROM_CORE, STATUS_FROM_CORE,
/* Status was created by some surface error */
STATUS_FROM_SURFACE,
/* Status came from the server sending status */ /* Status came from the server sending status */
STATUS_FROM_SERVER_STATUS, STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT STATUS_SOURCE_COUNT
@ -212,6 +215,8 @@ struct grpc_call {
void *saved_receiving_stream_ready_bctlp; void *saved_receiving_stream_ready_bctlp;
}; };
int grpc_call_error_trace = 0;
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \ #define CALL_ELEM_FROM_CALL(call, idx) \
@ -221,11 +226,11 @@ struct grpc_call {
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_transport_stream_op *op); grpc_transport_stream_op *op);
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status, status_source source, grpc_status_code status,
const char *description); const char *description);
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_error *error); status_source source, grpc_error *error);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
grpc_error *error); grpc_error *error);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
@ -352,7 +357,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
call->start_time, send_deadline, call->start_time, send_deadline,
CALL_STACK_FROM_CALL(call))); CALL_STACK_FROM_CALL(call)));
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error)); cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
} }
if (args->cq != NULL) { if (args->cq != NULL) {
GPR_ASSERT( GPR_ASSERT(
@ -527,7 +533,6 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status, grpc_status_code status,
const char *description, const char *description,
void *reserved) { void *reserved) {
grpc_call_error r;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE( GRPC_API_TRACE(
"grpc_call_cancel_with_status(" "grpc_call_cancel_with_status("
@ -535,16 +540,16 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
4, (c, (int)status, description, reserved)); 4, (c, (int)status, description, reserved));
GPR_ASSERT(reserved == NULL); GPR_ASSERT(reserved == NULL);
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
r = cancel_with_status(&exec_ctx, c, status, description); cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status,
description);
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
return r; return GRPC_CALL_OK;
} }
typedef struct termination_closure { typedef struct termination_closure {
grpc_closure closure; grpc_closure closure;
grpc_call *call; grpc_call *call;
grpc_error *error;
grpc_transport_stream_op op; grpc_transport_stream_op op;
} termination_closure; } termination_closure;
@ -559,36 +564,27 @@ static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp,
grpc_error *error) { grpc_error *error) {
termination_closure *tc = tcp; termination_closure *tc = tcp;
memset(&tc->op, 0, sizeof(tc->op)); memset(&tc->op, 0, sizeof(tc->op));
tc->op.cancel_error = tc->error; tc->op.cancel_error = GRPC_ERROR_REF(error);
/* reuse closure to catch completion */ /* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc, tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
tc->op.on_complete = &tc->closure;
execute_op(exec_ctx, tc->call, &tc->op); execute_op(exec_ctx, tc->call, &tc->op);
} }
static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
termination_closure *tc) { grpc_error *error) {
set_status_from_error(exec_ctx, tc->call, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_REF(tc->error));
grpc_closure_init(&tc->closure, send_termination, tc,
grpc_schedule_on_exec_ctx);
GRPC_CALL_INTERNAL_REF(tc->call, "termination");
grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE);
return GRPC_CALL_OK;
}
static grpc_call_error terminate_with_error(grpc_exec_ctx *exec_ctx,
grpc_call *c, grpc_error *error) {
termination_closure *tc = gpr_malloc(sizeof(*tc)); termination_closure *tc = gpr_malloc(sizeof(*tc));
memset(tc, 0, sizeof(*tc)); memset(tc, 0, sizeof(*tc));
tc->call = c; tc->call = c;
tc->error = error; GRPC_CALL_INTERNAL_REF(tc->call, "termination");
return terminate_with_status(exec_ctx, tc); grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination,
tc, grpc_schedule_on_exec_ctx),
error);
} }
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_error *error) { status_source source, grpc_error *error) {
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
terminate_with_error(exec_ctx, c, error); terminate_with_error(exec_ctx, c, error);
} }
@ -600,10 +596,10 @@ static grpc_error *error_from_status(grpc_status_code status,
GRPC_ERROR_INT_GRPC_STATUS, status); GRPC_ERROR_INT_GRPC_STATUS, status);
} }
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status, status_source source, grpc_status_code status,
const char *description) { const char *description) {
return terminate_with_error(exec_ctx, c, cancel_with_error(exec_ctx, c, source,
error_from_status(status, description)); error_from_status(status, description));
} }
@ -611,21 +607,24 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
* FINAL STATUS CODE MANIPULATION * FINAL STATUS CODE MANIPULATION
*/ */
static void get_final_status_from(grpc_call *call, status_source from_source, static bool get_final_status_from(
void (*set_value)(grpc_status_code code, grpc_call *call, status_source from_source, bool allow_ok_status,
void *user_data), void (*set_value)(grpc_status_code code, void *user_data),
void *set_value_user_data, void *set_value_user_data, grpc_slice *details) {
grpc_slice *details) {
grpc_status_code code; grpc_status_code code;
const char *msg = NULL; const char *msg = NULL;
grpc_error_get_status(call->status[from_source].error, call->send_deadline, grpc_error_get_status(call->status[from_source].error, call->send_deadline,
&code, &msg, NULL); &code, &msg, NULL);
if (code == GRPC_STATUS_OK && !allow_ok_status) {
return false;
}
set_value(code, set_value_user_data); set_value(code, set_value_user_data);
if (details != NULL) { if (details != NULL) {
*details = *details =
msg == NULL ? grpc_empty_slice() : grpc_slice_from_copied_string(msg); msg == NULL ? grpc_empty_slice() : grpc_slice_from_copied_string(msg);
} }
return true;
} }
static void get_final_status(grpc_call *call, static void get_final_status(grpc_call *call,
@ -633,24 +632,39 @@ static void get_final_status(grpc_call *call,
void *user_data), void *user_data),
void *set_value_user_data, grpc_slice *details) { void *set_value_user_data, grpc_slice *details) {
int i; int i;
if (grpc_call_error_trace) {
gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR");
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].is_set) {
gpr_log(GPR_DEBUG, " %d: %s", i,
grpc_error_string(call->status[i].error));
}
}
}
/* first search through ignoring "OK" statuses: if something went wrong,
* ensure we report it */
for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) {
/* search for the best status we can present: ideally the error we use has a /* search for the best status we can present: ideally the error we use has a
clearly defined grpc-status, and we'll prefer that. */ clearly defined grpc-status, and we'll prefer that. */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].is_set && if (call->status[i].is_set &&
grpc_error_has_clear_grpc_status(call->status[i].error)) { grpc_error_has_clear_grpc_status(call->status[i].error)) {
get_final_status_from(call, (status_source)i, set_value, if (get_final_status_from(call, (status_source)i, allow_ok_status != 0,
set_value_user_data, details); set_value, set_value_user_data, details)) {
return; return;
} }
} }
}
/* If no clearly defined status exists, search for 'anything' */ /* If no clearly defined status exists, search for 'anything' */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].is_set) { if (call->status[i].is_set) {
get_final_status_from(call, (status_source)i, set_value, if (get_final_status_from(call, (status_source)i, allow_ok_status != 0,
set_value_user_data, details); set_value, set_value_user_data, details)) {
return; return;
} }
} }
}
}
/* If nothing exists, set some default */ /* If nothing exists, set some default */
if (call->is_client) { if (call->is_client) {
set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
@ -1029,11 +1043,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&call->mu); gpr_mu_lock(&call->mu);
if (error != GRPC_ERROR_NONE) {
set_status_from_error(exec_ctx, call, STATUS_FROM_CORE,
GRPC_ERROR_REF(error));
}
if (bctl->send_initial_metadata) { if (bctl->send_initial_metadata) {
grpc_metadata_batch_destroy( grpc_metadata_batch_destroy(
exec_ctx, exec_ctx,
@ -1176,7 +1185,8 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_call *call = bctl->call; grpc_call *call = bctl->call;
gpr_mu_lock(&bctl->call->mu); gpr_mu_lock(&bctl->call->mu);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error)); cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
} }
if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
call->receiving_stream == NULL) { call->receiving_stream == NULL) {
@ -1203,7 +1213,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
algo); algo);
gpr_log(GPR_ERROR, "%s", error_msg); gpr_log(GPR_ERROR, "%s", error_msg);
cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else if (grpc_compression_options_is_algorithm_enabled( } else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, algo) == 0) { &compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */ /* check if algorithm is supported by current channel config */
@ -1212,7 +1223,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name); algo_name);
gpr_log(GPR_ERROR, "%s", error_msg); gpr_log(GPR_ERROR, "%s", error_msg);
cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else { } else {
call->incoming_compression_algorithm = algo; call->incoming_compression_algorithm = algo;
} }
@ -1242,7 +1254,10 @@ static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_error *error) { grpc_error *error) {
if (error == GRPC_ERROR_NONE) return; if (error == GRPC_ERROR_NONE) return;
int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1); int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
if (idx == 0) cancel_with_error(exec_ctx, bctl->call, GRPC_ERROR_REF(error)); if (idx == 0) {
cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
GRPC_ERROR_REF(error));
}
bctl->errors[idx] = error; bctl->errors[idx] = error;
} }

@ -125,6 +125,8 @@ uint8_t grpc_call_is_client(grpc_call *call);
grpc_compression_algorithm grpc_call_compression_for_level( grpc_compression_algorithm grpc_call_compression_for_level(
grpc_call *call, grpc_compression_level level); grpc_call *call, grpc_compression_level level);
extern int grpc_call_error_trace;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

@ -199,6 +199,7 @@ void grpc_init(void) {
grpc_cq_event_timeout_trace = 1; grpc_cq_event_timeout_trace = 1;
grpc_register_tracer("op_failure", &grpc_trace_operation_failures); grpc_register_tracer("op_failure", &grpc_trace_operation_failures);
grpc_register_tracer("resource_quota", &grpc_resource_quota_trace); grpc_register_tracer("resource_quota", &grpc_resource_quota_trace);
grpc_register_tracer("call_error", &grpc_call_error_trace);
#ifndef NDEBUG #ifndef NDEBUG
grpc_register_tracer("pending_tags", &grpc_trace_pending_tags); grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
#endif #endif

Loading…
Cancel
Save