From 477d061238798ffccc7842cbc312ef80a52663d3 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 29 Jul 2016 13:03:26 -0700 Subject: [PATCH] Clean up test. Still debugging things. --- test/core/end2end/fixtures/h2_http_proxy.c | 10 +- test/core/end2end/fixtures/http_proxy.c | 103 +++++++++------------ test/core/end2end/fixtures/http_proxy.h | 2 - 3 files changed, 46 insertions(+), 69 deletions(-) diff --git a/test/core/end2end/fixtures/h2_http_proxy.c b/test/core/end2end/fixtures/h2_http_proxy.c index 002e2cb16b2..d84f0b8cb92 100644 --- a/test/core/end2end/fixtures/h2_http_proxy.c +++ b/test/core/end2end/fixtures/h2_http_proxy.c @@ -52,7 +52,6 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" -#if 0 typedef struct fullstack_fixture_data { char *server_addr; grpc_end2end_http_proxy *proxy; @@ -111,24 +110,17 @@ static grpc_end2end_test_config configs[] = { chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; -#endif int main(int argc, char **argv) { -// size_t i; + size_t i; grpc_test_init(argc, argv); grpc_end2end_tests_pre_init(); grpc_init(); - grpc_end2end_http_proxy* proxy = grpc_end2end_http_proxy_create(); - grpc_end2end_http_proxy_start_thread(proxy); - grpc_end2end_http_proxy_destroy(proxy); - -#if 0 for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { grpc_end2end_tests(argc, argv, configs[i]); } -#endif grpc_shutdown(); diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index 97aced3d1e2..daf023958c9 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -99,9 +99,9 @@ typedef struct connection_data { gpr_refcount refcount; } connection_data; +// Helper function to destroy the proxy connection. static void connection_data_destroy(grpc_exec_ctx* exec_ctx, connection_data* cd) { -gpr_log(GPR_ERROR, "==> %s()", __func__); cd->proxy->shutdown = true; grpc_endpoint_destroy(exec_ctx, cd->client_endpoint); if (cd->server_endpoint != NULL) @@ -116,54 +116,59 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); gpr_free(cd); } +// Helper function to shut down the proxy connection. +// Does NOT take ownership of a reference to error. static void connection_data_failed(grpc_exec_ctx* exec_ctx, connection_data* cd, const char* prefix, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); const char* msg = grpc_error_string(error); gpr_log(GPR_ERROR, "%s: %s", prefix, msg); grpc_error_free_string(msg); grpc_endpoint_shutdown(exec_ctx, cd->client_endpoint); if (cd->server_endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, cd->server_endpoint); - if (gpr_unref(&cd->refcount)) { + if (gpr_unref(&cd->refcount)) connection_data_destroy(exec_ctx, cd); - } } +// Callback for writing proxy data to the client. static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy client write", error); return; } - // Clear write buffer. + // Clear write buffer and release our reference. gpr_slice_buffer_reset_and_unref(&cd->client_write_buffer); + gpr_unref(&cd->refcount); } +// Callback for writing proxy data to the backend server. static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server write", error); return; } - // Clear write buffer. + // Clear write buffer and release our reference. gpr_slice_buffer_reset_and_unref(&cd->server_write_buffer); + gpr_unref(&cd->refcount); } +// Callback for reading data from the client, which will be proxied to +// the backend server. static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy client read", error); return; } // Move read data into write buffer and write it. + // Take a new ref for the write callback. + gpr_ref(&cd->refcount); gpr_slice_buffer_move_into(&cd->client_read_buffer, &cd->server_write_buffer); grpc_endpoint_write(exec_ctx, cd->server_endpoint, &cd->server_write_buffer, &cd->on_server_write_done); @@ -172,15 +177,18 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); &cd->on_client_read_done); } +// Callback for reading data from the backend server, which will be +// proxied to the client. static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server read", error); return; } // Move read data into write buffer and write it. + // Take a new ref for the write callback. + gpr_ref(&cd->refcount); gpr_slice_buffer_move_into(&cd->server_read_buffer, &cd->client_write_buffer); grpc_endpoint_write(exec_ctx, cd->client_endpoint, &cd->client_write_buffer, &cd->on_client_write_done); @@ -189,9 +197,9 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); &cd->on_server_read_done); } +// Callback to write the HTTP response for the CONNECT request. static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy write response", error); @@ -209,9 +217,10 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); &cd->on_server_read_done); } +// Callback to connect to the backend server specified by the HTTP +// CONNECT request. static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy server connect", error); @@ -225,9 +234,9 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); &cd->on_write_response_done); } +// Callback to read the HTTP CONNECT request. static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { -gpr_log(GPR_ERROR, "==> %s()", __func__); connection_data* cd = arg; if (error != GRPC_ERROR_NONE) { connection_data_failed(exec_ctx, cd, "HTTP proxy read request", error); @@ -284,8 +293,6 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* ep, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { -// FIXME: remove -gpr_log(GPR_ERROR, "==> %s()", __func__); grpc_end2end_http_proxy* proxy = arg; // Instantiate connection_data. connection_data* cd = gpr_malloc(sizeof(*cd)); @@ -316,6 +323,24 @@ gpr_log(GPR_ERROR, "==> %s()", __func__); // Proxy class // +static void thread_main(void* arg) { + grpc_end2end_http_proxy *proxy = arg; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + do { + const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + const gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_seconds(5, GPR_TIMESPAN)); + grpc_pollset_worker *worker = NULL; + gpr_mu_lock(proxy->mu); + GRPC_LOG_IF_ERROR("grpc_pollset_work", + grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, + now, deadline)); + gpr_mu_unlock(proxy->mu); + grpc_exec_ctx_flush(&exec_ctx); + } while (!proxy->shutdown); + grpc_exec_ctx_finish(&exec_ctx); +} + grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { grpc_end2end_http_proxy* proxy = gpr_malloc(sizeof(*proxy)); memset(proxy, 0, sizeof(*proxy)); @@ -323,8 +348,6 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create() { const int proxy_port = grpc_pick_unused_port_or_die(); gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port); gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name); -// FIXME: remove -gpr_log(GPR_ERROR, "Proxy address: %s", proxy->proxy_name); // Create TCP server. proxy->channel_args = grpc_channel_args_copy(NULL); grpc_error* error = grpc_tcp_server_create( @@ -347,32 +370,25 @@ gpr_log(GPR_ERROR, "Proxy address: %s", proxy->proxy_name); grpc_tcp_server_start(&exec_ctx, proxy->server, &proxy->pollset, 1, on_accept, proxy); grpc_exec_ctx_finish(&exec_ctx); -#if 0 // Start proxy thread. gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options_set_joinable(&opt); GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); -#endif return proxy; } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_pollset_destroy(p); - gpr_free(p); + grpc_pollset* pollset = arg; + grpc_pollset_destroy(pollset); + gpr_free(pollset); } -// FIXME: remove (including all references below) -//#define USE_THREAD 1 - void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { -gpr_log(GPR_ERROR, "==> %s()", __func__); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_thd_join(proxy->thd); grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); grpc_tcp_server_unref(&exec_ctx, proxy->server); -#ifdef USE_THREAD - gpr_thd_join(proxy->thd); -#endif gpr_free(proxy->proxy_name); grpc_channel_args_destroy(proxy->channel_args); grpc_closure destroyed; @@ -386,32 +402,3 @@ const char *grpc_end2end_http_proxy_get_proxy_name( grpc_end2end_http_proxy *proxy) { return proxy->proxy_name; } - -static void thread_main(void* arg) { -gpr_log(GPR_ERROR, "==> %s()", __func__); - grpc_end2end_http_proxy *proxy = arg; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - do { - const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - const gpr_timespec deadline = - gpr_time_add(now, gpr_time_from_seconds(5, GPR_TIMESPAN)); - grpc_pollset_worker *worker = NULL; - gpr_mu_lock(proxy->mu); - GRPC_LOG_IF_ERROR("grpc_pollset_work", - grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, - now, deadline)); - gpr_mu_unlock(proxy->mu); - grpc_exec_ctx_flush(&exec_ctx); - } while (!proxy->shutdown); - grpc_exec_ctx_finish(&exec_ctx); -} - -void grpc_end2end_http_proxy_start_thread(grpc_end2end_http_proxy *proxy) { -#ifdef USE_THREAD - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt)); -#else - thread_main(proxy); -#endif -} diff --git a/test/core/end2end/fixtures/http_proxy.h b/test/core/end2end/fixtures/http_proxy.h index 444324e7e03..7af2ea92d07 100644 --- a/test/core/end2end/fixtures/http_proxy.h +++ b/test/core/end2end/fixtures/http_proxy.h @@ -39,5 +39,3 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy); const char *grpc_end2end_http_proxy_get_proxy_name( grpc_end2end_http_proxy *proxy); - -void grpc_end2end_http_proxy_start_thread(grpc_end2end_http_proxy *proxy);