@ -61,6 +61,14 @@
// IWYU pragma: no_include <ratio>
// TODO(eryu): remove this GRPC_CFSTREAM condition when the CFEngine is ready.
// The posix poller currently crashes iOS.
# if defined(GRPC_POSIX_SOCKET_TCP) && !defined(GRPC_CFSTREAM)
# define GRPC_PLATFORM_SUPPORTS_POSIX_POLLING true
# else
# define GRPC_PLATFORM_SUPPORTS_POSIX_POLLING false
# endif
using namespace std : : chrono_literals ;
namespace grpc_event_engine {
@ -336,13 +344,16 @@ PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)
: connection_shards_ ( std : : max ( 2 * gpr_cpu_num_cores ( ) , 1u ) ) ,
executor_ ( MakeThreadPool ( grpc_core : : Clamp ( gpr_cpu_num_cores ( ) , 2u , 16u ) ) ) ,
timer_manager_ ( executor_ ) {
# if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
poller_manager_ = std : : make_shared < PosixEnginePollerManager > ( poller ) ;
# endif
}
PosixEventEngine : : PosixEventEngine ( )
: connection_shards_ ( std : : max ( 2 * gpr_cpu_num_cores ( ) , 1u ) ) ,
executor_ ( MakeThreadPool ( grpc_core : : Clamp ( gpr_cpu_num_cores ( ) , 2u , 16u ) ) ) ,
timer_manager_ ( executor_ ) {
# if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
poller_manager_ = std : : make_shared < PosixEnginePollerManager > ( executor_ ) ;
// The threadpool must be instantiated after the poller otherwise, the
// process will deadlock when forking.
@ -351,6 +362,7 @@ PosixEventEngine::PosixEventEngine()
PollerWorkInternal ( poller_manager ) ;
} ) ;
}
# endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
void PosixEventEngine : : PollerWorkInternal (
@ -421,11 +433,11 @@ PosixEventEngine::~PosixEventEngine() {
GPR_ASSERT ( GPR_LIKELY ( known_handles_ . empty ( ) ) ) ;
}
timer_manager_ . Shutdown ( ) ;
# ifdef GRPC_POSIX_SOCKET_TCP
# if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
if ( poller_manager_ ! = nullptr ) {
poller_manager_ - > TriggerShutdown ( ) ;
}
# endif // GRPC_POSIX_SOCKET_TCP
# endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
executor_ - > Quiesce ( ) ;
}
@ -482,7 +494,7 @@ std::unique_ptr<EventEngine::DNSResolver> PosixEventEngine::GetDNSResolver(
bool PosixEventEngine : : IsWorkerThread ( ) { grpc_core : : Crash ( " unimplemented " ) ; }
bool PosixEventEngine : : CancelConnect ( EventEngine : : ConnectionHandle handle ) {
# ifdef GRPC_POSIX_SOCKET_TCP
# if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
int connection_handle = handle . keys [ 0 ] ;
if ( connection_handle < = 0 ) {
return false ;
@ -531,17 +543,17 @@ bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
delete ac ;
}
return connection_cancel_success ;
# else // GRPC_POSIX_SOCKET_TCP
# else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
grpc_core : : Crash (
" EventEngine::CancelConnect is not supported on this platform " ) ;
# endif // GRPC_POSIX_SOCKET_TCP
# endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
EventEngine : : ConnectionHandle PosixEventEngine : : Connect (
OnConnectCallback on_connect , const ResolvedAddress & addr ,
const EndpointConfig & args , MemoryAllocator memory_allocator ,
Duration timeout ) {
# ifdef GRPC_POSIX_SOCKET_TCP
# if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
GPR_ASSERT ( poller_manager_ ! = nullptr ) ;
PosixTcpOptions options = TcpOptionsFromEndpointConfig ( args ) ;
absl : : StatusOr < PosixSocketWrapper : : PosixSocketCreateResult > socket =
@ -554,16 +566,16 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect(
return ConnectInternal ( ( * socket ) . sock , std : : move ( on_connect ) ,
( * socket ) . mapped_target_addr ,
std : : move ( memory_allocator ) , options , timeout ) ;
# else // GRPC_POSIX_SOCKET_TCP
# else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
grpc_core : : Crash ( " EventEngine::Connect is not supported on this platform " ) ;
# endif // GRPC_POSIX_SOCKET_TCP
# endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
std : : unique_ptr < PosixEndpointWithFdSupport >
PosixEventEngine : : CreatePosixEndpointFromFd ( int fd ,
const EndpointConfig & config ,
MemoryAllocator memory_allocator ) {
# ifdef GRPC_POSIX_SOCKET_TCP
# if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
GPR_DEBUG_ASSERT ( fd > 0 ) ;
PosixEventPoller * poller = poller_manager_ - > Poller ( ) ;
GPR_DEBUG_ASSERT ( poller ! = nullptr ) ;
@ -572,11 +584,11 @@ PosixEventEngine::CreatePosixEndpointFromFd(int fd,
return CreatePosixEndpoint ( handle , nullptr , shared_from_this ( ) ,
std : : move ( memory_allocator ) ,
TcpOptionsFromEndpointConfig ( config ) ) ;
# else // GRPC_POSIX_SOCKET_TCP
# else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
grpc_core : : Crash (
" PosixEventEngine::CreatePosixEndpointFromFd is not supported on "
" this platform " ) ;
# endif // GRPC_POSIX_SOCKET_TCP
# endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
absl : : StatusOr < std : : unique_ptr < EventEngine : : Listener > >
@ -585,7 +597,7 @@ PosixEventEngine::CreateListener(
absl : : AnyInvocable < void ( absl : : Status ) > on_shutdown ,
const EndpointConfig & config ,
std : : unique_ptr < MemoryAllocatorFactory > memory_allocator_factory ) {
# ifdef GRPC_POSIX_SOCKET_TCP
# if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
PosixEventEngineWithFdSupport : : PosixAcceptCallback posix_on_accept =
[ on_accept_cb = std : : move ( on_accept ) ] (
int /*listener_fd*/ , std : : unique_ptr < EventEngine : : Endpoint > ep ,
@ -597,10 +609,10 @@ PosixEventEngine::CreateListener(
std : : move ( posix_on_accept ) , std : : move ( on_shutdown ) , config ,
std : : move ( memory_allocator_factory ) , poller_manager_ - > Poller ( ) ,
shared_from_this ( ) ) ;
# else // GRPC_POSIX_SOCKET_TCP
# else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
grpc_core : : Crash (
" EventEngine::CreateListener is not supported on this platform " ) ;
# endif // GRPC_POSIX_SOCKET_TCP
# endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
absl : : StatusOr < std : : unique_ptr < PosixListenerWithFdSupport > >
@ -609,15 +621,15 @@ PosixEventEngine::CreatePosixListener(
absl : : AnyInvocable < void ( absl : : Status ) > on_shutdown ,
const EndpointConfig & config ,
std : : unique_ptr < MemoryAllocatorFactory > memory_allocator_factory ) {
# ifdef GRPC_POSIX_SOCKET_TCP
# if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
return std : : make_unique < PosixEngineListener > (
std : : move ( on_accept ) , std : : move ( on_shutdown ) , config ,
std : : move ( memory_allocator_factory ) , poller_manager_ - > Poller ( ) ,
shared_from_this ( ) ) ;
# else // GRPC_POSIX_SOCKET_TCP
# else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
grpc_core : : Crash (
" EventEngine::CreateListener is not supported on this platform " ) ;
# endif // GRPC_POSIX_SOCKET_TCP
# endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
} // namespace experimental