Forking the posix epoll1 poller for iomgr event engine (#30135)

* start

* Forking the posix epoll1 poller for event engine

* re-generate projects

* fix test

* fix sanity checks

* fix

* update fix

* more build issue fixes

* even more fixes

* add no_windows tag

* re-generate projects

* update comment

* cleanup

* Automated change: Fix sanity tests

* review comments

* fix tsan issue

* re-generate projects

* cleanup

* fix missing build dep

* fix mac build issue

* minor fix in test

* simplifying build graph

* re-generate projects

* fix macOS build issues

* review comments

* re-generate projects

* add missing generated file

* review comments

* fix sanity checks

* rename one more build target

* fix sanity checks

* retry fix sanity checks

Co-authored-by: Vignesh2208 <Vignesh2208@users.noreply.github.com>
Vignesh Babu 2 years ago committed by GitHub
parent b37996d50e
commit fbe051fb51
  1. 179
      BUILD
  2. 125
      CMakeLists.txt
  3. 60
      build_autogenerated.yaml
  4. 589
      src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc
  5. 95
      src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h
  6. 105
      src/core/lib/event_engine/iomgr_engine/event_poller.h
  7. 29
      src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc
  8. 33
      src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h
  9. 74
      src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h
  10. 267
      src/core/lib/event_engine/iomgr_engine/lockfree_event.cc
  11. 73
      src/core/lib/event_engine/iomgr_engine/lockfree_event.h
  12. 125
      src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc
  13. 45
      src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h
  14. 152
      src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc
  15. 45
      src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h
  16. 76
      src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h
  17. 67
      src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc
  18. 37
      src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h
  19. 55
      test/core/event_engine/iomgr_event_engine/BUILD
  20. 499
      test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc
  21. 153
      test/core/event_engine/iomgr_event_engine/lock_free_event_test.cc
  22. 61
      test/core/event_engine/iomgr_event_engine/wakeup_fd_posix_test.cc
  23. 68
      tools/run_tests/generated/tests.json

179
BUILD

@ -2345,6 +2345,185 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "iomgr_ee_event_poller",
srcs = [],
hdrs = [
"src/core/lib/event_engine/iomgr_engine/event_poller.h",
],
external_deps = [
"absl/status",
"absl/strings",
],
tags = ["grpc-autodeps"],
deps = [
"event_engine_base_hdrs",
"gpr_platform",
"iomgr_ee_closure",
"time",
],
)
grpc_cc_library(
name = "iomgr_ee_closure",
srcs = [],
hdrs = [
"src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h",
],
external_deps = [
"absl/status",
"absl/utility",
],
tags = ["grpc-autodeps"],
deps = [
"event_engine_base_hdrs",
"gpr_platform",
],
)
grpc_cc_library(
name = "iomgr_ee_lockfree_event",
srcs = [
"src/core/lib/event_engine/iomgr_engine/lockfree_event.cc",
],
hdrs = [
"src/core/lib/event_engine/iomgr_engine/lockfree_event.h",
],
external_deps = ["absl/status"],
tags = ["grpc-autodeps"],
deps = [
"gpr_base",
"iomgr_ee_closure",
"iomgr_ee_event_poller",
"status_helper",
],
)
grpc_cc_library(
name = "iomgr_ee_wakeup_fd_posix",
hdrs = [
"src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h",
],
external_deps = ["absl/status"],
tags = ["grpc-autodeps"],
deps = ["gpr_platform"],
)
grpc_cc_library(
name = "iomgr_ee_wakeup_fd_posix_pipe",
srcs = [
"src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc",
],
hdrs = [
"src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h",
],
external_deps = [
"absl/memory",
"absl/status",
"absl/status:statusor",
"absl/strings",
],
tags = ["grpc-autodeps"],
deps = [
"gpr_base",
"gpr_platform",
"iomgr_ee_wakeup_fd_posix",
"iomgr_port",
],
)
grpc_cc_library(
name = "iomgr_ee_wakeup_fd_posix_eventfd",
srcs = [
"src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc",
],
hdrs = [
"src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h",
],
external_deps = [
"absl/memory",
"absl/status",
"absl/status:statusor",
"absl/strings",
],
tags = ["grpc-autodeps"],
deps = [
"gpr_base",
"gpr_platform",
"iomgr_ee_wakeup_fd_posix",
"iomgr_port",
],
)
grpc_cc_library(
name = "iomgr_ee_wakeup_fd_posix_default",
srcs = [
"src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc",
],
hdrs = [
"src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h",
],
external_deps = [
"absl/status",
"absl/status:statusor",
],
tags = ["grpc-autodeps"],
deps = [
"gpr_platform",
"iomgr_ee_wakeup_fd_posix",
"iomgr_ee_wakeup_fd_posix_eventfd",
"iomgr_ee_wakeup_fd_posix_pipe",
"iomgr_port",
],
)
grpc_cc_library(
name = "iomgr_ee_poller_posix_epoll1",
srcs = [
"src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc",
],
hdrs = [
"src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h",
],
external_deps = [
"absl/base:core_headers",
"absl/memory",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/synchronization",
],
tags = ["grpc-autodeps"],
deps = [
"gpr_base",
"gpr_codegen",
"gpr_platform",
"iomgr_ee_closure",
"iomgr_ee_event_poller",
"iomgr_ee_lockfree_event",
"iomgr_ee_wakeup_fd_posix",
"iomgr_ee_wakeup_fd_posix_default",
"iomgr_port",
"time",
],
)
grpc_cc_library(
name = "iomgr_ee_poller_posix_default",
srcs = [
"src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc",
],
hdrs = [
"src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h",
],
tags = ["grpc-autodeps"],
deps = [
"gpr_platform",
"iomgr_ee_event_poller",
"iomgr_ee_poller_posix_epoll1",
],
)
grpc_cc_library(
name = "iomgr_event_engine",
srcs = ["src/core/lib/event_engine/iomgr_engine/iomgr_engine.cc"],

125
CMakeLists.txt generated

@ -953,6 +953,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx error_test)
add_dependencies(buildtests_cxx error_utils_test)
add_dependencies(buildtests_cxx evaluate_args_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx event_poller_posix_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx examine_stack_test)
endif()
@ -1040,6 +1043,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx latch_test)
add_dependencies(buildtests_cxx lb_get_cpu_stats_test)
add_dependencies(buildtests_cxx lb_load_data_store_test)
add_dependencies(buildtests_cxx lock_free_event_test)
add_dependencies(buildtests_cxx log_test)
add_dependencies(buildtests_cxx loop_test)
add_dependencies(buildtests_cxx match_test)
@ -1205,6 +1209,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx uri_parser_test)
add_dependencies(buildtests_cxx useful_test)
add_dependencies(buildtests_cxx varint_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx wakeup_fd_posix_test)
endif()
add_dependencies(buildtests_cxx window_overflow_bad_client_test)
add_dependencies(buildtests_cxx wire_reader_test)
add_dependencies(buildtests_cxx wire_writer_test)
@ -9222,6 +9229,49 @@ target_link_libraries(evaluate_args_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(event_poller_posix_test
src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc
src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc
src/core/lib/event_engine/iomgr_engine/lockfree_event.cc
src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc
src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc
src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc
test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(event_poller_posix_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(event_poller_posix_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@ -12434,6 +12484,42 @@ target_link_libraries(lb_load_data_store_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(lock_free_event_test
src/core/lib/event_engine/iomgr_engine/lockfree_event.cc
test/core/event_engine/iomgr_event_engine/lock_free_event_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(lock_free_event_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(lock_free_event_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
@ -18233,6 +18319,45 @@ target_link_libraries(varint_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(wakeup_fd_posix_test
src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc
src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc
test/core/event_engine/iomgr_event_engine/wakeup_fd_posix_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(wakeup_fd_posix_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(wakeup_fd_posix_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

60
build_autogenerated.yaml

@ -5153,6 +5153,34 @@ targets:
- test/core/security/evaluate_args_test.cc
deps:
- grpc_test_util
- name: event_poller_posix_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h
- src/core/lib/event_engine/iomgr_engine/event_poller.h
- src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h
- src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h
- src/core/lib/event_engine/iomgr_engine/lockfree_event.h
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h
src:
- src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc
- src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc
- src/core/lib/event_engine/iomgr_engine/lockfree_event.cc
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc
- test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc
deps:
- grpc_test_util
platforms:
- linux
- posix
- mac
- name: examine_stack_test
gtest: true
build: test
@ -6487,6 +6515,20 @@ targets:
deps:
- grpc++
- grpc_test_util
- name: lock_free_event_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/iomgr_engine/event_poller.h
- src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h
- src/core/lib/event_engine/iomgr_engine/lockfree_event.h
src:
- src/core/lib/event_engine/iomgr_engine/lockfree_event.cc
- test/core/event_engine/iomgr_event_engine/lock_free_event_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: log_test
gtest: true
build: test
@ -9083,6 +9125,24 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: wakeup_fd_posix_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h
src:
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc
- src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc
- test/core/event_engine/iomgr_event_engine/wakeup_fd_posix_test.cc
deps:
- grpc_test_util
platforms:
- linux
- posix
- mac
- name: window_overflow_bad_client_test
gtest: true
build: test

589
src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc

@ -0,0 +1,589 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h"
#include <stdint.h>
#include <algorithm>
#include <memory>
#include "absl/memory/memory.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/port.h"
// This polling engine is only relevant on linux kernels supporting epoll
// (epoll_create() or epoll_create1()).
#ifdef GRPC_LINUX_EPOLL
#include <errno.h>
#include <limits.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>
#include "absl/synchronization/mutex.h"
#include "src/core/lib/event_engine/iomgr_engine/event_poller.h"
#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h"
#include "src/core/lib/event_engine/iomgr_engine/lockfree_event.h"
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h"
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/time.h"
using ::grpc_event_engine::iomgr_engine::LockfreeEvent;
using ::grpc_event_engine::iomgr_engine::WakeupFd;
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
namespace grpc_event_engine {
namespace iomgr_engine {
namespace {
// Only used when GRPC_ENABLE_FORK_SUPPORT=1
struct ForkFdList {
Epoll1EventHandle* handle;
Epoll1EventHandle* next;
Epoll1EventHandle* prev;
};
} // namespace
class Epoll1EventHandle : public EventHandle {
public:
Epoll1EventHandle(int fd, Epoll1Poller* poller)
: fd_(fd),
pending_actions_(0),
list_(),
poller_(poller),
read_closure_(absl::make_unique<LockfreeEvent>(poller->GetScheduler())),
write_closure_(
absl::make_unique<LockfreeEvent>(poller->GetScheduler())),
error_closure_(
absl::make_unique<LockfreeEvent>(poller->GetScheduler())) {
read_closure_->InitEvent();
write_closure_->InitEvent();
error_closure_->InitEvent();
pending_actions_ = 0;
}
Epoll1Poller* Poller() { return poller_; }
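// Bit layout of pending_actions_: bit 0 records a pending read event, bit 2 a
// pending write event and bit 3 a pending error event; the bits are set by
// SetPendingActions() and consumed by ExecutePendingActions().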
void SetPendingActions(bool pending_read, bool pending_write,
bool pending_error) {
pending_actions_ |= pending_read;
if (pending_write) {
pending_actions_ |= (1 << 2);
}
if (pending_error) {
pending_actions_ |= (1 << 3);
}
}
int WrappedFd() override { return fd_; }
void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd,
absl::string_view reason) override;
void ShutdownHandle(absl::Status why) override;
void NotifyOnRead(IomgrEngineClosure* on_read) override;
void NotifyOnWrite(IomgrEngineClosure* on_write) override;
void NotifyOnError(IomgrEngineClosure* on_error) override;
void SetReadable() override;
void SetWritable() override;
void SetHasError() override;
bool IsHandleShutdown() override;
void ExecutePendingActions() override {
if (pending_actions_ & 1UL) {
read_closure_->SetReady();
}
if ((pending_actions_ >> 2) & 1UL) {
write_closure_->SetReady();
}
if ((pending_actions_ >> 3) & 1UL) {
error_closure_->SetReady();
}
pending_actions_ = 0;
}
absl::Mutex* mu() { return &mu_; }
LockfreeEvent* ReadClosure() { return read_closure_.get(); }
LockfreeEvent* WriteClosure() { return write_closure_.get(); }
LockfreeEvent* ErrorClosure() { return error_closure_.get(); }
ForkFdList& ForkFdListPos() { return list_; }
private:
void HandleShutdownInternal(absl::Status why, bool releasing_fd);
// See Epoll1EventHandle::ShutdownHandle for an explanation of why a mutex is
// required.
absl::Mutex mu_;
int fd_;
int pending_actions_;
ForkFdList list_;
Epoll1Poller* poller_;
std::unique_ptr<LockfreeEvent> read_closure_;
std::unique_ptr<LockfreeEvent> write_closure_;
std::unique_ptr<LockfreeEvent> error_closure_;
};
namespace {
bool kEpoll1PollerSupported = false;
gpr_once g_init_epoll1_poller = GPR_ONCE_INIT;
int EpollCreateAndCloexec() {
#ifdef GRPC_LINUX_EPOLL_CREATE1
int fd = epoll_create1(EPOLL_CLOEXEC);
if (fd < 0) {
gpr_log(GPR_ERROR, "epoll_create1 unavailable");
}
#else
int fd = epoll_create(MAX_EPOLL_EVENTS);
if (fd < 0) {
gpr_log(GPR_ERROR, "epoll_create unavailable");
} else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
return -1;
}
#endif
return fd;
}
// Only used when GRPC_ENABLE_FORK_SUPPORT=1
std::list<Epoll1Poller*> fork_poller_list;
// Only used when GRPC_ENABLE_FORK_SUPPORT=1
Epoll1EventHandle* fork_fd_list_head = nullptr;
gpr_mu fork_fd_list_mu;
void ForkFdListAddHandle(Epoll1EventHandle* handle) {
if (grpc_core::Fork::Enabled()) {
gpr_mu_lock(&fork_fd_list_mu);
handle->ForkFdListPos().next = fork_fd_list_head;
handle->ForkFdListPos().prev = nullptr;
if (fork_fd_list_head != nullptr) {
fork_fd_list_head->ForkFdListPos().prev = handle;
}
fork_fd_list_head = handle;
gpr_mu_unlock(&fork_fd_list_mu);
}
}
void ForkFdListRemoveHandle(Epoll1EventHandle* handle) {
if (grpc_core::Fork::Enabled()) {
gpr_mu_lock(&fork_fd_list_mu);
if (fork_fd_list_head == handle) {
fork_fd_list_head = handle->ForkFdListPos().next;
}
if (handle->ForkFdListPos().prev != nullptr) {
handle->ForkFdListPos().prev->ForkFdListPos().next =
handle->ForkFdListPos().next;
}
if (handle->ForkFdListPos().next != nullptr) {
handle->ForkFdListPos().next->ForkFdListPos().prev =
handle->ForkFdListPos().prev;
}
gpr_mu_unlock(&fork_fd_list_mu);
}
}
void ForkPollerListAddPoller(Epoll1Poller* poller) {
if (grpc_core::Fork::Enabled()) {
gpr_mu_lock(&fork_fd_list_mu);
fork_poller_list.push_back(poller);
gpr_mu_unlock(&fork_fd_list_mu);
}
}
void ForkPollerListRemovePoller(Epoll1Poller* poller) {
if (grpc_core::Fork::Enabled()) {
gpr_mu_lock(&fork_fd_list_mu);
fork_poller_list.remove(poller);
gpr_mu_unlock(&fork_fd_list_mu);
}
}
int PollDeadlineToMillisTimeout(grpc_core::Timestamp millis) {
if (millis == grpc_core::Timestamp::InfFuture()) return -1;
grpc_core::Timestamp now =
grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC));
int64_t delta = (millis - now).millis();
if (delta > INT_MAX) {
return INT_MAX;
} else if (delta < 0) {
return 0;
} else {
return static_cast<int>(delta);
}
}
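// For example, a deadline 250ms in the future yields a timeout of 250, an
// already-expired deadline yields 0 (poll without blocking), and
// Timestamp::InfFuture() yields -1 (block indefinitely in epoll_wait).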
void InitEpoll1PollerLinux();
// Called by the child process's post-fork handler to close open fds,
// including the global epoll fd of each poller. This allows gRPC to shut down
// in the child process without interfering with connections or RPCs ongoing
// in the parent.
void ResetEventManagerOnFork() {
// Delete all pending Epoll1EventHandles.
gpr_mu_lock(&fork_fd_list_mu);
while (fork_fd_list_head != nullptr) {
close(fork_fd_list_head->WrappedFd());
Epoll1EventHandle* next = fork_fd_list_head->ForkFdListPos().next;
delete fork_fd_list_head;
fork_fd_list_head = next;
}
// Delete all registered pollers. This also closes all open epoll_sets
while (!fork_poller_list.empty()) {
Epoll1Poller* poller = fork_poller_list.front();
fork_poller_list.pop_front();
delete poller;
}
gpr_mu_unlock(&fork_fd_list_mu);
if (grpc_core::Fork::Enabled()) {
gpr_mu_destroy(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
}
InitEpoll1PollerLinux();
}
// It is possible that GLIBC has epoll but the underlying kernel doesn't.
// Create an epoll_fd to make sure epoll support is available.
void InitEpoll1PollerLinux() {
if (!grpc_event_engine::iomgr_engine::SupportsWakeupFd()) {
kEpoll1PollerSupported = false;
return;
}
int fd = EpollCreateAndCloexec();
if (fd <= 0) {
kEpoll1PollerSupported = false;
return;
}
kEpoll1PollerSupported = true;
if (grpc_core::Fork::Enabled()) {
gpr_mu_init(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(ResetEventManagerOnFork);
}
close(fd);
}
} // namespace
void Epoll1EventHandle::OrphanHandle(IomgrEngineClosure* on_done,
int* release_fd,
absl::string_view reason) {
bool is_release_fd = (release_fd != nullptr);
if (!read_closure_->IsShutdown()) {
HandleShutdownInternal(absl::Status(absl::StatusCode::kUnknown, reason),
is_release_fd);
}
// If release_fd is not NULL, we should be relinquishing control of the file
// descriptor fd->fd (but we still own the grpc_fd structure).
if (is_release_fd) {
*release_fd = fd_;
} else {
close(fd_);
}
ForkFdListRemoveHandle(this);
{
// See Epoll1EventHandle::ShutdownHandle for an explanation of why a mutex is
// required here.
absl::MutexLock lock(&mu_);
read_closure_->DestroyEvent();
write_closure_->DestroyEvent();
error_closure_->DestroyEvent();
}
{
absl::MutexLock lock(&poller_->mu_);
poller_->free_epoll1_handles_list_.push_back(this);
}
if (on_done != nullptr) {
on_done->SetStatus(absl::OkStatus());
poller_->GetScheduler()->Run(on_done);
}
}
// If 'releasing_fd' is true, it means that we are going to detach the internal
// fd from the grpc_fd structure (i.e. we should not be calling the shutdown()
// syscall on that fd).
void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
bool releasing_fd) {
if (read_closure_->SetShutdown(why)) {
if (!releasing_fd) {
shutdown(fd_, SHUT_RDWR);
} else {
epoll_event phony_event;
if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_,
&phony_event) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
}
write_closure_->SetShutdown(why);
error_closure_->SetShutdown(why);
}
}
Epoll1Poller::Epoll1Poller(Scheduler* scheduler)
: scheduler_(scheduler), was_kicked_(false) {
g_epoll_set_.epfd = EpollCreateAndCloexec();
wakeup_fd_ = *CreateWakeupFd();
GPR_ASSERT(wakeup_fd_ != nullptr);
GPR_ASSERT(g_epoll_set_.epfd >= 0);
gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set_.epfd);
struct epoll_event ev;
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
ev.data.ptr = wakeup_fd_.get();
GPR_ASSERT(epoll_ctl(g_epoll_set_.epfd, EPOLL_CTL_ADD, wakeup_fd_->ReadFd(),
&ev) == 0);
g_epoll_set_.num_events = 0;
g_epoll_set_.cursor = 0;
ForkPollerListAddPoller(this);
}
void Epoll1Poller::Shutdown() {
ForkPollerListRemovePoller(this);
delete this;
}
Epoll1Poller::~Epoll1Poller() {
if (g_epoll_set_.epfd >= 0) {
close(g_epoll_set_.epfd);
g_epoll_set_.epfd = -1;
}
{
absl::MutexLock lock(&mu_);
while (!free_epoll1_handles_list_.empty()) {
Epoll1EventHandle* handle = reinterpret_cast<Epoll1EventHandle*>(
free_epoll1_handles_list_.front());
free_epoll1_handles_list_.pop_front();
delete handle;
}
}
}
EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
bool track_err) {
Epoll1EventHandle* new_handle = nullptr;
{
absl::MutexLock lock(&mu_);
if (free_epoll1_handles_list_.empty()) {
new_handle = new Epoll1EventHandle(fd, this);
} else {
new_handle = reinterpret_cast<Epoll1EventHandle*>(
free_epoll1_handles_list_.front());
free_epoll1_handles_list_.pop_front();
}
}
ForkFdListAddHandle(new_handle);
struct epoll_event ev;
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
// Use the least significant bit of ev.data.ptr to store track_err. We expect
// the addresses to be word aligned. We need to store track_err to avoid
// synchronization issues when accessing it after receiving an event.
// Accessing fd would be a data race there because the fd might have been
// returned to the free list at that point.
ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_handle) |
(track_err ? 1 : 0));
if (epoll_ctl(g_epoll_set_.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
return new_handle;
}
// Process the epoll events found by the DoEpollWait() function.
// - g_epoll_set.cursor points to the index of the first event to be processed
// - This function then processes up to max_epoll_events_to_handle events and
//   updates the g_epoll_set.cursor
absl::Status Epoll1Poller::ProcessEpollEvents(
int max_epoll_events_to_handle, std::vector<EventHandle*>& pending_events) {
int64_t num_events = g_epoll_set_.num_events;
int64_t cursor = g_epoll_set_.cursor;
bool was_kicked = false;
for (int idx = 0; (idx < max_epoll_events_to_handle) && cursor != num_events;
idx++) {
int64_t c = cursor++;
struct epoll_event* ev = &g_epoll_set_.events[c];
void* data_ptr = ev->data.ptr;
if (data_ptr == wakeup_fd_.get()) {
GPR_ASSERT(wakeup_fd_->ConsumeWakeup().ok());
was_kicked = true;
} else {
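// Recover the handle pointer and the track_err bit that CreateHandle()
// packed into ev->data.ptr.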
Epoll1EventHandle* handle = reinterpret_cast<Epoll1EventHandle*>(
reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
bool track_err =
reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
bool cancel = (ev->events & EPOLLHUP) != 0;
bool error = (ev->events & EPOLLERR) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
bool err_fallback = error && !track_err;
handle->SetPendingActions(read_ev || cancel || err_fallback,
write_ev || cancel || err_fallback,
error && !err_fallback);
pending_events.push_back(handle);
}
}
g_epoll_set_.cursor = cursor;
return was_kicked ? absl::Status(absl::StatusCode::kInternal, "Kicked")
: absl::OkStatus();
}
// Do epoll_wait and store the events in g_epoll_set.events field. This does
// not "process" any of the events yet; that is done in ProcessEpollEvents().
// See ProcessEpollEvents() function for more details.
absl::Status Epoll1Poller::DoEpollWait(grpc_core::Timestamp deadline) {
int r;
int timeout = PollDeadlineToMillisTimeout(deadline);
do {
r = epoll_wait(g_epoll_set_.epfd, g_epoll_set_.events, MAX_EPOLL_EVENTS,
timeout);
} while (r < 0 && errno == EINTR);
if (r < 0) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("epoll_wait: ", strerror(errno)));
}
g_epoll_set_.num_events = r;
g_epoll_set_.cursor = 0;
return absl::OkStatus();
}
// Might be called multiple times
void Epoll1EventHandle::ShutdownHandle(absl::Status why) {
// A mutex is required here because the SetShutdown method of the
// lockfree event may schedule a closure if it is already ready and that
// closure may call OrphanHandle. Execution of ShutdownHandle and OrphanHandle
// in parallel is not safe because some of the lockfree event types e.g. read,
// write, error may not have called SetShutdown when DestroyEvent gets
// called in the OrphanHandle method.
absl::MutexLock lock(&mu_);
HandleShutdownInternal(why, false);
}
bool Epoll1EventHandle::IsHandleShutdown() {
return read_closure_->IsShutdown();
}
void Epoll1EventHandle::NotifyOnRead(IomgrEngineClosure* on_read) {
read_closure_->NotifyOn(on_read);
}
void Epoll1EventHandle::NotifyOnWrite(IomgrEngineClosure* on_write) {
write_closure_->NotifyOn(on_write);
}
void Epoll1EventHandle::NotifyOnError(IomgrEngineClosure* on_error) {
error_closure_->NotifyOn(on_error);
}
void Epoll1EventHandle::SetReadable() { read_closure_->SetReady(); }
void Epoll1EventHandle::SetWritable() { write_closure_->SetReady(); }
void Epoll1EventHandle::SetHasError() { error_closure_->SetReady(); }
absl::Status Epoll1Poller::Work(grpc_core::Timestamp deadline,
std::vector<EventHandle*>& pending_events) {
if (g_epoll_set_.cursor == g_epoll_set_.num_events) {
auto status = DoEpollWait(deadline);
if (!status.ok()) {
return status;
}
}
{
absl::MutexLock lock(&mu_);
// If was_kicked_ is true, collect all pending events in this iteration.
auto status = ProcessEpollEvents(
was_kicked_ ? INT_MAX : MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION,
pending_events);
if (!status.ok()) {
was_kicked_ = false;
}
return status;
}
}
void Epoll1Poller::Kick() {
absl::MutexLock lock(&mu_);
if (was_kicked_) {
return;
}
was_kicked_ = true;
GPR_ASSERT(wakeup_fd_->Wakeup().ok());
}
Epoll1Poller* GetEpoll1Poller(Scheduler* scheduler) {
gpr_once_init(&g_init_epoll1_poller, []() { InitEpoll1PollerLinux(); });
if (kEpoll1PollerSupported) {
return new Epoll1Poller(scheduler);
}
return nullptr;
}
} // namespace iomgr_engine
} // namespace grpc_event_engine
#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
namespace grpc_event_engine {
namespace iomgr_engine {
Epoll1Poller::Epoll1Poller(Scheduler* /* engine */) {
GPR_ASSERT(false && "unimplemented");
}
void Epoll1Poller::Shutdown() { GPR_ASSERT(false && "unimplemented"); }
Epoll1Poller::~Epoll1Poller() { GPR_ASSERT(false && "unimplemented"); }
EventHandle* Epoll1Poller::CreateHandle(int /*fd*/, absl::string_view /*name*/,
bool /*track_err*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status Epoll1Poller::ProcessEpollEvents(
int /*max_epoll_events_to_handle*/,
std::vector<EventHandle*>& /*pending_events*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status Epoll1Poller::DoEpollWait(grpc_core::Timestamp /*deadline*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status Epoll1Poller::Work(grpc_core::Timestamp /*deadline*/,
std::vector<EventHandle*>& /*pending_events*/) {
GPR_ASSERT(false && "unimplemented");
}
void Epoll1Poller::Kick() { GPR_ASSERT(false && "unimplemented"); }
// If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
// nullptr.
Epoll1Poller* GetEpoll1Poller(Scheduler* /*scheduler*/) { return nullptr; }
} // namespace iomgr_engine
} // namespace grpc_event_engine
#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
#endif /* !defined(GRPC_LINUX_EPOLL) */

95
src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h

@ -0,0 +1,95 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EV_EPOLL1_LINUX_H
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EV_EPOLL1_LINUX_H
#include <grpc/support/port_platform.h>
#include <list>
#include <memory>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "src/core/lib/event_engine/iomgr_engine/event_poller.h"
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_LINUX_EPOLL
#include <sys/epoll.h>
#endif
#define MAX_EPOLL_EVENTS 100
namespace grpc_event_engine {
namespace iomgr_engine {
class Epoll1EventHandle;
// Definition of epoll1 based poller.
class Epoll1Poller : public EventPoller {
public:
explicit Epoll1Poller(Scheduler* scheduler);
EventHandle* CreateHandle(int fd, absl::string_view name,
bool track_err) override;
absl::Status Work(grpc_core::Timestamp deadline,
std::vector<EventHandle*>& pending_events) override;
void Kick() override;
Scheduler* GetScheduler() { return scheduler_; }
void Shutdown() override;
~Epoll1Poller() override;
private:
absl::Status ProcessEpollEvents(int max_epoll_events_to_handle,
std::vector<EventHandle*>& pending_events);
absl::Status DoEpollWait(grpc_core::Timestamp deadline);
friend class Epoll1EventHandle;
#ifdef GRPC_LINUX_EPOLL
struct EpollSet {
int epfd;
// The epoll_events after the last call to epoll_wait()
struct epoll_event events[MAX_EPOLL_EVENTS];
// The number of epoll_events after the last call to epoll_wait()
int num_events;
// Index of the first event in epoll_events that has to be processed. This
// field is only valid if num_events > 0
int cursor;
};
#else
struct EpollSet {};
#endif
absl::Mutex mu_;
Scheduler* scheduler_;
// A singleton epoll set
EpollSet g_epoll_set_;
bool was_kicked_ ABSL_GUARDED_BY(mu_);
std::list<EventHandle*> free_epoll1_handles_list_ ABSL_GUARDED_BY(mu_);
std::unique_ptr<WakeupFd> wakeup_fd_;
};
// Return an instance of an epoll1-based poller tied to the specified
// scheduler.
Epoll1Poller* GetEpoll1Poller(Scheduler* scheduler);
} // namespace iomgr_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EV_EPOLL1_LINUX_H

105
src/core/lib/event_engine/iomgr_engine/event_poller.h

@ -0,0 +1,105 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_H
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_H
#include <grpc/support/port_platform.h>
#include <vector>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h"
#include "src/core/lib/gprpp/time.h"
namespace grpc_event_engine {
namespace iomgr_engine {
class Scheduler {
public:
virtual void Run(experimental::EventEngine::Closure* closure) = 0;
virtual ~Scheduler() = default;
};
class EventHandle {
public:
virtual int WrappedFd() = 0;
// Delete the handle and optionally close the underlying file descriptor if
// release_fd != nullptr. The on_done closure is scheduled to be invoked
// after the operation is complete. After this operation, NotifyXXX and SetXXX
// operations cannot be performed on the handle.
virtual void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd,
absl::string_view reason) = 0;
// Shutdown a handle. After this operation, NotifyXXX and SetXXX operations
// cannot be performed.
virtual void ShutdownHandle(absl::Status why) = 0;
// Schedule on_read to be invoked when the underlying file descriptor
// becomes readable.
virtual void NotifyOnRead(IomgrEngineClosure* on_read) = 0;
// Schedule on_write to be invoked when the underlying file descriptor
// becomes writable.
virtual void NotifyOnWrite(IomgrEngineClosure* on_write) = 0;
// Schedule on_error to be invoked when the underlying file descriptor
// encounters errors.
virtual void NotifyOnError(IomgrEngineClosure* on_error) = 0;
// Force set a readable event on the underlying file descriptor.
virtual void SetReadable() = 0;
// Force set a writable event on the underlying file descriptor.
virtual void SetWritable() = 0;
// Force set an error event on the underlying file descriptor.
virtual void SetHasError() = 0;
// Returns true if the handle has been shutdown.
virtual bool IsHandleShutdown() = 0;
// Execute any pending actions that may have been set on a handle after the
// last invocation of the Work(...) function.
virtual void ExecutePendingActions() = 0;
virtual ~EventHandle() = default;
};
class EventPoller {
public:
// Return an opaque handle to perform actions on the provided file descriptor.
virtual EventHandle* CreateHandle(int fd, absl::string_view name,
bool track_err) = 0;
// Shuts down and deletes the poller. It is legal to call this function
// only when no other poller method is in progress. For instance, it is
// not safe to call this method, while a thread is blocked on Work(...).
// A graceful way to terminate the poller could be to:
// 1. First orphan all created handles.
// 2. Send a Kick() to the thread executing Work(...) and wait for the
// thread to return.
// 3. Call Shutdown() on the poller.
virtual void Shutdown() = 0;
// Poll all the underlying file descriptors for the specified period
// and return a vector containing a list of handles which have pending
// events. The calling thread should invoke ExecutePendingActions on each
// returned handle to take the necessary pending actions. Only one thread
// may invoke the Work function at any given point in time. The Work(...)
// method returns an absl Non-OK status if it was Kicked.
virtual absl::Status Work(grpc_core::Timestamp deadline,
std::vector<EventHandle*>& pending_events) = 0;
// Trigger the thread executing Work(..) to break out as soon as possible.
// This function is useful in tests. It may also be used to break a thread
// out of Work(...) before calling Shutdown() on the poller.
virtual void Kick() = 0;
virtual ~EventPoller() = default;
};
} // namespace iomgr_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_H
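
A minimal usage sketch of the EventPoller/EventHandle interfaces declared above (hypothetical and not part of this change; PollLoop, the "sketch" name and the inline lambda are illustrative, and GetDefaultPoller comes from event_poller_posix_default.h):

// Illustrative sketch only; assumes an existing Scheduler implementation and
// an already-opened file descriptor.
#include <vector>
#include "absl/status/status.h"
#include "src/core/lib/event_engine/iomgr_engine/event_poller.h"
#include "src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h"
#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h"
#include "src/core/lib/gprpp/time.h"
namespace {
using namespace grpc_event_engine::iomgr_engine;
void PollLoop(Scheduler* scheduler, int fd) {
  // GetDefaultPoller may return nullptr if no poller (e.g. epoll) is available.
  EventPoller* poller = GetDefaultPoller(scheduler);
  if (poller == nullptr) return;
  EventHandle* handle = poller->CreateHandle(fd, "sketch", /*track_err=*/false);
  handle->NotifyOnRead(IomgrEngineClosure::TestOnlyToClosure(
      [](absl::Status /*status*/) { /* fd readable or handle shut down */ }));
  std::vector<EventHandle*> pending;
  // Work() fills `pending`; a non-OK status typically means Kick() was called.
  while (poller->Work(grpc_core::Timestamp::InfFuture(), pending).ok()) {
    for (EventHandle* h : pending) h->ExecutePendingActions();
    pending.clear();
  }
  handle->OrphanHandle(/*on_done=*/nullptr, /*release_fd=*/nullptr, "done");
  poller->Shutdown();
}
}  // namespace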

29
src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc

@ -0,0 +1,29 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h"
#include "src/core/lib/event_engine/iomgr_engine/event_poller.h"
namespace grpc_event_engine {
namespace iomgr_engine {
EventPoller* GetDefaultPoller(Scheduler* scheduler) {
EventPoller* poller = GetEpoll1Poller(scheduler);
return poller;
}
} // namespace iomgr_engine
} // namespace grpc_event_engine

33
src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h

@ -0,0 +1,33 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_POSIX_DEFAULT_H
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_POSIX_DEFAULT_H
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace iomgr_engine {
class EventPoller;
class Scheduler;
// Return an instance of an event poller which is tied to the specified
// scheduler.
EventPoller* GetDefaultPoller(Scheduler* scheduler);
} // namespace iomgr_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_POSIX_DEFAULT_H

74
src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h

@ -0,0 +1,74 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_IOMGR_ENGINE_CLOSURE_H
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_IOMGR_ENGINE_CLOSURE_H
#include <grpc/support/port_platform.h>
#include <functional>
#include <utility>
#include "absl/status/status.h"
#include "absl/utility/utility.h"
#include <grpc/event_engine/event_engine.h>
namespace grpc_event_engine {
namespace iomgr_engine {
// The callbacks for Endpoint read and write take an absl::Status as
// argument - this is important for the tcp code to function correctly. We need
// a custom closure type because the default EventEngine::Closure type doesn't
// provide a way to pass a status when the callback is run.
class IomgrEngineClosure final
: public grpc_event_engine::experimental::EventEngine::Closure {
public:
IomgrEngineClosure() = default;
IomgrEngineClosure(std::function<void(absl::Status)>&& cb, bool is_permanent)
: cb_(std::move(cb)),
is_permanent_(is_permanent),
status_(absl::OkStatus()) {}
~IomgrEngineClosure() final = default;
void SetStatus(absl::Status status) { status_ = status; }
void Run() override {
cb_(absl::exchange(status_, absl::OkStatus()));
if (!is_permanent_) {
delete this;
}
}
// This closure doesn't clean itself up after execution. It is expected to be
// cleaned up by the caller at the appropriate time.
static IomgrEngineClosure* ToPermanentClosure(
std::function<void(absl::Status)>&& cb) {
return new IomgrEngineClosure(std::move(cb), true);
}
// This closure cleans itself up after execution. It is expected to be
// used only in tests.
static IomgrEngineClosure* TestOnlyToClosure(
std::function<void(absl::Status)>&& cb) {
return new IomgrEngineClosure(std::move(cb), false);
}
private:
std::function<void(absl::Status)> cb_;
bool is_permanent_ = false;
absl::Status status_;
};
} // namespace iomgr_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_IOMGR_ENGINE_CLOSURE_H
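
A small illustrative sketch contrasting the two factory helpers above (ClosureSketch and the lambda body are hypothetical names, not part of this change):

// Illustrative sketch of IomgrEngineClosure usage.
#include <grpc/support/log.h>
#include "absl/status/status.h"
#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h"
void ClosureSketch() {
  using grpc_event_engine::iomgr_engine::IomgrEngineClosure;
  auto cb = [](absl::Status status) {
    gpr_log(GPR_INFO, "closure ran: %s", status.ToString().c_str());
  };
  // One-shot closure: Run() invokes the callback and then deletes the closure.
  IomgrEngineClosure* once = IomgrEngineClosure::TestOnlyToClosure(cb);
  once->SetStatus(absl::CancelledError());
  once->Run();  // `once` must not be touched after this call.
  // Permanent closure: Run() may be invoked repeatedly; the owner deletes it.
  IomgrEngineClosure* permanent = IomgrEngineClosure::ToPermanentClosure(cb);
  permanent->SetStatus(absl::OkStatus());
  permanent->Run();
  delete permanent;
}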

267
src/core/lib/event_engine/iomgr_engine/lockfree_event.cc

@ -0,0 +1,267 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/iomgr_engine/lockfree_event.h"
#include <stdlib.h>
#include <atomic>
#include <cstdint>
#include "absl/status/status.h"
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/iomgr_engine/event_poller.h"
#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h"
#include "src/core/lib/gprpp/status_helper.h"
// 'state' holds the closure to call when the fd is readable or writable
// respectively.
// It can contain one of the following values:
// kClosureReady : The fd has an I/O event of interest but there is no
// closure yet to execute
// kClosureNotReady : The fd has no I/O event of interest
// closure ptr : The closure to be executed when the fd has an I/O
// event of interest
// shutdown_error | kShutdownBit :
// 'shutdown_error' field ORed with kShutdownBit.
// This indicates that the fd is shutdown. Since all
// memory allocations are word-aligned, the lower two
// bits of the shutdown_error pointer are always 0. So
// it is safe to OR these with kShutdownBit
// Valid state transitions:
//    <closure ptr> <-----3------ kClosureNotReady -----1------->  kClosureReady
//       |  |                         ^   |    ^                        |  |
//       |  |                         |   |    |                        |  |
//       |  +--------------4----------+   6    +---------2--------------+  |
//       |                                |                                |
//       |                                v                                |
//       +-----5------->  [shutdown_error | kShutdownBit] <------7---------+
// For 1, 4 : See SetReady() function
// For 2, 3 : See NotifyOn() function
// For 5,6,7: See SetShutdown() function
namespace grpc_event_engine {
namespace iomgr_engine {
void LockfreeEvent::InitEvent() {
// Perform an atomic store to start the state machine.
// Note carefully that LockfreeEvent *MAY* be used whilst in a destroyed
// state, while a file descriptor is on a freelist. In such a state it may
// be SetReady'd, and so we need to perform an atomic operation here to
// ensure no races
state_.store(kClosureNotReady, std::memory_order_relaxed);
}
void LockfreeEvent::DestroyEvent() {
intptr_t curr;
do {
curr = state_.load(std::memory_order_relaxed);
if (curr & kShutdownBit) {
grpc_core::internal::StatusFreeHeapPtr(curr & ~kShutdownBit);
} else {
GPR_ASSERT(curr == kClosureNotReady || curr == kClosureReady);
}
// we CAS in a shutdown, no error value here. If this event is interacted
// with post-deletion (see the note in the constructor) we want the bit
// pattern to prevent error retention in a deleted object
} while (!state_.compare_exchange_strong(curr, kShutdownBit,
std::memory_order_relaxed,
std::memory_order_relaxed));
}
void LockfreeEvent::NotifyOn(IomgrEngineClosure* closure) {
// This load needs to be an acquire load because this can be a shutdown
// error that we might need to reference. Adding acquire semantics makes
// sure that the shutdown error has been initialized properly before us
// referencing it. The load() needs to be performed only once before entry
// into the loop. This is because if any of the compare_exchange_strong
// operations inside the loop return false, they automatically update curr
// with the new value. So it doesn't need to be loaded again.
intptr_t curr = state_.load(std::memory_order_acquire);
while (true) {
switch (curr) {
case kClosureNotReady: {
// kClosureNotReady -> <closure>.
// We're guaranteed by API that there's an acquire barrier before here,
// so there's no need to double-dip and this can be a release-only.
// The release itself pairs with the acquire half of a set_ready full
// barrier.
if (state_.compare_exchange_strong(
curr, reinterpret_cast<intptr_t>(closure),
std::memory_order_release, std::memory_order_relaxed)) {
return; // Successful. Return
}
break; // retry
}
case kClosureReady: {
// Change the state to kClosureNotReady. Schedule the closure if
// successful. If not, the state most likely transitioned to shutdown.
// We should retry.
// This can be a no-barrier cas since the state is being transitioned to
// kClosureNotReady; set_ready and set_shutdown do not schedule any
// closure when transitioning out of the kClosureNotReady state (i.e. there
// is no other code that needs to 'happen-after' this)
if (state_.compare_exchange_strong(curr, kClosureNotReady,
std::memory_order_relaxed,
std::memory_order_relaxed)) {
scheduler_->Run(closure);
return; // Successful. Return.
}
break; // retry
}
default: {
// 'curr' is either a closure or the fd is shutdown (in which case 'curr'
// contains a pointer to the shutdown-error). If the fd is shutdown,
// schedule the closure with the shutdown error
if ((curr & kShutdownBit) > 0) {
absl::Status shutdown_err =
grpc_core::internal::StatusGetFromHeapPtr(curr & ~kShutdownBit);
closure->SetStatus(shutdown_err);
scheduler_->Run(closure);
return;
}
// There is already a closure! This indicates a bug in the code.
gpr_log(GPR_ERROR,
"LockfreeEvent::NotifyOn: notify_on called with a previous "
"callback still pending");
abort();
}
}
}
GPR_UNREACHABLE_CODE(return );
}
bool LockfreeEvent::SetShutdown(absl::Status shutdown_error) {
intptr_t status_ptr = grpc_core::internal::StatusAllocHeapPtr(shutdown_error);
gpr_atm new_state = status_ptr | kShutdownBit;
// The load() needs to be performed only once before entry
// into the loop. This is because if any of the compare_exchange_strong
// operations inside the loop return false, they automatically update curr
// with the new value. So it doesn't need to be loaded again.
intptr_t curr = state_.load(std::memory_order_acquire);
while (true) {
switch (curr) {
case kClosureReady:
case kClosureNotReady:
// Need a full barrier here so that the initial load in notify_on
// doesn't need a barrier
if (state_.compare_exchange_strong(curr, new_state,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
return true; // early out
}
break; // retry
default: {
// 'curr' is either a closure or the fd is already shutdown
// If fd is already shutdown, we are done.
if ((curr & kShutdownBit) > 0) {
grpc_core::internal::StatusFreeHeapPtr(status_ptr);
return false;
}
// Fd is not shutdown. Schedule the closure and move the state to
// shutdown state.
// Needs an acquire to pair with setting the closure (and get a
// happens-after on that edge), and a release to pair with anything
// loading the shutdown state.
if (state_.compare_exchange_strong(curr, new_state,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
auto closure = reinterpret_cast<IomgrEngineClosure*>(curr);
closure->SetStatus(shutdown_error);
scheduler_->Run(closure);
return true;
}
// 'curr' was a closure but now changed to a different state. We will
// have to retry
break;
}
}
}
GPR_UNREACHABLE_CODE(return false);
}
void LockfreeEvent::SetReady() {
// The load() needs to be performed only once before entry
// into the loop. This is because if any of the compare_exchange_strong
// operations inside the loop return false, they automatically update curr
// with the new value. So it doesn't need to be loaded again.
intptr_t curr = state_.load(std::memory_order_acquire);
while (true) {
switch (curr) {
case kClosureReady: {
// Already ready. We are done here.
return;
}
case kClosureNotReady: {
// No barrier required as we're transitioning to a state that does not
// involve a closure
if (state_.compare_exchange_strong(curr, kClosureReady,
std::memory_order_relaxed,
std::memory_order_relaxed)) {
return; // early out
}
break; // retry
}
default: {
// 'curr' is either a closure or the fd is shutdown
if ((curr & kShutdownBit) > 0) {
// The fd is shutdown. Do nothing.
return;
} else if (state_.compare_exchange_strong(curr, kClosureNotReady,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
// Full cas: acquire pairs with this cas' release in the event of a
// spurious set_ready; release pairs with this or the acquire in
// notify_on (or set_shutdown)
auto closure = reinterpret_cast<IomgrEngineClosure*>(curr);
closure->SetStatus(absl::OkStatus());
scheduler_->Run(closure);
return;
}
// else the state changed again (only possible by either a racing
// set_ready or set_shutdown). In both these cases, the
// closure would have been scheduled for execution. So we are done
// here.
return;
}
}
}
}
} // namespace iomgr_engine
} // namespace grpc_event_engine

73
src/core/lib/event_engine/iomgr_engine/lockfree_event.h

@ -0,0 +1,73 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_LOCKFREE_EVENT_H
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_LOCKFREE_EVENT_H
#include <grpc/support/port_platform.h>
#include <atomic>
#include <cstdint>
#include "absl/status/status.h"
#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h"
namespace grpc_event_engine {
namespace iomgr_engine {
class Scheduler;
class LockfreeEvent {
public:
explicit LockfreeEvent(Scheduler* scheduler) : scheduler_(scheduler) {}
LockfreeEvent(const LockfreeEvent&) = delete;
LockfreeEvent& operator=(const LockfreeEvent&) = delete;
// These methods are used to initialize and destroy the internal state. These
// cannot be done in the constructor and destructor because SetReady may be
// called when the event is destroyed and put in a freelist.
void InitEvent();
void DestroyEvent();
// Returns true if fd has been shutdown, false otherwise.
bool IsShutdown() const {
return (state_.load(std::memory_order_relaxed) & kShutdownBit) != 0;
}
// Schedules \a closure when the event is received (see SetReady()) or the
// shutdown state has been set. Note that the event may have already been
// received, in which case the closure would be scheduled immediately.
// If the shutdown state has already been set, then \a closure is scheduled
// with the shutdown error.
void NotifyOn(IomgrEngineClosure* closure);
// Sets the shutdown state. If a closure had been provided by NotifyOn and has
// not yet been scheduled, it will be scheduled with \a shutdown_error.
bool SetShutdown(absl::Status shutdown_error);
// Signals that the event has been received.
void SetReady();
private:
enum State { kClosureNotReady = 0, kClosureReady = 2, kShutdownBit = 1 };
std::atomic<intptr_t> state_;
Scheduler* scheduler_;
};
} // namespace iomgr_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_LOCKFREE_EVENT_H
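
A hedged sketch of the NotifyOn()/SetReady() handshake documented above; TestScheduler (an inline-running Scheduler) and LockfreeEventSketch are hypothetical names used only for illustration:

// Illustrative sketch; not part of this change.
#include "absl/status/status.h"
#include "src/core/lib/event_engine/iomgr_engine/event_poller.h"
#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h"
#include "src/core/lib/event_engine/iomgr_engine/lockfree_event.h"
namespace {
using namespace grpc_event_engine::iomgr_engine;
// Runs scheduled closures immediately on the calling thread (fine for a sketch).
class TestScheduler : public Scheduler {
 public:
  void Run(grpc_event_engine::experimental::EventEngine::Closure* closure) override {
    closure->Run();
  }
};
void LockfreeEventSketch() {
  TestScheduler scheduler;
  LockfreeEvent event(&scheduler);
  event.InitEvent();
  // Register interest first; the closure fires once SetReady() is called.
  event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure(
      [](absl::Status /*status*/) { /* event became ready */ }));
  event.SetReady();  // schedules (here: runs inline) the pending closure
  // A SetReady() with no closure registered just records kClosureReady, so the
  // next NotifyOn() is scheduled immediately.
  event.SetReady();
  event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure(
      [](absl::Status /*status*/) { /* runs right away */ }));
  event.DestroyEvent();
}
}  // namespace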

125
src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc

@ -0,0 +1,125 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <utility>
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include <grpc/support/log.h> // IWYU pragma: keep
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_LINUX_EVENTFD
#include <errno.h>
#include <string.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h"
#endif
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h"
namespace grpc_event_engine {
namespace iomgr_engine {
#ifdef GRPC_LINUX_EVENTFD
absl::Status EventFdWakeupFd::Init() {
int read_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
int write_fd = -1;
if (read_fd < 0) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("eventfd: ", strerror(errno)));
}
SetWakeupFds(read_fd, write_fd);
return absl::OkStatus();
}
absl::Status EventFdWakeupFd::ConsumeWakeup() {
eventfd_t value;
int err;
do {
err = eventfd_read(ReadFd(), &value);
} while (err < 0 && errno == EINTR);
if (err < 0 && errno != EAGAIN) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("eventfd_read: ", strerror(errno)));
}
return absl::OkStatus();
}
absl::Status EventFdWakeupFd::Wakeup() {
int err;
do {
err = eventfd_write(ReadFd(), 1);
} while (err < 0 && errno == EINTR);
if (err < 0) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("eventfd_write: ", strerror(errno)));
}
return absl::OkStatus();
}
EventFdWakeupFd::~EventFdWakeupFd() {
if (ReadFd() != 0) {
close(ReadFd());
}
}
bool EventFdWakeupFd::IsSupported() {
EventFdWakeupFd event_fd_wakeup_fd;
return event_fd_wakeup_fd.Init().ok();
}
absl::StatusOr<std::unique_ptr<WakeupFd>>
EventFdWakeupFd::CreateEventFdWakeupFd() {
static bool kIsEventFdWakeupFdSupported = EventFdWakeupFd::IsSupported();
if (kIsEventFdWakeupFdSupported) {
auto event_fd_wakeup_fd = absl::make_unique<EventFdWakeupFd>();
auto status = event_fd_wakeup_fd->Init();
if (status.ok()) {
return std::unique_ptr<WakeupFd>(std::move(event_fd_wakeup_fd));
}
return status;
}
return absl::NotFoundError("Eventfd wakeup fd is not supported");
}
#else // GRPC_LINUX_EVENTFD
absl::Status EventFdWakeupFd::Init() { GPR_ASSERT(false && "unimplemented"); }
absl::Status EventFdWakeupFd::ConsumeWakeup() {
GPR_ASSERT(false && "unimplemented");
}
absl::Status EventFdWakeupFd::Wakeup() { GPR_ASSERT(false && "unimplemented"); }
bool EventFdWakeupFd::IsSupported() { return false; }
absl::StatusOr<std::unique_ptr<WakeupFd>>
EventFdWakeupFd::CreateEventFdWakeupFd() {
return absl::NotFoundError("Eventfd wakeup fd is not supported");
}
#endif // GRPC_LINUX_EVENTFD
} // namespace iomgr_engine
} // namespace grpc_event_engine
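
A short sketch, not part of this change and Linux-only, of the eventfd behavior the implementation above relies on: repeated Wakeup() calls coalesce into one readable event, and a single ConsumeWakeup() drains the counter.

#include <poll.h>
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h"
void EventFdWakeupSketch() {
  using grpc_event_engine::iomgr_engine::EventFdWakeupFd;
  auto wakeup_fd = EventFdWakeupFd::CreateEventFdWakeupFd();
  if (!wakeup_fd.ok()) return;  // eventfd unavailable on this platform
  (void)(*wakeup_fd)->Wakeup();
  (void)(*wakeup_fd)->Wakeup();  // counter is now 2, fd is readable only once
  pollfd pfd{(*wakeup_fd)->ReadFd(), POLLIN, 0};
  poll(&pfd, 1, /*timeout_ms=*/0);  // reports POLLIN immediately
  (void)(*wakeup_fd)->ConsumeWakeup();  // eventfd_read() zeroes the counter
}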

@ -0,0 +1,45 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_EVENTFD_H
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_EVENTFD_H
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h"
namespace grpc_event_engine {
namespace iomgr_engine {
class EventFdWakeupFd : public WakeupFd {
public:
EventFdWakeupFd() : WakeupFd() {}
~EventFdWakeupFd() override;
absl::Status ConsumeWakeup() override;
absl::Status Wakeup() override;
static absl::StatusOr<std::unique_ptr<WakeupFd>> CreateEventFdWakeupFd();
static bool IsSupported();
private:
absl::Status Init();
};
} // namespace iomgr_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_EVENTFD_H

@ -0,0 +1,152 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <memory>
#include <utility>
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include <grpc/support/log.h> // IWYU pragma: keep
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_WAKEUP_FD
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h"
#endif
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h"
namespace grpc_event_engine {
namespace iomgr_engine {
#ifdef GRPC_POSIX_WAKEUP_FD
namespace {
absl::Status SetSocketNonBlocking(int fd) {
int oldflags = fcntl(fd, F_GETFL, 0);
if (oldflags < 0) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("fcntl: ", strerror(errno)));
}
oldflags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, oldflags) != 0) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("fcntl: ", strerror(errno)));
}
return absl::OkStatus();
}
} // namespace
absl::Status PipeWakeupFd::Init() {
int pipefd[2];
int r = pipe(pipefd);
if (0 != r) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("pipe: ", strerror(errno)));
}
auto status = SetSocketNonBlocking(pipefd[0]);
if (!status.ok()) return status;
status = SetSocketNonBlocking(pipefd[1]);
if (!status.ok()) return status;
SetWakeupFds(pipefd[0], pipefd[1]);
return absl::OkStatus();
}
absl::Status PipeWakeupFd::ConsumeWakeup() {
char buf[128];
ssize_t r;
for (;;) {
r = read(ReadFd(), buf, sizeof(buf));
if (r > 0) continue;
if (r == 0) return absl::OkStatus();
switch (errno) {
case EAGAIN:
return absl::OkStatus();
case EINTR:
continue;
default:
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("read: ", strerror(errno)));
}
}
}
absl::Status PipeWakeupFd::Wakeup() {
char c = 0;
while (write(WriteFd(), &c, 1) != 1 && errno == EINTR) {
}
return absl::OkStatus();
}
PipeWakeupFd::~PipeWakeupFd() {
if (ReadFd() != 0) {
close(ReadFd());
}
if (WriteFd() != 0) {
close(WriteFd());
}
}
bool PipeWakeupFd::IsSupported() {
PipeWakeupFd pipe_wakeup_fd;
return pipe_wakeup_fd.Init().ok();
}
absl::StatusOr<std::unique_ptr<WakeupFd>> PipeWakeupFd::CreatePipeWakeupFd() {
static bool kIsPipeWakeupFdSupported = PipeWakeupFd::IsSupported();
if (kIsPipeWakeupFdSupported) {
auto pipe_wakeup_fd = absl::make_unique<PipeWakeupFd>();
auto status = pipe_wakeup_fd->Init();
if (status.ok()) {
return std::unique_ptr<WakeupFd>(std::move(pipe_wakeup_fd));
}
return status;
}
return absl::NotFoundError("Pipe wakeup fd is not supported");
}
#else // GRPC_POSIX_WAKEUP_FD
absl::Status PipeWakeupFd::Init() { GPR_ASSERT(false && "unimplemented"); }
absl::Status PipeWakeupFd::ConsumeWakeup() {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PipeWakeupFd::Wakeup() { GPR_ASSERT(false && "unimplemented"); }
bool PipeWakeupFd::IsSupported() { return false; }
absl::StatusOr<std::unique_ptr<WakeupFd>> PipeWakeupFd::CreatePipeWakeupFd() {
return absl::NotFoundError("Pipe wakeup fd is not supported");
}
#endif // GRPC_POSIX_WAKEUP_FD
} // namespace iomgr_engine
} // namespace grpc_event_engine

@ -0,0 +1,45 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_PIPE_H
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_PIPE_H
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h"
namespace grpc_event_engine {
namespace iomgr_engine {
class PipeWakeupFd : public WakeupFd {
public:
PipeWakeupFd() : WakeupFd() {}
~PipeWakeupFd() override;
absl::Status ConsumeWakeup() override;
absl::Status Wakeup() override;
static absl::StatusOr<std::unique_ptr<WakeupFd>> CreatePipeWakeupFd();
static bool IsSupported();
private:
absl::Status Init();
};
} // namespace iomgr_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_PIPE_H

@ -0,0 +1,76 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
* WakeupFd abstracts the concept of a file descriptor for the purpose of
* waking up a thread in select()/poll()/epoll_wait()/etc.
* The poll() family of system calls provide a way for a thread to block until
* there is activity on one (or more) of a set of file descriptors. An
* application may wish to wake up this thread to do non-file-related work. The
* typical way to do this is to add a pipe to the set of file descriptors, then
* write to the pipe to wake up the thread in poll().
*
* Linux has a lighter weight eventfd specifically designed for this purpose.
* WakeupFd abstracts the difference between the two.
*
* Setup:
* 1. Call CreateWakeupFd() to create an initialized WakeupFd.
* 2. Add the result of WakeupFd::ReadFd() to the set of monitored file
* descriptors for the poll() style API you are using. Monitor the file
* descriptor for readability.
* 3. To tear down, destroy the WakeupFd instance; the concrete
* implementations close the underlying file descriptors in their destructors.
*
* Usage:
* 1. To wake up a polling thread, call WakeupFd::Wakeup() on a wakeup_fd
* it is monitoring.
* 2. If the polling thread was awakened by a WakeupFd event, call
* WakeupFd::ConsumeWakeup() on it.
*/
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_H
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_H
#include <grpc/support/port_platform.h>
#include "absl/status/status.h"
namespace grpc_event_engine {
namespace iomgr_engine {
class WakeupFd {
public:
virtual absl::Status ConsumeWakeup() = 0;
virtual absl::Status Wakeup() = 0;
virtual ~WakeupFd() = default;
int ReadFd() { return read_fd_; }
int WriteFd() { return write_fd_; }
protected:
WakeupFd() : read_fd_(0), write_fd_(0) {}
void SetWakeupFds(int read_fd, int write_fd) {
read_fd_ = read_fd;
write_fd_ = write_fd;
}
private:
int read_fd_;
int write_fd_;
};
} // namespace iomgr_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_H
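
A sketch of the Setup/Usage steps described above, not part of this change, assuming a poll()-based loop; a second thread would call Wakeup() on the same WakeupFd to interrupt the wait.

#include <poll.h>
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h"
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h"
void PollLoopSketch() {
  auto wakeup_fd = grpc_event_engine::iomgr_engine::CreateWakeupFd();
  if (!wakeup_fd.ok()) return;  // no wakeup mechanism on this system
  pollfd pfd{(*wakeup_fd)->ReadFd(), POLLIN, 0};
  // A second thread calls (*wakeup_fd)->Wakeup() to interrupt this wait.
  if (poll(&pfd, 1, /*timeout_ms=*/1000) > 0 && (pfd.revents & POLLIN) != 0) {
    (void)(*wakeup_fd)->ConsumeWakeup();  // reset before the next poll() round
  }
}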

@ -0,0 +1,67 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h"
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h"
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/port.h"
namespace grpc_event_engine {
namespace iomgr_engine {
#ifdef GRPC_POSIX_WAKEUP_FD
absl::StatusOr<std::unique_ptr<WakeupFd>> NotSupported() {
return absl::NotFoundError("Wakeup-fd is not supported on this system");
}
namespace {
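// Chosen once at program start: prefer eventfd when the platform supports
// it, fall back to a pipe, and use NotSupported if neither is available.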
absl::StatusOr<std::unique_ptr<WakeupFd>> (*g_wakeup_fd_fn)() =
[]() -> absl::StatusOr<std::unique_ptr<WakeupFd>> (*)() {
#ifndef GRPC_POSIX_NO_SPECIAL_WAKEUP_FD
if (EventFdWakeupFd::IsSupported()) {
return &EventFdWakeupFd::CreateEventFdWakeupFd;
}
#endif // GRPC_POSIX_NO_SPECIAL_WAKEUP_FD
if (PipeWakeupFd::IsSupported()) {
return &PipeWakeupFd::CreatePipeWakeupFd;
}
return NotSupported;
}();
} // namespace
bool SupportsWakeupFd() { return g_wakeup_fd_fn != NotSupported; }
absl::StatusOr<std::unique_ptr<WakeupFd>> CreateWakeupFd() {
return g_wakeup_fd_fn();
}
#else /* GRPC_POSIX_WAKEUP_FD */
bool SupportsWakeupFd() { return false; }
absl::StatusOr<std::unique_ptr<WakeupFd>> CreateWakeupFd() {
return absl::NotFoundError("Wakeup-fd is not supported on this system");
}
#endif /* GRPC_POSIX_WAKEUP_FD */
} // namespace iomgr_engine
} // namespace grpc_event_engine

@ -0,0 +1,37 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_DEFAULT_H
#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_DEFAULT_H
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/status/statusor.h"
namespace grpc_event_engine {
namespace iomgr_engine {
class WakeupFd;
// Returns true if wakeup-fd is supported by the system.
bool SupportsWakeupFd();
// Create and return an initialized WakeupFd instance if supported.
absl::StatusOr<std::unique_ptr<WakeupFd>> CreateWakeupFd();
} // namespace iomgr_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_DEFAULT_H

@ -47,3 +47,58 @@ grpc_cc_test(
"//test/core/util:grpc_suppressions",
],
)
grpc_cc_test(
name = "event_poller_posix_test",
srcs = ["event_poller_posix_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = [
"no_windows",
],
uses_event_engine = True,
uses_polling = True,
deps = [
"//:iomgr_ee_closure",
"//:iomgr_ee_event_poller",
"//:iomgr_ee_poller_posix_default",
"//:iomgr_event_engine",
"//test/core/util:grpc_suppressions",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "lock_free_event_test",
srcs = ["lock_free_event_test.cc"],
external_deps = ["gtest"],
language = "C++",
uses_event_engine = True,
uses_polling = False,
deps = [
"//:iomgr_ee_closure",
"//:iomgr_ee_event_poller",
"//:iomgr_ee_lockfree_event",
"//:iomgr_event_engine",
"//test/core/util:grpc_suppressions",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "wakeup_fd_posix_test",
srcs = ["wakeup_fd_posix_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = [
"no_windows",
],
uses_event_engine = False,
uses_polling = True,
deps = [
"//:iomgr_ee_wakeup_fd_posix_eventfd",
"//:iomgr_ee_wakeup_fd_posix_pipe",
"//test/core/util:grpc_suppressions",
"//test/core/util:grpc_test_util",
],
)

@ -0,0 +1,499 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <ostream>
#include "src/core/lib/iomgr/port.h"
// This test won't work except with posix sockets enabled
#ifdef GRPC_POSIX_SOCKET_EV
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/status/status.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/iomgr_engine/event_poller.h"
#include "src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h"
#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine.h"
#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "test/core/util/port.h"
using ::grpc_event_engine::iomgr_engine::EventPoller;
static gpr_mu g_mu;
static EventPoller* g_event_poller = nullptr;
// Buffer size used to send and receive data.
// 1024 is the smallest value that can be used for the TCP send and receive buffers.
#define BUF_SIZE 1024
// Max number of connections pending to be accepted by listen().
#define MAX_NUM_FD 1024
// Client write buffer size
#define CLIENT_WRITE_BUF_SIZE 10
// Total number of times that the client fills up the write buffer
#define CLIENT_TOTAL_WRITE_CNT 3
namespace grpc_event_engine {
namespace iomgr_engine {
namespace {
class TestScheduler : public Scheduler {
public:
explicit TestScheduler(experimental::EventEngine* engine) : engine_(engine) {}
void Run(experimental::EventEngine::Closure* closure) override {
engine_->Run(closure);
}
private:
experimental::EventEngine* engine_;
};
// Create a test socket with the right properties for testing.
// port is the TCP port to listen or connect to.
// Return a socket FD and sockaddr_in.
void CreateTestSocket(int port, int* socket_fd, struct sockaddr_in6* sin) {
int fd;
int one = 1;
int buffer_size_bytes = BUF_SIZE;
int flags;
fd = socket(AF_INET6, SOCK_STREAM, 0);
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
// Shrink the socket send and receive buffers to the minimum size so that
// they fill up quickly and trigger notify_on_write.
EXPECT_EQ(grpc_set_socket_sndbuf(fd, buffer_size_bytes), GRPC_ERROR_NONE);
EXPECT_EQ(grpc_set_socket_rcvbuf(fd, buffer_size_bytes), GRPC_ERROR_NONE);
// Make fd non-blocking.
flags = fcntl(fd, F_GETFL, 0);
EXPECT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0);
*socket_fd = fd;
// Use local address for test.
memset(sin, 0, sizeof(struct sockaddr_in6));
sin->sin6_family = AF_INET6;
(reinterpret_cast<char*>(&sin->sin6_addr))[15] = 1;
EXPECT_TRUE(port >= 0 && port < 65536);
sin->sin6_port = htons(static_cast<uint16_t>(port));
}
// =======An upload server to test notify_on_read===========
// The server simply reads and counts a stream of bytes.
// An upload server.
typedef struct {
EventHandle* em_fd; /* listening fd */
ssize_t read_bytes_total; /* total number of received bytes */
int done; /* set to 1 when a server finishes serving */
IomgrEngineClosure* listen_closure;
} server;
void ServerInit(server* sv) {
sv->read_bytes_total = 0;
sv->done = 0;
}
// An upload session.
// Created when a new upload request arrives in the server.
typedef struct {
server* sv; /* not owned by a single session */
EventHandle* em_fd; /* fd to read upload bytes */
char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
IomgrEngineClosure* session_read_closure;
} session;
// Called when an upload session can be safely shutdown.
// Close session FD and start to shutdown listen FD.
void SessionShutdownCb(session* se, bool /*success*/) {
server* sv = se->sv;
se->em_fd->OrphanHandle(nullptr, nullptr, "a");
gpr_free(se);
// Start to shutdown listen fd.
sv->em_fd->ShutdownHandle(
absl::Status(absl::StatusCode::kUnknown, "SessionShutdownCb"));
}
// Called when data becomes readable in a session.
void SessionReadCb(session* se, absl::Status status) {
int fd = se->em_fd->WrappedFd();
ssize_t read_once = 0;
ssize_t read_total = 0;
if (!status.ok()) {
SessionShutdownCb(se, true);
return;
}
do {
read_once = read(fd, se->read_buf, BUF_SIZE);
if (read_once > 0) read_total += read_once;
} while (read_once > 0);
se->sv->read_bytes_total += read_total;
// read() returns 0 to indicate that the TCP connection was closed by the
// client. (A zero-length read(fd, read_buf, 0) would also return 0, but we
// never issue one here.) It is also possible to read nothing because of a
// spurious edge event or because the data has already been drained; in that
// case read() returns -1 and sets errno to EAGAIN.
if (read_once == 0) {
SessionShutdownCb(se, true);
} else if (read_once == -1) {
EXPECT_EQ(errno, EAGAIN);
// An edge-triggered event is cached in the kernel until the next poll.
// In the current single-threaded implementation, SessionReadCb is called
// on the polling thread, so polling only resumes after this callback and
// will catch the read edge if data becomes available again before
// notify_on_read.
se->session_read_closure = IomgrEngineClosure::TestOnlyToClosure(
[se](absl::Status status) { SessionReadCb(se, status); });
se->em_fd->NotifyOnRead(se->session_read_closure);
}
}
// Called when the listen FD can be safely shutdown. Close listen FD and
// signal that server can be shutdown.
void ListenShutdownCb(server* sv) {
sv->em_fd->OrphanHandle(nullptr, nullptr, "b");
gpr_mu_lock(&g_mu);
sv->done = 1;
g_event_poller->Kick();
gpr_mu_unlock(&g_mu);
}
// Called when a new TCP connection request arrives in the listening port.
void ListenCb(server* sv, absl::Status status) {
int fd;
int flags;
session* se;
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
EventHandle* listen_em_fd = sv->em_fd;
if (!status.ok()) {
ListenShutdownCb(sv);
return;
}
do {
fd = accept(listen_em_fd->WrappedFd(),
reinterpret_cast<struct sockaddr*>(&ss), &slen);
} while (fd < 0 && errno == EINTR);
if (fd < 0 && errno == EAGAIN) {
sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure(
[sv](absl::Status status) { ListenCb(sv, status); });
listen_em_fd->NotifyOnRead(sv->listen_closure);
return;
}
EXPECT_GE(fd, 0);
EXPECT_LT(fd, FD_SETSIZE);
flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
se = static_cast<session*>(gpr_malloc(sizeof(*se)));
se->sv = sv;
se->em_fd = g_event_poller->CreateHandle(fd, "listener", false);
se->session_read_closure = IomgrEngineClosure::TestOnlyToClosure(
[se](absl::Status status) { SessionReadCb(se, status); });
se->em_fd->NotifyOnRead(se->session_read_closure);
sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure(
[sv](absl::Status status) { ListenCb(sv, status); });
listen_em_fd->NotifyOnRead(sv->listen_closure);
}
// Start a test server, return the TCP listening port bound to listen_fd.
// ListenCb() is registered to be interested in reading from listen_fd.
// When a connection request arrives, ListenCb() is called to accept the
// connection request.
int ServerStart(server* sv) {
int port = grpc_pick_unused_port_or_die();
int fd;
struct sockaddr_in6 sin;
socklen_t addr_len;
CreateTestSocket(port, &fd, &sin);
addr_len = sizeof(sin);
EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0);
EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0);
port = ntohs(sin.sin6_port);
EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);
sv->em_fd = g_event_poller->CreateHandle(fd, "server", false);
sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure(
[sv](absl::Status status) { ListenCb(sv, status); });
sv->em_fd->NotifyOnRead(sv->listen_closure);
return port;
}
// ===An upload client to test notify_on_write===
// An upload client.
typedef struct {
EventHandle* em_fd;
char write_buf[CLIENT_WRITE_BUF_SIZE];
ssize_t write_bytes_total;
// Number of times that the client fills up the write buffer and calls
// notify_on_write to schedule another write.
int client_write_cnt;
int done;
IomgrEngineClosure* write_closure;
} client;
void ClientInit(client* cl) {
memset(cl->write_buf, 0, sizeof(cl->write_buf));
cl->write_bytes_total = 0;
cl->client_write_cnt = 0;
cl->done = 0;
}
// Called when a client upload session is ready to shutdown.
void ClientSessionShutdownCb(client* cl) {
cl->em_fd->OrphanHandle(nullptr, nullptr, "c");
gpr_mu_lock(&g_mu);
cl->done = 1;
g_event_poller->Kick();
gpr_mu_unlock(&g_mu);
}
// Write as much as possible, then register notify_on_write.
void ClientSessionWrite(client* cl, absl::Status status) {
int fd = cl->em_fd->WrappedFd();
ssize_t write_once = 0;
if (!status.ok()) {
ClientSessionShutdownCb(cl);
return;
}
do {
write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
if (write_once > 0) cl->write_bytes_total += write_once;
} while (write_once > 0);
EXPECT_EQ(errno, EAGAIN);
gpr_mu_lock(&g_mu);
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
cl->write_closure = IomgrEngineClosure::TestOnlyToClosure(
[cl](absl::Status status) { ClientSessionWrite(cl, status); });
cl->client_write_cnt++;
gpr_mu_unlock(&g_mu);
cl->em_fd->NotifyOnWrite(cl->write_closure);
} else {
gpr_mu_unlock(&g_mu);
ClientSessionShutdownCb(cl);
}
}
// Start a client to send a stream of bytes.
void ClientStart(client* cl, int port) {
int fd;
struct sockaddr_in6 sin;
CreateTestSocket(port, &fd, &sin);
if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
-1) {
if (errno == EINPROGRESS) {
struct pollfd pfd;
pfd.fd = fd;
pfd.events = POLLOUT;
pfd.revents = 0;
if (poll(&pfd, 1, -1) == -1) {
gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
abort();
}
} else {
gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno);
abort();
}
}
cl->em_fd = g_event_poller->CreateHandle(fd, "client", false);
ClientSessionWrite(cl, absl::OkStatus());
}
// Wait for the signal to shutdown client and server.
void WaitAndShutdown(server* sv, client* cl) {
std::vector<EventHandle*> pending_events;
gpr_mu_lock(&g_mu);
while (!sv->done || !cl->done) {
gpr_mu_unlock(&g_mu);
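    // Work() blocks until the poller is kicked or events fire; it returns
    // the handles whose closures became ready so the caller can run them
    // via ExecutePendingActions() below.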
(void)g_event_poller->Work(grpc_core::Timestamp::InfFuture(),
pending_events);
for (auto it = pending_events.begin(); it != pending_events.end(); ++it) {
(*it)->ExecutePendingActions();
}
pending_events.clear();
gpr_mu_lock(&g_mu);
}
gpr_mu_unlock(&g_mu);
}
// Test an EventHandle. Start an upload server and client, upload a stream of bytes
// from the client to the server, and verify that the total number of sent bytes
// is equal to the total number of received bytes.
TEST(EventPollerTest, TestEventPollerHandle) {
server sv;
client cl;
int port;
ServerInit(&sv);
port = ServerStart(&sv);
ClientInit(&cl);
ClientStart(&cl, port);
WaitAndShutdown(&sv, &cl);
EXPECT_EQ(sv.read_bytes_total, cl.write_bytes_total);
}
typedef struct FdChangeData {
void (*cb_that_ran)(struct FdChangeData*, absl::Status);
} FdChangeData;
void InitChangeData(FdChangeData* fdc) { fdc->cb_that_ran = nullptr; }
void DestroyChangeData(FdChangeData* /*fdc*/) {}
void FirstReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
gpr_mu_lock(&g_mu);
fdc->cb_that_ran = FirstReadCallback;
g_event_poller->Kick();
gpr_mu_unlock(&g_mu);
}
void SecondReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
gpr_mu_lock(&g_mu);
fdc->cb_that_ran = SecondReadCallback;
g_event_poller->Kick();
gpr_mu_unlock(&g_mu);
}
// Test that changing the callback we use for notify_on_read actually works.
// Note that we have two different but almost identical callbacks above -- the
// point is to have two different function pointers and two different data
// pointers and make sure that changing both really works.
TEST(EventPollerTest, TestEventPollerHandleChange) {
EventHandle* em_fd;
FdChangeData a, b;
int flags;
int sv[2];
char data;
ssize_t result;
IomgrEngineClosure* first_closure = IomgrEngineClosure::TestOnlyToClosure(
[a = &a](absl::Status status) { FirstReadCallback(a, status); });
IomgrEngineClosure* second_closure = IomgrEngineClosure::TestOnlyToClosure(
[b = &b](absl::Status status) { SecondReadCallback(b, status); });
InitChangeData(&a);
InitChangeData(&b);
EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
flags = fcntl(sv[0], F_GETFL, 0);
EXPECT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0);
flags = fcntl(sv[1], F_GETFL, 0);
EXPECT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0);
em_fd =
g_event_poller->CreateHandle(sv[0], "TestEventPollerHandleChange", false);
EXPECT_NE(em_fd, nullptr);
// Register the first callback, then make its FD readable
em_fd->NotifyOnRead(first_closure);
data = 0;
result = write(sv[1], &data, 1);
EXPECT_EQ(result, 1);
// And now wait for it to run.
auto poller_work = [](FdChangeData* fdc) {
std::vector<EventHandle*> pending_events;
gpr_mu_lock(&g_mu);
while (fdc->cb_that_ran == nullptr) {
gpr_mu_unlock(&g_mu);
(void)g_event_poller->Work(grpc_core::Timestamp::InfFuture(),
pending_events);
for (auto it = pending_events.begin(); it != pending_events.end(); ++it) {
(*it)->ExecutePendingActions();
}
pending_events.clear();
gpr_mu_lock(&g_mu);
}
};
poller_work(&a);
EXPECT_EQ(a.cb_that_ran, FirstReadCallback);
gpr_mu_unlock(&g_mu);
// And drain the socket so we can generate a new read edge
result = read(sv[0], &data, 1);
EXPECT_EQ(result, 1);
// Now register a second callback with distinct change data, and do the same
// thing again.
em_fd->NotifyOnRead(second_closure);
data = 0;
result = write(sv[1], &data, 1);
EXPECT_EQ(result, 1);
// And now wait for it to run.
poller_work(&b);
// Except now we verify that SecondReadCallback ran instead.
EXPECT_EQ(b.cb_that_ran, SecondReadCallback);
gpr_mu_unlock(&g_mu);
em_fd->OrphanHandle(nullptr, nullptr, "d");
DestroyChangeData(&a);
DestroyChangeData(&b);
close(sv[1]);
}
} // namespace
} // namespace iomgr_engine
} // namespace grpc_event_engine
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
auto engine =
absl::make_unique<grpc_event_engine::experimental::IomgrEventEngine>();
EXPECT_NE(engine, nullptr);
grpc_event_engine::iomgr_engine::TestScheduler scheduler(engine.get());
g_event_poller =
grpc_event_engine::iomgr_engine::GetDefaultPoller(&scheduler);
if (g_event_poller == nullptr) {
// Poller is not supported on this system.
return 0;
}
int result = RUN_ALL_TESTS();
g_event_poller->Shutdown();
return result;
}
#else /* GRPC_POSIX_SOCKET_EV */
int main(int argc, char** argv) { return 1; }
#endif /* GRPC_POSIX_SOCKET_EV */

@ -0,0 +1,153 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdlib.h>
#include <string.h>
#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/event_engine/iomgr_engine/event_poller.h"
#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h"
#include "src/core/lib/event_engine/iomgr_engine/lockfree_event.h"
#include "src/core/lib/gprpp/sync.h"
using ::grpc_event_engine::iomgr_engine::Scheduler;
namespace {
class TestScheduler : public Scheduler {
public:
explicit TestScheduler(grpc_event_engine::experimental::EventEngine* engine)
: engine_(engine) {}
void Run(
grpc_event_engine::experimental::EventEngine::Closure* closure) override {
engine_->Run(closure);
}
private:
grpc_event_engine::experimental::EventEngine* engine_;
};
TestScheduler* g_scheduler;
} // namespace
namespace grpc_event_engine {
namespace iomgr_engine {
TEST(LockFreeEventTest, BasicTest) {
LockfreeEvent event(g_scheduler);
grpc_core::Mutex mu;
grpc_core::CondVar cv;
event.InitEvent();
grpc_core::MutexLock lock(&mu);
// Set NotifyOn first and then SetReady
event.NotifyOn(
IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
grpc_core::MutexLock lock(&mu);
EXPECT_TRUE(status.ok());
cv.Signal();
}));
event.SetReady();
EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10)));
// Call SetReady first and then NotifyOn
event.SetReady();
event.NotifyOn(
IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
grpc_core::MutexLock lock(&mu);
EXPECT_TRUE(status.ok());
cv.Signal();
}));
EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10)));
// Set NotifyOn and then call SetShutdown
event.NotifyOn(
IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
grpc_core::MutexLock lock(&mu);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status, absl::CancelledError("Shutdown"));
cv.Signal();
}));
event.SetShutdown(absl::CancelledError("Shutdown"));
EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10)));
event.DestroyEvent();
}
TEST(LockFreeEventTest, MultiThreadedTest) {
std::vector<std::thread> threads;
LockfreeEvent event(g_scheduler);
grpc_core::Mutex mu;
grpc_core::CondVar cv;
bool signalled = false;
int active = 0;
static constexpr int kNumOperations = 100;
threads.reserve(2);
event.InitEvent();
// Spin up two threads alternating between NotifyOn and SetReady
for (int i = 0; i < 2; i++) {
threads.emplace_back([&, thread_id = i]() {
for (int j = 0; j < kNumOperations; j++) {
grpc_core::MutexLock lock(&mu);
// Wait for both threads to process the previous operation before
// starting the next one.
while (signalled) {
cv.Wait(&mu);
}
active++;
if (thread_id == 0) {
event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure(
[&mu, &cv, &signalled](absl::Status status) {
grpc_core::MutexLock lock(&mu);
EXPECT_TRUE(status.ok());
signalled = true;
cv.SignalAll();
}));
} else {
event.SetReady();
}
while (!signalled) {
cv.Wait(&mu);
}
// The last thread to finish the current operation sets signalled to
// false and wakes up the other thread if it's blocked waiting to
// start the next operation.
if (--active == 0) {
signalled = false;
cv.Signal();
}
}
});
}
for (auto& t : threads) {
t.join();
}
event.SetShutdown(absl::OkStatus());
event.DestroyEvent();
}
} // namespace iomgr_engine
} // namespace grpc_event_engine
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_event_engine::experimental::EventEngine* engine =
grpc_event_engine::experimental::GetDefaultEventEngine();
EXPECT_NE(engine, nullptr);
g_scheduler = new TestScheduler(engine);
return RUN_ALL_TESTS();
}

@ -0,0 +1,61 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h"
#include <stdlib.h>
#include <string.h>
#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h"
#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h"
namespace grpc_event_engine {
namespace iomgr_engine {
TEST(WakeupFdPosixTest, PipeWakeupFdTest) {
if (!PipeWakeupFd::IsSupported()) {
return;
}
auto pipe_wakeup_fd = PipeWakeupFd::CreatePipeWakeupFd();
EXPECT_TRUE(pipe_wakeup_fd.ok());
EXPECT_GE((*pipe_wakeup_fd)->ReadFd(), 0);
EXPECT_GE((*pipe_wakeup_fd)->WriteFd(), 0);
EXPECT_TRUE((*pipe_wakeup_fd)->Wakeup().ok());
EXPECT_TRUE((*pipe_wakeup_fd)->ConsumeWakeup().ok());
}
TEST(WakeupFdPosixTest, EventFdWakeupFdTest) {
if (!EventFdWakeupFd::IsSupported()) {
return;
}
auto eventfd_wakeup_fd = EventFdWakeupFd::CreateEventFdWakeupFd();
EXPECT_TRUE(eventfd_wakeup_fd.ok());
EXPECT_GE((*eventfd_wakeup_fd)->ReadFd(), 0);
EXPECT_EQ((*eventfd_wakeup_fd)->WriteFd(), -1);
EXPECT_TRUE((*eventfd_wakeup_fd)->Wakeup().ok());
EXPECT_TRUE((*eventfd_wakeup_fd)->ConsumeWakeup().ok());
}
} // namespace iomgr_engine
} // namespace grpc_event_engine
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -2935,6 +2935,28 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "event_poller_posix_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
@ -4427,6 +4449,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "lock_free_event_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -7517,6 +7563,28 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "wakeup_fd_posix_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
