mirror of https://github.com/grpc/grpc.git
Merge pull request #14647 from kpayson64/fork_exec_ctx_check
Add exec_ctx check to fork handlerspull/15360/head
commit
5fc081acd1
28 changed files with 671 additions and 225 deletions
@ -1,78 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/gpr/fork.h" |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
|
||||
#include "src/core/lib/gpr/env.h" |
||||
#include "src/core/lib/gpr/useful.h" |
||||
|
||||
/*
|
||||
* NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK |
||||
* AROUND VERY SPECIFIC USE CASES. |
||||
*/ |
||||
|
||||
static int override_fork_support_enabled = -1; |
||||
static int fork_support_enabled; |
||||
|
||||
void grpc_fork_support_init() { |
||||
#ifdef GRPC_ENABLE_FORK_SUPPORT |
||||
fork_support_enabled = 1; |
||||
#else |
||||
fork_support_enabled = 0; |
||||
#endif |
||||
bool env_var_set = false; |
||||
char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT"); |
||||
if (env != nullptr) { |
||||
static const char* truthy[] = {"yes", "Yes", "YES", "true", |
||||
"True", "TRUE", "1"}; |
||||
static const char* falsey[] = {"no", "No", "NO", "false", |
||||
"False", "FALSE", "0"}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { |
||||
if (0 == strcmp(env, truthy[i])) { |
||||
fork_support_enabled = 1; |
||||
env_var_set = true; |
||||
break; |
||||
} |
||||
} |
||||
if (!env_var_set) { |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) { |
||||
if (0 == strcmp(env, falsey[i])) { |
||||
fork_support_enabled = 0; |
||||
env_var_set = true; |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
gpr_free(env); |
||||
} |
||||
if (override_fork_support_enabled != -1) { |
||||
fork_support_enabled = override_fork_support_enabled; |
||||
} |
||||
} |
||||
|
||||
int grpc_fork_support_enabled() { return fork_support_enabled; } |
||||
|
||||
void grpc_enable_fork_support(int enable) { |
||||
override_fork_support_enabled = enable; |
||||
} |
@ -1,35 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_GPR_FORK_H |
||||
#define GRPC_CORE_LIB_GPR_FORK_H |
||||
|
||||
/*
|
||||
* NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK |
||||
* AROUND VERY SPECIFIC USE CASES. |
||||
*/ |
||||
|
||||
void grpc_fork_support_init(void); |
||||
|
||||
int grpc_fork_support_enabled(void); |
||||
|
||||
// Test only: Must be called before grpc_init(), and overrides
|
||||
// environment variables/compile flags
|
||||
void grpc_enable_fork_support(int enable); |
||||
|
||||
#endif /* GRPC_CORE_LIB_GPR_FORK_H */ |
@ -0,0 +1,260 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/gprpp/fork.h" |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/sync.h> |
||||
#include <grpc/support/time.h> |
||||
|
||||
#include "src/core/lib/gpr/env.h" |
||||
#include "src/core/lib/gpr/useful.h" |
||||
#include "src/core/lib/gprpp/memory.h" |
||||
|
||||
/*
|
||||
* NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK |
||||
* AROUND VERY SPECIFIC USE CASES. |
||||
*/ |
||||
|
||||
namespace grpc_core { |
||||
namespace internal { |
||||
// The exec_ctx_count has 2 modes, blocked and unblocked.
|
||||
// When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
|
||||
// 0 active ExecCtxs, exex_ctx_count=3 indicates 1 active ExecCtxs...
|
||||
|
||||
// When blocked, the exec_ctx_count is 0-indexed. Note that ExecCtx
|
||||
// creation can only be blocked if there is exactly 1 outstanding ExecCtx,
|
||||
// meaning that BLOCKED and UNBLOCKED counts partition the integers
|
||||
#define UNBLOCKED(n) (n + 2) |
||||
#define BLOCKED(n) (n) |
||||
|
||||
class ExecCtxState { |
||||
public: |
||||
ExecCtxState() : fork_complete_(true) { |
||||
gpr_mu_init(&mu_); |
||||
gpr_cv_init(&cv_); |
||||
gpr_atm_no_barrier_store(&count_, UNBLOCKED(0)); |
||||
} |
||||
|
||||
void IncExecCtxCount() { |
||||
gpr_atm count = gpr_atm_no_barrier_load(&count_); |
||||
while (true) { |
||||
if (count <= BLOCKED(1)) { |
||||
// This only occurs if we are trying to fork. Wait until the fork()
|
||||
// operation completes before allowing new ExecCtxs.
|
||||
gpr_mu_lock(&mu_); |
||||
if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) { |
||||
while (!fork_complete_) { |
||||
gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME)); |
||||
} |
||||
} |
||||
gpr_mu_unlock(&mu_); |
||||
} else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) { |
||||
break; |
||||
} |
||||
count = gpr_atm_no_barrier_load(&count_); |
||||
} |
||||
} |
||||
|
||||
void DecExecCtxCount() { gpr_atm_no_barrier_fetch_add(&count_, -1); } |
||||
|
||||
bool BlockExecCtx() { |
||||
// Assumes there is an active ExecCtx when this function is called
|
||||
if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) { |
||||
gpr_mu_lock(&mu_); |
||||
fork_complete_ = false; |
||||
gpr_mu_unlock(&mu_); |
||||
return true; |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
void AllowExecCtx() { |
||||
gpr_mu_lock(&mu_); |
||||
gpr_atm_no_barrier_store(&count_, UNBLOCKED(0)); |
||||
fork_complete_ = true; |
||||
gpr_cv_broadcast(&cv_); |
||||
gpr_mu_unlock(&mu_); |
||||
} |
||||
|
||||
~ExecCtxState() { |
||||
gpr_mu_destroy(&mu_); |
||||
gpr_cv_destroy(&cv_); |
||||
} |
||||
|
||||
private: |
||||
bool fork_complete_; |
||||
gpr_mu mu_; |
||||
gpr_cv cv_; |
||||
gpr_atm count_; |
||||
}; |
||||
|
||||
class ThreadState { |
||||
public: |
||||
ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) { |
||||
gpr_mu_init(&mu_); |
||||
gpr_cv_init(&cv_); |
||||
} |
||||
|
||||
void IncThreadCount() { |
||||
gpr_mu_lock(&mu_); |
||||
count_++; |
||||
gpr_mu_unlock(&mu_); |
||||
} |
||||
|
||||
void DecThreadCount() { |
||||
gpr_mu_lock(&mu_); |
||||
count_--; |
||||
if (awaiting_threads_ && count_ == 0) { |
||||
threads_done_ = true; |
||||
gpr_cv_signal(&cv_); |
||||
} |
||||
gpr_mu_unlock(&mu_); |
||||
} |
||||
void AwaitThreads() { |
||||
gpr_mu_lock(&mu_); |
||||
awaiting_threads_ = true; |
||||
threads_done_ = (count_ == 0); |
||||
while (!threads_done_) { |
||||
gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME)); |
||||
} |
||||
awaiting_threads_ = true; |
||||
gpr_mu_unlock(&mu_); |
||||
} |
||||
|
||||
~ThreadState() { |
||||
gpr_mu_destroy(&mu_); |
||||
gpr_cv_destroy(&cv_); |
||||
} |
||||
|
||||
private: |
||||
bool awaiting_threads_; |
||||
bool threads_done_; |
||||
gpr_mu mu_; |
||||
gpr_cv cv_; |
||||
int count_; |
||||
}; |
||||
|
||||
} // namespace
|
||||
|
||||
void Fork::GlobalInit() { |
||||
if (!overrideEnabled_) { |
||||
#ifdef GRPC_ENABLE_FORK_SUPPORT |
||||
supportEnabled_ = true; |
||||
#else |
||||
supportEnabled_ = false; |
||||
#endif |
||||
bool env_var_set = false; |
||||
char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT"); |
||||
if (env != nullptr) { |
||||
static const char* truthy[] = {"yes", "Yes", "YES", "true", |
||||
"True", "TRUE", "1"}; |
||||
static const char* falsey[] = {"no", "No", "NO", "false", |
||||
"False", "FALSE", "0"}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { |
||||
if (0 == strcmp(env, truthy[i])) { |
||||
supportEnabled_ = true; |
||||
env_var_set = true; |
||||
break; |
||||
} |
||||
} |
||||
if (!env_var_set) { |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) { |
||||
if (0 == strcmp(env, falsey[i])) { |
||||
supportEnabled_ = false; |
||||
env_var_set = true; |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
gpr_free(env); |
||||
} |
||||
} |
||||
if (supportEnabled_) { |
||||
execCtxState_ = grpc_core::New<internal::ExecCtxState>(); |
||||
threadState_ = grpc_core::New<internal::ThreadState>(); |
||||
} |
||||
} |
||||
|
||||
void Fork::GlobalShutdown() { |
||||
if (supportEnabled_) { |
||||
grpc_core::Delete(execCtxState_); |
||||
grpc_core::Delete(threadState_); |
||||
} |
||||
} |
||||
|
||||
bool Fork::Enabled() { return supportEnabled_; } |
||||
|
||||
// Testing Only
|
||||
void Fork::Enable(bool enable) { |
||||
overrideEnabled_ = true; |
||||
supportEnabled_ = enable; |
||||
} |
||||
|
||||
void Fork::IncExecCtxCount() { |
||||
if (supportEnabled_) { |
||||
execCtxState_->IncExecCtxCount(); |
||||
} |
||||
} |
||||
|
||||
void Fork::DecExecCtxCount() { |
||||
if (supportEnabled_) { |
||||
execCtxState_->DecExecCtxCount(); |
||||
} |
||||
} |
||||
|
||||
bool Fork::BlockExecCtx() { |
||||
if (supportEnabled_) { |
||||
return execCtxState_->BlockExecCtx(); |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
void Fork::AllowExecCtx() { |
||||
if (supportEnabled_) { |
||||
execCtxState_->AllowExecCtx(); |
||||
} |
||||
} |
||||
|
||||
void Fork::IncThreadCount() { |
||||
if (supportEnabled_) { |
||||
threadState_->IncThreadCount(); |
||||
} |
||||
} |
||||
|
||||
void Fork::DecThreadCount() { |
||||
if (supportEnabled_) { |
||||
threadState_->DecThreadCount(); |
||||
} |
||||
} |
||||
void Fork::AwaitThreads() { |
||||
if (supportEnabled_) { |
||||
threadState_->AwaitThreads(); |
||||
} |
||||
} |
||||
|
||||
internal::ExecCtxState* Fork::execCtxState_ = nullptr; |
||||
internal::ThreadState* Fork::threadState_ = nullptr; |
||||
bool Fork::supportEnabled_ = false; |
||||
bool Fork::overrideEnabled_ = false; |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,79 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_GPRPP_FORK_H |
||||
#define GRPC_CORE_LIB_GPRPP_FORK_H |
||||
|
||||
/*
|
||||
* NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK |
||||
* AROUND VERY SPECIFIC USE CASES. |
||||
*/ |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace internal { |
||||
class ExecCtxState; |
||||
class ThreadState; |
||||
} // namespace internal
|
||||
|
||||
class Fork { |
||||
public: |
||||
static void GlobalInit(); |
||||
static void GlobalShutdown(); |
||||
|
||||
// Returns true if fork suppport is enabled, false otherwise
|
||||
static bool Enabled(); |
||||
|
||||
// Increment the count of active ExecCtxs.
|
||||
// Will block until a pending fork is complete if one is in progress.
|
||||
static void IncExecCtxCount(); |
||||
|
||||
// Decrement the count of active ExecCtxs
|
||||
static void DecExecCtxCount(); |
||||
|
||||
// Check if there is a single active ExecCtx
|
||||
// (the one used to invoke this function). If there are more,
|
||||
// return false. Otherwise, return true and block creation of
|
||||
// more ExecCtx s until AlloWExecCtx() is called
|
||||
//
|
||||
static bool BlockExecCtx(); |
||||
static void AllowExecCtx(); |
||||
|
||||
// Increment the count of active threads.
|
||||
static void IncThreadCount(); |
||||
|
||||
// Decrement the count of active threads.
|
||||
static void DecThreadCount(); |
||||
|
||||
// Await all core threads to be joined.
|
||||
static void AwaitThreads(); |
||||
|
||||
// Test only: overrides environment variables/compile flags
|
||||
// Must be called before grpc_init()
|
||||
static void Enable(bool enable); |
||||
|
||||
private: |
||||
static internal::ExecCtxState* execCtxState_; |
||||
static internal::ThreadState* threadState_; |
||||
static bool supportEnabled_; |
||||
static bool overrideEnabled_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif /* GRPC_CORE_LIB_GPRPP_FORK_H */ |
@ -0,0 +1,135 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/lib/gprpp/fork.h" |
||||
|
||||
#include "src/core/lib/gprpp/thd.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
static void test_init() { |
||||
GPR_ASSERT(!grpc_core::Fork::Enabled()); |
||||
|
||||
// Default fork support (disabled)
|
||||
grpc_core::Fork::GlobalInit(); |
||||
GPR_ASSERT(!grpc_core::Fork::Enabled()); |
||||
grpc_core::Fork::GlobalShutdown(); |
||||
|
||||
// Explicitly disabled fork support
|
||||
grpc_core::Fork::Enable(false); |
||||
grpc_core::Fork::GlobalInit(); |
||||
GPR_ASSERT(!grpc_core::Fork::Enabled()); |
||||
grpc_core::Fork::GlobalShutdown(); |
||||
|
||||
// Explicitly enabled fork support
|
||||
grpc_core::Fork::Enable(true); |
||||
grpc_core::Fork::GlobalInit(); |
||||
GPR_ASSERT(grpc_core::Fork::Enabled()); |
||||
grpc_core::Fork::GlobalShutdown(); |
||||
} |
||||
|
||||
#define THREAD_DELAY_MS 3000 |
||||
#define THREAD_DELAY_EPSILON 500 |
||||
#define CONCURRENT_TEST_THREADS 100 |
||||
|
||||
static void sleeping_thd(void* arg) { |
||||
int64_t sleep_ms = (int64_t)arg; |
||||
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), |
||||
gpr_time_from_millis(sleep_ms, GPR_TIMESPAN))); |
||||
} |
||||
|
||||
static void test_thd_count() { |
||||
// Test no active threads
|
||||
grpc_core::Fork::Enable(true); |
||||
grpc_core::Fork::GlobalInit(); |
||||
grpc_core::Fork::AwaitThreads(); |
||||
grpc_core::Fork::GlobalShutdown(); |
||||
|
||||
grpc_core::Fork::Enable(true); |
||||
grpc_core::Fork::GlobalInit(); |
||||
grpc_core::Thread thds[CONCURRENT_TEST_THREADS]; |
||||
gpr_timespec est_end_time = |
||||
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), |
||||
gpr_time_from_millis(THREAD_DELAY_MS, GPR_TIMESPAN)); |
||||
gpr_timespec tolerance = |
||||
gpr_time_from_millis(THREAD_DELAY_EPSILON, GPR_TIMESPAN); |
||||
for (int i = 0; i < CONCURRENT_TEST_THREADS; i++) { |
||||
intptr_t sleep_time_ms = |
||||
(i * THREAD_DELAY_MS) / (CONCURRENT_TEST_THREADS - 1); |
||||
thds[i] = |
||||
grpc_core::Thread("grpc_fork_test", sleeping_thd, (void*)sleep_time_ms); |
||||
thds[i].Start(); |
||||
} |
||||
grpc_core::Fork::AwaitThreads(); |
||||
gpr_timespec end_time = gpr_now(GPR_CLOCK_REALTIME); |
||||
for (auto& thd : thds) { |
||||
thd.Join(); |
||||
} |
||||
GPR_ASSERT(gpr_time_similar(end_time, est_end_time, tolerance)); |
||||
grpc_core::Fork::GlobalShutdown(); |
||||
} |
||||
|
||||
static void exec_ctx_thread(void* arg) { |
||||
bool* exec_ctx_created = (bool*)arg; |
||||
grpc_core::Fork::IncExecCtxCount(); |
||||
*exec_ctx_created = true; |
||||
} |
||||
|
||||
static void test_exec_count() { |
||||
grpc_core::Fork::Enable(true); |
||||
grpc_core::Fork::GlobalInit(); |
||||
|
||||
grpc_core::Fork::IncExecCtxCount(); |
||||
GPR_ASSERT(grpc_core::Fork::BlockExecCtx()); |
||||
grpc_core::Fork::DecExecCtxCount(); |
||||
grpc_core::Fork::AllowExecCtx(); |
||||
|
||||
grpc_core::Fork::IncExecCtxCount(); |
||||
grpc_core::Fork::IncExecCtxCount(); |
||||
GPR_ASSERT(!grpc_core::Fork::BlockExecCtx()); |
||||
grpc_core::Fork::DecExecCtxCount(); |
||||
grpc_core::Fork::DecExecCtxCount(); |
||||
|
||||
grpc_core::Fork::IncExecCtxCount(); |
||||
GPR_ASSERT(grpc_core::Fork::BlockExecCtx()); |
||||
grpc_core::Fork::DecExecCtxCount(); |
||||
grpc_core::Fork::AllowExecCtx(); |
||||
|
||||
// Test that block_exec_ctx() blocks grpc_core::Fork::IncExecCtxCount
|
||||
bool exec_ctx_created = false; |
||||
grpc_core::Thread thd = |
||||
grpc_core::Thread("grpc_fork_test", exec_ctx_thread, &exec_ctx_created); |
||||
grpc_core::Fork::IncExecCtxCount(); |
||||
GPR_ASSERT(grpc_core::Fork::BlockExecCtx()); |
||||
grpc_core::Fork::DecExecCtxCount(); |
||||
thd.Start(); |
||||
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), |
||||
gpr_time_from_seconds(1, GPR_TIMESPAN))); |
||||
GPR_ASSERT(!exec_ctx_created); |
||||
grpc_core::Fork::AllowExecCtx(); |
||||
thd.Join(); // This ensure that the call got un-blocked
|
||||
grpc_core::Fork::GlobalShutdown(); |
||||
} |
||||
|
||||
int main(int argc, char* argv[]) { |
||||
grpc_test_init(argc, argv); |
||||
test_init(); |
||||
test_thd_count(); |
||||
test_exec_count(); |
||||
|
||||
return 0; |
||||
} |
Loading…
Reference in new issue