Remove already_closed param from fd_orphan

pull/15648/head
Alexander Polcyn 7 years ago
parent 701a4b1c32
commit 4e8a2f5f27
  1. 10
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  2. 4
      src/core/lib/iomgr/ev_epoll1_linux.cc
  3. 6
      src/core/lib/iomgr/ev_epollex_linux.cc
  4. 23
      src/core/lib/iomgr/ev_epollsig_linux.cc
  5. 4
      src/core/lib/iomgr/ev_poll_posix.cc
  6. 9
      src/core/lib/iomgr/ev_posix.cc
  7. 4
      src/core/lib/iomgr/ev_posix.h
  8. 6
      src/core/lib/iomgr/tcp_client_posix.cc
  9. 2
      src/core/lib/iomgr/tcp_posix.cc
  10. 2
      src/core/lib/iomgr/tcp_server_posix.cc
  11. 3
      src/core/lib/iomgr/udp_server.cc
  12. 6
      test/core/iomgr/ev_epollsig_linux_test.cc
  13. 8
      test/core/iomgr/fd_posix_test.cc
  14. 3
      test/core/iomgr/pollset_set_test.cc
  15. 4
      test/cpp/microbenchmarks/bm_pollset.cc

@ -104,17 +104,11 @@ static void fd_node_destroy(fd_node* fdn) {
GPR_ASSERT(!fdn->writable_registered);
GPR_ASSERT(fdn->already_shutdown);
gpr_mu_destroy(&fdn->mu);
/* TODO: we need to pass a non-null "release_fd" parameter to
* grpc_fd_orphan because "epollsig" iomgr will close the fd
* even if "already_closed" is true, and it only leaves it open
* if "release_fd" is non-null. This is unlike the rest of the
* pollers, should this be changed within epollsig? */
int dummy_release_fd;
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */
grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, true /* already_closed */,
"c-ares query finished");
int dummy_release_fd;
grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished");
gpr_free(fdn);
}

@ -337,7 +337,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) {
const char* reason) {
grpc_error* error = GRPC_ERROR_NONE;
bool is_release_fd = (release_fd != nullptr);
@ -350,7 +350,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (is_release_fd) {
*release_fd = fd->fd;
} else if (!already_closed) {
} else {
close(fd->fd);
}

@ -395,8 +395,8 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) {
bool is_fd_closed = already_closed;
const char* reason) {
bool is_fd_closed = false;
gpr_mu_lock(&fd->orphan_mu);
@ -406,7 +406,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != nullptr) {
*release_fd = fd->fd;
} else if (!is_fd_closed) {
} else {
close(fd->fd);
is_fd_closed = true;
}

@ -435,7 +435,6 @@ static void polling_island_remove_all_fds_locked(polling_island* pi,
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
bool is_fd_closed,
grpc_error** error) {
int err;
size_t i;
@ -444,16 +443,14 @@ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
/* If fd is already closed, then it would have been automatically been removed
from the epoll set */
if (!is_fd_closed) {
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
if (err < 0 && errno != ENOENT) {
gpr_asprintf(
&err_msg,
"epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
pi->epoll_fd, fd->fd, errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
if (err < 0 && errno != ENOENT) {
gpr_asprintf(
&err_msg,
"epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
pi->epoll_fd, fd->fd, errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
for (i = 0; i < pi->fd_cnt; i++) {
@ -863,7 +860,7 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) {
const char* reason) {
grpc_error* error = GRPC_ERROR_NONE;
polling_island* unref_pi = nullptr;
@ -884,7 +881,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
before doing this.) */
if (fd->po.pi != nullptr) {
polling_island* pi_latest = polling_island_lock(fd->po.pi);
polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
polling_island_remove_fd_locked(pi_latest, fd, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;

@ -424,14 +424,12 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) {
const char* reason) {
fd->on_done_closure = on_done;
fd->released = release_fd != nullptr;
if (release_fd != nullptr) {
*release_fd = fd->fd;
fd->released = true;
} else if (already_closed) {
fd->released = true;
}
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */

@ -204,13 +204,12 @@ int grpc_fd_wrapped_fd(grpc_fd* fd) {
}
void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason) {
GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %d, %s)",
grpc_fd_wrapped_fd(fd), on_done, release_fd,
already_closed, reason);
const char* reason) {
GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %s)", grpc_fd_wrapped_fd(fd),
on_done, release_fd, reason);
GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd));
g_event_engine->fd_orphan(fd, on_done, release_fd, already_closed, reason);
g_event_engine->fd_orphan(fd, on_done, release_fd, reason);
}
void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) {

@ -45,7 +45,7 @@ typedef struct grpc_event_engine_vtable {
grpc_fd* (*fd_create)(int fd, const char* name);
int (*fd_wrapped_fd)(grpc_fd* fd);
void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason);
const char* reason);
void (*fd_shutdown)(grpc_fd* fd, grpc_error* why);
void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure);
@ -100,7 +100,7 @@ int grpc_fd_wrapped_fd(grpc_fd* fd);
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason);
const char* reason);
/* Has grpc_fd_shutdown been called on an fd? */
bool grpc_fd_is_shutdown(grpc_fd* fd);

@ -211,8 +211,7 @@ static void on_writable(void* acp, grpc_error* error) {
finish:
if (fd != nullptr) {
grpc_pollset_set_del_fd(ac->interested_parties, fd);
grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */,
"tcp_client_orphan");
grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
fd = nullptr;
}
done = (--ac->refs == 0);
@ -305,8 +304,7 @@ void grpc_tcp_client_create_from_prepared_fd(
return;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(fdobj, nullptr, nullptr, false /* already_closed */,
"tcp_client_connect_error");
grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
return;
}

@ -297,7 +297,7 @@ static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) {
static void tcp_free(grpc_tcp* tcp) {
grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
false /* already_closed */, "tcp_unref_orphan");
"tcp_unref_orphan");
grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string);

@ -150,7 +150,7 @@ static void deactivated_all_ports(grpc_tcp_server* s) {
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
false /* already_closed */, "tcp_listener_shutdown");
"tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {

@ -300,8 +300,7 @@ void GrpcUdpListener::OrphanFd() {
grpc_schedule_on_exec_ctx);
/* Because at this point, all listening sockets have been shutdown already, no
* need to call OnFdAboutToOrphan() to notify the handler again. */
grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr,
false /* already_closed */, "udp_listener_shutdown");
grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, "udp_listener_shutdown");
}
void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {

@ -79,8 +79,7 @@ static void test_fd_cleanup(test_fd* tfds, int num_fds) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup"));
grpc_core::ExecCtx::Get()->Flush();
grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, false /* already_closed */,
"test_fd_cleanup");
grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, "test_fd_cleanup");
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(release_fd == tfds[i].inner_fd);
@ -287,8 +286,7 @@ static void test_threading(void) {
{
grpc_core::ExecCtx exec_ctx;
grpc_fd_shutdown(shared.wakeup_desc, GRPC_ERROR_CANCELLED);
grpc_fd_orphan(shared.wakeup_desc, nullptr, nullptr,
false /* already_closed */, "done");
grpc_fd_orphan(shared.wakeup_desc, nullptr, nullptr, "done");
grpc_pollset_shutdown(shared.pollset,
GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset,
grpc_schedule_on_exec_ctx));

@ -115,7 +115,7 @@ static void session_shutdown_cb(void* arg, /*session */
bool success) {
session* se = static_cast<session*>(arg);
server* sv = se->sv;
grpc_fd_orphan(se->em_fd, nullptr, nullptr, false /* already_closed */, "a");
grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(sv->em_fd,
@ -171,7 +171,7 @@ static void session_read_cb(void* arg, /*session */
static void listen_shutdown_cb(void* arg /*server */, int success) {
server* sv = static_cast<server*>(arg);
grpc_fd_orphan(sv->em_fd, nullptr, nullptr, false /* already_closed */, "b");
grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
gpr_mu_lock(g_mu);
sv->done = 1;
@ -289,7 +289,7 @@ static void client_init(client* cl) {
/* Called when a client upload session is ready to shutdown. */
static void client_session_shutdown_cb(void* arg /*client */, int success) {
client* cl = static_cast<client*>(arg);
grpc_fd_orphan(cl->em_fd, nullptr, nullptr, false /* already_closed */, "c");
grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
cl->done = 1;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
@ -502,7 +502,7 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(g_mu);
grpc_fd_orphan(em_fd, nullptr, nullptr, false /* already_closed */, "d");
grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
destroy_change_data(&a);
destroy_change_data(&b);

@ -136,8 +136,7 @@ static void cleanup_test_fds(test_fd* tfds, const int num_fds) {
* grpc_wakeup_fd and we would like to destroy it ourselves (by calling
* grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the
* underlying fd, call it with a non-NULL 'release_fd' parameter */
grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, false /* already_closed */,
"test_fd_cleanup");
grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, "test_fd_cleanup");
grpc_core::ExecCtx::Get()->Flush();
grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd);

@ -146,7 +146,7 @@ static void BM_PollAddFd(benchmark::State& state) {
grpc_pollset_add_fd(ps, fd);
grpc_core::ExecCtx::Get()->Flush();
}
grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */, "xxx");
grpc_fd_orphan(fd, nullptr, nullptr, "xxx");
grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
grpc_schedule_on_exec_ctx);
@ -242,7 +242,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
while (!done) {
GRPC_ERROR_UNREF(grpc_pollset_work(ps, nullptr, GRPC_MILLIS_INF_FUTURE));
}
grpc_fd_orphan(wakeup, nullptr, nullptr, false /* already_closed */, "done");
grpc_fd_orphan(wakeup, nullptr, nullptr, "done");
wakeup_fd.read_fd = 0;
grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,

Loading…
Cancel
Save