Merge pull request #24158 from grpc/revert-24081-revert-23710-shutdown

Fix-forward "Implemented conditional shutdown" (#23710)
pull/24177/head
Esun Kim 4 years ago committed by GitHub
commit 83c3fe5d7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 71
      CMakeLists.txt
  2. 27
      build_autogenerated.yaml
  3. 18
      src/core/lib/iomgr/exec_ctx.h
  4. 35
      src/core/lib/surface/init.cc
  5. 3
      test/core/surface/BUILD
  6. 102
      test/core/surface/init_test.cc
  7. 48
      tools/run_tests/generated/tests.json

@ -606,7 +606,6 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c httpscli_test)
endif()
add_dependencies(buildtests_c init_test)
add_dependencies(buildtests_c inproc_callback_test)
add_dependencies(buildtests_c invalid_call_argument_test)
add_dependencies(buildtests_c json_token_test)
@ -839,6 +838,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx health_service_end2end_test)
add_dependencies(buildtests_cxx http2_client)
add_dependencies(buildtests_cxx hybrid_end2end_test)
add_dependencies(buildtests_cxx init_test)
add_dependencies(buildtests_cxx initial_settings_frame_bad_client_test)
add_dependencies(buildtests_cxx interop_client)
add_dependencies(buildtests_cxx interop_server)
@ -5973,36 +5973,6 @@ endif()
endif()
if(gRPC_BUILD_TESTS)
add_executable(init_test
test/core/surface/init_test.cc
)
target_include_directories(init_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(init_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr
address_sorting
upb
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(inproc_callback_test
test/core/end2end/inproc_callback_test.cc
)
@ -12001,6 +11971,45 @@ target_link_libraries(hybrid_end2end_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(init_test
test/core/surface/init_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(init_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(init_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr
address_sorting
upb
${_gRPC_GFLAGS_LIBRARIES}
)
endif()
if(gRPC_BUILD_TESTS)

@ -3566,19 +3566,6 @@ targets:
- linux
- posix
- mac
- name: init_test
build: test
language: c
headers: []
src:
- test/core/surface/init_test.cc
deps:
- grpc_test_util
- grpc
- gpr
- address_sorting
- upb
uses_polling: false
- name: inproc_callback_test
build: test
language: c
@ -6200,6 +6187,20 @@ targets:
- gpr
- address_sorting
- upb
- name: init_test
gtest: true
build: test
language: c++
headers: []
src:
- test/core/surface/init_test.cc
deps:
- grpc_test_util
- grpc
- gpr
- address_sorting
- upb
uses_polling: false
- name: initial_settings_frame_bad_client_test
gtest: true
build: test

@ -331,9 +331,15 @@ class ApplicationCallbackExecCtx {
}
}
uintptr_t Flags() { return flags_; }
static ApplicationCallbackExecCtx* Get() {
return reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_));
}
static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) {
if (reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) == nullptr) {
if (Get() == nullptr) {
if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) {
grpc_core::Fork::IncExecCtxCount();
}
@ -346,8 +352,7 @@ class ApplicationCallbackExecCtx {
functor->internal_success = is_success;
functor->internal_next = nullptr;
auto* ctx = reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_));
ApplicationCallbackExecCtx* ctx = Get();
if (ctx->head_ == nullptr) {
ctx->head_ = functor;
@ -364,10 +369,7 @@ class ApplicationCallbackExecCtx {
/** Global shutdown for ApplicationCallbackExecCtx. Called by init. */
static void GlobalShutdown(void) { gpr_tls_destroy(&callback_exec_ctx_); }
static bool Available() {
return reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) != nullptr;
}
static bool Available() { return Get() != nullptr; }
private:
uintptr_t flags_{0u};

@ -18,6 +18,8 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/surface/init.h"
#include <limits.h>
#include <memory.h>
@ -26,6 +28,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/channel/connected_channel.h"
@ -37,6 +40,7 @@
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/resource_quota.h"
@ -47,7 +51,6 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/init.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/bdp_estimator.h"
@ -211,15 +214,29 @@ void grpc_shutdown_internal(void* /*ignored*/) {
void grpc_shutdown(void) {
GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
if (--g_initializations == 0) {
g_initializations++;
g_shutting_down = true;
// spawn a detached thread to do the actual clean up in case we are
// currently in an executor thread.
grpc_core::Thread cleanup_thread(
"grpc_shutdown", grpc_shutdown_internal, nullptr, nullptr,
grpc_core::Thread::Options().set_joinable(false).set_tracked(false));
cleanup_thread.Start();
grpc_core::ApplicationCallbackExecCtx* acec =
grpc_core::ApplicationCallbackExecCtx::Get();
if (!grpc_iomgr_is_any_background_poller_thread() &&
(acec == nullptr ||
(acec->Flags() & GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD) ==
0)) {
// just run clean-up when this is called on non-executor thread.
gpr_log(GPR_DEBUG, "grpc_shutdown starts clean-up now");
g_shutting_down = true;
grpc_shutdown_internal_locked();
} else {
// spawn a detached thread to do the actual clean up in case we are
// currently in an executor thread.
gpr_log(GPR_DEBUG, "grpc_shutdown spawns clean-up thread");
g_initializations++;
g_shutting_down = true;
grpc_core::Thread cleanup_thread(
"grpc_shutdown", grpc_shutdown_internal, nullptr, nullptr,
grpc_core::Thread::Options().set_joinable(false).set_tracked(false));
cleanup_thread.Start();
}
}
}

@ -79,6 +79,9 @@ grpc_cc_test(
grpc_cc_test(
name = "init_test",
srcs = ["init_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
uses_polling = False,
deps = [

@ -16,14 +16,22 @@
*
*/
#include "src/core/lib/surface/init.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <gtest/gtest.h>
#include "src/core/lib/surface/init.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/util/test_config.h"
static int g_flag;
static int g_plugin_state;
static void plugin_init(void) { g_plugin_state = 1; }
static void plugin_destroy(void) { g_plugin_state = 2; }
static bool plugin_is_intialized(void) { return g_plugin_state == 1; }
static bool plugin_is_destroyed(void) { return g_plugin_state == 2; }
static void test(int rounds) {
int i;
@ -33,7 +41,13 @@ static void test(int rounds) {
for (i = 0; i < rounds; i++) {
grpc_shutdown();
}
grpc_maybe_wait_for_async_shutdown();
EXPECT_FALSE(grpc_is_initialized());
}
TEST(Init, test) {
test(1);
test(2);
test(3);
}
static void test_blocking(int rounds) {
@ -44,55 +58,87 @@ static void test_blocking(int rounds) {
for (i = 0; i < rounds; i++) {
grpc_shutdown_blocking();
}
EXPECT_FALSE(grpc_is_initialized());
}
TEST(Init, blocking) {
test_blocking(1);
test_blocking(2);
test_blocking(3);
}
TEST(Init, shutdown_with_thread) {
grpc_init();
{
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
grpc_shutdown();
}
grpc_maybe_wait_for_async_shutdown();
EXPECT_FALSE(grpc_is_initialized());
}
static void test_mixed(void) {
TEST(Init, mixed) {
grpc_init();
grpc_init();
grpc_shutdown();
grpc_init();
grpc_shutdown();
grpc_shutdown();
grpc_maybe_wait_for_async_shutdown();
EXPECT_FALSE(grpc_is_initialized());
}
static void plugin_init(void) { g_flag = 1; }
static void plugin_destroy(void) { g_flag = 2; }
TEST(Init, mixed_with_thread) {
grpc_init();
{
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
grpc_init();
grpc_shutdown();
grpc_init();
grpc_shutdown();
grpc_shutdown();
}
grpc_maybe_wait_for_async_shutdown();
EXPECT_FALSE(grpc_is_initialized());
}
static void test_plugin() {
grpc_register_plugin(plugin_init, plugin_destroy);
TEST(Init, plugin) {
grpc_init();
GPR_ASSERT(g_flag == 1);
EXPECT_TRUE(plugin_is_intialized());
grpc_shutdown_blocking();
GPR_ASSERT(g_flag == 2);
EXPECT_TRUE(plugin_is_destroyed());
EXPECT_FALSE(grpc_is_initialized());
}
static void test_repeatedly() {
for (int i = 0; i < 1000; i++) {
TEST(Init, repeatedly) {
for (int i = 0; i < 10; i++) {
grpc_init();
grpc_shutdown();
{
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
grpc_shutdown();
}
}
grpc_maybe_wait_for_async_shutdown();
EXPECT_FALSE(grpc_is_initialized());
}
static void test_repeatedly_blocking() {
for (int i = 0; i < 1000; i++) {
TEST(Init, repeatedly_blocking) {
for (int i = 0; i < 10; i++) {
grpc_init();
grpc_shutdown_blocking();
{
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx(
GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
grpc_shutdown_blocking();
}
}
EXPECT_FALSE(grpc_is_initialized());
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
test(1);
test(2);
test(3);
test_blocking(1);
test_blocking(2);
test_blocking(3);
test_mixed();
test_plugin();
test_repeatedly();
test_repeatedly_blocking();
return 0;
::testing::InitGoogleTest(&argc, argv);
grpc_register_plugin(plugin_init, plugin_destroy);
return RUN_ALL_TESTS();
}

@ -1607,30 +1607,6 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "init_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -4741,6 +4717,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "init_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save