diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 3d32359be46..f0cb416447f 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -37,6 +37,7 @@ #include "src/core/lib/iomgr/ev_epollex_linux.h" #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/iomgr/internal_errqueue.h" +#include "src/core/lib/iomgr/iomgr.h" GPR_GLOBAL_CONFIG_DEFINE_STRING( grpc_poll_strategy, "all", @@ -107,6 +108,7 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) { auto ret = grpc_init_poll_posix(explicit_request); real_poll_function = grpc_poll_function; grpc_poll_function = dummy_poll; + grpc_iomgr_mark_non_polling_internal(); return ret; } diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index 802e3bdcb4d..d7da7c9151d 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -50,6 +50,7 @@ static gpr_cv g_rcv; static int g_shutdown; static grpc_iomgr_object g_root_object; static bool g_grpc_abort_on_leaks; +static bool g_iomgr_non_polling; void grpc_iomgr_init() { grpc_core::ExecCtx exec_ctx; @@ -192,3 +193,16 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object* obj) { } bool grpc_iomgr_abort_on_leaks(void) { return g_grpc_abort_on_leaks; } + +bool grpc_iomgr_non_polling() { + gpr_mu_lock(&g_mu); + bool ret = g_iomgr_non_polling; + gpr_mu_unlock(&g_mu); + return ret; +} + +void grpc_iomgr_mark_non_polling_internal() { + gpr_mu_lock(&g_mu); + g_iomgr_non_polling = true; + gpr_mu_unlock(&g_mu); +} diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index e02f15e551c..cb9f3eb35c8 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -45,6 +45,16 @@ void grpc_iomgr_shutdown_background_closure(); */ bool grpc_iomgr_run_in_background(); +/* Returns true if polling engine is non-polling, false otherwise. + * Currently only 'none' is non-polling. + */ +bool grpc_iomgr_non_polling(); + +/* Mark the polling engine as non-polling. For internal use only. + * Currently only 'none' is non-polling. + */ +void grpc_iomgr_mark_non_polling_internal(); + /** Returns true if the caller is a worker thread for any background poller. */ bool grpc_iomgr_is_any_background_poller_thread(); diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index a9f65bd5310..c59e329fabe 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -39,6 +39,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gprpp/atomic.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" @@ -208,6 +209,9 @@ struct cq_vtable { void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); + // TODO(vjpai): Remove proxy_pollset once callback_alternative no longer + // needed. + grpc_pollset* (*proxy_pollset)(grpc_completion_queue* cq); }; namespace { @@ -309,7 +313,7 @@ struct cq_pluck_data { }; struct cq_callback_data { - cq_callback_data( + explicit cq_callback_data( grpc_experimental_completion_queue_functor* shutdown_callback) : shutdown_callback(shutdown_callback) {} @@ -334,6 +338,81 @@ struct cq_callback_data { grpc_experimental_completion_queue_functor* shutdown_callback; }; +// TODO(vjpai): Remove all callback_alternative variants when event manager is +// the only supported poller. +struct cq_callback_alternative_data { + explicit cq_callback_alternative_data( + grpc_experimental_completion_queue_functor* shutdown_callback) + : implementation(SharedNextableCQ()), + shutdown_callback(shutdown_callback) {} + + /* This just points to a single shared nextable CQ */ + grpc_completion_queue* const implementation; + + /** Number of outstanding events (+1 if not shut down) + Initial count is dropped by grpc_completion_queue_shutdown */ + grpc_core::Atomic<intptr_t> pending_events{1}; + + /** 0 initially. 1 once we initiated shutdown */ + bool shutdown_called = false; + + /** A callback that gets invoked when the CQ completes shutdown */ + grpc_experimental_completion_queue_functor* shutdown_callback; + + static grpc_completion_queue* SharedNextableCQ() { + grpc_core::MutexLock lock(&*shared_cq_next_mu); + + if (shared_cq_next == nullptr) { + shared_cq_next = grpc_completion_queue_create_for_next(nullptr); + int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores(), 2, 32); + threads_remaining.Store(num_nexting_threads, + grpc_core::MemoryOrder::RELEASE); + for (int i = 0; i < num_nexting_threads; i++) { + grpc_core::Executor::Run( + GRPC_CLOSURE_CREATE( + [](void* arg, grpc_error* /*error*/) { + grpc_completion_queue* cq = + static_cast<grpc_completion_queue*>(arg); + while (true) { + grpc_event event = grpc_completion_queue_next( + cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); + if (event.type == GRPC_QUEUE_SHUTDOWN) { + break; + } + GPR_DEBUG_ASSERT(event.type == GRPC_OP_COMPLETE); + // We can always execute the callback inline rather than + // pushing it to another Executor thread because this + // thread is definitely running on an executor, does not + // hold any application locks before executing the callback, + // and cannot be entered recursively. + auto* functor = static_cast< + grpc_experimental_completion_queue_functor*>(event.tag); + functor->functor_run(functor, event.success); + } + if (threads_remaining.FetchSub( + 1, grpc_core::MemoryOrder::ACQ_REL) == 1) { + grpc_completion_queue_destroy(cq); + } + }, + shared_cq_next, nullptr), + GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT, + grpc_core::ExecutorJobType::LONG); + } + } + return shared_cq_next; + } + // Use manually-constructed Mutex to avoid static construction issues + static grpc_core::ManualConstructor<grpc_core::Mutex> shared_cq_next_mu; + static grpc_completion_queue* + shared_cq_next; // GUARDED_BY(shared_cq_next_mu) + static grpc_core::Atomic<int> threads_remaining; +}; + +grpc_core::ManualConstructor<grpc_core::Mutex> + cq_callback_alternative_data::shared_cq_next_mu; +grpc_completion_queue* cq_callback_alternative_data::shared_cq_next = nullptr; +grpc_core::Atomic<int> cq_callback_alternative_data::threads_remaining{0}; + } // namespace /* Completion queue structure */ @@ -346,6 +425,12 @@ struct grpc_completion_queue { const cq_vtable* vtable; const cq_poller_vtable* poller_vtable; + // The pollset entry is allowed to enable proxy CQs like the + // callback_alternative. + // TODO(vjpai): Consider removing pollset and reverting to previous + // calculation of pollset once callback_alternative is no longer needed. + grpc_pollset* pollset; + #ifndef NDEBUG void** outstanding_tags; size_t outstanding_tag_count; @@ -360,13 +445,17 @@ struct grpc_completion_queue { static void cq_finish_shutdown_next(grpc_completion_queue* cq); static void cq_finish_shutdown_pluck(grpc_completion_queue* cq); static void cq_finish_shutdown_callback(grpc_completion_queue* cq); +static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq); static void cq_shutdown_next(grpc_completion_queue* cq); static void cq_shutdown_pluck(grpc_completion_queue* cq); static void cq_shutdown_callback(grpc_completion_queue* cq); +static void cq_shutdown_callback_alternative(grpc_completion_queue* cq); static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); +static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq, + void* tag); // A cq_end_op function is called when an operation on a given CQ with // a given tag has completed. The storage argument is a reference to the @@ -389,12 +478,20 @@ static void cq_end_op_for_callback( void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool internal); +static void cq_end_op_for_callback_alternative( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal); + static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); +static grpc_pollset* cq_proxy_pollset_for_callback_alternative( + grpc_completion_queue* cq); + // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback static void cq_init_next( void* data, grpc_experimental_completion_queue_functor* shutdown_callback); @@ -402,29 +499,39 @@ static void cq_init_pluck( void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_init_callback( void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +// poller becomes only option. +static void cq_init_callback_alternative( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_destroy_next(void* data); static void cq_destroy_pluck(void* data); static void cq_destroy_callback(void* data); +static void cq_destroy_callback_alternative(void* data); /* Completion queue vtables based on the completion-type */ -static const cq_vtable g_cq_vtable[] = { +// TODO(vjpai): Make this const again once we stop needing callback_alternative +static cq_vtable g_polling_cq_vtable[] = { /* GRPC_CQ_NEXT */ {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next, cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next, - nullptr}, + nullptr, nullptr}, /* GRPC_CQ_PLUCK */ {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck, cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr, - cq_pluck}, + cq_pluck, nullptr}, /* GRPC_CQ_CALLBACK */ {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback, cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback, - cq_end_op_for_callback, nullptr, nullptr}, + cq_end_op_for_callback, nullptr, nullptr, nullptr}, }; +// Separate vtable for non-polling cqs, assign at init +static cq_vtable g_nonpolling_cq_vtable[sizeof(g_polling_cq_vtable) / + sizeof(g_polling_cq_vtable[0])]; + #define DATA_FROM_CQ(cq) ((void*)(cq + 1)) -#define POLLSET_FROM_CQ(cq) \ +#define INLINE_POLLSET_FROM_CQ(cq) \ ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq))) +#define POLLSET_FROM_CQ(cq) (cq->pollset) grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck"); @@ -443,6 +550,46 @@ static void on_pollset_shutdown_done(void* cq, grpc_error* error); void grpc_cq_global_init() { gpr_tls_init(&g_cached_event); gpr_tls_init(&g_cached_cq); + g_nonpolling_cq_vtable[GRPC_CQ_NEXT] = g_polling_cq_vtable[GRPC_CQ_NEXT]; + g_nonpolling_cq_vtable[GRPC_CQ_PLUCK] = g_polling_cq_vtable[GRPC_CQ_PLUCK]; + g_nonpolling_cq_vtable[GRPC_CQ_CALLBACK] = + g_polling_cq_vtable[GRPC_CQ_CALLBACK]; +} + +// TODO(vjpai): Remove when callback_alternative is no longer needed +void grpc_cq_init() { + // If the iomgr runs in the background, we can use the preferred callback CQ. + // If the iomgr is non-polling, we cannot use the alternative callback CQ. + if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) { + cq_callback_alternative_data::shared_cq_next_mu.Init(); + g_polling_cq_vtable[GRPC_CQ_CALLBACK] = { + GRPC_CQ_CALLBACK, + sizeof(cq_callback_alternative_data), + cq_init_callback_alternative, + cq_shutdown_callback_alternative, + cq_destroy_callback_alternative, + cq_begin_op_for_callback_alternative, + cq_end_op_for_callback_alternative, + nullptr, + nullptr, + cq_proxy_pollset_for_callback_alternative}; + } +} + +// TODO(vjpai): Remove when callback_alternative is no longer needed +void grpc_cq_shutdown() { + if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) { + { + grpc_core::MutexLock lock( + &*cq_callback_alternative_data::shared_cq_next_mu); + if (cq_callback_alternative_data::shared_cq_next != nullptr) { + grpc_completion_queue_shutdown( + cq_callback_alternative_data::shared_cq_next); + } + cq_callback_alternative_data::shared_cq_next = nullptr; + } + cq_callback_alternative_data::shared_cq_next_mu.Destroy(); + } } void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) { @@ -521,7 +668,9 @@ grpc_completion_queue* grpc_completion_queue_create_internal( "polling_type=%d)", 2, (completion_type, polling_type)); - const cq_vtable* vtable = &g_cq_vtable[completion_type]; + const cq_vtable* vtable = (polling_type == GRPC_CQ_NON_POLLING) + ? &g_nonpolling_cq_vtable[completion_type] + : &g_polling_cq_vtable[completion_type]; const cq_poller_vtable* poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; @@ -538,9 +687,18 @@ grpc_completion_queue* grpc_completion_queue_create_internal( /* One for destroy(), one for pollset_shutdown */ new (&cq->owning_refs) grpc_core::RefCount(2); - poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu); vtable->init(DATA_FROM_CQ(cq), shutdown_callback); + // TODO(vjpai): When callback_alternative is no longer needed, cq->pollset can + // be removed and the nullptr proxy_pollset value below can be the definition + // of POLLSET_FROM_CQ. + cq->pollset = cq->vtable->proxy_pollset == nullptr + ? INLINE_POLLSET_FROM_CQ(cq) + : cq->vtable->proxy_pollset(cq); + // Init the inline pollset. If a proxy CQ is used, the proxy pollset will be + // init'ed in its CQ init. + cq->poller_vtable->init(INLINE_POLLSET_FROM_CQ(cq), &cq->mu); + GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq, grpc_schedule_on_exec_ctx); return cq; @@ -578,6 +736,17 @@ static void cq_destroy_callback(void* data) { cqd->~cq_callback_data(); } +static void cq_init_callback_alternative( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { + new (data) cq_callback_alternative_data(shutdown_callback); +} + +static void cq_destroy_callback_alternative(void* data) { + cq_callback_alternative_data* cqd = + static_cast<cq_callback_alternative_data*>(data); + cqd->~cq_callback_alternative_data(); +} + grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) { return cq->vtable->cq_completion_type; } @@ -618,7 +787,9 @@ void grpc_cq_internal_unref(grpc_completion_queue* cq) { #endif if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) { cq->vtable->destroy(DATA_FROM_CQ(cq)); - cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq)); + // Only destroy the inlined pollset. If a proxy CQ is used, the proxy + // pollset will be destroyed by the proxy CQ. + cq->poller_vtable->destroy(INLINE_POLLSET_FROM_CQ(cq)); #ifndef NDEBUG gpr_free(cq->outstanding_tags); #endif @@ -669,6 +840,14 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) { return cqd->pending_events.IncrementIfNonzero(); } +static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq, + void* tag) { + cq_callback_alternative_data* cqd = + static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq); + return grpc_cq_begin_op(cqd->implementation, tag) && + cqd->pending_events.IncrementIfNonzero(); +} + bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { #ifndef NDEBUG gpr_mu_lock(cq->mu); @@ -832,7 +1011,7 @@ static void cq_end_op_for_pluck( GRPC_ERROR_UNREF(error); } -static void functor_callback(void* arg, grpc_error* error) { +void functor_callback(void* arg, grpc_error* error) { auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg); functor->functor_run(functor, error == GRPC_ERROR_NONE); } @@ -892,6 +1071,40 @@ static void cq_end_op_for_callback( GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error); } +static void cq_end_op_for_callback_alternative( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal) { + GPR_TIMER_SCOPE("cq_end_op_for_callback_alternative", 0); + + cq_callback_alternative_data* cqd = + static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq); + + if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || + (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && + error != GRPC_ERROR_NONE)) { + const char* errmsg = grpc_error_string(error); + GRPC_API_TRACE( + "cq_end_op_for_callback_alternative(cq=%p, tag=%p, error=%s, " + "done=%p, done_arg=%p, storage=%p)", + 6, (cq, tag, errmsg, done, done_arg, storage)); + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && + error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + } + } + + // Pass through the actual work to the internal nextable CQ + grpc_cq_end_op(cqd->implementation, tag, error, done, done_arg, storage, + internal); + + cq_check_tag(cq, tag, true); /* Used in debug builds only */ + + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { + cq_finish_shutdown_callback_alternative(cq); + } +} + void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, @@ -899,6 +1112,13 @@ void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); } +static grpc_pollset* cq_proxy_pollset_for_callback_alternative( + grpc_completion_queue* cq) { + cq_callback_alternative_data* cqd = + static_cast<cq_callback_alternative_data*>(DATA_FROM_CQ(cq)); + return POLLSET_FROM_CQ(cqd->implementation); +} + struct cq_is_finished_arg { gpr_atm last_seen_things_queued_ever; grpc_completion_queue* cq; @@ -1379,6 +1599,21 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GRPC_ERROR_NONE); } +static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq) { + cq_callback_alternative_data* cqd = + static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq); + auto* callback = cqd->shutdown_callback; + + GPR_ASSERT(cqd->shutdown_called); + + // Shutdown the non-proxy pollset + cq->poller_vtable->shutdown(INLINE_POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); + grpc_core::Executor::Run( + GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr), + GRPC_ERROR_NONE); +} + static void cq_shutdown_callback(grpc_completion_queue* cq) { cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); @@ -1405,6 +1640,33 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); } +static void cq_shutdown_callback_alternative(grpc_completion_queue* cq) { + cq_callback_alternative_data* cqd = + static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq); + + /* Need an extra ref for cq here because: + * We call cq_finish_shutdown_callback() below, which calls pollset shutdown. + * Pollset shutdown decrements the cq ref count which can potentially destroy + * the cq (if that happens to be the last ref). + * Creating an extra ref here prevents the cq from getting destroyed while + * this function is still active */ + GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)"); + gpr_mu_lock(cq->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); + return; + } + cqd->shutdown_called = true; + if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { + gpr_mu_unlock(cq->mu); + cq_finish_shutdown_callback_alternative(cq); + } else { + gpr_mu_unlock(cq->mu); + } + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); +} + /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue* cq) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 4a114be8285..002f7b0728b 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -69,6 +69,14 @@ void grpc_cq_internal_unref(grpc_completion_queue* cc); /* Initializes global variables used by completion queues */ void grpc_cq_global_init(); +// Completion queue initializations that must be done after iomgr +// TODO(vjpai): Remove when callback_alternative is no longer needed. +void grpc_cq_init(); + +// Completion queue shutdowns that must be done before iomgr shutdown. +// TODO(vjpai): Remove when callback_alternative is no longer needed. +void grpc_cq_shutdown(); + /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. \a tag is currently used only in debug builds. Return true on success, and diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index d8bc4e4dd32..3bbd7dc53d6 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -144,6 +144,7 @@ void grpc_init(void) { grpc_core::ApplicationCallbackExecCtx::GlobalInit(); grpc_core::ExecCtx::GlobalInit(); grpc_iomgr_init(); + grpc_cq_init(); gpr_timers_global_init(); grpc_core::HandshakerRegistry::Init(); grpc_security_init(); @@ -169,6 +170,7 @@ void grpc_shutdown_internal_locked(void) { int i; { grpc_core::ExecCtx exec_ctx(0); + grpc_cq_shutdown(); grpc_iomgr_shutdown_background_closure(); { grpc_timer_manager_set_threading(false); // shutdown timer_manager thread diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 5387e5f0af9..63e726d0cde 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -130,10 +130,10 @@ class ClientCallbackEnd2endTest server_ = builder.BuildAndStart(); is_server_started_ = true; - if (GetParam().protocol == Protocol::TCP && - !grpc_iomgr_run_in_background()) { - do_not_test_ = true; - } + // if (GetParam().protocol == Protocol::TCP && + // !grpc_iomgr_run_in_background()) { + // do_not_test_ = true; + // } } void ResetStub() { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index f157d20cc6f..e6538344b03 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -328,11 +328,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { } void SetUp() override { - if (GetParam().callback_server && !GetParam().inproc && - !grpc_iomgr_run_in_background()) { - do_not_test_ = true; - return; - } + // if (GetParam().callback_server && !GetParam().inproc && + // !grpc_iomgr_run_in_background()) { + // do_not_test_ = true; + // return; + // } } void TearDown() override { diff --git a/test/cpp/end2end/message_allocator_end2end_test.cc b/test/cpp/end2end/message_allocator_end2end_test.cc index b34a08ef281..a54e03a744a 100644 --- a/test/cpp/end2end/message_allocator_end2end_test.cc +++ b/test/cpp/end2end/message_allocator_end2end_test.cc @@ -118,12 +118,12 @@ class MessageAllocatorEnd2endTestBase protected: MessageAllocatorEnd2endTestBase() { GetParam().Log(); - if (GetParam().protocol == Protocol::TCP) { - if (!grpc_iomgr_run_in_background()) { - do_not_test_ = true; - return; - } - } + // if (GetParam().protocol == Protocol::TCP) { + // if (!grpc_iomgr_run_in_background()) { + // do_not_test_ = true; + // return; + // } + // } } ~MessageAllocatorEnd2endTestBase() = default; diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index c53eb2b9413..1640c4d52f0 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -69,6 +69,11 @@ BENCHMARK(BM_CreateDestroyCore); static void DoneWithCompletionOnStack(void* /*arg*/, grpc_cq_completion* /*completion*/) {} +static void DoneWithCompletionOnHeap(void* /*arg*/, + grpc_cq_completion* completion) { + delete completion; +} + class DummyTag final : public internal::CompletionQueueTag { public: bool FinalizeResult(void** /*tag*/, bool* /*status*/) override { @@ -205,8 +210,15 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { gpr_cv_init(&shutdown_cv); bool got_shutdown = false; ShutdownCallback shutdown_cb(&got_shutdown); - grpc_completion_queue* cc = - grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr); + // This test with stack-allocated completions only works for non-polling or + // EM-polling callback core CQs. For generality, test with non-polling. + grpc_completion_queue_attributes attr; + attr.version = 2; + attr.cq_completion_type = GRPC_CQ_CALLBACK; + attr.cq_polling_type = GRPC_CQ_NON_POLLING; + attr.cq_shutdown_cb = &shutdown_cb; + grpc_completion_queue* cc = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); for (auto _ : state) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; @@ -240,7 +252,53 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { gpr_cv_destroy(&shutdown_cv); gpr_mu_destroy(&shutdown_mu); } +static void BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State& state) { + TrackCounters track_counters; + int iteration = 0, current_iterations = 0; + TagCallback tag_cb(&iteration); + gpr_mu_init(&mu); + gpr_cv_init(&cv); + gpr_mu_init(&shutdown_mu); + gpr_cv_init(&shutdown_cv); + bool got_shutdown = false; + ShutdownCallback shutdown_cb(&got_shutdown); + grpc_completion_queue* cc = + grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr); + for (auto _ : state) { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx; + grpc_cq_completion* completion = new grpc_cq_completion; + GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb)); + grpc_cq_end_op(cc, &tag_cb, GRPC_ERROR_NONE, DoneWithCompletionOnHeap, + nullptr, completion); + } + shutdown_and_destroy(cc); + + gpr_mu_lock(&mu); + current_iterations = static_cast<int>(state.iterations()); + while (current_iterations != iteration) { + // Wait for all the callbacks to complete. + gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&mu); + + gpr_mu_lock(&shutdown_mu); + while (!got_shutdown) { + // Wait for the shutdown callback to complete. + gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&shutdown_mu); + + GPR_ASSERT(got_shutdown); + GPR_ASSERT(iteration == static_cast<int>(state.iterations())); + track_counters.Finish(state); + gpr_cv_destroy(&cv); + gpr_mu_destroy(&mu); + gpr_cv_destroy(&shutdown_cv); + gpr_mu_destroy(&shutdown_mu); +} BENCHMARK(BM_Callback_CQ_Pass1Core); +BENCHMARK(BM_Callback_CQ_Pass1CoreHeapCompletion); } // namespace testing } // namespace grpc