Merge pull request #15648 from apolcyn/remove_fd_shutdown_already_closed

Remove already_closed param from fd_orphan
pull/15730/head
apolcyn 7 years ago committed by GitHub
commit eb10abe94a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  2. 4
      src/core/lib/iomgr/ev_epoll1_linux.cc
  3. 6
      src/core/lib/iomgr/ev_epollex_linux.cc
  4. 23
      src/core/lib/iomgr/ev_epollsig_linux.cc
  5. 4
      src/core/lib/iomgr/ev_poll_posix.cc
  6. 9
      src/core/lib/iomgr/ev_posix.cc
  7. 4
      src/core/lib/iomgr/ev_posix.h
  8. 6
      src/core/lib/iomgr/tcp_client_posix.cc
  9. 2
      src/core/lib/iomgr/tcp_posix.cc
  10. 2
      src/core/lib/iomgr/tcp_server_posix.cc
  11. 3
      src/core/lib/iomgr/udp_server.cc
  12. 6
      test/core/iomgr/ev_epollsig_linux_test.cc
  13. 8
      test/core/iomgr/fd_posix_test.cc
  14. 3
      test/core/iomgr/pollset_set_test.cc
  15. 4
      test/cpp/microbenchmarks/bm_pollset.cc

@ -104,17 +104,11 @@ static void fd_node_destroy(fd_node* fdn) {
GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(!fdn->writable_registered);
GPR_ASSERT(fdn->already_shutdown); GPR_ASSERT(fdn->already_shutdown);
gpr_mu_destroy(&fdn->mu); gpr_mu_destroy(&fdn->mu);
/* TODO: we need to pass a non-null "release_fd" parameter to
* grpc_fd_orphan because "epollsig" iomgr will close the fd
* even if "already_closed" is true, and it only leaves it open
* if "release_fd" is non-null. This is unlike the rest of the
* pollers, should this be changed within epollsig? */
int dummy_release_fd;
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */ grpc_fd_orphan. */
grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, true /* already_closed */, int dummy_release_fd;
"c-ares query finished"); grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished");
gpr_free(fdn); gpr_free(fdn);
} }

@ -346,7 +346,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
} }
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) { const char* reason) {
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
bool is_release_fd = (release_fd != nullptr); bool is_release_fd = (release_fd != nullptr);
@ -359,7 +359,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
descriptor fd->fd (but we still own the grpc_fd structure). */ descriptor fd->fd (but we still own the grpc_fd structure). */
if (is_release_fd) { if (is_release_fd) {
*release_fd = fd->fd; *release_fd = fd->fd;
} else if (!already_closed) { } else {
close(fd->fd); close(fd->fd);
} }

@ -403,8 +403,8 @@ static int fd_wrapped_fd(grpc_fd* fd) {
} }
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) { const char* reason) {
bool is_fd_closed = already_closed; bool is_fd_closed = false;
gpr_mu_lock(&fd->orphan_mu); gpr_mu_lock(&fd->orphan_mu);
@ -414,7 +414,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
descriptor fd->fd (but we still own the grpc_fd structure). */ descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != nullptr) { if (release_fd != nullptr) {
*release_fd = fd->fd; *release_fd = fd->fd;
} else if (!is_fd_closed) { } else {
close(fd->fd); close(fd->fd);
is_fd_closed = true; is_fd_closed = true;
} }

@ -442,7 +442,6 @@ static void polling_island_remove_all_fds_locked(polling_island* pi,
/* The caller is expected to hold pi->mu lock before calling this function */ /* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd, static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
bool is_fd_closed,
grpc_error** error) { grpc_error** error) {
int err; int err;
size_t i; size_t i;
@ -451,16 +450,14 @@ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
/* If fd is already closed, then it would have been automatically been removed /* If fd is already closed, then it would have been automatically been removed
from the epoll set */ from the epoll set */
if (!is_fd_closed) { err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr); if (err < 0 && errno != ENOENT) {
if (err < 0 && errno != ENOENT) { gpr_asprintf(
gpr_asprintf( &err_msg,
&err_msg, "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
"epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)", pi->epoll_fd, fd->fd, errno, strerror(errno));
pi->epoll_fd, fd->fd, errno, strerror(errno)); append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); gpr_free(err_msg);
gpr_free(err_msg);
}
} }
for (i = 0; i < pi->fd_cnt; i++) { for (i = 0; i < pi->fd_cnt; i++) {
@ -874,7 +871,7 @@ static int fd_wrapped_fd(grpc_fd* fd) {
} }
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) { const char* reason) {
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
polling_island* unref_pi = nullptr; polling_island* unref_pi = nullptr;
@ -895,7 +892,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
before doing this.) */ before doing this.) */
if (fd->po.pi != nullptr) { if (fd->po.pi != nullptr) {
polling_island* pi_latest = polling_island_lock(fd->po.pi); polling_island* pi_latest = polling_island_lock(fd->po.pi);
polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error); polling_island_remove_fd_locked(pi_latest, fd, &error);
gpr_mu_unlock(&pi_latest->mu); gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi; unref_pi = fd->po.pi;

@ -425,14 +425,12 @@ static int fd_wrapped_fd(grpc_fd* fd) {
} }
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) { const char* reason) {
fd->on_done_closure = on_done; fd->on_done_closure = on_done;
fd->released = release_fd != nullptr; fd->released = release_fd != nullptr;
if (release_fd != nullptr) { if (release_fd != nullptr) {
*release_fd = fd->fd; *release_fd = fd->fd;
fd->released = true; fd->released = true;
} else if (already_closed) {
fd->released = true;
} }
gpr_mu_lock(&fd->mu); gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ REF_BY(fd, 1, reason); /* remove active status, but keep referenced */

@ -209,13 +209,12 @@ int grpc_fd_wrapped_fd(grpc_fd* fd) {
} }
void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) { const char* reason) {
GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %d, %s)", GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %s)", grpc_fd_wrapped_fd(fd),
grpc_fd_wrapped_fd(fd), on_done, release_fd, on_done, release_fd, reason);
already_closed, reason);
GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd)); GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd));
g_event_engine->fd_orphan(fd, on_done, release_fd, already_closed, reason); g_event_engine->fd_orphan(fd, on_done, release_fd, reason);
} }
void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) { void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) {

@ -46,7 +46,7 @@ typedef struct grpc_event_engine_vtable {
grpc_fd* (*fd_create)(int fd, const char* name, bool track_err); grpc_fd* (*fd_create)(int fd, const char* name, bool track_err);
int (*fd_wrapped_fd)(grpc_fd* fd); int (*fd_wrapped_fd)(grpc_fd* fd);
void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd, void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason); const char* reason);
void (*fd_shutdown)(grpc_fd* fd, grpc_error* why); void (*fd_shutdown)(grpc_fd* fd, grpc_error* why);
void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure); void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure); void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure);
@ -112,7 +112,7 @@ int grpc_fd_wrapped_fd(grpc_fd* fd);
notify_on_write. notify_on_write.
MUST NOT be called with a pollset lock taken */ MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason); const char* reason);
/* Has grpc_fd_shutdown been called on an fd? */ /* Has grpc_fd_shutdown been called on an fd? */
bool grpc_fd_is_shutdown(grpc_fd* fd); bool grpc_fd_is_shutdown(grpc_fd* fd);

@ -211,8 +211,7 @@ static void on_writable(void* acp, grpc_error* error) {
finish: finish:
if (fd != nullptr) { if (fd != nullptr) {
grpc_pollset_set_del_fd(ac->interested_parties, fd); grpc_pollset_set_del_fd(ac->interested_parties, fd);
grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */, grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
"tcp_client_orphan");
fd = nullptr; fd = nullptr;
} }
done = (--ac->refs == 0); done = (--ac->refs == 0);
@ -305,8 +304,7 @@ void grpc_tcp_client_create_from_prepared_fd(
return; return;
} }
if (errno != EWOULDBLOCK && errno != EINPROGRESS) { if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(fdobj, nullptr, nullptr, false /* already_closed */, grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
"tcp_client_connect_error");
GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect")); GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
return; return;
} }

@ -297,7 +297,7 @@ static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) {
static void tcp_free(grpc_tcp* tcp) { static void tcp_free(grpc_tcp* tcp) {
grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
false /* already_closed */, "tcp_unref_orphan"); "tcp_unref_orphan");
grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user); grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string); gpr_free(tcp->peer_string);

@ -150,7 +150,7 @@ static void deactivated_all_ports(grpc_tcp_server* s) {
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr, grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
false /* already_closed */, "tcp_listener_shutdown"); "tcp_listener_shutdown");
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} else { } else {

@ -300,8 +300,7 @@ void GrpcUdpListener::OrphanFd() {
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
/* Because at this point, all listening sockets have been shutdown already, no /* Because at this point, all listening sockets have been shutdown already, no
* need to call OnFdAboutToOrphan() to notify the handler again. */ * need to call OnFdAboutToOrphan() to notify the handler again. */
grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, "udp_listener_shutdown");
false /* already_closed */, "udp_listener_shutdown");
} }
void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {

@ -79,8 +79,7 @@ static void test_fd_cleanup(test_fd* tfds, int num_fds) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup"));
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, false /* already_closed */, grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, "test_fd_cleanup");
"test_fd_cleanup");
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(release_fd == tfds[i].inner_fd); GPR_ASSERT(release_fd == tfds[i].inner_fd);
@ -287,8 +286,7 @@ static void test_threading(void) {
{ {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_fd_shutdown(shared.wakeup_desc, GRPC_ERROR_CANCELLED); grpc_fd_shutdown(shared.wakeup_desc, GRPC_ERROR_CANCELLED);
grpc_fd_orphan(shared.wakeup_desc, nullptr, nullptr, grpc_fd_orphan(shared.wakeup_desc, nullptr, nullptr, "done");
false /* already_closed */, "done");
grpc_pollset_shutdown(shared.pollset, grpc_pollset_shutdown(shared.pollset,
GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset, GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset,
grpc_schedule_on_exec_ctx)); grpc_schedule_on_exec_ctx));

@ -115,7 +115,7 @@ static void session_shutdown_cb(void* arg, /*session */
bool success) { bool success) {
session* se = static_cast<session*>(arg); session* se = static_cast<session*>(arg);
server* sv = se->sv; server* sv = se->sv;
grpc_fd_orphan(se->em_fd, nullptr, nullptr, false /* already_closed */, "a"); grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
gpr_free(se); gpr_free(se);
/* Start to shutdown listen fd. */ /* Start to shutdown listen fd. */
grpc_fd_shutdown(sv->em_fd, grpc_fd_shutdown(sv->em_fd,
@ -171,7 +171,7 @@ static void session_read_cb(void* arg, /*session */
static void listen_shutdown_cb(void* arg /*server */, int success) { static void listen_shutdown_cb(void* arg /*server */, int success) {
server* sv = static_cast<server*>(arg); server* sv = static_cast<server*>(arg);
grpc_fd_orphan(sv->em_fd, nullptr, nullptr, false /* already_closed */, "b"); grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
gpr_mu_lock(g_mu); gpr_mu_lock(g_mu);
sv->done = 1; sv->done = 1;
@ -289,7 +289,7 @@ static void client_init(client* cl) {
/* Called when a client upload session is ready to shutdown. */ /* Called when a client upload session is ready to shutdown. */
static void client_session_shutdown_cb(void* arg /*client */, int success) { static void client_session_shutdown_cb(void* arg /*client */, int success) {
client* cl = static_cast<client*>(arg); client* cl = static_cast<client*>(arg);
grpc_fd_orphan(cl->em_fd, nullptr, nullptr, false /* already_closed */, "c"); grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
cl->done = 1; cl->done = 1;
GPR_ASSERT( GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
@ -502,7 +502,7 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(b.cb_that_ran == second_read_callback); GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(g_mu); gpr_mu_unlock(g_mu);
grpc_fd_orphan(em_fd, nullptr, nullptr, false /* already_closed */, "d"); grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
destroy_change_data(&a); destroy_change_data(&a);
destroy_change_data(&b); destroy_change_data(&b);

@ -136,8 +136,7 @@ static void cleanup_test_fds(test_fd* tfds, const int num_fds) {
* grpc_wakeup_fd and we would like to destroy it ourselves (by calling * grpc_wakeup_fd and we would like to destroy it ourselves (by calling
* grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the * grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the
* underlying fd, call it with a non-NULL 'release_fd' parameter */ * underlying fd, call it with a non-NULL 'release_fd' parameter */
grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, false /* already_closed */, grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, "test_fd_cleanup");
"test_fd_cleanup");
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd); grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd);

@ -146,7 +146,7 @@ static void BM_PollAddFd(benchmark::State& state) {
grpc_pollset_add_fd(ps, fd); grpc_pollset_add_fd(ps, fd);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
} }
grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */, "xxx"); grpc_fd_orphan(fd, nullptr, nullptr, "xxx");
grpc_closure shutdown_ps_closure; grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps, GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
@ -242,7 +242,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
while (!done) { while (!done) {
GRPC_ERROR_UNREF(grpc_pollset_work(ps, nullptr, GRPC_MILLIS_INF_FUTURE)); GRPC_ERROR_UNREF(grpc_pollset_work(ps, nullptr, GRPC_MILLIS_INF_FUTURE));
} }
grpc_fd_orphan(wakeup, nullptr, nullptr, false /* already_closed */, "done"); grpc_fd_orphan(wakeup, nullptr, nullptr, "done");
wakeup_fd.read_fd = 0; wakeup_fd.read_fd = 0;
grpc_closure shutdown_ps_closure; grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps, GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,

Loading…
Cancel
Save