Merge pull request #13631 from sreecha/cv-wait-monotonic

Use monotonic clock type for gpr_cv_wait
pull/13745/head
Sree Kuchibhotla 7 years ago committed by GitHub
commit 66d10191b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/lib/iomgr/ev_epoll1_linux.cc
  2. 8
      src/core/lib/iomgr/ev_poll_posix.cc
  3. 2
      src/core/lib/iomgr/executor.cc
  4. 2
      src/core/lib/iomgr/iomgr.cc
  5. 4
      src/core/lib/iomgr/timer_manager.cc
  6. 11
      src/core/lib/support/sync_posix.cc
  7. 2
      src/core/lib/surface/completion_queue.cc
  8. 2
      src/core/lib/surface/server.cc
  9. 2
      test/core/support/cpu_test.cc
  10. 12
      test/core/support/sync_test.cc
  11. 4
      test/cpp/util/cli_call.cc

@ -738,7 +738,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
}
if (gpr_cv_wait(&worker->cv, &pollset->mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)) &&
grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)) &&
worker->state == UNKICKED) {
/* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
received a kick */

@ -1471,7 +1471,7 @@ static void run_poll(void* args) {
decref_poll_result(result);
// Leave this polling thread alive for a grace period to do another poll()
// op
gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
gpr_timespec deadline = gpr_now(GPR_CLOCK_MONOTONIC);
deadline = gpr_time_add(deadline, thread_grace);
pargs->trigger_set = 0;
gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline);
@ -1526,9 +1526,9 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
}
}
gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
gpr_timespec deadline = gpr_now(GPR_CLOCK_MONOTONIC);
if (timeout < 0) {
deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
} else {
deadline =
gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
@ -1631,7 +1631,7 @@ static void global_cv_fd_table_shutdown() {
// Not doing so will result in reported memory leaks
if (!gpr_unref(&g_cvfds.pollcount)) {
int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(3, GPR_TIMESPAN)));
GPR_ASSERT(res == 0);
}

@ -155,7 +155,7 @@ static void executor_thread(void* arg) {
ts->depth -= subtract_depth;
while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
ts->queued_long_job = false;
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
if (ts->shutdown) {
if (executor_trace.enabled()) {

@ -118,7 +118,7 @@ void grpc_iomgr_shutdown() {
abort();
}
gpr_timespec short_deadline =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(100, GPR_TIMESPAN));
if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) {
if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) >

@ -192,7 +192,7 @@ static bool wait_until(grpc_millis next) {
}
gpr_cv_wait(&g_cv_wait, &g_mu,
grpc_millis_to_timespec(next, GPR_CLOCK_REALTIME));
grpc_millis_to_timespec(next, GPR_CLOCK_MONOTONIC));
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
@ -317,7 +317,7 @@ static void stop_threads(void) {
gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count);
}
while (g_thread_count > 0) {
gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count);
}

@ -66,7 +66,12 @@ int gpr_mu_trylock(gpr_mu* mu) {
/*----------------------------------------*/
void gpr_cv_init(gpr_cv* cv) {
GPR_ASSERT(pthread_cond_init(cv, nullptr) == 0);
pthread_condattr_t attr;
GPR_ASSERT(pthread_condattr_init(&attr) == 0);
#if GPR_LINUX
GPR_ASSERT(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0);
#endif // GPR_LINUX
GPR_ASSERT(pthread_cond_init(cv, &attr) == 0);
}
void gpr_cv_destroy(gpr_cv* cv) { GPR_ASSERT(pthread_cond_destroy(cv) == 0); }
@ -78,7 +83,11 @@ int gpr_cv_wait(gpr_cv* cv, gpr_mu* mu, gpr_timespec abs_deadline) {
err = pthread_cond_wait(cv, mu);
} else {
struct timespec abs_deadline_ts;
#if GPR_LINUX
abs_deadline = gpr_convert_clock_type(abs_deadline, GPR_CLOCK_MONOTONIC);
#else
abs_deadline = gpr_convert_clock_type(abs_deadline, GPR_CLOCK_REALTIME);
#endif // GPR_LINUX
abs_deadline_ts.tv_sec = (time_t)abs_deadline.tv_sec;
abs_deadline_ts.tv_nsec = abs_deadline.tv_nsec;
err = pthread_cond_timedwait(cv, mu, &abs_deadline_ts);

@ -115,7 +115,7 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
}
w.kicked = false;
gpr_timespec deadline_ts =
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME);
grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC);
while (!npp->shutdown && !w.kicked &&
!gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
;

@ -1170,7 +1170,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
gpr_mu_lock(&server->mu_global);
while (server->starting) {
gpr_cv_wait(&server->starting_cv, &server->mu_global,
gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
/* stay locked, and gather up some stuff to do */

@ -115,7 +115,7 @@ static void cpu_test(void) {
}
gpr_mu_lock(&ct.mu);
while (!ct.is_done) {
gpr_cv_wait(&ct.done_cv, &ct.mu, gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_cv_wait(&ct.done_cv, &ct.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&ct.mu);
fprintf(stderr, "Saw cores [");

@ -73,7 +73,7 @@ void queue_append(queue* q, int x) {
corresponding condition variable. The predicate must be on state
protected by the lock. */
while (q->length == N) {
gpr_cv_wait(&q->non_full, &q->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_cv_wait(&q->non_full, &q->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
if (q->length == 0) { /* Wake threads blocked in queue_remove(). */
/* It's normal to use gpr_cv_broadcast() or gpr_signal() while
@ -197,7 +197,7 @@ static void test_create_threads(struct test* m, void (*body)(void* arg)) {
static void test_wait(struct test* m) {
gpr_mu_lock(&m->mu);
while (m->done != 0) {
gpr_cv_wait(&m->done_cv, &m->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_cv_wait(&m->done_cv, &m->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&m->mu);
}
@ -297,7 +297,7 @@ static void inc_by_turns(void* v /*=m*/) {
for (i = 0; i != m->iterations; i++) {
gpr_mu_lock(&m->mu);
while ((m->counter % m->threads) != id) {
gpr_cv_wait(&m->cv, &m->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_cv_wait(&m->cv, &m->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
m->counter++;
gpr_cv_broadcast(&m->cv);
@ -314,7 +314,7 @@ static void inc_with_1ms_delay(void* v /*=m*/) {
for (i = 0; i != m->iterations; i++) {
gpr_timespec deadline;
gpr_mu_lock(&m->mu);
deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(1000, GPR_TIMESPAN));
while (!gpr_cv_wait(&m->cv, &m->mu, deadline)) {
}
@ -370,14 +370,14 @@ static void consumer(void* v /*=m*/) {
int64_t i;
int value;
for (i = 0; i != n; i++) {
queue_remove(&m->q, &value, gpr_inf_future(GPR_CLOCK_REALTIME));
queue_remove(&m->q, &value, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_lock(&m->mu);
m->counter = n;
gpr_mu_unlock(&m->mu);
GPR_ASSERT(
!queue_remove(&m->q, &value,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(1000000, GPR_TIMESPAN))));
mark_thread_done(m);
}

@ -126,7 +126,7 @@ void CliCall::WriteAndWait(const grpc::string& request) {
call_->Write(send_buffer, tag(2));
write_done_ = false;
while (!write_done_) {
gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&write_mu_);
}
@ -136,7 +136,7 @@ void CliCall::WritesDoneAndWait() {
call_->WritesDone(tag(4));
write_done_ = false;
while (!write_done_) {
gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&write_mu_);
}

Loading…
Cancel
Save