[EventEngine] Always initialize PosixEventEngine when created. (#32724)

If an engine is created, it should be fully functional regardless of
whether gRPC-core experiments are on or off. The trade-off for now is
that when the core experiments are not enabled, the engine will be
slowly polling with nothing to do.

---------

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/32841/head
AJ Heller 2 years ago committed by GitHub
parent c268ac2c25
commit 20a5324696
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 34
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  3. 3
      test/core/event_engine/posix/posix_event_engine_connect_test.cc
  4. 4
      test/core/event_engine/test_suite/posix_event_engine_test.cc
  5. 4
      test/core/event_engine/test_suite/thready_posix_event_engine_test.cc

@ -1931,7 +1931,6 @@ grpc_cc_library(
deps = [
"event_engine_common",
"event_engine_poller",
"event_engine_shim",
"event_engine_tcp_socket_utils",
"event_engine_thread_pool",
"event_engine_trace",

@ -41,7 +41,6 @@
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/posix_engine/timer.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"
@ -66,10 +65,6 @@ using namespace std::chrono_literals;
namespace grpc_event_engine {
namespace experimental {
bool NeedPosixEngine() {
return UseEventEngineClient() || UseEventEngineListener();
}
#ifdef GRPC_POSIX_SOCKET_TCP
void AsyncConnect::Start(EventEngine::Duration timeout) {
@ -340,24 +335,20 @@ PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)
: connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
executor_(std::make_shared<ThreadPool>()),
timer_manager_(executor_) {
if (NeedPosixEngine()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(poller);
}
poller_manager_ = std::make_shared<PosixEnginePollerManager>(poller);
}
PosixEventEngine::PosixEventEngine()
: connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
executor_(std::make_shared<ThreadPool>()),
timer_manager_(executor_) {
if (NeedPosixEngine()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
// The threadpool must be instantiated after the poller otherwise, the
// process will deadlock when forking.
if (poller_manager_->Poller() != nullptr) {
executor_->Run([poller_manager = poller_manager_]() {
PollerWorkInternal(poller_manager);
});
}
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
// The threadpool must be instantiated after the poller otherwise, the
// process will deadlock when forking.
if (poller_manager_->Poller() != nullptr) {
executor_->Run([poller_manager = poller_manager_]() {
PollerWorkInternal(poller_manager);
});
}
}
@ -549,9 +540,6 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect(
OnConnectCallback on_connect, const ResolvedAddress& addr,
const EndpointConfig& args, MemoryAllocator memory_allocator,
Duration timeout) {
if (!NeedPosixEngine()) {
grpc_core::Crash("unimplemented");
}
#ifdef GRPC_POSIX_SOCKET_TCP
GPR_ASSERT(poller_manager_ != nullptr);
PosixTcpOptions options = TcpOptionsFromEndpointConfig(args);
@ -574,9 +562,6 @@ std::unique_ptr<PosixEndpointWithFdSupport>
PosixEventEngine::CreatePosixEndpointFromFd(int fd,
const EndpointConfig& config,
MemoryAllocator memory_allocator) {
if (!NeedPosixEngine()) {
grpc_core::Crash("unimplemented");
}
#ifdef GRPC_POSIX_SOCKET_TCP
GPR_DEBUG_ASSERT(fd > 0);
PosixEventPoller* poller = poller_manager_->Poller();
@ -623,9 +608,6 @@ PosixEventEngine::CreatePosixListener(
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
if (!NeedPosixEngine()) {
grpc_core::Crash("unimplemented");
}
#ifdef GRPC_POSIX_SOCKET_TCP
return std::make_unique<PosixEngineListener>(
std::move(on_accept), std::move(on_shutdown), config,

@ -44,7 +44,6 @@
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/experiments/config.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/resource_quota/memory_quota.h"
@ -209,8 +208,6 @@ TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) {
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
// TODO(vigneshbabu): remove when the experiment is over
grpc_core::ForceEnableExperiment("event_engine_client", true);
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();

@ -18,7 +18,6 @@
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/experiments/config.h"
#include "test/core/event_engine/test_suite/event_engine_test_framework.h"
#include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h"
#include "test/core/event_engine/test_suite/tests/client_test.h"
@ -41,9 +40,6 @@ int main(int argc, char** argv) {
grpc_event_engine::experimental::InitTimerTests();
grpc_event_engine::experimental::InitClientTests();
grpc_event_engine::experimental::InitServerTests();
// TODO(vigneshbabu): remove when the experiment is over
grpc_core::ForceEnableExperiment("event_engine_client", true);
grpc_core::ForceEnableExperiment("event_engine_listener", true);
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();

@ -18,7 +18,6 @@
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/experiments/config.h"
#include "test/core/event_engine/test_suite/event_engine_test_framework.h"
#include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h"
#include "test/core/event_engine/test_suite/tests/client_test.h"
@ -46,9 +45,6 @@ int main(int argc, char** argv) {
grpc_event_engine::experimental::InitTimerTests();
grpc_event_engine::experimental::InitClientTests();
grpc_event_engine::experimental::InitServerTests();
// TODO(vigneshbabu): remove when the experiment is over
grpc_core::ForceEnableExperiment("event_engine_client", true);
grpc_core::ForceEnableExperiment("event_engine_listener", true);
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();

Loading…
Cancel
Save