[iomgr] Second take for iomgr shutdown (#30101)

* better api for ee up/down

* fixes

* mac fix

* review feedback

* nuke bm_cq_multiple_threads

* Revert "nuke bm_cq_multiple_threads"

This reverts commit dcda194a42.

* bleh-revert-fix

* fix tsan race in stats

* fix
pull/30109/head
Craig Tiller 3 years ago committed by GitHub
parent 9824e3355d
commit 03dcd61068
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      src/core/lib/debug/stats.cc
  2. 1
      src/core/lib/debug/stats.h
  3. 69
      src/core/lib/iomgr/ev_epoll1_linux.cc
  4. 2
      src/core/lib/iomgr/ev_epoll1_linux.h
  5. 91
      src/core/lib/iomgr/ev_poll_posix.cc
  6. 3
      src/core/lib/iomgr/ev_poll_posix.h
  7. 146
      src/core/lib/iomgr/ev_posix.cc
  8. 8
      src/core/lib/iomgr/ev_posix.h
  9. 27
      src/core/lib/iomgr/wakeup_fd_posix.cc
  10. 2
      src/core/lib/iomgr/wakeup_fd_posix.h
  11. 1
      src/core/lib/surface/init.cc
  12. 54
      test/cpp/microbenchmarks/bm_cq_multiple_threads.cc

@ -31,18 +31,20 @@
#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/sync.h>
grpc_stats_data* grpc_stats_per_cpu_storage = nullptr;
static size_t g_num_cores;
static gpr_once g_once = GPR_ONCE_INIT;
void grpc_stats_init(void) {
g_num_cores = std::max(1u, gpr_cpu_num_cores());
grpc_stats_per_cpu_storage = static_cast<grpc_stats_data*>(
gpr_zalloc(sizeof(grpc_stats_data) * g_num_cores));
gpr_once_init(&g_once, []() {
g_num_cores = gpr_cpu_num_cores();
grpc_stats_per_cpu_storage = static_cast<grpc_stats_data*>(
gpr_zalloc(sizeof(grpc_stats_data) * g_num_cores));
});
}
void grpc_stats_shutdown(void) { gpr_free(grpc_stats_per_cpu_storage); }
void grpc_stats_collect(grpc_stats_data* output) {
memset(output, 0, sizeof(*output));
for (size_t core = 0; core < g_num_cores; core++) {

@ -56,7 +56,6 @@ extern grpc_stats_data* grpc_stats_per_cpu_storage;
#endif /* defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) */
void grpc_stats_init(void);
void grpc_stats_shutdown(void);
void grpc_stats_collect(grpc_stats_data* output);
// c = b-a
void grpc_stats_diff(const grpc_stats_data* b, const grpc_stats_data* a,

@ -1268,7 +1268,9 @@ static void shutdown_engine(void) {
}
}
static const grpc_event_engine_vtable vtable = {
static bool init_epoll1_linux();
const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
sizeof(grpc_pollset),
true,
false,
@ -1302,8 +1304,11 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_del_fd,
is_any_background_poller_thread,
/* name = */ "epoll1",
/* check_engine_available = */ [](bool) { return init_epoll1_linux(); },
/* init_engine = */ []() {},
shutdown_background_closure,
shutdown_engine,
/* shutdown_engine = */ []() {},
add_closure_to_background_poller,
};
@ -1319,21 +1324,20 @@ static void reset_event_manager_on_fork() {
}
gpr_mu_unlock(&fork_fd_list_mu);
shutdown_engine();
grpc_init_epoll1_linux(true);
init_epoll1_linux();
}
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
* support is available */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(
bool /*explicit_request*/) {
static bool init_epoll1_linux() {
if (!grpc_has_wakeup_fd()) {
gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
return nullptr;
return false;
}
if (!epoll_set_init()) {
return nullptr;
return false;
}
fd_global_init();
@ -1341,7 +1345,7 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
fd_global_shutdown();
epoll_set_shutdown();
return nullptr;
return false;
}
if (grpc_core::Fork::Enabled()) {
@ -1349,17 +1353,52 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(
grpc_core::Fork::SetResetChildPollingEngineFunc(
reset_event_manager_on_fork);
}
return &vtable;
return true;
}
#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
* NULL */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(
bool /*explicit_request*/) {
return nullptr;
}
const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
1,
true,
false,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
/* name = */ "epoll1",
/* check_engine_available = */ [](bool) { return false; },
nullptr,
nullptr,
nullptr,
nullptr,
};
#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
#endif /* !defined(GRPC_LINUX_EPOLL) */

@ -26,6 +26,6 @@
// a polling engine that utilizes a singleton epoll set and turnstile polling
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request);
extern const grpc_event_engine_vtable grpc_ev_epoll1_posix;
#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL1_LINUX_H */

@ -840,8 +840,6 @@ static grpc_error_handle pollset_kick(grpc_pollset* p,
static grpc_error_handle pollset_global_init(void) { return GRPC_ERROR_NONE; }
static void pollset_global_shutdown(void) {}
/* main interface */
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
@ -1340,15 +1338,29 @@ static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
return false;
}
static void shutdown_engine(void) {
pollset_global_shutdown();
if (track_fds_for_fork) {
gpr_mu_destroy(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
/* Called by the child process's post-fork handler to close open fds, including
* worker wakeup fds. This allows gRPC to shutdown in the child process without
* interfering with connections or RPCs ongoing in the parent. */
static void reset_event_manager_on_fork() {
gpr_mu_lock(&fork_fd_list_mu);
while (fork_fd_list_head != nullptr) {
if (fork_fd_list_head->fd != nullptr) {
if (!fork_fd_list_head->fd->closed) {
close(fork_fd_list_head->fd->fd);
}
fork_fd_list_head->fd->fd = -1;
} else {
close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
}
fork_fd_list_head = fork_fd_list_head->next;
}
gpr_mu_unlock(&fork_fd_list_mu);
}
static const grpc_event_engine_vtable vtable = {
const grpc_event_engine_vtable grpc_ev_poll_posix = {
sizeof(grpc_pollset),
false,
false,
@ -1382,49 +1394,28 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_del_fd,
is_any_background_poller_thread,
shutdown_background_closure,
shutdown_engine,
/* name = */ "poll",
/* check_engine_available = */
[](bool) {
if (!grpc_has_wakeup_fd()) {
gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
return false;
}
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
return false;
}
if (grpc_core::Fork::Enabled()) {
track_fds_for_fork = true;
gpr_mu_init(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(
reset_event_manager_on_fork);
}
return true;
},
/* init_engine = */ []() {},
/* shutdown_engine = */ shutdown_background_closure,
[]() {},
add_closure_to_background_poller,
};
/* Called by the child process's post-fork handler to close open fds, including
* worker wakeup fds. This allows gRPC to shutdown in the child process without
* interfering with connections or RPCs ongoing in the parent. */
static void reset_event_manager_on_fork() {
gpr_mu_lock(&fork_fd_list_mu);
while (fork_fd_list_head != nullptr) {
if (fork_fd_list_head->fd != nullptr) {
if (!fork_fd_list_head->fd->closed) {
close(fork_fd_list_head->fd->fd);
}
fork_fd_list_head->fd->fd = -1;
} else {
close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
}
fork_fd_list_head = fork_fd_list_head->next;
}
gpr_mu_unlock(&fork_fd_list_mu);
}
const grpc_event_engine_vtable* grpc_init_poll_posix(
bool /*explicit_request*/) {
if (!grpc_has_wakeup_fd()) {
gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
return nullptr;
}
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
return nullptr;
}
if (grpc_core::Fork::Enabled()) {
track_fds_for_fork = true;
gpr_mu_init(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(
reset_event_manager_on_fork);
}
return &vtable;
}
#endif /* GRPC_POSIX_SOCKET_EV_POLL */

@ -23,7 +23,6 @@
#include "src/core/lib/iomgr/ev_posix.h"
const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request);
const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request);
extern const grpc_event_engine_vtable grpc_ev_poll_posix;
#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */

@ -18,6 +18,8 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_EV
@ -74,15 +76,8 @@ grpc_poll_function_type grpc_poll_function = aix_poll;
grpc_wakeup_fd grpc_global_wakeup_fd;
static const grpc_event_engine_vtable* g_event_engine = nullptr;
static const char* g_poll_strategy_name = nullptr;
typedef const grpc_event_engine_vtable* (*event_engine_factory_fn)(
bool explicit_request);
static gpr_once g_choose_engine = GPR_ONCE_INIT;
struct event_engine_factory {
const char* name;
event_engine_factory_fn factory;
};
namespace {
grpc_poll_function_type real_poll_function;
@ -97,22 +92,26 @@ int phony_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
}
}
const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
if (!explicit_request) {
return nullptr;
}
// return the simplest engine as a phony but also override the poller
auto ret = grpc_init_poll_posix(explicit_request);
real_poll_function = grpc_poll_function;
grpc_poll_function = phony_poll;
return ret;
const grpc_event_engine_vtable* non_polling() {
static const grpc_event_engine_vtable vtable = []() {
grpc_event_engine_vtable v = grpc_ev_poll_posix;
v.check_engine_available = [](bool explicit_request) {
if (!explicit_request) return false;
// return the simplest engine as a phony but also override the poller
if (!grpc_ev_poll_posix.check_engine_available(explicit_request)) {
return false;
}
real_poll_function = grpc_poll_function;
grpc_poll_function = phony_poll;
return true;
};
v.name = "none";
return v;
}();
return &vtable;
}
} // namespace
#define ENGINE_HEAD_CUSTOM "head_custom"
#define ENGINE_TAIL_CUSTOM "tail_custom"
// The global array of event-engine factories. Each entry is a pair with a name
// and an event-engine generator function (nullptr if there is no generator
// registered for this name). The middle entries are the engines predefined by
@ -124,13 +123,18 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
// specific poller that is requested by name in the GRPC_POLL_STRATEGY
// environment variable if that variable is set (which should be a
// comma-separated list of one or more event engine names)
static event_engine_factory g_factories[] = {
{ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
{ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
{"epoll1", grpc_init_epoll1_linux}, {"poll", grpc_init_poll_posix},
{"none", init_non_polling}, {ENGINE_TAIL_CUSTOM, nullptr},
{ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
{ENGINE_TAIL_CUSTOM, nullptr},
static const grpc_event_engine_vtable* g_vtables[] = {
nullptr,
nullptr,
nullptr,
nullptr,
&grpc_ev_epoll1_posix,
&grpc_ev_poll_posix,
non_polling(),
nullptr,
nullptr,
nullptr,
nullptr,
};
static void add(const char* beg, const char* end, char*** ss, size_t* ns) {
@ -163,77 +167,69 @@ static bool is(const char* want, const char* have) {
}
static void try_engine(const char* engine) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
if (g_factories[i].factory != nullptr && is(engine, g_factories[i].name)) {
if ((g_event_engine = g_factories[i].factory(
0 == strcmp(engine, g_factories[i].name)))) {
g_poll_strategy_name = g_factories[i].name;
gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name);
return;
}
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_vtables); i++) {
if (g_vtables[i] != nullptr && is(engine, g_vtables[i]->name) &&
g_vtables[i]->check_engine_available(
0 == strcmp(engine, g_vtables[i]->name))) {
g_event_engine = g_vtables[i];
gpr_log(GPR_DEBUG, "Using polling engine: %s", g_event_engine->name);
return;
}
}
}
/* Call this before calling grpc_event_engine_init() */
void grpc_register_event_engine_factory(const char* name,
event_engine_factory_fn factory,
void grpc_register_event_engine_factory(const grpc_event_engine_vtable* vtable,
bool add_at_head) {
const char* custom_match =
add_at_head ? ENGINE_HEAD_CUSTOM : ENGINE_TAIL_CUSTOM;
const grpc_event_engine_vtable** first_null = nullptr;
const grpc_event_engine_vtable** last_null = nullptr;
// Overwrite an existing registration if already registered
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
if (0 == strcmp(name, g_factories[i].name)) {
g_factories[i].factory = factory;
return;
}
}
// Otherwise fill in an available custom slot
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
if (0 == strcmp(g_factories[i].name, custom_match)) {
g_factories[i].name = name;
g_factories[i].factory = factory;
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_vtables); i++) {
if (g_vtables[i] == nullptr) {
if (first_null == nullptr) first_null = &g_vtables[i];
last_null = &g_vtables[i];
} else if (0 == strcmp(g_vtables[i]->name, vtable->name)) {
g_vtables[i] = vtable;
return;
}
}
// Otherwise fail
GPR_ASSERT(false);
*(add_at_head ? first_null : last_null) = vtable;
}
/*If grpc_event_engine_init() has been called, returns the poll_strategy_name.
* Otherwise, returns nullptr. */
const char* grpc_get_poll_strategy_name() { return g_poll_strategy_name; }
const char* grpc_get_poll_strategy_name() { return g_event_engine->name; }
void grpc_event_engine_init(void) {
grpc_core::UniquePtr<char> value = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
gpr_once_init(&g_choose_engine, []() {
grpc_core::UniquePtr<char> value =
GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
char** strings = nullptr;
size_t nstrings = 0;
split(value.get(), &strings, &nstrings);
char** strings = nullptr;
size_t nstrings = 0;
split(value.get(), &strings, &nstrings);
for (size_t i = 0; g_event_engine == nullptr && i < nstrings; i++) {
try_engine(strings[i]);
}
for (size_t i = 0; g_event_engine == nullptr && i < nstrings; i++) {
try_engine(strings[i]);
}
for (size_t i = 0; i < nstrings; i++) {
gpr_free(strings[i]);
}
gpr_free(strings);
for (size_t i = 0; i < nstrings; i++) {
gpr_free(strings[i]);
}
gpr_free(strings);
if (g_event_engine == nullptr) {
gpr_log(GPR_ERROR, "No event engine could be initialized from %s",
value.get());
abort();
}
if (g_event_engine == nullptr) {
gpr_log(GPR_ERROR, "No event engine could be initialized from %s",
value.get());
abort();
}
});
g_event_engine->init_engine();
}
void grpc_event_engine_shutdown(void) {
g_event_engine->shutdown_engine();
g_event_engine = nullptr;
}
void grpc_event_engine_shutdown(void) { g_event_engine->shutdown_engine(); }
bool grpc_event_engine_can_track_errors(void) {
/* Only track errors if platform supports errqueue. */

@ -85,6 +85,9 @@ typedef struct grpc_event_engine_vtable {
void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd);
bool (*is_any_background_poller_thread)(void);
const char* name;
bool (*check_engine_available)(bool explicit_request);
void (*init_engine)();
void (*shutdown_background_closure)(void);
void (*shutdown_engine)(void);
bool (*add_closure_to_background_poller)(grpc_closure* closure,
@ -92,9 +95,8 @@ typedef struct grpc_event_engine_vtable {
} grpc_event_engine_vtable;
/* register a new event engine factory */
void grpc_register_event_engine_factory(
const char* name, const grpc_event_engine_vtable* (*factory)(bool),
bool add_at_head);
void grpc_register_event_engine_factory(const grpc_event_engine_vtable* vtable,
bool add_at_head);
void grpc_event_engine_init(void);
void grpc_event_engine_shutdown(void);

@ -32,22 +32,25 @@ static const grpc_wakeup_fd_vtable* wakeup_fd_vtable = nullptr;
int grpc_allow_specialized_wakeup_fd = 1;
int grpc_allow_pipe_wakeup_fd = 1;
int has_real_wakeup_fd = 1;
int cv_wakeup_fds_enabled = 0;
static int has_real_wakeup_fd = 1;
static gpr_once once_init_wakeup_fd = GPR_ONCE_INIT;
void grpc_wakeup_fd_global_init(void) {
if (grpc_allow_specialized_wakeup_fd &&
grpc_specialized_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable;
} else if (grpc_allow_pipe_wakeup_fd &&
grpc_pipe_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
} else {
has_real_wakeup_fd = 0;
}
gpr_once_init(&once_init_wakeup_fd, []() {
if (grpc_allow_specialized_wakeup_fd &&
grpc_specialized_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable;
} else if (grpc_allow_pipe_wakeup_fd &&
grpc_pipe_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
} else {
has_real_wakeup_fd = 0;
}
});
}
void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = nullptr; }
void grpc_wakeup_fd_global_destroy(void) {}
int grpc_has_wakeup_fd(void) { return has_real_wakeup_fd; }

@ -59,8 +59,6 @@ void grpc_wakeup_fd_global_destroy(void);
void grpc_wakeup_fd_global_init_force_fallback(void);
int grpc_has_wakeup_fd(void);
int grpc_cv_wakeup_fds_enabled(void);
void grpc_enable_cv_wakeup_fds(int enable);
typedef struct grpc_wakeup_fd grpc_wakeup_fd;

@ -186,7 +186,6 @@ void grpc_shutdown_internal_locked(void)
grpc_iomgr_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();
grpc_stats_shutdown();
grpc_core::Fork::GlobalShutdown();
}
grpc_core::ApplicationCallbackExecCtx::GlobalShutdown();

@ -46,7 +46,6 @@ static bool g_active;
namespace grpc {
namespace testing {
static grpc_completion_queue* g_cq;
static grpc_event_engine_vtable g_vtable;
static void pollset_shutdown(grpc_pollset* /*ps*/, grpc_closure* closure) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
@ -91,34 +90,31 @@ static grpc_error_handle pollset_work(grpc_pollset* ps,
return GRPC_ERROR_NONE;
}
static const grpc_event_engine_vtable* init_engine_vtable(bool) {
memset(&g_vtable, 0, sizeof(g_vtable));
g_vtable.pollset_size = sizeof(grpc_pollset);
g_vtable.pollset_init = pollset_init;
g_vtable.pollset_shutdown = pollset_shutdown;
g_vtable.pollset_destroy = pollset_destroy;
g_vtable.pollset_work = pollset_work;
g_vtable.pollset_kick = pollset_kick;
g_vtable.is_any_background_poller_thread = [] { return false; };
g_vtable.add_closure_to_background_poller = [](grpc_closure* /*closure*/,
grpc_error_handle /*error*/) {
static grpc_event_engine_vtable make_engine_vtable(const char* name) {
grpc_event_engine_vtable vtable;
memset(&vtable, 0, sizeof(vtable));
vtable.pollset_size = sizeof(grpc_pollset);
vtable.pollset_init = pollset_init;
vtable.pollset_shutdown = pollset_shutdown;
vtable.pollset_destroy = pollset_destroy;
vtable.pollset_work = pollset_work;
vtable.pollset_kick = pollset_kick;
vtable.is_any_background_poller_thread = [] { return false; };
vtable.add_closure_to_background_poller = [](grpc_closure* /*closure*/,
grpc_error_handle /*error*/) {
return false;
};
g_vtable.shutdown_background_closure = [] {};
g_vtable.shutdown_engine = [] {};
vtable.shutdown_background_closure = [] {};
vtable.shutdown_engine = [] {};
vtable.check_engine_available = [](bool) { return true; };
vtable.init_engine = [] {};
vtable.name = name;
return &g_vtable;
return vtable;
}
static void setup() {
// This test should only ever be run with a non or any polling engine
// Override the polling engine for the non-polling engine
// and add a custom polling engine
grpc_register_event_engine_factory("none", init_engine_vtable, false);
grpc_register_event_engine_factory("bm_cq_multiple_threads",
init_engine_vtable, true);
grpc_init();
GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
@ -209,6 +205,13 @@ static void BM_Cq_Throughput(benchmark::State& state) {
BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
namespace {
const grpc_event_engine_vtable g_none_vtable =
grpc::testing::make_engine_vtable("none");
const grpc_event_engine_vtable g_bm_vtable =
grpc::testing::make_engine_vtable("bm_cq_multiple_threads");
} // namespace
} // namespace testing
} // namespace grpc
@ -219,6 +222,11 @@ void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
} // namespace benchmark
int main(int argc, char** argv) {
// This test should only ever be run with a non or any polling engine
// Override the polling engine for the non-polling engine
// and add a custom polling engine
grpc_register_event_engine_factory(&grpc::testing::g_none_vtable, false);
grpc_register_event_engine_factory(&grpc::testing::g_bm_vtable, true);
grpc::testing::TestEnvironment env(&argc, argv);
gpr_mu_init(&g_mu);
gpr_cv_init(&g_cv);

Loading…
Cancel
Save