Fix race in call.c

pull/9662/head
Craig Tiller 8 years ago
parent 656d75ca65
commit 4bab9463c3
  1. 51
      src/core/lib/surface/call.c

@ -101,6 +101,17 @@ typedef struct {
grpc_error *error; grpc_error *error;
} received_status; } received_status;
static gpr_atm pack_received_status(received_status r) {
return r.is_set ? (1 | (gpr_atm)r.error) : 0;
}
static received_status unpack_received_status(gpr_atm atm) {
return (atm & 1) == 0
? (received_status){.is_set = false, .error = GRPC_ERROR_NONE}
: (received_status){.is_set = true,
.error = (grpc_error *)(atm & ~(gpr_atm)1)};
}
#define MAX_ERRORS_PER_BATCH 3 #define MAX_ERRORS_PER_BATCH 3
typedef struct batch_control { typedef struct batch_control {
@ -165,8 +176,8 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */ Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2]; grpc_metadata_array *buffered_metadata[2];
/* Received call statuses from various sources */ /* Packed received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT]; gpr_atm status[STATUS_SOURCE_COUNT];
/* Call data useful used for reporting. Only valid after the call has /* Call data useful used for reporting. Only valid after the call has
* completed */ * completed */
@ -446,7 +457,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
GRPC_ERROR_UNREF(c->status[i].error); GRPC_ERROR_UNREF(
unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error);
} }
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
@ -614,13 +626,12 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
*/ */
static bool get_final_status_from( static bool get_final_status_from(
grpc_call *call, status_source from_source, bool allow_ok_status, grpc_call *call, grpc_error *error, bool allow_ok_status,
void (*set_value)(grpc_status_code code, void *user_data), void (*set_value)(grpc_status_code code, void *user_data),
void *set_value_user_data, grpc_slice *details) { void *set_value_user_data, grpc_slice *details) {
grpc_status_code code; grpc_status_code code;
const char *msg = NULL; const char *msg = NULL;
grpc_error_get_status(call->status[from_source].error, call->send_deadline, grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL);
&code, &msg, NULL);
if (code == GRPC_STATUS_OK && !allow_ok_status) { if (code == GRPC_STATUS_OK && !allow_ok_status) {
return false; return false;
} }
@ -638,12 +649,15 @@ static void get_final_status(grpc_call *call,
void *user_data), void *user_data),
void *set_value_user_data, grpc_slice *details) { void *set_value_user_data, grpc_slice *details) {
int i; int i;
received_status status[STATUS_SOURCE_COUNT];
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
}
if (grpc_call_error_trace) { if (grpc_call_error_trace) {
gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR"); gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR");
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].is_set) { if (status[i].is_set) {
gpr_log(GPR_DEBUG, " %d: %s", i, gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error));
grpc_error_string(call->status[i].error));
} }
} }
} }
@ -653,9 +667,9 @@ static void get_final_status(grpc_call *call,
/* search for the best status we can present: ideally the error we use has a /* search for the best status we can present: ideally the error we use has a
clearly defined grpc-status, and we'll prefer that. */ clearly defined grpc-status, and we'll prefer that. */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].is_set && if (status[i].is_set &&
grpc_error_has_clear_grpc_status(call->status[i].error)) { grpc_error_has_clear_grpc_status(status[i].error)) {
if (get_final_status_from(call, (status_source)i, allow_ok_status != 0, if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
set_value, set_value_user_data, details)) { set_value, set_value_user_data, details)) {
return; return;
} }
@ -663,8 +677,8 @@ static void get_final_status(grpc_call *call,
} }
/* If no clearly defined status exists, search for 'anything' */ /* If no clearly defined status exists, search for 'anything' */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) { for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].is_set) { if (status[i].is_set) {
if (get_final_status_from(call, (status_source)i, allow_ok_status != 0, if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
set_value, set_value_user_data, details)) { set_value, set_value_user_data, details)) {
return; return;
} }
@ -681,12 +695,13 @@ static void get_final_status(grpc_call *call,
static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
status_source source, grpc_error *error) { status_source source, grpc_error *error) {
if (call->status[source].is_set) { if (!gpr_atm_rel_cas(&call->status[source],
pack_received_status((received_status){
.is_set = false, .error = GRPC_ERROR_NONE}),
pack_received_status((received_status){
.is_set = true, .error = error}))) {
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return;
} }
call->status[source].is_set = true;
call->status[source].error = error;
} }
/******************************************************************************* /*******************************************************************************

Loading…
Cancel
Save