From 67ad297e616f9e601979756c8f9946ed24a139ab Mon Sep 17 00:00:00 2001 From: Yijie Ma Date: Fri, 11 Aug 2023 15:52:01 -0700 Subject: [PATCH] [EventEngine] Port GrpcPolledFdFactoryPosix fix to EE (#34025) Port https://github.com/grpc/grpc/pull/33871 to EE's GrpcPolledFdFactoryPosix. --- src/core/lib/event_engine/ares_resolver.cc | 21 +++-- src/core/lib/event_engine/ares_resolver.h | 3 + .../posix_engine/grpc_polled_fd_posix.h | 89 ++++++++++++++++++- test/cpp/naming/BUILD | 1 + test/cpp/naming/cancel_ares_query_test.cc | 26 ++++-- 5 files changed, 124 insertions(+), 16 deletions(-) diff --git a/src/core/lib/event_engine/ares_resolver.cc b/src/core/lib/event_engine/ares_resolver.cc index c78028e5be9..2f26a9fcf1c 100644 --- a/src/core/lib/event_engine/ares_resolver.cc +++ b/src/core/lib/event_engine/ares_resolver.cc @@ -159,6 +159,9 @@ AresResolver::CreateAresResolver( std::shared_ptr event_engine) { ares_options opts = {}; opts.flags |= ARES_FLAG_STAYOPEN; + if (g_event_engine_grpc_ares_test_only_force_tcp) { + opts.flags |= ARES_FLAG_USEVC; + } ares_channel channel; int status = ares_init_options(&channel, &opts, ARES_OPT_FLAGS); if (status != ARES_SUCCESS) { @@ -168,6 +171,7 @@ AresResolver::CreateAresResolver( absl::StrCat("Failed to init c-ares channel: ", ares_strerror(status))); } event_engine_grpc_ares_test_only_inject_config(&channel); + polled_fd_factory->ConfigureAresChannelLocked(channel); if (!dns_server.empty()) { absl::Status status = SetRequestDNSServer(dns_server, &channel); if (!status.ok()) { @@ -194,7 +198,7 @@ void AresResolver::Orphan() { } for (const auto& fd_node : fd_node_list_) { if (!fd_node->already_shutdown) { - GRPC_ARES_RESOLVER_TRACE_LOG("request: %p shutdown fd: %s", this, + GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this, fd_node->polled_fd->GetName()); fd_node->polled_fd->ShutdownLocked( absl::CancelledError("AresResolver::Orphan")); @@ -351,9 +355,10 @@ void AresResolver::CheckSocketsLocked() { fd_node_list_.begin(), fd_node_list_.end(), [sock = socks[i]](const auto& node) { return node->as == sock; }); if (iter == fd_node_list_.end()) { + GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p new fd: %d", this, + socks[i]); new_list.push_back(std::make_unique( socks[i], polled_fd_factory_->NewGrpcPolledFdLocked(socks[i]))); - GRPC_ARES_RESOLVER_TRACE_LOG("request:%p new fd: %d", this, socks[i]); } else { new_list.splice(new_list.end(), fd_node_list_, iter); } @@ -368,7 +373,7 @@ void AresResolver::CheckSocketsLocked() { // new data arrives and c-ares hasn't read all the data in the // previous ares_process_fd. GRPC_ARES_RESOLVER_TRACE_LOG( - "request:%p schedule read directly on: %d", this, fd_node->as); + "resolver:%p schedule read directly on: %d", this, fd_node->as); event_engine_->Run( [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"), fd_node]() mutable { @@ -376,7 +381,7 @@ void AresResolver::CheckSocketsLocked() { }); } else { // Otherwise register with the poller for readable event. - GRPC_ARES_RESOLVER_TRACE_LOG("request:%p notify read on: %d", this, + GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p notify read on: %d", this, fd_node->as); fd_node->polled_fd->RegisterForOnReadableLocked( [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"), @@ -389,7 +394,7 @@ void AresResolver::CheckSocketsLocked() { // has not been registered with this socket. if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && !fd_node->writable_registered) { - GRPC_ARES_RESOLVER_TRACE_LOG("request:%p notify write on: %d", this, + GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p notify write on: %d", this, fd_node->as); fd_node->writable_registered = true; fd_node->polled_fd->RegisterForOnWriteableLocked( @@ -407,13 +412,13 @@ void AresResolver::CheckSocketsLocked() { while (!fd_node_list_.empty()) { FdNode* fd_node = fd_node_list_.front().get(); if (!fd_node->already_shutdown) { - GRPC_ARES_RESOLVER_TRACE_LOG("request: %p shutdown fd: %s", this, + GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this, fd_node->polled_fd->GetName()); fd_node->polled_fd->ShutdownLocked(absl::OkStatus()); fd_node->already_shutdown = true; } if (!fd_node->readable_registered && !fd_node->writable_registered) { - GRPC_ARES_RESOLVER_TRACE_LOG("request: %p delete fd: %s", this, + GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p delete fd: %s", this, fd_node->polled_fd->GetName()); fd_node_list_.pop_front(); } else { @@ -702,4 +707,6 @@ void noop_inject_channel_config(ares_channel* /*channel*/) {} void (*event_engine_grpc_ares_test_only_inject_config)(ares_channel* channel) = noop_inject_channel_config; +bool g_event_engine_grpc_ares_test_only_force_tcp = false; + #endif // GRPC_ARES == 1 diff --git a/src/core/lib/event_engine/ares_resolver.h b/src/core/lib/event_engine/ares_resolver.h index dbae35cff8f..d90f13d8a8f 100644 --- a/src/core/lib/event_engine/ares_resolver.h +++ b/src/core/lib/event_engine/ares_resolver.h @@ -143,5 +143,8 @@ class AresResolver : public grpc_core::InternallyRefCounted { extern void (*event_engine_grpc_ares_test_only_inject_config)( ares_channel* channel); +// Exposed in this header for C-core tests only +extern bool g_event_engine_grpc_ares_test_only_force_tcp; + #endif // GRPC_ARES == 1 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_ARES_RESOLVER_H diff --git a/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h b/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h index da54dbd3517..c6eed183233 100644 --- a/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h +++ b/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h @@ -21,10 +21,15 @@ #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) -#include +// IWYU pragma: no_include + #include +#include +#include +#include #include +#include #include #include @@ -36,6 +41,7 @@ #include "src/core/lib/event_engine/grpc_polled_fd.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" +#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" #include "src/core/lib/iomgr/error.h" namespace grpc_event_engine { @@ -93,16 +99,95 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory { explicit GrpcPolledFdFactoryPosix(PosixEventPoller* poller) : poller_(poller) {} + ~GrpcPolledFdFactoryPosix() override { + for (auto& fd : owned_fds_) { + close(fd); + } + } + GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as) override { + owned_fds_.insert(as); return new GrpcPolledFdPosix( as, poller_->CreateHandle(as, "c-ares socket", poller_->CanTrackErrors())); } - void ConfigureAresChannelLocked(ares_channel /*channel*/) override {} + void ConfigureAresChannelLocked(ares_channel channel) override { + ares_set_socket_functions(channel, &kSockFuncs, this); + ares_set_socket_configure_callback( + channel, &GrpcPolledFdFactoryPosix::ConfigureSocket, nullptr); + } private: + /// Overridden socket API for c-ares + static ares_socket_t Socket(int af, int type, int protocol, + void* /*user_data*/) { + return socket(af, type, protocol); + } + + /// Overridden connect API for c-ares + static int Connect(ares_socket_t as, const struct sockaddr* target, + ares_socklen_t target_len, void* /*user_data*/) { + return connect(as, target, target_len); + } + + /// Overridden writev API for c-ares + static ares_ssize_t WriteV(ares_socket_t as, const struct iovec* iov, + int iovec_count, void* /*user_data*/) { + return writev(as, iov, iovec_count); + } + + /// Overridden recvfrom API for c-ares + static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len, + int flags, struct sockaddr* from, + ares_socklen_t* from_len, void* /*user_data*/) { + return recvfrom(as, data, data_len, flags, from, from_len); + } + + /// Overridden close API for c-ares + static int Close(ares_socket_t as, void* user_data) { + GrpcPolledFdFactoryPosix* self = + static_cast(user_data); + if (self->owned_fds_.find(as) == self->owned_fds_.end()) { + // c-ares owns this fd, grpc has never seen it + return close(as); + } + return 0; + } + + /// Because we're using socket API overrides, c-ares won't + /// perform its typical configuration on the socket. See + /// https://github.com/c-ares/c-ares/blob/bad62225b7f6b278b92e8e85a255600b629ef517/src/lib/ares_process.c#L1018. + /// So we use the configure socket callback override and copy default + /// settings that c-ares would normally apply on posix platforms: + /// - non-blocking + /// - cloexec flag + /// - disable nagle + static int ConfigureSocket(ares_socket_t fd, int type, void* /*user_data*/) { + // clang-format off +#define RETURN_IF_ERROR(expr) if (!(expr).ok()) { return -1; } + // clang-format on + PosixSocketWrapper sock(fd); + RETURN_IF_ERROR(sock.SetSocketNonBlocking(1)); + RETURN_IF_ERROR(sock.SetSocketCloexec(1)); + if (type == SOCK_STREAM) { + RETURN_IF_ERROR(sock.SetSocketLowLatency(1)); + } + return 0; + } + + const struct ares_socket_functions kSockFuncs = { + &GrpcPolledFdFactoryPosix::Socket /* socket */, + &GrpcPolledFdFactoryPosix::Close /* close */, + &GrpcPolledFdFactoryPosix::Connect /* connect */, + &GrpcPolledFdFactoryPosix::RecvFrom /* recvfrom */, + &GrpcPolledFdFactoryPosix::WriteV /* writev */, + }; + PosixEventPoller* poller_; + // fds that are used/owned by grpc - we (grpc) will close them rather than + // c-ares + std::unordered_set owned_fds_; }; } // namespace experimental diff --git a/test/cpp/naming/BUILD b/test/cpp/naming/BUILD index 4c5a2a2e8b1..13e08d98fba 100644 --- a/test/cpp/naming/BUILD +++ b/test/cpp/naming/BUILD @@ -43,6 +43,7 @@ grpc_cc_test( "//:gpr", "//:grpc", "//:grpc++", + "//src/core:ares_resolver", "//src/core:channel_args", "//test/core/end2end:cq_verifier", "//test/core/util:fake_udp_and_tcp_server", diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index 187a206c0b0..e737ce72b07 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -39,6 +39,7 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" +#include "src/core/lib/event_engine/ares_resolver.h" #include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gpr/string.h" @@ -442,13 +443,20 @@ TEST_F(CancelDuringAresQuery, TestQueryFailsBecauseTcpServerClosesSocket) { kWaitForClientToSendFirstBytes, grpc_core::testing::FakeUdpAndTcpServer:: CloseSocketUponReceivingBytesFromPeer); - // TODO(yijiem): make this test work with the EE DNS resolver by supporting - // this test flag to force TCP in the EE DNS resolver. - if (grpc_core::IsEventEngineDnsEnabled()) return; - g_grpc_ares_test_only_force_tcp = true; + if (grpc_core::IsEventEngineDnsEnabled()) { + g_event_engine_grpc_ares_test_only_force_tcp = true; + } else { + g_grpc_ares_test_only_force_tcp = true; + } grpc_status_code expected_status_code = GRPC_STATUS_UNAVAILABLE; - std::string expected_error_message_substring = - absl::StrCat("DNS resolution failed for ", kFakeName); + std::string expected_error_message_substring; + if (grpc_core::IsEventEngineDnsEnabled()) { + expected_error_message_substring = + absl::StrCat("errors resolving ", kFakeName); + } else { + expected_error_message_substring = + absl::StrCat("DNS resolution failed for ", kFakeName); + } // Don't really care about the deadline - we should quickly hit a DNS // resolution failure. gpr_timespec rpc_deadline = grpc_timeout_seconds_to_deadline(100); @@ -456,7 +464,11 @@ TEST_F(CancelDuringAresQuery, TestQueryFailsBecauseTcpServerClosesSocket) { TestCancelDuringActiveQuery(expected_status_code, expected_error_message_substring, rpc_deadline, dns_query_timeout_ms, fake_dns_server.port()); - g_grpc_ares_test_only_force_tcp = false; + if (grpc_core::IsEventEngineDnsEnabled()) { + g_event_engine_grpc_ares_test_only_force_tcp = false; + } else { + g_grpc_ares_test_only_force_tcp = false; + } } } // namespace