[event_engine] Fix initialization (#30107)

* [event_engine] Fix initialization

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30109/head
Craig Tiller 3 years ago committed by GitHub
parent 03dcd61068
commit 8249bb97e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      include/grpc/event_engine/event_engine.h
  2. 38
      src/core/lib/event_engine/event_engine.cc
  3. 3
      src/core/lib/event_engine/event_engine_factory.h
  4. 37
      test/core/end2end/fuzzers/api_fuzzer.cc
  5. 9
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  6. 4
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
  7. 4
      test/core/event_engine/smoke_test.cc
  8. 13
      test/core/event_engine/test_suite/fuzzing_event_engine_test.cc

@ -432,7 +432,7 @@ class EventEngine {
/// created, applications must set a custom EventEngine factory method *before*
/// grpc is initialized.
void SetDefaultEventEngineFactory(
const std::function<std::unique_ptr<EventEngine>()>* factory);
std::function<std::unique_ptr<EventEngine>()> factory);
/// Create an EventEngine using the default factory.
std::unique_ptr<EventEngine> CreateEventEngine();

@ -13,42 +13,56 @@
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <atomic>
#include <functional>
#include <memory>
#include <utility>
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
const std::function<std::unique_ptr<EventEngine>()>* g_event_engine_factory =
nullptr;
grpc_core::Mutex* g_mu = new grpc_core::Mutex();
std::atomic<const std::function<std::unique_ptr<EventEngine>()>*>
g_event_engine_factory{nullptr};
std::atomic<EventEngine*> g_event_engine{nullptr};
} // namespace
void SetDefaultEventEngineFactory(
const std::function<std::unique_ptr<EventEngine>()>* factory) {
grpc_core::MutexLock lock(g_mu);
g_event_engine_factory = factory;
std::function<std::unique_ptr<EventEngine>()> factory) {
delete g_event_engine_factory.exchange(
new std::function<std::unique_ptr<EventEngine>()>(std::move(factory)));
}
std::unique_ptr<EventEngine> CreateEventEngine() {
grpc_core::MutexLock lock(g_mu);
if (g_event_engine_factory != nullptr) {
return (*g_event_engine_factory)();
if (auto* factory = g_event_engine_factory.load()) {
return (*factory)();
}
return DefaultEventEngineFactory();
}
EventEngine* GetDefaultEventEngine() {
static EventEngine* default_event_engine = CreateEventEngine().release();
return default_event_engine;
EventEngine* engine = g_event_engine.load(std::memory_order_acquire);
if (engine == nullptr) {
auto* created = CreateEventEngine().release();
if (g_event_engine.compare_exchange_strong(engine, created,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
engine = created;
} else {
delete created;
}
}
return engine;
}
void ResetDefaultEventEngine() {
delete g_event_engine.exchange(nullptr, std::memory_order_acq_rel);
}
void InitializeEventEngine() {

@ -32,6 +32,9 @@ EventEngine* GetDefaultEventEngine();
/// Create an EventEngine using the default factory provided at link time.
std::unique_ptr<EventEngine> DefaultEventEngineFactory();
/// Reset the default event engine
void ResetDefaultEventEngine();
// TODO(hork): remove this when any other EE usage is landed
void InitializeEventEngine();

@ -47,25 +47,6 @@
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/util/passthru_endpoint.h"
static grpc_event_engine::experimental::FuzzingEventEngine* g_event_engine =
new grpc_event_engine::experimental::FuzzingEventEngine(
grpc_event_engine::experimental::FuzzingEventEngine::Options());
static int g_unused_initialize_event_engine = []() {
grpc_event_engine::experimental::SetDefaultEventEngineFactory(
new std::function<std::unique_ptr<
grpc_event_engine::experimental::EventEngine>()>([]() {
// HACK HACK HACK
// We know that this event engine will never be deleted by the
// caller, instead release() will be called and the value stashed in
// a global elsewhere. Therefore it's safe to wrap our global in a
// unique_ptr and return it, knowing it will never be deleted
// elsewhere.
return std::unique_ptr<grpc_event_engine::experimental::EventEngine>(
g_event_engine);
}));
return 42;
}();
// Applicable when simulating channel actions. Prevents overflows.
static constexpr uint64_t kMaxWaitMs =
31536000000; // 1 year (24 * 365 * 3600 * 1000)
@ -768,12 +749,20 @@ static grpc_channel_credentials* ReadChannelCreds(
}
DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
grpc_event_engine::experimental::ResetDefaultEventEngine();
grpc_test_only_set_slice_hash_seed(0);
char* grpc_trace_fuzzer = gpr_getenv("GRPC_TRACE_FUZZER");
if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log);
gpr_free(grpc_trace_fuzzer);
grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable);
g_event_engine->Restart(msg.event_engine_actions());
grpc_event_engine::experimental::SetDefaultEventEngineFactory(
[actions = msg.event_engine_actions()]() {
return absl::make_unique<
grpc_event_engine::experimental::FuzzingEventEngine>(
grpc_event_engine::experimental::FuzzingEventEngine::Options(),
actions);
});
grpc_event_engine::experimental::GetDefaultEventEngine();
grpc_init();
grpc_timer_manager_set_threading(false);
{
@ -817,10 +806,14 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
while (action_index < msg.actions_size() || g_channel != nullptr ||
g_server != nullptr || pending_channel_watches > 0 ||
pending_pings > 0 || ActiveCall() != nullptr) {
g_event_engine->Tick();
static_cast<grpc_event_engine::experimental::FuzzingEventEngine*>(
grpc_event_engine::experimental::GetDefaultEventEngine())
->Tick();
if (action_index == msg.actions_size()) {
g_event_engine->FuzzingDone();
static_cast<grpc_event_engine::experimental::FuzzingEventEngine*>(
grpc_event_engine::experimental::GetDefaultEventEngine())
->FuzzingDone();
if (g_channel != nullptr) {
grpc_channel_destroy(g_channel);
g_channel = nullptr;

@ -33,19 +33,14 @@ const intptr_t kTaskHandleSalt = 12345;
FuzzingEventEngine* g_fuzzing_event_engine = nullptr;
} // namespace
FuzzingEventEngine::FuzzingEventEngine(Options options)
FuzzingEventEngine::FuzzingEventEngine(
Options options, const fuzzing_event_engine::Actions& actions)
: final_tick_length_(options.final_tick_length) {
GPR_ASSERT(g_fuzzing_event_engine == nullptr);
g_fuzzing_event_engine = this;
gpr_now_impl = GlobalNowImpl;
Restart(fuzzing_event_engine::Actions());
}
void FuzzingEventEngine::Restart(const fuzzing_event_engine::Actions& actions) {
grpc_core::MutexLock lock(&mu_);
tick_increments_.clear();
task_delays_.clear();
tasks_by_id_.clear();

@ -36,10 +36,10 @@ class FuzzingEventEngine : public EventEngine {
// time Now() will be incremented each tick.
Duration final_tick_length = std::chrono::seconds(1);
};
explicit FuzzingEventEngine(Options options);
explicit FuzzingEventEngine(Options options,
const fuzzing_event_engine::Actions& actions);
~FuzzingEventEngine() override;
void Restart(const fuzzing_event_engine::Actions& actions);
void FuzzingDone();
void Tick();

@ -33,8 +33,8 @@ TEST_F(EventEngineSmokeTest, SetDefaultEventEngineFactoryLinks) {
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>
factory;
EXPECT_CALL(factory, Call()).Times(1);
auto stdfn_fact = factory.AsStdFunction();
grpc_event_engine::experimental::SetDefaultEventEngineFactory(&stdfn_fact);
grpc_event_engine::experimental::SetDefaultEventEngineFactory(
factory.AsStdFunction());
EXPECT_EQ(nullptr, grpc_event_engine::experimental::CreateEventEngine());
}

@ -21,6 +21,7 @@
#include <grpc/grpc.h>
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
namespace grpc_event_engine {
@ -30,11 +31,13 @@ namespace {
class ThreadedFuzzingEventEngine : public FuzzingEventEngine {
public:
ThreadedFuzzingEventEngine()
: FuzzingEventEngine([]() {
Options options;
options.final_tick_length = std::chrono::milliseconds(10);
return options;
}()),
: FuzzingEventEngine(
[]() {
Options options;
options.final_tick_length = std::chrono::milliseconds(10);
return options;
}(),
fuzzing_event_engine::Actions()),
main_([this]() {
while (!done_.load()) {
auto tick_start = absl::Now();

Loading…
Cancel
Save