From 8249bb97e239868252b8ce4f9b4f0d4eb3022f5b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 24 Jun 2022 13:37:49 -0700 Subject: [PATCH] [event_engine] Fix initialization (#30107) * [event_engine] Fix initialization * Automated change: Fix sanity tests Co-authored-by: ctiller --- include/grpc/event_engine/event_engine.h | 2 +- src/core/lib/event_engine/event_engine.cc | 38 +++++++++++++------ .../lib/event_engine/event_engine_factory.h | 3 ++ test/core/end2end/fuzzers/api_fuzzer.cc | 37 ++++++++---------- .../fuzzing_event_engine.cc | 9 +---- .../fuzzing_event_engine.h | 4 +- test/core/event_engine/smoke_test.cc | 4 +- .../test_suite/fuzzing_event_engine_test.cc | 13 ++++--- 8 files changed, 59 insertions(+), 51 deletions(-) diff --git a/include/grpc/event_engine/event_engine.h b/include/grpc/event_engine/event_engine.h index a34ce7df771..3e58f997a22 100644 --- a/include/grpc/event_engine/event_engine.h +++ b/include/grpc/event_engine/event_engine.h @@ -432,7 +432,7 @@ class EventEngine { /// created, applications must set a custom EventEngine factory method *before* /// grpc is initialized. void SetDefaultEventEngineFactory( - const std::function()>* factory); + std::function()> factory); /// Create an EventEngine using the default factory. std::unique_ptr CreateEventEngine(); diff --git a/src/core/lib/event_engine/event_engine.cc b/src/core/lib/event_engine/event_engine.cc index a84cb90d2a6..97cbb210292 100644 --- a/src/core/lib/event_engine/event_engine.cc +++ b/src/core/lib/event_engine/event_engine.cc @@ -13,42 +13,56 @@ // limitations under the License. #include +#include #include #include +#include #include #include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/event_engine_factory.h" #include "src/core/lib/event_engine/trace.h" -#include "src/core/lib/gprpp/sync.h" namespace grpc_event_engine { namespace experimental { namespace { -const std::function()>* g_event_engine_factory = - nullptr; -grpc_core::Mutex* g_mu = new grpc_core::Mutex(); +std::atomic()>*> + g_event_engine_factory{nullptr}; +std::atomic g_event_engine{nullptr}; } // namespace void SetDefaultEventEngineFactory( - const std::function()>* factory) { - grpc_core::MutexLock lock(g_mu); - g_event_engine_factory = factory; + std::function()> factory) { + delete g_event_engine_factory.exchange( + new std::function()>(std::move(factory))); } std::unique_ptr CreateEventEngine() { - grpc_core::MutexLock lock(g_mu); - if (g_event_engine_factory != nullptr) { - return (*g_event_engine_factory)(); + if (auto* factory = g_event_engine_factory.load()) { + return (*factory)(); } return DefaultEventEngineFactory(); } EventEngine* GetDefaultEventEngine() { - static EventEngine* default_event_engine = CreateEventEngine().release(); - return default_event_engine; + EventEngine* engine = g_event_engine.load(std::memory_order_acquire); + if (engine == nullptr) { + auto* created = CreateEventEngine().release(); + if (g_event_engine.compare_exchange_strong(engine, created, + std::memory_order_acq_rel, + std::memory_order_acquire)) { + engine = created; + } else { + delete created; + } + } + return engine; +} + +void ResetDefaultEventEngine() { + delete g_event_engine.exchange(nullptr, std::memory_order_acq_rel); } void InitializeEventEngine() { diff --git a/src/core/lib/event_engine/event_engine_factory.h b/src/core/lib/event_engine/event_engine_factory.h index b99dca6badc..c282ebca67c 100644 --- a/src/core/lib/event_engine/event_engine_factory.h +++ b/src/core/lib/event_engine/event_engine_factory.h @@ -32,6 +32,9 @@ EventEngine* GetDefaultEventEngine(); /// Create an EventEngine using the default factory provided at link time. std::unique_ptr DefaultEventEngineFactory(); +/// Reset the default event engine +void ResetDefaultEventEngine(); + // TODO(hork): remove this when any other EE usage is landed void InitializeEventEngine(); diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 48056e110a1..d3479d25e17 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -47,25 +47,6 @@ #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" #include "test/core/util/passthru_endpoint.h" -static grpc_event_engine::experimental::FuzzingEventEngine* g_event_engine = - new grpc_event_engine::experimental::FuzzingEventEngine( - grpc_event_engine::experimental::FuzzingEventEngine::Options()); -static int g_unused_initialize_event_engine = []() { - grpc_event_engine::experimental::SetDefaultEventEngineFactory( - new std::function()>([]() { - // HACK HACK HACK - // We know that this event engine will never be deleted by the - // caller, instead release() will be called and the value stashed in - // a global elsewhere. Therefore it's safe to wrap our global in a - // unique_ptr and return it, knowing it will never be deleted - // elsewhere. - return std::unique_ptr( - g_event_engine); - })); - return 42; -}(); - // Applicable when simulating channel actions. Prevents overflows. static constexpr uint64_t kMaxWaitMs = 31536000000; // 1 year (24 * 365 * 3600 * 1000) @@ -768,12 +749,20 @@ static grpc_channel_credentials* ReadChannelCreds( } DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { + grpc_event_engine::experimental::ResetDefaultEventEngine(); grpc_test_only_set_slice_hash_seed(0); char* grpc_trace_fuzzer = gpr_getenv("GRPC_TRACE_FUZZER"); if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log); gpr_free(grpc_trace_fuzzer); grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable); - g_event_engine->Restart(msg.event_engine_actions()); + grpc_event_engine::experimental::SetDefaultEventEngineFactory( + [actions = msg.event_engine_actions()]() { + return absl::make_unique< + grpc_event_engine::experimental::FuzzingEventEngine>( + grpc_event_engine::experimental::FuzzingEventEngine::Options(), + actions); + }); + grpc_event_engine::experimental::GetDefaultEventEngine(); grpc_init(); grpc_timer_manager_set_threading(false); { @@ -817,10 +806,14 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { while (action_index < msg.actions_size() || g_channel != nullptr || g_server != nullptr || pending_channel_watches > 0 || pending_pings > 0 || ActiveCall() != nullptr) { - g_event_engine->Tick(); + static_cast( + grpc_event_engine::experimental::GetDefaultEventEngine()) + ->Tick(); if (action_index == msg.actions_size()) { - g_event_engine->FuzzingDone(); + static_cast( + grpc_event_engine::experimental::GetDefaultEventEngine()) + ->FuzzingDone(); if (g_channel != nullptr) { grpc_channel_destroy(g_channel); g_channel = nullptr; diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc index 2f11f2af811..49231207fa4 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc @@ -33,19 +33,14 @@ const intptr_t kTaskHandleSalt = 12345; FuzzingEventEngine* g_fuzzing_event_engine = nullptr; } // namespace -FuzzingEventEngine::FuzzingEventEngine(Options options) +FuzzingEventEngine::FuzzingEventEngine( + Options options, const fuzzing_event_engine::Actions& actions) : final_tick_length_(options.final_tick_length) { GPR_ASSERT(g_fuzzing_event_engine == nullptr); g_fuzzing_event_engine = this; gpr_now_impl = GlobalNowImpl; - Restart(fuzzing_event_engine::Actions()); -} - -void FuzzingEventEngine::Restart(const fuzzing_event_engine::Actions& actions) { - grpc_core::MutexLock lock(&mu_); - tick_increments_.clear(); task_delays_.clear(); tasks_by_id_.clear(); diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h index 87f8c659a88..dd12b5b5409 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h @@ -36,10 +36,10 @@ class FuzzingEventEngine : public EventEngine { // time Now() will be incremented each tick. Duration final_tick_length = std::chrono::seconds(1); }; - explicit FuzzingEventEngine(Options options); + explicit FuzzingEventEngine(Options options, + const fuzzing_event_engine::Actions& actions); ~FuzzingEventEngine() override; - void Restart(const fuzzing_event_engine::Actions& actions); void FuzzingDone(); void Tick(); diff --git a/test/core/event_engine/smoke_test.cc b/test/core/event_engine/smoke_test.cc index 49b70321f71..a4a99e21a76 100644 --- a/test/core/event_engine/smoke_test.cc +++ b/test/core/event_engine/smoke_test.cc @@ -33,8 +33,8 @@ TEST_F(EventEngineSmokeTest, SetDefaultEventEngineFactoryLinks) { std::unique_ptr()> factory; EXPECT_CALL(factory, Call()).Times(1); - auto stdfn_fact = factory.AsStdFunction(); - grpc_event_engine::experimental::SetDefaultEventEngineFactory(&stdfn_fact); + grpc_event_engine::experimental::SetDefaultEventEngineFactory( + factory.AsStdFunction()); EXPECT_EQ(nullptr, grpc_event_engine::experimental::CreateEventEngine()); } diff --git a/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc b/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc index 1db451fff2e..d1b91fc283b 100644 --- a/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc +++ b/test/core/event_engine/test_suite/fuzzing_event_engine_test.cc @@ -21,6 +21,7 @@ #include +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" #include "test/core/event_engine/test_suite/event_engine_test.h" namespace grpc_event_engine { @@ -30,11 +31,13 @@ namespace { class ThreadedFuzzingEventEngine : public FuzzingEventEngine { public: ThreadedFuzzingEventEngine() - : FuzzingEventEngine([]() { - Options options; - options.final_tick_length = std::chrono::milliseconds(10); - return options; - }()), + : FuzzingEventEngine( + []() { + Options options; + options.final_tick_length = std::chrono::milliseconds(10); + return options; + }(), + fuzzing_event_engine::Actions()), main_([this]() { while (!done_.load()) { auto tick_start = absl::Now();