Merge pull request #17244 from guantaol/ev_epollbg

Preparation for the new background poller 'epollbg'
pull/17329/head
Guantao Liu 6 years ago committed by GitHub
commit 99673fcbe3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/core/lib/iomgr/ev_epoll1_linux.cc
  2. 4
      src/core/lib/iomgr/ev_epollex_linux.cc
  3. 4
      src/core/lib/iomgr/ev_poll_posix.cc
  4. 8
      src/core/lib/iomgr/ev_posix.cc
  5. 10
      src/core/lib/iomgr/ev_posix.h
  6. 4
      src/core/lib/iomgr/iomgr.cc
  7. 4
      src/core/lib/iomgr/iomgr.h
  8. 4
      src/core/lib/iomgr/iomgr_custom.cc
  9. 4
      src/core/lib/iomgr/iomgr_internal.cc
  10. 4
      src/core/lib/iomgr/iomgr_internal.h
  11. 7
      src/core/lib/iomgr/iomgr_posix.cc
  12. 7
      src/core/lib/iomgr/iomgr_posix_cfstream.cc
  13. 5
      src/core/lib/iomgr/iomgr_windows.cc
  14. 15
      src/core/lib/iomgr/tcp_posix.cc
  15. 1
      src/core/lib/surface/init.cc
  16. 1
      test/cpp/microbenchmarks/bm_cq_multiple_threads.cc

@ -1242,6 +1242,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
* Event engine binding * Event engine binding
*/ */
static void shutdown_background_closure(void) {}
static void shutdown_engine(void) { static void shutdown_engine(void) {
fd_global_shutdown(); fd_global_shutdown();
pollset_global_shutdown(); pollset_global_shutdown();
@ -1255,6 +1257,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = { static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset), sizeof(grpc_pollset),
true, true,
false,
fd_create, fd_create,
fd_wrapped_fd, fd_wrapped_fd,
@ -1284,6 +1287,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd, pollset_set_add_fd,
pollset_set_del_fd, pollset_set_del_fd,
shutdown_background_closure,
shutdown_engine, shutdown_engine,
}; };

@ -1604,6 +1604,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
* Event engine binding * Event engine binding
*/ */
static void shutdown_background_closure(void) {}
static void shutdown_engine(void) { static void shutdown_engine(void) {
fd_global_shutdown(); fd_global_shutdown();
pollset_global_shutdown(); pollset_global_shutdown();
@ -1612,6 +1614,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = { static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset), sizeof(grpc_pollset),
true, true,
false,
fd_create, fd_create,
fd_wrapped_fd, fd_wrapped_fd,
@ -1641,6 +1644,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd, pollset_set_add_fd,
pollset_set_del_fd, pollset_set_del_fd,
shutdown_background_closure,
shutdown_engine, shutdown_engine,
}; };

@ -1782,6 +1782,8 @@ static void global_cv_fd_table_shutdown() {
* event engine binding * event engine binding
*/ */
static void shutdown_background_closure(void) {}
static void shutdown_engine(void) { static void shutdown_engine(void) {
pollset_global_shutdown(); pollset_global_shutdown();
if (grpc_cv_wakeup_fds_enabled()) { if (grpc_cv_wakeup_fds_enabled()) {
@ -1796,6 +1798,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = { static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset), sizeof(grpc_pollset),
false, false,
false,
fd_create, fd_create,
fd_wrapped_fd, fd_wrapped_fd,
@ -1825,6 +1828,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd, pollset_set_add_fd,
pollset_set_del_fd, pollset_set_del_fd,
shutdown_background_closure,
shutdown_engine, shutdown_engine,
}; };

@ -244,6 +244,10 @@ bool grpc_event_engine_can_track_errors(void) {
return false; return false;
} }
bool grpc_event_engine_run_in_background(void) {
return g_event_engine->run_in_background;
}
grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) { grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
@ -395,4 +399,8 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
g_event_engine->pollset_set_del_fd(pollset_set, fd); g_event_engine->pollset_set_del_fd(pollset_set, fd);
} }
void grpc_shutdown_background_closure(void) {
g_event_engine->shutdown_background_closure();
}
#endif // GRPC_POSIX_SOCKET_EV #endif // GRPC_POSIX_SOCKET_EV

@ -42,6 +42,7 @@ typedef struct grpc_fd grpc_fd;
typedef struct grpc_event_engine_vtable { typedef struct grpc_event_engine_vtable {
size_t pollset_size; size_t pollset_size;
bool can_track_err; bool can_track_err;
bool run_in_background;
grpc_fd* (*fd_create)(int fd, const char* name, bool track_err); grpc_fd* (*fd_create)(int fd, const char* name, bool track_err);
int (*fd_wrapped_fd)(grpc_fd* fd); int (*fd_wrapped_fd)(grpc_fd* fd);
@ -79,6 +80,7 @@ typedef struct grpc_event_engine_vtable {
void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd);
void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd);
void (*shutdown_background_closure)(void);
void (*shutdown_engine)(void); void (*shutdown_engine)(void);
} grpc_event_engine_vtable; } grpc_event_engine_vtable;
@ -101,6 +103,11 @@ const char* grpc_get_poll_strategy_name();
*/ */
bool grpc_event_engine_can_track_errors(); bool grpc_event_engine_can_track_errors();
/* Returns true if polling engine runs in the background, false otherwise.
* Currently only 'epollbg' runs in the background.
*/
bool grpc_event_engine_run_in_background();
/* Create a wrapped file descriptor. /* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor. Requires fd is a non-blocking file descriptor.
\a track_err if true means that error events would be tracked separately \a track_err if true means that error events would be tracked separately
@ -174,6 +181,9 @@ void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
/* Shut down all the closures registered in the background poller. */
void grpc_shutdown_background_closure();
/* override to allow tests to hook poll() usage */ /* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int); typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function; extern grpc_poll_function_type grpc_poll_function;

@ -157,6 +157,10 @@ void grpc_iomgr_shutdown() {
gpr_cv_destroy(&g_rcv); gpr_cv_destroy(&g_rcv);
} }
void grpc_iomgr_shutdown_background_closure() {
grpc_iomgr_platform_shutdown_background_closure();
}
void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) { void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) {
obj->name = gpr_strdup(name); obj->name = gpr_strdup(name);
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);

@ -35,6 +35,10 @@ void grpc_iomgr_start();
* exec_ctx. */ * exec_ctx. */
void grpc_iomgr_shutdown(); void grpc_iomgr_shutdown();
/** Signals the intention to shutdown all the closures registered in the
* background poller. */
void grpc_iomgr_shutdown_background_closure();
/* Exposed only for testing */ /* Exposed only for testing */
size_t grpc_iomgr_count_objects_for_testing(); size_t grpc_iomgr_count_objects_for_testing();

@ -40,9 +40,11 @@ static void iomgr_platform_init(void) {
} }
static void iomgr_platform_flush(void) {} static void iomgr_platform_flush(void) {}
static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
static void iomgr_platform_shutdown_background_closure(void) {}
static grpc_iomgr_platform_vtable vtable = { static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
iomgr_platform_shutdown_background_closure};
void grpc_custom_iomgr_init(grpc_socket_vtable* socket, void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
grpc_custom_resolver_vtable* resolver, grpc_custom_resolver_vtable* resolver,

@ -41,3 +41,7 @@ void grpc_iomgr_platform_init() { iomgr_platform_vtable->init(); }
void grpc_iomgr_platform_flush() { iomgr_platform_vtable->flush(); } void grpc_iomgr_platform_flush() { iomgr_platform_vtable->flush(); }
void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); } void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); }
void grpc_iomgr_platform_shutdown_background_closure() {
iomgr_platform_vtable->shutdown_background_closure();
}

@ -35,6 +35,7 @@ typedef struct grpc_iomgr_platform_vtable {
void (*init)(void); void (*init)(void);
void (*flush)(void); void (*flush)(void);
void (*shutdown)(void); void (*shutdown)(void);
void (*shutdown_background_closure)(void);
} grpc_iomgr_platform_vtable; } grpc_iomgr_platform_vtable;
void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name); void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name);
@ -52,6 +53,9 @@ void grpc_iomgr_platform_flush(void);
/** tear down all platform specific global iomgr structures */ /** tear down all platform specific global iomgr structures */
void grpc_iomgr_platform_shutdown(void); void grpc_iomgr_platform_shutdown(void);
/** shut down all the closures registered in the background poller */
void grpc_iomgr_platform_shutdown_background_closure(void);
bool grpc_iomgr_abort_on_leaks(void); bool grpc_iomgr_abort_on_leaks(void);
#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */

@ -51,8 +51,13 @@ static void iomgr_platform_shutdown(void) {
grpc_wakeup_fd_global_destroy(); grpc_wakeup_fd_global_destroy();
} }
static void iomgr_platform_shutdown_background_closure(void) {
grpc_shutdown_background_closure();
}
static grpc_iomgr_platform_vtable vtable = { static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
iomgr_platform_shutdown_background_closure};
void grpc_set_default_iomgr_platform() { void grpc_set_default_iomgr_platform() {
grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable); grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable);

@ -54,8 +54,13 @@ static void iomgr_platform_shutdown(void) {
grpc_wakeup_fd_global_destroy(); grpc_wakeup_fd_global_destroy();
} }
static void iomgr_platform_shutdown_background_closure(void) {
grpc_shutdown_background_closure();
}
static grpc_iomgr_platform_vtable vtable = { static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
iomgr_platform_shutdown_background_closure};
void grpc_set_default_iomgr_platform() { void grpc_set_default_iomgr_platform() {
char* enable_cfstream = getenv(grpc_cfstream_env_var); char* enable_cfstream = getenv(grpc_cfstream_env_var);

@ -71,8 +71,11 @@ static void iomgr_platform_shutdown(void) {
winsock_shutdown(); winsock_shutdown();
} }
static void iomgr_platform_shutdown_background_closure(void) {}
static grpc_iomgr_platform_vtable vtable = { static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown}; iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
iomgr_platform_shutdown_background_closure};
void grpc_set_default_iomgr_platform() { void grpc_set_default_iomgr_platform() {
grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable); grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable);

@ -260,10 +260,17 @@ static void notify_on_write(grpc_tcp* tcp) {
if (grpc_tcp_trace.enabled()) { if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "TCP:%p notify_on_write", tcp); gpr_log(GPR_INFO, "TCP:%p notify_on_write", tcp);
} }
cover_self(tcp); if (grpc_event_engine_run_in_background()) {
GRPC_CLOSURE_INIT(&tcp->write_done_closure, // If there is a polling engine always running in the background, there is
tcp_drop_uncovered_then_handle_write, tcp, // no need to run the backup poller.
grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&tcp->write_done_closure, tcp_handle_write, tcp,
grpc_schedule_on_exec_ctx);
} else {
cover_self(tcp);
GRPC_CLOSURE_INIT(&tcp->write_done_closure,
tcp_drop_uncovered_then_handle_write, tcp,
grpc_schedule_on_exec_ctx);
}
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure); grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure);
} }

@ -161,6 +161,7 @@ void grpc_shutdown(void) {
if (--g_initializations == 0) { if (--g_initializations == 0) {
{ {
grpc_core::ExecCtx exec_ctx(0); grpc_core::ExecCtx exec_ctx(0);
grpc_iomgr_shutdown_background_closure();
{ {
grpc_timer_manager_set_threading( grpc_timer_manager_set_threading(
false); // shutdown timer_manager thread false); // shutdown timer_manager thread

@ -94,6 +94,7 @@ static const grpc_event_engine_vtable* init_engine_vtable(bool) {
g_vtable.pollset_destroy = pollset_destroy; g_vtable.pollset_destroy = pollset_destroy;
g_vtable.pollset_work = pollset_work; g_vtable.pollset_work = pollset_work;
g_vtable.pollset_kick = pollset_kick; g_vtable.pollset_kick = pollset_kick;
g_vtable.shutdown_background_closure = [] {};
g_vtable.shutdown_engine = [] {}; g_vtable.shutdown_engine = [] {};
return &g_vtable; return &g_vtable;

Loading…
Cancel
Save