Revert "Revert "Add absl::Status support to closure (#27308)" (#28211)" (#28220)

This reverts commit cd9730d2d2.
pull/28251/head
Esun Kim 3 years ago committed by GitHub
parent 75041ea3a9
commit 5961aeb2a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 40
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 39
      src/core/lib/gprpp/status_helper.cc
  3. 19
      src/core/lib/gprpp/status_helper.h
  4. 19
      src/core/lib/iomgr/call_combiner.cc
  5. 38
      src/core/lib/iomgr/closure.h
  6. 28
      src/core/lib/iomgr/combiner.cc
  7. 31
      src/core/lib/iomgr/exec_ctx.cc
  8. 11
      src/core/lib/iomgr/executor.cc
  9. 13
      test/core/gprpp/status_helper_test.cc

@ -1219,26 +1219,42 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
write_state_name(t->write_state));
}
if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) {
closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
grpc_error_handle cl_err =
grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
#else
grpc_error_handle cl_err =
reinterpret_cast<grpc_error_handle>(closure->error_data.error);
#endif
if (cl_err == GRPC_ERROR_NONE) {
cl_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Error in HTTP transport completing operation");
closure->error_data.error =
grpc_error_set_str(closure->error_data.error,
GRPC_ERROR_STR_TARGET_ADDRESS, t->peer_string);
cl_err = grpc_error_set_str(cl_err, GRPC_ERROR_STR_TARGET_ADDRESS,
t->peer_string);
}
closure->error_data.error =
grpc_error_add_child(closure->error_data.error, error);
cl_err = grpc_error_add_child(cl_err, error);
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(cl_err);
#else
closure->error_data.error = reinterpret_cast<intptr_t>(cl_err);
#endif
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
// Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
// closures earlier than when it is safe to do so.
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
closure->error_data.error);
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
grpc_error_handle run_error =
grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
#else
grpc_error_handle run_error =
reinterpret_cast<grpc_error_handle>(closure->error_data.error);
#endif
closure->error_data.error = 0;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error);
} else {
grpc_closure_list_append(&t->run_after_write, closure,
closure->error_data.error);
grpc_closure_list_append(&t->run_after_write, closure);
}
}
}
@ -1386,7 +1402,7 @@ static void perform_stream_op_locked(void* stream_op,
// This batch has send ops. Use final_data as a barrier until enqueue time;
// the initial counter is dropped at the end of this function.
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE;
on_complete->error_data.error = 0;
}
if (op->cancel_stream) {

@ -379,32 +379,8 @@ absl::Status StatusFromProto(google_rpc_Status* msg) {
return status;
}
uintptr_t StatusAllocPtr(absl::Status s) {
// This relies the fact that absl::Status has only one member, StatusRep*
// so the sizeof(absl::Status) has the same size of intptr_t and StatusRep*
// can be stolen using placement allocation.
static_assert(sizeof(intptr_t) == sizeof(absl::Status),
"absl::Status should be as big as intptr_t");
// This does two things;
// 1. Copies StatusRep* of absl::Status to ptr
// 2. Increases the counter of StatusRep if it's not inlined
uintptr_t ptr;
new (&ptr) absl::Status(s);
return ptr;
}
void StatusFreePtr(uintptr_t ptr) {
// Decreases the counter of StatusRep if it's not inlined.
reinterpret_cast<absl::Status*>(&ptr)->~Status();
}
absl::Status StatusGetFromPtr(uintptr_t ptr) {
// Constructs Status from ptr having the address of StatusRep.
return *reinterpret_cast<absl::Status*>(&ptr);
}
uintptr_t StatusAllocHeapPtr(absl::Status s) {
if (s.ok()) return kOkStatusPtr;
if (s.ok()) return 0;
absl::Status* ptr = new absl::Status(s);
return reinterpret_cast<uintptr_t>(ptr);
}
@ -415,13 +391,24 @@ void StatusFreeHeapPtr(uintptr_t ptr) {
}
absl::Status StatusGetFromHeapPtr(uintptr_t ptr) {
if (ptr == kOkStatusPtr) {
if (ptr == 0) {
return absl::OkStatus();
} else {
return *reinterpret_cast<absl::Status*>(ptr);
}
}
absl::Status StatusMoveFromHeapPtr(uintptr_t ptr) {
if (ptr == 0) {
return absl::OkStatus();
} else {
absl::Status* s = reinterpret_cast<absl::Status*>(ptr);
absl::Status ret = std::move(*s);
delete s;
return ret;
}
}
} // namespace internal
} // namespace grpc_core

@ -160,22 +160,6 @@ google_rpc_Status* StatusToProto(const absl::Status& status,
/// This is for internal implementation & test only
absl::Status StatusFromProto(google_rpc_Status* msg) GRPC_MUST_USE_RESULT;
/// The same value of internal::StatusAllocPtr(absl::OkStatus())
static constexpr uintptr_t kOkStatusPtr = 0;
/// Returns ptr where the given status is copied into.
/// This ptr can be used to get Status later and should be freed by
/// StatusFreePtr. This shouldn't be used except migration purpose.
uintptr_t StatusAllocPtr(absl::Status s);
/// Frees the allocated status at ptr.
/// This shouldn't be used except migration purpose.
void StatusFreePtr(uintptr_t ptr);
/// Get the status from ptr.
/// This shouldn't be used except migration purpose.
absl::Status StatusGetFromPtr(uintptr_t ptr);
/// Returns ptr that is allocated in the heap memory and the given status is
/// copied into. This ptr can be used to get Status later and should be
/// freed by StatusFreeHeapPtr. This can be 0 in case of OkStatus.
@ -187,6 +171,9 @@ void StatusFreeHeapPtr(uintptr_t ptr);
/// Get the status from a heap ptr.
absl::Status StatusGetFromHeapPtr(uintptr_t ptr);
/// Move the status from a heap ptr. (GetFrom & FreeHeap)
absl::Status StatusMoveFromHeapPtr(uintptr_t ptr);
} // namespace internal
} // namespace grpc_core

@ -150,7 +150,11 @@ void CallCombiner::Start(grpc_closure* closure, grpc_error_handle error,
gpr_log(GPR_INFO, " QUEUING");
}
// Queue was not empty, so add closure to queue.
closure->error_data.error = error;
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
closure->error_data.error = internal::StatusAllocHeapPtr(error);
#else
closure->error_data.error = reinterpret_cast<intptr_t>(error);
#endif
queue_.Push(
reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
}
@ -185,12 +189,19 @@ void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
}
continue;
}
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
grpc_error_handle error =
internal::StatusMoveFromHeapPtr(closure->error_data.error);
#else
grpc_error_handle error =
reinterpret_cast<grpc_error_handle>(closure->error_data.error);
#endif
closure->error_data.error = 0;
if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s",
closure,
grpc_error_std_string(closure->error_data.error).c_str());
closure, grpc_error_std_string(error).c_str());
}
ScheduleClosure(closure, closure->error_data.error);
ScheduleClosure(closure, error);
break;
}
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {

@ -72,7 +72,7 @@ struct grpc_closure {
/** Once queued, the result of the closure. Before then: scratch space */
union {
grpc_error_handle error;
uintptr_t error;
uintptr_t scratch;
} error_data;
@ -98,7 +98,7 @@ inline grpc_closure* grpc_closure_init(grpc_closure* closure,
#endif
closure->cb = cb;
closure->cb_arg = cb_arg;
closure->error_data.error = GRPC_ERROR_NONE;
closure->error_data.error = 0;
#ifndef NDEBUG
closure->scheduled = false;
closure->file_initiated = nullptr;
@ -172,16 +172,12 @@ inline void grpc_closure_list_init(grpc_closure_list* closure_list) {
}
/** add \a closure to the end of \a list
and set \a closure's result to \a error
Returns true if \a list becomes non-empty */
inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
grpc_closure* closure,
grpc_error_handle error) {
grpc_closure* closure) {
if (closure == nullptr) {
GRPC_ERROR_UNREF(error);
return false;
}
closure->error_data.error = error;
closure->next_data.next = nullptr;
bool was_empty = (closure_list->head == nullptr);
if (was_empty) {
@ -193,12 +189,36 @@ inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
return was_empty;
}
/** add \a closure to the end of \a list
and set \a closure's result to \a error
Returns true if \a list becomes non-empty */
inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
grpc_closure* closure,
grpc_error_handle error) {
if (closure == nullptr) {
GRPC_ERROR_UNREF(error);
return false;
}
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(error);
#else
closure->error_data.error = reinterpret_cast<intptr_t>(error);
#endif
return grpc_closure_list_append(closure_list, closure);
}
/** force all success bits in \a list to false */
inline void grpc_closure_list_fail_all(grpc_closure_list* list,
grpc_error_handle forced_failure) {
for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) {
if (c->error_data.error == GRPC_ERROR_NONE) {
c->error_data.error = GRPC_ERROR_REF(forced_failure);
if (c->error_data.error == 0) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
c->error_data.error =
grpc_core::internal::StatusAllocHeapPtr(forced_failure);
#else
c->error_data.error =
reinterpret_cast<intptr_t>(GRPC_ERROR_REF(forced_failure));
#endif
}
}
GRPC_ERROR_UNREF(forced_failure);

@ -149,7 +149,11 @@ static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* cl,
}
GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
assert(cl->cb);
cl->error_data.error = error;
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
cl->error_data.error = grpc_core::internal::StatusAllocHeapPtr(error);
#else
cl->error_data.error = reinterpret_cast<intptr_t>(error);
#endif
lock->queue.Push(cl->next_data.mpscq_node.get());
}
@ -221,12 +225,21 @@ bool grpc_combiner_continue_exec_ctx() {
return true;
}
grpc_closure* cl = reinterpret_cast<grpc_closure*>(n);
grpc_error_handle cl_err = cl->error_data.error;
#ifndef NDEBUG
cl->scheduled = false;
#endif
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
grpc_error_handle cl_err =
grpc_core::internal::StatusMoveFromHeapPtr(cl->error_data.error);
cl->error_data.error = 0;
cl->cb(cl->cb_arg, std::move(cl_err));
#else
grpc_error_handle cl_err =
reinterpret_cast<grpc_error_handle>(cl->error_data.error);
cl->error_data.error = 0;
cl->cb(cl->cb_arg, cl_err);
GRPC_ERROR_UNREF(cl_err);
#endif
} else {
grpc_closure* c = lock->final_list.head;
GPR_ASSERT(c != nullptr);
@ -236,12 +249,21 @@ bool grpc_combiner_continue_exec_ctx() {
GRPC_COMBINER_TRACE(
gpr_log(GPR_INFO, "C:%p execute_final[%d] c=%p", lock, loops, c));
grpc_closure* next = c->next_data.next;
grpc_error_handle error = c->error_data.error;
#ifndef NDEBUG
c->scheduled = false;
#endif
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
grpc_error_handle error =
grpc_core::internal::StatusMoveFromHeapPtr(c->error_data.error);
c->error_data.error = 0;
c->cb(c->cb_arg, std::move(error));
#else
grpc_error_handle error =
reinterpret_cast<grpc_error_handle>(c->error_data.error);
c->error_data.error = 0;
c->cb(c->cb_arg, error);
GRPC_ERROR_UNREF(error);
#endif
c = next;
}
}

@ -27,7 +27,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/profiling/timers.h"
static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) {
static void exec_ctx_run(grpc_closure* closure) {
#ifndef NDEBUG
closure->scheduled = false;
if (grpc_trace_closure.enabled()) {
@ -37,18 +37,27 @@ static void exec_ctx_run(grpc_closure* closure, grpc_error_handle error) {
closure->line_initiated);
}
#endif
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
grpc_error_handle error =
grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
closure->error_data.error = 0;
closure->cb(closure->cb_arg, std::move(error));
#else
grpc_error_handle error =
reinterpret_cast<grpc_error_handle>(closure->error_data.error);
closure->error_data.error = 0;
closure->cb(closure->cb_arg, error);
GRPC_ERROR_UNREF(error);
#endif
#ifndef NDEBUG
if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "closure %p finished", closure);
}
#endif
GRPC_ERROR_UNREF(error);
}
static void exec_ctx_sched(grpc_closure* closure, grpc_error_handle error) {
grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure,
error);
static void exec_ctx_sched(grpc_closure* closure) {
grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure);
}
static gpr_timespec g_start_time;
@ -151,9 +160,8 @@ bool ExecCtx::Flush() {
closure_list_.head = closure_list_.tail = nullptr;
while (c != nullptr) {
grpc_closure* next = c->next_data.next;
grpc_error_handle error = c->error_data.error;
did_something = true;
exec_ctx_run(c, error);
exec_ctx_run(c);
c = next;
}
} else if (!grpc_combiner_continue_exec_ctx()) {
@ -195,7 +203,12 @@ void ExecCtx::Run(const DebugLocation& location, grpc_closure* closure,
closure->run = false;
GPR_ASSERT(closure->cb != nullptr);
#endif
exec_ctx_sched(closure, error);
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
closure->error_data.error = internal::StatusAllocHeapPtr(error);
#else
closure->error_data.error = reinterpret_cast<intptr_t>(error);
#endif
exec_ctx_sched(closure);
}
void ExecCtx::RunList(const DebugLocation& location, grpc_closure_list* list) {
@ -218,7 +231,7 @@ void ExecCtx::RunList(const DebugLocation& location, grpc_closure_list* list) {
c->run = false;
GPR_ASSERT(c->cb != nullptr);
#endif
exec_ctx_sched(c, c->error_data.error);
exec_ctx_sched(c);
c = next;
}
list->head = list->tail = nullptr;

@ -114,7 +114,6 @@ size_t Executor::RunClosures(const char* executor_name,
grpc_closure* c = list.head;
while (c != nullptr) {
grpc_closure* next = c->next_data.next;
grpc_error_handle error = c->error_data.error;
#ifndef NDEBUG
EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c,
c->file_created, c->line_created);
@ -122,8 +121,18 @@ size_t Executor::RunClosures(const char* executor_name,
#else
EXECUTOR_TRACE("(%s) run %p", executor_name, c);
#endif
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
grpc_error_handle error =
internal::StatusMoveFromHeapPtr(c->error_data.error);
c->error_data.error = 0;
c->cb(c->cb_arg, std::move(error));
#else
grpc_error_handle error =
reinterpret_cast<grpc_error_handle>(c->error_data.error);
c->error_data.error = 0;
c->cb(c->cb_arg, error);
GRPC_ERROR_UNREF(error);
#endif
c = next;
n++;
ExecCtx::Get()->Flush();

@ -150,23 +150,22 @@ TEST(StatusUtilTest, ComplexErrorWithChildrenToString) {
t);
}
TEST(StatusUtilTest, AllocPtr) {
TEST(StatusUtilTest, AllocHeapPtr) {
absl::Status statuses[] = {absl::OkStatus(), absl::CancelledError(),
absl::AbortedError("Message")};
for (const auto& s : statuses) {
uintptr_t p = internal::StatusAllocPtr(s);
EXPECT_EQ(s, internal::StatusGetFromPtr(p));
internal::StatusFreePtr(p);
uintptr_t p = internal::StatusAllocHeapPtr(s);
EXPECT_EQ(s, internal::StatusGetFromHeapPtr(p));
internal::StatusFreeHeapPtr(p);
}
}
TEST(StatusUtilTest, AllocHeapPtr) {
TEST(StatusUtilTest, MoveHeapPtr) {
absl::Status statuses[] = {absl::OkStatus(), absl::CancelledError(),
absl::AbortedError("Message")};
for (const auto& s : statuses) {
uintptr_t p = internal::StatusAllocHeapPtr(s);
EXPECT_EQ(s, internal::StatusGetFromHeapPtr(p));
internal::StatusFreeHeapPtr(p);
EXPECT_EQ(s, internal::StatusMoveFromHeapPtr(p));
}
}

Loading…
Cancel
Save