Merge pull request #11786 from y-zeng/fd_orphan

Support closed fd in grpc_fd_orphan()
pull/11963/head
Yuchen Zeng 7 years ago committed by GitHub
commit 9511aefaa9
  1. 7
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
  2. 4
      src/core/lib/iomgr/ev_epoll1_linux.c
  3. 5
      src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
  4. 6
      src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
  5. 6
      src/core/lib/iomgr/ev_epollex_linux.c
  6. 5
      src/core/lib/iomgr/ev_epollsig_linux.c
  7. 7
      src/core/lib/iomgr/ev_poll_posix.c
  8. 5
      src/core/lib/iomgr/ev_posix.c
  9. 4
      src/core/lib/iomgr/ev_posix.h
  10. 6
      src/core/lib/iomgr/tcp_client_posix.c
  11. 2
      src/core/lib/iomgr/tcp_posix.c
  12. 2
      src/core/lib/iomgr/tcp_server_posix.c
  13. 2
      src/core/lib/iomgr/udp_server.c
  14. 6
      test/core/iomgr/ev_epollsig_linux_test.c
  15. 11
      test/core/iomgr/fd_posix_test.c
  16. 3
      test/core/iomgr/pollset_set_test.c
  17. 5
      test/cpp/microbenchmarks/bm_pollset.cc

@ -103,10 +103,9 @@ static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) {
grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd);
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
grpc_fd_orphan. To prevent this fd from being closed by grpc_fd_orphan,
a fd pointer is provided. */
int fd;
grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, &fd, "c-ares query finished");
grpc_fd_orphan. */
grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */,
"c-ares query finished");
gpr_free(fdn);
}

@ -249,7 +249,7 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
const char *reason) {
bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
@ -260,7 +260,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
} else {
} else if (!already_closed) {
close(fd->fd);
}

@ -931,7 +931,7 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
const char *reason) {
bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
@ -952,8 +952,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
before doing this.) */
if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->po.pi);
polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */,
&error);
polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;

@ -493,8 +493,8 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
const char *reason) {
bool is_fd_closed = false;
bool already_closed, const char *reason) {
bool is_fd_closed = already_closed;
grpc_error *error = GRPC_ERROR_NONE;
epoll_set *unref_eps = NULL;
@ -505,7 +505,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
} else {
} else if (!is_fd_closed) {
close(fd->fd);
is_fd_closed = true;
}

@ -380,8 +380,8 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
const char *reason) {
bool is_fd_closed = false;
bool already_closed, const char *reason) {
bool is_fd_closed = already_closed;
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&fd->pollable.po.mu);
@ -392,7 +392,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
} else {
} else if (!is_fd_closed) {
close(fd->fd);
is_fd_closed = true;
}

@ -854,7 +854,7 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
const char *reason) {
bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
@ -875,8 +875,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
before doing this.) */
if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->po.pi);
polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */,
&error);
polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;

@ -398,11 +398,14 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
const char *reason) {
bool already_closed, const char *reason) {
fd->on_done_closure = on_done;
fd->released = release_fd != NULL;
if (fd->released) {
if (release_fd != NULL) {
*release_fd = fd->fd;
fd->released = true;
} else if (already_closed) {
fd->released = true;
}
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */

@ -170,8 +170,9 @@ int grpc_fd_wrapped_fd(grpc_fd *fd) {
}
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason) {
g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason);
int *release_fd, bool already_closed, const char *reason) {
g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, already_closed,
reason);
}
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {

@ -37,7 +37,7 @@ typedef struct grpc_event_engine_vtable {
grpc_fd *(*fd_create)(int fd, const char *name);
int (*fd_wrapped_fd)(grpc_fd *fd);
void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason);
int *release_fd, bool already_closed, const char *reason);
void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
@ -104,7 +104,7 @@ int grpc_fd_wrapped_fd(grpc_fd *fd);
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason);
int *release_fd, bool already_closed, const char *reason);
/* Has grpc_fd_shutdown been called on an fd? */
bool grpc_fd_is_shutdown(grpc_fd *fd);

@ -209,7 +209,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
finish:
if (fd != NULL) {
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
grpc_fd_orphan(exec_ctx, fd, NULL, NULL, "tcp_client_orphan");
grpc_fd_orphan(exec_ctx, fd, NULL, NULL, false /* already_closed */,
"tcp_client_orphan");
fd = NULL;
}
done = (--ac->refs == 0);
@ -295,7 +296,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, false /* already_closed */,
"tcp_client_connect_error");
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
goto done;
}

@ -156,7 +156,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
false /* already_closed */, "tcp_unref_orphan");
grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp->peer_string);

@ -166,7 +166,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"tcp_listener_shutdown");
false /* already_closed */, "tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {

@ -214,7 +214,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
sp->server->user_data);
}
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
false /* already_closed */, "udp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {

@ -79,7 +79,8 @@ static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *tfds,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup"));
grpc_exec_ctx_flush(exec_ctx);
grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup");
grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd,
false /* already_closed */, "test_fd_cleanup");
grpc_exec_ctx_flush(exec_ctx);
GPR_ASSERT(release_fd == tfds[i].inner_fd);
@ -294,7 +295,8 @@ static void test_threading(void) {
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_fd_shutdown(&exec_ctx, shared.wakeup_desc, GRPC_ERROR_CANCELLED);
grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, NULL, NULL, "done");
grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, NULL, NULL,
false /* already_closed */, "done");
grpc_pollset_shutdown(&exec_ctx, shared.pollset,
GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset,
grpc_schedule_on_exec_ctx));

@ -114,7 +114,8 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
bool success) {
session *se = arg;
server *sv = se->sv;
grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a");
grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, false /* already_closed */,
"a");
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(exec_ctx, sv->em_fd,
@ -171,7 +172,8 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */,
int success) {
server *sv = arg;
grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, "b");
grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, false /* already_closed */,
"b");
gpr_mu_lock(g_mu);
sv->done = 1;
@ -291,7 +293,8 @@ static void client_init(client *cl) {
static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx,
void *arg /*client */, int success) {
client *cl = arg;
grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, "c");
grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, false /* already_closed */,
"c");
cl->done = 1;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
@ -511,7 +514,7 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(g_mu);
grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, "d");
grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, false /* already_closed */, "d");
grpc_exec_ctx_finish(&exec_ctx);
destroy_change_data(&a);
destroy_change_data(&b);

@ -137,7 +137,8 @@ static void cleanup_test_fds(grpc_exec_ctx *exec_ctx, test_fd *tfds,
* grpc_wakeup_fd and we would like to destroy it ourselves (by calling
* grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the
* underlying fd, call it with a non-NULL 'release_fd' parameter */
grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup");
grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd,
false /* already_closed */, "test_fd_cleanup");
grpc_exec_ctx_flush(exec_ctx);
grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd);

@ -149,7 +149,7 @@ static void BM_PollAddFd(benchmark::State& state) {
grpc_pollset_add_fd(&exec_ctx, ps, fd);
grpc_exec_ctx_flush(&exec_ctx);
}
grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, "xxx");
grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, false /* already_closed */, "xxx");
grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
grpc_schedule_on_exec_ctx);
@ -247,7 +247,8 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
while (!done) {
GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline));
}
grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, "done");
grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, false /* already_closed */,
"done");
wakeup_fd.read_fd = 0;
grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,

Loading…
Cancel
Save