From c85cdfe6564ff65fe613f04104239cc79c36d827 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Wed, 1 Nov 2023 12:13:28 -0700 Subject: [PATCH] [test] Fix http_proxy end2end test fixture (#34838) Built upon @Vignesh2208 's work in #33156 This adds ref counting to the http_proxy fixture object, fixing test flakes identified by the introduction of EventEngine listeners. Proxy objects were either being deleted twice, or sometimes not at all, resulting in two different sorts of flakes. --- .../end2end/fixtures/http_proxy_fixture.cc | 75 ++++++++++++------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index c76ac4ad019..d2336fa5747 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -21,7 +21,7 @@ #include #include -#include +#include #include #include #include @@ -67,22 +67,42 @@ struct grpc_end2end_http_proxy { grpc_end2end_http_proxy() - : server(nullptr), channel_args(nullptr), mu(nullptr), combiner(nullptr) { - gpr_ref_init(&users, 1); - combiner = grpc_combiner_create( - grpc_event_engine::experimental::GetDefaultEventEngine()); - } + : combiner(grpc_combiner_create( + grpc_event_engine::experimental::GetDefaultEventEngine())) {} std::string proxy_name; + std::atomic is_shutdown{false}; + std::atomic users{1}; grpc_core::Thread thd; - grpc_tcp_server* server; - const grpc_channel_args* channel_args; - gpr_mu* mu; + grpc_tcp_server* server = nullptr; + const grpc_channel_args* channel_args = nullptr; + gpr_mu* mu = nullptr; std::vector pollset; - gpr_refcount users; - - grpc_core::Combiner* combiner; + grpc_core::Combiner* combiner = nullptr; }; +namespace { + +// Sometimes, on_accept may be called after thread_main has returned, and the +// proxy will have already been destroyed. This value is reset every time a +// proxy fixture is created, and it prevents a crash due to a repeated unref. +std::atomic proxy_destroyed{false}; + +void proxy_ref(grpc_end2end_http_proxy* proxy) { proxy->users.fetch_add(1); } + +// Returns the remaining number of outstanding refs +size_t proxy_unref(grpc_end2end_http_proxy* proxy) { + if (proxy_destroyed.load()) return -1; + size_t ref_count = proxy->users.fetch_sub(1) - 1; + if (ref_count == 0) { + proxy_destroyed.store(true); + GRPC_COMBINER_UNREF(proxy->combiner, "test"); + delete proxy; + } + return ref_count; +} + +} // namespace + // // Connection handling // @@ -139,8 +159,6 @@ static void proxy_connection_ref(proxy_connection* conn, static void proxy_connection_unref(proxy_connection* conn, const char* /*reason*/) { if (gpr_unref(&conn->refcount)) { - gpr_log(GPR_DEBUG, "endpoints: %p %p", conn->client_endpoint, - conn->server_endpoint); grpc_endpoint_destroy(conn->client_endpoint); if (conn->server_endpoint != nullptr) { grpc_endpoint_destroy(conn->server_endpoint); @@ -154,7 +172,7 @@ static void proxy_connection_unref(proxy_connection* conn, grpc_slice_buffer_destroy(&conn->server_write_buffer); grpc_http_parser_destroy(&conn->http_parser); grpc_http_request_destroy(&conn->http_request); - gpr_unref(&conn->proxy->users); + proxy_unref(conn->proxy); gpr_free(conn); } } @@ -589,9 +607,15 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, grpc_tcp_server_acceptor* acceptor) { gpr_free(acceptor); grpc_end2end_http_proxy* proxy = static_cast(arg); + proxy_ref(proxy); + if (proxy->is_shutdown.load()) { + grpc_endpoint_shutdown(endpoint, absl::UnknownError("proxy shutdown")); + grpc_endpoint_destroy(endpoint); + proxy_unref(proxy); + return; + } // Instantiate proxy_connection. proxy_connection* conn = grpc_core::Zalloc(); - gpr_ref(&proxy->users); conn->client_endpoint = endpoint; conn->proxy = proxy; gpr_ref_init(&conn->refcount, 1); @@ -623,22 +647,24 @@ static void thread_main(void* arg) { grpc_end2end_http_proxy* proxy = static_cast(arg); grpc_core::ExecCtx exec_ctx; do { - gpr_ref(&proxy->users); + proxy_ref(proxy); grpc_pollset_worker* worker = nullptr; gpr_mu_lock(proxy->mu); - GRPC_LOG_IF_ERROR("grpc_pollset_work", - grpc_pollset_work(proxy->pollset[0], &worker, - grpc_core::Timestamp::Now() + - grpc_core::Duration::Seconds(1))); + GRPC_LOG_IF_ERROR( + "grpc_pollset_work", + grpc_pollset_work(proxy->pollset[0], &worker, + grpc_core::Timestamp::Now() + + grpc_core::Duration::Milliseconds(100))); gpr_mu_unlock(proxy->mu); grpc_core::ExecCtx::Get()->Flush(); - } while (!gpr_unref(&proxy->users)); + } while (proxy_unref(proxy) > 1 || !proxy->is_shutdown.load()); } grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( const grpc_channel_args* args) { grpc_core::ExecCtx exec_ctx; grpc_end2end_http_proxy* proxy = new grpc_end2end_http_proxy(); + proxy_destroyed.store(false); // Construct proxy address. const int proxy_port = grpc_pick_unused_port_or_die(); proxy->proxy_name = grpc_core::JoinHostPort("localhost", proxy_port); @@ -684,7 +710,7 @@ static void destroy_pollset(void* arg, grpc_error_handle /*error*/) { } void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { - gpr_unref(&proxy->users); // Signal proxy thread to shutdown. + proxy->is_shutdown.store(true); grpc_core::ExecCtx exec_ctx; proxy->thd.Join(); grpc_tcp_server_shutdown_listeners(proxy->server); @@ -693,8 +719,7 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { grpc_pollset_shutdown(proxy->pollset[0], GRPC_CLOSURE_CREATE(destroy_pollset, proxy->pollset[0], grpc_schedule_on_exec_ctx)); - GRPC_COMBINER_UNREF(proxy->combiner, "test"); - delete proxy; + proxy_unref(proxy); } const char* grpc_end2end_http_proxy_get_proxy_name(