diff --git a/BUILD b/BUILD index 1dd568c816c..c9c1bb2a1e0 100644 --- a/BUILD +++ b/BUILD @@ -1275,6 +1275,7 @@ grpc_cc_library( # These headers used to be vended by this target, but they have to be # removed after landing event engine. [ + "//src/core:lib/iomgr/event_engine_shims/closure.cc", "//src/core:lib/iomgr/event_engine_shims/endpoint.cc", "//src/core:lib/iomgr/event_engine_shims/tcp_client.cc", ], @@ -1365,6 +1366,7 @@ grpc_cc_library( # These headers used to be vended by this target, but they have to be # removed after landing event engine. [ + "//src/core:lib/iomgr/event_engine_shims/closure.h", "//src/core:lib/iomgr/event_engine_shims/endpoint.h", "//src/core:lib/iomgr/event_engine_shims/tcp_client.h", ], diff --git a/CMakeLists.txt b/CMakeLists.txt index d0c685e470b..b3107cce4df 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2228,6 +2228,7 @@ add_library(grpc src/core/lib/iomgr/ev_poll_posix.cc src/core/lib/iomgr/ev_posix.cc src/core/lib/iomgr/ev_windows.cc + src/core/lib/iomgr/event_engine_shims/closure.cc src/core/lib/iomgr/event_engine_shims/endpoint.cc src/core/lib/iomgr/event_engine_shims/tcp_client.cc src/core/lib/iomgr/exec_ctx.cc @@ -2903,6 +2904,7 @@ add_library(grpc_unsecure src/core/lib/iomgr/ev_poll_posix.cc src/core/lib/iomgr/ev_posix.cc src/core/lib/iomgr/ev_windows.cc + src/core/lib/iomgr/event_engine_shims/closure.cc src/core/lib/iomgr/event_engine_shims/endpoint.cc src/core/lib/iomgr/event_engine_shims/tcp_client.cc src/core/lib/iomgr/exec_ctx.cc @@ -4391,6 +4393,7 @@ add_library(grpc_authorization_provider src/core/lib/iomgr/ev_poll_posix.cc src/core/lib/iomgr/ev_posix.cc src/core/lib/iomgr/ev_windows.cc + src/core/lib/iomgr/event_engine_shims/closure.cc src/core/lib/iomgr/event_engine_shims/endpoint.cc src/core/lib/iomgr/event_engine_shims/tcp_client.cc src/core/lib/iomgr/exec_ctx.cc @@ -11457,6 +11460,7 @@ add_executable(frame_test src/core/lib/iomgr/ev_poll_posix.cc src/core/lib/iomgr/ev_posix.cc src/core/lib/iomgr/ev_windows.cc + src/core/lib/iomgr/event_engine_shims/closure.cc src/core/lib/iomgr/event_engine_shims/endpoint.cc src/core/lib/iomgr/event_engine_shims/tcp_client.cc src/core/lib/iomgr/exec_ctx.cc diff --git a/Makefile b/Makefile index 5c69ee042da..2687a0414fd 100644 --- a/Makefile +++ b/Makefile @@ -1485,6 +1485,7 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/ev_poll_posix.cc \ src/core/lib/iomgr/ev_posix.cc \ src/core/lib/iomgr/ev_windows.cc \ + src/core/lib/iomgr/event_engine_shims/closure.cc \ src/core/lib/iomgr/event_engine_shims/endpoint.cc \ src/core/lib/iomgr/event_engine_shims/tcp_client.cc \ src/core/lib/iomgr/exec_ctx.cc \ @@ -2019,6 +2020,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/ev_poll_posix.cc \ src/core/lib/iomgr/ev_posix.cc \ src/core/lib/iomgr/ev_windows.cc \ + src/core/lib/iomgr/event_engine_shims/closure.cc \ src/core/lib/iomgr/event_engine_shims/endpoint.cc \ src/core/lib/iomgr/event_engine_shims/tcp_client.cc \ src/core/lib/iomgr/exec_ctx.cc \ diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 23ee23a81a3..bfcbebb19ee 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -33,6 +33,9 @@ EXPERIMENTS = { "event_engine_client_test": [ "event_engine_client", ], + "event_engine_listener_test": [ + "event_engine_listener", + ], "flow_control_test": [ "peer_state_based_framing", "tcp_frame_size_tuning", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 60aba34bd64..7e40b8af917 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -868,6 +868,7 @@ libs: - src/core/lib/iomgr/ev_epoll1_linux.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h + - src/core/lib/iomgr/event_engine_shims/closure.h - src/core/lib/iomgr/event_engine_shims/endpoint.h - src/core/lib/iomgr/event_engine_shims/tcp_client.h - src/core/lib/iomgr/exec_ctx.h @@ -1619,6 +1620,7 @@ libs: - src/core/lib/iomgr/ev_poll_posix.cc - src/core/lib/iomgr/ev_posix.cc - src/core/lib/iomgr/ev_windows.cc + - src/core/lib/iomgr/event_engine_shims/closure.cc - src/core/lib/iomgr/event_engine_shims/endpoint.cc - src/core/lib/iomgr/event_engine_shims/tcp_client.cc - src/core/lib/iomgr/exec_ctx.cc @@ -2195,6 +2197,7 @@ libs: - src/core/lib/iomgr/ev_epoll1_linux.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h + - src/core/lib/iomgr/event_engine_shims/closure.h - src/core/lib/iomgr/event_engine_shims/endpoint.h - src/core/lib/iomgr/event_engine_shims/tcp_client.h - src/core/lib/iomgr/exec_ctx.h @@ -2560,6 +2563,7 @@ libs: - src/core/lib/iomgr/ev_poll_posix.cc - src/core/lib/iomgr/ev_posix.cc - src/core/lib/iomgr/ev_windows.cc + - src/core/lib/iomgr/event_engine_shims/closure.cc - src/core/lib/iomgr/event_engine_shims/endpoint.cc - src/core/lib/iomgr/event_engine_shims/tcp_client.cc - src/core/lib/iomgr/exec_ctx.cc @@ -3639,6 +3643,7 @@ libs: - src/core/lib/iomgr/ev_epoll1_linux.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h + - src/core/lib/iomgr/event_engine_shims/closure.h - src/core/lib/iomgr/event_engine_shims/endpoint.h - src/core/lib/iomgr/event_engine_shims/tcp_client.h - src/core/lib/iomgr/exec_ctx.h @@ -3883,6 +3888,7 @@ libs: - src/core/lib/iomgr/ev_poll_posix.cc - src/core/lib/iomgr/ev_posix.cc - src/core/lib/iomgr/ev_windows.cc + - src/core/lib/iomgr/event_engine_shims/closure.cc - src/core/lib/iomgr/event_engine_shims/endpoint.cc - src/core/lib/iomgr/event_engine_shims/tcp_client.cc - src/core/lib/iomgr/exec_ctx.cc @@ -7434,6 +7440,7 @@ targets: - src/core/lib/iomgr/ev_epoll1_linux.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h + - src/core/lib/iomgr/event_engine_shims/closure.h - src/core/lib/iomgr/event_engine_shims/endpoint.h - src/core/lib/iomgr/event_engine_shims/tcp_client.h - src/core/lib/iomgr/exec_ctx.h @@ -7660,6 +7667,7 @@ targets: - src/core/lib/iomgr/ev_poll_posix.cc - src/core/lib/iomgr/ev_posix.cc - src/core/lib/iomgr/ev_windows.cc + - src/core/lib/iomgr/event_engine_shims/closure.cc - src/core/lib/iomgr/event_engine_shims/endpoint.cc - src/core/lib/iomgr/event_engine_shims/tcp_client.cc - src/core/lib/iomgr/exec_ctx.cc diff --git a/config.m4 b/config.m4 index 766d8b9d41e..c4a1b6a5d5d 100644 --- a/config.m4 +++ b/config.m4 @@ -610,6 +610,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/ev_poll_posix.cc \ src/core/lib/iomgr/ev_posix.cc \ src/core/lib/iomgr/ev_windows.cc \ + src/core/lib/iomgr/event_engine_shims/closure.cc \ src/core/lib/iomgr/event_engine_shims/endpoint.cc \ src/core/lib/iomgr/event_engine_shims/tcp_client.cc \ src/core/lib/iomgr/exec_ctx.cc \ diff --git a/config.w32 b/config.w32 index d8ef4fc5a9d..b0c69178baa 100644 --- a/config.w32 +++ b/config.w32 @@ -576,6 +576,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\iomgr\\ev_poll_posix.cc " + "src\\core\\lib\\iomgr\\ev_posix.cc " + "src\\core\\lib\\iomgr\\ev_windows.cc " + + "src\\core\\lib\\iomgr\\event_engine_shims\\closure.cc " + "src\\core\\lib\\iomgr\\event_engine_shims\\endpoint.cc " + "src\\core\\lib\\iomgr\\event_engine_shims\\tcp_client.cc " + "src\\core\\lib\\iomgr\\exec_ctx.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 27c236ed825..32f3a432490 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -843,6 +843,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_epoll1_linux.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', + 'src/core/lib/iomgr/event_engine_shims/closure.h', 'src/core/lib/iomgr/event_engine_shims/endpoint.h', 'src/core/lib/iomgr/event_engine_shims/tcp_client.h', 'src/core/lib/iomgr/exec_ctx.h', @@ -1769,6 +1770,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_epoll1_linux.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', + 'src/core/lib/iomgr/event_engine_shims/closure.h', 'src/core/lib/iomgr/event_engine_shims/endpoint.h', 'src/core/lib/iomgr/event_engine_shims/tcp_client.h', 'src/core/lib/iomgr/exec_ctx.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 2e4adc7bdf9..2e202149607 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1343,6 +1343,8 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_posix.cc', 'src/core/lib/iomgr/ev_posix.h', 'src/core/lib/iomgr/ev_windows.cc', + 'src/core/lib/iomgr/event_engine_shims/closure.cc', + 'src/core/lib/iomgr/event_engine_shims/closure.h', 'src/core/lib/iomgr/event_engine_shims/endpoint.cc', 'src/core/lib/iomgr/event_engine_shims/endpoint.h', 'src/core/lib/iomgr/event_engine_shims/tcp_client.cc', @@ -2450,6 +2452,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_epoll1_linux.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', + 'src/core/lib/iomgr/event_engine_shims/closure.h', 'src/core/lib/iomgr/event_engine_shims/endpoint.h', 'src/core/lib/iomgr/event_engine_shims/tcp_client.h', 'src/core/lib/iomgr/exec_ctx.h', diff --git a/grpc.gemspec b/grpc.gemspec index 433c4dd3939..d4e49dcc928 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1254,6 +1254,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/ev_posix.cc ) s.files += %w( src/core/lib/iomgr/ev_posix.h ) s.files += %w( src/core/lib/iomgr/ev_windows.cc ) + s.files += %w( src/core/lib/iomgr/event_engine_shims/closure.cc ) + s.files += %w( src/core/lib/iomgr/event_engine_shims/closure.h ) s.files += %w( src/core/lib/iomgr/event_engine_shims/endpoint.cc ) s.files += %w( src/core/lib/iomgr/event_engine_shims/endpoint.h ) s.files += %w( src/core/lib/iomgr/event_engine_shims/tcp_client.cc ) diff --git a/grpc.gyp b/grpc.gyp index 682da38da73..910cbce0e3a 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -898,6 +898,7 @@ 'src/core/lib/iomgr/ev_poll_posix.cc', 'src/core/lib/iomgr/ev_posix.cc', 'src/core/lib/iomgr/ev_windows.cc', + 'src/core/lib/iomgr/event_engine_shims/closure.cc', 'src/core/lib/iomgr/event_engine_shims/endpoint.cc', 'src/core/lib/iomgr/event_engine_shims/tcp_client.cc', 'src/core/lib/iomgr/exec_ctx.cc', @@ -1374,6 +1375,7 @@ 'src/core/lib/iomgr/ev_poll_posix.cc', 'src/core/lib/iomgr/ev_posix.cc', 'src/core/lib/iomgr/ev_windows.cc', + 'src/core/lib/iomgr/event_engine_shims/closure.cc', 'src/core/lib/iomgr/event_engine_shims/endpoint.cc', 'src/core/lib/iomgr/event_engine_shims/tcp_client.cc', 'src/core/lib/iomgr/exec_ctx.cc', @@ -1873,6 +1875,7 @@ 'src/core/lib/iomgr/ev_poll_posix.cc', 'src/core/lib/iomgr/ev_posix.cc', 'src/core/lib/iomgr/ev_windows.cc', + 'src/core/lib/iomgr/event_engine_shims/closure.cc', 'src/core/lib/iomgr/event_engine_shims/endpoint.cc', 'src/core/lib/iomgr/event_engine_shims/tcp_client.cc', 'src/core/lib/iomgr/exec_ctx.cc', diff --git a/package.xml b/package.xml index 22d17ae9db1..dc827fa0eb4 100644 --- a/package.xml +++ b/package.xml @@ -1236,6 +1236,8 @@ + + diff --git a/src/core/BUILD b/src/core/BUILD index 9979f981bc7..87e08aa5118 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1741,6 +1741,7 @@ grpc_cc_library( "status_helper", "strerror", "time", + "//:debug_location", "//:event_engine_base_hdrs", "//:gpr", "//:grpc_public_hdrs", diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc index b212756b080..99e47a8b712 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc @@ -285,7 +285,9 @@ void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd, absl::string_view reason) { bool is_release_fd = (release_fd != nullptr); + bool was_shutdown = false; if (!read_closure_->IsShutdown()) { + was_shutdown = true; HandleShutdownInternal(absl::Status(absl::StatusCode::kUnknown, reason), is_release_fd); } @@ -293,8 +295,17 @@ void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done, // If release_fd is not NULL, we should be relinquishing control of the file // descriptor fd->fd (but we still own the grpc_fd structure). if (is_release_fd) { + if (!was_shutdown) { + epoll_event phony_event; + if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_, + &phony_event) != 0) { + gpr_log(GPR_ERROR, "OrphanHandle: epoll_ctl failed: %s", + grpc_core::StrError(errno).c_str()); + } + } *release_fd = fd_; } else { + shutdown(fd_, SHUT_RDWR); close(fd_); } @@ -328,13 +339,11 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why, grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE); if (read_closure_->SetShutdown(why)) { - if (!releasing_fd) { - shutdown(fd_, SHUT_RDWR); - } else { + if (releasing_fd) { epoll_event phony_event; if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_, &phony_event) != 0) { - gpr_log(GPR_ERROR, "epoll_ctl failed: %s", + gpr_log(GPR_ERROR, "HandleShutdownInternal: epoll_ctl failed: %s", grpc_core::StrError(errno).c_str()); } } diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc index 32b892a6d7c..348cb0d8cec 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc @@ -378,13 +378,13 @@ void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd, grpc_core::StatusSetInt(&shutdown_error_, grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE); - // signal read/write closed to OS so that future operations fail. - if (!released_) { - shutdown(fd_, SHUT_RDWR); - } SetReadyLocked(&read_closure_); SetReadyLocked(&write_closure_); } + // signal read/write closed to OS so that future operations fail. + if (!released_) { + shutdown(fd_, SHUT_RDWR); + } if (!IsWatched()) { CloseFd(); } else { @@ -455,8 +455,6 @@ void PollEventHandle::ShutdownHandle(absl::Status why) { grpc_core::StatusSetInt(&shutdown_error_, grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE); - // signal read/write closed to OS so that future operations fail. - shutdown(fd_, SHUT_RDWR); SetReadyLocked(&read_closure_); SetReadyLocked(&write_closure_); } diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc index 0a82fbd0925..604cfd69868 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc @@ -44,6 +44,7 @@ #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/experiments/experiments.h" +#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/load_file.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/status_helper.h" @@ -465,9 +466,11 @@ void PosixEndpointImpl::MaybePostReclaimer() { has_posted_reclaimer_ = true; memory_owner_.PostReclaimer( grpc_core::ReclamationPass::kBenign, - [this](absl::optional sweep) { - if (!sweep.has_value()) return; - PerformReclamation(); + [self = Ref(DEBUG_LOCATION, "Posix Reclaimer")]( + absl::optional sweep) { + if (sweep.has_value()) { + self->PerformReclamation(); + } }); } } @@ -548,7 +551,7 @@ void PosixEndpointImpl::MaybeMakeReadSlices() { void PosixEndpointImpl::HandleRead(absl::Status status) { read_mu_.Lock(); - if (status.ok()) { + if (status.ok() && memory_owner_.is_valid()) { MaybeMakeReadSlices(); if (!TcpDoRead(status)) { UpdateRcvLowat(); @@ -558,6 +561,9 @@ void PosixEndpointImpl::HandleRead(absl::Status status) { return; } } else { + if (!memory_owner_.is_valid()) { + status = absl::UnknownError("Shutting down endpoint"); + } incoming_buffer_->Clear(); last_read_buffer_.Clear(); } @@ -1158,6 +1164,9 @@ void PosixEndpointImpl::MaybeShutdown( grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE); handle_->ShutdownHandle(why); + read_mu_.Lock(); + memory_owner_.Reset(); + read_mu_.Unlock(); Unref(); } @@ -1189,7 +1198,8 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle, fd_ = handle_->WrappedFd(); GPR_ASSERT(options.resource_quota != nullptr); auto peer_addr_string = sock.PeerAddressString(); - memory_owner_ = options.resource_quota->memory_quota()->CreateMemoryOwner( + mem_quota_ = options.resource_quota->memory_quota(); + memory_owner_ = mem_quota_->CreateMemoryOwner( peer_addr_string.ok() ? *peer_addr_string : ""); self_reservation_ = memory_owner_.MakeReservation(sizeof(PosixEndpointImpl)); auto local_address = sock.LocalAddress(); diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.h b/src/core/lib/event_engine/posix_engine/posix_endpoint.h index 52e550205c8..636df9b97f2 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.h +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.h @@ -564,6 +564,9 @@ class PosixEndpointImpl : public grpc_core::RefCounted { grpc_event_engine::experimental::EventEngine::ResolvedAddress peer_address_; grpc_event_engine::experimental::EventEngine::ResolvedAddress local_address_; + // Maintain a shared_ptr to mem_quota_ to ensure the underlying basic memory + // quota is not deleted until the endpoint is destroyed. + grpc_core::MemoryQuotaRefPtr mem_quota_; grpc_core::MemoryOwner memory_owner_; grpc_core::MemoryAllocator::Reservation self_reservation_; diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index 9543b72d553..c64f83c379a 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -66,6 +66,10 @@ using namespace std::chrono_literals; namespace grpc_event_engine { namespace experimental { +bool NeedPosixEngine() { + return UseEventEngineClient() || UseEventEngineListener(); +} + #ifdef GRPC_POSIX_SOCKET_TCP void AsyncConnect::Start(EventEngine::Duration timeout) { @@ -332,7 +336,7 @@ PosixEventEngine::PosixEventEngine(PosixEventPoller* poller) : connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)), executor_(std::make_shared()), timer_manager_(executor_) { - if (UseEventEngineClient()) { + if (NeedPosixEngine()) { poller_manager_ = std::make_shared(poller); } } @@ -341,7 +345,7 @@ PosixEventEngine::PosixEventEngine() : connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)), executor_(std::make_shared()), timer_manager_(executor_) { - if (UseEventEngineClient()) { + if (NeedPosixEngine()) { poller_manager_ = std::make_shared(executor_); if (poller_manager_->Poller() != nullptr) { executor_->Run([poller_manager = poller_manager_]() { @@ -539,7 +543,7 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect( OnConnectCallback on_connect, const ResolvedAddress& addr, const EndpointConfig& args, MemoryAllocator memory_allocator, Duration timeout) { - if (!UseEventEngineClient()) { + if (!NeedPosixEngine()) { grpc_core::Crash("unimplemented"); } #ifdef GRPC_POSIX_SOCKET_TCP @@ -564,7 +568,7 @@ std::unique_ptr PosixEventEngine::CreatePosixEndpointFromFd(int fd, const EndpointConfig& config, MemoryAllocator memory_allocator) { - if (!UseEventEngineClient()) { + if (!NeedPosixEngine()) { grpc_core::Crash("unimplemented"); } #ifdef GRPC_POSIX_SOCKET_TCP @@ -613,7 +617,7 @@ PosixEventEngine::CreatePosixListener( absl::AnyInvocable on_shutdown, const EndpointConfig& config, std::unique_ptr memory_allocator_factory) { - if (!UseEventEngineClient()) { + if (!NeedPosixEngine()) { grpc_core::Crash("unimplemented"); } #ifdef GRPC_POSIX_SOCKET_TCP diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc index 7d982c8ae8f..4a959b3a4e8 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc @@ -160,6 +160,7 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( Unref(); return; } + addr = EventEngine::ResolvedAddress(addr.address(), len); } PosixSocketWrapper sock(fd); @@ -176,14 +177,22 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( } // Create an Endpoint here. - std::string peer_name = *ResolvedAddressToNormalizedString(addr); + auto peer_name = ResolvedAddressToURI(addr); + if (!peer_name.ok()) { + gpr_log(GPR_ERROR, "Invalid address: %s", + peer_name.status().ToString().c_str()); + // Shutting down the acceptor. Unref the ref grabbed in + // AsyncConnectionAcceptor::Start(). + Unref(); + return; + } auto endpoint = CreatePosixEndpoint( /*handle=*/listener_->poller_->CreateHandle( - fd, peer_name, listener_->poller_->CanTrackErrors()), + fd, *peer_name, listener_->poller_->CanTrackErrors()), /*on_shutdown=*/nullptr, /*engine=*/listener_->engine_, // allocator= listener_->memory_allocator_factory_->CreateMemoryAllocator( - absl::StrCat("endpoint-tcp-server-connection: ", peer_name)), + absl::StrCat("endpoint-tcp-server-connection: ", *peer_name)), /*options=*/listener_->options_); // Call on_accept_ and then resume accepting new connections by continuing // the parent for-loop. @@ -192,7 +201,7 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( /*is_external=*/false, /*memory_allocator=*/ listener_->memory_allocator_factory_->CreateMemoryAllocator( - absl::StrCat("on-accept-tcp-server-connection: ", peer_name)), + absl::StrCat("on-accept-tcp-server-connection: ", *peer_name)), /*pending_data=*/nullptr); } GPR_UNREACHABLE_CODE(return); diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h index d4f6c2a8db1..4bf793b1973 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h @@ -238,8 +238,7 @@ class PosixEngineListener : public PosixListenerWithFdSupport { #include "src/core/lib/gprpp/crash.h" -class PosixEngineListener - : public grpc_event_engine::experimental::EventEngine::Listener { +class PosixEngineListener : public PosixListenerWithFdSupport { public: PosixEngineListener() = default; ~PosixEngineListener() override = default; diff --git a/src/core/lib/event_engine/shim.cc b/src/core/lib/event_engine/shim.cc index f4249a42f04..27a5ae691f2 100644 --- a/src/core/lib/event_engine/shim.cc +++ b/src/core/lib/event_engine/shim.cc @@ -23,7 +23,7 @@ namespace grpc_event_engine { namespace experimental { bool UseEventEngineClient() { -// TODO(hork, eryu): Adjust the ifdefs accordingly when event engine's become +// TODO(hork, eryu): Adjust the ifdefs accordingly when event engines become // available for other platforms. #ifdef GRPC_POSIX_SOCKET_TCP return grpc_core::IsEventEngineClientEnabled(); @@ -32,5 +32,15 @@ bool UseEventEngineClient() { #endif } +bool UseEventEngineListener() { +// TODO(hork, eryu): Adjust the ifdefs accordingly when event engines become +// available for other platforms. +#ifdef GRPC_POSIX_SOCKET_TCP + return grpc_core::IsEventEngineListenerEnabled(); +#else + return false; +#endif +} + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/shim.h b/src/core/lib/event_engine/shim.h index 918ed576df5..fe0fad9e71c 100644 --- a/src/core/lib/event_engine/shim.h +++ b/src/core/lib/event_engine/shim.h @@ -21,6 +21,8 @@ namespace experimental { bool UseEventEngineClient(); +bool UseEventEngineListener(); + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 6e8bac2d892..b860e1f1c7e 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -51,6 +51,8 @@ const char* const description_free_large_allocator = const char* const description_transport_supplies_client_latency = "If set, use the transport represented value for client latency in " "opencensus"; +const char* const description_event_engine_listener = + "Use EventEngine listeners instead of iomgr's grpc_tcp_server"; } // namespace namespace grpc_core { @@ -71,6 +73,7 @@ const ExperimentMetadata g_experiment_metadata[] = { {"free_large_allocator", description_free_large_allocator, false}, {"transport_supplies_client_latency", description_transport_supplies_client_latency, false}, + {"event_engine_listener", description_event_engine_listener, false}, }; } // namespace grpc_core diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index c7b42801eb8..5e8adedf3fa 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -43,6 +43,7 @@ inline bool IsFreeLargeAllocatorEnabled() { return IsExperimentEnabled(10); } inline bool IsTransportSuppliesClientLatencyEnabled() { return IsExperimentEnabled(11); } +inline bool IsEventEngineListenerEnabled() { return IsExperimentEnabled(12); } struct ExperimentMetadata { const char* name; @@ -50,7 +51,7 @@ struct ExperimentMetadata { bool default_value; }; -constexpr const size_t kNumExperiments = 12; +constexpr const size_t kNumExperiments = 13; extern const ExperimentMetadata g_experiment_metadata[kNumExperiments]; } // namespace grpc_core diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index fef97a1b4b4..8f917b5439e 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -126,3 +126,10 @@ expiry: 2023/06/01 owner: ctiller@google.com test_tags: [census_test] +- name: event_engine_listener + description: + Use EventEngine listeners instead of iomgr's grpc_tcp_server + default: false + expiry: 2023/02/13 + owner: vigneshbabu@google.com + test_tags: ["event_engine_listener_test"] diff --git a/src/core/lib/iomgr/event_engine_shims/closure.cc b/src/core/lib/iomgr/event_engine_shims/closure.cc new file mode 100644 index 00000000000..fbcfe60d312 --- /dev/null +++ b/src/core/lib/iomgr/event_engine_shims/closure.cc @@ -0,0 +1,62 @@ +// Copyright 2021 gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "src/core/lib/iomgr/event_engine_shims/closure.h" + +#include "absl/functional/any_invocable.h" +#include "absl/status/status.h" + +#include + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/transport/error_utils.h" + +namespace grpc_event_engine { +namespace experimental { + +void RunEventEngineClosure(grpc_closure* closure, grpc_error_handle error) { + if (closure == nullptr) { + return; + } + grpc_core::ApplicationCallbackExecCtx app_ctx; + grpc_core::ExecCtx exec_ctx; +#ifndef NDEBUG + closure->scheduled = false; + if (grpc_trace_closure.enabled()) { + gpr_log(GPR_DEBUG, + "EventEngine: running closure %p: created [%s:%d]: %s [%s:%d]", + closure, closure->file_created, closure->line_created, + closure->run ? "run" : "scheduled", closure->file_initiated, + closure->line_initiated); + } +#endif + closure->cb(closure->cb_arg, error); +#ifndef NDEBUG + if (grpc_trace_closure.enabled()) { + gpr_log(GPR_DEBUG, "EventEngine: closure %p finished", closure); + } +#endif +} + +absl::AnyInvocable GrpcClosureToStatusCallback( + grpc_closure* closure) { + return [closure](absl::Status status) { + RunEventEngineClosure(closure, absl_status_to_grpc_error(status)); + }; +} + +} // namespace experimental +} // namespace grpc_event_engine diff --git a/src/core/lib/iomgr/event_engine_shims/closure.h b/src/core/lib/iomgr/event_engine_shims/closure.h new file mode 100644 index 00000000000..7a3f66c25f6 --- /dev/null +++ b/src/core/lib/iomgr/event_engine_shims/closure.h @@ -0,0 +1,39 @@ +// Copyright 2022 gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_CLOSURE_H +#define GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_CLOSURE_H + +#include + +#include "absl/functional/any_invocable.h" + +#include + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/error.h" + +namespace grpc_event_engine { +namespace experimental { + +/// Runs a grpc_closure inline with the specified error handle. +void RunEventEngineClosure(grpc_closure* closure, grpc_error_handle error); + +/// Creates a callback that takes an error status argument. +absl::AnyInvocable GrpcClosureToStatusCallback( + grpc_closure* closure); + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_CLOSURE_H diff --git a/src/core/lib/iomgr/event_engine_shims/endpoint.cc b/src/core/lib/iomgr/event_engine_shims/endpoint.cc index ff727d51dff..77c0d277ba7 100644 --- a/src/core/lib/iomgr/event_engine_shims/endpoint.cc +++ b/src/core/lib/iomgr/event_engine_shims/endpoint.cc @@ -36,6 +36,7 @@ #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/event_engine_shims/closure.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/port.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -45,7 +46,6 @@ extern grpc_core::TraceFlag grpc_tcp_trace; namespace grpc_event_engine { namespace experimental { - namespace { constexpr int64_t kShutdownBit = static_cast(1) << 32; @@ -128,6 +128,12 @@ class EventEngineEndpointWrapper { void ShutdownUnref() { if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) == kShutdownBit + 1) { +#ifdef GRPC_POSIX_SOCKET_TCP + if (fd_ > 0 && on_release_fd_) { + reinterpret_cast(endpoint_.get()) + ->Shutdown(std::move(on_release_fd_)); + } +#endif // GRPC_POSIX_SOCKET_TCP OnShutdownInternal(); } } @@ -136,7 +142,13 @@ class EventEngineEndpointWrapper { // and decrements the shutdown ref. If trigger shutdown has been called // before or in parallel, only one of them would win the race. The other // invocation would simply return. - void TriggerShutdown() { + void TriggerShutdown( + absl::AnyInvocable)> on_release_fd) { +#ifdef GRPC_POSIX_SOCKET_TCP + on_release_fd_ = std::move(on_release_fd); +#else + (void)on_release_fd; +#endif // GRPC_POSIX_SOCKET_TCP int64_t curr = shutdown_ref_.load(std::memory_order_acquire); while (true) { if (curr & kShutdownBit) { @@ -148,6 +160,12 @@ class EventEngineEndpointWrapper { Ref(); if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) == kShutdownBit + 1) { +#ifdef GRPC_POSIX_SOCKET_TCP + if (fd_ > 0 && on_release_fd_) { + reinterpret_cast(endpoint_.get()) + ->Shutdown(std::move(on_release_fd_)); + } +#endif // GRPC_POSIX_SOCKET_TCP OnShutdownInternal(); } return; @@ -171,6 +189,9 @@ class EventEngineEndpointWrapper { std::unique_ptr eeep_; std::atomic refs_{1}; std::atomic shutdown_ref_{1}; +#ifdef GRPC_POSIX_SOCKET_TCP + absl::AnyInvocable)> on_release_fd_; +#endif // GRPC_POSIX_SOCKET_TCP grpc_core::Mutex mu_; std::string peer_address_; std::string local_address_; @@ -304,7 +325,7 @@ void EndpointShutdown(grpc_endpoint* ep, grpc_error_handle why) { } GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Shutdown:%s", eeep->wrapper, why.ToString().c_str()); - eeep->wrapper->TriggerShutdown(); + eeep->wrapper->TriggerShutdown(nullptr); } // Attempts to free the underlying data structures. @@ -312,8 +333,8 @@ void EndpointDestroy(grpc_endpoint* ep) { auto* eeep = reinterpret_cast( ep); - eeep->wrapper->Unref(); GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Destroy", eeep->wrapper); + eeep->wrapper->Unref(); } absl::string_view EndpointGetPeerAddress(grpc_endpoint* ep) { @@ -384,5 +405,33 @@ grpc_endpoint* grpc_event_engine_endpoint_create( return wrapper->GetGrpcEndpoint(); } +bool grpc_is_event_engine_endpoint(grpc_endpoint* ep) { + return ep->vtable == &grpc_event_engine_endpoint_vtable; +} + +void grpc_event_engine_endpoint_destroy_and_release_fd( + grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd) { + auto* eeep = + reinterpret_cast( + ep); + if (fd == nullptr || on_release_fd == nullptr) { + if (fd != nullptr) { + *fd = -1; + } + eeep->wrapper->TriggerShutdown(nullptr); + } else { + *fd = -1; + eeep->wrapper->TriggerShutdown( + [fd, on_release_fd](absl::StatusOr release_fd) { + if (release_fd.ok()) { + *fd = *release_fd; + } + RunEventEngineClosure(on_release_fd, + absl_status_to_grpc_error(release_fd.status())); + }); + } + eeep->wrapper->Unref(); +} + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/iomgr/event_engine_shims/endpoint.h b/src/core/lib/iomgr/event_engine_shims/endpoint.h index 430cc8cddf0..bc018f1e4d7 100644 --- a/src/core/lib/iomgr/event_engine_shims/endpoint.h +++ b/src/core/lib/iomgr/event_engine_shims/endpoint.h @@ -28,6 +28,15 @@ namespace experimental { grpc_endpoint* grpc_event_engine_endpoint_create( std::unique_ptr ee_endpoint); +/// Returns true if the passed endpoint is an event engine shim endpoint. +bool grpc_is_event_engine_endpoint(grpc_endpoint* ep); + +/// Destroys the passed in event engine shim endpoint and schedules the +/// asynchronous execution of the on_release_fd callback. The int pointer fd is +/// set to the underlying endpoint's file descriptor. +void grpc_event_engine_endpoint_destroy_and_release_fd( + grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd); + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 2205da5f422..e134551a3ec 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -61,6 +61,7 @@ #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" @@ -2016,6 +2017,10 @@ int grpc_tcp_fd(grpc_endpoint* ep) { void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, grpc_closure* done) { + if (grpc_event_engine::experimental::grpc_is_event_engine_endpoint(ep)) { + return grpc_event_engine::experimental:: + grpc_event_engine_endpoint_destroy_and_release_fd(ep, fd, done); + } grpc_tcp* tcp = reinterpret_cast(ep); GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index bc227936e31..e28776a7dfe 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -43,6 +43,7 @@ #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" +#include #include #include #include @@ -50,10 +51,16 @@ #include #include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/event_engine/resolved_address_internal.h" +#include "src/core/lib/event_engine/shim.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/strerror.h" +#include "src/core/lib/iomgr/event_engine_shims/closure.h" +#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" @@ -64,10 +71,127 @@ #include "src/core/lib/iomgr/tcp_server_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/resource_quota/api.h" +#include "src/core/lib/transport/error_utils.h" static std::atomic num_dropped_connections{0}; using ::grpc_event_engine::experimental::EndpointConfig; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::MemoryAllocator; +using ::grpc_event_engine::experimental::PosixEventEngineWithFdSupport; +using ::grpc_event_engine::experimental::SliceBuffer; + +class MemoryAllocatorFactoryWrapper + : public grpc_event_engine::experimental::MemoryAllocatorFactory { + public: + explicit MemoryAllocatorFactoryWrapper( + grpc_core::MemoryQuotaRefPtr memory_quota) + : memory_quota_(std::move(memory_quota)) {} + + MemoryAllocator CreateMemoryAllocator(absl::string_view name) override { + return memory_quota_->CreateMemoryAllocator(name); + } + + private: + grpc_core::MemoryQuotaRefPtr memory_quota_; +}; + +static grpc_error_handle CreateEventEngineListener( + grpc_tcp_server* s, grpc_closure* shutdown_complete, + const EndpointConfig& config, grpc_tcp_server** server) { + PosixEventEngineWithFdSupport::PosixAcceptCallback accept_cb = + [s](int listener_fd, std::unique_ptr ep, + bool is_external, MemoryAllocator /*allocator*/, + SliceBuffer* pending_data) { + grpc_core::ApplicationCallbackExecCtx app_ctx; + grpc_core::ExecCtx exec_ctx; + grpc_tcp_server_acceptor* acceptor = + static_cast( + gpr_malloc(sizeof(*acceptor))); + acceptor->from_server = s; + acceptor->port_index = -1; + acceptor->fd_index = -1; + if (!is_external) { + auto it = s->listen_fd_to_index_map.find(listener_fd); + if (it != s->listen_fd_to_index_map.end()) { + acceptor->port_index = std::get<0>(it->second); + acceptor->fd_index = std::get<1>(it->second); + } + } else { + // External connection handling. + grpc_resolved_address addr; + memset(&addr, 0, sizeof(addr)); + addr.len = static_cast(sizeof(struct sockaddr_storage)); + // Get the fd of the socket connected to peer. + int fd = + reinterpret_cast< + grpc_event_engine::experimental::PosixEndpointWithFdSupport*>( + ep.get()) + ->GetWrappedFd(); + if (getpeername(fd, reinterpret_cast(addr.addr), + &(addr.len)) < 0) { + gpr_log(GPR_ERROR, "Failed getpeername: %s", + grpc_core::StrError(errno).c_str()); + close(fd); + return; + } + (void)grpc_set_socket_no_sigpipe_if_possible(fd); + auto addr_uri = grpc_sockaddr_to_uri(&addr); + if (!addr_uri.ok()) { + gpr_log(GPR_ERROR, "Invalid address: %s", + addr_uri.status().ToString().c_str()); + return; + } + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, + "SERVER_CONNECT: incoming external connection: %s", + addr_uri->c_str()); + } + } + grpc_pollset* read_notifier_pollset = + (*(s->pollsets))[static_cast(gpr_atm_no_barrier_fetch_add( + &s->next_pollset_to_assign, 1)) % + s->pollsets->size()]; + acceptor->external_connection = is_external; + acceptor->listener_fd = listener_fd; + grpc_byte_buffer* buf = nullptr; + if (pending_data != nullptr && pending_data->Length() > 0) { + buf = grpc_raw_byte_buffer_create(nullptr, 0); + grpc_slice_buffer_swap(&buf->data.raw.slice_buffer, + pending_data->c_slice_buffer()); + pending_data->Clear(); + } + acceptor->pending_data = buf; + s->on_accept_cb( + s->on_accept_cb_arg, + grpc_event_engine::experimental::grpc_event_engine_endpoint_create( + std::move(ep)), + read_notifier_pollset, acceptor); + }; + auto on_shutdown_complete_cb = + grpc_event_engine::experimental::GrpcClosureToStatusCallback( + shutdown_complete); + auto listener = + reinterpret_cast( + grpc_event_engine::experimental::GetDefaultEventEngine().get()) + ->CreatePosixListener( + std::move(accept_cb), + [s, shutdown_complete](absl::Status status) { + grpc_event_engine::experimental::RunEventEngineClosure( + shutdown_complete, absl_status_to_grpc_error(status)); + delete s->fd_handler; + delete s; + }, + config, + std::make_unique(s->memory_quota)); + if (!listener.ok()) { + delete s; + *server = nullptr; + return listener.status(); + } + s->ee_listener = std::move(*listener); + return absl::OkStatus(); +} static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, const EndpointConfig& config, @@ -92,7 +216,11 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, s->shutdown = false; s->shutdown_starting.head = nullptr; s->shutdown_starting.tail = nullptr; - s->shutdown_complete = shutdown_complete; + if (!grpc_event_engine::experimental::UseEventEngineListener()) { + s->shutdown_complete = shutdown_complete; + } else { + s->shutdown_complete = nullptr; + } s->on_accept_cb = on_accept_cb; s->on_accept_cb_arg = on_accept_cb_arg; s->head = nullptr; @@ -105,7 +233,13 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, s->memory_quota = s->options.resource_quota->memory_quota(); s->pre_allocated_fd = -1; gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0); + s->n_bind_ports = 0; + new (&s->listen_fd_to_index_map) + absl::flat_hash_map>(); *server = s; + if (grpc_event_engine::experimental::UseEventEngineListener()) { + return CreateEventEngineListener(s, shutdown_complete, config, server); + } return absl::OkStatus(); } @@ -123,8 +257,14 @@ static void finish_shutdown(grpc_tcp_server* s) { s->head = sp->next; gpr_free(sp); } - delete s->fd_handler; - delete s; + if (grpc_event_engine::experimental::UseEventEngineListener()) { + // This will trigger asynchronous execution of the on_shutdown_complete + // callback when appropriate. That callback will delete the server + s->ee_listener.reset(); + } else { + delete s->fd_handler; + delete s; + } } static void destroyed_port(void* server, grpc_error_handle /*error*/) { @@ -423,6 +563,30 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener, static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s, const grpc_resolved_address* addr, int* out_port) { + if (grpc_event_engine::experimental::UseEventEngineListener()) { + gpr_mu_lock(&s->mu); + if (s->shutdown_listeners) { + gpr_mu_unlock(&s->mu); + return absl::UnknownError("Server already shutdown"); + } + int fd_index = 0; + auto port = s->ee_listener->BindWithFd( + grpc_event_engine::experimental::CreateResolvedAddress(*addr), + [s, &fd_index](absl::StatusOr listen_fd) { + if (!listen_fd.ok()) { + return; + } + GPR_DEBUG_ASSERT(*listen_fd > 0); + s->listen_fd_to_index_map.insert_or_assign( + *listen_fd, std::make_tuple(s->n_bind_ports, fd_index++)); + }); + if (port.ok()) { + s->n_bind_ports++; + *out_port = *port; + } + gpr_mu_unlock(&s->mu); + return port.status(); + } GPR_ASSERT(addr->len <= GRPC_MAX_SOCKADDR_SIZE); grpc_tcp_listener* sp; grpc_resolved_address sockname_temp; @@ -500,6 +664,17 @@ static grpc_tcp_listener* get_port_index(grpc_tcp_server* s, unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) { unsigned num_fds = 0; gpr_mu_lock(&s->mu); + if (grpc_event_engine::experimental::UseEventEngineListener()) { + // This doesn't need to be very fast. Used in tests. + for (auto it = s->listen_fd_to_index_map.begin(); + it != s->listen_fd_to_index_map.end(); it++) { + if (std::get<0>(it->second) == static_cast(port_index)) { + num_fds++; + } + } + gpr_mu_unlock(&s->mu); + return num_fds; + } grpc_tcp_listener* sp = get_port_index(s, port_index); for (; sp; sp = sp->sibling) { ++num_fds; @@ -511,6 +686,19 @@ unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) { static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index, unsigned fd_index) { gpr_mu_lock(&s->mu); + if (grpc_event_engine::experimental::UseEventEngineListener()) { + // This doesn't need to be very fast. Used in tests. + for (auto it = s->listen_fd_to_index_map.begin(); + it != s->listen_fd_to_index_map.end(); it++) { + if (std::get<0>(it->second) == static_cast(port_index) && + std::get<1>(it->second) == static_cast(fd_index)) { + gpr_mu_unlock(&s->mu); + return it->first; + } + } + gpr_mu_unlock(&s->mu); + return -1; + } grpc_tcp_listener* sp = get_port_index(s, port_index); for (; sp; sp = sp->sibling, --fd_index) { if (fd_index == 0) { @@ -530,6 +718,12 @@ static void tcp_server_start(grpc_tcp_server* s, GPR_ASSERT(s->on_accept_cb); GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; + if (grpc_event_engine::experimental::UseEventEngineListener()) { + GPR_ASSERT(!s->shutdown_listeners); + GPR_ASSERT(GRPC_LOG_IF_ERROR("listener_start", s->ee_listener->Start())); + gpr_mu_unlock(&s->mu); + return; + } sp = s->head; while (sp != nullptr) { if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) && @@ -584,7 +778,10 @@ static void tcp_server_unref(grpc_tcp_server* s) { static void tcp_server_shutdown_listeners(grpc_tcp_server* s) { gpr_mu_lock(&s->mu); s->shutdown_listeners = true; - // shutdown all fd's + if (grpc_event_engine::experimental::UseEventEngineListener()) { + s->ee_listener->ShutdownListeningFds(); + } + /* shutdown all fd's */ if (s->active_ports) { grpc_tcp_listener* sp; for (sp = s->head; sp; sp = sp->next) { @@ -611,6 +808,18 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler { // TODO(yangg) resolve duplicate code with on_read void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override { + if (grpc_event_engine::experimental::UseEventEngineListener()) { + grpc_event_engine::experimental::SliceBuffer pending_data; + if (buf != nullptr) { + pending_data = + grpc_event_engine::experimental::SliceBuffer::TakeCSliceBuffer( + buf->data.raw.slice_buffer); + } + GPR_ASSERT(GRPC_LOG_IF_ERROR("listener_handle_external_connection", + s_->ee_listener->HandleExternalConnection( + listener_fd, fd, &pending_data))); + return; + } grpc_pollset* read_notifier_pollset; grpc_resolved_address addr; memset(&addr, 0, sizeof(addr)); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index 7157aa2a0a7..26cef0209fb 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -21,6 +21,11 @@ #include +#include + +#include "absl/container/flat_hash_map.h" + +#include "src/core/lib/event_engine/posix.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/socket_utils_posix.h" @@ -99,6 +104,11 @@ struct grpc_tcp_server { // used to create slice allocators for endpoints, owned grpc_core::MemoryQuotaRefPtr memory_quota; + /* used when event engine based servers are enabled */ + int n_bind_ports = 0; + absl::flat_hash_map> listen_fd_to_index_map; + std::unique_ptr + ee_listener = nullptr; /* used to store a pre-allocated FD assigned to a socket */ int pre_allocated_fd; }; diff --git a/src/core/lib/transport/handshaker.cc b/src/core/lib/transport/handshaker.cc index 3ab00827e5d..5357fc96e6d 100644 --- a/src/core/lib/transport/handshaker.cc +++ b/src/core/lib/transport/handshaker.cc @@ -28,6 +28,7 @@ #include "absl/status/status.h" #include "absl/strings/str_format.h" +#include #include #include #include @@ -38,6 +39,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" namespace grpc_core { @@ -189,6 +191,14 @@ void HandshakeManager::DoHandshake(grpc_endpoint* endpoint, acceptor->pending_data != nullptr) { grpc_slice_buffer_swap(args_.read_buffer, &(acceptor->pending_data->data.raw.slice_buffer)); + // TODO(vigneshbabu): For connections accepted through event engine + // listeners, the ownership of the byte buffer received is transferred to + // this callback and it is thus this callback's duty to delete it. + // Make this hack default once event engine is rolled out. + if (grpc_event_engine::experimental::grpc_is_event_engine_endpoint( + endpoint)) { + grpc_byte_buffer_destroy(acceptor->pending_data); + } } // Initialize state needed for calling handshakers. acceptor_ = acceptor; diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 2f441b1a2c5..cb801eb771d 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -585,6 +585,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/ev_poll_posix.cc', 'src/core/lib/iomgr/ev_posix.cc', 'src/core/lib/iomgr/ev_windows.cc', + 'src/core/lib/iomgr/event_engine_shims/closure.cc', 'src/core/lib/iomgr/event_engine_shims/endpoint.cc', 'src/core/lib/iomgr/event_engine_shims/tcp_client.cc', 'src/core/lib/iomgr/exec_ctx.cc', diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index ddd5fb0a2e7..5af9a430af1 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -656,6 +656,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( grpc_sockaddr_in* addr = reinterpret_cast(resolved_addr.addr); memset(&resolved_addr, 0, sizeof(resolved_addr)); + resolved_addr.len = sizeof(grpc_sockaddr_in); addr->sin_family = GRPC_AF_INET; grpc_sockaddr_set_port(&resolved_addr, proxy_port); int port; diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD index 266d9425674..63b1f2c58ef 100644 --- a/test/core/iomgr/BUILD +++ b/test/core/iomgr/BUILD @@ -266,6 +266,7 @@ grpc_cc_test( language = "C++", tags = [ "endpoint_test", + "event_engine_listener_test", "no_mac", # TODO(jtattermusch): Reenable once https://github.com/grpc/grpc/issues/21282 is fixed. "no_windows", ], @@ -313,7 +314,10 @@ grpc_cc_test( srcs = ["tcp_server_posix_test.cc"], external_deps = ["gtest"], language = "C++", - tags = ["no_windows"], + tags = [ + "event_engine_listener_test", + "no_windows", + ], deps = [ "//:gpr", "//:grpc", diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index eba45b045a8..cb924e7c187 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -16,7 +16,10 @@ // // +#include "absl/time/time.h" + #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/port.h" @@ -37,11 +40,14 @@ #include #include "src/core/lib/event_engine/channel_args_endpoint_config.h" -#include "src/core/lib/experiments/experiments.h" +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/event_engine/posix.h" +#include "src/core/lib/event_engine/shim.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" #include "src/core/lib/iomgr/sockaddr_posix.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" @@ -524,11 +530,15 @@ static void write_test(size_t num_bytes, size_t slice_size, static_cast(a[1].value.pointer.p)); } +struct release_fd_arg { + std::atomic fd_released_done{0}; + grpc_core::Notification notify; +}; + void on_fd_released(void* arg, grpc_error_handle /*errors*/) { - int* done = static_cast(arg); - *done = 1; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + release_fd_arg* rel_fd = static_cast(arg); + rel_fd->fd_released_done = 1; + rel_fd->notify.Notify(); } // Do a read_test, then release fd and try to read/write again. Verify that @@ -543,8 +553,8 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_timeout_seconds_to_deadline(20)); grpc_core::ExecCtx exec_ctx; grpc_closure fd_released_cb; - int fd_released_done = 0; - GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done, + release_fd_arg rel_fd; + GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &rel_fd, grpc_schedule_on_exec_ctx); gpr_log(GPR_INFO, @@ -561,14 +571,30 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { a[1].type = GRPC_ARG_POINTER; a[1].value.pointer.p = grpc_resource_quota_create("test"); a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable(); + auto memory_quota = std::make_unique("bar"); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create( - grpc_fd_create(sv[1], "read_test", false), - TcpOptionsFromEndpointConfig( - grpc_event_engine::experimental::ChannelArgsEndpointConfig( - grpc_core::ChannelArgs::FromC(&args))), - "test"); - GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); + if (grpc_event_engine::experimental::UseEventEngineListener()) { + // Create an event engine wrapped endpoint to test release_fd operations. + auto eeep = + reinterpret_cast< + grpc_event_engine::experimental::PosixEventEngineWithFdSupport*>( + grpc_event_engine::experimental::GetDefaultEventEngine().get()) + ->CreatePosixEndpointFromFd( + sv[1], + grpc_event_engine::experimental::ChannelArgsEndpointConfig( + grpc_core::ChannelArgs::FromC(&args)), + memory_quota->CreateMemoryAllocator("test")); + ep = grpc_event_engine::experimental::grpc_event_engine_endpoint_create( + std::move(eeep)); + } else { + ep = grpc_tcp_create( + grpc_fd_create(sv[1], "read_test", false), + TcpOptionsFromEndpointConfig( + grpc_event_engine::experimental::ChannelArgsEndpointConfig( + grpc_core::ChannelArgs::FromC(&args))), + "test"); + GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); + } grpc_endpoint_add_to_pollset(ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); @@ -601,17 +627,9 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_destroy(&state.incoming); grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb); grpc_core::ExecCtx::Get()->Flush(); - gpr_mu_lock(g_mu); - while (!fd_released_done) { - grpc_pollset_worker* worker = nullptr; - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); - gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done); - } - gpr_mu_unlock(g_mu); - GPR_ASSERT(fd_released_done == 1); + rel_fd.notify.WaitForNotificationWithTimeout(absl::Seconds(20)); + GPR_ASSERT(rel_fd.fd_released_done == 1); GPR_ASSERT(fd == sv[1]); - written_bytes = fill_socket_partial(sv[0], num_bytes); drain_socket_blocking(fd, written_bytes, written_bytes); written_bytes = fill_socket_partial(fd, num_bytes); diff --git a/test/core/iomgr/tcp_server_posix_test.cc b/test/core/iomgr/tcp_server_posix_test.cc index 5c529b82a22..0f801a49d43 100644 --- a/test/core/iomgr/tcp_server_posix_test.cc +++ b/test/core/iomgr/tcp_server_posix_test.cc @@ -18,6 +18,7 @@ #include +#include "src/core/lib/event_engine/shim.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/port.h" #include "test/core/util/test_config.h" @@ -513,6 +514,11 @@ static void test_pre_allocated_inet_fd() { struct sockaddr_in6* addr = reinterpret_cast(resolved_addr.addr); grpc_tcp_server* s; + if (grpc_event_engine::experimental::UseEventEngineListener()) { + // TODO(vigneshbabu): Skip the test when event engine is enabled. + // Pre-allocated fd support will be added to event engine later. + return; + } auto args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() .PreconditionChannelArgs(nullptr); @@ -610,6 +616,11 @@ static void test_pre_allocated_unix_fd() { struct sockaddr_un* addr = reinterpret_cast(resolved_addr.addr); grpc_tcp_server* s; + if (grpc_event_engine::experimental::UseEventEngineListener()) { + // TODO(vigneshbabu): Skip the test when event engine is enabled. + // Pre-allocated fd support will be added to event engine later. + return; + } auto args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() .PreconditionChannelArgs(nullptr); diff --git a/test/core/surface/concurrent_connectivity_test.cc b/test/core/surface/concurrent_connectivity_test.cc index 89aa91ab612..abf0ac7cd22 100644 --- a/test/core/surface/concurrent_connectivity_test.cc +++ b/test/core/surface/concurrent_connectivity_test.cc @@ -154,6 +154,7 @@ void bad_server_thread(void* vargs) { ASSERT_TRUE(error.ok()); memset(&resolved_addr, 0, sizeof(resolved_addr)); addr->sa_family = GRPC_AF_INET; + resolved_addr.len = sizeof(grpc_sockaddr_in); error = grpc_tcp_server_add_port(s, &resolved_addr, &port); ASSERT_TRUE(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error)); ASSERT_GT(port, 0); diff --git a/test/core/util/test_tcp_server.cc b/test/core/util/test_tcp_server.cc index 986065cbe01..78d28a38b6c 100644 --- a/test/core/util/test_tcp_server.cc +++ b/test/core/util/test_tcp_server.cc @@ -44,7 +44,9 @@ static void on_server_destroyed(void* data, grpc_error_handle /*error*/) { test_tcp_server* server = static_cast(data); + gpr_mu_lock(server->mu); server->shutdown = true; + gpr_mu_unlock(server->mu); } void test_tcp_server_init(test_tcp_server* server, @@ -116,10 +118,14 @@ void test_tcp_server_destroy(test_tcp_server* server) { shutdown_deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(5, GPR_TIMESPAN)); grpc_core::ExecCtx::Get()->Flush(); + gpr_mu_lock(server->mu); while (!server->shutdown && gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), shutdown_deadline) < 0) { - test_tcp_server_poll(server, 1000); + gpr_mu_unlock(server->mu); + test_tcp_server_poll(server, 100); + gpr_mu_lock(server->mu); } + gpr_mu_unlock(server->mu); grpc_pollset_shutdown(server->pollset[0], GRPC_CLOSURE_CREATE(finish_pollset, server->pollset[0], grpc_schedule_on_exec_ctx)); diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 30e9485e4f6..3652cac6bda 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2248,6 +2248,8 @@ src/core/lib/iomgr/ev_poll_posix.h \ src/core/lib/iomgr/ev_posix.cc \ src/core/lib/iomgr/ev_posix.h \ src/core/lib/iomgr/ev_windows.cc \ +src/core/lib/iomgr/event_engine_shims/closure.cc \ +src/core/lib/iomgr/event_engine_shims/closure.h \ src/core/lib/iomgr/event_engine_shims/endpoint.cc \ src/core/lib/iomgr/event_engine_shims/endpoint.h \ src/core/lib/iomgr/event_engine_shims/tcp_client.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 0882641e400..edaac2095b4 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -2030,6 +2030,8 @@ src/core/lib/iomgr/ev_poll_posix.h \ src/core/lib/iomgr/ev_posix.cc \ src/core/lib/iomgr/ev_posix.h \ src/core/lib/iomgr/ev_windows.cc \ +src/core/lib/iomgr/event_engine_shims/closure.cc \ +src/core/lib/iomgr/event_engine_shims/closure.h \ src/core/lib/iomgr/event_engine_shims/endpoint.cc \ src/core/lib/iomgr/event_engine_shims/endpoint.h \ src/core/lib/iomgr/event_engine_shims/tcp_client.cc \