[EventEngine] Fix the shims for iOS (#32363)

The shims assumed that all platforms with Posix socket support would use
the PosixEventEngine. This is not the case for iOS.




<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/32373/head
AJ Heller 2 years ago committed by GitHub
parent d6bb5157a4
commit cfb05a9945
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      src/core/lib/event_engine/shim.cc
  2. 4
      src/core/lib/event_engine/shim.h
  3. 31
      src/core/lib/iomgr/event_engine_shims/endpoint.cc
  4. 1
      src/core/lib/transport/tcp_connect_handshaker.cc

@ -25,7 +25,7 @@ namespace experimental {
bool UseEventEngineClient() {
// TODO(hork, eryu): Adjust the ifdefs accordingly when event engines become
// available for other platforms.
#ifdef GRPC_POSIX_SOCKET_TCP
#if defined(GRPC_POSIX_SOCKET_TCP) and !defined(GRPC_CFSTREAM)
return grpc_core::IsEventEngineClientEnabled();
#else
return false;
@ -35,12 +35,20 @@ bool UseEventEngineClient() {
bool UseEventEngineListener() {
// TODO(hork, eryu): Adjust the ifdefs accordingly when event engines become
// available for other platforms.
#ifdef GRPC_POSIX_SOCKET_TCP
#if defined(GRPC_POSIX_SOCKET_TCP) and !defined(GRPC_CFSTREAM)
return grpc_core::IsEventEngineListenerEnabled();
#else
return false;
#endif
}
bool EventEngineSupportsFd() {
#if defined(GRPC_POSIX_SOCKET_TCP) and !defined(GRPC_CFSTREAM)
return true;
#else
return false;
#endif
}
} // namespace experimental
} // namespace grpc_event_engine

@ -14,6 +14,8 @@
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_SHIM_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_SHIM_H
// Platform-specific configuration for use of the EventEngine shims.
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
@ -23,6 +25,8 @@ bool UseEventEngineClient();
bool UseEventEngineListener();
bool EventEngineSupportsFd();
} // namespace experimental
} // namespace grpc_event_engine

@ -29,6 +29,7 @@
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/gpr/string.h"
@ -128,12 +129,10 @@ class EventEngineEndpointWrapper {
void ShutdownUnref() {
if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
kShutdownBit + 1) {
#ifdef GRPC_POSIX_SOCKET_TCP
if (fd_ > 0 && on_release_fd_) {
if (EventEngineSupportsFd() && fd_ > 0 && on_release_fd_) {
reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->Shutdown(std::move(on_release_fd_));
}
#endif // GRPC_POSIX_SOCKET_TCP
OnShutdownInternal();
}
}
@ -144,11 +143,9 @@ class EventEngineEndpointWrapper {
// invocation would simply return.
void TriggerShutdown(
absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
#ifdef GRPC_POSIX_SOCKET_TCP
on_release_fd_ = std::move(on_release_fd);
#else
(void)on_release_fd;
#endif // GRPC_POSIX_SOCKET_TCP
if (EventEngineSupportsFd()) {
on_release_fd_ = std::move(on_release_fd);
}
int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
while (true) {
if (curr & kShutdownBit) {
@ -160,12 +157,10 @@ class EventEngineEndpointWrapper {
Ref();
if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
kShutdownBit + 1) {
#ifdef GRPC_POSIX_SOCKET_TCP
if (fd_ > 0 && on_release_fd_) {
if (EventEngineSupportsFd() && fd_ > 0 && on_release_fd_) {
reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->Shutdown(std::move(on_release_fd_));
}
#endif // GRPC_POSIX_SOCKET_TCP
OnShutdownInternal();
}
return;
@ -189,9 +184,7 @@ class EventEngineEndpointWrapper {
std::unique_ptr<grpc_event_engine_endpoint> eeep_;
std::atomic<int64_t> refs_{1};
std::atomic<int64_t> shutdown_ref_{1};
#ifdef GRPC_POSIX_SOCKET_TCP
absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_;
#endif // GRPC_POSIX_SOCKET_TCP
grpc_core::Mutex mu_;
std::string peer_address_;
std::string local_address_;
@ -387,12 +380,12 @@ EventEngineEndpointWrapper::EventEngineEndpointWrapper(
if (peer_addr.ok()) {
peer_address_ = *peer_addr;
}
#ifdef GRPC_POSIX_SOCKET_TCP
fd_ = reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->GetWrappedFd();
#else // GRPC_POSIX_SOCKET_TCP
fd_ = -1;
#endif // GRPC_POSIX_SOCKET_TCP
if (EventEngineSupportsFd()) {
fd_ = reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->GetWrappedFd();
} else {
fd_ = -1;
}
GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Create", eeep_->wrapper);
}

@ -152,7 +152,6 @@ void TCPConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
// we don't want to pass args->endpoint directly.
// Instead pass endpoint_ and swap this endpoint to
// args endpoint on success.
// TODO(hork): use EventEngine::Connect if(IsEventEngineClientEnabled())
grpc_tcp_client_connect(
&connected_, &endpoint_to_destroy_, interested_parties_,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args->args),

Loading…
Cancel
Save