[EventEngine] Add QueryExtension Interfaces for *SupportsFd (#35648)

This PR:
* adds FD extensions to the public headers
* Adds the query extension interface to EventEngine, Listener, and Endpoint, via a new `Extensible` interface
* Refactors the PosixEventEngine to use the Extensible interface.

Closes #35648

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35648 from drfloob:posix-ee-query-interface 7cae28e0b3
PiperOrigin-RevId: 601794970
pull/35759/head
AJ Heller 1 year ago committed by Copybara-Service
parent 714640d71f
commit 71fa68f7fb
  1. 4
      BUILD
  2. 3
      CMakeLists.txt
  3. 2
      Makefile
  4. 4
      Package.swift
  5. 13
      build_autogenerated.yaml
  6. 6
      gRPC-C++.podspec
  7. 7
      gRPC-Core.podspec
  8. 4
      grpc.gemspec
  9. 48
      include/grpc/event_engine/event_engine.h
  10. 68
      include/grpc/event_engine/extensible.h
  11. 4
      package.xml
  12. 21
      src/core/BUILD
  13. 40
      src/core/lib/event_engine/extensions/can_track_errors.h
  14. 160
      src/core/lib/event_engine/extensions/supports_fd.h
  15. 133
      src/core/lib/event_engine/posix.h
  16. 1
      src/core/lib/event_engine/posix_engine/posix_endpoint.h
  17. 4
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  18. 5
      src/core/lib/event_engine/posix_engine/posix_engine.h
  19. 29
      src/core/lib/event_engine/query_extensions.h
  20. 15
      src/core/lib/event_engine/shim.cc
  21. 2
      src/core/lib/event_engine/shim.h
  22. 34
      src/core/lib/iomgr/event_engine_shims/endpoint.cc
  23. 101
      src/core/lib/iomgr/tcp_server_posix.cc
  24. 7
      test/core/end2end/end2end_test_fuzzer.cc
  25. 7
      test/core/end2end/fuzzers/fuzzing_common.cc
  26. 4
      test/core/event_engine/query_extensions_test.cc
  27. 6
      test/core/transport/test_suite/fuzzer_main.cc
  28. 1
      tools/doxygen/Doxyfile.c++
  29. 4
      tools/doxygen/Doxyfile.c++.internal
  30. 1
      tools/doxygen/Doxyfile.core
  31. 4
      tools/doxygen/Doxyfile.core.internal

@ -287,6 +287,7 @@ GRPC_PUBLIC_HDRS = [
GRPC_PUBLIC_EVENT_ENGINE_HDRS = [
"include/grpc/event_engine/endpoint_config.h",
"include/grpc/event_engine/event_engine.h",
"include/grpc/event_engine/extensible.h",
"include/grpc/event_engine/port.h",
"include/grpc/event_engine/memory_allocator.h",
"include/grpc/event_engine/memory_request.h",
@ -1550,7 +1551,9 @@ grpc_cc_library(
"//src/core:error",
"//src/core:error_utils",
"//src/core:event_engine_common",
"//src/core:event_engine_extensions",
"//src/core:event_engine_memory_allocator_factory",
"//src/core:event_engine_query_extensions",
"//src/core:event_engine_shim",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:event_engine_trace",
@ -1580,6 +1583,7 @@ grpc_cc_library(
"//src/core:poll",
"//src/core:pollset_set",
"//src/core:posix_event_engine_base_hdrs",
"//src/core:posix_event_engine_endpoint",
"//src/core:prioritized_race",
"//src/core:promise_status",
"//src/core:race",

3
CMakeLists.txt generated

@ -2651,6 +2651,7 @@ foreach(_hdr
include/grpc/compression.h
include/grpc/event_engine/endpoint_config.h
include/grpc/event_engine/event_engine.h
include/grpc/event_engine/extensible.h
include/grpc/event_engine/internal/memory_allocator_impl.h
include/grpc/event_engine/internal/slice_cast.h
include/grpc/event_engine/memory_allocator.h
@ -3351,6 +3352,7 @@ foreach(_hdr
include/grpc/compression.h
include/grpc/event_engine/endpoint_config.h
include/grpc/event_engine/event_engine.h
include/grpc/event_engine/extensible.h
include/grpc/event_engine/internal/memory_allocator_impl.h
include/grpc/event_engine/internal/slice_cast.h
include/grpc/event_engine/memory_allocator.h
@ -5425,6 +5427,7 @@ foreach(_hdr
include/grpc/compression.h
include/grpc/event_engine/endpoint_config.h
include/grpc/event_engine/event_engine.h
include/grpc/event_engine/extensible.h
include/grpc/event_engine/internal/memory_allocator_impl.h
include/grpc/event_engine/internal/slice_cast.h
include/grpc/event_engine/memory_allocator.h

2
Makefile generated

@ -1764,6 +1764,7 @@ PUBLIC_HEADERS_C += \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/extensible.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/internal/slice_cast.h \
include/grpc/event_engine/memory_allocator.h \
@ -2299,6 +2300,7 @@ PUBLIC_HEADERS_C += \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/extensible.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/internal/slice_cast.h \
include/grpc/event_engine/memory_allocator.h \

4
Package.swift generated

@ -47,6 +47,7 @@ let package = Package(
"include/grpc/compression.h",
"include/grpc/event_engine/endpoint_config.h",
"include/grpc/event_engine/event_engine.h",
"include/grpc/event_engine/extensible.h",
"include/grpc/event_engine/internal/memory_allocator_impl.h",
"include/grpc/event_engine/internal/slice_cast.h",
"include/grpc/event_engine/memory_allocator.h",
@ -1276,6 +1277,8 @@ let package = Package(
"src/core/lib/event_engine/default_event_engine_factory.cc",
"src/core/lib/event_engine/default_event_engine_factory.h",
"src/core/lib/event_engine/event_engine.cc",
"src/core/lib/event_engine/extensions/can_track_errors.h",
"src/core/lib/event_engine/extensions/supports_fd.h",
"src/core/lib/event_engine/forkable.cc",
"src/core/lib/event_engine/forkable.h",
"src/core/lib/event_engine/grpc_polled_fd.h",
@ -1324,6 +1327,7 @@ let package = Package(
"src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h",
"src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc",
"src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h",
"src/core/lib/event_engine/query_extensions.h",
"src/core/lib/event_engine/ref_counted_dns_resolver_interface.h",
"src/core/lib/event_engine/resolved_address.cc",
"src/core/lib/event_engine/resolved_address_internal.h",

@ -149,6 +149,7 @@ libs:
- include/grpc/compression.h
- include/grpc/event_engine/endpoint_config.h
- include/grpc/event_engine/event_engine.h
- include/grpc/event_engine/extensible.h
- include/grpc/event_engine/internal/memory_allocator_impl.h
- include/grpc/event_engine/internal/slice_cast.h
- include/grpc/event_engine/memory_allocator.h
@ -876,6 +877,8 @@ libs:
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/default_event_engine.h
- src/core/lib/event_engine/default_event_engine_factory.h
- src/core/lib/event_engine/extensions/can_track_errors.h
- src/core/lib/event_engine/extensions/supports_fd.h
- src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/grpc_polled_fd.h
- src/core/lib/event_engine/handle_containers.h
@ -905,6 +908,7 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/query_extensions.h
- src/core/lib/event_engine/ref_counted_dns_resolver_interface.h
- src/core/lib/event_engine/resolved_address_internal.h
- src/core/lib/event_engine/shim.h
@ -2118,6 +2122,7 @@ libs:
- include/grpc/compression.h
- include/grpc/event_engine/endpoint_config.h
- include/grpc/event_engine/event_engine.h
- include/grpc/event_engine/extensible.h
- include/grpc/event_engine/internal/memory_allocator_impl.h
- include/grpc/event_engine/internal/slice_cast.h
- include/grpc/event_engine/memory_allocator.h
@ -2357,6 +2362,8 @@ libs:
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/default_event_engine.h
- src/core/lib/event_engine/default_event_engine_factory.h
- src/core/lib/event_engine/extensions/can_track_errors.h
- src/core/lib/event_engine/extensions/supports_fd.h
- src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/grpc_polled_fd.h
- src/core/lib/event_engine/handle_containers.h
@ -2386,6 +2393,7 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/query_extensions.h
- src/core/lib/event_engine/ref_counted_dns_resolver_interface.h
- src/core/lib/event_engine/resolved_address_internal.h
- src/core/lib/event_engine/shim.h
@ -4282,6 +4290,7 @@ libs:
- include/grpc/compression.h
- include/grpc/event_engine/endpoint_config.h
- include/grpc/event_engine/event_engine.h
- include/grpc/event_engine/extensible.h
- include/grpc/event_engine/internal/memory_allocator_impl.h
- include/grpc/event_engine/internal/slice_cast.h
- include/grpc/event_engine/memory_allocator.h
@ -4400,6 +4409,8 @@ libs:
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/default_event_engine.h
- src/core/lib/event_engine/default_event_engine_factory.h
- src/core/lib/event_engine/extensions/can_track_errors.h
- src/core/lib/event_engine/extensions/supports_fd.h
- src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/grpc_polled_fd.h
- src/core/lib/event_engine/handle_containers.h
@ -4429,6 +4440,7 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/query_extensions.h
- src/core/lib/event_engine/ref_counted_dns_resolver_interface.h
- src/core/lib/event_engine/resolved_address_internal.h
- src/core/lib/event_engine/shim.h
@ -16479,6 +16491,7 @@ targets:
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/lib/event_engine/extensions/can_track_errors.h
- src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/resolved_address_internal.h
- src/core/lib/iomgr/port.h

6
gRPC-C++.podspec generated

@ -958,6 +958,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/common_closures.h',
'src/core/lib/event_engine/default_event_engine.h',
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/extensions/can_track_errors.h',
'src/core/lib/event_engine/extensions/supports_fd.h',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/grpc_polled_fd.h',
'src/core/lib/event_engine/handle_containers.h',
@ -987,6 +989,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/query_extensions.h',
'src/core/lib/event_engine/ref_counted_dns_resolver_interface.h',
'src/core/lib/event_engine/resolved_address_internal.h',
'src/core/lib/event_engine/shim.h',
@ -2209,6 +2212,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/common_closures.h',
'src/core/lib/event_engine/default_event_engine.h',
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/extensions/can_track_errors.h',
'src/core/lib/event_engine/extensions/supports_fd.h',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/grpc_polled_fd.h',
'src/core/lib/event_engine/handle_containers.h',
@ -2238,6 +2243,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/query_extensions.h',
'src/core/lib/event_engine/ref_counted_dns_resolver_interface.h',
'src/core/lib/event_engine/resolved_address_internal.h',
'src/core/lib/event_engine/shim.h',

7
gRPC-Core.podspec generated

@ -121,6 +121,7 @@ Pod::Spec.new do |s|
'include/grpc/compression.h',
'include/grpc/event_engine/endpoint_config.h',
'include/grpc/event_engine/event_engine.h',
'include/grpc/event_engine/extensible.h',
'include/grpc/event_engine/internal/memory_allocator_impl.h',
'include/grpc/event_engine/internal/slice_cast.h',
'include/grpc/event_engine/memory_allocator.h',
@ -1389,6 +1390,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/default_event_engine_factory.cc',
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/event_engine.cc',
'src/core/lib/event_engine/extensions/can_track_errors.h',
'src/core/lib/event_engine/extensions/supports_fd.h',
'src/core/lib/event_engine/forkable.cc',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/grpc_polled_fd.h',
@ -1437,6 +1440,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/query_extensions.h',
'src/core/lib/event_engine/ref_counted_dns_resolver_interface.h',
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/resolved_address_internal.h',
@ -2985,6 +2989,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/common_closures.h',
'src/core/lib/event_engine/default_event_engine.h',
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/extensions/can_track_errors.h',
'src/core/lib/event_engine/extensions/supports_fd.h',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/grpc_polled_fd.h',
'src/core/lib/event_engine/handle_containers.h',
@ -3014,6 +3020,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/query_extensions.h',
'src/core/lib/event_engine/ref_counted_dns_resolver_interface.h',
'src/core/lib/event_engine/resolved_address_internal.h',
'src/core/lib/event_engine/shim.h',

4
grpc.gemspec generated

@ -53,6 +53,7 @@ Gem::Specification.new do |s|
s.files += %w( include/grpc/compression.h )
s.files += %w( include/grpc/event_engine/endpoint_config.h )
s.files += %w( include/grpc/event_engine/event_engine.h )
s.files += %w( include/grpc/event_engine/extensible.h )
s.files += %w( include/grpc/event_engine/internal/memory_allocator_impl.h )
s.files += %w( include/grpc/event_engine/internal/slice_cast.h )
s.files += %w( include/grpc/event_engine/memory_allocator.h )
@ -1282,6 +1283,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/default_event_engine_factory.cc )
s.files += %w( src/core/lib/event_engine/default_event_engine_factory.h )
s.files += %w( src/core/lib/event_engine/event_engine.cc )
s.files += %w( src/core/lib/event_engine/extensions/can_track_errors.h )
s.files += %w( src/core/lib/event_engine/extensions/supports_fd.h )
s.files += %w( src/core/lib/event_engine/forkable.cc )
s.files += %w( src/core/lib/event_engine/forkable.h )
s.files += %w( src/core/lib/event_engine/grpc_polled_fd.h )
@ -1330,6 +1333,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h )
s.files += %w( src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h )
s.files += %w( src/core/lib/event_engine/query_extensions.h )
s.files += %w( src/core/lib/event_engine/ref_counted_dns_resolver_interface.h )
s.files += %w( src/core/lib/event_engine/resolved_address.cc )
s.files += %w( src/core/lib/event_engine/resolved_address_internal.h )

@ -16,7 +16,6 @@
#include <grpc/support/port_platform.h>
#include <functional>
#include <vector>
#include "absl/functional/any_invocable.h"
@ -24,6 +23,7 @@
#include "absl/status/statusor.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/extensible.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/port.h>
#include <grpc/event_engine/slice_buffer.h>
@ -100,7 +100,8 @@ namespace experimental {
/// application state synchronization must be managed by the application.
///
////////////////////////////////////////////////////////////////////////////////
class EventEngine : public std::enable_shared_from_this<EventEngine> {
class EventEngine : public std::enable_shared_from_this<EventEngine>,
public Extensible {
public:
/// A duration between two events.
///
@ -176,7 +177,7 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// allocations. gRPC allows applications to set memory constraints per
/// Channel or Server, and the implementation depends on all dynamic memory
/// allocation being handled by the quota system.
class Endpoint {
class Endpoint : public Extensible {
public:
/// Shuts down all connections and invokes all pending read or write
/// callbacks with an error status.
@ -255,45 +256,6 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// values are expected to remain valid for the life of the Endpoint.
virtual const ResolvedAddress& GetPeerAddress() const = 0;
virtual const ResolvedAddress& GetLocalAddress() const = 0;
/// A method which allows users to query whether an Endpoint implementation
/// supports a specified extension. The name of the extension is provided
/// as an input.
///
/// An extension could be any type with a unique string id. Each extension
/// may support additional capabilities and if the Endpoint implementation
/// supports the queried extension, it should return a valid pointer to the
/// extension type.
///
/// E.g., use case of an EventEngine::Endpoint supporting a custom
/// extension.
///
/// class CustomEndpointExtension {
/// public:
/// static constexpr std::string name = "my.namespace.extension_name";
/// void Process() { ... }
/// }
///
///
/// class CustomEndpoint :
/// public EventEngine::Endpoint, CustomEndpointExtension {
/// public:
/// void* QueryExtension(absl::string_view id) override {
/// if (id == CustomEndpointExtension::name) {
/// return static_cast<CustomEndpointExtension*>(this);
/// }
/// return nullptr;
/// }
/// ...
/// }
///
/// auto ext_ =
/// static_cast<CustomEndpointExtension*>(
/// endpoint->QueryExtension(CustomrEndpointExtension::name));
/// if (ext_ != nullptr) { ext_->Process(); }
///
///
virtual void* QueryExtension(absl::string_view /*id*/) { return nullptr; }
};
/// Called when a new connection is established.
@ -307,7 +269,7 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// Listens for incoming connection requests from gRPC clients and initiates
/// request processing once connections are established.
class Listener {
class Listener : public Extensible {
public:
/// Called when the listener has accepted a new client connection.
using AcceptCallback = absl::AnyInvocable<void(

@ -0,0 +1,68 @@
// Copyright 2024 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_EXTENSIBLE_H
#define GRPC_EVENT_ENGINE_EXTENSIBLE_H
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
namespace grpc_event_engine {
namespace experimental {
class Extensible {
public:
/// A method which allows users to query whether an implementation supports a
/// specified extension. The name of the extension is provided as an input.
///
/// An extension could be any type with a unique string id. Each extension may
/// support additional capabilities and if the implementation supports the
/// queried extension, it should return a valid pointer to the extension type.
///
/// E.g., use case of an EventEngine::Endpoint supporting a custom extension.
///
/// class CustomEndpointExtension {
/// public:
/// static std::string EndpointExtensionName() {
/// return "my.namespace.extension_name";
/// }
/// virtual void Process() = 0;
/// }
///
/// class CustomEndpoint :
/// public EventEngine::Endpoint, public CustomEndpointExtension {
/// public:
/// void* QueryExtension(absl::string_view id) override {
/// if (id == CustomEndpointExtension::EndpointExtensionName()) {
/// return static_cast<CustomEndpointExtension*>(this);
/// }
/// return nullptr;
/// }
/// void Process() override { ... }
/// ...
/// }
///
/// auto endpoint =
/// static_cast<CustomEndpointExtension*>(endpoint->QueryExtension(
/// CustomEndpointExtension::EndpointExtensionName()));
/// if (endpoint != nullptr) endpoint->Process();
///
virtual void* QueryExtension(absl::string_view /*id*/) { return nullptr; }
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_EVENT_ENGINE_EXTENSIBLE_H

4
package.xml generated

@ -35,6 +35,7 @@
<file baseinstalldir="/" name="include/grpc/compression.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/endpoint_config.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/event_engine.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/extensible.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/internal/memory_allocator_impl.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/internal/slice_cast.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/memory_allocator.h" role="src" />
@ -1264,6 +1265,8 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/default_event_engine_factory.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/default_event_engine_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/event_engine.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/extensions/can_track_errors.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/extensions/supports_fd.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/forkable.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/forkable.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/grpc_polled_fd.h" role="src" />
@ -1312,6 +1315,7 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/query_extensions.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/ref_counted_dns_resolver_interface.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/resolved_address.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/resolved_address_internal.h" role="src" />

@ -55,6 +55,23 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "event_engine_extensions",
hdrs = [
"lib/event_engine/extensions/can_track_errors.h",
"lib/event_engine/extensions/supports_fd.h",
],
external_deps = [
"absl/status:statusor",
"absl/functional:any_invocable",
"absl/strings",
],
deps = [
"//:event_engine_base_hdrs",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "event_engine_common",
srcs = [
@ -64,6 +81,7 @@ grpc_cc_library(
"lib/event_engine/slice_buffer.cc",
],
hdrs = [
"lib/event_engine/extensions/can_track_errors.h",
"lib/event_engine/handle_containers.h",
"lib/event_engine/resolved_address_internal.h",
"//:include/grpc/event_engine/slice.h",
@ -1742,6 +1760,8 @@ grpc_cc_library(
"absl/status:statusor",
],
deps = [
"event_engine_extensions",
"event_engine_query_extensions",
"//:event_engine_base_hdrs",
"//:gpr",
],
@ -2043,6 +2063,7 @@ grpc_cc_library(
],
deps = [
"event_engine_common",
"event_engine_extensions",
"event_engine_tcp_socket_utils",
"experiments",
"iomgr_port",

@ -0,0 +1,40 @@
// Copyright 2024 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_EXTENSIONS_CAN_TRACK_ERRORS_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_EXTENSIONS_CAN_TRACK_ERRORS_H
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
namespace grpc_event_engine {
namespace experimental {
class EndpointCanTrackErrorsExtension {
public:
virtual ~EndpointCanTrackErrorsExtension() = default;
static absl::string_view EndpointExtensionName() {
return "io.grpc.event_engine.extension.can_track_errors";
}
/// Returns if the Endpoint supports tracking events from errmsg queues on
/// posix systems.
virtual bool CanTrackErrors() = 0;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_EXTENSIONS_CAN_TRACK_ERRORS_H

@ -0,0 +1,160 @@
// Copyright 2024 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_EXTENSIONS_SUPPORTS_FD_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_EXTENSIONS_SUPPORTS_FD_H
#include <grpc/support/port_platform.h>
#include "absl/functional/any_invocable.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include <grpc/event_engine/event_engine.h>
namespace grpc_event_engine {
namespace experimental {
class EndpointSupportsFdExtension {
public:
virtual ~EndpointSupportsFdExtension() = default;
static absl::string_view EndpointExtensionName() {
return "io.grpc.event_engine.extension.endpoint_supports_fd";
}
/// Returns the file descriptor associated with the posix endpoint.
virtual int GetWrappedFd() = 0;
/// Shutdown the endpoint. This function call should trigger execution of
/// any pending endpoint Read/Write callbacks with appropriate error
/// absl::Status. After this function call any subsequent endpoint
/// Read/Write operations until endpoint deletion should fail with an
/// appropriate absl::Status.
///
/// \a on_release_fd - If specified, the callback is invoked when the
/// endpoint is destroyed/deleted. The underlying file descriptor is
/// released instead of being closed. The callback will get the released
/// file descriptor as its argument if the release operation is successful.
/// Otherwise it would get an appropriate error status as its argument.
virtual void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
on_release_fd) = 0;
};
class ListenerSupportsFdExtension {
public:
virtual ~ListenerSupportsFdExtension() = default;
static absl::string_view EndpointExtensionName() {
return "io.grpc.event_engine.extension.listener_supports_fd";
}
/// Called when a posix listener bind operation completes. A single bind
/// operation may trigger creation of multiple listener fds. This callback
/// should be invoked once on each newly created and bound fd. If the
/// corresponding bind operation fails for a particular fd, this callback
/// must be invoked with a absl::FailedPreConditionError status.
///
/// \a listener_fd - The listening socket fd that was bound to the specified
/// address.
using OnPosixBindNewFdCallback =
absl::AnyInvocable<void(absl::StatusOr<int> listener_fd)>;
/// Bind an address/port to this Listener.
///
/// It is expected that multiple addresses/ports can be bound to this
/// Listener before Listener::Start has been called. Returns either the
/// bound port or an appropriate error status.
/// \a addr - The address to listen for incoming connections.
/// \a on_bind_new_fd The callback is invoked once for each newly bound
/// listener fd that may be created by this Bind operation.
virtual absl::StatusOr<int> BindWithFd(
const EventEngine::ResolvedAddress& addr,
OnPosixBindNewFdCallback on_bind_new_fd) = 0;
/// Handle an externally accepted client connection. It must return an
/// appropriate error status in case of failure.
///
/// This may be invoked to process a new client connection accepted by an
/// external listening fd.
/// \a listener_fd - The external listening socket fd that accepted the new
/// client connection.
/// \a fd - The socket file descriptor representing the new client
/// connection.
/// \a pending_data - If specified, it holds any pending data that may have
/// already been read over the externally accepted client connection.
/// Otherwise, it is assumed that no data has been read over the new client
/// connection.
virtual absl::Status HandleExternalConnection(int listener_fd, int fd,
SliceBuffer* pending_data) = 0;
/// Shutdown/stop listening on all bind Fds.
virtual void ShutdownListeningFds() = 0;
};
class EventEngineSupportsFdExtension {
public:
virtual ~EventEngineSupportsFdExtension() = default;
static absl::string_view EndpointExtensionName() {
return "io.grpc.event_engine.extension.event_engine_supports_fd";
}
/// Creates a posix specific EventEngine::Endpoint from an fd which is already
/// assumed to be connected to a remote peer. \a fd - The connected socket
/// file descriptor. \a config - Additional configuration to applied to the
/// endpoint. \a memory_allocator - The endpoint may use the provided memory
/// allocator to track memory allocations.
virtual std::unique_ptr<EventEngine::Endpoint> CreatePosixEndpointFromFd(
int fd, const EndpointConfig& config,
MemoryAllocator memory_allocator) = 0;
/// Called when the posix listener has accepted a new client connection.
/// \a listener_fd - The listening socket fd that accepted the new client
/// connection.
/// \a endpoint - The EventEngine endpoint to handle data exchange over the
/// new client connection.
/// \a is_external - A boolean indicating whether the new client connection
/// is accepted by an external listener_fd or by a listener_fd that is
/// managed by the EventEngine listener.
/// \a memory_allocator - The callback may use the provided memory
/// allocator to handle memory allocation operations.
/// \a pending_data - If specified, it holds any pending data that may have
/// already been read over the new client connection. Otherwise, it is
/// assumed that no data has been read over the new client connection.
using PosixAcceptCallback = absl::AnyInvocable<void(
int listener_fd, std::unique_ptr<EventEngine::Endpoint> endpoint,
bool is_external, MemoryAllocator memory_allocator,
SliceBuffer* pending_data)>;
/// Factory method to create a posix specific network listener / server with
/// fd support.
///
/// Once a \a Listener is created and started, the \a on_accept callback will
/// be called once asynchronously for each established connection. This method
/// may return a non-OK status immediately if an error was encountered in any
/// synchronous steps required to create the Listener. In this case,
/// \a on_shutdown will never be called.
///
/// If this method returns a Listener, then \a on_shutdown will be invoked
/// exactly once, when the Listener is shut down. The status passed to it will
/// indicate if there was a problem during shutdown.
///
/// The provided \a MemoryAllocatorFactory is used to create \a
/// MemoryAllocators for Endpoint construction.
virtual absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreatePosixListener(
PosixAcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_EXTENSIONS_SUPPORTS_FD_H

@ -16,145 +16,34 @@
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include "src/core/lib/event_engine/extensions/can_track_errors.h"
#include "src/core/lib/event_engine/extensions/supports_fd.h"
#include "src/core/lib/event_engine/query_extensions.h"
namespace grpc_event_engine {
namespace experimental {
/// This defines an interface that posix specific EventEngines endpoints
/// may implement to support additional file descriptor related functionality.
class PosixEndpointWithFdSupport : public EventEngine::Endpoint {
public:
/// Returns the file descriptor associated with the posix endpoint.
virtual int GetWrappedFd() = 0;
/// Returns if the Endpoint supports tracking events from errmsg queues on
/// posix systems.
virtual bool CanTrackErrors() = 0;
/// Shutdown the endpoint. This function call should trigger execution of
/// any pending endpoint Read/Write callbacks with appropriate error
/// absl::Status. After this function call any subsequent endpoint
/// Read/Write operations until endpoint deletion should fail with an
/// appropriate absl::Status.
///
/// \a on_release_fd - If specifed, the callback is invoked when the
/// endpoint is destroyed/deleted. The underlying file descriptor is
/// released instead of being closed. The callback will get the released
/// file descriptor as its argument if the release operation is successful.
/// Otherwise it would get an appropriate error status as its argument.
virtual void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
on_release_fd) = 0;
};
class PosixEndpointWithFdSupport
: public ExtendedType<EventEngine::Endpoint, EndpointSupportsFdExtension,
EndpointCanTrackErrorsExtension> {};
/// Defines an interface that posix EventEngine listeners may implement to
/// support additional file descriptor related functionality.
class PosixListenerWithFdSupport : public EventEngine::Listener {
public:
/// Called when a posix listener bind operation completes. A single bind
/// operation may trigger creation of multiple listener fds. This callback
/// should be invoked once on each newly created and bound fd. If the
/// corresponding bind operation fails for a particular fd, this callback
/// must be invoked with a absl::FailedPreConditionError status.
///
/// \a listener_fd - The listening socket fd that was bound to the specified
/// address.
using OnPosixBindNewFdCallback =
absl::AnyInvocable<void(absl::StatusOr<int> listener_fd)>;
/// Bind an address/port to this Listener.
///
/// It is expected that multiple addresses/ports can be bound to this
/// Listener before Listener::Start has been called. Returns either the
/// bound port or an appropriate error status.
/// \a addr - The address to listen for incoming connections.
/// \a on_bind_new_fd The callback is invoked once for each newly bound
/// listener fd that may be created by this Bind operation.
virtual absl::StatusOr<int> BindWithFd(
const EventEngine::ResolvedAddress& addr,
OnPosixBindNewFdCallback on_bind_new_fd) = 0;
/// Handle an externally accepted client connection. It must return an
/// appropriate error status in case of failure.
///
/// This may be invoked to process a new client connection accepted by an
/// external listening fd.
/// \a listener_fd - The external listening socket fd that accepted the new
/// client connection.
/// \a fd - The socket file descriptor representing the new client
/// connection.
/// \a pending_data - If specified, it holds any pending data that may have
/// already been read over the externally accepted client connection.
/// Otherwise, it is assumed that no data has been read over the new client
/// connection.
virtual absl::Status HandleExternalConnection(int listener_fd, int fd,
SliceBuffer* pending_data) = 0;
/// Shutdown/stop listening on all bind Fds.
virtual void ShutdownListeningFds() = 0;
class PosixListenerWithFdSupport
: public ExtendedType<EventEngine::Listener, ListenerSupportsFdExtension> {
};
/// Defines an interface that posix EventEngines may implement to
/// support additional file descriptor related functionality.
class PosixEventEngineWithFdSupport : public EventEngine {
public:
/// Creates a posix specific EventEngine::Endpoint from an fd which is already
/// assumed to be connected to a remote peer. \a fd - The connected socket
/// file descriptor. \a config - Additional configuration to applied to the
/// endpoint. \a memory_allocator - The endpoint may use the provided memory
/// allocator to track memory allocations.
virtual std::unique_ptr<PosixEndpointWithFdSupport> CreatePosixEndpointFromFd(
int fd, const EndpointConfig& config,
MemoryAllocator memory_allocator) = 0;
/// Called when the posix listener has accepted a new client connection.
/// \a listener_fd - The listening socket fd that accepted the new client
/// connection.
/// \a endpoint - The EventEngine endpoint to handle data exchange over the
/// new client connection.
/// \a is_external - A boolean indicating whether the new client connection
/// is accepted by an external listener_fd or by a listener_fd that is
/// managed by the EventEngine listener.
/// \a memory_allocator - The callback may use the provided memory
/// allocator to handle memory allocation operations.
/// \a pending_data - If specified, it holds any pending data that may have
/// already been read over the new client connection. Otherwise, it is
/// assumed that no data has been read over the new client connection.
using PosixAcceptCallback = absl::AnyInvocable<void(
int listener_fd, std::unique_ptr<EventEngine::Endpoint> endpoint,
bool is_external, MemoryAllocator memory_allocator,
SliceBuffer* pending_data)>;
/// Factory method to create a posix specific network listener / server with
/// fd support.
///
/// Once a \a Listener is created and started, the \a on_accept callback will
/// be called once asynchronously for each established connection. This method
/// may return a non-OK status immediately if an error was encountered in any
/// synchronous steps required to create the Listener. In this case,
/// \a on_shutdown will never be called.
///
/// If this method returns a Listener, then \a on_shutdown will be invoked
/// exactly once, when the Listener is shut down. The status passed to it will
/// indicate if there was a problem during shutdown.
///
/// The provided \a MemoryAllocatorFactory is used to create \a
/// MemoryAllocators for Endpoint construction.
virtual absl::StatusOr<std::unique_ptr<PosixListenerWithFdSupport>>
CreatePosixListener(
PosixAcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0;
};
class PosixEventEngineWithFdSupport
: public ExtendedType<EventEngine, EventEngineSupportsFdExtension> {};
} // namespace experimental
} // namespace grpc_event_engine

@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/extensions/supports_fd.h"
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"

@ -657,7 +657,7 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect(
#endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
std::unique_ptr<PosixEndpointWithFdSupport>
std::unique_ptr<EventEngine::Endpoint>
PosixEventEngine::CreatePosixEndpointFromFd(int fd,
const EndpointConfig& config,
MemoryAllocator memory_allocator) {
@ -701,7 +701,7 @@ PosixEventEngine::CreateListener(
#endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
absl::StatusOr<std::unique_ptr<PosixListenerWithFdSupport>>
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
PosixEventEngine::CreatePosixListener(
PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,

@ -170,7 +170,7 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
~PosixEventEngine() override;
std::unique_ptr<PosixEndpointWithFdSupport> CreatePosixEndpointFromFd(
std::unique_ptr<EventEngine::Endpoint> CreatePosixEndpointFromFd(
int fd, const EndpointConfig& config,
MemoryAllocator memory_allocator) override;
@ -181,8 +181,7 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
override;
absl::StatusOr<std::unique_ptr<PosixListenerWithFdSupport>>
CreatePosixListener(
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> CreatePosixListener(
PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,

@ -45,25 +45,40 @@ struct QueryExtensionRecursion<Querying> {
// A helper class to derive from some set of base classes and export
// QueryExtension for them all.
// Endpoint implementations which need to support different extensions just need
// to derive from ExtendedEndpoint class.
template <typename... Exports>
class ExtendedEndpoint : public EventEngine::Endpoint, public Exports... {
// EventEngine Extensible object implementations which need to support different
// extensions just need to derive from this class.
template <typename EEClass, typename... Exports>
class ExtendedType : public EEClass, public Exports... {
public:
void* QueryExtension(absl::string_view id) override {
return endpoint_detail::QueryExtensionRecursion<ExtendedEndpoint,
return endpoint_detail::QueryExtensionRecursion<ExtendedType,
Exports...>::Query(id,
this);
}
};
/// A helper method which returns a valid pointer if the extension is supported
/// by the endpoint.
/// A helper method which returns a valid pointer if the extension is
/// supported by the endpoint.
template <typename T>
T* QueryExtension(EventEngine::Endpoint* endpoint) {
if (endpoint == nullptr) return nullptr;
return static_cast<T*>(endpoint->QueryExtension(T::EndpointExtensionName()));
}
/// A helper method which returns a valid pointer if the extension is
/// supported by the listener.
template <typename T>
T* QueryExtension(EventEngine::Listener* listener) {
return static_cast<T*>(listener->QueryExtension(T::EndpointExtensionName()));
}
/// A helper method which returns a valid pointer if the extension is
/// supported by the EventEngine.
template <typename T>
T* QueryExtension(EventEngine* engine) {
return static_cast<T*>(engine->QueryExtension(T::EndpointExtensionName()));
}
} // namespace experimental
} // namespace grpc_event_engine

@ -22,12 +22,6 @@
namespace grpc_event_engine {
namespace experimental {
#if defined(GRPC_POSIX_SOCKET_TCP) && !defined(GRPC_CFSTREAM)
bool g_event_engine_supports_fd = true;
#else
bool g_event_engine_supports_fd = false;
#endif
bool UseEventEngineClient() {
// TODO(hork, eryu): Adjust the ifdefs accordingly when event engines become
// available for other platforms.
@ -53,14 +47,5 @@ bool UseEventEngineListener() {
#endif
}
bool EventEngineSupportsFd() {
#if defined(GRPC_POSIX_SOCKET_TCP) && !defined(GRPC_CFSTREAM) && \
!defined(GRPC_DO_NOT_INSTANTIATE_POSIX_POLLER)
return g_event_engine_supports_fd;
#else
return false;
#endif
}
} // namespace experimental
} // namespace grpc_event_engine

@ -25,8 +25,6 @@ bool UseEventEngineClient();
bool UseEventEngineListener();
bool EventEngineSupportsFd();
} // namespace experimental
} // namespace grpc_event_engine

@ -31,7 +31,9 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/extensions/can_track_errors.h"
#include "src/core/lib/event_engine/extensions/supports_fd.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
@ -217,9 +219,10 @@ class EventEngineEndpointWrapper {
void ShutdownUnref() {
if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
kShutdownBit + 1) {
if (EventEngineSupportsFd() && fd_ > 0 && on_release_fd_) {
reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->Shutdown(std::move(on_release_fd_));
auto* supports_fd =
QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
if (supports_fd != nullptr && fd_ > 0 && on_release_fd_) {
supports_fd->Shutdown(std::move(on_release_fd_));
}
OnShutdownInternal();
}
@ -231,7 +234,9 @@ class EventEngineEndpointWrapper {
// invocation would simply return.
void TriggerShutdown(
absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
if (EventEngineSupportsFd()) {
auto* supports_fd =
QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
if (supports_fd != nullptr) {
on_release_fd_ = std::move(on_release_fd);
}
int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
@ -245,9 +250,8 @@ class EventEngineEndpointWrapper {
Ref();
if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
kShutdownBit + 1) {
if (EventEngineSupportsFd() && fd_ > 0 && on_release_fd_) {
reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->Shutdown(std::move(on_release_fd_));
if (supports_fd != nullptr && fd_ > 0 && on_release_fd_) {
supports_fd->Shutdown(std::move(on_release_fd_));
}
OnShutdownInternal();
}
@ -257,9 +261,10 @@ class EventEngineEndpointWrapper {
}
bool CanTrackErrors() {
if (EventEngineSupportsFd()) {
return reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->CanTrackErrors();
auto* can_track_errors =
QueryExtension<EndpointCanTrackErrorsExtension>(endpoint_.get());
if (can_track_errors != nullptr) {
return can_track_errors->CanTrackErrors();
} else {
return false;
}
@ -413,9 +418,10 @@ EventEngineEndpointWrapper::EventEngineEndpointWrapper(
eeep_(std::make_unique<grpc_event_engine_endpoint>()) {
eeep_->base.vtable = &grpc_event_engine_endpoint_vtable;
eeep_->wrapper = this;
if (EventEngineSupportsFd()) {
fd_ = reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->GetWrappedFd();
auto* supports_fd =
QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
if (supports_fd != nullptr) {
fd_ = supports_fd->GetWrappedFd();
} else {
fd_ = -1;
}

@ -58,6 +58,8 @@
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/memory_allocator_factory.h"
#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/event_engine/resolved_address_internal.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
@ -95,7 +97,20 @@ static grpc_error_handle CreateEventEngineListener(
grpc_tcp_server* s, grpc_closure* shutdown_complete,
const EndpointConfig& config, grpc_tcp_server** server) {
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> listener;
if (grpc_event_engine::experimental::EventEngineSupportsFd()) {
auto* engine = reinterpret_cast<EventEngine*>(
config.GetVoidPointer(GRPC_INTERNAL_ARG_EVENT_ENGINE));
// Keeps the engine alive for some tests that have not otherwise
// instantiated an EventEngine
std::shared_ptr<EventEngine> keeper;
if (engine == nullptr) {
keeper = grpc_event_engine::experimental::GetDefaultEventEngine();
engine = keeper.get();
}
auto* event_engine_supports_fd =
grpc_event_engine::experimental::QueryExtension<
grpc_event_engine::experimental::EventEngineSupportsFdExtension>(
engine);
if (event_engine_supports_fd != nullptr) {
PosixEventEngineWithFdSupport::PosixAcceptCallback accept_cb =
[s](int listener_fd, std::unique_ptr<EventEngine::Endpoint> ep,
bool is_external, MemoryAllocator /*allocator*/,
@ -120,9 +135,10 @@ static grpc_error_handle CreateEventEngineListener(
memset(&addr, 0, sizeof(addr));
addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
// Get the fd of the socket connected to peer.
int fd = reinterpret_cast<grpc_event_engine::experimental::
PosixEndpointWithFdSupport*>(ep.get())
->GetWrappedFd();
int fd =
reinterpret_cast<
grpc_event_engine::experimental::PosixEndpoint*>(ep.get())
->GetWrappedFd();
if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
&(addr.len)) < 0) {
gpr_log(GPR_ERROR, "Failed getpeername: %s",
@ -162,18 +178,7 @@ static grpc_error_handle CreateEventEngineListener(
grpc_event_engine_endpoint_create(std::move(ep)),
read_notifier_pollset, acceptor);
};
PosixEventEngineWithFdSupport* engine_ptr =
reinterpret_cast<PosixEventEngineWithFdSupport*>(
config.GetVoidPointer(GRPC_INTERNAL_ARG_EVENT_ENGINE));
// Keeps the engine alive for some tests that have not otherwise
// instantiated an EventEngine
std::shared_ptr<EventEngine> keeper;
if (engine_ptr == nullptr) {
keeper = grpc_event_engine::experimental::GetDefaultEventEngine();
engine_ptr =
reinterpret_cast<PosixEventEngineWithFdSupport*>(keeper.get());
}
listener = engine_ptr->CreatePosixListener(
listener = event_engine_supports_fd->CreatePosixListener(
std::move(accept_cb),
[s, shutdown_complete](absl::Status status) {
grpc_event_engine::experimental::RunEventEngineClosure(
@ -194,10 +199,9 @@ static grpc_error_handle CreateEventEngineListener(
grpc_event_engine_endpoint_create(std::move(ep)),
nullptr, nullptr);
};
auto ee = grpc_event_engine::experimental::GetDefaultEventEngine();
listener = ee->CreateListener(
listener = engine->CreateListener(
std::move(accept_cb),
[s, ee, shutdown_complete](absl::Status status) {
[s, ee = keeper, shutdown_complete](absl::Status status) {
GPR_ASSERT(gpr_atm_no_barrier_load(&s->refs.count) == 0);
grpc_event_engine::experimental::RunEventEngineClosure(
shutdown_complete, absl_status_to_grpc_error(status));
@ -250,7 +254,7 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
s->head = nullptr;
s->tail = nullptr;
s->nports = 0;
s->options = TcpOptionsFromEndpointConfig(config);
s->options = ::TcpOptionsFromEndpointConfig(config);
s->fd_handler = nullptr;
GPR_ASSERT(s->options.resource_quota != nullptr);
GPR_ASSERT(s->on_accept_cb);
@ -618,22 +622,21 @@ static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
}
int fd_index = 0;
absl::StatusOr<int> port;
if (grpc_event_engine::experimental::EventEngineSupportsFd()) {
port =
static_cast<
grpc_event_engine::experimental::PosixListenerWithFdSupport*>(
s->ee_listener.get())
->BindWithFd(
grpc_event_engine::experimental::CreateResolvedAddress(*addr),
[s, &fd_index](absl::StatusOr<int> listen_fd) {
if (!listen_fd.ok()) {
return;
}
GPR_DEBUG_ASSERT(*listen_fd > 0);
s->listen_fd_to_index_map.insert_or_assign(
*listen_fd,
std::make_tuple(s->n_bind_ports, fd_index++));
});
auto* listener_supports_fd =
grpc_event_engine::experimental::QueryExtension<
grpc_event_engine::experimental::ListenerSupportsFdExtension>(
s->ee_listener.get());
if (listener_supports_fd != nullptr) {
port = listener_supports_fd->BindWithFd(
grpc_event_engine::experimental::CreateResolvedAddress(*addr),
[s, &fd_index](absl::StatusOr<int> listen_fd) {
if (!listen_fd.ok()) {
return;
}
GPR_DEBUG_ASSERT(*listen_fd > 0);
s->listen_fd_to_index_map.insert_or_assign(
*listen_fd, std::make_tuple(s->n_bind_ports, fd_index++));
});
} else {
port = s->ee_listener->Bind(
grpc_event_engine::experimental::CreateResolvedAddress(*addr));
@ -837,10 +840,12 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
s->shutdown_listeners = true;
if (grpc_event_engine::experimental::UseEventEngineListener()) {
if (grpc_event_engine::experimental::EventEngineSupportsFd()) {
static_cast<grpc_event_engine::experimental::PosixListenerWithFdSupport*>(
s->ee_listener.get())
->ShutdownListeningFds();
auto* listener_supports_fd =
grpc_event_engine::experimental::QueryExtension<
grpc_event_engine::experimental::ListenerSupportsFdExtension>(
s->ee_listener.get());
if (listener_supports_fd != nullptr) {
listener_supports_fd->ShutdownListeningFds();
}
}
/* shutdown all fd's */
@ -872,19 +877,21 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
// TODO(yangg) resolve duplicate code with on_read
void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override {
if (grpc_event_engine::experimental::UseEventEngineListener()) {
GPR_ASSERT(grpc_event_engine::experimental::EventEngineSupportsFd());
auto* listener_supports_fd =
grpc_event_engine::experimental::QueryExtension<
grpc_event_engine::experimental::ListenerSupportsFdExtension>(
s_->ee_listener.get());
GPR_ASSERT(listener_supports_fd != nullptr);
grpc_event_engine::experimental::SliceBuffer pending_data;
if (buf != nullptr) {
pending_data =
grpc_event_engine::experimental::SliceBuffer::TakeCSliceBuffer(
buf->data.raw.slice_buffer);
}
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"listener_handle_external_connection",
static_cast<
grpc_event_engine::experimental::PosixListenerWithFdSupport*>(
s_->ee_listener.get())
->HandleExternalConnection(listener_fd, fd, &pending_data)));
GPR_ASSERT(
GRPC_LOG_IF_ERROR("listener_handle_external_connection",
listener_supports_fd->HandleExternalConnection(
listener_fd, fd, &pending_data)));
return;
}
grpc_pollset* read_notifier_pollset;

@ -50,12 +50,6 @@
using ::grpc_event_engine::experimental::FuzzingEventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
namespace grpc_event_engine {
namespace experimental {
extern bool g_event_engine_supports_fd;
}
} // namespace grpc_event_engine
bool squelch = true;
static void dont_log(gpr_log_func_args* /*args*/) {}
@ -74,7 +68,6 @@ DEFINE_PROTO_FUZZER(const core_end2end_test_fuzzer::Msg& msg) {
grpc_core::CoreEnd2endTestRegistry::Get().AllTests();
static const auto tests = []() {
grpc_core::g_is_fuzzing_core_e2e_tests = true;
grpc_event_engine::experimental::g_event_engine_supports_fd = false;
grpc_core::ForceEnableExperiment("event_engine_client", true);
grpc_core::ForceEnableExperiment("event_engine_listener", true);

@ -45,17 +45,10 @@
#include "src/core/lib/surface/channel.h"
#include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
namespace grpc_event_engine {
namespace experimental {
extern bool g_event_engine_supports_fd;
}
} // namespace grpc_event_engine
namespace grpc_core {
namespace {
int force_experiments = []() {
grpc_event_engine::experimental::g_event_engine_supports_fd = false;
ForceEnableExperiment("event_engine_client", true);
ForceEnableExperiment("event_engine_listener", true);
return 1;

@ -47,8 +47,8 @@ class TestExtension {
};
class ExtendedTestEndpoint
: public ExtendedEndpoint<TestExtension<0>, TestExtension<1>,
TestExtension<2>> {
: public ExtendedType<EventEngine::Endpoint, TestExtension<0>,
TestExtension<1>, TestExtension<2>> {
public:
ExtendedTestEndpoint() = default;
~ExtendedTestEndpoint() override = default;

@ -31,12 +31,6 @@
#include "test/core/util/fuzz_config_vars.h"
#include "test/core/util/proto_bit_gen.h"
namespace grpc_event_engine {
namespace experimental {
extern bool g_event_engine_supports_fd;
}
} // namespace grpc_event_engine
bool squelch = true;
static void dont_log(gpr_log_func_args* /*args*/) {}

@ -882,6 +882,7 @@ include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/extensible.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/internal/slice_cast.h \
include/grpc/event_engine/memory_allocator.h \

@ -882,6 +882,7 @@ include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/extensible.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/internal/slice_cast.h \
include/grpc/event_engine/memory_allocator.h \
@ -2281,6 +2282,8 @@ src/core/lib/event_engine/default_event_engine.h \
src/core/lib/event_engine/default_event_engine_factory.cc \
src/core/lib/event_engine/default_event_engine_factory.h \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/extensions/can_track_errors.h \
src/core/lib/event_engine/extensions/supports_fd.h \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/forkable.h \
src/core/lib/event_engine/grpc_polled_fd.h \
@ -2329,6 +2332,7 @@ src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h \
src/core/lib/event_engine/query_extensions.h \
src/core/lib/event_engine/ref_counted_dns_resolver_interface.h \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/resolved_address_internal.h \

@ -815,6 +815,7 @@ include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/extensible.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/internal/slice_cast.h \
include/grpc/event_engine/memory_allocator.h \

@ -815,6 +815,7 @@ include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/endpoint_config.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/extensible.h \
include/grpc/event_engine/internal/memory_allocator_impl.h \
include/grpc/event_engine/internal/slice_cast.h \
include/grpc/event_engine/memory_allocator.h \
@ -2057,6 +2058,8 @@ src/core/lib/event_engine/default_event_engine.h \
src/core/lib/event_engine/default_event_engine_factory.cc \
src/core/lib/event_engine/default_event_engine_factory.h \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/extensions/can_track_errors.h \
src/core/lib/event_engine/extensions/supports_fd.h \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/forkable.h \
src/core/lib/event_engine/grpc_polled_fd.h \
@ -2105,6 +2108,7 @@ src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h \
src/core/lib/event_engine/query_extensions.h \
src/core/lib/event_engine/ref_counted_dns_resolver_interface.h \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/resolved_address_internal.h \

Loading…
Cancel
Save