From 04f812b4e6f50f4ca36a87594332e4e02a5d3cf9 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Wed, 13 Apr 2016 20:27:51 +0200 Subject: [PATCH] Adding actual servers to reply to messages. --- .../surface/concurrent_connectivity_test.c | 149 +++++++++++++++++- 1 file changed, 142 insertions(+), 7 deletions(-) diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c index 96761b05023..1753623fcdf 100644 --- a/test/core/surface/concurrent_connectivity_test.c +++ b/test/core/surface/concurrent_connectivity_test.c @@ -31,12 +31,21 @@ * */ +#include #include #include #include #include +#include #include + +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_server.h" + +#include "test/core/util/port.h" #include "test/core/util/test_config.h" #define NUM_THREADS 100 @@ -45,10 +54,13 @@ #define DELAY_MILLIS 10 #define POLL_MILLIS 3000 -void create_loop_destroy(void* unused) { +static void *tag(int n) { return (void *)(uintptr_t)n; } +static int detag(void *p) { return (int)(uintptr_t)p; } + +void create_loop_destroy(void *addr) { for (int i = 0; i < NUM_OUTER_LOOPS; ++i) { - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); - grpc_channel* chan = grpc_insecure_channel_create("localhost", NULL, NULL); + grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_channel *chan = grpc_insecure_channel_create((char*)addr, NULL, NULL); for (int j = 0; j < NUM_INNER_LOOPS; ++j) { gpr_timespec later_time = GRPC_TIMEOUT_MILLIS_TO_DEADLINE(DELAY_MILLIS); @@ -64,18 +76,141 @@ void create_loop_destroy(void* unused) { } } -int main(int argc, char** argv) { +struct server_thread_args { + char *addr; + grpc_server *server; + grpc_completion_queue *cq; + grpc_pollset *pollset; + gpr_mu *mu; + gpr_event ready; + gpr_atm stop; +}; + +void server_thread(void *vargs) { + struct server_thread_args *args = (struct server_thread_args*)vargs; + grpc_event ev; + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + ev = grpc_completion_queue_next(args->cq, deadline, NULL); + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + GPR_ASSERT(detag(ev.tag) == 0xd1e); +} + +static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp, + grpc_tcp_server_acceptor *acceptor) { + struct server_thread_args *args = (struct server_thread_args*)vargs; + (void)acceptor; + grpc_endpoint_shutdown(exec_ctx, tcp); + grpc_endpoint_destroy(exec_ctx, tcp); + grpc_pollset_kick(args->pollset, NULL); +} + +void bad_server_thread(void *vargs) { + struct server_thread_args *args = (struct server_thread_args*)vargs; + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + struct sockaddr_storage addr; + socklen_t addr_len = sizeof(addr); + int port; + grpc_tcp_server *s = grpc_tcp_server_create(NULL); + memset(&addr, 0, sizeof(addr)); + addr.ss_family = AF_INET; + port = grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len); + GPR_ASSERT(port > 0); + gpr_asprintf(&args->addr, "localhost:%d", port); + + grpc_tcp_server_start(&exec_ctx, s, &args->pollset, 1, on_connect, args); + gpr_event_set(&args->ready, (void *)1); + + gpr_mu_lock(args->mu); + while (gpr_atm_acq_load(&args->stop) == 0) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = gpr_time_add(now, gpr_time_from_millis(100, GPR_TIMESPAN)); + + grpc_pollset_worker *worker = NULL; + grpc_pollset_work(&exec_ctx, args->pollset, &worker, now, deadline); + gpr_mu_unlock(args->mu); + grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(args->mu); + } + gpr_mu_unlock(args->mu); + + grpc_tcp_server_unref(&exec_ctx, s); + + grpc_exec_ctx_finish(&exec_ctx); + + gpr_free(args->addr); +} + +int main(int argc, char **argv) { + struct server_thread_args args; + memset(&args, 0, sizeof(args)); + grpc_test_init(argc, argv); grpc_init(); + gpr_thd_id threads[NUM_THREADS]; + gpr_thd_id server; + + char *localhost = gpr_strdup("localhost:54321"); + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + + + /* First round, no server */ + gpr_log(GPR_DEBUG, "Wave 1"); for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - gpr_thd_new(&threads[i], create_loop_destroy, NULL, &options); + gpr_thd_new(&threads[i], create_loop_destroy, localhost, &options); } for (size_t i = 0; i < NUM_THREADS; ++i) { gpr_thd_join(threads[i]); } + gpr_free(localhost); + + + /* Second round, actual grpc server */ + gpr_log(GPR_DEBUG, "Wave 2"); + int port = grpc_pick_unused_port_or_die(); + gpr_asprintf(&args.addr, "localhost:%d", port); + args.server = grpc_server_create(NULL, NULL); + grpc_server_add_insecure_http2_port(args.server, args.addr); + args.cq = grpc_completion_queue_create(NULL); + grpc_server_register_completion_queue(args.server, args.cq, NULL); + grpc_server_start(args.server); + gpr_thd_new(&server, server_thread, &args, &options); + + for (size_t i = 0; i < NUM_THREADS; ++i) { + gpr_thd_new(&threads[i], create_loop_destroy, args.addr, &options); + } + for (size_t i = 0; i < NUM_THREADS; ++i) { + gpr_thd_join(threads[i]); + } + grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e)); + + gpr_thd_join(server); + grpc_server_destroy(args.server); + grpc_completion_queue_destroy(args.cq); + gpr_free(args.addr); + + + /* Third round, bogus tcp server */ + gpr_log(GPR_DEBUG, "Wave 3"); + args.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(args.pollset, &args.mu); + gpr_thd_new(&server, bad_server_thread, &args, &options); + gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + + for (size_t i = 0; i < NUM_THREADS; ++i) { + gpr_thd_new(&threads[i], create_loop_destroy, args.addr, &options); + } + for (size_t i = 0; i < NUM_THREADS; ++i) { + gpr_thd_join(threads[i]); + } + + gpr_atm_rel_store(&args.stop, 1); + gpr_thd_join(server); + grpc_pollset_destroy(args.pollset); + gpr_free(args.pollset); + grpc_shutdown(); return 0; }