diff --git a/BUILD b/BUILD index 58206feeb03..e87e2e81c85 100644 --- a/BUILD +++ b/BUILD @@ -1170,7 +1170,6 @@ grpc_cc_library( "//src/core:lib/compression/compression.cc", "//src/core:lib/compression/compression_internal.cc", "//src/core:lib/compression/message_compress.cc", - "//src/core:lib/event_engine/channel_args_endpoint_config.cc", "//src/core:lib/iomgr/buffer_list.cc", "//src/core:lib/iomgr/call_combiner.cc", "//src/core:lib/iomgr/cfstream_handle.cc", @@ -1274,7 +1273,6 @@ grpc_cc_library( "//src/core:lib/channel/status_util.h", "//src/core:lib/compression/compression_internal.h", "//src/core:lib/compression/message_compress.h", - "//src/core:lib/event_engine/channel_args_endpoint_config.h", "//src/core:lib/iomgr/block_annotate.h", "//src/core:lib/iomgr/buffer_list.h", "//src/core:lib/iomgr/call_combiner.h", @@ -1388,6 +1386,7 @@ grpc_cc_library( "//src/core:atomic_utils", "//src/core:bitset", "//src/core:channel_args", + "//src/core:channel_args_endpoint_config", "//src/core:channel_args_preconditioning", "//src/core:channel_fwd", "//src/core:channel_init", diff --git a/CMakeLists.txt b/CMakeLists.txt index 6f38d2afb0b..a60ff6848b7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2153,6 +2153,7 @@ add_library(grpc src/core/lib/event_engine/utils.cc src/core/lib/event_engine/windows/iocp.cc src/core/lib/event_engine/windows/win_socket.cc + src/core/lib/event_engine/windows/windows_endpoint.cc src/core/lib/event_engine/windows/windows_engine.cc src/core/lib/experiments/config.cc src/core/lib/experiments/experiments.cc @@ -2822,6 +2823,7 @@ add_library(grpc_unsecure src/core/lib/event_engine/utils.cc src/core/lib/event_engine/windows/iocp.cc src/core/lib/event_engine/windows/win_socket.cc + src/core/lib/event_engine/windows/windows_endpoint.cc src/core/lib/event_engine/windows/windows_engine.cc src/core/lib/experiments/config.cc src/core/lib/experiments/experiments.cc @@ -4308,6 +4310,7 @@ add_library(grpc_authorization_provider src/core/lib/event_engine/utils.cc src/core/lib/event_engine/windows/iocp.cc src/core/lib/event_engine/windows/win_socket.cc + src/core/lib/event_engine/windows/windows_endpoint.cc src/core/lib/event_engine/windows/windows_engine.cc src/core/lib/experiments/config.cc src/core/lib/experiments/experiments.cc @@ -9926,6 +9929,10 @@ endif() if(gRPC_BUILD_TESTS) add_executable(endpoint_config_test + src/core/lib/channel/channel_args.cc + src/core/lib/event_engine/channel_args_endpoint_config.cc + src/core/lib/gprpp/time.cc + src/core/lib/surface/channel_stack_type.cc test/core/event_engine/endpoint_config_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc @@ -9955,7 +9962,10 @@ target_link_libraries(endpoint_config_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ZLIB_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} - grpc + absl::any_invocable + absl::type_traits + absl::statusor + gpr ) @@ -11270,6 +11280,7 @@ add_executable(frame_test src/core/lib/event_engine/utils.cc src/core/lib/event_engine/windows/iocp.cc src/core/lib/event_engine/windows/win_socket.cc + src/core/lib/event_engine/windows/windows_endpoint.cc src/core/lib/event_engine/windows/windows_engine.cc src/core/lib/experiments/config.cc src/core/lib/experiments/experiments.cc @@ -21791,7 +21802,6 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX OR _gRPC_PLATFORM_WINDOWS) add_executable(windows_endpoint_test - src/core/lib/event_engine/windows/windows_endpoint.cc test/core/event_engine/windows/create_sockpair.cc test/core/event_engine/windows/windows_endpoint_test.cc third_party/googletest/googletest/src/gtest-all.cc diff --git a/Makefile b/Makefile index 446c37a5fa7..95b48a4ed49 100644 --- a/Makefile +++ b/Makefile @@ -1428,6 +1428,7 @@ LIBGRPC_SRC = \ src/core/lib/event_engine/utils.cc \ src/core/lib/event_engine/windows/iocp.cc \ src/core/lib/event_engine/windows/win_socket.cc \ + src/core/lib/event_engine/windows/windows_endpoint.cc \ src/core/lib/event_engine/windows/windows_engine.cc \ src/core/lib/experiments/config.cc \ src/core/lib/experiments/experiments.cc \ @@ -1956,6 +1957,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/event_engine/utils.cc \ src/core/lib/event_engine/windows/iocp.cc \ src/core/lib/event_engine/windows/win_socket.cc \ + src/core/lib/event_engine/windows/windows_endpoint.cc \ src/core/lib/event_engine/windows/windows_engine.cc \ src/core/lib/experiments/config.cc \ src/core/lib/experiments/experiments.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index b3e03fe744a..643154e0cf2 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -786,6 +786,7 @@ libs: - src/core/lib/event_engine/utils.h - src/core/lib/event_engine/windows/iocp.h - src/core/lib/event_engine/windows/win_socket.h + - src/core/lib/event_engine/windows/windows_endpoint.h - src/core/lib/event_engine/windows/windows_engine.h - src/core/lib/experiments/config.h - src/core/lib/experiments/experiments.h @@ -1529,6 +1530,7 @@ libs: - src/core/lib/event_engine/utils.cc - src/core/lib/event_engine/windows/iocp.cc - src/core/lib/event_engine/windows/win_socket.cc + - src/core/lib/event_engine/windows/windows_endpoint.cc - src/core/lib/event_engine/windows/windows_engine.cc - src/core/lib/experiments/config.cc - src/core/lib/experiments/experiments.cc @@ -2077,6 +2079,7 @@ libs: - src/core/lib/event_engine/utils.h - src/core/lib/event_engine/windows/iocp.h - src/core/lib/event_engine/windows/win_socket.h + - src/core/lib/event_engine/windows/windows_endpoint.h - src/core/lib/event_engine/windows/windows_engine.h - src/core/lib/experiments/config.h - src/core/lib/experiments/experiments.h @@ -2456,6 +2459,7 @@ libs: - src/core/lib/event_engine/utils.cc - src/core/lib/event_engine/windows/iocp.cc - src/core/lib/event_engine/windows/win_socket.cc + - src/core/lib/event_engine/windows/windows_endpoint.cc - src/core/lib/event_engine/windows/windows_engine.cc - src/core/lib/experiments/config.cc - src/core/lib/experiments/experiments.cc @@ -3512,6 +3516,7 @@ libs: - src/core/lib/event_engine/utils.h - src/core/lib/event_engine/windows/iocp.h - src/core/lib/event_engine/windows/win_socket.h + - src/core/lib/event_engine/windows/windows_endpoint.h - src/core/lib/event_engine/windows/windows_engine.h - src/core/lib/experiments/config.h - src/core/lib/experiments/experiments.h @@ -3771,6 +3776,7 @@ libs: - src/core/lib/event_engine/utils.cc - src/core/lib/event_engine/windows/iocp.cc - src/core/lib/event_engine/windows/win_socket.cc + - src/core/lib/event_engine/windows/windows_endpoint.cc - src/core/lib/event_engine/windows/windows_engine.cc - src/core/lib/experiments/config.cc - src/core/lib/experiments/experiments.cc @@ -6373,11 +6379,31 @@ targets: gtest: true build: test language: c++ - headers: [] + headers: + - src/core/lib/avl/avl.h + - src/core/lib/channel/channel_args.h + - src/core/lib/event_engine/channel_args_endpoint_config.h + - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/debug_location.h + - src/core/lib/gprpp/dual_ref_counted.h + - src/core/lib/gprpp/match.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/overload.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h + - src/core/lib/gprpp/time.h + - src/core/lib/surface/channel_stack_type.h src: + - src/core/lib/channel/channel_args.cc + - src/core/lib/event_engine/channel_args_endpoint_config.cc + - src/core/lib/gprpp/time.cc + - src/core/lib/surface/channel_stack_type.cc - test/core/event_engine/endpoint_config_test.cc deps: - - grpc + - absl/functional:any_invocable + - absl/meta:type_traits + - absl/status:statusor + - gpr uses_polling: false - name: endpoint_pair_test gtest: true @@ -7226,6 +7252,7 @@ targets: - src/core/lib/event_engine/utils.h - src/core/lib/event_engine/windows/iocp.h - src/core/lib/event_engine/windows/win_socket.h + - src/core/lib/event_engine/windows/windows_endpoint.h - src/core/lib/event_engine/windows/windows_engine.h - src/core/lib/experiments/config.h - src/core/lib/experiments/experiments.h @@ -7467,6 +7494,7 @@ targets: - src/core/lib/event_engine/utils.cc - src/core/lib/event_engine/windows/iocp.cc - src/core/lib/event_engine/windows/win_socket.cc + - src/core/lib/event_engine/windows/windows_endpoint.cc - src/core/lib/event_engine/windows/windows_engine.cc - src/core/lib/experiments/config.cc - src/core/lib/experiments/experiments.cc @@ -12186,10 +12214,8 @@ targets: build: test language: c++ headers: - - src/core/lib/event_engine/windows/windows_endpoint.h - test/core/event_engine/windows/create_sockpair.h src: - - src/core/lib/event_engine/windows/windows_endpoint.cc - test/core/event_engine/windows/create_sockpair.cc - test/core/event_engine/windows/windows_endpoint_test.cc deps: diff --git a/config.m4 b/config.m4 index db476c3bb39..267eb0bf676 100644 --- a/config.m4 +++ b/config.m4 @@ -511,6 +511,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/event_engine/utils.cc \ src/core/lib/event_engine/windows/iocp.cc \ src/core/lib/event_engine/windows/win_socket.cc \ + src/core/lib/event_engine/windows/windows_endpoint.cc \ src/core/lib/event_engine/windows/windows_engine.cc \ src/core/lib/experiments/config.cc \ src/core/lib/experiments/experiments.cc \ diff --git a/config.w32 b/config.w32 index 3edbcb8466d..8dc7b220d7a 100644 --- a/config.w32 +++ b/config.w32 @@ -477,6 +477,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\event_engine\\utils.cc " + "src\\core\\lib\\event_engine\\windows\\iocp.cc " + "src\\core\\lib\\event_engine\\windows\\win_socket.cc " + + "src\\core\\lib\\event_engine\\windows\\windows_endpoint.cc " + "src\\core\\lib\\event_engine\\windows\\windows_engine.cc " + "src\\core\\lib\\experiments\\config.cc " + "src\\core\\lib\\experiments\\experiments.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 997576ae10d..8a7b315eaf5 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -739,6 +739,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/utils.h', 'src/core/lib/event_engine/windows/iocp.h', 'src/core/lib/event_engine/windows/win_socket.h', + 'src/core/lib/event_engine/windows/windows_endpoint.h', 'src/core/lib/event_engine/windows/windows_engine.h', 'src/core/lib/experiments/config.h', 'src/core/lib/experiments/experiments.h', @@ -1632,6 +1633,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/utils.h', 'src/core/lib/event_engine/windows/iocp.h', 'src/core/lib/event_engine/windows/win_socket.h', + 'src/core/lib/event_engine/windows/windows_endpoint.h', 'src/core/lib/event_engine/windows/windows_engine.h', 'src/core/lib/experiments/config.h', 'src/core/lib/experiments/experiments.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index a9f45eb5afa..81adb310660 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1140,6 +1140,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/windows/iocp.h', 'src/core/lib/event_engine/windows/win_socket.cc', 'src/core/lib/event_engine/windows/win_socket.h', + 'src/core/lib/event_engine/windows/windows_endpoint.cc', + 'src/core/lib/event_engine/windows/windows_endpoint.h', 'src/core/lib/event_engine/windows/windows_engine.cc', 'src/core/lib/event_engine/windows/windows_engine.h', 'src/core/lib/experiments/config.cc', @@ -2283,6 +2285,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/utils.h', 'src/core/lib/event_engine/windows/iocp.h', 'src/core/lib/event_engine/windows/win_socket.h', + 'src/core/lib/event_engine/windows/windows_endpoint.h', 'src/core/lib/event_engine/windows/windows_engine.h', 'src/core/lib/experiments/config.h', 'src/core/lib/experiments/experiments.h', diff --git a/grpc.gemspec b/grpc.gemspec index bd46521e276..a82c45bfa96 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1051,6 +1051,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/windows/iocp.h ) s.files += %w( src/core/lib/event_engine/windows/win_socket.cc ) s.files += %w( src/core/lib/event_engine/windows/win_socket.h ) + s.files += %w( src/core/lib/event_engine/windows/windows_endpoint.cc ) + s.files += %w( src/core/lib/event_engine/windows/windows_endpoint.h ) s.files += %w( src/core/lib/event_engine/windows/windows_engine.cc ) s.files += %w( src/core/lib/event_engine/windows/windows_engine.h ) s.files += %w( src/core/lib/experiments/config.cc ) diff --git a/grpc.gyp b/grpc.gyp index 2ba5cda5dbf..23d5df2bb3c 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -841,6 +841,7 @@ 'src/core/lib/event_engine/utils.cc', 'src/core/lib/event_engine/windows/iocp.cc', 'src/core/lib/event_engine/windows/win_socket.cc', + 'src/core/lib/event_engine/windows/windows_endpoint.cc', 'src/core/lib/event_engine/windows/windows_engine.cc', 'src/core/lib/experiments/config.cc', 'src/core/lib/experiments/experiments.cc', @@ -1311,6 +1312,7 @@ 'src/core/lib/event_engine/utils.cc', 'src/core/lib/event_engine/windows/iocp.cc', 'src/core/lib/event_engine/windows/win_socket.cc', + 'src/core/lib/event_engine/windows/windows_endpoint.cc', 'src/core/lib/event_engine/windows/windows_engine.cc', 'src/core/lib/experiments/config.cc', 'src/core/lib/experiments/experiments.cc', @@ -1808,6 +1810,7 @@ 'src/core/lib/event_engine/utils.cc', 'src/core/lib/event_engine/windows/iocp.cc', 'src/core/lib/event_engine/windows/win_socket.cc', + 'src/core/lib/event_engine/windows/windows_endpoint.cc', 'src/core/lib/event_engine/windows/windows_engine.cc', 'src/core/lib/experiments/config.cc', 'src/core/lib/experiments/experiments.cc', diff --git a/package.xml b/package.xml index 87c80c7da69..52b63520658 100644 --- a/package.xml +++ b/package.xml @@ -1033,6 +1033,8 @@ + + diff --git a/src/core/BUILD b/src/core/BUILD index 2f16d6dfc35..58f74e7246d 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -908,6 +908,7 @@ grpc_cc_library( ], deps = [ "channel_args", + "channel_args_endpoint_config", "closure", "error", "handshaker_factory", @@ -1862,13 +1863,19 @@ grpc_cc_library( "absl/strings", ], deps = [ + "channel_args_endpoint_config", + "common_event_engine_closures", + "error", "event_engine_common", + "event_engine_executor", + "event_engine_tcp_socket_utils", "event_engine_thread_pool", "event_engine_trace", "event_engine_utils", "init_internally", "posix_event_engine_timer_manager", "time", + "windows_endpoint", "windows_iocp", "//:event_engine_base_hdrs", "//:gpr", @@ -1991,6 +1998,26 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "channel_args_endpoint_config", + srcs = [ + "//src/core:lib/event_engine/channel_args_endpoint_config.cc", + ], + hdrs = [ + "//src/core:lib/event_engine/channel_args_endpoint_config.h", + ], + external_deps = [ + "absl/strings", + "absl/types:optional", + ], + visibility = ["@grpc:alt_grpc_base_legacy"], + deps = [ + "channel_args", + "//:event_engine_base_hdrs", + "//:gpr_platform", + ], +) + grpc_cc_library( name = "default_event_engine", srcs = [ @@ -4855,6 +4882,7 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", + "channel_args_endpoint_config", "channel_args_preconditioning", "channel_stack_type", "closure", @@ -4904,6 +4932,7 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", + "channel_args_endpoint_config", "closure", "error", "grpc_insecure_credentials", diff --git a/src/core/lib/event_engine/handle_containers.h b/src/core/lib/event_engine/handle_containers.h index a1739ef3c08..e966ca47457 100644 --- a/src/core/lib/event_engine/handle_containers.h +++ b/src/core/lib/event_engine/handle_containers.h @@ -54,6 +54,13 @@ using TaskHandleSet = absl::flat_hash_set< TaskHandleComparator< grpc_event_engine::experimental::EventEngine::TaskHandle>::Eq>; +using ConnectionHandleSet = absl::flat_hash_set< + grpc_event_engine::experimental::EventEngine::ConnectionHandle, + TaskHandleComparator< + grpc_event_engine::experimental::EventEngine::ConnectionHandle>::Hash, + TaskHandleComparator< + grpc_event_engine::experimental::EventEngine::ConnectionHandle>::Eq>; + using LookupTaskHandleSet = absl::flat_hash_set< grpc_event_engine::experimental::EventEngine::DNSResolver::LookupTaskHandle, TaskHandleComparatorsin6_addr.s6_addr[12], &addr4->sin_addr, 4); addr6_out->sin6_port = addr4->sin_port; *resolved_addr6_out = EventEngine::ResolvedAddress( - reinterpret_cast(addr6_out), - static_cast(sizeof(sockaddr_in6))); + reinterpret_cast(addr6_out), sizeof(sockaddr_in6)); return true; } return false; diff --git a/src/core/lib/event_engine/utils.cc b/src/core/lib/event_engine/utils.cc index 9769faeeb8b..0c9dddf97df 100644 --- a/src/core/lib/event_engine/utils.cc +++ b/src/core/lib/event_engine/utils.cc @@ -28,8 +28,8 @@ namespace grpc_event_engine { namespace experimental { -std::string HandleToString(EventEngine::TaskHandle handle) { - return absl::StrCat("{", handle.keys[0], ",", handle.keys[1], "}"); +std::string HandleToStringInternal(uintptr_t a, uintptr_t b) { + return absl::StrCat("{", a, ",", b, "}"); } grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now, diff --git a/src/core/lib/event_engine/utils.h b/src/core/lib/event_engine/utils.h index 768d16ceea9..04068d1d215 100644 --- a/src/core/lib/event_engine/utils.h +++ b/src/core/lib/event_engine/utils.h @@ -16,6 +16,8 @@ #include +#include + #include #include @@ -25,7 +27,13 @@ namespace grpc_event_engine { namespace experimental { -std::string HandleToString(EventEngine::TaskHandle handle); +std::string HandleToStringInternal(uintptr_t a, uintptr_t b); + +// Returns a string representation of the EventEngine::*Handle types +template +std::string HandleToString(const Handle& handle) { + return HandleToStringInternal(handle.keys[0], handle.keys[1]); +} grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now, EventEngine::Duration delta); @@ -33,4 +41,4 @@ grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now, } // namespace experimental } // namespace grpc_event_engine -#endif // GRPC_CORE_LIB_EVENT_ENGINE_UTILS_H \ No newline at end of file +#endif // GRPC_CORE_LIB_EVENT_ENGINE_UTILS_H diff --git a/src/core/lib/event_engine/windows/iocp.cc b/src/core/lib/event_engine/windows/iocp.cc index 23fe4969eb0..dbfbbc56b2c 100644 --- a/src/core/lib/event_engine/windows/iocp.cc +++ b/src/core/lib/event_engine/windows/iocp.cc @@ -32,8 +32,8 @@ namespace experimental { IOCP::IOCP(Executor* executor) noexcept : executor_(executor), - iocp_handle_(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, - (ULONG_PTR)NULL, 0)) { + iocp_handle_(CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, + (ULONG_PTR) nullptr, 0)) { GPR_ASSERT(iocp_handle_); WSASocketFlagsInit(); } @@ -58,6 +58,10 @@ std::unique_ptr IOCP::Watch(SOCKET socket) { } void IOCP::Shutdown() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { + gpr_log(GPR_DEBUG, "IOCP::%p shutting down. Outstanding kicks: %d", this, + outstanding_kicks_.load()); + } while (outstanding_kicks_.load() > 0) { Work(std::chrono::hours(42), []() {}); } @@ -66,10 +70,6 @@ void IOCP::Shutdown() { Poller::WorkResult IOCP::Work(EventEngine::Duration timeout, absl::FunctionRef schedule_poll_again) { - static const absl::Status kDeadlineExceeded = absl::DeadlineExceededError( - absl::StrFormat("IOCP::%p: Received no completions", this)); - static const absl::Status kKicked = - absl::AbortedError(absl::StrFormat("IOCP::%p: Awoken from a kick", this)); DWORD bytes = 0; ULONG_PTR completion_key; LPOVERLAPPED overlapped; @@ -79,7 +79,7 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout, BOOL success = GetQueuedCompletionStatus( iocp_handle_, &bytes, &completion_key, &overlapped, static_cast(Milliseconds(timeout))); - if (success == 0 && overlapped == NULL) { + if (success == 0 && overlapped == nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { gpr_log(GPR_DEBUG, "IOCP::%p deadline exceeded", this); } @@ -139,7 +139,7 @@ DWORD IOCP::WSASocketFlagsInit() { // versions, see // https://msdn.microsoft.com/en-us/library/windows/desktop/ms742212(v=vs.85).aspx // for details. - SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, wsa_socket_flags | WSA_FLAG_NO_HANDLE_INHERIT); if (sock != INVALID_SOCKET) { // Windows 7, Windows 2008 R2 with SP1 or later diff --git a/src/core/lib/event_engine/windows/win_socket.cc b/src/core/lib/event_engine/windows/win_socket.cc index 4ae64aaf41a..c6a9f3c7423 100644 --- a/src/core/lib/event_engine/windows/win_socket.cc +++ b/src/core/lib/event_engine/windows/win_socket.cc @@ -98,7 +98,9 @@ void WinSocket::NotifyOnWrite(EventEngine::Closure* on_write) { } WinSocket::OpState::OpState(WinSocket* win_socket) noexcept - : win_socket_(win_socket), closure_(nullptr) {} + : win_socket_(win_socket), closure_(nullptr) { + memset(&overlapped_, 0, sizeof(OVERLAPPED)); +} void WinSocket::OpState::SetReady() { GPR_ASSERT(!has_pending_iocp_); diff --git a/src/core/lib/event_engine/windows/windows_endpoint.cc b/src/core/lib/event_engine/windows/windows_endpoint.cc index c77b925c96f..9afb25e4d0e 100644 --- a/src/core/lib/event_engine/windows/windows_endpoint.cc +++ b/src/core/lib/event_engine/windows/windows_endpoint.cc @@ -60,13 +60,17 @@ WindowsEndpoint::WindowsEndpoint( handle_read_event_(this), handle_write_event_(this), executor_(executor) { - sockaddr addr; + char addr[EventEngine::ResolvedAddress::MAX_SIZE_BYTES]; int addr_len = sizeof(addr); - if (getsockname(socket_->socket(), &addr, &addr_len) < 0) { - gpr_log(GPR_ERROR, "Unrecoverable error: Failed to get local socket name."); + if (getsockname(socket_->socket(), reinterpret_cast(addr), + &addr_len) < 0) { + gpr_log( + GPR_ERROR, "Unrecoverable error: Failed to get local socket name. %s", + GRPC_WSA_ERROR(WSAGetLastError(), "getsockname").ToString().c_str()); abort(); } - local_address_ = EventEngine::ResolvedAddress(&addr, addr_len); + local_address_ = + EventEngine::ResolvedAddress(reinterpret_cast(addr), addr_len); local_address_string_ = *ResolvedAddressToURI(local_address_); peer_address_string_ = *ResolvedAddressToURI(peer_address_); } diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc index 28b648848c3..6772fe486f8 100644 --- a/src/core/lib/event_engine/windows/windows_engine.cc +++ b/src/core/lib/event_engine/windows/windows_engine.cc @@ -26,33 +26,67 @@ #include #include +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/common_closures.h" +#include "src/core/lib/event_engine/executor/executor.h" #include "src/core/lib/event_engine/handle_containers.h" #include "src/core/lib/event_engine/posix_engine/timer_manager.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/trace.h" #include "src/core/lib/event_engine/utils.h" #include "src/core/lib/event_engine/windows/iocp.h" +#include "src/core/lib/event_engine/windows/windows_endpoint.h" #include "src/core/lib/event_engine/windows/windows_engine.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/error.h" namespace grpc_event_engine { namespace experimental { +// ---- IOCPWorkClosure ---- + +WindowsEventEngine::IOCPWorkClosure::IOCPWorkClosure(Executor* executor, + IOCP* iocp) + : executor_(executor), iocp_(iocp) { + executor_->Run(this); +} + +void WindowsEventEngine::IOCPWorkClosure::Run() { + auto result = iocp_->Work(std::chrono::seconds(60), [this] { + workers_.fetch_add(1); + executor_->Run(this); + }); + if (result == Poller::WorkResult::kDeadlineExceeded) { + // iocp received no messages. restart the worker + workers_.fetch_add(1); + executor_->Run(this); + } + if (workers_.fetch_sub(1) == 1) done_signal_.Notify(); +} + +void WindowsEventEngine::IOCPWorkClosure::WaitForShutdown() { + done_signal_.WaitForNotification(); +} + +// ---- WindowsEventEngine ---- + // TODO(hork): The iomgr timer and execution engine can be reused. It should // be separated out from the posix_engine and instantiated as components. It is // effectively copied below. -struct WindowsEventEngine::Closure final : public EventEngine::Closure { +struct WindowsEventEngine::TimerClosure final : public EventEngine::Closure { absl::AnyInvocable cb; Timer timer; WindowsEventEngine* engine; EventEngine::TaskHandle handle; void Run() override { - GRPC_EVENT_ENGINE_TRACE("WindowsEventEngine:%p executing callback:%s", - engine, HandleToString(handle).c_str()); + GRPC_EVENT_ENGINE_TRACE( + "WindowsEventEngine:%p executing callback:%s", engine, + HandleToString(handle).c_str()); { - grpc_core::MutexLock lock(&engine->mu_); + grpc_core::MutexLock lock(&engine->task_mu_); engine->known_handles_.erase(handle); } cb(); @@ -63,7 +97,8 @@ struct WindowsEventEngine::Closure final : public EventEngine::Closure { WindowsEventEngine::WindowsEventEngine() : executor_(std::make_shared()), iocp_(executor_.get()), - timer_manager_(executor_) { + timer_manager_(executor_), + iocp_worker_(executor_.get(), &iocp_) { WSADATA wsaData; int status = WSAStartup(MAKEWORD(2, 0), &wsaData); GPR_ASSERT(status == 0); @@ -71,25 +106,28 @@ WindowsEventEngine::WindowsEventEngine() WindowsEventEngine::~WindowsEventEngine() { { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&task_mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { for (auto handle : known_handles_) { gpr_log(GPR_ERROR, "WindowsEventEngine:%p uncleared TaskHandle at shutdown:%s", - this, HandleToString(handle).c_str()); + this, HandleToString(handle).c_str()); } } GPR_ASSERT(GPR_LIKELY(known_handles_.empty())); - GPR_ASSERT(WSACleanup() == 0); - timer_manager_.Shutdown(); } + iocp_.Kick(); + iocp_worker_.WaitForShutdown(); + iocp_.Shutdown(); + GPR_ASSERT(WSACleanup() == 0); + timer_manager_.Shutdown(); executor_->Quiesce(); } bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&task_mu_); if (!known_handles_.contains(handle)) return false; - auto* cd = reinterpret_cast(handle.keys[0]); + auto* cd = reinterpret_cast(handle.keys[0]); bool r = timer_manager_.TimerCancel(&cd->timer); known_handles_.erase(handle); if (r) delete cd; @@ -117,16 +155,17 @@ void WindowsEventEngine::Run(EventEngine::Closure* closure) { EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal( Duration when, absl::AnyInvocable cb) { auto when_ts = ToTimestamp(timer_manager_.Now(), when); - auto* cd = new Closure; + auto* cd = new TimerClosure; cd->cb = std::move(cb); cd->engine = this; EventEngine::TaskHandle handle{reinterpret_cast(cd), aba_token_.fetch_add(1)}; - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&task_mu_); known_handles_.insert(handle); cd->handle = handle; - GRPC_EVENT_ENGINE_TRACE("WindowsEventEngine:%p scheduling callback:%s", this, - HandleToString(handle).c_str()); + GRPC_EVENT_ENGINE_TRACE( + "WindowsEventEngine:%p scheduling callback:%s", this, + HandleToString(handle).c_str()); timer_manager_.TimerInit(&cd->timer, when_ts, cd); return handle; } @@ -140,15 +179,194 @@ bool WindowsEventEngine::IsWorkerThread() { GPR_ASSERT(false && "unimplemented"); } -bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) { - GPR_ASSERT(false && "unimplemented"); +void WindowsEventEngine::OnConnectCompleted( + std::shared_ptr state) { + // Connection attempt complete! + grpc_core::MutexLock lock(&state->mu); + state->on_connected = nullptr; + { + grpc_core::MutexLock handle_lock(&connection_mu_); + known_connection_handles_.erase(state->connection_handle); + } + // return early if we cannot cancel the connection timeout timer. + if (!Cancel(state->timer_handle)) return; + auto write_info = state->socket->write_info(); + if (write_info->wsa_error() != 0) { + auto error = GRPC_WSA_ERROR(write_info->wsa_error(), "ConnectEx"); + state->socket->MaybeShutdown(error); + state->on_connected_user_callback(error); + return; + } + // This code should be running in an executor thread already, so the callback + // can be run directly. + ChannelArgsEndpointConfig cfg; + state->on_connected_user_callback(std::make_unique( + state->address, std::move(state->socket), std::move(state->allocator), + cfg, executor_.get())); } EventEngine::ConnectionHandle WindowsEventEngine::Connect( OnConnectCallback on_connect, const ResolvedAddress& addr, - const EndpointConfig& args, MemoryAllocator memory_allocator, - Duration deadline) { - GPR_ASSERT(false && "unimplemented"); + const EndpointConfig& /* args */, MemoryAllocator memory_allocator, + Duration timeout) { + // TODO(hork): utilize the endpoint config + absl::Status status; + int istatus; + auto uri = ResolvedAddressToURI(addr); + if (!uri.ok()) { + Run([on_connect = std::move(on_connect), status = uri.status()]() mutable { + on_connect(status); + }); + return invalid_connection_handle; + } + GRPC_EVENT_ENGINE_TRACE("EventEngine::%p connecting to %s", this, + uri->c_str()); + // Use dualstack sockets where available. + ResolvedAddress address = addr; + ResolvedAddress addr6_v4mapped; + if (ResolvedAddressToV4Mapped(addr, &addr6_v4mapped)) { + address = addr6_v4mapped; + } + SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, + IOCP::GetDefaultSocketFlags()); + if (sock == INVALID_SOCKET) { + Run([on_connect = std::move(on_connect), + status = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket")]() mutable { + on_connect(status); + }); + return invalid_connection_handle; + } + status = PrepareSocket(sock); + if (!status.ok()) { + Run([on_connect = std::move(on_connect), status]() mutable { + on_connect(status); + }); + return invalid_connection_handle; + } + // Grab the function pointer for ConnectEx for that specific socket It may + // change depending on the interface. + LPFN_CONNECTEX ConnectEx; + GUID guid = WSAID_CONNECTEX; + DWORD ioctl_num_bytes; + istatus = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, + sizeof(guid), &ConnectEx, sizeof(ConnectEx), + &ioctl_num_bytes, nullptr, nullptr); + if (istatus != 0) { + Run([on_connect = std::move(on_connect), + status = GRPC_WSA_ERROR( + WSAGetLastError(), + "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)")]() mutable { + on_connect(status); + }); + return invalid_connection_handle; + } + // bind the local address + auto local_address = ResolvedAddressMakeWild6(0); + istatus = bind(sock, local_address.address(), local_address.size()); + if (istatus != 0) { + Run([on_connect = std::move(on_connect), + status = GRPC_WSA_ERROR(WSAGetLastError(), "bind")]() mutable { + on_connect(status); + }); + return invalid_connection_handle; + } + // Connect + auto watched_socket = iocp_.Watch(sock); + auto* info = watched_socket->write_info(); + bool success = + ConnectEx(watched_socket->socket(), address.address(), address.size(), + nullptr, 0, nullptr, info->overlapped()); + // It wouldn't be unusual to get a success immediately. But we'll still get an + // IOCP notification, so let's ignore it. + if (!success) { + int last_error = WSAGetLastError(); + if (last_error != ERROR_IO_PENDING) { + auto status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); + Run([on_connect = std::move(on_connect), status]() mutable { + on_connect(status); + }); + watched_socket->MaybeShutdown(status); + return invalid_connection_handle; + } + } + GPR_ASSERT(watched_socket != nullptr); + auto connection_state = std::make_shared(); + grpc_core::MutexLock lock(&connection_state->mu); + connection_state->address = address; + connection_state->socket = std::move(watched_socket); + connection_state->on_connected_user_callback = std::move(on_connect); + connection_state->allocator = std::move(memory_allocator); + connection_state->on_connected = + SelfDeletingClosure::Create([this, connection_state]() mutable { + OnConnectCompleted(std::move(connection_state)); + }); + { + grpc_core::MutexLock conn_lock(&connection_mu_); + connection_state->connection_handle = + ConnectionHandle{reinterpret_cast(connection_state.get()), + aba_token_.fetch_add(1)}; + known_connection_handles_.insert(connection_state->connection_handle); + } + connection_state->timer_handle = + RunAfter(timeout, [this, connection_state]() { + grpc_core::MutexLock lock(&connection_state->mu); + if (CancelConnectFromDeadlineTimer(connection_state.get())) { + connection_state->on_connected_user_callback( + absl::DeadlineExceededError("Connection timed out")); + } + // else: The connection attempt could not be canceled. We can assume the + // connection callback will be called. + }); + connection_state->socket->NotifyOnWrite(connection_state->on_connected); + return connection_state->connection_handle; +} + +bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) { + if (TaskHandleComparator::Eq()(handle, + invalid_connection_handle)) { + GRPC_EVENT_ENGINE_TRACE("%s", + "Attempted to cancel an invalid connection handle"); + return false; + } + // Erase the connection handle, which may be unknown + { + grpc_core::MutexLock lock(&connection_mu_); + if (!known_connection_handles_.contains(handle)) { + GRPC_EVENT_ENGINE_TRACE( + "Unknown connection handle: %s", + HandleToString(handle).c_str()); + return false; + } + known_connection_handles_.erase(handle); + } + auto* connection_state = reinterpret_cast(handle.keys[0]); + grpc_core::MutexLock state_lock(&connection_state->mu); + if (!Cancel(connection_state->timer_handle)) return false; + return CancelConnectInternalStateLocked(connection_state); +} + +bool WindowsEventEngine::CancelConnectFromDeadlineTimer( + ConnectionState* connection_state) { + // Erase the connection handle, which is guaranteed to exist. + { + grpc_core::MutexLock lock(&connection_mu_); + GPR_ASSERT(known_connection_handles_.erase( + connection_state->connection_handle) == 1); + } + return CancelConnectInternalStateLocked(connection_state); +} + +bool WindowsEventEngine::CancelConnectInternalStateLocked( + ConnectionState* connection_state) { + connection_state->socket->MaybeShutdown( + absl::CancelledError("CancelConnect")); + // Release the connection_state shared_ptr. connection_state is now invalid. + delete connection_state->on_connected; + GRPC_EVENT_ENGINE_TRACE("Successfully cancelled connection %s", + HandleToString( + connection_state->connection_handle) + .c_str()); + return true; } absl::StatusOr> diff --git a/src/core/lib/event_engine/windows/windows_engine.h b/src/core/lib/event_engine/windows/windows_engine.h index 59a2dd0f0a4..23ba1d036f5 100644 --- a/src/core/lib/event_engine/windows/windows_engine.h +++ b/src/core/lib/event_engine/windows/windows_engine.h @@ -32,6 +32,7 @@ #include "src/core/lib/event_engine/posix_engine/timer_manager.h" #include "src/core/lib/event_engine/thread_pool.h" #include "src/core/lib/event_engine/windows/iocp.h" +#include "src/core/lib/event_engine/windows/windows_endpoint.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/surface/init_internally.h" @@ -44,16 +45,10 @@ namespace experimental { class WindowsEventEngine : public EventEngine, public grpc_core::KeepsGrpcInitialized { public: - class WindowsEndpoint : public EventEngine::Endpoint { - public: - ~WindowsEndpoint() override; - void Read(absl::AnyInvocable on_read, - SliceBuffer* buffer, const ReadArgs* args) override; - void Write(absl::AnyInvocable on_writable, - SliceBuffer* data, const WriteArgs* args) override; - const ResolvedAddress& GetPeerAddress() const override; - const ResolvedAddress& GetLocalAddress() const override; - }; + constexpr static TaskHandle invalid_handle{-1, -1}; + constexpr static EventEngine::ConnectionHandle invalid_connection_handle{-1, + -1}; + class WindowsListener : public EventEngine::Listener { public: ~WindowsListener() override; @@ -104,16 +99,64 @@ class WindowsEventEngine : public EventEngine, bool Cancel(TaskHandle handle) override; private: - struct Closure; + // State of an active connection. + // Managed by a shared_ptr, owned exclusively by the timeout callback and the + // OnConnectCompleted callback herein. + struct ConnectionState { + // everything is guarded by mu; + grpc_core::Mutex mu + ABSL_ACQUIRED_BEFORE(WindowsEventEngine::connection_mu_); + EventEngine::ConnectionHandle connection_handle ABSL_GUARDED_BY(mu); + EventEngine::TaskHandle timer_handle ABSL_GUARDED_BY(mu); + EventEngine::OnConnectCallback on_connected_user_callback + ABSL_GUARDED_BY(mu); + EventEngine::Closure* on_connected ABSL_GUARDED_BY(mu); + std::unique_ptr socket ABSL_GUARDED_BY(mu); + EventEngine::ResolvedAddress address ABSL_GUARDED_BY(mu); + MemoryAllocator allocator ABSL_GUARDED_BY(mu); + }; + + // A poll worker which schedules itself unless kicked + class IOCPWorkClosure : public EventEngine::Closure { + public: + explicit IOCPWorkClosure(Executor* executor, IOCP* iocp); + void Run() override; + void WaitForShutdown(); + + private: + std::atomic workers_{1}; + grpc_core::Notification done_signal_; + Executor* executor_; + IOCP* iocp_; + }; + + void OnConnectCompleted(std::shared_ptr state); + + // CancelConnect called from within the deadline timer. + // In this case, the connection_state->mu is already locked, and timer + // cancellation is not possible. + bool CancelConnectFromDeadlineTimer(ConnectionState* connection_state) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(ConnectionState::mu); + + // Completes the connection cancellation logic after checking handle validity + // and optionally cancelling deadline timers. + bool CancelConnectInternalStateLocked(ConnectionState* connection_state) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(ConnectionState::mu); + + class TimerClosure; EventEngine::TaskHandle RunAfterInternal(Duration when, absl::AnyInvocable cb); - grpc_core::Mutex mu_; - TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_); + grpc_core::Mutex task_mu_; + TaskHandleSet known_handles_ ABSL_GUARDED_BY(task_mu_); + grpc_core::Mutex connection_mu_ ABSL_ACQUIRED_AFTER(ConnectionState::mu); + grpc_core::CondVar connection_cv_; + ConnectionHandleSet known_connection_handles_ ABSL_GUARDED_BY(connection_mu_); std::atomic aba_token_{0}; std::shared_ptr executor_; IOCP iocp_; TimerManager timer_manager_; + IOCPWorkClosure iocp_worker_; }; } // namespace experimental diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 29291b9d1d0..814460b689c 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -486,6 +486,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/event_engine/utils.cc', 'src/core/lib/event_engine/windows/iocp.cc', 'src/core/lib/event_engine/windows/win_socket.cc', + 'src/core/lib/event_engine/windows/windows_endpoint.cc', 'src/core/lib/event_engine/windows/windows_engine.cc', 'src/core/lib/experiments/config.cc', 'src/core/lib/experiments/experiments.cc', diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD index d4babbf29f7..460791fab1a 100644 --- a/test/core/end2end/BUILD +++ b/test/core/end2end/BUILD @@ -71,6 +71,7 @@ grpc_cc_library( "//:httpcli", "//:sockaddr_utils", "//src/core:channel_args", + "//src/core:channel_args_endpoint_config", "//src/core:channel_args_preconditioning", "//src/core:closure", "//src/core:error", diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD index 1e2e7d0fd8d..80c3219099f 100644 --- a/test/core/event_engine/BUILD +++ b/test/core/event_engine/BUILD @@ -72,8 +72,8 @@ grpc_cc_test( uses_polling = False, deps = [ "//:gpr_platform", - "//:grpc", "//src/core:channel_args", + "//src/core:channel_args_endpoint_config", ], ) diff --git a/test/core/util/BUILD b/test/core/util/BUILD index 5048965cae9..dfd9a925b2c 100644 --- a/test/core/util/BUILD +++ b/test/core/util/BUILD @@ -136,6 +136,7 @@ grpc_cc_library( "//:ref_counted_ptr", "//:tsi_ssl_credentials", "//:uri_parser", + "//src/core:channel_args_endpoint_config", "//src/core:channel_args_preconditioning", "//src/core:closure", "//src/core:error", @@ -176,6 +177,7 @@ grpc_cc_library( "//:orphanable", "//:ref_counted_ptr", "//:uri_parser", + "//src/core:channel_args_endpoint_config", "//src/core:channel_args_preconditioning", "//src/core:closure", "//src/core:error", diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 941dba65016..7734c1880c1 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2045,6 +2045,8 @@ src/core/lib/event_engine/windows/iocp.cc \ src/core/lib/event_engine/windows/iocp.h \ src/core/lib/event_engine/windows/win_socket.cc \ src/core/lib/event_engine/windows/win_socket.h \ +src/core/lib/event_engine/windows/windows_endpoint.cc \ +src/core/lib/event_engine/windows/windows_endpoint.h \ src/core/lib/event_engine/windows/windows_engine.cc \ src/core/lib/event_engine/windows/windows_engine.h \ src/core/lib/experiments/config.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 9d9b1c7eae8..7dcb54749a0 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1824,6 +1824,8 @@ src/core/lib/event_engine/windows/iocp.cc \ src/core/lib/event_engine/windows/iocp.h \ src/core/lib/event_engine/windows/win_socket.cc \ src/core/lib/event_engine/windows/win_socket.h \ +src/core/lib/event_engine/windows/windows_endpoint.cc \ +src/core/lib/event_engine/windows/windows_endpoint.h \ src/core/lib/event_engine/windows/windows_engine.cc \ src/core/lib/event_engine/windows/windows_engine.h \ src/core/lib/experiments/config.cc \