Merge pull request #17973 from ericgribkoff/prefork_handler_bugfix

Don't count internal ApplicationCallbackExecCtx against ExcCtx count
pull/17994/head
Eric Gribkoff 6 years ago committed by GitHub
commit b3ed1f4548
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      src/core/lib/iomgr/exec_ctx.h
  2. 3
      src/core/lib/iomgr/executor.cc
  3. 3
      src/core/lib/iomgr/timer_manager.cc

@ -49,6 +49,10 @@ typedef struct grpc_combiner grpc_combiner;
be counted by fork handlers */
#define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4
/* This application callback exec ctx was initialized by an internal thread, and
should not be counted by fork handlers */
#define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1
extern grpc_closure_scheduler* grpc_schedule_on_exec_ctx;
gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
@ -229,13 +233,12 @@ class ExecCtx {
class ApplicationCallbackExecCtx {
public:
ApplicationCallbackExecCtx() {
if (reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) == nullptr) {
grpc_core::Fork::IncExecCtxCount();
gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(this));
}
}
/** Default Constructor */
ApplicationCallbackExecCtx() { Set(this, flags_); }
/** Parameterised Constructor */
ApplicationCallbackExecCtx(uintptr_t fl) : flags_(fl) { Set(this, flags_); }
~ApplicationCallbackExecCtx() {
if (reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) == this) {
@ -248,12 +251,25 @@ class ApplicationCallbackExecCtx {
(*f->functor_run)(f, f->internal_success);
}
gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(nullptr));
grpc_core::Fork::DecExecCtxCount();
if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
grpc_core::Fork::DecExecCtxCount();
}
} else {
GPR_DEBUG_ASSERT(head_ == nullptr);
GPR_DEBUG_ASSERT(tail_ == nullptr);
}
}
static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) {
if (reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) == nullptr) {
if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) {
grpc_core::Fork::IncExecCtxCount();
}
gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
}
}
static void Enqueue(grpc_experimental_completion_queue_functor* functor,
int is_success) {
functor->internal_success = is_success;
@ -278,6 +294,7 @@ class ApplicationCallbackExecCtx {
static void GlobalShutdown(void) { gpr_tls_destroy(&callback_exec_ctx_); }
private:
uintptr_t flags_{0u};
grpc_experimental_completion_queue_functor* head_{nullptr};
grpc_experimental_completion_queue_functor* tail_{nullptr};
GPR_TLS_CLASS_DECL(callback_exec_ctx_);

@ -116,7 +116,8 @@ size_t Executor::RunClosures(const char* executor_name,
// application-level callbacks. No need to create a new ExecCtx, though,
// since there already is one and it is flushed (but not destructed) in this
// function itself.
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
grpc_closure* c = list.head;
while (c != nullptr) {

@ -110,7 +110,8 @@ static void run_some_timers() {
// could start seeing application-level callbacks. No need to
// create a new ExecCtx, though, since there already is one and it is
// flushed (but not destructed) in this function itself
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
// if there's something to execute...
gpr_mu_lock(&g_mu);

Loading…
Cancel
Save