From 00c01f395c01204fbd767abe7143bd1a49e82a20 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Wed, 28 Feb 2024 10:32:14 -0800 Subject: [PATCH] [EventEngine] Do not accept connections if the listener is shut down (#35990) There's a race with the iomgr/EventEngine shims on listener shutdown, specifically for the PosixEventEngine implementation. This showed up as TSAN failures, such as in https://source.cloud.google.com/results/invocations/539def09-0647-4508-960a-f22b76e76dae/targets/%2F%2Ftest%2Fcore%2Fend2end:retry_cancel_after_first_attempt_starts_test@poller%3Depoll1/log. This PR alters the lifetime of the grpc_tcp_server object and its constituent members, letting the EventEngine listener manage it when EE listeners are enabled. Previously, some member objects were destroyed before the tcp server itself was destroyed. Notes: * This is best reviewed once with whitespace diff turned off. Blocks was added to manage mutex lifetimes. * BUILD file changes are result of `fix_build_deps.py :iomgr` Closes #35990 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35990 from drfloob:fix/proxy-listener-accept d35eed5d16ec23301f2b0b350f096f5d6f4f79cd PiperOrigin-RevId: 611149345 --- BUILD | 19 +-- src/core/lib/iomgr/tcp_server_posix.cc | 188 +++++++++++++------------ 2 files changed, 102 insertions(+), 105 deletions(-) diff --git a/BUILD b/BUILD index 898ed19e1f8..fa1c61cd611 100644 --- a/BUILD +++ b/BUILD @@ -1544,15 +1544,9 @@ grpc_cc_library( }), external_deps = [ "absl/base:core_headers", - "absl/cleanup", "absl/container:flat_hash_map", "absl/container:flat_hash_set", - "absl/container:inlined_vector", "absl/functional:any_invocable", - "absl/functional:function_ref", - "absl/hash", - "absl/meta:type_traits", - "absl/random", "absl/status", "absl/status:statusor", "absl/strings", @@ -1560,7 +1554,6 @@ grpc_cc_library( "absl/time", "absl/types:optional", "absl/utility", - "madler_zlib", ], language = "c++", linkopts = select({ @@ -1570,11 +1563,9 @@ grpc_cc_library( public_hdrs = GRPC_PUBLIC_HDRS + GRPC_PUBLIC_EVENT_ENGINE_HDRS, visibility = ["@grpc:alt_grpc_base_legacy"], deps = [ - "byte_buffer", "channel_arg_names", "config_vars", "debug_location", - "dynamic_annotations", "exec_ctx", "gpr", "grpc_public_hdrs", @@ -1584,11 +1575,9 @@ grpc_cc_library( "iomgr_timer", "orphanable", "parse_address", - "ref_counted_ptr", "resource_quota_api", "sockaddr_utils", "stats", - "tcp_tracer", "//src/core:channel_args", "//src/core:channel_args_endpoint_config", "//src/core:closure", @@ -1605,22 +1594,19 @@ grpc_cc_library( "//src/core:event_engine_tcp_socket_utils", "//src/core:event_engine_trace", "//src/core:event_log", + "//src/core:examine_stack", "//src/core:experiments", "//src/core:gpr_atm", "//src/core:gpr_manual_constructor", - "//src/core:gpr_spinlock", "//src/core:grpc_sockaddr", "//src/core:init_internally", "//src/core:iomgr_fwd", "//src/core:iomgr_port", "//src/core:memory_quota", "//src/core:no_destruct", - "//src/core:per_cpu", "//src/core:pollset_set", "//src/core:posix_event_engine_base_hdrs", "//src/core:posix_event_engine_endpoint", - "//src/core:random_early_detection", - "//src/core:ref_counted", "//src/core:resolved_address", "//src/core:resource_quota", "//src/core:resource_quota_trace", @@ -1630,10 +1616,7 @@ grpc_cc_library( "//src/core:slice_refcount", "//src/core:socket_mutator", "//src/core:stats_data", - "//src/core:status_flag", - "//src/core:status_helper", "//src/core:strerror", - "//src/core:thread_quota", "//src/core:time", "//src/core:useful", "//src/core:windows_event_engine", diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 85404201425..f4dcdd588d0 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -62,10 +62,6 @@ #include "src/core/lib/event_engine/query_extensions.h" #include "src/core/lib/event_engine/resolved_address_internal.h" #include "src/core/lib/event_engine/shim.h" -#include "src/core/lib/event_engine/tcp_socket_utils.h" -#include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/crash.h" -#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/strerror.h" #include "src/core/lib/iomgr/event_engine_shims/closure.h" #include "src/core/lib/iomgr/event_engine_shims/endpoint.h" @@ -79,7 +75,6 @@ #include "src/core/lib/iomgr/tcp_server_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/iomgr/vsock.h" -#include "src/core/lib/resource_quota/api.h" #include "src/core/lib/transport/error_utils.h" static std::atomic num_dropped_connections{0}; @@ -93,6 +88,24 @@ using ::grpc_event_engine::experimental::MemoryQuotaBasedMemoryAllocatorFactory; using ::grpc_event_engine::experimental::PosixEventEngineWithFdSupport; using ::grpc_event_engine::experimental::SliceBuffer; +static void finish_shutdown(grpc_tcp_server* s) { + gpr_mu_lock(&s->mu); + GPR_ASSERT(s->shutdown); + gpr_mu_unlock(&s->mu); + if (s->shutdown_complete != nullptr) { + grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete, + absl::OkStatus()); + } + gpr_mu_destroy(&s->mu); + while (s->head) { + grpc_tcp_listener* sp = s->head; + s->head = sp->next; + gpr_free(sp); + } + delete s->fd_handler; + delete s; +} + static grpc_error_handle CreateEventEngineListener( grpc_tcp_server* s, grpc_closure* shutdown_complete, const EndpointConfig& config, grpc_tcp_server** server) { @@ -117,63 +130,76 @@ static grpc_error_handle CreateEventEngineListener( SliceBuffer* pending_data) { grpc_core::ApplicationCallbackExecCtx app_ctx; grpc_core::ExecCtx exec_ctx; - grpc_tcp_server_acceptor* acceptor = - static_cast( - gpr_malloc(sizeof(*acceptor))); - acceptor->from_server = s; - acceptor->port_index = -1; - acceptor->fd_index = -1; - if (!is_external) { - auto it = s->listen_fd_to_index_map.find(listener_fd); - if (it != s->listen_fd_to_index_map.end()) { - acceptor->port_index = std::get<0>(it->second); - acceptor->fd_index = std::get<1>(it->second); - } - } else { - // External connection handling. - grpc_resolved_address addr; - memset(&addr, 0, sizeof(addr)); - addr.len = static_cast(sizeof(struct sockaddr_storage)); - // Get the fd of the socket connected to peer. - int fd = - reinterpret_cast< - grpc_event_engine::experimental::PosixEndpoint*>(ep.get()) - ->GetWrappedFd(); - if (getpeername(fd, reinterpret_cast(addr.addr), - &(addr.len)) < 0) { - gpr_log(GPR_ERROR, "Failed getpeername: %s", - grpc_core::StrError(errno).c_str()); - close(fd); + grpc_pollset* read_notifier_pollset; + grpc_tcp_server_acceptor* acceptor; + void* cb_arg; + // Scoped for server lock, to ensure it's released before the callback + // is called. + { + grpc_core::MutexLockForGprMu lock(&s->mu); + if (s->shutdown) { return; } - (void)grpc_set_socket_no_sigpipe_if_possible(fd); - auto addr_uri = grpc_sockaddr_to_uri(&addr); - if (!addr_uri.ok()) { - gpr_log(GPR_ERROR, "Invalid address: %s", - addr_uri.status().ToString().c_str()); - return; + cb_arg = s->on_accept_cb_arg; + acceptor = static_cast( + gpr_malloc(sizeof(*acceptor))); + acceptor->from_server = s; + acceptor->port_index = -1; + acceptor->fd_index = -1; + if (!is_external) { + auto it = s->listen_fd_to_index_map.find(listener_fd); + if (it != s->listen_fd_to_index_map.end()) { + acceptor->port_index = std::get<0>(it->second); + acceptor->fd_index = std::get<1>(it->second); + } + } else { + // External connection handling. + grpc_resolved_address addr; + memset(&addr, 0, sizeof(addr)); + addr.len = + static_cast(sizeof(struct sockaddr_storage)); + // Get the fd of the socket connected to peer. + int fd = + reinterpret_cast< + grpc_event_engine::experimental::PosixEndpoint*>(ep.get()) + ->GetWrappedFd(); + if (getpeername(fd, reinterpret_cast(addr.addr), + &(addr.len)) < 0) { + gpr_log(GPR_ERROR, "Failed getpeername: %s", + grpc_core::StrError(errno).c_str()); + close(fd); + return; + } + (void)grpc_set_socket_no_sigpipe_if_possible(fd); + auto addr_uri = grpc_sockaddr_to_uri(&addr); + if (!addr_uri.ok()) { + gpr_log(GPR_ERROR, "Invalid address: %s", + addr_uri.status().ToString().c_str()); + return; + } + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, + "SERVER_CONNECT: incoming external connection: %s", + addr_uri->c_str()); + } } - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_INFO, - "SERVER_CONNECT: incoming external connection: %s", - addr_uri->c_str()); + read_notifier_pollset = + (*(s->pollsets))[static_cast( + gpr_atm_no_barrier_fetch_add( + &s->next_pollset_to_assign, 1)) % + s->pollsets->size()]; + acceptor->external_connection = is_external; + acceptor->listener_fd = listener_fd; + grpc_byte_buffer* buf = nullptr; + if (pending_data != nullptr && pending_data->Length() > 0) { + buf = grpc_raw_byte_buffer_create(nullptr, 0); + grpc_slice_buffer_swap(&buf->data.raw.slice_buffer, + pending_data->c_slice_buffer()); + pending_data->Clear(); } + acceptor->pending_data = buf; } - grpc_pollset* read_notifier_pollset = - (*(s->pollsets))[static_cast(gpr_atm_no_barrier_fetch_add( - &s->next_pollset_to_assign, 1)) % - s->pollsets->size()]; - acceptor->external_connection = is_external; - acceptor->listener_fd = listener_fd; - grpc_byte_buffer* buf = nullptr; - if (pending_data != nullptr && pending_data->Length() > 0) { - buf = grpc_raw_byte_buffer_create(nullptr, 0); - grpc_slice_buffer_swap(&buf->data.raw.slice_buffer, - pending_data->c_slice_buffer()); - pending_data->Clear(); - } - acceptor->pending_data = buf; - s->on_accept_cb(s->on_accept_cb_arg, + s->on_accept_cb(cb_arg, grpc_event_engine::experimental:: grpc_event_engine_endpoint_create(std::move(ep)), read_notifier_pollset, acceptor); @@ -183,8 +209,7 @@ static grpc_error_handle CreateEventEngineListener( [s, shutdown_complete](absl::Status status) { grpc_event_engine::experimental::RunEventEngineClosure( shutdown_complete, absl_status_to_grpc_error(status)); - delete s->fd_handler; - delete s; + finish_shutdown(s); }, config, std::make_unique( @@ -194,7 +219,15 @@ static grpc_error_handle CreateEventEngineListener( [s](std::unique_ptr ep, MemoryAllocator) { grpc_core::ApplicationCallbackExecCtx app_ctx; grpc_core::ExecCtx exec_ctx; - s->on_accept_cb(s->on_accept_cb_arg, + void* cb_arg; + { + grpc_core::MutexLockForGprMu lock(&s->mu); + if (s->shutdown) { + return; + } + cb_arg = s->on_accept_cb_arg; + } + s->on_accept_cb(cb_arg, grpc_event_engine::experimental:: grpc_event_engine_endpoint_create(std::move(ep)), nullptr, nullptr); @@ -205,8 +238,7 @@ static grpc_error_handle CreateEventEngineListener( GPR_ASSERT(gpr_atm_no_barrier_load(&s->refs.count) == 0); grpc_event_engine::experimental::RunEventEngineClosure( shutdown_complete, absl_status_to_grpc_error(status)); - delete s->fd_handler; - delete s; + finish_shutdown(s); }, config, std::make_unique( @@ -271,30 +303,6 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, return absl::OkStatus(); } -static void finish_shutdown(grpc_tcp_server* s) { - gpr_mu_lock(&s->mu); - GPR_ASSERT(s->shutdown); - gpr_mu_unlock(&s->mu); - if (s->shutdown_complete != nullptr) { - grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete, - absl::OkStatus()); - } - gpr_mu_destroy(&s->mu); - while (s->head) { - grpc_tcp_listener* sp = s->head; - s->head = sp->next; - gpr_free(sp); - } - if (grpc_event_engine::experimental::UseEventEngineListener()) { - // This will trigger asynchronous execution of the on_shutdown_complete - // callback when appropriate. That callback will delete the server - s->ee_listener.reset(); - } else { - delete s->fd_handler; - delete s; - } -} - static void destroyed_port(void* server, grpc_error_handle /*error*/) { grpc_tcp_server* s = static_cast(server); gpr_mu_lock(&s->mu); @@ -332,7 +340,13 @@ static void deactivated_all_ports(grpc_tcp_server* s) { gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - finish_shutdown(s); + if (grpc_event_engine::experimental::UseEventEngineListener()) { + // This will trigger asynchronous execution of the on_shutdown_complete + // callback when appropriate. That callback will delete the server. + s->ee_listener.reset(); + } else { + finish_shutdown(s); + } } }