From 2ac2d393eba193683e4a7d2fd8222092d6bffbc0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 16 Oct 2017 15:53:02 +0000 Subject: [PATCH 1/3] Tune benchmark for epollexclusive --- tools/run_tests/performance/scenario_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index 64af6a687c8..d466425cde0 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -253,7 +253,7 @@ class CXXLanguage: req_size=300, resp_size=50, unconstrained_client='async', outstanding=30000, channels=300, offered_load=37500, secure=False, - async_server_threads=16, server_threads_per_cq=16, + async_server_threads=16, server_threads_per_cq=1, categories=[SMOKETEST] + [SCALABLE]) for secure in [True, False]: From 8232492cce70bcf833dc82254d53d00b3544a751 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 16 Oct 2017 21:37:28 +0000 Subject: [PATCH 2/3] Fix leak --- src/core/lib/iomgr/ev_epollex_linux.cc | 42 +++++++++++++++----------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 1e8de8910e9..8b0ebc2d177 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -201,7 +201,7 @@ struct grpc_pollset_set { size_t pollset_count; size_t pollset_capacity; - pollable **pollsets; + grpc_pollset **pollsets; size_t fd_count; size_t fd_capacity; @@ -545,6 +545,9 @@ static void pollset_global_shutdown(void) { static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { +if (GRPC_TRACER_ON(grpc_polling_trace)) { +gpr_log(GPR_DEBUG, "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) rw=%p (target:NULL) cpsc=%d (target:0)", pollset, pollset->active_pollable, pollset->shutdown_closure, pollset->root_worker, pollset->containing_pollset_set_count); +} if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && pollset->containing_pollset_set_count == 0) { GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); @@ -1123,7 +1126,11 @@ static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) { pollset_set_unref(exec_ctx, pss->parent); gpr_mu_destroy(&pss->mu); for (size_t i = 0; i < pss->pollset_count; i++) { - POLLABLE_UNREF(pss->pollsets[i], "pollset_set"); + gpr_mu_lock(&pss->pollsets[i]->mu); + if (0==--pss->pollsets[i]->containing_pollset_set_count) { + pollset_maybe_finish_shutdown(exec_ctx, pss->pollsets[i]); + } + gpr_mu_unlock(&pss->pollsets[i]->mu); } for (size_t i = 0; i < pss->fd_count; i++) { UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set"); @@ -1142,7 +1149,7 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, static const char *err_desc = "pollset_set_add_fd"; pss = pss_lock_adam(pss); for (size_t i = 0; i < pss->pollset_count; i++) { - append_error(&error, pollable_add_fd(pss->pollsets[i], fd), err_desc); + append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd), err_desc); } if (pss->fd_count == pss->fd_capacity) { pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8); @@ -1185,8 +1192,7 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, pss = pss_lock_adam(pss); size_t i; for (i = 0; i < pss->pollset_count; i++) { - if (pss->pollsets[i] == ps->active_pollable) { - POLLABLE_UNREF(pss->pollsets[i], "pollset_set"); + if (pss->pollsets[i] == ps) { break; } } @@ -1204,9 +1210,10 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, } // add all fds to pollables, and output a new array of unorphaned out_fds -static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds, - size_t fd_count, pollable **pollables, - size_t pollable_count, +// assumes pollsets are multipollable +static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds, + size_t fd_count, grpc_pollset **pollsets, + size_t pollset_count, const char *err_desc, grpc_fd **out_fds, size_t *out_fd_count) { grpc_error *error = GRPC_ERROR_NONE; @@ -1216,8 +1223,8 @@ static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds, gpr_mu_unlock(&fds[i]->orphan_mu); UNREF_BY(exec_ctx, fds[i], 2, "pollset_set"); } else { - for (size_t j = 0; j < pollable_count; j++) { - append_error(&error, pollable_add_fd(pollables[j], fds[i]), err_desc); + for (size_t j = 0; j < pollset_count; j++) { + append_error(&error, pollable_add_fd(pollsets[j]->active_pollable, fds[i]), err_desc); } gpr_mu_unlock(&fds[i]->orphan_mu); out_fds[(*out_fd_count)++] = fds[i]; @@ -1246,17 +1253,18 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, pss = pss_lock_adam(pss); size_t initial_fd_count = pss->fd_count; pss->fd_count = 0; - append_error(&error, add_fds_to_pollables(exec_ctx, pss->fds, - initial_fd_count, &pollable_obj, 1, + append_error(&error, add_fds_to_pollsets(exec_ctx, pss->fds, + initial_fd_count, &ps, 1, err_desc, pss->fds, &pss->fd_count), err_desc); if (pss->pollset_count == pss->pollset_capacity) { pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8); - pss->pollsets = (pollable **)gpr_realloc( + pss->pollsets = (grpc_pollset **)gpr_realloc( pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets)); } - pss->pollsets[pss->pollset_count++] = pollable_obj; + pss->pollsets[pss->pollset_count++] = ps; gpr_mu_unlock(&pss->mu); +POLLABLE_UNREF(pollable_obj, "pollset_set"); GRPC_LOG_IF_ERROR(err_desc, error); } @@ -1309,18 +1317,18 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, } size_t initial_a_fd_count = a->fd_count; a->fd_count = 0; - append_error(&error, add_fds_to_pollables( + append_error(&error, add_fds_to_pollsets( exec_ctx, a->fds, initial_a_fd_count, b->pollsets, b->pollset_count, "merge_a2b", a->fds, &a->fd_count), err_desc); - append_error(&error, add_fds_to_pollables(exec_ctx, b->fds, b->fd_count, + append_error(&error, add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count, a->pollsets, a->pollset_count, "merge_b2a", a->fds, &a->fd_count), err_desc); if (a->pollset_capacity < a->pollset_count + b->pollset_count) { a->pollset_capacity = GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count); - a->pollsets = (pollable **)gpr_realloc( + a->pollsets = (grpc_pollset **)gpr_realloc( a->pollsets, a->pollset_capacity * sizeof(*a->pollsets)); } memcpy(a->pollsets + a->pollset_count, b->pollsets, From b653dff0bddfe452ca27f91cbe186335e087ae25 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 16 Oct 2017 14:38:36 -0700 Subject: [PATCH 3/3] clang-format --- src/core/lib/iomgr/ev_epollex_linux.cc | 45 +++++++++++++++----------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 8b0ebc2d177..1b3122176df 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -545,9 +545,13 @@ static void pollset_global_shutdown(void) { static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { -if (GRPC_TRACER_ON(grpc_polling_trace)) { -gpr_log(GPR_DEBUG, "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) rw=%p (target:NULL) cpsc=%d (target:0)", pollset, pollset->active_pollable, pollset->shutdown_closure, pollset->root_worker, pollset->containing_pollset_set_count); -} + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, + "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) " + "rw=%p (target:NULL) cpsc=%d (target:0)", + pollset, pollset->active_pollable, pollset->shutdown_closure, + pollset->root_worker, pollset->containing_pollset_set_count); + } if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && pollset->containing_pollset_set_count == 0) { GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); @@ -1127,7 +1131,7 @@ static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) { gpr_mu_destroy(&pss->mu); for (size_t i = 0; i < pss->pollset_count; i++) { gpr_mu_lock(&pss->pollsets[i]->mu); - if (0==--pss->pollsets[i]->containing_pollset_set_count) { + if (0 == --pss->pollsets[i]->containing_pollset_set_count) { pollset_maybe_finish_shutdown(exec_ctx, pss->pollsets[i]); } gpr_mu_unlock(&pss->pollsets[i]->mu); @@ -1149,7 +1153,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, static const char *err_desc = "pollset_set_add_fd"; pss = pss_lock_adam(pss); for (size_t i = 0; i < pss->pollset_count; i++) { - append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd), err_desc); + append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd), + err_desc); } if (pss->fd_count == pss->fd_capacity) { pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8); @@ -1212,10 +1217,10 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, // add all fds to pollables, and output a new array of unorphaned out_fds // assumes pollsets are multipollable static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds, - size_t fd_count, grpc_pollset **pollsets, - size_t pollset_count, - const char *err_desc, grpc_fd **out_fds, - size_t *out_fd_count) { + size_t fd_count, grpc_pollset **pollsets, + size_t pollset_count, + const char *err_desc, grpc_fd **out_fds, + size_t *out_fd_count) { grpc_error *error = GRPC_ERROR_NONE; for (size_t i = 0; i < fd_count; i++) { gpr_mu_lock(&fds[i]->orphan_mu); @@ -1224,7 +1229,9 @@ static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds, UNREF_BY(exec_ctx, fds[i], 2, "pollset_set"); } else { for (size_t j = 0; j < pollset_count; j++) { - append_error(&error, pollable_add_fd(pollsets[j]->active_pollable, fds[i]), err_desc); + append_error(&error, + pollable_add_fd(pollsets[j]->active_pollable, fds[i]), + err_desc); } gpr_mu_unlock(&fds[i]->orphan_mu); out_fds[(*out_fd_count)++] = fds[i]; @@ -1253,9 +1260,9 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, pss = pss_lock_adam(pss); size_t initial_fd_count = pss->fd_count; pss->fd_count = 0; - append_error(&error, add_fds_to_pollsets(exec_ctx, pss->fds, - initial_fd_count, &ps, 1, - err_desc, pss->fds, &pss->fd_count), + append_error(&error, + add_fds_to_pollsets(exec_ctx, pss->fds, initial_fd_count, &ps, 1, + err_desc, pss->fds, &pss->fd_count), err_desc); if (pss->pollset_count == pss->pollset_capacity) { pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8); @@ -1264,7 +1271,7 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, } pss->pollsets[pss->pollset_count++] = ps; gpr_mu_unlock(&pss->mu); -POLLABLE_UNREF(pollable_obj, "pollset_set"); + POLLABLE_UNREF(pollable_obj, "pollset_set"); GRPC_LOG_IF_ERROR(err_desc, error); } @@ -1317,13 +1324,13 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, } size_t initial_a_fd_count = a->fd_count; a->fd_count = 0; - append_error(&error, add_fds_to_pollsets( - exec_ctx, a->fds, initial_a_fd_count, b->pollsets, - b->pollset_count, "merge_a2b", a->fds, &a->fd_count), + append_error(&error, add_fds_to_pollsets(exec_ctx, a->fds, initial_a_fd_count, + b->pollsets, b->pollset_count, + "merge_a2b", a->fds, &a->fd_count), err_desc); append_error(&error, add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count, - a->pollsets, a->pollset_count, - "merge_b2a", a->fds, &a->fd_count), + a->pollsets, a->pollset_count, + "merge_b2a", a->fds, &a->fd_count), err_desc); if (a->pollset_capacity < a->pollset_count + b->pollset_count) { a->pollset_capacity =