Make linux polling engines capable of tracking errors separately with

backward compatibility.
reviewable/pr15092/r1
Yash Tibrewal 7 years ago
parent 6f3cadb687
commit adc733f024
  1. 6
      include/grpc/support/log.h
  2. 2
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  3. 2
      src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
  4. 5
      src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
  5. 4
      src/core/lib/iomgr/endpoint_pair_posix.cc
  6. 37
      src/core/lib/iomgr/ev_epoll1_linux.cc
  7. 41
      src/core/lib/iomgr/ev_epollex_linux.cc
  8. 45
      src/core/lib/iomgr/ev_epollsig_linux.cc
  9. 10
      src/core/lib/iomgr/ev_poll_posix.cc
  10. 15
      src/core/lib/iomgr/ev_posix.cc
  11. 16
      src/core/lib/iomgr/ev_posix.h
  12. 2
      src/core/lib/iomgr/tcp_client_posix.cc
  13. 4
      src/core/lib/iomgr/tcp_server_posix.cc
  14. 2
      src/core/lib/iomgr/tcp_server_utils_posix_common.cc
  15. 2
      src/core/lib/iomgr/udp_server.cc
  16. 4
      test/core/iomgr/ev_epollsig_linux_test.cc
  17. 8
      test/core/iomgr/fd_posix_test.cc
  18. 2
      test/core/iomgr/pollset_set_test.cc
  19. 20
      test/core/iomgr/tcp_posix_test.cc
  20. 4
      test/cpp/microbenchmarks/bm_pollset.cc

@ -99,6 +99,12 @@ GPRAPI void gpr_set_log_function(gpr_log_func func);
} \
} while (0)
#ifndef NDEBUG
#define GPR_DEBUG_ASSERT(x) GPR_ASSERT(x)
#else
#define GPR_DEBUG_ASSERT(x)
#endif
#ifdef __cplusplus
}
#endif

@ -284,7 +284,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
fdn->fd = grpc_fd_create(socks[i], fd_name);
fdn->fd = grpc_fd_create(socks[i], fd_name, false);
fdn->ev_driver = ev_driver;
fdn->readable_registered = false;
fdn->writable_registered = false;

@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint* client = grpc_tcp_client_create_from_fd(
grpc_fd_create(fd, "client"), args, "fd-client");
grpc_fd_create(fd, "client", false), args, "fd-client");
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);

@ -43,8 +43,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
char* name;
gpr_asprintf(&name, "fd:%d", fd);
grpc_endpoint* server_endpoint = grpc_tcp_create(
grpc_fd_create(fd, name), grpc_server_get_channel_args(server), name);
grpc_endpoint* server_endpoint =
grpc_tcp_create(grpc_fd_create(fd, name, false),
grpc_server_get_channel_args(server), name);
gpr_free(name);

@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
grpc_core::ExecCtx exec_ctx;
gpr_asprintf(&final_name, "%s:client", name);
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), args,
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), args,
p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args,
"socketpair-client");
gpr_free(final_name);

@ -136,6 +136,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
@ -272,7 +273,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
static grpc_fd* fd_create(int fd, const char* name) {
static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@ -286,11 +287,12 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
new_fd->read_closure.Init();
new_fd->write_closure.Init();
new_fd->error_closure.Init();
}
new_fd->fd = fd;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
new_fd->error_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@ -307,7 +309,9 @@ static grpc_fd* fd_create(int fd, const char* name) {
struct epoll_event ev;
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
ev.data.ptr = new_fd;
/* Use the least significant bit of ev.data.ptr to store track_err. */
ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
(track_err ? 1 : 0));
if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
@ -327,6 +331,7 @@ static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
shutdown(fd->fd, SHUT_RDWR);
}
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@ -359,6 +364,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
grpc_iomgr_unregister_object(&fd->iomgr_object);
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
fd->error_closure->DestroyEvent();
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
@ -383,6 +389,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
}
static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
fd->read_closure->SetReady();
/* Use release store to match with acquire load in fd_get_read_notifier */
@ -391,6 +401,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
/*******************************************************************************
* Pollset Definitions
*/
@ -611,16 +623,25 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) {
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
grpc_fd* fd = reinterpret_cast<grpc_fd*>(
reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
bool track_err =
reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1);
bool cancel = (ev->events & EPOLLHUP) != 0;
bool error = (ev->events & EPOLLERR) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
bool err_fallback = error && !track_err;
if (error && !err_fallback) {
fd_has_errors(fd);
}
if (read_ev || cancel) {
if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
if (write_ev || cancel) {
if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@ -1183,6 +1204,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
true,
fd_create,
fd_wrapped_fd,
@ -1190,6 +1212,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,

@ -160,6 +160,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
@ -169,6 +170,9 @@ struct grpc_fd {
gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
/* Do we need to track EPOLLERR events separately? */
bool track_err;
};
static void fd_global_init(void);
@ -294,6 +298,7 @@ static void fd_destroy(void* arg, grpc_error* error) {
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
fd->error_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
}
@ -333,7 +338,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
static grpc_fd* fd_create(int fd, const char* name) {
static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@ -347,6 +352,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
new_fd->read_closure.Init();
new_fd->write_closure.Init();
new_fd->error_closure.Init();
}
gpr_mu_init(&new_fd->pollable_mu);
@ -354,8 +360,10 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->pollable_obj = nullptr;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
new_fd->track_err = track_err;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
new_fd->error_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@ -424,6 +432,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR);
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@ -436,6 +445,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
}
/*******************************************************************************
* Pollable Definitions
*/
@ -524,7 +537,11 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
struct epoll_event ev_fd;
ev_fd.events =
static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
ev_fd.data.ptr = fd;
/* Use the second least significant bit of ev_fd.data.ptr to store track_err
* to avoid synchronization issues when accessing it after receiving an event.
*/
ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) |
(fd->track_err ? 2 : 0));
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
switch (errno) {
case EEXIST:
@ -724,6 +741,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) {
gpr_mu_lock(&fd->pollable_mu);
grpc_error* error = GRPC_ERROR_NONE;
@ -792,20 +811,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
(intptr_t)data_ptr)),
err_desc);
} else {
grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
grpc_fd* fd =
reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2);
bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2;
bool cancel = (ev->events & EPOLLHUP) != 0;
bool error = (ev->events & EPOLLERR) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
bool err_fallback = error && !track_err;
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG,
"PS:%p got fd %p: cancel=%d read=%d "
"write=%d",
pollset, fd, cancel, read_ev, write_ev);
}
if (read_ev || cancel) {
if (error && !err_fallback) {
fd_has_errors(fd);
}
if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
if (write_ev || cancel) {
if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@ -1447,6 +1474,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
true,
fd_create,
fd_wrapped_fd,
@ -1454,6 +1482,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,

@ -132,6 +132,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
@ -141,6 +142,9 @@ struct grpc_fd {
gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
/* Do we need to track EPOLLERR events separately? */
bool track_err;
};
/* Reference counting for fds */
@ -352,7 +356,10 @@ static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds,
for (i = 0; i < fd_count; i++) {
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
ev.data.ptr = fds[i];
/* Use the least significant bit of ev.data.ptr to store track_err to avoid
* synchronization issues when accessing it after receiving an event */
ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) |
(fds[i]->track_err ? 1 : 0));
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
if (err < 0) {
@ -769,6 +776,7 @@ static void unref_by(grpc_fd* fd, int n) {
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
fd->error_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
} else {
@ -806,7 +814,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
static grpc_fd* fd_create(int fd, const char* name) {
static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@ -821,6 +829,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
gpr_mu_init(&new_fd->po.mu);
new_fd->read_closure.Init();
new_fd->write_closure.Init();
new_fd->error_closure.Init();
}
/* Note: It is not really needed to get the new_fd->po.mu lock here. If this
@ -837,6 +846,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->orphaned = false;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
new_fd->error_closure->InitEvent();
new_fd->track_err = track_err;
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@ -933,6 +944,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR);
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@ -945,6 +957,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
}
/*******************************************************************************
* Pollset Definitions
*/
@ -1116,6 +1132,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
static void pollset_release_polling_island(grpc_pollset* ps,
const char* reason) {
if (ps->po.pi != nullptr) {
@ -1254,14 +1272,23 @@ static void pollset_work_and_unlock(grpc_pollset* pollset,
to the function pollset_work_and_unlock() will pick up the correct
epoll_fd */
} else {
grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write_ev = ep_ev[i].events & EPOLLOUT;
if (read_ev || cancel) {
grpc_fd* fd = reinterpret_cast<grpc_fd*>(
reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
bool track_err =
reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1);
bool cancel = (ep_ev[i].events & EPOLLHUP) != 0;
bool error = (ep_ev[i].events & EPOLLERR) != 0;
bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0;
bool err_fallback = error && track_err;
if (error && !err_fallback) {
fd_has_errors(fd);
}
if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
if (write_ev || cancel) {
if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@ -1634,6 +1661,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
true,
fd_create,
fd_wrapped_fd,
@ -1641,6 +1669,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,

@ -330,7 +330,8 @@ static void unref_by(grpc_fd* fd, int n) {
}
}
static grpc_fd* fd_create(int fd, const char* name) {
static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
GPR_DEBUG_ASSERT(track_err == false);
grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
gpr_mu_init(&r->mu);
gpr_atm_rel_store(&r->refst, 1);
@ -553,6 +554,11 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
gpr_mu_unlock(&fd->mu);
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
abort();
}
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
grpc_pollset_worker* worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher* watcher) {
@ -1710,6 +1716,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
false,
fd_create,
fd_wrapped_fd,
@ -1717,6 +1724,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,

@ -190,9 +190,14 @@ void grpc_event_engine_shutdown(void) {
g_event_engine = nullptr;
}
grpc_fd* grpc_fd_create(int fd, const char* name) {
GRPC_POLLING_API_TRACE("fd_create(%d, %s)", fd, name);
return g_event_engine->fd_create(fd, name);
bool grpc_event_engine_can_track_errors(void) {
return g_event_engine->can_track_err;
}
grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err);
return g_event_engine->fd_create(fd, name, track_err);
}
int grpc_fd_wrapped_fd(grpc_fd* fd) {
@ -224,6 +229,10 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
g_event_engine->fd_notify_on_write(fd, closure);
}
void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
g_event_engine->fd_notify_on_error(fd, closure);
}
static size_t pollset_size(void) { return g_event_engine->pollset_size; }
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {

@ -35,14 +35,16 @@ typedef struct grpc_fd grpc_fd;
typedef struct grpc_event_engine_vtable {
size_t pollset_size;
bool can_track_err;
grpc_fd* (*fd_create)(int fd, const char* name);
grpc_fd* (*fd_create)(int fd, const char* name, bool track_err);
int (*fd_wrapped_fd)(grpc_fd* fd);
void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason);
void (*fd_shutdown)(grpc_fd* fd, grpc_error* why);
void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure);
bool (*fd_is_shutdown)(grpc_fd* fd);
grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd);
@ -78,10 +80,16 @@ void grpc_event_engine_shutdown(void);
/* Return the name of the poll strategy */
const char* grpc_get_poll_strategy_name();
/* Returns true if polling engine can track errors separately, false otherwise
*/
bool grpc_event_engine_can_track_errors();
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
\a track_err if true means that error events would be tracked separately
using grpc_fd_notify_on_error. Currently, valid only for linux systems.
This takes ownership of closing fd. */
grpc_fd* grpc_fd_create(int fd, const char* name);
grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err);
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd* fd);
@ -120,6 +128,10 @@ void grpc_fd_notify_on_read(grpc_fd* fd, grpc_closure* closure);
/* Exactly the same semantics as above, except based on writable events. */
void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure);
/* Exactly the same semantics as above, except based on error events. track_err
* needs to have been set on grpc_fd_create */
void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure);
/* Return the read notifier pollset from the fd */
grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd);

@ -279,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
}
addr_str = grpc_sockaddr_to_uri(mapped_addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
*fdobj = grpc_fd_create(fd, name);
*fdobj = grpc_fd_create(fd, name, false);
gpr_free(name);
gpr_free(addr_str);
return GRPC_ERROR_NONE;

@ -231,7 +231,7 @@ static void on_read(void* arg, grpc_error* err) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
grpc_fd* fdobj = grpc_fd_create(fd, name);
grpc_fd* fdobj = grpc_fd_create(fd, name, false);
grpc_pollset_add_fd(read_notifier_pollset, fdobj);
@ -361,7 +361,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
listener->sibling = sp;
sp->server = listener->server;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name);
sp->emfd = grpc_fd_create(fd, name, false);
memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = listener->port_index;

@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd,
s->tail = sp;
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name);
sp->emfd = grpc_fd_create(fd, name, false);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = port_index;

@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
emfd_ = grpc_fd_create(fd, name);
emfd_ = grpc_fd_create(fd, name, false);
memcpy(&addr_, addr, sizeof(grpc_resolved_address));
GPR_ASSERT(emfd_);
gpr_free(name);

@ -66,7 +66,7 @@ static void test_fd_init(test_fd* tfds, int* fds, int num_fds) {
for (i = 0; i < num_fds; i++) {
tfds[i].inner_fd = fds[i];
tfds[i].fd = grpc_fd_create(fds[i], "test_fd");
tfds[i].fd = grpc_fd_create(fds[i], "test_fd", false);
}
}
@ -267,7 +267,7 @@ static void test_threading(void) {
grpc_wakeup_fd fd;
GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&fd)));
shared.wakeup_fd = &fd;
shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup");
shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup", false);
shared.wakeups = 0;
{
grpc_core::ExecCtx exec_ctx;

@ -204,7 +204,7 @@ static void listen_cb(void* arg, /*=sv_arg*/
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
se = static_cast<session*>(gpr_malloc(sizeof(*se)));
se->sv = sv;
se->em_fd = grpc_fd_create(fd, "listener");
se->em_fd = grpc_fd_create(fd, "listener", false);
grpc_pollset_add_fd(g_pollset, se->em_fd);
GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
grpc_schedule_on_exec_ctx);
@ -233,7 +233,7 @@ static int server_start(server* sv) {
port = ntohs(sin.sin_port);
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
sv->em_fd = grpc_fd_create(fd, "server");
sv->em_fd = grpc_fd_create(fd, "server", false);
grpc_pollset_add_fd(g_pollset, sv->em_fd);
/* Register to be interested in reading from listen_fd. */
GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
@ -353,7 +353,7 @@ static void client_start(client* cl, int port) {
}
}
cl->em_fd = grpc_fd_create(fd, "client");
cl->em_fd = grpc_fd_create(fd, "client", false);
grpc_pollset_add_fd(g_pollset, cl->em_fd);
client_session_write(cl, GRPC_ERROR_NONE);
@ -454,7 +454,7 @@ static void test_grpc_fd_change(void) {
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
grpc_pollset_add_fd(g_pollset, em_fd);
/* Register the first callback, then make its FD readable */

@ -118,7 +118,7 @@ static void init_test_fds(test_fd* tfds, const int num_fds) {
for (int i = 0; i < num_fds; i++) {
GPR_ASSERT(GRPC_ERROR_NONE == grpc_wakeup_fd_init(&tfds[i].wakeup_fd));
tfds[i].fd = grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&tfds[i].wakeup_fd),
"test_fd");
"test_fd", false);
reset_test_fd(&tfds[i]);
}
}

@ -176,7 +176,8 @@ static void read_test(size_t num_bytes, size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER,
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test");
ep =
grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@ -226,7 +227,8 @@ static void large_read_test(size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), &args, "test");
ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args,
"test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
written_bytes = fill_socket(sv[0]);
@ -365,7 +367,8 @@ static void write_test(size_t num_bytes, size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER,
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), &args, "test");
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", false), &args,
"test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
state.ep = ep;
@ -433,7 +436,8 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test");
ep =
grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
grpc_endpoint_add_to_pollset(ep, g_pollset);
@ -522,10 +526,10 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
f.client_ep =
grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), &args, "test");
f.server_ep =
grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), &args, "test");
f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false),
&args, "test");
f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false),
&args, "test");
grpc_resource_quota_unref_internal(resource_quota);
grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);

@ -140,7 +140,7 @@ static void BM_PollAddFd(benchmark::State& state) {
grpc_wakeup_fd wakeup_fd;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&wakeup_fd)));
grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx");
grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx", false);
while (state.KeepRunning()) {
grpc_pollset_add_fd(ps, fd);
grpc_core::ExecCtx::Get()->Flush();
@ -221,7 +221,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
grpc_core::ExecCtx exec_ctx;
grpc_wakeup_fd wakeup_fd;
GRPC_ERROR_UNREF(grpc_wakeup_fd_init(&wakeup_fd));
grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read");
grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read", false);
grpc_pollset_add_fd(ps, wakeup);
bool done = false;
Closure* continue_closure = MakeClosure(

Loading…
Cancel
Save