diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index c76ac4ad019..d2336fa5747 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -21,7 +21,7 @@ #include #include -#include +#include #include #include #include @@ -67,22 +67,42 @@ struct grpc_end2end_http_proxy { grpc_end2end_http_proxy() - : server(nullptr), channel_args(nullptr), mu(nullptr), combiner(nullptr) { - gpr_ref_init(&users, 1); - combiner = grpc_combiner_create( - grpc_event_engine::experimental::GetDefaultEventEngine()); - } + : combiner(grpc_combiner_create( + grpc_event_engine::experimental::GetDefaultEventEngine())) {} std::string proxy_name; + std::atomic is_shutdown{false}; + std::atomic users{1}; grpc_core::Thread thd; - grpc_tcp_server* server; - const grpc_channel_args* channel_args; - gpr_mu* mu; + grpc_tcp_server* server = nullptr; + const grpc_channel_args* channel_args = nullptr; + gpr_mu* mu = nullptr; std::vector pollset; - gpr_refcount users; - - grpc_core::Combiner* combiner; + grpc_core::Combiner* combiner = nullptr; }; +namespace { + +// Sometimes, on_accept may be called after thread_main has returned, and the +// proxy will have already been destroyed. This value is reset every time a +// proxy fixture is created, and it prevents a crash due to a repeated unref. +std::atomic proxy_destroyed{false}; + +void proxy_ref(grpc_end2end_http_proxy* proxy) { proxy->users.fetch_add(1); } + +// Returns the remaining number of outstanding refs +size_t proxy_unref(grpc_end2end_http_proxy* proxy) { + if (proxy_destroyed.load()) return -1; + size_t ref_count = proxy->users.fetch_sub(1) - 1; + if (ref_count == 0) { + proxy_destroyed.store(true); + GRPC_COMBINER_UNREF(proxy->combiner, "test"); + delete proxy; + } + return ref_count; +} + +} // namespace + // // Connection handling // @@ -139,8 +159,6 @@ static void proxy_connection_ref(proxy_connection* conn, static void proxy_connection_unref(proxy_connection* conn, const char* /*reason*/) { if (gpr_unref(&conn->refcount)) { - gpr_log(GPR_DEBUG, "endpoints: %p %p", conn->client_endpoint, - conn->server_endpoint); grpc_endpoint_destroy(conn->client_endpoint); if (conn->server_endpoint != nullptr) { grpc_endpoint_destroy(conn->server_endpoint); @@ -154,7 +172,7 @@ static void proxy_connection_unref(proxy_connection* conn, grpc_slice_buffer_destroy(&conn->server_write_buffer); grpc_http_parser_destroy(&conn->http_parser); grpc_http_request_destroy(&conn->http_request); - gpr_unref(&conn->proxy->users); + proxy_unref(conn->proxy); gpr_free(conn); } } @@ -589,9 +607,15 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, grpc_tcp_server_acceptor* acceptor) { gpr_free(acceptor); grpc_end2end_http_proxy* proxy = static_cast(arg); + proxy_ref(proxy); + if (proxy->is_shutdown.load()) { + grpc_endpoint_shutdown(endpoint, absl::UnknownError("proxy shutdown")); + grpc_endpoint_destroy(endpoint); + proxy_unref(proxy); + return; + } // Instantiate proxy_connection. proxy_connection* conn = grpc_core::Zalloc(); - gpr_ref(&proxy->users); conn->client_endpoint = endpoint; conn->proxy = proxy; gpr_ref_init(&conn->refcount, 1); @@ -623,22 +647,24 @@ static void thread_main(void* arg) { grpc_end2end_http_proxy* proxy = static_cast(arg); grpc_core::ExecCtx exec_ctx; do { - gpr_ref(&proxy->users); + proxy_ref(proxy); grpc_pollset_worker* worker = nullptr; gpr_mu_lock(proxy->mu); - GRPC_LOG_IF_ERROR("grpc_pollset_work", - grpc_pollset_work(proxy->pollset[0], &worker, - grpc_core::Timestamp::Now() + - grpc_core::Duration::Seconds(1))); + GRPC_LOG_IF_ERROR( + "grpc_pollset_work", + grpc_pollset_work(proxy->pollset[0], &worker, + grpc_core::Timestamp::Now() + + grpc_core::Duration::Milliseconds(100))); gpr_mu_unlock(proxy->mu); grpc_core::ExecCtx::Get()->Flush(); - } while (!gpr_unref(&proxy->users)); + } while (proxy_unref(proxy) > 1 || !proxy->is_shutdown.load()); } grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( const grpc_channel_args* args) { grpc_core::ExecCtx exec_ctx; grpc_end2end_http_proxy* proxy = new grpc_end2end_http_proxy(); + proxy_destroyed.store(false); // Construct proxy address. const int proxy_port = grpc_pick_unused_port_or_die(); proxy->proxy_name = grpc_core::JoinHostPort("localhost", proxy_port); @@ -684,7 +710,7 @@ static void destroy_pollset(void* arg, grpc_error_handle /*error*/) { } void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { - gpr_unref(&proxy->users); // Signal proxy thread to shutdown. + proxy->is_shutdown.store(true); grpc_core::ExecCtx exec_ctx; proxy->thd.Join(); grpc_tcp_server_shutdown_listeners(proxy->server); @@ -693,8 +719,7 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { grpc_pollset_shutdown(proxy->pollset[0], GRPC_CLOSURE_CREATE(destroy_pollset, proxy->pollset[0], grpc_schedule_on_exec_ctx)); - GRPC_COMBINER_UNREF(proxy->combiner, "test"); - delete proxy; + proxy_unref(proxy); } const char* grpc_end2end_http_proxy_get_proxy_name(