From 5dca5bf644afc3df624f4c97c406a66b149df6f4 Mon Sep 17 00:00:00 2001 From: Ian Coolidge Date: Sun, 16 Oct 2016 22:36:02 -0700 Subject: [PATCH 1/2] sync_posix: Add Linux-specific monotonic clock preference When gRPC is running during wall clock acquisition, it's useful to avoid wall clock references as much as possible. --- src/core/lib/support/sync_posix.cc | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/core/lib/support/sync_posix.cc b/src/core/lib/support/sync_posix.cc index dfdd233bf43..c3f6b104631 100644 --- a/src/core/lib/support/sync_posix.cc +++ b/src/core/lib/support/sync_posix.cc @@ -66,7 +66,12 @@ int gpr_mu_trylock(gpr_mu* mu) { /*----------------------------------------*/ void gpr_cv_init(gpr_cv* cv) { - GPR_ASSERT(pthread_cond_init(cv, nullptr) == 0); + pthread_condattr_t attr; + GPR_ASSERT(pthread_condattr_init(&attr) == 0); +#if GPR_LINUX + GPR_ASSERT(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0); +#endif // GPR_LINUX + GPR_ASSERT(pthread_cond_init(cv, &attr) == 0); } void gpr_cv_destroy(gpr_cv* cv) { GPR_ASSERT(pthread_cond_destroy(cv) == 0); } @@ -78,7 +83,11 @@ int gpr_cv_wait(gpr_cv* cv, gpr_mu* mu, gpr_timespec abs_deadline) { err = pthread_cond_wait(cv, mu); } else { struct timespec abs_deadline_ts; +#if GPR_LINUX + abs_deadline = gpr_convert_clock_type(abs_deadline, GPR_CLOCK_MONOTONIC); +#else abs_deadline = gpr_convert_clock_type(abs_deadline, GPR_CLOCK_REALTIME); +#endif // GPR_LINUX abs_deadline_ts.tv_sec = (time_t)abs_deadline.tv_sec; abs_deadline_ts.tv_nsec = abs_deadline.tv_nsec; err = pthread_cond_timedwait(cv, mu, &abs_deadline_ts); From 54961bb9e16f84d193077277f7d2d8269f57a411 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Mon, 4 Dec 2017 12:50:27 -0800 Subject: [PATCH 2/2] Change the code to use MONOTONIC clocks when calling gpr_cv_wait (condition varialbes in linux support MONOTONIC clock type) --- src/core/lib/iomgr/ev_epoll1_linux.cc | 2 +- src/core/lib/iomgr/ev_poll_posix.cc | 8 ++++---- src/core/lib/iomgr/executor.cc | 2 +- src/core/lib/iomgr/iomgr.cc | 5 +++-- src/core/lib/iomgr/timer_manager.cc | 4 ++-- src/core/lib/surface/completion_queue.cc | 2 +- src/core/lib/surface/server.cc | 2 +- test/core/support/cpu_test.cc | 2 +- test/core/support/sync_test.cc | 12 ++++++------ test/cpp/util/cli_call.cc | 4 ++-- 10 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 0dda1d924c3..a52bedeb7a4 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -753,7 +753,7 @@ static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, } if (gpr_cv_wait(&worker->cv, &pollset->mu, - grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)) && + grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)) && worker->state == UNKICKED) { /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker received a kick */ diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 8659559f78a..43a63c5255e 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -1494,7 +1494,7 @@ static void run_poll(void* args) { decref_poll_result(result); // Leave this polling thread alive for a grace period to do another poll() // op - gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + gpr_timespec deadline = gpr_now(GPR_CLOCK_MONOTONIC); deadline = gpr_time_add(deadline, thread_grace); pargs->trigger_set = 0; gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline); @@ -1549,9 +1549,9 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { } } - gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + gpr_timespec deadline = gpr_now(GPR_CLOCK_MONOTONIC); if (timeout < 0) { - deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); } else { deadline = gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); @@ -1654,7 +1654,7 @@ static void global_cv_fd_table_shutdown() { // Not doing so will result in reported memory leaks if (!gpr_unref(&g_cvfds.pollcount)) { int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(3, GPR_TIMESPAN))); GPR_ASSERT(res == 0); } diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index d8a195f0108..cca59e7a528 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -158,7 +158,7 @@ static void executor_thread(void* arg) { ts->depth -= subtract_depth; while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { ts->queued_long_job = false; - gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } if (ts->shutdown) { if (executor_trace.enabled()) { diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index e077b35014f..bdedd850410 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -117,8 +117,9 @@ void grpc_iomgr_shutdown(grpc_exec_ctx* exec_ctx) { dump_objects("LEAKED"); abort(); } - gpr_timespec short_deadline = gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); + gpr_timespec short_deadline = + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(100, GPR_TIMESPAN)); if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) { if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) { if (g_root_object.next != &g_root_object) { diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index dac74aea24e..b68088e4bd5 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -193,7 +193,7 @@ static bool wait_until(grpc_exec_ctx* exec_ctx, grpc_millis next) { } gpr_cv_wait(&g_cv_wait, &g_mu, - grpc_millis_to_timespec(next, GPR_CLOCK_REALTIME)); + grpc_millis_to_timespec(next, GPR_CLOCK_MONOTONIC)); if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", @@ -319,7 +319,7 @@ static void stop_threads(void) { gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count); } while (g_thread_count > 0) { - gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count); } diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 98d7e359437..dfb5b5bf291 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -118,7 +118,7 @@ static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx, } w.kicked = false; gpr_timespec deadline_ts = - grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME); + grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC); while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) ; diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 0f8a057f315..e88e1ee1610 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -1213,7 +1213,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server, gpr_mu_lock(&server->mu_global); while (server->starting) { gpr_cv_wait(&server->starting_cv, &server->mu_global, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_MONOTONIC)); } /* stay locked, and gather up some stuff to do */ diff --git a/test/core/support/cpu_test.cc b/test/core/support/cpu_test.cc index 1783ec3c60a..770b074d47a 100644 --- a/test/core/support/cpu_test.cc +++ b/test/core/support/cpu_test.cc @@ -114,7 +114,7 @@ static void cpu_test(void) { } gpr_mu_lock(&ct.mu); while (!ct.is_done) { - gpr_cv_wait(&ct.done_cv, &ct.mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_cv_wait(&ct.done_cv, &ct.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(&ct.mu); fprintf(stderr, "Saw cores ["); diff --git a/test/core/support/sync_test.cc b/test/core/support/sync_test.cc index 86e78ce0b51..c8824882430 100644 --- a/test/core/support/sync_test.cc +++ b/test/core/support/sync_test.cc @@ -73,7 +73,7 @@ void queue_append(queue* q, int x) { corresponding condition variable. The predicate must be on state protected by the lock. */ while (q->length == N) { - gpr_cv_wait(&q->non_full, &q->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_cv_wait(&q->non_full, &q->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } if (q->length == 0) { /* Wake threads blocked in queue_remove(). */ /* It's normal to use gpr_cv_broadcast() or gpr_signal() while @@ -197,7 +197,7 @@ static void test_create_threads(struct test* m, void (*body)(void* arg)) { static void test_wait(struct test* m) { gpr_mu_lock(&m->mu); while (m->done != 0) { - gpr_cv_wait(&m->done_cv, &m->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_cv_wait(&m->done_cv, &m->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(&m->mu); } @@ -297,7 +297,7 @@ static void inc_by_turns(void* v /*=m*/) { for (i = 0; i != m->iterations; i++) { gpr_mu_lock(&m->mu); while ((m->counter % m->threads) != id) { - gpr_cv_wait(&m->cv, &m->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_cv_wait(&m->cv, &m->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } m->counter++; gpr_cv_broadcast(&m->cv); @@ -314,7 +314,7 @@ static void inc_with_1ms_delay(void* v /*=m*/) { for (i = 0; i != m->iterations; i++) { gpr_timespec deadline; gpr_mu_lock(&m->mu); - deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_micros(1000, GPR_TIMESPAN)); while (!gpr_cv_wait(&m->cv, &m->mu, deadline)) { } @@ -370,14 +370,14 @@ static void consumer(void* v /*=m*/) { int64_t i; int value; for (i = 0; i != n; i++) { - queue_remove(&m->q, &value, gpr_inf_future(GPR_CLOCK_REALTIME)); + queue_remove(&m->q, &value, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_lock(&m->mu); m->counter = n; gpr_mu_unlock(&m->mu); GPR_ASSERT( !queue_remove(&m->q, &value, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_micros(1000000, GPR_TIMESPAN)))); mark_thread_done(m); } diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc index c3220efa544..4f1a20c7278 100644 --- a/test/cpp/util/cli_call.cc +++ b/test/cpp/util/cli_call.cc @@ -126,7 +126,7 @@ void CliCall::WriteAndWait(const grpc::string& request) { call_->Write(send_buffer, tag(2)); write_done_ = false; while (!write_done_) { - gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(&write_mu_); } @@ -136,7 +136,7 @@ void CliCall::WritesDoneAndWait() { call_->WritesDone(tag(4)); write_done_ = false; while (!write_done_) { - gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(&write_mu_); }