Add absl::Status support to lockfree_event & call_combiner

reviewable/pr27362/r4
Esun Kim 3 years ago
parent 7d2f9c842c
commit 7e6823815e
  1. 50
      src/core/lib/iomgr/call_combiner.cc
  2. 5
      src/core/lib/iomgr/call_combiner.h
  3. 24
      src/core/lib/iomgr/lockfree_event.cc
  4. 8
      src/core/lib/iomgr/lockfree_event.h

@ -33,18 +33,27 @@ DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner");
namespace {
constexpr static intptr_t kErrorBit =
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
// absl::Status: 2nd bit from LSB can be used.
// (1st bit is reserved for absl::Status inline bit.)
2;
#else
// grpc_error LSB can be used
1;
#endif
grpc_error_handle DecodeCancelStateError(gpr_atm cancel_state) {
if (cancel_state & 1) {
return reinterpret_cast<grpc_error_handle>(cancel_state &
~static_cast<gpr_atm>(1));
if (cancel_state & kErrorBit) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
return internal::StatusGetFromPtr(cancel_state & ~kErrorBit);
#else
return reinterpret_cast<grpc_error_handle>(cancel_state & ~kErrorBit);
#endif
}
return GRPC_ERROR_NONE;
}
gpr_atm EncodeCancelStateError(grpc_error_handle error) {
return static_cast<gpr_atm>(1) | reinterpret_cast<gpr_atm>(error);
}
} // namespace
CallCombiner::CallCombiner() {
@ -57,7 +66,14 @@ CallCombiner::CallCombiner() {
}
CallCombiner::~CallCombiner() {
GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_));
if (cancel_state_ & kErrorBit) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
internal::StatusFreePtr(cancel_state_ & ~kErrorBit);
#else
GRPC_ERROR_UNREF(reinterpret_cast<grpc_error_handle>(
cancel_state_ & ~static_cast<gpr_atm>(kErrorBit)));
#endif
}
}
#ifdef GRPC_TSAN_ENABLED
@ -235,15 +251,29 @@ void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
void CallCombiner::Cancel(grpc_error_handle error) {
GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
intptr_t status_ptr = internal::StatusAllocPtr(error);
if ((status_ptr & kErrorBit) > 0) {
// absl::Status shouldn't have kErrorBit, could be a code bug.
gpr_log(GPR_ERROR, "CallCombiner::Cancel got an error which has kErrorBit");
abort();
}
gpr_atm new_state = kErrorBit | status_ptr;
#else
gpr_atm new_state = kErrorBit | reinterpret_cast<gpr_atm>(error);
#endif
while (true) {
gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
grpc_error_handle original_error = DecodeCancelStateError(original_state);
if (original_error != GRPC_ERROR_NONE) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
internal::StatusFreePtr(status_ptr);
#else
GRPC_ERROR_UNREF(error);
#endif
break;
}
if (gpr_atm_full_cas(&cancel_state_, original_state,
EncodeCancelStateError(error))) {
if (gpr_atm_full_cas(&cancel_state_, original_state, new_state)) {
if (original_state != 0) {
grpc_closure* notify_on_cancel =
reinterpret_cast<grpc_closure*>(original_state);

@ -109,9 +109,10 @@ class CallCombiner {
gpr_atm size_ = 0; // size_t, num closures in queue or currently executing
MultiProducerSingleConsumerQueue queue_;
// Either 0 (if not cancelled and no cancellation closure set),
// a grpc_closure* (if the lowest bit is 0),
// or a grpc_error_handle (if the lowest bit is 1).
// a grpc_closure* (if kErrorBit is 0),
// or a grpc_error_handle (if kErrorBit is 1).
gpr_atm cancel_state_ = 0;
#ifdef GRPC_TSAN_ENABLED
// A fake ref-counted lock that is kept alive after the destruction of
// grpc_call_combiner, when we are running the original closure.

@ -77,7 +77,11 @@ void LockfreeEvent::DestroyEvent() {
do {
curr = gpr_atm_no_barrier_load(&state_);
if (curr & kShutdownBit) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
internal::StatusFreePtr(curr & ~kShutdownBit);
#else
GRPC_ERROR_UNREF((grpc_error_handle)(curr & ~kShutdownBit));
#endif
} else {
GPR_ASSERT(curr == kClosureNotReady || curr == kClosureReady);
}
@ -139,8 +143,13 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) {
contains a pointer to the shutdown-error). If the fd is shutdown,
schedule the closure with the shutdown error */
if ((curr & kShutdownBit) > 0) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
grpc_error_handle shutdown_err =
internal::StatusGetFromPtr(curr & ~kShutdownBit);
#else
grpc_error_handle shutdown_err =
reinterpret_cast<grpc_error_handle>(curr & ~kShutdownBit);
#endif
ExecCtx::Run(DEBUG_LOCATION, closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
@ -160,7 +169,18 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) {
}
bool LockfreeEvent::SetShutdown(grpc_error_handle shutdown_error) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
intptr_t status_ptr = internal::StatusAllocPtr(shutdown_error);
if ((status_ptr & kShutdownBit) > 0) {
// absl::Status shouldn't have kShutdownBit, could be a code bug.
gpr_log(GPR_ERROR,
"LockfreeEvent::SetShutdown got an error which has kShutdownBit");
abort();
}
gpr_atm new_state = status_ptr | kShutdownBit;
#else
gpr_atm new_state = reinterpret_cast<gpr_atm>(shutdown_error) | kShutdownBit;
#endif
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_);
@ -184,7 +204,11 @@ bool LockfreeEvent::SetShutdown(grpc_error_handle shutdown_error) {
/* If fd is already shutdown, we are done */
if ((curr & kShutdownBit) > 0) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
internal::StatusFreePtr(status_ptr);
#else
GRPC_ERROR_UNREF(shutdown_error);
#endif
return false;
}

@ -62,8 +62,14 @@ class LockfreeEvent {
void SetReady();
private:
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
// absl::Status: 2nd bit from LSB can be used.
// (1st bit is reserved for absl::Status inline bit.)
enum State { kClosureNotReady = 0, kClosureReady = 1, kShutdownBit = 2 };
#else
// grpc_error LSB can be used
enum State { kClosureNotReady = 0, kClosureReady = 2, kShutdownBit = 1 };
#endif
gpr_atm state_;
};

Loading…
Cancel
Save