[test] Fix http_proxy end2end test fixture (#34838)

Built upon @Vignesh2208 's work in #33156

This adds ref counting to the http_proxy fixture object, fixing test
flakes identified by the introduction of EventEngine listeners. Proxy
objects were either being deleted twice, or sometimes not at all,
resulting in two different sorts of flakes.
pull/34854/head
AJ Heller 1 year ago committed by GitHub
parent 15037d7c4a
commit c85cdfe656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 75
      test/core/end2end/fixtures/http_proxy_fixture.cc

@ -21,7 +21,7 @@
#include <limits.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <string>
#include <vector>
@ -67,22 +67,42 @@
struct grpc_end2end_http_proxy {
grpc_end2end_http_proxy()
: server(nullptr), channel_args(nullptr), mu(nullptr), combiner(nullptr) {
gpr_ref_init(&users, 1);
combiner = grpc_combiner_create(
grpc_event_engine::experimental::GetDefaultEventEngine());
}
: combiner(grpc_combiner_create(
grpc_event_engine::experimental::GetDefaultEventEngine())) {}
std::string proxy_name;
std::atomic<bool> is_shutdown{false};
std::atomic<size_t> users{1};
grpc_core::Thread thd;
grpc_tcp_server* server;
const grpc_channel_args* channel_args;
gpr_mu* mu;
grpc_tcp_server* server = nullptr;
const grpc_channel_args* channel_args = nullptr;
gpr_mu* mu = nullptr;
std::vector<grpc_pollset*> pollset;
gpr_refcount users;
grpc_core::Combiner* combiner;
grpc_core::Combiner* combiner = nullptr;
};
namespace {
// Sometimes, on_accept may be called after thread_main has returned, and the
// proxy will have already been destroyed. This value is reset every time a
// proxy fixture is created, and it prevents a crash due to a repeated unref.
std::atomic<bool> proxy_destroyed{false};
void proxy_ref(grpc_end2end_http_proxy* proxy) { proxy->users.fetch_add(1); }
// Returns the remaining number of outstanding refs
size_t proxy_unref(grpc_end2end_http_proxy* proxy) {
if (proxy_destroyed.load()) return -1;
size_t ref_count = proxy->users.fetch_sub(1) - 1;
if (ref_count == 0) {
proxy_destroyed.store(true);
GRPC_COMBINER_UNREF(proxy->combiner, "test");
delete proxy;
}
return ref_count;
}
} // namespace
//
// Connection handling
//
@ -139,8 +159,6 @@ static void proxy_connection_ref(proxy_connection* conn,
static void proxy_connection_unref(proxy_connection* conn,
const char* /*reason*/) {
if (gpr_unref(&conn->refcount)) {
gpr_log(GPR_DEBUG, "endpoints: %p %p", conn->client_endpoint,
conn->server_endpoint);
grpc_endpoint_destroy(conn->client_endpoint);
if (conn->server_endpoint != nullptr) {
grpc_endpoint_destroy(conn->server_endpoint);
@ -154,7 +172,7 @@ static void proxy_connection_unref(proxy_connection* conn,
grpc_slice_buffer_destroy(&conn->server_write_buffer);
grpc_http_parser_destroy(&conn->http_parser);
grpc_http_request_destroy(&conn->http_request);
gpr_unref(&conn->proxy->users);
proxy_unref(conn->proxy);
gpr_free(conn);
}
}
@ -589,9 +607,15 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
grpc_tcp_server_acceptor* acceptor) {
gpr_free(acceptor);
grpc_end2end_http_proxy* proxy = static_cast<grpc_end2end_http_proxy*>(arg);
proxy_ref(proxy);
if (proxy->is_shutdown.load()) {
grpc_endpoint_shutdown(endpoint, absl::UnknownError("proxy shutdown"));
grpc_endpoint_destroy(endpoint);
proxy_unref(proxy);
return;
}
// Instantiate proxy_connection.
proxy_connection* conn = grpc_core::Zalloc<proxy_connection>();
gpr_ref(&proxy->users);
conn->client_endpoint = endpoint;
conn->proxy = proxy;
gpr_ref_init(&conn->refcount, 1);
@ -623,22 +647,24 @@ static void thread_main(void* arg) {
grpc_end2end_http_proxy* proxy = static_cast<grpc_end2end_http_proxy*>(arg);
grpc_core::ExecCtx exec_ctx;
do {
gpr_ref(&proxy->users);
proxy_ref(proxy);
grpc_pollset_worker* worker = nullptr;
gpr_mu_lock(proxy->mu);
GRPC_LOG_IF_ERROR("grpc_pollset_work",
grpc_pollset_work(proxy->pollset[0], &worker,
grpc_core::Timestamp::Now() +
grpc_core::Duration::Seconds(1)));
GRPC_LOG_IF_ERROR(
"grpc_pollset_work",
grpc_pollset_work(proxy->pollset[0], &worker,
grpc_core::Timestamp::Now() +
grpc_core::Duration::Milliseconds(100)));
gpr_mu_unlock(proxy->mu);
grpc_core::ExecCtx::Get()->Flush();
} while (!gpr_unref(&proxy->users));
} while (proxy_unref(proxy) > 1 || !proxy->is_shutdown.load());
}
grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
const grpc_channel_args* args) {
grpc_core::ExecCtx exec_ctx;
grpc_end2end_http_proxy* proxy = new grpc_end2end_http_proxy();
proxy_destroyed.store(false);
// Construct proxy address.
const int proxy_port = grpc_pick_unused_port_or_die();
proxy->proxy_name = grpc_core::JoinHostPort("localhost", proxy_port);
@ -684,7 +710,7 @@ static void destroy_pollset(void* arg, grpc_error_handle /*error*/) {
}
void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) {
gpr_unref(&proxy->users); // Signal proxy thread to shutdown.
proxy->is_shutdown.store(true);
grpc_core::ExecCtx exec_ctx;
proxy->thd.Join();
grpc_tcp_server_shutdown_listeners(proxy->server);
@ -693,8 +719,7 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) {
grpc_pollset_shutdown(proxy->pollset[0],
GRPC_CLOSURE_CREATE(destroy_pollset, proxy->pollset[0],
grpc_schedule_on_exec_ctx));
GRPC_COMBINER_UNREF(proxy->combiner, "test");
delete proxy;
proxy_unref(proxy);
}
const char* grpc_end2end_http_proxy_get_proxy_name(

Loading…
Cancel
Save