Merge pull request #16298 from vjpai/custom_poller

Infrastructure for registering custom polling engines
pull/16353/head
Vijay Pai 7 years ago committed by GitHub
commit 17a6bf65b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      build.yaml
  2. 54
      src/core/lib/iomgr/ev_posix.cc
  3. 10
      src/core/lib/iomgr/ev_posix.h
  4. 83
      test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
  5. 2
      tools/run_tests/generated/tests.json

@ -4111,6 +4111,7 @@ targets:
- mac
- linux
- posix
uses_polling: false
- name: bm_error
build: test
language: c++

@ -101,10 +101,28 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
}
} // namespace
static const event_engine_factory g_factories[] = {
#define ENGINE_HEAD_CUSTOM "head_custom"
#define ENGINE_TAIL_CUSTOM "tail_custom"
// The global array of event-engine factories. Each entry is a pair with a name
// and an event-engine generator function (nullptr if there is no generator
// registered for this name). The middle entries are the engines predefined by
// open-source gRPC. The head entries represent an opportunity for specific
// high-priority custom pollers to be added by the initializer plugins of
// custom-built gRPC libraries. The tail entries represent the same, but for
// low-priority custom pollers. The actual poller selected is either the first
// available one in the list if no specific poller is requested, or the first
// specific poller that is requested by name in the GRPC_POLL_STRATEGY
// environment variable if that variable is set (which should be a
// comma-separated list of one or more event engine names)
static event_engine_factory g_factories[] = {
{ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
{ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
{"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux},
{"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix},
{"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling},
{ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
{ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
};
static void add(const char* beg, const char* end, char*** ss, size_t* ns) {
@ -138,7 +156,7 @@ static bool is(const char* want, const char* have) {
static void try_engine(const char* engine) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
if (is(engine, g_factories[i].name)) {
if (g_factories[i].factory != nullptr && is(engine, g_factories[i].name)) {
if ((g_event_engine = g_factories[i].factory(
0 == strcmp(engine, g_factories[i].name)))) {
g_poll_strategy_name = g_factories[i].name;
@ -149,14 +167,32 @@ static void try_engine(const char* engine) {
}
}
/* This should be used for testing purposes ONLY */
void grpc_set_event_engine_test_only(
const grpc_event_engine_vtable* ev_engine) {
g_event_engine = ev_engine;
}
/* Call this before calling grpc_event_engine_init() */
void grpc_register_event_engine_factory(const char* name,
event_engine_factory_fn factory,
bool add_at_head) {
const char* custom_match =
add_at_head ? ENGINE_HEAD_CUSTOM : ENGINE_TAIL_CUSTOM;
// Overwrite an existing registration if already registered
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
if (0 == strcmp(name, g_factories[i].name)) {
g_factories[i].factory = factory;
return;
}
}
// Otherwise fill in an available custom slot
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
if (0 == strcmp(g_factories[i].name, custom_match)) {
g_factories[i].name = name;
g_factories[i].factory = factory;
return;
}
}
const grpc_event_engine_vtable* grpc_get_event_engine_test_only() {
return g_event_engine;
// Otherwise fail
GPR_ASSERT(false);
}
/* Call this only after calling grpc_event_engine_init() */

@ -82,6 +82,11 @@ typedef struct grpc_event_engine_vtable {
void (*shutdown_engine)(void);
} grpc_event_engine_vtable;
/* register a new event engine factory */
void grpc_register_event_engine_factory(
const char* name, const grpc_event_engine_vtable* (*factory)(bool),
bool add_at_head);
void grpc_event_engine_init(void);
void grpc_event_engine_shutdown(void);
@ -173,9 +178,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
/* WARNING: The following two functions should be used for testing purposes
* ONLY */
void grpc_set_event_engine_test_only(const grpc_event_engine_vtable*);
const grpc_event_engine_vtable* grpc_get_event_engine_test_only();
#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */

@ -34,15 +34,15 @@ struct grpc_pollset {
gpr_mu mu;
};
static gpr_mu g_mu;
static gpr_cv g_cv;
static int g_threads_active;
static bool g_active;
namespace grpc {
namespace testing {
auto& force_library_initialization = Library::get();
static void* g_tag = (void*)static_cast<intptr_t>(10); // Some random number
static grpc_completion_queue* g_cq;
static grpc_event_engine_vtable g_vtable;
static const grpc_event_engine_vtable* g_old_vtable;
static void pollset_shutdown(grpc_pollset* ps, grpc_closure* closure) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
@ -74,16 +74,18 @@ static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker,
}
gpr_mu_unlock(&ps->mu);
GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag));
void* tag = (void*)static_cast<intptr_t>(10); // Some random number
GPR_ASSERT(grpc_cq_begin_op(g_cq, tag));
grpc_cq_end_op(
g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(&ps->mu);
return GRPC_ERROR_NONE;
}
static void init_engine_vtable() {
static const grpc_event_engine_vtable* init_engine_vtable(bool) {
memset(&g_vtable, 0, sizeof(g_vtable));
g_vtable.pollset_size = sizeof(grpc_pollset);
@ -92,17 +94,23 @@ static void init_engine_vtable() {
g_vtable.pollset_destroy = pollset_destroy;
g_vtable.pollset_work = pollset_work;
g_vtable.pollset_kick = pollset_kick;
g_vtable.shutdown_engine = [] {};
return &g_vtable;
}
static void setup() {
grpc_init();
// This test should only ever be run with a non or any polling engine
// Override the polling engine for the non-polling engine
// and add a custom polling engine
grpc_register_event_engine_factory("none", init_engine_vtable, false);
grpc_register_event_engine_factory("bm_cq_multiple_threads",
init_engine_vtable, true);
/* Override the event engine with our test event engine (g_vtable); but before
* that, save the current event engine in g_old_vtable. We will have to set
* g_old_vtable back before calling grpc_shutdown() */
init_engine_vtable();
g_old_vtable = grpc_get_event_engine_test_only();
grpc_set_event_engine_test_only(&g_vtable);
grpc_init();
GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
0);
g_cq = grpc_completion_queue_create_for_next(nullptr);
}
@ -118,9 +126,6 @@ static void teardown() {
}
grpc_completion_queue_destroy(g_cq);
/* Restore the old event engine before calling grpc_shutdown */
grpc_set_event_engine_test_only(g_old_vtable);
grpc_shutdown();
}
@ -137,14 +142,33 @@ static void teardown() {
code (i.e the code between two successive calls of state.KeepRunning()) if
state.KeepRunning() returns false. So it is safe to do the teardown in one
of the threads after state.keepRunning() returns false.
However, our use requires synchronization because we do additional work at
each thread that requires specific ordering (TrackCounters must be constructed
after grpc_init because it needs the number of cores, initialized by grpc,
and its Finish call must take place before grpc_shutdown so that it can use
grpc_stats).
*/
static void BM_Cq_Throughput(benchmark::State& state) {
TrackCounters track_counters;
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
auto thd_idx = state.thread_index;
if (state.thread_index == 0) {
gpr_mu_lock(&g_mu);
g_threads_active++;
if (thd_idx == 0) {
setup();
g_active = true;
gpr_cv_broadcast(&g_cv);
} else {
while (!g_active) {
gpr_cv_wait(&g_cv, &g_mu, deadline);
}
}
gpr_mu_unlock(&g_mu);
// Use a TrackCounters object to monitor the gRPC performance statistics
// (optionally including low-level counters) before and after the test
TrackCounters track_counters;
while (state.KeepRunning()) {
GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
@ -152,12 +176,23 @@ static void BM_Cq_Throughput(benchmark::State& state) {
}
state.SetItemsProcessed(state.iterations());
track_counters.Finish(state);
if (state.thread_index == 0) {
teardown();
gpr_mu_lock(&g_mu);
g_threads_active--;
if (g_threads_active == 0) {
gpr_cv_broadcast(&g_cv);
} else {
while (g_threads_active > 0) {
gpr_cv_wait(&g_cv, &g_mu, deadline);
}
}
gpr_mu_unlock(&g_mu);
track_counters.Finish(state);
if (thd_idx == 0) {
teardown();
g_active = false;
}
}
BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
@ -172,6 +207,8 @@ void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
} // namespace benchmark
int main(int argc, char** argv) {
gpr_mu_init(&g_mu);
gpr_cv_init(&g_cv);
::benchmark::Initialize(&argc, argv);
::grpc::testing::InitTest(&argc, &argv, false);
benchmark::RunTheBenchmarksNamespaced();

@ -3483,7 +3483,7 @@
"mac",
"posix"
],
"uses_polling": true
"uses_polling": false
},
{
"args": [],

Loading…
Cancel
Save