diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index f0b9ee39f65..31316cfc4dc 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -161,6 +161,7 @@ struct grpc_fd { /* The fd is either closed or we relinquished control of it. In either cases, this indicates that the 'fd' on this structure is no longer valid */ + gpr_mu orphaned_mu; bool orphaned; gpr_atm read_closure; @@ -285,6 +286,7 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); pollable_destroy(&fd->pollable); + gpr_mu_destroy(&fd->orphaned_mu); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -347,6 +349,7 @@ static grpc_fd *fd_create(int fd, const char *name) { gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; + gpr_mu_init(&new_fd->orphaned_mu); new_fd->orphaned = false; grpc_lfev_init(&new_fd->read_closure); grpc_lfev_init(&new_fd->write_closure); @@ -374,11 +377,11 @@ static grpc_fd *fd_create(int fd, const char *name) { static int fd_wrapped_fd(grpc_fd *fd) { int ret_fd = -1; - gpr_mu_lock(&fd->pollable.po.mu); + gpr_mu_lock(&fd->orphaned_mu); if (!fd->orphaned) { ret_fd = fd->fd; } - gpr_mu_unlock(&fd->pollable.po.mu); + gpr_mu_unlock(&fd->orphaned_mu); return ret_fd; } @@ -390,6 +393,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&fd->pollable.po.mu); + gpr_mu_lock(&fd->orphaned_mu); fd->on_done_closure = on_done; /* If release_fd is not NULL, we should be relinquishing control of the file @@ -413,6 +417,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); + gpr_mu_unlock(&fd->orphaned_mu); gpr_mu_unlock(&fd->pollable.po.mu); UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); @@ -545,9 +550,11 @@ static void pollable_init(pollable *p, polling_obj_type type) { } static void pollable_destroy(pollable *p) { - close(p->epfd); - grpc_wakeup_fd_destroy(&p->wakeup); po_destroy(&p->po); + if (p->epfd != -1) { + close(p->epfd); + grpc_wakeup_fd_destroy(&p->wakeup); + } } /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ @@ -590,7 +597,13 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollable_add_fd"; const int epfd = p->epfd; + GPR_ASSERT(epfd != -1); + gpr_mu_lock(&fd->orphaned_mu); + if (fd->orphaned) { + gpr_mu_unlock(&fd->orphaned_mu); + return GRPC_ERROR_NONE; + } struct epoll_event ev_fd = { .events = EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE, .data.ptr = fd}; if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { @@ -614,6 +627,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc); } } + gpr_mu_unlock(&fd->orphaned_mu); return error; } diff --git a/test/core/util/port_server_client.c b/test/core/util/port_server_client.c index 2cd50a4ff79..a388d5f27b6 100644 --- a/test/core/util/port_server_client.c +++ b/test/core/util/port_server_client.c @@ -60,7 +60,6 @@ static void destroy_pops_and_shutdown(grpc_exec_ctx *exec_ctx, void *p, grpc_pollset *pollset = grpc_polling_entity_pollset(p); grpc_pollset_destroy(exec_ctx, pollset); gpr_free(pollset); - grpc_shutdown(); } static void freed_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, @@ -122,12 +121,13 @@ void grpc_free_port_using_server(int port) { gpr_mu_unlock(pr.mu); grpc_httpcli_context_destroy(&exec_ctx, &context); - grpc_exec_ctx_finish(&exec_ctx); grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&pr.pops), shutdown_closure); grpc_exec_ctx_finish(&exec_ctx); gpr_free(path); grpc_http_response_destroy(&rsp); + + grpc_shutdown(); } typedef struct portreq { @@ -239,7 +239,6 @@ int grpc_pick_port_using_server(void) { grpc_closure_create(got_port_from_server, &pr, grpc_schedule_on_exec_ctx), &pr.response); grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); - grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(pr.mu); while (pr.port == -1) { grpc_pollset_worker *worker = NULL; @@ -258,6 +257,7 @@ int grpc_pick_port_using_server(void) { grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&pr.pops), shutdown_closure); grpc_exec_ctx_finish(&exec_ctx); + grpc_shutdown(); return pr.port; } diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc index 2a0e96ddf1d..19b3ba0a3c5 100644 --- a/test/cpp/microbenchmarks/bm_pollset.cc +++ b/test/cpp/microbenchmarks/bm_pollset.cc @@ -59,7 +59,7 @@ extern "C" { auto& force_library_initialization = Library::get(); static void shutdown_ps(grpc_exec_ctx* exec_ctx, void* ps, grpc_error* error) { - grpc_pollset_destroy(static_cast(ps)); + grpc_pollset_destroy(exec_ctx, static_cast(ps)); } static void BM_CreateDestroyPollset(benchmark::State& state) {