Replace atm with AtomicError in call (#27314)

pull/27418/head
Esun Kim 3 years ago committed by GitHub
parent 9b3f75d322
commit 47ca8c1209
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      src/core/lib/iomgr/error.h
  2. 48
      src/core/lib/surface/call.cc

@ -32,6 +32,7 @@
#include <grpc/support/time.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/slice/slice_internal.h"
@ -398,4 +399,47 @@ inline bool grpc_log_if_error(const char* what, grpc_error_handle error,
#define GRPC_LOG_IF_ERROR(what, error) \
(grpc_log_if_error((what), (error), __FILE__, __LINE__))
/// Helper class to get & set grpc_error_handle in a thread-safe fashion.
/// This could be considered as atomic<grpc_error_handle>.
class AtomicError {
public:
AtomicError() {
error_ = GRPC_ERROR_NONE;
lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
}
explicit AtomicError(grpc_error_handle error) {
error_ = GRPC_ERROR_REF(error);
}
~AtomicError() { GRPC_ERROR_UNREF(error_); }
AtomicError(const AtomicError&) = delete;
AtomicError& operator=(const AtomicError&) = delete;
/// returns get() == GRPC_ERROR_NONE
bool ok() {
gpr_spinlock_lock(&lock_);
bool ret = error_ == GRPC_ERROR_NONE;
gpr_spinlock_unlock(&lock_);
return ret;
}
grpc_error_handle get() {
gpr_spinlock_lock(&lock_);
grpc_error_handle ret = error_;
gpr_spinlock_unlock(&lock_);
return ret;
}
void set(grpc_error_handle error) {
gpr_spinlock_lock(&lock_);
GRPC_ERROR_UNREF(error_);
error_ = GRPC_ERROR_REF(error);
gpr_spinlock_unlock(&lock_);
}
private:
grpc_error_handle error_;
gpr_spinlock lock_;
};
#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */

@ -107,7 +107,7 @@ struct batch_control {
grpc_closure start_batch;
grpc_closure finish_batch;
std::atomic<intptr_t> steps_to_complete{0};
gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
AtomicError batch_error;
void set_num_steps_to_complete(uintptr_t steps) {
steps_to_complete.store(steps, std::memory_order_release);
}
@ -251,7 +251,7 @@ struct grpc_call {
grpc_core::Server* core_server;
} server;
} final_op;
gpr_atm status_error = 0;
AtomicError status_error;
/* recv_state can contain one of the following values:
RECV_NONE : : no initial metadata and messages received
@ -555,12 +555,11 @@ static void destroy_call(void* call, grpc_error_handle /*error*/) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
grpc_error_handle status_error =
reinterpret_cast<grpc_error_handle>(gpr_atm_acq_load(&c->status_error));
grpc_error_handle status_error = c->status_error.get();
grpc_error_get_status(status_error, c->send_deadline,
&c->final_info.final_status, nullptr, nullptr,
&(c->final_info.error_string));
GRPC_ERROR_UNREF(status_error);
c->status_error.set(GRPC_ERROR_NONE);
c->final_info.stats.latency =
gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
@ -747,7 +746,8 @@ static void set_final_status(grpc_call* call, grpc_error_handle error) {
call->final_op.client.error_string);
// explicitly take a ref
grpc_slice_ref_internal(*call->final_op.client.status_details);
gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
call->status_error.set(error);
GRPC_ERROR_UNREF(error);
grpc_core::channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(call->channel);
if (channelz_channel != nullptr) {
@ -763,9 +763,7 @@ static void set_final_status(grpc_call* call, grpc_error_handle error) {
grpc_core::channelz::ServerNode* channelz_node =
call->final_op.server.core_server->channelz_node();
if (channelz_node != nullptr) {
if (*call->final_op.server.cancelled ||
reinterpret_cast<grpc_error_handle>(
gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
if (*call->final_op.server.cancelled || !call->status_error.ok()) {
channelz_node->RecordCallFailed();
} else {
channelz_node->RecordCallSucceeded();
@ -1152,6 +1150,7 @@ static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
}
bctl->~batch_control();
bctl->op = {};
new (&bctl->batch_error) AtomicError();
} else {
bctl = call->arena->New<batch_control>();
*pslot = bctl;
@ -1170,17 +1169,13 @@ static void finish_batch_completion(void* user_data,
}
static void reset_batch_errors(batch_control* bctl) {
GRPC_ERROR_UNREF(reinterpret_cast<grpc_error_handle>(
gpr_atm_acq_load(&bctl->batch_error)));
gpr_atm_rel_store(&bctl->batch_error,
reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
bctl->batch_error.set(GRPC_ERROR_NONE);
}
static void post_batch_completion(batch_control* bctl) {
grpc_call* next_child_call;
grpc_call* call = bctl->call;
grpc_error_handle error = GRPC_ERROR_REF(reinterpret_cast<grpc_error_handle>(
gpr_atm_acq_load(&bctl->batch_error)));
grpc_error_handle error = GRPC_ERROR_REF(bctl->batch_error.get());
if (bctl->op.send_initial_metadata) {
call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */].Clear();
@ -1347,10 +1342,8 @@ static void receiving_stream_ready(void* bctlp, grpc_error_handle error) {
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
call->receiving_stream.reset();
if (reinterpret_cast<grpc_error_handle>(
gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
gpr_atm_rel_store(&bctl->batch_error,
reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
if (bctl->batch_error.ok()) {
bctl->batch_error.set(error);
}
cancel_with_error(call, GRPC_ERROR_REF(error));
}
@ -1482,10 +1475,8 @@ static void receiving_initial_metadata_ready(void* bctlp,
call->send_deadline = deadline;
}
} else {
if (reinterpret_cast<grpc_error_handle>(
gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
gpr_atm_rel_store(&bctl->batch_error,
reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
if (bctl->batch_error.ok()) {
bctl->batch_error.set(error);
}
cancel_with_error(call, GRPC_ERROR_REF(error));
}
@ -1537,10 +1528,8 @@ static void finish_batch(void* bctlp, grpc_error_handle error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
if (reinterpret_cast<grpc_error_handle>(
gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
gpr_atm_rel_store(&bctl->batch_error,
reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
if (bctl->batch_error.ok()) {
bctl->batch_error.set(error);
}
if (error != GRPC_ERROR_NONE) {
cancel_with_error(call, GRPC_ERROR_REF(error));
@ -1779,8 +1768,9 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
}
gpr_atm_rel_store(&call->status_error,
reinterpret_cast<gpr_atm>(status_error));
call->status_error.set(status_error);
GRPC_ERROR_UNREF(status_error);
if (!prepare_application_metadata(
call,
static_cast<int>(

Loading…
Cancel
Save