From afa3a6c890eecf07eb1f5ad35df6ccafa2a1698a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 7 Oct 2022 21:35:49 -0700 Subject: [PATCH] [event_engine] Fix for the EE/iomgr shutdown ordering problem (#31265) * Remove ResetDefaultEventEngine Now that it is a weak_ptr, there's no need to explicitly reset it. When the tracked shared_ptr is deleted, the weak_ptr will fail to lock, and a new default EventEngine will be created. * forget existing engine with FactoryReset * init/shutdown in event engine for now * fix * fix * fix windows deadlock * Automated change: Fix sanity tests * fix * better windows fix Co-authored-by: AJ Heller Co-authored-by: ctiller --- BUILD | 2 ++ src/core/lib/event_engine/default_event_engine.cc | 7 +------ src/core/lib/event_engine/default_event_engine.h | 3 --- .../lib/event_engine/posix_engine/posix_engine.h | 6 +++++- src/core/lib/event_engine/windows/windows_engine.h | 6 +++++- src/core/lib/iomgr/resolve_address_windows.cc | 13 ++++++++----- src/core/lib/iomgr/resolve_address_windows.h | 3 --- src/core/lib/surface/init.cc | 2 -- src/core/lib/surface/init_internally.h | 8 ++++++++ test/core/end2end/fuzzers/api_fuzzer.cc | 1 - .../event_engine/posix/event_poller_posix_test.cc | 9 ++++++++- .../core/event_engine/posix/lock_free_event_test.cc | 8 +++++++- test/core/event_engine/posix/posix_endpoint_test.cc | 7 ++++++- .../test_suite/posix_event_engine_test.cc | 8 +++++++- test/cpp/microbenchmarks/bm_event_engine_run.cc | 4 ---- 15 files changed, 57 insertions(+), 30 deletions(-) diff --git a/BUILD b/BUILD index 99003a07a20..4964206ef57 100644 --- a/BUILD +++ b/BUILD @@ -2877,6 +2877,7 @@ grpc_cc_library( "event_engine_utils", "gpr", "grpc_trace", + "init_internally", "posix_event_engine_timer", "posix_event_engine_timer_manager", ], @@ -2898,6 +2899,7 @@ grpc_cc_library( "event_engine_trace", "event_engine_utils", "gpr", + "init_internally", "posix_event_engine_timer_manager", "time", "windows_iocp", diff --git a/src/core/lib/event_engine/default_event_engine.cc b/src/core/lib/event_engine/default_event_engine.cc index 8e52d3e163e..6e46e58991e 100644 --- a/src/core/lib/event_engine/default_event_engine.cc +++ b/src/core/lib/event_engine/default_event_engine.cc @@ -51,7 +51,7 @@ void SetEventEngineFactory( void EventEngineFactoryReset() { delete g_event_engine_factory.exchange(nullptr); - ResetDefaultEventEngine(); + g_event_engine->reset(); } std::unique_ptr CreateEventEngine() { @@ -74,10 +74,5 @@ std::shared_ptr GetDefaultEventEngine() { return engine; } -void ResetDefaultEventEngine() { - grpc_core::MutexLock lock(&*g_mu); - g_event_engine->reset(); -} - } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/default_event_engine.h b/src/core/lib/event_engine/default_event_engine.h index 88181ad423e..e116a471a0a 100644 --- a/src/core/lib/event_engine/default_event_engine.h +++ b/src/core/lib/event_engine/default_event_engine.h @@ -37,9 +37,6 @@ namespace experimental { /// Strongly consider whether you could use \a CreateEventEngine instead. std::shared_ptr GetDefaultEventEngine(); -/// Reset the default event engine -void ResetDefaultEventEngine(); - } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h index babb197349d..344d5465f6f 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine.h @@ -35,13 +35,17 @@ #include "src/core/lib/event_engine/posix_engine/timer_manager.h" #include "src/core/lib/event_engine/thread_pool.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/surface/init_internally.h" namespace grpc_event_engine { namespace experimental { // An iomgr-based Posix EventEngine implementation. // All methods require an ExecCtx to already exist on the thread's stack. -class PosixEventEngine final : public EventEngine { +// TODO(ctiller): KeepsGrpcInitialized is an interim measure to ensure that +// event engine is shut down before we shut down iomgr. +class PosixEventEngine final : public EventEngine, + public grpc_core::KeepsGrpcInitialized { public: class PosixEndpoint : public EventEngine::Endpoint { public: diff --git a/src/core/lib/event_engine/windows/windows_engine.h b/src/core/lib/event_engine/windows/windows_engine.h index 00fb35203f2..8a6aa4d6102 100644 --- a/src/core/lib/event_engine/windows/windows_engine.h +++ b/src/core/lib/event_engine/windows/windows_engine.h @@ -34,11 +34,15 @@ #include "src/core/lib/event_engine/windows/iocp.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" +#include "src/core/lib/surface/init_internally.h" namespace grpc_event_engine { namespace experimental { -class WindowsEventEngine : public EventEngine { +// TODO(ctiller): KeepsGrpcInitialized is an interim measure to ensure that +// event engine is shut down before we shut down iomgr. +class WindowsEventEngine : public EventEngine, + public grpc_core::KeepsGrpcInitialized { public: class WindowsEndpoint : public EventEngine::Endpoint { public: diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index 738bfbf50b5..dfda9177bcb 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -82,8 +82,7 @@ class NativeDNSRequest { } // namespace -NativeDNSResolver::NativeDNSResolver() - : engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {} +NativeDNSResolver::NativeDNSResolver() {} DNSResolver::TaskHandle NativeDNSResolver::LookupHostname( std::function>)> @@ -102,7 +101,6 @@ NativeDNSResolver::LookupHostnameBlocking(absl::string_view name, struct addrinfo hints; struct addrinfo *result = NULL, *resp; int s; - size_t i; grpc_error_handle error; std::vector addresses; @@ -156,13 +154,18 @@ done: return error_result; } +void RunCallbackOnDefaultEventEngine(absl::AnyInvocable f) { + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); + engine->Run([f = std::move(f), engine]() mutable { f(); }); +} + DNSResolver::TaskHandle NativeDNSResolver::LookupSRV( std::function>)> on_resolved, absl::string_view /* name */, Duration /* deadline */, grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) { - engine_->Run([on_resolved] { + RunCallbackOnDefaultEventEngine([on_resolved] { ApplicationCallbackExecCtx app_exec_ctx; ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( @@ -177,7 +180,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupTXT( grpc_pollset_set* /* interested_parties */, absl::string_view /* name_server */) { // Not supported - engine_->Run([on_resolved] { + RunCallbackOnDefaultEventEngine([on_resolved] { ApplicationCallbackExecCtx app_exec_ctx; ExecCtx exec_ctx; on_resolved(absl::UnimplementedError( diff --git a/src/core/lib/iomgr/resolve_address_windows.h b/src/core/lib/iomgr/resolve_address_windows.h index 64690464830..1854cb70eec 100644 --- a/src/core/lib/iomgr/resolve_address_windows.h +++ b/src/core/lib/iomgr/resolve_address_windows.h @@ -56,9 +56,6 @@ class NativeDNSResolver : public DNSResolver { // NativeDNSResolver does not support cancellation. bool Cancel(TaskHandle handle) override; - - private: - std::shared_ptr engine_; }; } // namespace grpc_core diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 9cbcbb2ba1d..9e52314223e 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -37,7 +37,6 @@ #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/event_engine/forkable.h" #include "src/core/lib/event_engine/posix_engine/timer_manager.h" #include "src/core/lib/experiments/config.h" @@ -156,7 +155,6 @@ void grpc_shutdown_internal_locked(void) grpc_iomgr_shutdown_background_closure(); grpc_timer_manager_set_threading(false); // shutdown timer_manager thread grpc_resolver_dns_ares_shutdown(); - grpc_event_engine::experimental::ResetDefaultEventEngine(); grpc_iomgr_shutdown(); } g_shutting_down = false; diff --git a/src/core/lib/surface/init_internally.h b/src/core/lib/surface/init_internally.h index 551c3ae5085..12e118930e2 100644 --- a/src/core/lib/surface/init_internally.h +++ b/src/core/lib/surface/init_internally.h @@ -23,6 +23,14 @@ namespace grpc_core { extern void (*InitInternally)(); extern void (*ShutdownInternally)(); +class KeepsGrpcInitialized { + public: + KeepsGrpcInitialized() { InitInternally(); } + ~KeepsGrpcInitialized() { ShutdownInternally(); } + KeepsGrpcInitialized(const KeepsGrpcInitialized&) = delete; + KeepsGrpcInitialized& operator=(const KeepsGrpcInitialized&) = delete; +}; + } // namespace grpc_core #endif // GRPC_CORE_LIB_SURFACE_INIT_INTERNALLY_H diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index cc4eba9ed0f..9b1b092632c 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -812,7 +812,6 @@ static grpc_channel_credentials* ReadChannelCreds( } DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { - grpc_event_engine::experimental::ResetDefaultEventEngine(); if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) { gpr_set_log_function(dont_log); } diff --git a/test/core/event_engine/posix/event_poller_posix_test.cc b/test/core/event_engine/posix/event_poller_posix_test.cc index 75efcdf445a..74a667c4fe5 100644 --- a/test/core/event_engine/posix/event_poller_posix_test.cc +++ b/test/core/event_engine/posix/event_poller_posix_test.cc @@ -29,6 +29,8 @@ #include "absl/strings/string_view.h" #include "gtest/gtest.h" +#include + #include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h" @@ -714,7 +716,12 @@ int main(int argc, char** argv) { // Skip the test entirely if poll strategy is none. return 0; } - return RUN_ALL_TESTS(); + // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first + // until we clear out the iomgr shutdown code. + grpc_init(); + int r = RUN_ALL_TESTS(); + grpc_shutdown(); + return r; } #else /* GRPC_POSIX_SOCKET_EV */ diff --git a/test/core/event_engine/posix/lock_free_event_test.cc b/test/core/event_engine/posix/lock_free_event_test.cc index 9c042374b82..b854a7c970b 100644 --- a/test/core/event_engine/posix/lock_free_event_test.cc +++ b/test/core/event_engine/posix/lock_free_event_test.cc @@ -24,6 +24,7 @@ #include "gtest/gtest.h" #include +#include #include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" @@ -155,7 +156,12 @@ TEST(LockFreeEventTest, MultiThreadedTest) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); + // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first + // until we clear out the iomgr shutdown code. + grpc_init(); g_scheduler = new TestScheduler( grpc_event_engine::experimental::GetDefaultEventEngine()); - return RUN_ALL_TESTS(); + int r = RUN_ALL_TESTS(); + grpc_shutdown(); + return r; } diff --git a/test/core/event_engine/posix/posix_endpoint_test.cc b/test/core/event_engine/posix/posix_endpoint_test.cc index 716524cfea8..9f92b769396 100644 --- a/test/core/event_engine/posix/posix_endpoint_test.cc +++ b/test/core/event_engine/posix/posix_endpoint_test.cc @@ -342,5 +342,10 @@ int main(int argc, char** argv) { // Skip the test entirely if poll strategy is none. return 0; } - return RUN_ALL_TESTS(); + // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first + // until we clear out the iomgr shutdown code. + grpc_init(); + int r = RUN_ALL_TESTS(); + grpc_shutdown(); + return r; } diff --git a/test/core/event_engine/test_suite/posix_event_engine_test.cc b/test/core/event_engine/test_suite/posix_event_engine_test.cc index 11aa6128aba..e31e5126b1b 100644 --- a/test/core/event_engine/test_suite/posix_event_engine_test.cc +++ b/test/core/event_engine/test_suite/posix_event_engine_test.cc @@ -16,6 +16,7 @@ #include #include +#include #include "src/core/lib/event_engine/posix_engine/posix_engine.h" #include "test/core/event_engine/test_suite/event_engine_test.h" @@ -30,5 +31,10 @@ int main(int argc, char** argv) { grpc_event_engine::experimental::PosixEventEngine>(); }, nullptr); - return RUN_ALL_TESTS(); + // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first + // until we clear out the iomgr shutdown code. + grpc_init(); + int r = RUN_ALL_TESTS(); + grpc_shutdown(); + return r; } diff --git a/test/cpp/microbenchmarks/bm_event_engine_run.cc b/test/cpp/microbenchmarks/bm_event_engine_run.cc index 4db0e21f51d..23891898ebc 100644 --- a/test/cpp/microbenchmarks/bm_event_engine_run.cc +++ b/test/cpp/microbenchmarks/bm_event_engine_run.cc @@ -33,7 +33,6 @@ namespace { using ::grpc_event_engine::experimental::AnyInvocableClosure; using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::GetDefaultEventEngine; -using ::grpc_event_engine::experimental::ResetDefaultEventEngine; struct FanoutParameters { int depth; @@ -58,7 +57,6 @@ void BM_EventEngine_RunSmallLambda(benchmark::State& state) { signal.WaitForNotification(); count.store(0); } - ResetDefaultEventEngine(); state.SetItemsProcessed(cb_count * state.iterations()); } BENCHMARK(BM_EventEngine_RunSmallLambda) @@ -86,7 +84,6 @@ void BM_EventEngine_RunLargeLambda(benchmark::State& state) { signal.WaitForNotification(); count.store(0); } - ResetDefaultEventEngine(); state.SetItemsProcessed(cb_count * state.iterations()); } BENCHMARK(BM_EventEngine_RunLargeLambda) @@ -116,7 +113,6 @@ void BM_EventEngine_RunClosure(benchmark::State& state) { state.ResumeTiming(); } delete signal; - ResetDefaultEventEngine(); state.SetItemsProcessed(cb_count * state.iterations()); } BENCHMARK(BM_EventEngine_RunClosure)