[event_engine] Fix for the EE/iomgr shutdown ordering problem (#31265)

* Remove ResetDefaultEventEngine

Now that it is a weak_ptr, there's no need to explicitly reset it. When
the tracked shared_ptr is deleted, the weak_ptr will fail to lock, and a
new default EventEngine will be created.

* forget existing engine with FactoryReset

* init/shutdown in event engine for now

* fix

* fix

* fix windows deadlock

* Automated change: Fix sanity tests

* fix

* better windows fix

Co-authored-by: AJ Heller <hork@google.com>
Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30936/head^2
Craig Tiller 2 years ago committed by GitHub
parent a210506b3d
commit afa3a6c890
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 7
      src/core/lib/event_engine/default_event_engine.cc
  3. 3
      src/core/lib/event_engine/default_event_engine.h
  4. 6
      src/core/lib/event_engine/posix_engine/posix_engine.h
  5. 6
      src/core/lib/event_engine/windows/windows_engine.h
  6. 13
      src/core/lib/iomgr/resolve_address_windows.cc
  7. 3
      src/core/lib/iomgr/resolve_address_windows.h
  8. 2
      src/core/lib/surface/init.cc
  9. 8
      src/core/lib/surface/init_internally.h
  10. 1
      test/core/end2end/fuzzers/api_fuzzer.cc
  11. 9
      test/core/event_engine/posix/event_poller_posix_test.cc
  12. 8
      test/core/event_engine/posix/lock_free_event_test.cc
  13. 7
      test/core/event_engine/posix/posix_endpoint_test.cc
  14. 8
      test/core/event_engine/test_suite/posix_event_engine_test.cc
  15. 4
      test/cpp/microbenchmarks/bm_event_engine_run.cc

@ -2877,6 +2877,7 @@ grpc_cc_library(
"event_engine_utils",
"gpr",
"grpc_trace",
"init_internally",
"posix_event_engine_timer",
"posix_event_engine_timer_manager",
],
@ -2898,6 +2899,7 @@ grpc_cc_library(
"event_engine_trace",
"event_engine_utils",
"gpr",
"init_internally",
"posix_event_engine_timer_manager",
"time",
"windows_iocp",

@ -51,7 +51,7 @@ void SetEventEngineFactory(
void EventEngineFactoryReset() {
delete g_event_engine_factory.exchange(nullptr);
ResetDefaultEventEngine();
g_event_engine->reset();
}
std::unique_ptr<EventEngine> CreateEventEngine() {
@ -74,10 +74,5 @@ std::shared_ptr<EventEngine> GetDefaultEventEngine() {
return engine;
}
void ResetDefaultEventEngine() {
grpc_core::MutexLock lock(&*g_mu);
g_event_engine->reset();
}
} // namespace experimental
} // namespace grpc_event_engine

@ -37,9 +37,6 @@ namespace experimental {
/// Strongly consider whether you could use \a CreateEventEngine instead.
std::shared_ptr<EventEngine> GetDefaultEventEngine();
/// Reset the default event engine
void ResetDefaultEventEngine();
} // namespace experimental
} // namespace grpc_event_engine

@ -35,13 +35,17 @@
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/thread_pool.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/surface/init_internally.h"
namespace grpc_event_engine {
namespace experimental {
// An iomgr-based Posix EventEngine implementation.
// All methods require an ExecCtx to already exist on the thread's stack.
class PosixEventEngine final : public EventEngine {
// TODO(ctiller): KeepsGrpcInitialized is an interim measure to ensure that
// event engine is shut down before we shut down iomgr.
class PosixEventEngine final : public EventEngine,
public grpc_core::KeepsGrpcInitialized {
public:
class PosixEndpoint : public EventEngine::Endpoint {
public:

@ -34,11 +34,15 @@
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/surface/init_internally.h"
namespace grpc_event_engine {
namespace experimental {
class WindowsEventEngine : public EventEngine {
// TODO(ctiller): KeepsGrpcInitialized is an interim measure to ensure that
// event engine is shut down before we shut down iomgr.
class WindowsEventEngine : public EventEngine,
public grpc_core::KeepsGrpcInitialized {
public:
class WindowsEndpoint : public EventEngine::Endpoint {
public:

@ -82,8 +82,7 @@ class NativeDNSRequest {
} // namespace
NativeDNSResolver::NativeDNSResolver()
: engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {}
NativeDNSResolver::NativeDNSResolver() {}
DNSResolver::TaskHandle NativeDNSResolver::LookupHostname(
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
@ -102,7 +101,6 @@ NativeDNSResolver::LookupHostnameBlocking(absl::string_view name,
struct addrinfo hints;
struct addrinfo *result = NULL, *resp;
int s;
size_t i;
grpc_error_handle error;
std::vector<grpc_resolved_address> addresses;
@ -156,13 +154,18 @@ done:
return error_result;
}
void RunCallbackOnDefaultEventEngine(absl::AnyInvocable<void()> f) {
auto engine = grpc_event_engine::experimental::GetDefaultEventEngine();
engine->Run([f = std::move(f), engine]() mutable { f(); });
}
DNSResolver::TaskHandle NativeDNSResolver::LookupSRV(
std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
on_resolved,
absl::string_view /* name */, Duration /* deadline */,
grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) {
engine_->Run([on_resolved] {
RunCallbackOnDefaultEventEngine([on_resolved] {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
on_resolved(absl::UnimplementedError(
@ -177,7 +180,7 @@ DNSResolver::TaskHandle NativeDNSResolver::LookupTXT(
grpc_pollset_set* /* interested_parties */,
absl::string_view /* name_server */) {
// Not supported
engine_->Run([on_resolved] {
RunCallbackOnDefaultEventEngine([on_resolved] {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
on_resolved(absl::UnimplementedError(

@ -56,9 +56,6 @@ class NativeDNSResolver : public DNSResolver {
// NativeDNSResolver does not support cancellation.
bool Cancel(TaskHandle handle) override;
private:
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};
} // namespace grpc_core

@ -37,7 +37,6 @@
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/experiments/config.h"
@ -156,7 +155,6 @@ void grpc_shutdown_internal_locked(void)
grpc_iomgr_shutdown_background_closure();
grpc_timer_manager_set_threading(false); // shutdown timer_manager thread
grpc_resolver_dns_ares_shutdown();
grpc_event_engine::experimental::ResetDefaultEventEngine();
grpc_iomgr_shutdown();
}
g_shutting_down = false;

@ -23,6 +23,14 @@ namespace grpc_core {
extern void (*InitInternally)();
extern void (*ShutdownInternally)();
class KeepsGrpcInitialized {
public:
KeepsGrpcInitialized() { InitInternally(); }
~KeepsGrpcInitialized() { ShutdownInternally(); }
KeepsGrpcInitialized(const KeepsGrpcInitialized&) = delete;
KeepsGrpcInitialized& operator=(const KeepsGrpcInitialized&) = delete;
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_SURFACE_INIT_INTERNALLY_H

@ -812,7 +812,6 @@ static grpc_channel_credentials* ReadChannelCreds(
}
DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
grpc_event_engine::experimental::ResetDefaultEventEngine();
if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) {
gpr_set_log_function(dont_log);
}

@ -29,6 +29,8 @@
#include "absl/strings/string_view.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
@ -714,7 +716,12 @@ int main(int argc, char** argv) {
// Skip the test entirely if poll strategy is none.
return 0;
}
return RUN_ALL_TESTS();
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();
int r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}
#else /* GRPC_POSIX_SOCKET_EV */

@ -24,6 +24,7 @@
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
@ -155,7 +156,12 @@ TEST(LockFreeEventTest, MultiThreadedTest) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();
g_scheduler = new TestScheduler(
grpc_event_engine::experimental::GetDefaultEventEngine());
return RUN_ALL_TESTS();
int r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}

@ -342,5 +342,10 @@ int main(int argc, char** argv) {
// Skip the test entirely if poll strategy is none.
return 0;
}
return RUN_ALL_TESTS();
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();
int r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}

@ -16,6 +16,7 @@
#include <gtest/gtest.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
@ -30,5 +31,10 @@ int main(int argc, char** argv) {
grpc_event_engine::experimental::PosixEventEngine>();
},
nullptr);
return RUN_ALL_TESTS();
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();
int r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}

@ -33,7 +33,6 @@ namespace {
using ::grpc_event_engine::experimental::AnyInvocableClosure;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
using ::grpc_event_engine::experimental::ResetDefaultEventEngine;
struct FanoutParameters {
int depth;
@ -58,7 +57,6 @@ void BM_EventEngine_RunSmallLambda(benchmark::State& state) {
signal.WaitForNotification();
count.store(0);
}
ResetDefaultEventEngine();
state.SetItemsProcessed(cb_count * state.iterations());
}
BENCHMARK(BM_EventEngine_RunSmallLambda)
@ -86,7 +84,6 @@ void BM_EventEngine_RunLargeLambda(benchmark::State& state) {
signal.WaitForNotification();
count.store(0);
}
ResetDefaultEventEngine();
state.SetItemsProcessed(cb_count * state.iterations());
}
BENCHMARK(BM_EventEngine_RunLargeLambda)
@ -116,7 +113,6 @@ void BM_EventEngine_RunClosure(benchmark::State& state) {
state.ResumeTiming();
}
delete signal;
ResetDefaultEventEngine();
state.SetItemsProcessed(cb_count * state.iterations());
}
BENCHMARK(BM_EventEngine_RunClosure)

Loading…
Cancel
Save