diff --git a/BUILD b/BUILD index 4de00bb125f..1c887345b0c 100644 --- a/BUILD +++ b/BUILD @@ -512,6 +512,7 @@ grpc_cc_library( "src/core/lib/iomgr/tcp_windows.c", "src/core/lib/iomgr/time_averaged_stats.c", "src/core/lib/iomgr/timer_generic.c", + "src/core/lib/iomgr/timer_manager.c", "src/core/lib/iomgr/timer_heap.c", "src/core/lib/iomgr/timer_uv.c", "src/core/lib/iomgr/udp_server.c", @@ -632,6 +633,7 @@ grpc_cc_library( "src/core/lib/iomgr/time_averaged_stats.h", "src/core/lib/iomgr/timer.h", "src/core/lib/iomgr/timer_generic.h", + "src/core/lib/iomgr/timer_manager.h", "src/core/lib/iomgr/timer_heap.h", "src/core/lib/iomgr/timer_uv.h", "src/core/lib/iomgr/udp_server.h", diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index ff857878e46..9e495f4d422 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -46,8 +46,6 @@ #define EXPECTED_CONTENT_TYPE "application/grpc" #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 -extern int grpc_http_trace; - typedef struct call_data { grpc_linked_mdelem status; grpc_linked_mdelem content_type; diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 8fdd4ee77c1..126e012aac9 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -69,7 +69,7 @@ static grpc_slice_refcount terminal_slice_refcount = {NULL, NULL}; static const grpc_slice terminal_slice = {&terminal_slice_refcount, .data.refcounted = {0, 0}}; -extern int grpc_http_trace; +extern grpc_tracer_flag grpc_http_trace; typedef struct { int is_first_frame; @@ -425,7 +425,7 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, "Reserved header (colon-prefixed) happening after regular ones."); } - if (grpc_http_trace && !GRPC_MDELEM_IS_INTERNED(elem)) { + if (GRPC_TRACER_ON(grpc_http_trace) && !GRPC_MDELEM_IS_INTERNED(elem)) { char *k = grpc_slice_to_c_string(GRPC_MDKEY(elem)); char *v = grpc_slice_to_c_string(GRPC_MDVALUE(elem)); gpr_log( @@ -616,7 +616,7 @@ void grpc_chttp2_hpack_compressor_set_max_table_size( } } c->advertise_table_size_change = 1; - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "set max table size from encoder to %d", max_table_size); } } diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c index 9dd41fdbe16..7aaff553396 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_table.c +++ b/src/core/ext/transport/chttp2/transport/hpack_table.c @@ -40,9 +40,10 @@ #include #include +#include "src/core/lib/debug/trace.h" #include "src/core/lib/support/murmur_hash.h" -extern int grpc_http_trace; +extern grpc_tracer_flag grpc_http_trace; static struct { const char *key; @@ -260,7 +261,7 @@ void grpc_chttp2_hptbl_set_max_bytes(grpc_exec_ctx *exec_ctx, if (tbl->max_bytes == max_bytes) { return; } - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes); } while (tbl->mem_used > max_bytes) { @@ -284,7 +285,7 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_exec_ctx *exec_ctx, gpr_free(msg); return err; } - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes); } while (tbl->mem_used > bytes) { diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 682c24ed56b..f0856a76d4b 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -46,7 +46,7 @@ #include "src/core/lib/iomgr/tcp_uv.h" #include "src/core/lib/iomgr/timer.h" -extern int grpc_tcp_trace; +extern grpc_tracer_flag grpc_tcp_trace; typedef struct grpc_uv_tcp_connect { uv_connect_t connect_req; @@ -72,7 +72,7 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { int done; grpc_uv_tcp_connect *connect = acp; - if (grpc_tcp_trace) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { const char *str = grpc_error_string(error); gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", connect->addr_name, str); @@ -156,7 +156,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, uv_tcp_init(uv_default_loop(), connect->tcp_handle); connect->connect_req.data = connect; - if (grpc_tcp_trace) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", connect->addr_name); } diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h index 970fcafe4a5..106bec5eca7 100644 --- a/src/core/lib/iomgr/tcp_uv.h +++ b/src/core/lib/iomgr/tcp_uv.h @@ -44,11 +44,12 @@ otherwise specified. */ +#include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/endpoint.h" #include -extern int grpc_tcp_trace; +extern grpc_tracer_flag grpc_tcp_trace; #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 1d83341299e..464907b4546 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -44,16 +44,26 @@ typedef struct completed_thread { struct completed_thread *next; } completed_thread; +// global mutex static gpr_mu g_mu; +// are we multi-threaded static bool g_threaded; +// cv to wait until a thread is needed static gpr_cv g_cv_wait; +// cv for notification when threading ends static gpr_cv g_cv_shutdown; +// number of threads in the system static int g_thread_count; +// number of threads sitting around waiting static int g_waiter_count; +// linked list of threads that have completed (and need joining) static completed_thread *g_completed_threads; +// was the manager kicked by the timer system static bool g_kicked; - -#define MAX_WAITERS 3 +// is there a thread waiting until the next timer should fire? +static bool g_has_timed_waiter; +// generation counter to track which thread is waiting for the next timer +static uint64_t g_timed_waiter_generation; static void timer_thread(void *unused); @@ -92,39 +102,79 @@ void grpc_timer_manager_tick() { } static void timer_thread(void *unused) { + // this threads exec_ctx: we try to run things through to completion here + // since it's easy to spin up new threads grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); + const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { - gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC); + gpr_timespec next = inf_future; gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + // check timer state, updates next to the next time to run a check if (grpc_timer_check(&exec_ctx, now, &next)) { + // if there's something to execute... gpr_mu_lock(&g_mu); + // remove a waiter from the pool, and start another thread if necessary --g_waiter_count; - bool start_thread = g_waiter_count == 0; - if (start_thread && g_threaded) { + if (g_waiter_count == 0 && g_threaded) { start_timer_thread_and_unlock(); } else { + // if there's no thread waiting with a timeout, kick an existing waiter + // so that the next deadline is not missed + if (!g_has_timed_waiter) { + gpr_log(GPR_DEBUG, "kick untimed waiter"); + gpr_cv_signal(&g_cv_wait); + } gpr_mu_unlock(&g_mu); } + // without our lock, flush the exec_ctx grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(&g_mu); + // garbage collect any threads hanging out that are dead gc_completed_threads(); + // get ready to wait again ++g_waiter_count; gpr_mu_unlock(&g_mu); } else { gpr_mu_lock(&g_mu); + // if we're not threaded anymore, leave if (!g_threaded) break; - if (gpr_cv_wait(&g_cv_wait, &g_mu, next)) { - if (g_kicked) { - grpc_timer_consume_kick(); - g_kicked = false; - } else if (g_waiter_count > MAX_WAITERS) { - break; - } + // if there's no timed waiter, we should become one: that waiter waits + // only until the next timer should expire + // all other timers wait forever + uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; + if (!g_has_timed_waiter) { + g_has_timed_waiter = true; + // we use a generation counter to track the timed waiter so we can + // cancel an existing one quickly (and when it actually times out it'll + // figure stuff out instead of incurring a wakeup) + my_timed_waiter_generation = ++g_timed_waiter_generation; + gpr_log(GPR_DEBUG, "sleep for a while"); + } else { + next = inf_future; + gpr_log(GPR_DEBUG, "sleep until kicked"); + } + gpr_cv_wait(&g_cv_wait, &g_mu, next); + gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", + my_timed_waiter_generation == g_timed_waiter_generation, + g_kicked); + // if this was the timed waiter, then we need to check timers, and flag + // that there's now no timed waiter... we'll look for a replacement if + // there's work to do after checking timers (code above) + if (my_timed_waiter_generation == g_timed_waiter_generation) { + g_has_timed_waiter = false; + } + // if this was a kick from the timer system, consume it (and don't stop + // this thread yet) + if (g_kicked) { + grpc_timer_consume_kick(); + g_kicked = false; } - gpr_mu_unlock(&g_mu); } + gpr_mu_unlock(&g_mu); } + // terminate the thread: drop the waiter count, thread count, and let whomever + // stopped the threading stuff know that we're done --g_waiter_count; --g_thread_count; if (0 == g_thread_count) { @@ -135,6 +185,7 @@ static void timer_thread(void *unused) { ct->next = g_completed_threads; g_completed_threads = ct; gpr_mu_unlock(&g_mu); + grpc_exec_ctx_finish(&exec_ctx); gpr_log(GPR_DEBUG, "End timer thread"); } @@ -193,6 +244,8 @@ void grpc_timer_manager_set_threading(bool threaded) { void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; + g_has_timed_waiter = false; + ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); } diff --git a/test/core/iomgr/timer_list_test.c b/test/core/iomgr/timer_list_test.c index 46e41dd4490..88a9f6b8556 100644 --- a/test/core/iomgr/timer_list_test.c +++ b/test/core/iomgr/timer_list_test.c @@ -41,12 +41,13 @@ #include #include +#include "src/core/lib/debug/trace.h" #include "test/core/util/test_config.h" #define MAX_CB 30 -extern int grpc_timer_trace; -extern int grpc_timer_check_trace; +extern grpc_tracer_flag grpc_timer_trace; +extern grpc_tracer_flag grpc_timer_check_trace; static int cb_called[MAX_CB][2]; @@ -63,8 +64,8 @@ static void add_test(void) { gpr_log(GPR_INFO, "add_test"); grpc_timer_list_init(start); - grpc_timer_trace = 1; - grpc_timer_check_trace = 1; + grpc_timer_trace.value = 1; + grpc_timer_check_trace.value = 1; memset(cb_called, 0, sizeof(cb_called)); /* 10 ms timers. will expire in the current epoch */ @@ -138,8 +139,8 @@ void destruction_test(void) { gpr_log(GPR_INFO, "destruction_test"); grpc_timer_list_init(gpr_time_0(GPR_CLOCK_REALTIME)); - grpc_timer_trace = 1; - grpc_timer_check_trace = 1; + grpc_timer_trace.value = 1; + grpc_timer_check_trace.value = 1; memset(cb_called, 0, sizeof(cb_called)); grpc_timer_init(