diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 64eef092fab..effa9bde8a3 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -61,6 +61,7 @@ #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/support/string.h" static grpc_wakeup_fd global_wakeup_fd; static int g_epfd; @@ -759,11 +760,38 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_strvec log; + gpr_strvec_init(&log); + char *tmp; + gpr_asprintf( + &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, + specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset), + (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker); + gpr_strvec_add(&log, tmp); + if (pollset->root_worker != NULL) { + gpr_asprintf(&tmp, " {kicked=%d next=%p {kicked=%d}}", + pollset->root_worker->kick_state, pollset->root_worker->next, + pollset->root_worker->next->kick_state); + gpr_strvec_add(&log, tmp); + } + if (specific_worker != NULL) { + gpr_asprintf(&tmp, " worker_kicked=%d", specific_worker->kick_state); + gpr_strvec_add(&log, tmp); + } + tmp = gpr_strvec_flatten(&log, NULL); + gpr_strvec_destroy(&log); + gpr_log(GPR_DEBUG, "%s", tmp); + gpr_free(tmp); + } if (specific_worker == NULL) { if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { grpc_pollset_worker *root_worker = pollset->root_worker; if (root_worker == NULL) { pollset->kicked_without_poller = true; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. kicked_without_poller"); + } return GRPC_ERROR_NONE; } grpc_pollset_worker *next_worker = root_worker->next; @@ -771,21 +799,34 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, // there is no next worker root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load( &g_active_poller)) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. kicked %p", root_worker); + } SET_KICK_STATE(root_worker, KICKED); return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } else if (next_worker->kick_state == UNKICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. kicked %p", next_worker); + } GPR_ASSERT(next_worker->initialized_cv); SET_KICK_STATE(next_worker, KICKED); gpr_cv_signal(&next_worker->cv); return GRPC_ERROR_NONE; } else if (next_worker->kick_state == DESIGNATED_POLLER) { if (root_worker->kick_state != DESIGNATED_POLLER) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. kicked root non-poller %p", next_worker); + } SET_KICK_STATE(root_worker, KICKED); if (root_worker->initialized_cv) { gpr_cv_signal(&root_worker->cv); } return GRPC_ERROR_NONE; } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. non-root poller %p (root=%p)", next_worker, + root_worker); + } SET_KICK_STATE(next_worker, KICKED); return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } @@ -799,20 +840,35 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, return GRPC_ERROR_NONE; } } else if (specific_worker->kick_state == KICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. specific worker already kicked"); + } return GRPC_ERROR_NONE; } else if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. mark %p kicked", specific_worker); + } SET_KICK_STATE(specific_worker, KICKED); return GRPC_ERROR_NONE; } else if (specific_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. kick active poller"); + } SET_KICK_STATE(specific_worker, KICKED); return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } else if (specific_worker->initialized_cv) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. kick waiting worker"); + } SET_KICK_STATE(specific_worker, KICKED); gpr_cv_signal(&specific_worker->cv); return GRPC_ERROR_NONE; } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. kick non-waiting worker"); + } SET_KICK_STATE(specific_worker, KICKED); return GRPC_ERROR_NONE; } diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py index 37540353088..3ff773300f3 100755 --- a/tools/run_tests/python_utils/jobset.py +++ b/tools/run_tests/python_utils/jobset.py @@ -41,6 +41,7 @@ import subprocess import sys import tempfile import time +import errno # cpu cost measurement @@ -132,29 +133,44 @@ _TAG_COLOR = { _FORMAT = '%(asctime)-15s %(message)s' logging.basicConfig(level=logging.INFO, format=_FORMAT) + +def eintr_be_gone(fn): + """Run fn until it doesn't stop because of EINTR""" + while True: + try: + return fn() + except IOError, e: + if e.errno != errno.EINTR: + raise + + + def message(tag, msg, explanatory_text=None, do_newline=False): if message.old_tag == tag and message.old_msg == msg and not explanatory_text: return message.old_tag = tag message.old_msg = msg - try: - if platform_string() == 'windows' or not sys.stdout.isatty(): - if explanatory_text: - logging.info(explanatory_text) - logging.info('%s: %s', tag, msg) - else: - sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % ( - _BEGINNING_OF_LINE, - _CLEAR_LINE, - '\n%s' % explanatory_text if explanatory_text is not None else '', - _COLORS[_TAG_COLOR[tag]][1], - _COLORS[_TAG_COLOR[tag]][0], - tag, - msg, - '\n' if do_newline or explanatory_text is not None else '')) - sys.stdout.flush() - except: - pass + while True: + try: + if platform_string() == 'windows' or not sys.stdout.isatty(): + if explanatory_text: + logging.info(explanatory_text) + logging.info('%s: %s', tag, msg) + else: + sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % ( + _BEGINNING_OF_LINE, + _CLEAR_LINE, + '\n%s' % explanatory_text if explanatory_text is not None else '', + _COLORS[_TAG_COLOR[tag]][1], + _COLORS[_TAG_COLOR[tag]][0], + tag, + msg, + '\n' if do_newline or explanatory_text is not None else '')) + sys.stdout.flush() + return + except IOError, e: + if e.errno != errno.EINTR: + raise message.old_tag = '' message.old_msg = '' @@ -226,16 +242,6 @@ class JobResult(object): self.cpu_measured = 0 -def eintr_be_gone(fn): - """Run fn until it doesn't stop because of EINTR""" - while True: - try: - return fn() - except IOError, e: - if e.errno != errno.EINTR: - raise - - def read_from_start(f): f.seek(0) return f.read()