From fbe051fb51b25a04fc526849a087cba769f2651e Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Fri, 15 Jul 2022 00:20:49 +0000 Subject: [PATCH] Forking the posix epoll1 poller for iomgr event engine (#30135) * start * Forking the posix epoll1 poller for event engine * re-generate projects * fix test * fix sanity checks * fix * update fix * more build issue fixes * even more fixes * add no_windows tag * re-generate projects * update comment * cleanup * Automated change: Fix sanity tests * review comments * fix tsan issue * re-generate projects * cleanup * fix missing build dep * fix mac build issue * minor fix in test * simplifying build graph * re-generate projects * fix macOS build issues * review comments * re-generate projects * add missing generated file * review comments * fix sanity checks * rename one more build target * fix sanity checks * retry fix sanity checks Co-authored-by: Vignesh2208 --- BUILD | 179 ++++++ CMakeLists.txt | 125 ++++ build_autogenerated.yaml | 60 ++ .../iomgr_engine/ev_epoll1_linux.cc | 589 ++++++++++++++++++ .../iomgr_engine/ev_epoll1_linux.h | 95 +++ .../event_engine/iomgr_engine/event_poller.h | 105 ++++ .../event_poller_posix_default.cc | 29 + .../iomgr_engine/event_poller_posix_default.h | 33 + .../iomgr_engine/iomgr_engine_closure.h | 74 +++ .../iomgr_engine/lockfree_event.cc | 267 ++++++++ .../iomgr_engine/lockfree_event.h | 73 +++ .../iomgr_engine/wakeup_fd_eventfd.cc | 125 ++++ .../iomgr_engine/wakeup_fd_eventfd.h | 45 ++ .../iomgr_engine/wakeup_fd_pipe.cc | 152 +++++ .../iomgr_engine/wakeup_fd_pipe.h | 45 ++ .../iomgr_engine/wakeup_fd_posix.h | 76 +++ .../iomgr_engine/wakeup_fd_posix_default.cc | 67 ++ .../iomgr_engine/wakeup_fd_posix_default.h | 37 ++ .../event_engine/iomgr_event_engine/BUILD | 55 ++ .../event_poller_posix_test.cc | 499 +++++++++++++++ .../lock_free_event_test.cc | 153 +++++ .../wakeup_fd_posix_test.cc | 61 ++ tools/run_tests/generated/tests.json | 68 ++ 23 files changed, 3012 insertions(+) create mode 100644 src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc create mode 100644 src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h create mode 100644 src/core/lib/event_engine/iomgr_engine/event_poller.h create mode 100644 src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc create mode 100644 src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h create mode 100644 src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h create mode 100644 src/core/lib/event_engine/iomgr_engine/lockfree_event.cc create mode 100644 src/core/lib/event_engine/iomgr_engine/lockfree_event.h create mode 100644 src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc create mode 100644 src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h create mode 100644 src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc create mode 100644 src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h create mode 100644 src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h create mode 100644 src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc create mode 100644 src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h create mode 100644 test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc create mode 100644 test/core/event_engine/iomgr_event_engine/lock_free_event_test.cc create mode 100644 test/core/event_engine/iomgr_event_engine/wakeup_fd_posix_test.cc diff --git a/BUILD b/BUILD index fc47367685c..1ae96e1057b 100644 --- a/BUILD +++ b/BUILD @@ -2345,6 +2345,185 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "iomgr_ee_event_poller", + srcs = [], + hdrs = [ + "src/core/lib/event_engine/iomgr_engine/event_poller.h", + ], + external_deps = [ + "absl/status", + "absl/strings", + ], + tags = ["grpc-autodeps"], + deps = [ + "event_engine_base_hdrs", + "gpr_platform", + "iomgr_ee_closure", + "time", + ], +) + +grpc_cc_library( + name = "iomgr_ee_closure", + srcs = [], + hdrs = [ + "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h", + ], + external_deps = [ + "absl/status", + "absl/utility", + ], + tags = ["grpc-autodeps"], + deps = [ + "event_engine_base_hdrs", + "gpr_platform", + ], +) + +grpc_cc_library( + name = "iomgr_ee_lockfree_event", + srcs = [ + "src/core/lib/event_engine/iomgr_engine/lockfree_event.cc", + ], + hdrs = [ + "src/core/lib/event_engine/iomgr_engine/lockfree_event.h", + ], + external_deps = ["absl/status"], + tags = ["grpc-autodeps"], + deps = [ + "gpr_base", + "iomgr_ee_closure", + "iomgr_ee_event_poller", + "status_helper", + ], +) + +grpc_cc_library( + name = "iomgr_ee_wakeup_fd_posix", + hdrs = [ + "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h", + ], + external_deps = ["absl/status"], + tags = ["grpc-autodeps"], + deps = ["gpr_platform"], +) + +grpc_cc_library( + name = "iomgr_ee_wakeup_fd_posix_pipe", + srcs = [ + "src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc", + ], + hdrs = [ + "src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h", + ], + external_deps = [ + "absl/memory", + "absl/status", + "absl/status:statusor", + "absl/strings", + ], + tags = ["grpc-autodeps"], + deps = [ + "gpr_base", + "gpr_platform", + "iomgr_ee_wakeup_fd_posix", + "iomgr_port", + ], +) + +grpc_cc_library( + name = "iomgr_ee_wakeup_fd_posix_eventfd", + srcs = [ + "src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc", + ], + hdrs = [ + "src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h", + ], + external_deps = [ + "absl/memory", + "absl/status", + "absl/status:statusor", + "absl/strings", + ], + tags = ["grpc-autodeps"], + deps = [ + "gpr_base", + "gpr_platform", + "iomgr_ee_wakeup_fd_posix", + "iomgr_port", + ], +) + +grpc_cc_library( + name = "iomgr_ee_wakeup_fd_posix_default", + srcs = [ + "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc", + ], + hdrs = [ + "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h", + ], + external_deps = [ + "absl/status", + "absl/status:statusor", + ], + tags = ["grpc-autodeps"], + deps = [ + "gpr_platform", + "iomgr_ee_wakeup_fd_posix", + "iomgr_ee_wakeup_fd_posix_eventfd", + "iomgr_ee_wakeup_fd_posix_pipe", + "iomgr_port", + ], +) + +grpc_cc_library( + name = "iomgr_ee_poller_posix_epoll1", + srcs = [ + "src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc", + ], + hdrs = [ + "src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h", + ], + external_deps = [ + "absl/base:core_headers", + "absl/memory", + "absl/status", + "absl/status:statusor", + "absl/strings", + "absl/synchronization", + ], + tags = ["grpc-autodeps"], + deps = [ + "gpr_base", + "gpr_codegen", + "gpr_platform", + "iomgr_ee_closure", + "iomgr_ee_event_poller", + "iomgr_ee_lockfree_event", + "iomgr_ee_wakeup_fd_posix", + "iomgr_ee_wakeup_fd_posix_default", + "iomgr_port", + "time", + ], +) + +grpc_cc_library( + name = "iomgr_ee_poller_posix_default", + srcs = [ + "src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc", + ], + hdrs = [ + "src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h", + ], + tags = ["grpc-autodeps"], + deps = [ + "gpr_platform", + "iomgr_ee_event_poller", + "iomgr_ee_poller_posix_epoll1", + ], +) + grpc_cc_library( name = "iomgr_event_engine", srcs = ["src/core/lib/event_engine/iomgr_engine/iomgr_engine.cc"], diff --git a/CMakeLists.txt b/CMakeLists.txt index dfa908a094b..d70bc3a3896 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -953,6 +953,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx error_test) add_dependencies(buildtests_cxx error_utils_test) add_dependencies(buildtests_cxx evaluate_args_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx event_poller_posix_test) + endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx examine_stack_test) endif() @@ -1040,6 +1043,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx latch_test) add_dependencies(buildtests_cxx lb_get_cpu_stats_test) add_dependencies(buildtests_cxx lb_load_data_store_test) + add_dependencies(buildtests_cxx lock_free_event_test) add_dependencies(buildtests_cxx log_test) add_dependencies(buildtests_cxx loop_test) add_dependencies(buildtests_cxx match_test) @@ -1205,6 +1209,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx uri_parser_test) add_dependencies(buildtests_cxx useful_test) add_dependencies(buildtests_cxx varint_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx wakeup_fd_posix_test) + endif() add_dependencies(buildtests_cxx window_overflow_bad_client_test) add_dependencies(buildtests_cxx wire_reader_test) add_dependencies(buildtests_cxx wire_writer_test) @@ -9222,6 +9229,49 @@ target_link_libraries(evaluate_args_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + + add_executable(event_poller_posix_test + src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc + src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc + src/core/lib/event_engine/iomgr_engine/lockfree_event.cc + src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc + src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc + src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc + test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + + target_include_directories(event_poller_posix_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(event_poller_posix_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + ) + + +endif() endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -12434,6 +12484,42 @@ target_link_libraries(lb_load_data_store_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(lock_free_event_test + src/core/lib/event_engine/iomgr_engine/lockfree_event.cc + test/core/event_engine/iomgr_event_engine/lock_free_event_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(lock_free_event_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(lock_free_event_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) @@ -18233,6 +18319,45 @@ target_link_libraries(varint_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + + add_executable(wakeup_fd_posix_test + src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc + src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc + test/core/event_engine/iomgr_event_engine/wakeup_fd_posix_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + + target_include_directories(wakeup_fd_posix_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(wakeup_fd_posix_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + ) + + +endif() endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 4496e26a1b0..5ff40a32c30 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5153,6 +5153,34 @@ targets: - test/core/security/evaluate_args_test.cc deps: - grpc_test_util +- name: event_poller_posix_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h + - src/core/lib/event_engine/iomgr_engine/event_poller.h + - src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h + - src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h + - src/core/lib/event_engine/iomgr_engine/lockfree_event.h + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h + src: + - src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc + - src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc + - src/core/lib/event_engine/iomgr_engine/lockfree_event.cc + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc + - test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc + deps: + - grpc_test_util + platforms: + - linux + - posix + - mac - name: examine_stack_test gtest: true build: test @@ -6487,6 +6515,20 @@ targets: deps: - grpc++ - grpc_test_util +- name: lock_free_event_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/event_engine/iomgr_engine/event_poller.h + - src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h + - src/core/lib/event_engine/iomgr_engine/lockfree_event.h + src: + - src/core/lib/event_engine/iomgr_engine/lockfree_event.cc + - test/core/event_engine/iomgr_event_engine/lock_free_event_test.cc + deps: + - grpc_test_util + uses_polling: false - name: log_test gtest: true build: test @@ -9083,6 +9125,24 @@ targets: deps: - grpc_test_util uses_polling: false +- name: wakeup_fd_posix_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h + src: + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc + - src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc + - test/core/event_engine/iomgr_event_engine/wakeup_fd_posix_test.cc + deps: + - grpc_test_util + platforms: + - linux + - posix + - mac - name: window_overflow_bad_client_test gtest: true build: test diff --git a/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc b/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc new file mode 100644 index 00000000000..dc0419fd341 --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.cc @@ -0,0 +1,589 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h" + +#include + +#include +#include + +#include "absl/memory/memory.h" +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" + +#include +#include +#include +#include + +#include "src/core/lib/iomgr/port.h" + +// This polling engine is only relevant on linux kernels supporting epoll +// epoll_create() or epoll_create1() +#ifdef GRPC_LINUX_EPOLL +#include +#include +#include +#include +#include +#include + +#include + +#include "absl/synchronization/mutex.h" + +#include "src/core/lib/event_engine/iomgr_engine/event_poller.h" +#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h" +#include "src/core/lib/event_engine/iomgr_engine/lockfree_event.h" +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h" +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h" +#include "src/core/lib/gprpp/fork.h" +#include "src/core/lib/gprpp/time.h" + +using ::grpc_event_engine::iomgr_engine::LockfreeEvent; +using ::grpc_event_engine::iomgr_engine::WakeupFd; + +#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1 + +namespace grpc_event_engine { +namespace iomgr_engine { + +namespace { +// Only used when GRPC_ENABLE_FORK_SUPPORT=1 +struct ForkFdList { + Epoll1EventHandle* handle; + Epoll1EventHandle* next; + Epoll1EventHandle* prev; +}; +} // namespace + +class Epoll1EventHandle : public EventHandle { + public: + Epoll1EventHandle(int fd, Epoll1Poller* poller) + : fd_(fd), + pending_actions_(0), + list_(), + poller_(poller), + read_closure_(absl::make_unique(poller->GetScheduler())), + write_closure_( + absl::make_unique(poller->GetScheduler())), + error_closure_( + absl::make_unique(poller->GetScheduler())) { + read_closure_->InitEvent(); + write_closure_->InitEvent(); + error_closure_->InitEvent(); + pending_actions_ = 0; + } + Epoll1Poller* Poller() { return poller_; } + void SetPendingActions(bool pending_read, bool pending_write, + bool pending_error) { + pending_actions_ |= pending_read; + if (pending_write) { + pending_actions_ |= (1 << 2); + } + if (pending_error) { + pending_actions_ |= (1 << 3); + } + } + int WrappedFd() override { return fd_; } + void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd, + absl::string_view reason) override; + void ShutdownHandle(absl::Status why) override; + void NotifyOnRead(IomgrEngineClosure* on_read) override; + void NotifyOnWrite(IomgrEngineClosure* on_write) override; + void NotifyOnError(IomgrEngineClosure* on_error) override; + void SetReadable() override; + void SetWritable() override; + void SetHasError() override; + bool IsHandleShutdown() override; + void ExecutePendingActions() override { + if (pending_actions_ & 1UL) { + read_closure_->SetReady(); + } + if ((pending_actions_ >> 2) & 1UL) { + write_closure_->SetReady(); + } + if ((pending_actions_ >> 3) & 1UL) { + error_closure_->SetReady(); + } + pending_actions_ = 0; + } + absl::Mutex* mu() { return &mu_; } + LockfreeEvent* ReadClosure() { return read_closure_.get(); } + LockfreeEvent* WriteClosure() { return write_closure_.get(); } + LockfreeEvent* ErrorClosure() { return error_closure_.get(); } + ForkFdList& ForkFdListPos() { return list_; } + + private: + void HandleShutdownInternal(absl::Status why, bool releasing_fd); + // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is + // required. + absl::Mutex mu_; + int fd_; + int pending_actions_; + ForkFdList list_; + Epoll1Poller* poller_; + std::unique_ptr read_closure_; + std::unique_ptr write_closure_; + std::unique_ptr error_closure_; +}; + +namespace { + +bool kEpoll1PollerSupported = false; +gpr_once g_init_epoll1_poller = GPR_ONCE_INIT; + +int EpollCreateAndCloexec() { +#ifdef GRPC_LINUX_EPOLL_CREATE1 + int fd = epoll_create1(EPOLL_CLOEXEC); + if (fd < 0) { + gpr_log(GPR_ERROR, "epoll_create1 unavailable"); + } +#else + int fd = epoll_create(MAX_EPOLL_EVENTS); + if (fd < 0) { + gpr_log(GPR_ERROR, "epoll_create unavailable"); + } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) { + gpr_log(GPR_ERROR, "fcntl following epoll_create failed"); + return -1; + } +#endif + return fd; +} + +// Only used when GRPC_ENABLE_FORK_SUPPORT=1 +std::list fork_poller_list; + +// Only used when GRPC_ENABLE_FORK_SUPPORT=1 +Epoll1EventHandle* fork_fd_list_head = nullptr; +gpr_mu fork_fd_list_mu; + +void ForkFdListAddHandle(Epoll1EventHandle* handle) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + handle->ForkFdListPos().next = fork_fd_list_head; + handle->ForkFdListPos().prev = nullptr; + if (fork_fd_list_head != nullptr) { + fork_fd_list_head->ForkFdListPos().prev = handle; + } + fork_fd_list_head = handle; + gpr_mu_unlock(&fork_fd_list_mu); + } +} + +void ForkFdListRemoveHandle(Epoll1EventHandle* handle) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + if (fork_fd_list_head == handle) { + fork_fd_list_head = handle->ForkFdListPos().next; + } + if (handle->ForkFdListPos().prev != nullptr) { + handle->ForkFdListPos().prev->ForkFdListPos().next = + handle->ForkFdListPos().next; + } + if (handle->ForkFdListPos().next != nullptr) { + handle->ForkFdListPos().next->ForkFdListPos().prev = + handle->ForkFdListPos().prev; + } + gpr_mu_unlock(&fork_fd_list_mu); + } +} + +void ForkPollerListAddPoller(Epoll1Poller* poller) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + fork_poller_list.push_back(poller); + gpr_mu_unlock(&fork_fd_list_mu); + } +} + +void ForkPollerListRemovePoller(Epoll1Poller* poller) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + fork_poller_list.remove(poller); + gpr_mu_unlock(&fork_fd_list_mu); + } +} + +int PollDeadlineToMillisTimeout(grpc_core::Timestamp millis) { + if (millis == grpc_core::Timestamp::InfFuture()) return -1; + grpc_core::Timestamp now = + grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC)); + int64_t delta = (millis - now).millis(); + if (delta > INT_MAX) { + return INT_MAX; + } else if (delta < 0) { + return 0; + } else { + return static_cast(delta); + } +} + +void InitEpoll1PollerLinux(); + +// Called by the child process's post-fork handler to close open fds, +// including the global epoll fd of each poller. This allows gRPC to shutdown in +// the child process without interfering with connections or RPCs ongoing in the +// parent. +void ResetEventManagerOnFork() { + // Delete all pending Epoll1EventHandles. + gpr_mu_lock(&fork_fd_list_mu); + while (fork_fd_list_head != nullptr) { + close(fork_fd_list_head->WrappedFd()); + Epoll1EventHandle* next = fork_fd_list_head->ForkFdListPos().next; + delete fork_fd_list_head; + fork_fd_list_head = next; + } + // Delete all registered pollers. This also closes all open epoll_sets + while (!fork_poller_list.empty()) { + Epoll1Poller* poller = fork_poller_list.front(); + fork_poller_list.pop_front(); + delete poller; + } + gpr_mu_unlock(&fork_fd_list_mu); + if (grpc_core::Fork::Enabled()) { + gpr_mu_destroy(&fork_fd_list_mu); + grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr); + } + InitEpoll1PollerLinux(); +} + +// It is possible that GLIBC has epoll but the underlying kernel doesn't. +// Create epoll_fd to make sure epoll support is available +void InitEpoll1PollerLinux() { + if (!grpc_event_engine::iomgr_engine::SupportsWakeupFd()) { + kEpoll1PollerSupported = false; + return; + } + int fd = EpollCreateAndCloexec(); + if (fd <= 0) { + kEpoll1PollerSupported = false; + return; + } + kEpoll1PollerSupported = true; + if (grpc_core::Fork::Enabled()) { + gpr_mu_init(&fork_fd_list_mu); + grpc_core::Fork::SetResetChildPollingEngineFunc(ResetEventManagerOnFork); + } + close(fd); +} + +} // namespace + +void Epoll1EventHandle::OrphanHandle(IomgrEngineClosure* on_done, + int* release_fd, + absl::string_view reason) { + bool is_release_fd = (release_fd != nullptr); + if (!read_closure_->IsShutdown()) { + HandleShutdownInternal(absl::Status(absl::StatusCode::kUnknown, reason), + is_release_fd); + } + + // If release_fd is not NULL, we should be relinquishing control of the file + // descriptor fd->fd (but we still own the grpc_fd structure). + if (is_release_fd) { + *release_fd = fd_; + } else { + close(fd_); + } + + ForkFdListRemoveHandle(this); + { + // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is + // required here. + absl::MutexLock lock(&mu_); + read_closure_->DestroyEvent(); + write_closure_->DestroyEvent(); + error_closure_->DestroyEvent(); + } + + { + absl::MutexLock lock(&poller_->mu_); + poller_->free_epoll1_handles_list_.push_back(this); + } + + if (on_done != nullptr) { + on_done->SetStatus(absl::OkStatus()); + poller_->GetScheduler()->Run(on_done); + } +} + +// if 'releasing_fd' is true, it means that we are going to detach the internal +// fd from grpc_fd structure (i.e which means we should not be calling +// shutdown() syscall on that fd) +void Epoll1EventHandle::HandleShutdownInternal(absl::Status why, + bool releasing_fd) { + if (read_closure_->SetShutdown(why)) { + if (!releasing_fd) { + shutdown(fd_, SHUT_RDWR); + } else { + epoll_event phony_event; + if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_, + &phony_event) != 0) { + gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno)); + } + } + write_closure_->SetShutdown(why); + write_closure_->SetShutdown(why); + } +} + +Epoll1Poller::Epoll1Poller(Scheduler* scheduler) + : scheduler_(scheduler), was_kicked_(false) { + g_epoll_set_.epfd = EpollCreateAndCloexec(); + wakeup_fd_ = *CreateWakeupFd(); + GPR_ASSERT(wakeup_fd_ != nullptr); + GPR_ASSERT(g_epoll_set_.epfd >= 0); + gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set_.epfd); + struct epoll_event ev; + ev.events = static_cast(EPOLLIN | EPOLLET); + ev.data.ptr = wakeup_fd_.get(); + GPR_ASSERT(epoll_ctl(g_epoll_set_.epfd, EPOLL_CTL_ADD, wakeup_fd_->ReadFd(), + &ev) == 0); + g_epoll_set_.num_events = 0; + g_epoll_set_.cursor = 0; + ForkPollerListAddPoller(this); +} + +void Epoll1Poller::Shutdown() { + ForkPollerListRemovePoller(this); + delete this; +} + +Epoll1Poller::~Epoll1Poller() { + if (g_epoll_set_.epfd >= 0) { + close(g_epoll_set_.epfd); + g_epoll_set_.epfd = -1; + } + { + absl::MutexLock lock(&mu_); + while (!free_epoll1_handles_list_.empty()) { + Epoll1EventHandle* handle = reinterpret_cast( + free_epoll1_handles_list_.front()); + free_epoll1_handles_list_.pop_front(); + delete handle; + } + } +} + +EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/, + bool track_err) { + Epoll1EventHandle* new_handle = nullptr; + { + absl::MutexLock lock(&mu_); + if (free_epoll1_handles_list_.empty()) { + new_handle = new Epoll1EventHandle(fd, this); + } else { + new_handle = reinterpret_cast( + free_epoll1_handles_list_.front()); + free_epoll1_handles_list_.pop_front(); + } + } + ForkFdListAddHandle(new_handle); + struct epoll_event ev; + ev.events = static_cast(EPOLLIN | EPOLLOUT | EPOLLET); + // Use the least significant bit of ev.data.ptr to store track_err. We expect + // the addresses to be word aligned. We need to store track_err to avoid + // synchronization issues when accessing it after receiving an event. + // Accessing fd would be a data race there because the fd might have been + // returned to the free list at that point. + ev.data.ptr = reinterpret_cast(reinterpret_cast(new_handle) | + (track_err ? 1 : 0)); + if (epoll_ctl(g_epoll_set_.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) { + gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno)); + } + + return new_handle; +} + +// Process the epoll events found by DoEpollWait() function. +// - g_epoll_set.cursor points to the index of the first event to be processed +// - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and +// updates the g_epoll_set.cursor +absl::Status Epoll1Poller::ProcessEpollEvents( + int max_epoll_events_to_handle, std::vector& pending_events) { + int64_t num_events = g_epoll_set_.num_events; + int64_t cursor = g_epoll_set_.cursor; + bool was_kicked = false; + for (int idx = 0; (idx < max_epoll_events_to_handle) && cursor != num_events; + idx++) { + int64_t c = cursor++; + struct epoll_event* ev = &g_epoll_set_.events[c]; + void* data_ptr = ev->data.ptr; + if (data_ptr == wakeup_fd_.get()) { + GPR_ASSERT(wakeup_fd_->ConsumeWakeup().ok()); + was_kicked = true; + } else { + Epoll1EventHandle* handle = reinterpret_cast( + reinterpret_cast(data_ptr) & ~static_cast(1)); + bool track_err = + reinterpret_cast(data_ptr) & static_cast(1); + bool cancel = (ev->events & EPOLLHUP) != 0; + bool error = (ev->events & EPOLLERR) != 0; + bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; + bool write_ev = (ev->events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + + handle->SetPendingActions(read_ev || cancel || err_fallback, + write_ev || cancel || err_fallback, + error && !err_fallback); + pending_events.push_back(handle); + } + } + g_epoll_set_.cursor = cursor; + return was_kicked ? absl::Status(absl::StatusCode::kInternal, "Kicked") + : absl::OkStatus(); +} + +// Do epoll_wait and store the events in g_epoll_set.events field. This does +// not "process" any of the events yet; that is done in ProcessEpollEvents(). +// See ProcessEpollEvents() function for more details. +absl::Status Epoll1Poller::DoEpollWait(grpc_core::Timestamp deadline) { + int r; + int timeout = PollDeadlineToMillisTimeout(deadline); + do { + r = epoll_wait(g_epoll_set_.epfd, g_epoll_set_.events, MAX_EPOLL_EVENTS, + timeout); + } while (r < 0 && errno == EINTR); + if (r < 0) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("epoll_wait: ", strerror(errno))); + } + g_epoll_set_.num_events = r; + g_epoll_set_.cursor = 0; + return absl::OkStatus(); +} + +// Might be called multiple times +void Epoll1EventHandle::ShutdownHandle(absl::Status why) { + // A mutex is required here because, the SetShutdown method of the + // lockfree event may schedule a closure if it is already ready and that + // closure may call OrphanHandle. Execution of ShutdownHandle and OrphanHandle + // in parallel is not safe because some of the lockfree event types e.g, read, + // write, error may-not have called SetShutdown when DestroyEvent gets + // called in the OrphanHandle method. + absl::MutexLock lock(&mu_); + HandleShutdownInternal(why, false); +} + +bool Epoll1EventHandle::IsHandleShutdown() { + return read_closure_->IsShutdown(); +} + +void Epoll1EventHandle::NotifyOnRead(IomgrEngineClosure* on_read) { + read_closure_->NotifyOn(on_read); +} + +void Epoll1EventHandle::NotifyOnWrite(IomgrEngineClosure* on_write) { + write_closure_->NotifyOn(on_write); +} + +void Epoll1EventHandle::NotifyOnError(IomgrEngineClosure* on_error) { + error_closure_->NotifyOn(on_error); +} + +void Epoll1EventHandle::SetReadable() { read_closure_->SetReady(); } + +void Epoll1EventHandle::SetWritable() { write_closure_->SetReady(); } + +void Epoll1EventHandle::SetHasError() { error_closure_->SetReady(); } + +absl::Status Epoll1Poller::Work(grpc_core::Timestamp deadline, + std::vector& pending_events) { + if (g_epoll_set_.cursor == g_epoll_set_.num_events) { + auto status = DoEpollWait(deadline); + if (!status.ok()) { + return status; + } + } + { + absl::MutexLock lock(&mu_); + // If was_kicked_ is true, collect all pending events in this iteration. + auto status = ProcessEpollEvents( + was_kicked_ ? INT_MAX : MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION, + pending_events); + if (!status.ok()) { + was_kicked_ = false; + } + return status; + } +} + +void Epoll1Poller::Kick() { + absl::MutexLock lock(&mu_); + if (was_kicked_) { + return; + } + was_kicked_ = true; + GPR_ASSERT(wakeup_fd_->Wakeup().ok()); +} + +Epoll1Poller* GetEpoll1Poller(Scheduler* scheduler) { + gpr_once_init(&g_init_epoll1_poller, []() { InitEpoll1PollerLinux(); }); + if (kEpoll1PollerSupported) { + return new Epoll1Poller(scheduler); + } + return nullptr; +} + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#else /* defined(GRPC_LINUX_EPOLL) */ +#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1) + +namespace grpc_event_engine { +namespace iomgr_engine { + +Epoll1Poller::Epoll1Poller(Scheduler* /* engine */) { + GPR_ASSERT(false && "unimplemented"); +} + +void Epoll1Poller::Shutdown() { GPR_ASSERT(false && "unimplemented"); } + +Epoll1Poller::~Epoll1Poller() { GPR_ASSERT(false && "unimplemented"); } + +EventHandle* Epoll1Poller::CreateHandle(int /*fd*/, absl::string_view /*name*/, + bool /*track_err*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status Epoll1Poller::ProcessEpollEvents( + int /*max_epoll_events_to_handle*/, + std::vector& /*pending_events*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status Epoll1Poller::DoEpollWait(grpc_core::Timestamp /*deadline*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status Epoll1Poller::Work(grpc_core::Timestamp /*deadline*/, + std::vector& /*pending_events*/) { + GPR_ASSERT(false && "unimplemented"); +} + +void Epoll1Poller::Kick() { GPR_ASSERT(false && "unimplemented"); } + +// If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return +// nullptr. +Epoll1Poller* GetEpoll1Poller(Scheduler* /*scheduler*/) { return nullptr; } + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */ +#endif /* !defined(GRPC_LINUX_EPOLL) */ diff --git a/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h b/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h new file mode 100644 index 00000000000..4424b834d1d --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h @@ -0,0 +1,95 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EV_EPOLL1_LINUX_H +#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EV_EPOLL1_LINUX_H +#include + +#include +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/synchronization/mutex.h" + +#include "src/core/lib/event_engine/iomgr_engine/event_poller.h" +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h" +#include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_LINUX_EPOLL +#include +#endif + +#define MAX_EPOLL_EVENTS 100 + +namespace grpc_event_engine { +namespace iomgr_engine { + +class Epoll1EventHandle; + +// Definition of epoll1 based poller. +class Epoll1Poller : public EventPoller { + public: + explicit Epoll1Poller(Scheduler* scheduler); + EventHandle* CreateHandle(int fd, absl::string_view name, + bool track_err) override; + absl::Status Work(grpc_core::Timestamp deadline, + std::vector& pending_events) override; + void Kick() override; + Scheduler* GetScheduler() { return scheduler_; } + void Shutdown() override; + ~Epoll1Poller() override; + + private: + absl::Status ProcessEpollEvents(int max_epoll_events_to_handle, + std::vector& pending_events); + absl::Status DoEpollWait(grpc_core::Timestamp deadline); + friend class Epoll1EventHandle; +#ifdef GRPC_LINUX_EPOLL + struct EpollSet { + int epfd; + + // The epoll_events after the last call to epoll_wait() + struct epoll_event events[MAX_EPOLL_EVENTS]; + + // The number of epoll_events after the last call to epoll_wait() + int num_events; + + // Index of the first event in epoll_events that has to be processed. This + // field is only valid if num_events > 0 + int cursor; + }; +#else + struct EpollSet {}; +#endif + absl::Mutex mu_; + Scheduler* scheduler_; + // A singleton epoll set + EpollSet g_epoll_set_; + bool was_kicked_ ABSL_GUARDED_BY(mu_); + std::list free_epoll1_handles_list_ ABSL_GUARDED_BY(mu_); + std::unique_ptr wakeup_fd_; +}; + +// Return an instance of a epoll1 based poller tied to the specified event +// engine. +Epoll1Poller* GetEpoll1Poller(Scheduler* scheduler); + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EV_EPOLL1_LINUX_H diff --git a/src/core/lib/event_engine/iomgr_engine/event_poller.h b/src/core/lib/event_engine/iomgr_engine/event_poller.h new file mode 100644 index 00000000000..7efe8e456b3 --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/event_poller.h @@ -0,0 +1,105 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_H +#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_H +#include + +#include + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" + +#include + +#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h" +#include "src/core/lib/gprpp/time.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +class Scheduler { + public: + virtual void Run(experimental::EventEngine::Closure* closure) = 0; + virtual ~Scheduler() = default; +}; + +class EventHandle { + public: + virtual int WrappedFd() = 0; + // Delete the handle and optionally close the underlying file descriptor if + // release_fd != nullptr. The on_done closure is scheduled to be invoked + // after the operation is complete. After this operation, NotifyXXX and SetXXX + // operations cannot be performed on the handle. + virtual void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd, + absl::string_view reason) = 0; + // Shutdown a handle. After this operation, NotifyXXX and SetXXX operations + // cannot be performed. + virtual void ShutdownHandle(absl::Status why) = 0; + // Schedule on_read to be invoked when the underlying file descriptor + // becomes readable. + virtual void NotifyOnRead(IomgrEngineClosure* on_read) = 0; + // Schedule on_write to be invoked when the underlying file descriptor + // becomes writable. + virtual void NotifyOnWrite(IomgrEngineClosure* on_write) = 0; + // Schedule on_error to be invoked when the underlying file descriptor + // encounters errors. + virtual void NotifyOnError(IomgrEngineClosure* on_error) = 0; + // Force set a readable event on the underlying file descriptor. + virtual void SetReadable() = 0; + // Force set a writable event on the underlying file descriptor. + virtual void SetWritable() = 0; + // Force set a error event on the underlying file descriptor. + virtual void SetHasError() = 0; + // Returns true if the handle has been shutdown. + virtual bool IsHandleShutdown() = 0; + // Execute any pending actions that may have been set to a handle after the + // last invocation of Work(...) function. + virtual void ExecutePendingActions() = 0; + virtual ~EventHandle() = default; +}; + +class EventPoller { + public: + // Return an opaque handle to perform actions on the provided file descriptor. + virtual EventHandle* CreateHandle(int fd, absl::string_view name, + bool track_err) = 0; + // Shuts down and deletes the poller. It is legal to call this function + // only when no other poller method is in progress. For instance, it is + // not safe to call this method, while a thread is blocked on Work(...). + // A graceful way to terminate the poller could be to: + // 1. First orphan all created handles. + // 2. Send a Kick() to the thread executing Work(...) and wait for the + // thread to return. + // 3. Call Shutdown() on the poller. + virtual void Shutdown() = 0; + // Poll all the underlying file descriptors for the specified period + // and return a vector containing a list of handles which have pending + // events. The calling thread should invoke ExecutePendingActions on each + // returned handle to take the necessary pending actions. Only one thread + // may invoke the Work function at any given point in time. The Work(...) + // method returns an absl Non-OK status if it was Kicked. + virtual absl::Status Work(grpc_core::Timestamp deadline, + std::vector& pending_events) = 0; + // Trigger the thread executing Work(..) to break out as soon as possible. + // This function is useful in tests. It may also be used to break a thread + // out of Work(...) before calling Shutdown() on the poller. + virtual void Kick() = 0; + virtual ~EventPoller() = default; +}; + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_H \ No newline at end of file diff --git a/src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc b/src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc new file mode 100644 index 00000000000..113cdc558b5 --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.cc @@ -0,0 +1,29 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/event_engine/iomgr_engine/ev_epoll1_linux.h" +#include "src/core/lib/event_engine/iomgr_engine/event_poller.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +EventPoller* GetDefaultPoller(Scheduler* scheduler) { + EventPoller* poller = GetEpoll1Poller(scheduler); + return poller; +} + +} // namespace iomgr_engine +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h b/src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h new file mode 100644 index 00000000000..1402e074884 --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h @@ -0,0 +1,33 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_POSIX_DEFAULT_H +#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_POSIX_DEFAULT_H + +#include + +namespace grpc_event_engine { +namespace iomgr_engine { + +class EventPoller; +class Scheduler; + +// Return an instance of an event poller which is tied to the specified +// scheduler. +EventPoller* GetDefaultPoller(Scheduler* scheduler); + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_EVENT_POLLER_POSIX_DEFAULT_H \ No newline at end of file diff --git a/src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h b/src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h new file mode 100644 index 00000000000..60dacae492a --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h @@ -0,0 +1,74 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_IOMGR_ENGINE_CLOSURE_H +#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_IOMGR_ENGINE_CLOSURE_H +#include + +#include +#include + +#include "absl/status/status.h" +#include "absl/utility/utility.h" + +#include + +namespace grpc_event_engine { +namespace iomgr_engine { + +// The callbacks for Endpoint read and write take an absl::Status as +// argument - this is important for the tcp code to function correctly. We need +// a custom closure type because the default EventEngine::Closure type doesn't +// provide a way to pass a status when the callback is run. +class IomgrEngineClosure final + : public grpc_event_engine::experimental::EventEngine::Closure { + public: + IomgrEngineClosure() = default; + IomgrEngineClosure(std::function&& cb, bool is_permanent) + : cb_(std::move(cb)), + is_permanent_(is_permanent), + status_(absl::OkStatus()) {} + ~IomgrEngineClosure() final = default; + void SetStatus(absl::Status status) { status_ = status; } + void Run() override { + cb_(absl::exchange(status_, absl::OkStatus())); + if (!is_permanent_) { + delete this; + } + } + + // This closure clean doesn't itself up after execution. It is expected to be + // cleaned up by the caller at the appropriate time. + static IomgrEngineClosure* ToPermanentClosure( + std::function&& cb) { + return new IomgrEngineClosure(std::move(cb), true); + } + + // This closure clean's itself up after execution. It is expected to be + // used only in tests. + static IomgrEngineClosure* TestOnlyToClosure( + std::function&& cb) { + return new IomgrEngineClosure(std::move(cb), false); + } + + private: + std::function cb_; + bool is_permanent_ = false; + absl::Status status_; +}; + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_IOMGR_ENGINE_CLOSURE_H diff --git a/src/core/lib/event_engine/iomgr_engine/lockfree_event.cc b/src/core/lib/event_engine/iomgr_engine/lockfree_event.cc new file mode 100644 index 00000000000..8119ceb203b --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/lockfree_event.cc @@ -0,0 +1,267 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "src/core/lib/event_engine/iomgr_engine/lockfree_event.h" + +#include + +#include +#include + +#include "absl/status/status.h" + +#include +#include + +#include "src/core/lib/event_engine/iomgr_engine/event_poller.h" +#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h" +#include "src/core/lib/gprpp/status_helper.h" + +// 'state' holds the to call when the fd is readable or writable respectively. +// It can contain one of the following values: +// kClosureReady : The fd has an I/O event of interest but there is no +// closure yet to execute + +// kClosureNotReady : The fd has no I/O event of interest + +// closure ptr : The closure to be executed when the fd has an I/O +// event of interest + +// shutdown_error | kShutdownBit : +// 'shutdown_error' field ORed with kShutdownBit. +// This indicates that the fd is shutdown. Since all +// memory allocations are word-aligned, the lower two +// bits of the shutdown_error pointer are always 0. So +// it is safe to OR these with kShutdownBit + +// Valid state transitions: + +// <-----3------ kClosureNotReady -----1-------> kClosureReady +// | | ^ | ^ | | +// | | | | | | | +// | +--------------4----------+ 6 +---------2---------------+ | +// | | | +// | v | +// +-----5-------> [shutdown_error | kShutdownBit] <-------7---------+ + +// For 1, 4 : See SetReady() function +// For 2, 3 : See NotifyOn() function +// For 5,6,7: See SetShutdown() function + +namespace grpc_event_engine { +namespace iomgr_engine { + +void LockfreeEvent::InitEvent() { + // Perform an atomic store to start the state machine. + + // Note carefully that LockfreeEvent *MAY* be used whilst in a destroyed + // state, while a file descriptor is on a freelist. In such a state it may + // be SetReady'd, and so we need to perform an atomic operation here to + // ensure no races + state_.store(kClosureNotReady, std::memory_order_relaxed); +} + +void LockfreeEvent::DestroyEvent() { + intptr_t curr; + do { + curr = state_.load(std::memory_order_relaxed); + if (curr & kShutdownBit) { + grpc_core::internal::StatusFreeHeapPtr(curr & ~kShutdownBit); + } else { + GPR_ASSERT(curr == kClosureNotReady || curr == kClosureReady); + } + // we CAS in a shutdown, no error value here. If this event is interacted + // with post-deletion (see the note in the constructor) we want the bit + // pattern to prevent error retention in a deleted object + } while (!state_.compare_exchange_strong(curr, kShutdownBit, + std::memory_order_relaxed, + std::memory_order_relaxed)); +} + +void LockfreeEvent::NotifyOn(IomgrEngineClosure* closure) { + // This load needs to be an acquire load because this can be a shutdown + // error that we might need to reference. Adding acquire semantics makes + // sure that the shutdown error has been initialized properly before us + // referencing it. The load() needs to be performed only once before entry + // into the loop. This is because if any of the compare_exchange_strong + // operations inside the loop return false, they automatically update curr + // with the new value. So it doesn't need to be loaded again. + intptr_t curr = state_.load(std::memory_order_acquire); + while (true) { + switch (curr) { + case kClosureNotReady: { + // kClosureNotReady -> . + + // We're guaranteed by API that there's an acquire barrier before here, + // so there's no need to double-dip and this can be a release-only. + + // The release itself pairs with the acquire half of a set_ready full + // barrier. + if (state_.compare_exchange_strong( + curr, reinterpret_cast(closure), + std::memory_order_release, std::memory_order_relaxed)) { + return; // Successful. Return + } + + break; // retry + } + + case kClosureReady: { + // Change the state to kClosureNotReady. Schedule the closure if + // successful. If not, the state most likely transitioned to shutdown. + // We should retry. + + // This can be a no-barrier cas since the state is being transitioned to + // kClosureNotReady; set_ready and set_shutdown do not schedule any + // closure when transitioning out of CLOSURE_NO_READY state (i.e there + // is no other code that needs to 'happen-after' this) + if (state_.compare_exchange_strong(curr, kClosureNotReady, + std::memory_order_relaxed, + std::memory_order_relaxed)) { + scheduler_->Run(closure); + return; // Successful. Return. + } + break; // retry + } + + default: { + // 'curr' is either a closure or the fd is shutdown(in which case 'curr' + // contains a pointer to the shutdown-error). If the fd is shutdown, + // schedule the closure with the shutdown error + if ((curr & kShutdownBit) > 0) { + absl::Status shutdown_err = + grpc_core::internal::StatusGetFromHeapPtr(curr & ~kShutdownBit); + closure->SetStatus(shutdown_err); + scheduler_->Run(closure); + return; + } + + // There is already a closure!. This indicates a bug in the code. + gpr_log(GPR_ERROR, + "LockfreeEvent::NotifyOn: notify_on called with a previous " + "callback still pending"); + abort(); + } + } + } + + GPR_UNREACHABLE_CODE(return ); +} + +bool LockfreeEvent::SetShutdown(absl::Status shutdown_error) { + intptr_t status_ptr = grpc_core::internal::StatusAllocHeapPtr(shutdown_error); + gpr_atm new_state = status_ptr | kShutdownBit; + // The load() needs to be performed only once before entry + // into the loop. This is because if any of the compare_exchange_strong + // operations inside the loop return false, they automatically update curr + // with the new value. So it doesn't need to be loaded again. + intptr_t curr = state_.load(std::memory_order_acquire); + + while (true) { + switch (curr) { + case kClosureReady: + case kClosureNotReady: + // Need a full barrier here so that the initial load in notify_on + // doesn't need a barrier + if (state_.compare_exchange_strong(curr, new_state, + std::memory_order_acq_rel, + std::memory_order_relaxed)) { + return true; // early out + } + break; // retry + + default: { + // 'curr' is either a closure or the fd is already shutdown + + // If fd is already shutdown, we are done. + if ((curr & kShutdownBit) > 0) { + grpc_core::internal::StatusFreeHeapPtr(status_ptr); + return false; + } + + // Fd is not shutdown. Schedule the closure and move the state to + // shutdown state. + // Needs an acquire to pair with setting the closure (and get a + // happens-after on that edge), and a release to pair with anything + // loading the shutdown state. + if (state_.compare_exchange_strong(curr, new_state, + std::memory_order_acq_rel, + std::memory_order_relaxed)) { + auto closure = reinterpret_cast(curr); + closure->SetStatus(shutdown_error); + scheduler_->Run(closure); + return true; + } + // 'curr' was a closure but now changed to a different state. We will + // have to retry + break; + } + } + } + GPR_UNREACHABLE_CODE(return false); +} + +void LockfreeEvent::SetReady() { + // The load() needs to be performed only once before entry + // into the loop. This is because if any of the compare_exchange_strong + // operations inside the loop return false, they automatically update curr + // with the new value. So it doesn't need to be loaded again. + intptr_t curr = state_.load(std::memory_order_acquire); + while (true) { + switch (curr) { + case kClosureReady: { + // Already ready. We are done here. + return; + } + + case kClosureNotReady: { + // No barrier required as we're transitioning to a state that does not + // involve a closure + if (state_.compare_exchange_strong(curr, kClosureReady, + std::memory_order_relaxed, + std::memory_order_relaxed)) { + return; // early out + } + break; // retry + } + + default: { + // 'curr' is either a closure or the fd is shutdown + if ((curr & kShutdownBit) > 0) { + // The fd is shutdown. Do nothing. + return; + } else if (state_.compare_exchange_strong(curr, kClosureNotReady, + std::memory_order_acq_rel, + std::memory_order_relaxed)) { + // Full cas: acquire pairs with this cas' release in the event of a + // spurious set_ready; release pairs with this or the acquire in + // notify_on (or set_shutdown) + auto closure = reinterpret_cast(curr); + closure->SetStatus(absl::OkStatus()); + scheduler_->Run(closure); + return; + } + // else the state changed again (only possible by either a racing + // set_ready or set_shutdown functions. In both these cases, the + // closure would have been scheduled for execution. So we are done + // here + return; + } + } + } +} + +} // namespace iomgr_engine +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/iomgr_engine/lockfree_event.h b/src/core/lib/event_engine/iomgr_engine/lockfree_event.h new file mode 100644 index 00000000000..8f49a952989 --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/lockfree_event.h @@ -0,0 +1,73 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_LOCKFREE_EVENT_H +#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_LOCKFREE_EVENT_H +#include + +#include +#include + +#include "absl/status/status.h" + +#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +class Scheduler; + +class LockfreeEvent { + public: + explicit LockfreeEvent(Scheduler* scheduler) : scheduler_(scheduler) {} + + LockfreeEvent(const LockfreeEvent&) = delete; + LockfreeEvent& operator=(const LockfreeEvent&) = delete; + + // These methods are used to initialize and destroy the internal state. These + // cannot be done in constructor and destructor because SetReady may be called + // when the event is destroyed and put in a freelist. + void InitEvent(); + void DestroyEvent(); + + // Returns true if fd has been shutdown, false otherwise. + bool IsShutdown() const { + return (state_.load(std::memory_order_relaxed) & kShutdownBit) != 0; + } + + // Schedules \a closure when the event is received (see SetReady()) or the + // shutdown state has been set. Note that the event may have already been + // received, in which case the closure would be scheduled immediately. + // If the shutdown state has already been set, then \a closure is scheduled + // with the shutdown error. + void NotifyOn(IomgrEngineClosure* closure); + + // Sets the shutdown state. If a closure had been provided by NotifyOn and has + // not yet been scheduled, it will be scheduled with \a shutdown_error. + bool SetShutdown(absl::Status shutdown_error); + + // Signals that the event has been received. + void SetReady(); + + private: + enum State { kClosureNotReady = 0, kClosureReady = 2, kShutdownBit = 1 }; + + std::atomic state_; + Scheduler* scheduler_; +}; + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_LOCKFREE_EVENT_H diff --git a/src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc new file mode 100644 index 00000000000..d52737ddc91 --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.cc @@ -0,0 +1,125 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "absl/memory/memory.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" + +#include // IWYU pragma: keep + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_LINUX_EVENTFD + +#include +#include +#include +#include + +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h" +#endif + +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +#ifdef GRPC_LINUX_EVENTFD + +absl::Status EventFdWakeupFd::Init() { + int read_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + int write_fd = -1; + if (read_fd < 0) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("eventfd: ", strerror(errno))); + } + SetWakeupFds(read_fd, write_fd); + return absl::OkStatus(); +} + +absl::Status EventFdWakeupFd::ConsumeWakeup() { + eventfd_t value; + int err; + do { + err = eventfd_read(ReadFd(), &value); + } while (err < 0 && errno == EINTR); + if (err < 0 && errno != EAGAIN) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("eventfd_read: ", strerror(errno))); + } + return absl::OkStatus(); +} + +absl::Status EventFdWakeupFd::Wakeup() { + int err; + do { + err = eventfd_write(ReadFd(), 1); + } while (err < 0 && errno == EINTR); + if (err < 0) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("eventfd_write: ", strerror(errno))); + } + return absl::OkStatus(); +} + +EventFdWakeupFd::~EventFdWakeupFd() { + if (ReadFd() != 0) { + close(ReadFd()); + } +} + +bool EventFdWakeupFd::IsSupported() { + EventFdWakeupFd event_fd_wakeup_fd; + return event_fd_wakeup_fd.Init().ok(); +} + +absl::StatusOr> +EventFdWakeupFd::CreateEventFdWakeupFd() { + static bool kIsEventFdWakeupFdSupported = EventFdWakeupFd::IsSupported(); + if (kIsEventFdWakeupFdSupported) { + auto event_fd_wakeup_fd = absl::make_unique(); + auto status = event_fd_wakeup_fd->Init(); + if (status.ok()) { + return std::unique_ptr(std::move(event_fd_wakeup_fd)); + } + return status; + } + return absl::NotFoundError("Eventfd wakeup fd is not supported"); +} + +#else // GRPC_LINUX_EVENTFD + +absl::Status EventFdWakeupFd::Init() { GPR_ASSERT(false && "unimplemented"); } + +absl::Status EventFdWakeupFd::ConsumeWakeup() { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status EventFdWakeupFd::Wakeup() { GPR_ASSERT(false && "unimplemented"); } + +bool EventFdWakeupFd::IsSupported() { return false; } + +absl::StatusOr> +EventFdWakeupFd::CreateEventFdWakeupFd() { + return absl::NotFoundError("Eventfd wakeup fd is not supported"); +} + +#endif // GRPC_LINUX_EVENTFD + +} // namespace iomgr_engine +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h new file mode 100644 index 00000000000..cddd62eaa1d --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h @@ -0,0 +1,45 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_EVENTFD_H +#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_EVENTFD_H + +#include + +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" + +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +class EventFdWakeupFd : public WakeupFd { + public: + EventFdWakeupFd() : WakeupFd() {} + ~EventFdWakeupFd() override; + absl::Status ConsumeWakeup() override; + absl::Status Wakeup() override; + static absl::StatusOr> CreateEventFdWakeupFd(); + static bool IsSupported(); + + private: + absl::Status Init(); +}; + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_EVENTFD_H diff --git a/src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc new file mode 100644 index 00000000000..099814ac5fe --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.cc @@ -0,0 +1,152 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include "absl/memory/memory.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" + +#include // IWYU pragma: keep + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_WAKEUP_FD +#include +#include +#include +#include + +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h" +#endif + +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +#ifdef GRPC_POSIX_WAKEUP_FD + +namespace { + +absl::Status SetSocketNonBlocking(int fd) { + int oldflags = fcntl(fd, F_GETFL, 0); + if (oldflags < 0) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("fcntl: ", strerror(errno))); + } + + oldflags |= O_NONBLOCK; + + if (fcntl(fd, F_SETFL, oldflags) != 0) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("fcntl: ", strerror(errno))); + } + + return absl::OkStatus(); +} +} // namespace + +absl::Status PipeWakeupFd::Init() { + int pipefd[2]; + int r = pipe(pipefd); + if (0 != r) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("pipe: ", strerror(errno))); + } + auto status = SetSocketNonBlocking(pipefd[0]); + if (!status.ok()) return status; + status = SetSocketNonBlocking(pipefd[1]); + if (!status.ok()) return status; + SetWakeupFds(pipefd[0], pipefd[1]); + return absl::OkStatus(); +} + +absl::Status PipeWakeupFd::ConsumeWakeup() { + char buf[128]; + ssize_t r; + + for (;;) { + r = read(ReadFd(), buf, sizeof(buf)); + if (r > 0) continue; + if (r == 0) return absl::OkStatus(); + switch (errno) { + case EAGAIN: + return absl::OkStatus(); + case EINTR: + continue; + default: + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("read: ", strerror(errno))); + } + } +} + +absl::Status PipeWakeupFd::Wakeup() { + char c = 0; + while (write(WriteFd(), &c, 1) != 1 && errno == EINTR) { + } + return absl::OkStatus(); +} + +PipeWakeupFd::~PipeWakeupFd() { + if (ReadFd() != 0) { + close(ReadFd()); + } + if (WriteFd() != 0) { + close(WriteFd()); + } +} + +bool PipeWakeupFd::IsSupported() { + PipeWakeupFd pipe_wakeup_fd; + return pipe_wakeup_fd.Init().ok(); +} + +absl::StatusOr> PipeWakeupFd::CreatePipeWakeupFd() { + static bool kIsPipeWakeupFdSupported = PipeWakeupFd::IsSupported(); + if (kIsPipeWakeupFdSupported) { + auto pipe_wakeup_fd = absl::make_unique(); + auto status = pipe_wakeup_fd->Init(); + if (status.ok()) { + return std::unique_ptr(std::move(pipe_wakeup_fd)); + } + return status; + } + return absl::NotFoundError("Pipe wakeup fd is not supported"); +} + +#else // GRPC_POSIX_WAKEUP_FD + +absl::Status PipeWakeupFd::Init() { GPR_ASSERT(false && "unimplemented"); } + +absl::Status PipeWakeupFd::ConsumeWakeup() { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PipeWakeupFd::Wakeup() { GPR_ASSERT(false && "unimplemented"); } + +bool PipeWakeupFd::IsSupported() { return false; } + +absl::StatusOr> PipeWakeupFd::CreatePipeWakeupFd() { + return absl::NotFoundError("Pipe wakeup fd is not supported"); +} + +#endif // GRPC_POSIX_WAKEUP_FD + +} // namespace iomgr_engine +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h new file mode 100644 index 00000000000..a4fb55455ff --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h @@ -0,0 +1,45 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_PIPE_H +#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_PIPE_H + +#include + +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" + +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +class PipeWakeupFd : public WakeupFd { + public: + PipeWakeupFd() : WakeupFd() {} + ~PipeWakeupFd() override; + absl::Status ConsumeWakeup() override; + absl::Status Wakeup() override; + static absl::StatusOr> CreatePipeWakeupFd(); + static bool IsSupported(); + + private: + absl::Status Init(); +}; + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_PIPE_H diff --git a/src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h new file mode 100644 index 00000000000..807a39c8a34 --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h @@ -0,0 +1,76 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* + * WakeupFd abstracts the concept of a file descriptor for the purpose of + * waking up a thread in select()/poll()/epoll_wait()/etc. + + * The poll() family of system calls provide a way for a thread to block until + * there is activity on one (or more) of a set of file descriptors. An + * application may wish to wake up this thread to do non file related work. The + * typical way to do this is to add a pipe to the set of file descriptors, then + * write to the pipe to wake up the thread in poll(). + * + * Linux has a lighter weight eventfd specifically designed for this purpose. + * WakeupFd abstracts the difference between the two. + * + * Setup: + * 1. Call CreateWakeupFd() to crete an initialized WakeupFd. + * 2. Add the result of WakeupFd::ReadFd() to the set of monitored file + * descriptors for the poll() style API you are using. Monitor the file + * descriptor for readability. + * 3. To tear down, call WakeupFd::Destroy(). This closes the underlying + * file descriptor. + * + * Usage: + * 1. To wake up a polling thread, call WakeupFd::Wakeup() on a wakeup_fd + * it is monitoring. + * 2. If the polling thread was awakened by a WakeupFd event, call + * WakeupFd::Consume() on it. + */ +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_H +#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_H + +#include + +#include "absl/status/status.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +class WakeupFd { + public: + virtual absl::Status ConsumeWakeup() = 0; + virtual absl::Status Wakeup() = 0; + virtual ~WakeupFd() = default; + + int ReadFd() { return read_fd_; } + int WriteFd() { return write_fd_; } + + protected: + WakeupFd() : read_fd_(0), write_fd_(0) {} + void SetWakeupFds(int read_fd, int write_fd) { + read_fd_ = read_fd; + write_fd_ = write_fd; + } + + private: + int read_fd_; + int write_fd_; +}; + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_H diff --git a/src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc new file mode 100644 index 00000000000..2ededda4e6b --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.cc @@ -0,0 +1,67 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" + +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h" +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h" +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h" +#include "src/core/lib/iomgr/port.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +#ifdef GRPC_POSIX_WAKEUP_FD + +absl::StatusOr> NotSupported() { + return absl::NotFoundError("Wakeup-fd is not supported on this system"); +} + +namespace { +absl::StatusOr> (*g_wakeup_fd_fn)() = + []() -> absl::StatusOr> (*)() { +#ifndef GRPC_POSIX_NO_SPECIAL_WAKEUP_FD + if (EventFdWakeupFd::IsSupported()) { + return &EventFdWakeupFd::CreateEventFdWakeupFd; + } +#endif // GRPC_POSIX_NO_SPECIAL_WAKEUP_FD + if (PipeWakeupFd::IsSupported()) { + return &PipeWakeupFd::CreatePipeWakeupFd; + } + return NotSupported; +}(); +} // namespace + +bool SupportsWakeupFd() { return g_wakeup_fd_fn != NotSupported; } + +absl::StatusOr> CreateWakeupFd() { + return g_wakeup_fd_fn(); +} + +#else /* GRPC_POSIX_WAKEUP_FD */ + +bool SupportsWakeupFd() { return false; } + +absl::StatusOr> CreateWakeupFd() { + return absl::NotFoundError("Wakeup-fd is not supported on this system"); +} + +#endif /* GRPC_POSIX_WAKEUP_FD */ + +} // namespace iomgr_engine +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h new file mode 100644 index 00000000000..dae51a0a690 --- /dev/null +++ b/src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix_default.h @@ -0,0 +1,37 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_DEFAULT_H +#define GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_DEFAULT_H +#include + +#include + +#include "absl/status/statusor.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +class WakeupFd; + +// Returns true if wakeup-fd is supported by the system. +bool SupportsWakeupFd(); + +// Create and return an initialized WakeupFd instance if supported. +absl::StatusOr> CreateWakeupFd(); + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_IOMGR_ENGINE_WAKEUP_FD_POSIX_DEFAULT_H \ No newline at end of file diff --git a/test/core/event_engine/iomgr_event_engine/BUILD b/test/core/event_engine/iomgr_event_engine/BUILD index 3204974dab7..af561c461ec 100644 --- a/test/core/event_engine/iomgr_event_engine/BUILD +++ b/test/core/event_engine/iomgr_event_engine/BUILD @@ -47,3 +47,58 @@ grpc_cc_test( "//test/core/util:grpc_suppressions", ], ) + +grpc_cc_test( + name = "event_poller_posix_test", + srcs = ["event_poller_posix_test.cc"], + external_deps = ["gtest"], + language = "C++", + tags = [ + "no_windows", + ], + uses_event_engine = True, + uses_polling = True, + deps = [ + "//:iomgr_ee_closure", + "//:iomgr_ee_event_poller", + "//:iomgr_ee_poller_posix_default", + "//:iomgr_event_engine", + "//test/core/util:grpc_suppressions", + "//test/core/util:grpc_test_util", + ], +) + +grpc_cc_test( + name = "lock_free_event_test", + srcs = ["lock_free_event_test.cc"], + external_deps = ["gtest"], + language = "C++", + uses_event_engine = True, + uses_polling = False, + deps = [ + "//:iomgr_ee_closure", + "//:iomgr_ee_event_poller", + "//:iomgr_ee_lockfree_event", + "//:iomgr_event_engine", + "//test/core/util:grpc_suppressions", + "//test/core/util:grpc_test_util", + ], +) + +grpc_cc_test( + name = "wakeup_fd_posix_test", + srcs = ["wakeup_fd_posix_test.cc"], + external_deps = ["gtest"], + language = "C++", + tags = [ + "no_windows", + ], + uses_event_engine = False, + uses_polling = True, + deps = [ + "//:iomgr_ee_wakeup_fd_posix_eventfd", + "//:iomgr_ee_wakeup_fd_posix_pipe", + "//test/core/util:grpc_suppressions", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc b/test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc new file mode 100644 index 00000000000..8dd0d541b1a --- /dev/null +++ b/test/core/event_engine/iomgr_event_engine/event_poller_posix_test.cc @@ -0,0 +1,499 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/iomgr/port.h" + +// This test won't work except with posix sockets enabled +#ifdef GRPC_POSIX_SOCKET_EV + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "absl/status/status.h" + +#include +#include +#include +#include +#include + +#include "src/core/lib/event_engine/iomgr_engine/event_poller.h" +#include "src/core/lib/event_engine/iomgr_engine/event_poller_posix_default.h" +#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine.h" +#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" +#include "test/core/util/port.h" + +using ::grpc_event_engine::iomgr_engine::EventPoller; + +static gpr_mu g_mu; +static EventPoller* g_event_poller = nullptr; + +// buffer size used to send and receive data. +// 1024 is the minimal value to set TCP send and receive buffer. +#define BUF_SIZE 1024 +// Max number of connections pending to be accepted by listen(). +#define MAX_NUM_FD 1024 +// Client write buffer size +#define CLIENT_WRITE_BUF_SIZE 10 +// Total number of times that the client fills up the write buffer +#define CLIENT_TOTAL_WRITE_CNT 3 + +namespace grpc_event_engine { +namespace iomgr_engine { + +namespace { + +class TestScheduler : public Scheduler { + public: + explicit TestScheduler(experimental::EventEngine* engine) : engine_(engine) {} + void Run(experimental::EventEngine::Closure* closure) override { + engine_->Run(closure); + } + + private: + experimental::EventEngine* engine_; +}; + +// Create a test socket with the right properties for testing. +// port is the TCP port to listen or connect to. +// Return a socket FD and sockaddr_in. +void CreateTestSocket(int port, int* socket_fd, struct sockaddr_in6* sin) { + int fd; + int one = 1; + int buffer_size_bytes = BUF_SIZE; + int flags; + + fd = socket(AF_INET6, SOCK_STREAM, 0); + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + // Reset the size of socket send buffer to the minimal value to facilitate + // buffer filling up and triggering notify_on_write + EXPECT_EQ(grpc_set_socket_sndbuf(fd, buffer_size_bytes), GRPC_ERROR_NONE); + EXPECT_EQ(grpc_set_socket_rcvbuf(fd, buffer_size_bytes), GRPC_ERROR_NONE); + // Make fd non-blocking. + flags = fcntl(fd, F_GETFL, 0); + EXPECT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0); + *socket_fd = fd; + + // Use local address for test. + memset(sin, 0, sizeof(struct sockaddr_in6)); + sin->sin6_family = AF_INET6; + (reinterpret_cast(&sin->sin6_addr))[15] = 1; + EXPECT_TRUE(port >= 0 && port < 65536); + sin->sin6_port = htons(static_cast(port)); +} + +// =======An upload server to test notify_on_read=========== +// The server simply reads and counts a stream of bytes. + +// An upload server. +typedef struct { + EventHandle* em_fd; /* listening fd */ + ssize_t read_bytes_total; /* total number of received bytes */ + int done; /* set to 1 when a server finishes serving */ + IomgrEngineClosure* listen_closure; +} server; + +void ServerInit(server* sv) { + sv->read_bytes_total = 0; + sv->done = 0; +} + +// An upload session. +// Created when a new upload request arrives in the server. +typedef struct { + server* sv; /* not owned by a single session */ + EventHandle* em_fd; /* fd to read upload bytes */ + char read_buf[BUF_SIZE]; /* buffer to store upload bytes */ + IomgrEngineClosure* session_read_closure; +} session; + +// Called when an upload session can be safely shutdown. +// Close session FD and start to shutdown listen FD. +void SessionShutdownCb(session* se, bool /*success*/) { + server* sv = se->sv; + se->em_fd->OrphanHandle(nullptr, nullptr, "a"); + gpr_free(se); + // Start to shutdown listen fd. + sv->em_fd->ShutdownHandle( + absl::Status(absl::StatusCode::kUnknown, "SessionShutdownCb")); +} + +// Called when data become readable in a session. +void SessionReadCb(session* se, absl::Status status) { + int fd = se->em_fd->WrappedFd(); + + ssize_t read_once = 0; + ssize_t read_total = 0; + + if (!status.ok()) { + SessionShutdownCb(se, true); + return; + } + + do { + read_once = read(fd, se->read_buf, BUF_SIZE); + if (read_once > 0) read_total += read_once; + } while (read_once > 0); + se->sv->read_bytes_total += read_total; + + // read() returns 0 to indicate the TCP connection was closed by the + // client read(fd, read_buf, 0) also returns 0 which should never be called as + // such. It is possible to read nothing due to spurious edge event or data has + // been drained, In such a case, read() returns -1 and set errno to + // EAGAIN. + if (read_once == 0) { + SessionShutdownCb(se, true); + } else if (read_once == -1) { + EXPECT_EQ(errno, EAGAIN); + // An edge triggered event is cached in the kernel until next poll. + // In the current single thread implementation, SessionReadCb is called + // in the polling thread, such that polling only happens after this + // callback, and will catch read edge event if data is available again + // before notify_on_read. + se->session_read_closure = IomgrEngineClosure::TestOnlyToClosure( + [se](absl::Status status) { SessionReadCb(se, status); }); + se->em_fd->NotifyOnRead(se->session_read_closure); + } +} + +// Called when the listen FD can be safely shutdown. Close listen FD and +// signal that server can be shutdown. +void ListenShutdownCb(server* sv) { + sv->em_fd->OrphanHandle(nullptr, nullptr, "b"); + gpr_mu_lock(&g_mu); + sv->done = 1; + g_event_poller->Kick(); + gpr_mu_unlock(&g_mu); +} + +// Called when a new TCP connection request arrives in the listening port. +void ListenCb(server* sv, absl::Status status) { + int fd; + int flags; + session* se; + struct sockaddr_storage ss; + socklen_t slen = sizeof(ss); + EventHandle* listen_em_fd = sv->em_fd; + + if (!status.ok()) { + ListenShutdownCb(sv); + return; + } + + do { + fd = accept(listen_em_fd->WrappedFd(), + reinterpret_cast(&ss), &slen); + } while (fd < 0 && errno == EINTR); + if (fd < 0 && errno == EAGAIN) { + sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure( + [sv](absl::Status status) { ListenCb(sv, status); }); + listen_em_fd->NotifyOnRead(sv->listen_closure); + return; + } + EXPECT_GE(fd, 0); + EXPECT_LT(fd, FD_SETSIZE); + flags = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); + se = static_cast(gpr_malloc(sizeof(*se))); + se->sv = sv; + se->em_fd = g_event_poller->CreateHandle(fd, "listener", false); + se->session_read_closure = IomgrEngineClosure::TestOnlyToClosure( + [se](absl::Status status) { SessionReadCb(se, status); }); + se->em_fd->NotifyOnRead(se->session_read_closure); + sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure( + [sv](absl::Status status) { ListenCb(sv, status); }); + listen_em_fd->NotifyOnRead(sv->listen_closure); +} + +// Start a test server, return the TCP listening port bound to listen_fd. +// ListenCb() is registered to be interested in reading from listen_fd. +// When connection request arrives, ListenCb() is called to accept the +// connection request. +int ServerStart(server* sv) { + int port = grpc_pick_unused_port_or_die(); + int fd; + struct sockaddr_in6 sin; + socklen_t addr_len; + + CreateTestSocket(port, &fd, &sin); + addr_len = sizeof(sin); + EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0); + EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0); + port = ntohs(sin.sin6_port); + EXPECT_EQ(listen(fd, MAX_NUM_FD), 0); + + sv->em_fd = g_event_poller->CreateHandle(fd, "server", false); + sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure( + [sv](absl::Status status) { ListenCb(sv, status); }); + sv->em_fd->NotifyOnRead(sv->listen_closure); + return port; +} + +// ===An upload client to test notify_on_write=== + +// An upload client. +typedef struct { + EventHandle* em_fd; + char write_buf[CLIENT_WRITE_BUF_SIZE]; + ssize_t write_bytes_total; + // Number of times that the client fills up the write buffer and calls + // notify_on_write to schedule another write. + int client_write_cnt; + int done; + IomgrEngineClosure* write_closure; +} client; + +void ClientInit(client* cl) { + memset(cl->write_buf, 0, sizeof(cl->write_buf)); + cl->write_bytes_total = 0; + cl->client_write_cnt = 0; + cl->done = 0; +} + +// Called when a client upload session is ready to shutdown. +void ClientSessionShutdownCb(client* cl) { + cl->em_fd->OrphanHandle(nullptr, nullptr, "c"); + gpr_mu_lock(&g_mu); + cl->done = 1; + g_event_poller->Kick(); + gpr_mu_unlock(&g_mu); +} + +// Write as much as possible, then register notify_on_write. +void ClientSessionWrite(client* cl, absl::Status status) { + int fd = cl->em_fd->WrappedFd(); + ssize_t write_once = 0; + + if (!status.ok()) { + ClientSessionShutdownCb(cl); + return; + } + + do { + write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE); + if (write_once > 0) cl->write_bytes_total += write_once; + } while (write_once > 0); + + EXPECT_EQ(errno, EAGAIN); + gpr_mu_lock(&g_mu); + if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { + cl->write_closure = IomgrEngineClosure::TestOnlyToClosure( + [cl](absl::Status status) { ClientSessionWrite(cl, status); }); + cl->client_write_cnt++; + gpr_mu_unlock(&g_mu); + cl->em_fd->NotifyOnWrite(cl->write_closure); + } else { + gpr_mu_unlock(&g_mu); + ClientSessionShutdownCb(cl); + } +} + +// Start a client to send a stream of bytes. +void ClientStart(client* cl, int port) { + int fd; + struct sockaddr_in6 sin; + CreateTestSocket(port, &fd, &sin); + if (connect(fd, reinterpret_cast(&sin), sizeof(sin)) == + -1) { + if (errno == EINPROGRESS) { + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLOUT; + pfd.revents = 0; + if (poll(&pfd, 1, -1) == -1) { + gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno); + abort(); + } + } else { + gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno); + abort(); + } + } + + cl->em_fd = g_event_poller->CreateHandle(fd, "client", false); + ClientSessionWrite(cl, absl::OkStatus()); +} + +// Wait for the signal to shutdown client and server. +void WaitAndShutdown(server* sv, client* cl) { + std::vector pending_events; + gpr_mu_lock(&g_mu); + while (!sv->done || !cl->done) { + gpr_mu_unlock(&g_mu); + (void)g_event_poller->Work(grpc_core::Timestamp::InfFuture(), + pending_events); + for (auto it = pending_events.begin(); it != pending_events.end(); ++it) { + (*it)->ExecutePendingActions(); + } + pending_events.clear(); + gpr_mu_lock(&g_mu); + } + gpr_mu_unlock(&g_mu); +} + +// Test grpc_fd. Start an upload server and client, upload a stream of bytes +// from the client to the server, and verify that the total number of sent bytes +// is equal to the total number of received bytes. +TEST(EventPollerTest, TestEventPollerHandle) { + server sv; + client cl; + int port; + + ServerInit(&sv); + port = ServerStart(&sv); + ClientInit(&cl); + ClientStart(&cl, port); + + WaitAndShutdown(&sv, &cl); + EXPECT_EQ(sv.read_bytes_total, cl.write_bytes_total); +} + +typedef struct FdChangeData { + void (*cb_that_ran)(struct FdChangeData*, absl::Status); +} FdChangeData; + +void InitChangeData(FdChangeData* fdc) { fdc->cb_that_ran = nullptr; } + +void DestroyChangeData(FdChangeData* /*fdc*/) {} + +void FirstReadCallback(FdChangeData* fdc, absl::Status /*status*/) { + gpr_mu_lock(&g_mu); + fdc->cb_that_ran = FirstReadCallback; + g_event_poller->Kick(); + gpr_mu_unlock(&g_mu); +} + +void SecondReadCallback(FdChangeData* fdc, absl::Status /*status*/) { + gpr_mu_lock(&g_mu); + fdc->cb_that_ran = SecondReadCallback; + g_event_poller->Kick(); + gpr_mu_unlock(&g_mu); +} + +// Test that changing the callback we use for notify_on_read actually works. +// Note that we have two different but almost identical callbacks above -- the +// point is to have two different function pointers and two different data +// pointers and make sure that changing both really works. +TEST(EventPollerTest, TestEventPollerHandleChange) { + EventHandle* em_fd; + FdChangeData a, b; + int flags; + int sv[2]; + char data; + ssize_t result; + IomgrEngineClosure* first_closure = IomgrEngineClosure::TestOnlyToClosure( + [a = &a](absl::Status status) { FirstReadCallback(a, status); }); + IomgrEngineClosure* second_closure = IomgrEngineClosure::TestOnlyToClosure( + [b = &b](absl::Status status) { SecondReadCallback(b, status); }); + InitChangeData(&a); + InitChangeData(&b); + + EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0); + flags = fcntl(sv[0], F_GETFL, 0); + EXPECT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0); + flags = fcntl(sv[1], F_GETFL, 0); + EXPECT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0); + + em_fd = + g_event_poller->CreateHandle(sv[0], "TestEventPollerHandleChange", false); + EXPECT_NE(em_fd, nullptr); + // Register the first callback, then make its FD readable + em_fd->NotifyOnRead(first_closure); + data = 0; + result = write(sv[1], &data, 1); + EXPECT_EQ(result, 1); + + // And now wait for it to run. + auto poller_work = [](FdChangeData* fdc) { + std::vector pending_events; + gpr_mu_lock(&g_mu); + while (fdc->cb_that_ran == nullptr) { + gpr_mu_unlock(&g_mu); + (void)g_event_poller->Work(grpc_core::Timestamp::InfFuture(), + pending_events); + for (auto it = pending_events.begin(); it != pending_events.end(); ++it) { + (*it)->ExecutePendingActions(); + } + pending_events.clear(); + gpr_mu_lock(&g_mu); + } + }; + poller_work(&a); + EXPECT_EQ(a.cb_that_ran, FirstReadCallback); + gpr_mu_unlock(&g_mu); + + // And drain the socket so we can generate a new read edge + result = read(sv[0], &data, 1); + EXPECT_EQ(result, 1); + + // Now register a second callback with distinct change data, and do the same + // thing again. + em_fd->NotifyOnRead(second_closure); + data = 0; + result = write(sv[1], &data, 1); + EXPECT_EQ(result, 1); + + // And now wait for it to run. + poller_work(&b); + // Except now we verify that SecondReadCallback ran instead. + EXPECT_EQ(b.cb_that_ran, SecondReadCallback); + gpr_mu_unlock(&g_mu); + + em_fd->OrphanHandle(nullptr, nullptr, "d"); + DestroyChangeData(&a); + DestroyChangeData(&b); + close(sv[1]); +} + +} // namespace +} // namespace iomgr_engine +} // namespace grpc_event_engine + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + auto engine = + absl::make_unique(); + EXPECT_NE(engine, nullptr); + grpc_event_engine::iomgr_engine::TestScheduler scheduler(engine.get()); + g_event_poller = + grpc_event_engine::iomgr_engine::GetDefaultPoller(&scheduler); + if (g_event_poller == nullptr) { + // Poller is not supported on this system. + return 0; + } + int result = RUN_ALL_TESTS(); + g_event_poller->Shutdown(); + return result; +} + +#else /* GRPC_POSIX_SOCKET_EV */ + +int main(int argc, char** argv) { return 1; } + +#endif /* GRPC_POSIX_SOCKET_EV */ diff --git a/test/core/event_engine/iomgr_event_engine/lock_free_event_test.cc b/test/core/event_engine/iomgr_event_engine/lock_free_event_test.cc new file mode 100644 index 00000000000..5e0e5152bc0 --- /dev/null +++ b/test/core/event_engine/iomgr_event_engine/lock_free_event_test.cc @@ -0,0 +1,153 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include +#include + +#include "src/core/lib/event_engine/event_engine_factory.h" +#include "src/core/lib/event_engine/iomgr_engine/event_poller.h" +#include "src/core/lib/event_engine/iomgr_engine/iomgr_engine_closure.h" +#include "src/core/lib/event_engine/iomgr_engine/lockfree_event.h" +#include "src/core/lib/gprpp/sync.h" + +using ::grpc_event_engine::iomgr_engine::Scheduler; + +namespace { +class TestScheduler : public Scheduler { + public: + explicit TestScheduler(grpc_event_engine::experimental::EventEngine* engine) + : engine_(engine) {} + void Run( + grpc_event_engine::experimental::EventEngine::Closure* closure) override { + engine_->Run(closure); + } + + private: + grpc_event_engine::experimental::EventEngine* engine_; +}; + +TestScheduler* g_scheduler; + +} // namespace + +namespace grpc_event_engine { +namespace iomgr_engine { + +TEST(LockFreeEventTest, BasicTest) { + LockfreeEvent event(g_scheduler); + grpc_core::Mutex mu; + grpc_core::CondVar cv; + event.InitEvent(); + grpc_core::MutexLock lock(&mu); + // Set NotifyOn first and then SetReady + event.NotifyOn( + IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { + grpc_core::MutexLock lock(&mu); + EXPECT_TRUE(status.ok()); + cv.Signal(); + })); + event.SetReady(); + EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10))); + + // SetReady first first and then call NotifyOn + event.SetReady(); + event.NotifyOn( + IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { + grpc_core::MutexLock lock(&mu); + EXPECT_TRUE(status.ok()); + cv.Signal(); + })); + EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10))); + + // Set NotifyOn and then call SetShutdown + event.NotifyOn( + IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) { + grpc_core::MutexLock lock(&mu); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status, absl::CancelledError("Shutdown")); + cv.Signal(); + })); + event.SetShutdown(absl::CancelledError("Shutdown")); + EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10))); + event.DestroyEvent(); +} + +TEST(LockFreeEventTest, MultiThreadedTest) { + std::vector threads; + LockfreeEvent event(g_scheduler); + grpc_core::Mutex mu; + grpc_core::CondVar cv; + bool signalled = false; + int active = 0; + static constexpr int kNumOperations = 100; + threads.reserve(2); + event.InitEvent(); + // Spin up two threads alternating between NotifyOn and SetReady + for (int i = 0; i < 2; i++) { + threads.emplace_back([&, thread_id = i]() { + for (int j = 0; j < kNumOperations; j++) { + grpc_core::MutexLock lock(&mu); + // Wait for both threads to process the previous operation before + // starting the next one. + while (signalled) { + cv.Wait(&mu); + } + active++; + if (thread_id == 0) { + event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure( + [&mu, &cv, &signalled](absl::Status status) { + grpc_core::MutexLock lock(&mu); + EXPECT_TRUE(status.ok()); + signalled = true; + cv.SignalAll(); + })); + } else { + event.SetReady(); + } + while (!signalled) { + cv.Wait(&mu); + } + // The last thread to finish the current operation sets signalled to + // false and wakes up the other thread if its blocked waiting to + // start the next operation. + if (--active == 0) { + signalled = false; + cv.Signal(); + } + } + }); + } + for (auto& t : threads) { + t.join(); + } + event.SetShutdown(absl::OkStatus()); + event.DestroyEvent(); +} + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_event_engine::experimental::EventEngine* engine = + grpc_event_engine::experimental::GetDefaultEventEngine(); + EXPECT_NE(engine, nullptr); + g_scheduler = new TestScheduler(engine); + return RUN_ALL_TESTS(); +} diff --git a/test/core/event_engine/iomgr_event_engine/wakeup_fd_posix_test.cc b/test/core/event_engine/iomgr_event_engine/wakeup_fd_posix_test.cc new file mode 100644 index 00000000000..57a1e1b5b2f --- /dev/null +++ b/test/core/event_engine/iomgr_event_engine/wakeup_fd_posix_test.cc @@ -0,0 +1,61 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_posix.h" + +#include +#include + +#include + +#include +#include + +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_eventfd.h" +#include "src/core/lib/event_engine/iomgr_engine/wakeup_fd_pipe.h" + +namespace grpc_event_engine { +namespace iomgr_engine { + +TEST(WakeupFdPosixTest, PipeWakeupFdTest) { + if (!PipeWakeupFd::IsSupported()) { + return; + } + auto pipe_wakeup_fd = PipeWakeupFd::CreatePipeWakeupFd(); + EXPECT_TRUE(pipe_wakeup_fd.ok()); + EXPECT_GE((*pipe_wakeup_fd)->ReadFd(), 0); + EXPECT_GE((*pipe_wakeup_fd)->WriteFd(), 0); + EXPECT_TRUE((*pipe_wakeup_fd)->Wakeup().ok()); + EXPECT_TRUE((*pipe_wakeup_fd)->ConsumeWakeup().ok()); +} + +TEST(WakeupFdPosixTest, EventFdWakeupFdTest) { + if (!EventFdWakeupFd::IsSupported()) { + return; + } + auto eventfd_wakeup_fd = EventFdWakeupFd::CreateEventFdWakeupFd(); + EXPECT_TRUE(eventfd_wakeup_fd.ok()); + EXPECT_GE((*eventfd_wakeup_fd)->ReadFd(), 0); + EXPECT_EQ((*eventfd_wakeup_fd)->WriteFd(), -1); + EXPECT_TRUE((*eventfd_wakeup_fd)->Wakeup().ok()); + EXPECT_TRUE((*eventfd_wakeup_fd)->ConsumeWakeup().ok()); +} + +} // namespace iomgr_engine +} // namespace grpc_event_engine + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 8b1e5a914f4..bef7dea0ce3 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2935,6 +2935,28 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "event_poller_posix_test", + "platforms": [ + "linux", + "mac", + "posix" + ], + "uses_polling": true + }, { "args": [], "benchmark": false, @@ -4427,6 +4449,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "lock_free_event_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false, @@ -7517,6 +7563,28 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "wakeup_fd_posix_test", + "platforms": [ + "linux", + "mac", + "posix" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,