|
|
|
@ -20,9 +20,9 @@ |
|
|
|
|
|
|
|
|
|
#include <string.h> |
|
|
|
|
|
|
|
|
|
#include "absl/log/check.h" |
|
|
|
|
#include "absl/log/log.h" |
|
|
|
|
#include "absl/strings/str_format.h" |
|
|
|
|
#include "third_party/absl/log/check.h" |
|
|
|
|
#include "third_party/absl/log/log.h" |
|
|
|
|
#include "third_party/absl/strings/str_format.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
|
#include <grpc/support/cpu.h> |
|
|
|
@ -39,13 +39,6 @@ |
|
|
|
|
|
|
|
|
|
#define MAX_DEPTH 2 |
|
|
|
|
|
|
|
|
|
#define EXECUTOR_TRACE(format, ...) \ |
|
|
|
|
do { \
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(executor)) { \
|
|
|
|
|
LOG(INFO) << "EXECUTOR " << absl::StrFormat(format, __VA_ARGS__); \
|
|
|
|
|
} \
|
|
|
|
|
} while (0) |
|
|
|
|
|
|
|
|
|
namespace grpc_core { |
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
@ -110,11 +103,13 @@ size_t Executor::RunClosures(const char* executor_name, |
|
|
|
|
while (c != nullptr) { |
|
|
|
|
grpc_closure* next = c->next_data.next; |
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c, |
|
|
|
|
c->file_created, c->line_created); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << executor_name << " run " << c << " [created by " |
|
|
|
|
<< c->file_created << ":" << c->line_created << "]"; |
|
|
|
|
c->scheduled = false; |
|
|
|
|
#else |
|
|
|
|
EXECUTOR_TRACE("(%s) run %p", executor_name, c); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << executor_name << " run " << c; |
|
|
|
|
#endif |
|
|
|
|
grpc_error_handle error = |
|
|
|
|
internal::StatusMoveFromHeapPtr(c->error_data.error); |
|
|
|
@ -134,11 +129,14 @@ bool Executor::IsThreaded() const { |
|
|
|
|
|
|
|
|
|
void Executor::SetThreading(bool threading) { |
|
|
|
|
gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_); |
|
|
|
|
EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << name_ << " SetThreading(" << threading << ") begin"; |
|
|
|
|
|
|
|
|
|
if (threading) { |
|
|
|
|
if (curr_num_threads > 0) { |
|
|
|
|
EXECUTOR_TRACE("(%s) SetThreading(true). curr_num_threads > 0", name_); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << name_ |
|
|
|
|
<< " SetThreading(true). curr_num_threads > 0"; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -160,7 +158,9 @@ void Executor::SetThreading(bool threading) { |
|
|
|
|
thd_state_[0].thd.Start(); |
|
|
|
|
} else { // !threading
|
|
|
|
|
if (curr_num_threads == 0) { |
|
|
|
|
EXECUTOR_TRACE("(%s) SetThreading(false). curr_num_threads == 0", name_); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << name_ |
|
|
|
|
<< " SetThreading(false). curr_num_threads == 0"; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -179,8 +179,9 @@ void Executor::SetThreading(bool threading) { |
|
|
|
|
curr_num_threads = gpr_atm_no_barrier_load(&num_threads_); |
|
|
|
|
for (gpr_atm i = 0; i < curr_num_threads; i++) { |
|
|
|
|
thd_state_[i].thd.Join(); |
|
|
|
|
EXECUTOR_TRACE("(%s) Thread %" PRIdPTR " of %" PRIdPTR " joined", name_, |
|
|
|
|
i + 1, curr_num_threads); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << name_ << " Thread " << i + 1 << " of " |
|
|
|
|
<< curr_num_threads << " joined"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_atm_rel_store(&num_threads_, 0); |
|
|
|
@ -201,7 +202,8 @@ void Executor::SetThreading(bool threading) { |
|
|
|
|
grpc_iomgr_platform_shutdown_background_closure(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << name_ << " SetThreading(" << threading << ") done"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Executor::Shutdown() { SetThreading(false); } |
|
|
|
@ -214,8 +216,9 @@ void Executor::ThreadMain(void* arg) { |
|
|
|
|
|
|
|
|
|
size_t subtract_depth = 0; |
|
|
|
|
for (;;) { |
|
|
|
|
EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", |
|
|
|
|
ts->name, ts->id, subtract_depth); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << ts->name << " [%" << ts->id |
|
|
|
|
<< "]: step (sub_depth=" << subtract_depth << ")"; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&ts->mu); |
|
|
|
|
ts->depth -= subtract_depth; |
|
|
|
@ -226,7 +229,8 @@ void Executor::ThreadMain(void* arg) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (ts->shutdown) { |
|
|
|
|
EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: shutdown", ts->name, ts->id); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << ts->name << " [%" << ts->id << "]: shutdown"; |
|
|
|
|
gpr_mu_unlock(&ts->mu); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
@ -235,7 +239,8 @@ void Executor::ThreadMain(void* arg) { |
|
|
|
|
ts->elems = GRPC_CLOSURE_LIST_INIT; |
|
|
|
|
gpr_mu_unlock(&ts->mu); |
|
|
|
|
|
|
|
|
|
EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: execute", ts->name, ts->id); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << ts->name << " [%" << ts->id << "]: execute"; |
|
|
|
|
|
|
|
|
|
ExecCtx::Get()->InvalidateNow(); |
|
|
|
|
subtract_depth = RunClosures(ts->name, closures); |
|
|
|
@ -257,10 +262,13 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, |
|
|
|
|
// or already shutdown), then queue the closure on the exec context itself
|
|
|
|
|
if (cur_thread_count == 0) { |
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
EXECUTOR_TRACE("(%s) schedule %p (created %s:%d) inline", name_, closure, |
|
|
|
|
closure->file_created, closure->line_created); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << name_ << " schedule " << closure << " (created " |
|
|
|
|
<< closure->file_created << ":" << closure->line_created |
|
|
|
|
<< ") inline"; |
|
|
|
|
#else |
|
|
|
|
EXECUTOR_TRACE("(%s) schedule %p inline", name_, closure); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << name_ << " schedule " << closure << " inline"; |
|
|
|
|
#endif |
|
|
|
|
grpc_closure_list_append(ExecCtx::Get()->closure_list(), closure, error); |
|
|
|
|
return; |
|
|
|
@ -280,14 +288,15 @@ void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error, |
|
|
|
|
|
|
|
|
|
for (;;) { |
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
EXECUTOR_TRACE( |
|
|
|
|
"(%s) try to schedule %p (%s) (created %s:%d) to thread " |
|
|
|
|
"%" PRIdPTR, |
|
|
|
|
name_, closure, is_short ? "short" : "long", closure->file_created, |
|
|
|
|
closure->line_created, ts->id); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << name_ << " try to schedule " << closure << " (" |
|
|
|
|
<< (is_short ? "short" : "long") << ") (created " |
|
|
|
|
<< closure->file_created << ":" << closure->line_created |
|
|
|
|
<< ") to thread " << ts->id; |
|
|
|
|
#else |
|
|
|
|
EXECUTOR_TRACE("(%s) try to schedule %p (%s) to thread %" PRIdPTR, name_, |
|
|
|
|
closure, is_short ? "short" : "long", ts->id); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "EXECUTOR " << name_ << " try to schedule " << closure << " (" |
|
|
|
|
<< (is_short ? "short" : "long") << ") to thread " << ts->id; |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&ts->mu); |
|
|
|
@ -429,7 +438,8 @@ bool Executor::IsThreadedDefault() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Executor::SetThreadingAll(bool enable) { |
|
|
|
|
EXECUTOR_TRACE("Executor::SetThreadingAll(%d) called", enable); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "Executor::SetThreadingAll(" << enable << ") called"; |
|
|
|
|
for (size_t i = 0; i < static_cast<size_t>(ExecutorType::NUM_EXECUTORS); |
|
|
|
|
i++) { |
|
|
|
|
executors[i]->SetThreading(enable); |
|
|
|
@ -437,7 +447,8 @@ void Executor::SetThreadingAll(bool enable) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Executor::SetThreadingDefault(bool enable) { |
|
|
|
|
EXECUTOR_TRACE("Executor::SetThreadingDefault(%d) called", enable); |
|
|
|
|
GRPC_TRACE_LOG(executor, INFO) |
|
|
|
|
<< "Executor::SetThreadingDefault(" << enable << ") called"; |
|
|
|
|
executors[static_cast<size_t>(ExecutorType::DEFAULT)]->SetThreading(enable); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|