From c9c78ee96c5a73234513decd398a63d48f14aa89 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 24 Jul 2017 10:18:07 -0700 Subject: [PATCH] Add hysteresis to avoid mallocs --- src/core/lib/iomgr/tcp_posix.c | 38 +++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index c6ec4723922..9d27de0b1c6 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -135,23 +135,34 @@ static void run_poller(grpc_exec_ctx *exec_ctx, void *bp, gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); } gpr_mu_lock(p->pollset_mu); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN)); GRPC_LOG_IF_ERROR("backup_poller:pollset_work", grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC))); + now, deadline)); gpr_mu_unlock(p->pollset_mu); - if (gpr_atm_no_barrier_load(&g_backup_poller) == (gpr_atm)p) { + /* last "uncovered" notification is the ref that keeps us polling, if we get + * there try a cas to release it */ + if (gpr_atm_no_barrier_load(&g_uncovered_notifications_pending) == 1 && + gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) { + gpr_mu_lock(p->pollset_mu); + bool cas_ok = gpr_atm_no_barrier_cas(&g_backup_poller, (gpr_atm)p, 0); if (GRPC_TRACER_ON(grpc_tcp_trace)) { - gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p); + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok); } - GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE); - } else { + gpr_mu_unlock(p->pollset_mu); if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p); } grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p), GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p, grpc_schedule_on_exec_ctx)); + } else { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p); + } + GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE); } } @@ -163,16 +174,7 @@ static void drop_uncovered(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p uncover cnt %d->%d", p, (int)old_count, (int)old_count - 1); } - if (old_count == 1) { - gpr_mu_lock(p->pollset_mu); - bool cas_ok = gpr_atm_no_barrier_cas(&g_backup_poller, (gpr_atm)p, 0); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { - gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok); - } - GRPC_LOG_IF_ERROR("backup_poller:pollset_kick", - grpc_pollset_kick(BACKUP_POLLER_POLLSET(p), NULL)); - gpr_mu_unlock(p->pollset_mu); - } + GPR_ASSERT(old_count != 1); } static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { @@ -203,7 +205,9 @@ static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp); } grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd); - drop_uncovered(exec_ctx, tcp); + if (old_count != 0) { + drop_uncovered(exec_ctx, tcp); + } } static void notify_on_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {