Fix backup poller races (#26446)

* Use a mutex for backup poller operations

* Remove unnecessary assert
pull/26510/head
Yash Tibrewal 3 years ago committed by GitHub
parent 25d91e3091
commit a6b140cd85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/lib/iomgr/iomgr_posix.cc
  2. 79
      src/core/lib/iomgr/tcp_posix.cc
  3. 8
      src/core/lib/iomgr/tcp_posix.h

@ -41,11 +41,13 @@ extern grpc_address_resolver_vtable grpc_posix_resolver_vtable;
static void iomgr_platform_init(void) {
grpc_wakeup_fd_global_init();
grpc_event_engine_init();
grpc_tcp_posix_init();
}
static void iomgr_platform_flush(void) {}
static void iomgr_platform_shutdown(void) {
grpc_tcp_posix_shutdown();
grpc_event_engine_shutdown();
grpc_wakeup_fd_global_destroy();
}

@ -432,8 +432,10 @@ static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp);
#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))
static gpr_atm g_uncovered_notifications_pending;
static gpr_atm g_backup_poller; /* backup_poller* */
static grpc_core::Mutex* g_backup_poller_mu = nullptr;
static int g_uncovered_notifications_pending
ABSL_GUARDED_BY(g_backup_poller_mu);
static backup_poller* g_backup_poller ABSL_GUARDED_BY(g_backup_poller_mu);
static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error);
@ -461,17 +463,13 @@ static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {
"backup_poller:pollset_work",
grpc_pollset_work(BACKUP_POLLER_POLLSET(p), nullptr, deadline));
gpr_mu_unlock(p->pollset_mu);
/* last "uncovered" notification is the ref that keeps us polling, if we get
* there try a cas to release it */
if (gpr_atm_no_barrier_load(&g_uncovered_notifications_pending) == 1 &&
gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) {
gpr_mu_lock(p->pollset_mu);
bool cas_ok =
gpr_atm_full_cas(&g_backup_poller, reinterpret_cast<gpr_atm>(p), 0);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok);
}
gpr_mu_unlock(p->pollset_mu);
g_backup_poller_mu->Lock();
/* last "uncovered" notification is the ref that keeps us polling */
if (g_uncovered_notifications_pending == 1) {
GPR_ASSERT(g_backup_poller == p);
g_backup_poller = nullptr;
g_uncovered_notifications_pending = 0;
g_backup_poller_mu->Unlock();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "BACKUP_POLLER:%p shutdown", p);
}
@ -479,6 +477,7 @@ static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {
GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p,
grpc_schedule_on_exec_ctx));
} else {
g_backup_poller_mu->Unlock();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "BACKUP_POLLER:%p reschedule", p);
}
@ -489,15 +488,17 @@ static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {
}
static void drop_uncovered(grpc_tcp* /*tcp*/) {
backup_poller* p =
reinterpret_cast<backup_poller*>(gpr_atm_acq_load(&g_backup_poller));
gpr_atm old_count =
gpr_atm_full_fetch_add(&g_uncovered_notifications_pending, -1);
int old_count;
backup_poller* p;
g_backup_poller_mu->Lock();
p = g_backup_poller;
old_count = g_uncovered_notifications_pending--;
g_backup_poller_mu->Unlock();
GPR_ASSERT(old_count > 1);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "BACKUP_POLLER:%p uncover cnt %d->%d", p,
static_cast<int>(old_count), static_cast<int>(old_count) - 1);
gpr_log(GPR_INFO, "BACKUP_POLLER:%p uncover cnt %d->%d", p, old_count,
old_count - 1);
}
GPR_ASSERT(old_count != 1);
}
// gRPC API considers a Write operation to be done the moment it clears ‘flow
@ -509,38 +510,33 @@ static void drop_uncovered(grpc_tcp* /*tcp*/) {
// polling thread and progress is made) and hence add it to a backup poller here
static void cover_self(grpc_tcp* tcp) {
backup_poller* p;
gpr_atm old_count =
gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "BACKUP_POLLER: cover cnt %d->%d",
static_cast<int>(old_count), 2 + static_cast<int>(old_count));
}
if (old_count == 0) {
GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED();
g_backup_poller_mu->Lock();
int old_count = 0;
if (g_uncovered_notifications_pending == 0) {
g_uncovered_notifications_pending = 2;
p = static_cast<backup_poller*>(
gpr_zalloc(sizeof(*p) + grpc_pollset_size()));
g_backup_poller = p;
grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
g_backup_poller_mu->Unlock();
GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "BACKUP_POLLER:%p create", p);
}
grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p);
grpc_core::Executor::Run(
GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, nullptr),
GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT,
grpc_core::ExecutorJobType::LONG);
} else {
while ((p = reinterpret_cast<backup_poller*>(
gpr_atm_acq_load(&g_backup_poller))) == nullptr) {
// spin waiting for backup poller
}
old_count = g_uncovered_notifications_pending++;
p = g_backup_poller;
g_backup_poller_mu->Unlock();
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "BACKUP_POLLER:%p add %p", p, tcp);
gpr_log(GPR_INFO, "BACKUP_POLLER:%p add %p cnt %d->%d", p, tcp,
old_count - 1, old_count);
}
grpc_pollset_add_fd(BACKUP_POLLER_POLLSET(p), tcp->em_fd);
if (old_count != 0) {
drop_uncovered(tcp);
}
}
static void notify_on_read(grpc_tcp* tcp) {
@ -1873,4 +1869,11 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
TCP_UNREF(tcp, "destroy");
}
void grpc_tcp_posix_init() { g_backup_poller_mu = new grpc_core::Mutex; }
void grpc_tcp_posix_shutdown() {
delete g_backup_poller_mu;
g_backup_poller_mu = nullptr;
}
#endif /* GRPC_POSIX_SOCKET_TCP */

@ -57,4 +57,12 @@ int grpc_tcp_fd(grpc_endpoint* ep);
void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
grpc_closure* done);
#ifdef GRPC_POSIX_SOCKET_TCP
void grpc_tcp_posix_init();
void grpc_tcp_posix_shutdown();
#endif /* GRPC_POSIX_SOCKET_TCP */
#endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */

Loading…
Cancel
Save