Merge pull request #16594 from yashykt/statusdr

Make batch_error an atomic to avoid data races
pull/16606/head
Yash Tibrewal 6 years ago committed by GitHub
commit b8193d58b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      src/core/lib/surface/call.cc

@ -96,7 +96,7 @@ typedef struct batch_control {
grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
grpc_error* batch_error;
gpr_atm batch_error;
grpc_transport_stream_op_batch op;
} batch_control;
@ -1116,14 +1116,17 @@ static void finish_batch_completion(void* user_data,
}
static void reset_batch_errors(batch_control* bctl) {
GRPC_ERROR_UNREF(bctl->batch_error);
bctl->batch_error = GRPC_ERROR_NONE;
GRPC_ERROR_UNREF(
reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
gpr_atm_rel_store(&bctl->batch_error,
reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
}
static void post_batch_completion(batch_control* bctl) {
grpc_call* next_child_call;
grpc_call* call = bctl->call;
grpc_error* error = GRPC_ERROR_REF(bctl->batch_error);
grpc_error* error = GRPC_ERROR_REF(
reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
@ -1287,8 +1290,10 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) {
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
call->receiving_stream.reset();
if (bctl->batch_error == GRPC_ERROR_NONE) {
bctl->batch_error = GRPC_ERROR_REF(error);
if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
GRPC_ERROR_NONE) {
gpr_atm_rel_store(&bctl->batch_error,
reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
}
cancel_with_error(call, GRPC_ERROR_REF(error));
}
@ -1394,8 +1399,10 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
call->send_deadline = md->deadline;
}
} else {
if (bctl->batch_error == GRPC_ERROR_NONE) {
bctl->batch_error = GRPC_ERROR_REF(error);
if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
GRPC_ERROR_NONE) {
gpr_atm_rel_store(&bctl->batch_error,
reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
}
cancel_with_error(call, GRPC_ERROR_REF(error));
}
@ -1445,8 +1452,10 @@ static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
if (bctl->batch_error == GRPC_ERROR_NONE) {
bctl->batch_error = GRPC_ERROR_REF(error);
if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
GRPC_ERROR_NONE) {
gpr_atm_rel_store(&bctl->batch_error,
reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
}
if (error != GRPC_ERROR_NONE) {
cancel_with_error(call, GRPC_ERROR_REF(error));

Loading…
Cancel
Save