[EventEngine] Do not accept connections if the listener is shut down (#35990)

There's a race with the iomgr/EventEngine shims on listener shutdown, specifically for the PosixEventEngine implementation. This showed up as TSAN failures, such as in https://source.cloud.google.com/results/invocations/539def09-0647-4508-960a-f22b76e76dae/targets/%2F%2Ftest%2Fcore%2Fend2end:retry_cancel_after_first_attempt_starts_test@poller%3Depoll1/log. This PR alters the lifetime of the grpc_tcp_server object and its constituent members, letting the EventEngine listener manage it when EE listeners are enabled. Previously, some member objects were destroyed before the tcp server itself was destroyed.

Notes:
 * This is best reviewed once with whitespace diff turned off. Blocks was added to manage mutex lifetimes.
 * BUILD file changes are result of `fix_build_deps.py :iomgr`

Closes #35990

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35990 from drfloob:fix/proxy-listener-accept d35eed5d16
PiperOrigin-RevId: 611149345
pull/36022/head
AJ Heller 1 year ago committed by Copybara-Service
parent d6089c336c
commit 00c01f395c
  1. 19
      BUILD
  2. 188
      src/core/lib/iomgr/tcp_server_posix.cc

19
BUILD

@ -1544,15 +1544,9 @@ grpc_cc_library(
}),
external_deps = [
"absl/base:core_headers",
"absl/cleanup",
"absl/container:flat_hash_map",
"absl/container:flat_hash_set",
"absl/container:inlined_vector",
"absl/functional:any_invocable",
"absl/functional:function_ref",
"absl/hash",
"absl/meta:type_traits",
"absl/random",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -1560,7 +1554,6 @@ grpc_cc_library(
"absl/time",
"absl/types:optional",
"absl/utility",
"madler_zlib",
],
language = "c++",
linkopts = select({
@ -1570,11 +1563,9 @@ grpc_cc_library(
public_hdrs = GRPC_PUBLIC_HDRS + GRPC_PUBLIC_EVENT_ENGINE_HDRS,
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [
"byte_buffer",
"channel_arg_names",
"config_vars",
"debug_location",
"dynamic_annotations",
"exec_ctx",
"gpr",
"grpc_public_hdrs",
@ -1584,11 +1575,9 @@ grpc_cc_library(
"iomgr_timer",
"orphanable",
"parse_address",
"ref_counted_ptr",
"resource_quota_api",
"sockaddr_utils",
"stats",
"tcp_tracer",
"//src/core:channel_args",
"//src/core:channel_args_endpoint_config",
"//src/core:closure",
@ -1605,22 +1594,19 @@ grpc_cc_library(
"//src/core:event_engine_tcp_socket_utils",
"//src/core:event_engine_trace",
"//src/core:event_log",
"//src/core:examine_stack",
"//src/core:experiments",
"//src/core:gpr_atm",
"//src/core:gpr_manual_constructor",
"//src/core:gpr_spinlock",
"//src/core:grpc_sockaddr",
"//src/core:init_internally",
"//src/core:iomgr_fwd",
"//src/core:iomgr_port",
"//src/core:memory_quota",
"//src/core:no_destruct",
"//src/core:per_cpu",
"//src/core:pollset_set",
"//src/core:posix_event_engine_base_hdrs",
"//src/core:posix_event_engine_endpoint",
"//src/core:random_early_detection",
"//src/core:ref_counted",
"//src/core:resolved_address",
"//src/core:resource_quota",
"//src/core:resource_quota_trace",
@ -1630,10 +1616,7 @@ grpc_cc_library(
"//src/core:slice_refcount",
"//src/core:socket_mutator",
"//src/core:stats_data",
"//src/core:status_flag",
"//src/core:status_helper",
"//src/core:strerror",
"//src/core:thread_quota",
"//src/core:time",
"//src/core:useful",
"//src/core:windows_event_engine",

@ -62,10 +62,6 @@
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/event_engine/resolved_address_internal.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/iomgr/event_engine_shims/closure.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
@ -79,7 +75,6 @@
#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/iomgr/vsock.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/transport/error_utils.h"
static std::atomic<int64_t> num_dropped_connections{0};
@ -93,6 +88,24 @@ using ::grpc_event_engine::experimental::MemoryQuotaBasedMemoryAllocatorFactory;
using ::grpc_event_engine::experimental::PosixEventEngineWithFdSupport;
using ::grpc_event_engine::experimental::SliceBuffer;
static void finish_shutdown(grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
absl::OkStatus());
}
gpr_mu_destroy(&s->mu);
while (s->head) {
grpc_tcp_listener* sp = s->head;
s->head = sp->next;
gpr_free(sp);
}
delete s->fd_handler;
delete s;
}
static grpc_error_handle CreateEventEngineListener(
grpc_tcp_server* s, grpc_closure* shutdown_complete,
const EndpointConfig& config, grpc_tcp_server** server) {
@ -117,63 +130,76 @@ static grpc_error_handle CreateEventEngineListener(
SliceBuffer* pending_data) {
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_tcp_server_acceptor* acceptor =
static_cast<grpc_tcp_server_acceptor*>(
gpr_malloc(sizeof(*acceptor)));
acceptor->from_server = s;
acceptor->port_index = -1;
acceptor->fd_index = -1;
if (!is_external) {
auto it = s->listen_fd_to_index_map.find(listener_fd);
if (it != s->listen_fd_to_index_map.end()) {
acceptor->port_index = std::get<0>(it->second);
acceptor->fd_index = std::get<1>(it->second);
}
} else {
// External connection handling.
grpc_resolved_address addr;
memset(&addr, 0, sizeof(addr));
addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
// Get the fd of the socket connected to peer.
int fd =
reinterpret_cast<
grpc_event_engine::experimental::PosixEndpoint*>(ep.get())
->GetWrappedFd();
if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
&(addr.len)) < 0) {
gpr_log(GPR_ERROR, "Failed getpeername: %s",
grpc_core::StrError(errno).c_str());
close(fd);
grpc_pollset* read_notifier_pollset;
grpc_tcp_server_acceptor* acceptor;
void* cb_arg;
// Scoped for server lock, to ensure it's released before the callback
// is called.
{
grpc_core::MutexLockForGprMu lock(&s->mu);
if (s->shutdown) {
return;
}
(void)grpc_set_socket_no_sigpipe_if_possible(fd);
auto addr_uri = grpc_sockaddr_to_uri(&addr);
if (!addr_uri.ok()) {
gpr_log(GPR_ERROR, "Invalid address: %s",
addr_uri.status().ToString().c_str());
return;
cb_arg = s->on_accept_cb_arg;
acceptor = static_cast<grpc_tcp_server_acceptor*>(
gpr_malloc(sizeof(*acceptor)));
acceptor->from_server = s;
acceptor->port_index = -1;
acceptor->fd_index = -1;
if (!is_external) {
auto it = s->listen_fd_to_index_map.find(listener_fd);
if (it != s->listen_fd_to_index_map.end()) {
acceptor->port_index = std::get<0>(it->second);
acceptor->fd_index = std::get<1>(it->second);
}
} else {
// External connection handling.
grpc_resolved_address addr;
memset(&addr, 0, sizeof(addr));
addr.len =
static_cast<socklen_t>(sizeof(struct sockaddr_storage));
// Get the fd of the socket connected to peer.
int fd =
reinterpret_cast<
grpc_event_engine::experimental::PosixEndpoint*>(ep.get())
->GetWrappedFd();
if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
&(addr.len)) < 0) {
gpr_log(GPR_ERROR, "Failed getpeername: %s",
grpc_core::StrError(errno).c_str());
close(fd);
return;
}
(void)grpc_set_socket_no_sigpipe_if_possible(fd);
auto addr_uri = grpc_sockaddr_to_uri(&addr);
if (!addr_uri.ok()) {
gpr_log(GPR_ERROR, "Invalid address: %s",
addr_uri.status().ToString().c_str());
return;
}
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO,
"SERVER_CONNECT: incoming external connection: %s",
addr_uri->c_str());
}
}
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO,
"SERVER_CONNECT: incoming external connection: %s",
addr_uri->c_str());
read_notifier_pollset =
(*(s->pollsets))[static_cast<size_t>(
gpr_atm_no_barrier_fetch_add(
&s->next_pollset_to_assign, 1)) %
s->pollsets->size()];
acceptor->external_connection = is_external;
acceptor->listener_fd = listener_fd;
grpc_byte_buffer* buf = nullptr;
if (pending_data != nullptr && pending_data->Length() > 0) {
buf = grpc_raw_byte_buffer_create(nullptr, 0);
grpc_slice_buffer_swap(&buf->data.raw.slice_buffer,
pending_data->c_slice_buffer());
pending_data->Clear();
}
acceptor->pending_data = buf;
}
grpc_pollset* read_notifier_pollset =
(*(s->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&s->next_pollset_to_assign, 1)) %
s->pollsets->size()];
acceptor->external_connection = is_external;
acceptor->listener_fd = listener_fd;
grpc_byte_buffer* buf = nullptr;
if (pending_data != nullptr && pending_data->Length() > 0) {
buf = grpc_raw_byte_buffer_create(nullptr, 0);
grpc_slice_buffer_swap(&buf->data.raw.slice_buffer,
pending_data->c_slice_buffer());
pending_data->Clear();
}
acceptor->pending_data = buf;
s->on_accept_cb(s->on_accept_cb_arg,
s->on_accept_cb(cb_arg,
grpc_event_engine::experimental::
grpc_event_engine_endpoint_create(std::move(ep)),
read_notifier_pollset, acceptor);
@ -183,8 +209,7 @@ static grpc_error_handle CreateEventEngineListener(
[s, shutdown_complete](absl::Status status) {
grpc_event_engine::experimental::RunEventEngineClosure(
shutdown_complete, absl_status_to_grpc_error(status));
delete s->fd_handler;
delete s;
finish_shutdown(s);
},
config,
std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
@ -194,7 +219,15 @@ static grpc_error_handle CreateEventEngineListener(
[s](std::unique_ptr<EventEngine::Endpoint> ep, MemoryAllocator) {
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
s->on_accept_cb(s->on_accept_cb_arg,
void* cb_arg;
{
grpc_core::MutexLockForGprMu lock(&s->mu);
if (s->shutdown) {
return;
}
cb_arg = s->on_accept_cb_arg;
}
s->on_accept_cb(cb_arg,
grpc_event_engine::experimental::
grpc_event_engine_endpoint_create(std::move(ep)),
nullptr, nullptr);
@ -205,8 +238,7 @@ static grpc_error_handle CreateEventEngineListener(
GPR_ASSERT(gpr_atm_no_barrier_load(&s->refs.count) == 0);
grpc_event_engine::experimental::RunEventEngineClosure(
shutdown_complete, absl_status_to_grpc_error(status));
delete s->fd_handler;
delete s;
finish_shutdown(s);
},
config,
std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
@ -271,30 +303,6 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
return absl::OkStatus();
}
static void finish_shutdown(grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
absl::OkStatus());
}
gpr_mu_destroy(&s->mu);
while (s->head) {
grpc_tcp_listener* sp = s->head;
s->head = sp->next;
gpr_free(sp);
}
if (grpc_event_engine::experimental::UseEventEngineListener()) {
// This will trigger asynchronous execution of the on_shutdown_complete
// callback when appropriate. That callback will delete the server
s->ee_listener.reset();
} else {
delete s->fd_handler;
delete s;
}
}
static void destroyed_port(void* server, grpc_error_handle /*error*/) {
grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
gpr_mu_lock(&s->mu);
@ -332,7 +340,13 @@ static void deactivated_all_ports(grpc_tcp_server* s) {
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
finish_shutdown(s);
if (grpc_event_engine::experimental::UseEventEngineListener()) {
// This will trigger asynchronous execution of the on_shutdown_complete
// callback when appropriate. That callback will delete the server.
s->ee_listener.reset();
} else {
finish_shutdown(s);
}
}
}

Loading…
Cancel
Save