Fix race detected by TSAN

pull/9529/head
Craig Tiller 8 years ago
parent 7b005f7fdd
commit 2dc32eabcb
  1. 81
      src/core/lib/surface/call.c

@ -86,8 +86,11 @@ typedef enum {
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
/* Status was created by some internal channel stack operation */
/* Status was created by some internal channel stack operation: must come via
add_batch_error */
STATUS_FROM_CORE,
/* Status was created by some surface error */
STATUS_FROM_SURFACE,
/* Status came from the server sending status */
STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
@ -221,11 +224,11 @@ struct grpc_call {
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_transport_stream_op *op);
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_error *error);
status_source source, grpc_error *error);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
grpc_error *error);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
@ -337,7 +340,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
args->server_transport_data, path, call->start_time, send_deadline,
CALL_STACK_FROM_CALL(call));
if (error != GRPC_ERROR_NONE) {
cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error));
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
}
if (args->cq != NULL) {
GPR_ASSERT(
@ -512,7 +515,6 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status,
const char *description,
void *reserved) {
grpc_call_error r;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
"grpc_call_cancel_with_status("
@ -520,16 +522,16 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
4, (c, (int)status, description, reserved));
GPR_ASSERT(reserved == NULL);
gpr_mu_lock(&c->mu);
r = cancel_with_status(&exec_ctx, c, status, description);
cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status,
description);
gpr_mu_unlock(&c->mu);
grpc_exec_ctx_finish(&exec_ctx);
return r;
return GRPC_CALL_OK;
}
typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
grpc_error *error;
grpc_transport_stream_op op;
} termination_closure;
@ -544,36 +546,27 @@ static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp,
grpc_error *error) {
termination_closure *tc = tcp;
memset(&tc->op, 0, sizeof(tc->op));
tc->op.cancel_error = tc->error;
tc->op.cancel_error = GRPC_ERROR_REF(error);
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc,
grpc_schedule_on_exec_ctx);
tc->op.on_complete = &tc->closure;
tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc,
grpc_schedule_on_exec_ctx);
execute_op(exec_ctx, tc->call, &tc->op);
}
static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
termination_closure *tc) {
set_status_from_error(exec_ctx, tc->call, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_REF(tc->error));
grpc_closure_init(&tc->closure, send_termination, tc,
grpc_schedule_on_exec_ctx);
GRPC_CALL_INTERNAL_REF(tc->call, "termination");
grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE);
return GRPC_CALL_OK;
}
static grpc_call_error terminate_with_error(grpc_exec_ctx *exec_ctx,
grpc_call *c, grpc_error *error) {
static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_error *error) {
termination_closure *tc = gpr_malloc(sizeof(*tc));
memset(tc, 0, sizeof(*tc));
tc->call = c;
tc->error = error;
return terminate_with_status(exec_ctx, tc);
GRPC_CALL_INTERNAL_REF(tc->call, "termination");
grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination,
tc, grpc_schedule_on_exec_ctx),
error);
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_error *error) {
status_source source, grpc_error *error) {
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
terminate_with_error(exec_ctx, c, error);
}
@ -585,11 +578,11 @@ static grpc_error *error_from_status(grpc_status_code status,
GRPC_ERROR_INT_GRPC_STATUS, status);
}
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
return terminate_with_error(exec_ctx, c,
error_from_status(status, description));
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description) {
cancel_with_error(exec_ctx, c, source,
error_from_status(status, description));
}
/*******************************************************************************
@ -1014,11 +1007,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&call->mu);
if (error != GRPC_ERROR_NONE) {
set_status_from_error(exec_ctx, call, STATUS_FROM_CORE,
GRPC_ERROR_REF(error));
}
if (bctl->send_initial_metadata) {
grpc_metadata_batch_destroy(
exec_ctx,
@ -1161,7 +1149,7 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_call *call = bctl->call;
gpr_mu_lock(&bctl->call->mu);
if (error != GRPC_ERROR_NONE) {
cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error));
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
}
if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
call->receiving_stream == NULL) {
@ -1188,7 +1176,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
algo);
gpr_log(GPR_ERROR, "%s", error_msg);
cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
@ -1197,7 +1186,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
gpr_log(GPR_ERROR, "%s", error_msg);
cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else {
call->incoming_compression_algorithm = algo;
}
@ -1227,7 +1217,10 @@ static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_error *error) {
if (error == GRPC_ERROR_NONE) return;
int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
if (idx == 0) cancel_with_error(exec_ctx, bctl->call, GRPC_ERROR_REF(error));
if (idx == 0) {
cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
GRPC_ERROR_REF(error));
}
bctl->errors[idx] = error;
}

Loading…
Cancel
Save