Add absl::Status support to lockfree_event & call_combiner (#27362)

* Add absl::Status support to lockfree_event & call_combiner

* Revert "Add absl::Status support to lockfree_event & call_combiner"

This reverts commit 7e6823815e.

* Use StatusAllocHeapPtr for call_combiner and lockfree_event

* Update by review
reviewable/pr27715/r1^2
Esun Kim 3 years ago committed by GitHub
parent 84ddc3289f
commit 544deed283
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      src/core/lib/gprpp/status_helper.cc
  2. 11
      src/core/lib/gprpp/status_helper.h
  3. 38
      src/core/lib/iomgr/call_combiner.cc
  4. 18
      src/core/lib/iomgr/lockfree_event.cc
  5. 10
      test/core/gprpp/status_helper_test.cc

@ -403,6 +403,25 @@ absl::Status StatusGetFromPtr(uintptr_t ptr) {
return *reinterpret_cast<absl::Status*>(&ptr);
}
uintptr_t StatusAllocHeapPtr(absl::Status s) {
if (s.ok()) return kOkStatusPtr;
absl::Status* ptr = new absl::Status(s);
return reinterpret_cast<uintptr_t>(ptr);
}
void StatusFreeHeapPtr(uintptr_t ptr) {
absl::Status* s = reinterpret_cast<absl::Status*>(ptr);
delete s;
}
absl::Status StatusGetFromHeapPtr(uintptr_t ptr) {
if (ptr == kOkStatusPtr) {
return absl::OkStatus();
} else {
return *reinterpret_cast<absl::Status*>(ptr);
}
}
} // namespace internal
} // namespace grpc_core

@ -176,6 +176,17 @@ void StatusFreePtr(uintptr_t ptr);
/// This shouldn't be used except migration purpose.
absl::Status StatusGetFromPtr(uintptr_t ptr);
/// Returns ptr that is allocated in the heap memory and the given status is
/// copied into. This ptr can be used to get Status later and should be
/// freed by StatusFreeHeapPtr. This can be 0 in case of OkStatus.
uintptr_t StatusAllocHeapPtr(absl::Status s);
/// Frees the allocated status at heap ptr.
void StatusFreeHeapPtr(uintptr_t ptr);
/// Get the status from a heap ptr.
absl::Status StatusGetFromHeapPtr(uintptr_t ptr);
} // namespace internal
} // namespace grpc_core

@ -33,18 +33,20 @@ DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner");
namespace {
// grpc_error LSB can be used
constexpr static intptr_t kErrorBit = 1;
grpc_error_handle DecodeCancelStateError(gpr_atm cancel_state) {
if (cancel_state & 1) {
return reinterpret_cast<grpc_error_handle>(cancel_state &
~static_cast<gpr_atm>(1));
if (cancel_state & kErrorBit) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
return internal::StatusGetFromHeapPtr(cancel_state & ~kErrorBit);
#else
return reinterpret_cast<grpc_error_handle>(cancel_state & ~kErrorBit);
#endif
}
return GRPC_ERROR_NONE;
}
gpr_atm EncodeCancelStateError(grpc_error_handle error) {
return static_cast<gpr_atm>(1) | reinterpret_cast<gpr_atm>(error);
}
} // namespace
CallCombiner::CallCombiner() {
@ -57,7 +59,14 @@ CallCombiner::CallCombiner() {
}
CallCombiner::~CallCombiner() {
GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_));
if (cancel_state_ & kErrorBit) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
internal::StatusFreeHeapPtr(cancel_state_ & ~kErrorBit);
#else
GRPC_ERROR_UNREF(reinterpret_cast<grpc_error_handle>(
cancel_state_ & ~static_cast<gpr_atm>(kErrorBit)));
#endif
}
}
#ifdef GRPC_TSAN_ENABLED
@ -235,15 +244,24 @@ void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
void CallCombiner::Cancel(grpc_error_handle error) {
GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
intptr_t status_ptr = internal::StatusAllocHeapPtr(error);
gpr_atm new_state = kErrorBit | status_ptr;
#else
gpr_atm new_state = kErrorBit | reinterpret_cast<gpr_atm>(error);
#endif
while (true) {
gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
grpc_error_handle original_error = DecodeCancelStateError(original_state);
if (original_error != GRPC_ERROR_NONE) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
internal::StatusFreeHeapPtr(status_ptr);
#else
GRPC_ERROR_UNREF(error);
#endif
break;
}
if (gpr_atm_full_cas(&cancel_state_, original_state,
EncodeCancelStateError(error))) {
if (gpr_atm_full_cas(&cancel_state_, original_state, new_state)) {
if (original_state != 0) {
grpc_closure* notify_on_cancel =
reinterpret_cast<grpc_closure*>(original_state);

@ -77,7 +77,11 @@ void LockfreeEvent::DestroyEvent() {
do {
curr = gpr_atm_no_barrier_load(&state_);
if (curr & kShutdownBit) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
internal::StatusFreeHeapPtr(curr & ~kShutdownBit);
#else
GRPC_ERROR_UNREF((grpc_error_handle)(curr & ~kShutdownBit));
#endif
} else {
GPR_ASSERT(curr == kClosureNotReady || curr == kClosureReady);
}
@ -139,8 +143,13 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) {
contains a pointer to the shutdown-error). If the fd is shutdown,
schedule the closure with the shutdown error */
if ((curr & kShutdownBit) > 0) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
grpc_error_handle shutdown_err =
internal::StatusGetFromHeapPtr(curr & ~kShutdownBit);
#else
grpc_error_handle shutdown_err =
reinterpret_cast<grpc_error_handle>(curr & ~kShutdownBit);
#endif
ExecCtx::Run(DEBUG_LOCATION, closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
@ -160,7 +169,12 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) {
}
bool LockfreeEvent::SetShutdown(grpc_error_handle shutdown_error) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
intptr_t status_ptr = internal::StatusAllocHeapPtr(shutdown_error);
gpr_atm new_state = status_ptr | kShutdownBit;
#else
gpr_atm new_state = reinterpret_cast<gpr_atm>(shutdown_error) | kShutdownBit;
#endif
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_);
@ -184,7 +198,11 @@ bool LockfreeEvent::SetShutdown(grpc_error_handle shutdown_error) {
/* If fd is already shutdown, we are done */
if ((curr & kShutdownBit) > 0) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
internal::StatusFreeHeapPtr(status_ptr);
#else
GRPC_ERROR_UNREF(shutdown_error);
#endif
return false;
}

@ -160,6 +160,16 @@ TEST(StatusUtilTest, AllocPtr) {
}
}
TEST(StatusUtilTest, AllocHeapPtr) {
absl::Status statuses[] = {absl::OkStatus(), absl::CancelledError(),
absl::AbortedError("Message")};
for (const auto& s : statuses) {
uintptr_t p = internal::StatusAllocHeapPtr(s);
EXPECT_EQ(s, internal::StatusGetFromHeapPtr(p));
internal::StatusFreeHeapPtr(p);
}
}
} // namespace
} // namespace grpc_core

Loading…
Cancel
Save