Merge pull request #17658 from guantaol/avoid_thd_jump

Avoid the thread jump in server callback APIs.
pull/17683/head
Vijay Pai 6 years ago committed by GitHub
commit 302e7b4d2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      src/core/lib/iomgr/combiner.cc
  2. 3
      src/core/lib/iomgr/ev_epoll1_linux.cc
  3. 3
      src/core/lib/iomgr/ev_epollex_linux.cc
  4. 3
      src/core/lib/iomgr/ev_poll_posix.cc
  5. 4
      src/core/lib/iomgr/ev_posix.cc
  6. 4
      src/core/lib/iomgr/ev_posix.h
  7. 4
      src/core/lib/iomgr/iomgr.cc
  8. 3
      src/core/lib/iomgr/iomgr.h
  9. 6
      src/core/lib/iomgr/iomgr_custom.cc
  10. 4
      src/core/lib/iomgr/iomgr_internal.cc
  11. 4
      src/core/lib/iomgr/iomgr_internal.h
  12. 7
      src/core/lib/iomgr/iomgr_posix.cc
  13. 7
      src/core/lib/iomgr/iomgr_posix_cfstream.cc
  14. 7
      src/core/lib/iomgr/iomgr_windows.cc
  15. 1
      test/cpp/microbenchmarks/bm_cq_multiple_threads.cc

@ -29,6 +29,7 @@
#include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner"); grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner");
@ -228,8 +229,14 @@ bool grpc_combiner_continue_exec_ctx() {
grpc_core::ExecCtx::Get()->IsReadyToFinish(), grpc_core::ExecCtx::Get()->IsReadyToFinish(),
lock->time_to_execute_final_list)); lock->time_to_execute_final_list));
// offload only if all the following conditions are true:
// 1. the combiner is contended and has more than one closure to execute
// 2. the current execution context needs to finish as soon as possible
// 3. the DEFAULT executor is threaded
// 4. the current thread is not a worker for any background poller
if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() &&
grpc_executor_is_threaded()) { grpc_executor_is_threaded() &&
!grpc_iomgr_is_any_background_poller_thread()) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on: schedule remaining work to be // this execution context wants to move on: schedule remaining work to be
// picked up on the executor // picked up on the executor

@ -1242,6 +1242,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
* Event engine binding * Event engine binding
*/ */
static bool is_any_background_poller_thread(void) { return false; }
static void shutdown_background_closure(void) {} static void shutdown_background_closure(void) {}
static void shutdown_engine(void) { static void shutdown_engine(void) {
@ -1287,6 +1289,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd, pollset_set_add_fd,
pollset_set_del_fd, pollset_set_del_fd,
is_any_background_poller_thread,
shutdown_background_closure, shutdown_background_closure,
shutdown_engine, shutdown_engine,
}; };

@ -1604,6 +1604,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
* Event engine binding * Event engine binding
*/ */
static bool is_any_background_poller_thread(void) { return false; }
static void shutdown_background_closure(void) {} static void shutdown_background_closure(void) {}
static void shutdown_engine(void) { static void shutdown_engine(void) {
@ -1644,6 +1646,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd, pollset_set_add_fd,
pollset_set_del_fd, pollset_set_del_fd,
is_any_background_poller_thread,
shutdown_background_closure, shutdown_background_closure,
shutdown_engine, shutdown_engine,
}; };

@ -1782,6 +1782,8 @@ static void global_cv_fd_table_shutdown() {
* event engine binding * event engine binding
*/ */
static bool is_any_background_poller_thread(void) { return false; }
static void shutdown_background_closure(void) {} static void shutdown_background_closure(void) {}
static void shutdown_engine(void) { static void shutdown_engine(void) {
@ -1828,6 +1830,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd, pollset_set_add_fd,
pollset_set_del_fd, pollset_set_del_fd,
is_any_background_poller_thread,
shutdown_background_closure, shutdown_background_closure,
shutdown_engine, shutdown_engine,
}; };

@ -399,6 +399,10 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
g_event_engine->pollset_set_del_fd(pollset_set, fd); g_event_engine->pollset_set_del_fd(pollset_set, fd);
} }
bool grpc_is_any_background_poller_thread(void) {
return g_event_engine->is_any_background_poller_thread();
}
void grpc_shutdown_background_closure(void) { void grpc_shutdown_background_closure(void) {
g_event_engine->shutdown_background_closure(); g_event_engine->shutdown_background_closure();
} }

@ -80,6 +80,7 @@ typedef struct grpc_event_engine_vtable {
void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd);
void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd);
bool (*is_any_background_poller_thread)(void);
void (*shutdown_background_closure)(void); void (*shutdown_background_closure)(void);
void (*shutdown_engine)(void); void (*shutdown_engine)(void);
} grpc_event_engine_vtable; } grpc_event_engine_vtable;
@ -181,6 +182,9 @@ void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
/* Returns true if the caller is a worker thread for any background poller. */
bool grpc_is_any_background_poller_thread();
/* Shut down all the closures registered in the background poller. */ /* Shut down all the closures registered in the background poller. */
void grpc_shutdown_background_closure(); void grpc_shutdown_background_closure();

@ -161,6 +161,10 @@ void grpc_iomgr_shutdown_background_closure() {
grpc_iomgr_platform_shutdown_background_closure(); grpc_iomgr_platform_shutdown_background_closure();
} }
bool grpc_iomgr_is_any_background_poller_thread() {
return grpc_iomgr_platform_is_any_background_poller_thread();
}
void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) { void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) {
obj->name = gpr_strdup(name); obj->name = gpr_strdup(name);
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);

@ -39,6 +39,9 @@ void grpc_iomgr_shutdown();
* background poller. */ * background poller. */
void grpc_iomgr_shutdown_background_closure(); void grpc_iomgr_shutdown_background_closure();
/** Returns true if the caller is a worker thread for any background poller. */
bool grpc_iomgr_is_any_background_poller_thread();
/* Exposed only for testing */ /* Exposed only for testing */
size_t grpc_iomgr_count_objects_for_testing(); size_t grpc_iomgr_count_objects_for_testing();

@ -41,10 +41,14 @@ static void iomgr_platform_init(void) {
static void iomgr_platform_flush(void) {} static void iomgr_platform_flush(void) {}
static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
static void iomgr_platform_shutdown_background_closure(void) {} static void iomgr_platform_shutdown_background_closure(void) {}
static bool iomgr_platform_is_any_background_poller_thread(void) {
return false;
}
static grpc_iomgr_platform_vtable vtable = { static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
iomgr_platform_shutdown_background_closure}; iomgr_platform_shutdown_background_closure,
iomgr_platform_is_any_background_poller_thread};
void grpc_custom_iomgr_init(grpc_socket_vtable* socket, void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
grpc_custom_resolver_vtable* resolver, grpc_custom_resolver_vtable* resolver,

@ -45,3 +45,7 @@ void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); }
void grpc_iomgr_platform_shutdown_background_closure() { void grpc_iomgr_platform_shutdown_background_closure() {
iomgr_platform_vtable->shutdown_background_closure(); iomgr_platform_vtable->shutdown_background_closure();
} }
bool grpc_iomgr_platform_is_any_background_poller_thread() {
return iomgr_platform_vtable->is_any_background_poller_thread();
}

@ -36,6 +36,7 @@ typedef struct grpc_iomgr_platform_vtable {
void (*flush)(void); void (*flush)(void);
void (*shutdown)(void); void (*shutdown)(void);
void (*shutdown_background_closure)(void); void (*shutdown_background_closure)(void);
bool (*is_any_background_poller_thread)(void);
} grpc_iomgr_platform_vtable; } grpc_iomgr_platform_vtable;
void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name); void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name);
@ -56,6 +57,9 @@ void grpc_iomgr_platform_shutdown(void);
/** shut down all the closures registered in the background poller */ /** shut down all the closures registered in the background poller */
void grpc_iomgr_platform_shutdown_background_closure(void); void grpc_iomgr_platform_shutdown_background_closure(void);
/** return true is the caller is a worker thread for any background poller */
bool grpc_iomgr_platform_is_any_background_poller_thread(void);
bool grpc_iomgr_abort_on_leaks(void); bool grpc_iomgr_abort_on_leaks(void);
#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */

@ -55,9 +55,14 @@ static void iomgr_platform_shutdown_background_closure(void) {
grpc_shutdown_background_closure(); grpc_shutdown_background_closure();
} }
static bool iomgr_platform_is_any_background_poller_thread(void) {
return grpc_is_any_background_poller_thread();
}
static grpc_iomgr_platform_vtable vtable = { static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
iomgr_platform_shutdown_background_closure}; iomgr_platform_shutdown_background_closure,
iomgr_platform_is_any_background_poller_thread};
void grpc_set_default_iomgr_platform() { void grpc_set_default_iomgr_platform() {
grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable); grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable);

@ -58,9 +58,14 @@ static void iomgr_platform_shutdown_background_closure(void) {
grpc_shutdown_background_closure(); grpc_shutdown_background_closure();
} }
static bool iomgr_platform_is_any_background_poller_thread(void) {
return grpc_is_any_background_poller_thread();
}
static grpc_iomgr_platform_vtable vtable = { static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
iomgr_platform_shutdown_background_closure}; iomgr_platform_shutdown_background_closure,
iomgr_platform_is_any_background_poller_thread};
void grpc_set_default_iomgr_platform() { void grpc_set_default_iomgr_platform() {
char* enable_cfstream = getenv(grpc_cfstream_env_var); char* enable_cfstream = getenv(grpc_cfstream_env_var);

@ -73,9 +73,14 @@ static void iomgr_platform_shutdown(void) {
static void iomgr_platform_shutdown_background_closure(void) {} static void iomgr_platform_shutdown_background_closure(void) {}
static bool iomgr_platform_is_any_background_poller_thread(void) {
return false;
}
static grpc_iomgr_platform_vtable vtable = { static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
iomgr_platform_shutdown_background_closure}; iomgr_platform_shutdown_background_closure,
iomgr_platform_is_any_background_poller_thread};
void grpc_set_default_iomgr_platform() { void grpc_set_default_iomgr_platform() {
grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable); grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable);

@ -94,6 +94,7 @@ static const grpc_event_engine_vtable* init_engine_vtable(bool) {
g_vtable.pollset_destroy = pollset_destroy; g_vtable.pollset_destroy = pollset_destroy;
g_vtable.pollset_work = pollset_work; g_vtable.pollset_work = pollset_work;
g_vtable.pollset_kick = pollset_kick; g_vtable.pollset_kick = pollset_kick;
g_vtable.is_any_background_poller_thread = [] { return false; };
g_vtable.shutdown_background_closure = [] {}; g_vtable.shutdown_background_closure = [] {};
g_vtable.shutdown_engine = [] {}; g_vtable.shutdown_engine = [] {};

Loading…
Cancel
Save