diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 1811fc71df8..2a1d50a923d 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -32,6 +32,7 @@
 #include
 
 #include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gpr/spinlock.h"
 #include "src/core/lib/gprpp/status_helper.h"
 #include "src/core/lib/slice/slice_internal.h"
 
@@ -398,4 +399,47 @@ inline bool grpc_log_if_error(const char* what, grpc_error_handle error,
 #define GRPC_LOG_IF_ERROR(what, error) \
   (grpc_log_if_error((what), (error), __FILE__, __LINE__))
 
+/// Helper class to get & set grpc_error_handle in a thread-safe fashion.
+/// This could be considered as atomic<grpc_error_handle>.
+class AtomicError {
+ public:
+  AtomicError() {
+    error_ = GRPC_ERROR_NONE;
+    lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
+  }
+  explicit AtomicError(grpc_error_handle error) {
+    error_ = GRPC_ERROR_REF(error);
+  }
+  ~AtomicError() { GRPC_ERROR_UNREF(error_); }
+
+  AtomicError(const AtomicError&) = delete;
+  AtomicError& operator=(const AtomicError&) = delete;
+
+  /// returns get() == GRPC_ERROR_NONE
+  bool ok() {
+    gpr_spinlock_lock(&lock_);
+    bool ret = error_ == GRPC_ERROR_NONE;
+    gpr_spinlock_unlock(&lock_);
+    return ret;
+  }
+
+  grpc_error_handle get() {
+    gpr_spinlock_lock(&lock_);
+    grpc_error_handle ret = error_;
+    gpr_spinlock_unlock(&lock_);
+    return ret;
+  }
+
+  void set(grpc_error_handle error) {
+    gpr_spinlock_lock(&lock_);
+    GRPC_ERROR_UNREF(error_);
+    error_ = GRPC_ERROR_REF(error);
+    gpr_spinlock_unlock(&lock_);
+  }
+
+ private:
+  grpc_error_handle error_;
+  gpr_spinlock lock_;
+};
+
 #endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */
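Aside: a minimal standalone sketch of the pattern the AtomicError helper above implements, i.e. a spinlock-guarded get/set/ok over a shared value, used by the call.cc changes below. This sketch is illustrative only and is not part of the diff: the class name SpinlockGuardedError is made up, std::atomic<bool> stands in for gpr_spinlock, and a plain int error code stands in for the ref-counted grpc_error_handle (0 playing the role of GRPC_ERROR_NONE), so it compiles without any gRPC headers.

// Standalone illustration (assumed names, not gRPC code).
#include <atomic>
#include <cassert>

class SpinlockGuardedError {
 public:
  bool ok() { return get() == 0; }  // 0 stands in for GRPC_ERROR_NONE

  int get() {
    lock();
    int ret = error_;
    unlock();
    return ret;
  }

  void set(int error) {
    lock();
    // The real AtomicError additionally unrefs the old handle and refs the
    // new one while the lock is held.
    error_ = error;
    unlock();
  }

 private:
  void lock() {
    // Spin until we atomically flip locked_ from false to true.
    while (locked_.exchange(true, std::memory_order_acquire)) {
    }
  }
  void unlock() { locked_.store(false, std::memory_order_release); }

  int error_ = 0;
  std::atomic<bool> locked_{false};
};

int main() {
  SpinlockGuardedError e;
  assert(e.ok());
  e.set(42);  // analogous to bctl->batch_error.set(error)
  assert(!e.ok() && e.get() == 42);
  e.set(0);   // analogous to reset_batch_errors()
  assert(e.ok());
}

The design point carried over into the diff below: callers never touch the raw pointer or the lock; they only use ok(), get(), and set(), which replaces the earlier gpr_atm_acq_load / gpr_atm_rel_store plus reinterpret_cast pattern in call.cc.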
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 2763afe67bc..6543c58e58b 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -107,7 +107,7 @@ struct batch_control {
   grpc_closure start_batch;
   grpc_closure finish_batch;
   std::atomic<intptr_t> steps_to_complete{0};
-  gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
+  AtomicError batch_error;
   void set_num_steps_to_complete(uintptr_t steps) {
     steps_to_complete.store(steps, std::memory_order_release);
   }
@@ -251,7 +251,7 @@ struct grpc_call {
       grpc_core::Server* core_server;
     } server;
   } final_op;
-  gpr_atm status_error = 0;
+  AtomicError status_error;
 
   /* recv_state can contain one of the following values:
      RECV_NONE :                    : no initial metadata and messages received
@@ -555,12 +555,11 @@ static void destroy_call(void* call, grpc_error_handle /*error*/) {
     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
   }
 
-  grpc_error_handle status_error =
-      reinterpret_cast<grpc_error_handle>(gpr_atm_acq_load(&c->status_error));
+  grpc_error_handle status_error = c->status_error.get();
   grpc_error_get_status(status_error, c->send_deadline,
                         &c->final_info.final_status, nullptr, nullptr,
                         &(c->final_info.error_string));
-  GRPC_ERROR_UNREF(status_error);
+  c->status_error.set(GRPC_ERROR_NONE);
   c->final_info.stats.latency =
       gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
   grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
@@ -747,7 +746,8 @@ static void set_final_status(grpc_call* call, grpc_error_handle error) {
                           call->final_op.client.error_string);
     // explicitly take a ref
     grpc_slice_ref_internal(*call->final_op.client.status_details);
-    gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
+    call->status_error.set(error);
+    GRPC_ERROR_UNREF(error);
     grpc_core::channelz::ChannelNode* channelz_channel =
         grpc_channel_get_channelz_node(call->channel);
     if (channelz_channel != nullptr) {
@@ -763,9 +763,7 @@ static void set_final_status(grpc_call* call, grpc_error_handle error) {
     grpc_core::channelz::ServerNode* channelz_node =
         call->final_op.server.core_server->channelz_node();
     if (channelz_node != nullptr) {
-      if (*call->final_op.server.cancelled ||
-          reinterpret_cast<grpc_error_handle>(
-              gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
+      if (*call->final_op.server.cancelled || !call->status_error.ok()) {
         channelz_node->RecordCallFailed();
       } else {
         channelz_node->RecordCallSucceeded();
@@ -1152,6 +1150,7 @@ static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
     }
     bctl->~batch_control();
     bctl->op = {};
+    new (&bctl->batch_error) AtomicError();
   } else {
     bctl = call->arena->New<batch_control>();
     *pslot = bctl;
@@ -1170,17 +1169,13 @@ static void finish_batch_completion(void* user_data,
 }
 
 static void reset_batch_errors(batch_control* bctl) {
-  GRPC_ERROR_UNREF(reinterpret_cast<grpc_error_handle>(
-      gpr_atm_acq_load(&bctl->batch_error)));
-  gpr_atm_rel_store(&bctl->batch_error,
-                    reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
+  bctl->batch_error.set(GRPC_ERROR_NONE);
 }
 
 static void post_batch_completion(batch_control* bctl) {
   grpc_call* next_child_call;
   grpc_call* call = bctl->call;
-  grpc_error_handle error = GRPC_ERROR_REF(reinterpret_cast<grpc_error_handle>(
-      gpr_atm_acq_load(&bctl->batch_error)));
+  grpc_error_handle error = GRPC_ERROR_REF(bctl->batch_error.get());
 
   if (bctl->op.send_initial_metadata) {
     call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */].Clear();
@@ -1347,10 +1342,8 @@ static void receiving_stream_ready(void* bctlp, grpc_error_handle error) {
   grpc_call* call = bctl->call;
   if (error != GRPC_ERROR_NONE) {
     call->receiving_stream.reset();
-    if (reinterpret_cast<grpc_error_handle>(
-            gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
-      gpr_atm_rel_store(&bctl->batch_error,
-                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
+    if (bctl->batch_error.ok()) {
+      bctl->batch_error.set(error);
     }
     cancel_with_error(call, GRPC_ERROR_REF(error));
   }
@@ -1482,10 +1475,8 @@ static void receiving_initial_metadata_ready(void* bctlp,
       call->send_deadline = deadline;
     }
   } else {
-    if (reinterpret_cast<grpc_error_handle>(
-            gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
-      gpr_atm_rel_store(&bctl->batch_error,
-                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
+    if (bctl->batch_error.ok()) {
+      bctl->batch_error.set(error);
     }
     cancel_with_error(call, GRPC_ERROR_REF(error));
   }
@@ -1537,10 +1528,8 @@ static void finish_batch(void* bctlp, grpc_error_handle error) {
   batch_control* bctl = static_cast<batch_control*>(bctlp);
   grpc_call* call = bctl->call;
   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
-  if (reinterpret_cast<grpc_error_handle>(
-          gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
-    gpr_atm_rel_store(&bctl->batch_error,
-                      reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
+  if (bctl->batch_error.ok()) {
+    bctl->batch_error.set(error);
   }
   if (error != GRPC_ERROR_NONE) {
     cancel_with_error(call, GRPC_ERROR_REF(error));
   }
@@ -1779,8 +1768,9 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
           }
         }
 
-        gpr_atm_rel_store(&call->status_error,
-                          reinterpret_cast<gpr_atm>(status_error));
+        call->status_error.set(status_error);
+        GRPC_ERROR_UNREF(status_error);
+
         if (!prepare_application_metadata(
                 call, static_cast<int>(