From 9c5a39c6db06325f7590adb58ad4bde3457ef81c Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 28 Jul 2020 09:55:43 -0700 Subject: [PATCH] Revert "Merge pull request #23361 from vjpai/em_agnostic_core_callback_cq" This reverts commit a46cb5e86a8ec99fa423be61253c223914cdbf2a, reversing changes made to b5d42e75fb8a7180e40cdd8ed9f6c3a1b95f1f49. --- src/core/lib/iomgr/ev_posix.cc | 2 - src/core/lib/iomgr/iomgr.cc | 14 - src/core/lib/iomgr/iomgr.h | 10 - src/core/lib/surface/completion_queue.cc | 282 +----------------- src/core/lib/surface/completion_queue.h | 8 - src/core/lib/surface/init.cc | 2 - .../end2end/client_callback_end2end_test.cc | 8 +- test/cpp/end2end/end2end_test.cc | 10 +- .../end2end/message_allocator_end2end_test.cc | 12 +- test/cpp/microbenchmarks/bm_cq.cc | 62 +--- 10 files changed, 27 insertions(+), 383 deletions(-) diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index f0cb416447f..3d32359be46 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -37,7 +37,6 @@ #include "src/core/lib/iomgr/ev_epollex_linux.h" #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/iomgr/internal_errqueue.h" -#include "src/core/lib/iomgr/iomgr.h" GPR_GLOBAL_CONFIG_DEFINE_STRING( grpc_poll_strategy, "all", @@ -108,7 +107,6 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) { auto ret = grpc_init_poll_posix(explicit_request); real_poll_function = grpc_poll_function; grpc_poll_function = dummy_poll; - grpc_iomgr_mark_non_polling_internal(); return ret; } diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index d7da7c9151d..802e3bdcb4d 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -50,7 +50,6 @@ static gpr_cv g_rcv; static int g_shutdown; static grpc_iomgr_object g_root_object; static bool g_grpc_abort_on_leaks; -static bool g_iomgr_non_polling; void grpc_iomgr_init() { grpc_core::ExecCtx exec_ctx; @@ -193,16 +192,3 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object* obj) { } bool grpc_iomgr_abort_on_leaks(void) { return g_grpc_abort_on_leaks; } - -bool grpc_iomgr_non_polling() { - gpr_mu_lock(&g_mu); - bool ret = g_iomgr_non_polling; - gpr_mu_unlock(&g_mu); - return ret; -} - -void grpc_iomgr_mark_non_polling_internal() { - gpr_mu_lock(&g_mu); - g_iomgr_non_polling = true; - gpr_mu_unlock(&g_mu); -} diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index cb9f3eb35c8..e02f15e551c 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -45,16 +45,6 @@ void grpc_iomgr_shutdown_background_closure(); */ bool grpc_iomgr_run_in_background(); -/* Returns true if polling engine is non-polling, false otherwise. - * Currently only 'none' is non-polling. - */ -bool grpc_iomgr_non_polling(); - -/* Mark the polling engine as non-polling. For internal use only. - * Currently only 'none' is non-polling. - */ -void grpc_iomgr_mark_non_polling_internal(); - /** Returns true if the caller is a worker thread for any background poller. */ bool grpc_iomgr_is_any_background_poller_thread(); diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index c59e329fabe..a9f65bd5310 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -39,7 +39,6 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gprpp/atomic.h" -#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" @@ -209,9 +208,6 @@ struct cq_vtable { void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); - // TODO(vjpai): Remove proxy_pollset once callback_alternative no longer - // needed. - grpc_pollset* (*proxy_pollset)(grpc_completion_queue* cq); }; namespace { @@ -313,7 +309,7 @@ struct cq_pluck_data { }; struct cq_callback_data { - explicit cq_callback_data( + cq_callback_data( grpc_experimental_completion_queue_functor* shutdown_callback) : shutdown_callback(shutdown_callback) {} @@ -338,81 +334,6 @@ struct cq_callback_data { grpc_experimental_completion_queue_functor* shutdown_callback; }; -// TODO(vjpai): Remove all callback_alternative variants when event manager is -// the only supported poller. -struct cq_callback_alternative_data { - explicit cq_callback_alternative_data( - grpc_experimental_completion_queue_functor* shutdown_callback) - : implementation(SharedNextableCQ()), - shutdown_callback(shutdown_callback) {} - - /* This just points to a single shared nextable CQ */ - grpc_completion_queue* const implementation; - - /** Number of outstanding events (+1 if not shut down) - Initial count is dropped by grpc_completion_queue_shutdown */ - grpc_core::Atomic pending_events{1}; - - /** 0 initially. 1 once we initiated shutdown */ - bool shutdown_called = false; - - /** A callback that gets invoked when the CQ completes shutdown */ - grpc_experimental_completion_queue_functor* shutdown_callback; - - static grpc_completion_queue* SharedNextableCQ() { - grpc_core::MutexLock lock(&*shared_cq_next_mu); - - if (shared_cq_next == nullptr) { - shared_cq_next = grpc_completion_queue_create_for_next(nullptr); - int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores(), 2, 32); - threads_remaining.Store(num_nexting_threads, - grpc_core::MemoryOrder::RELEASE); - for (int i = 0; i < num_nexting_threads; i++) { - grpc_core::Executor::Run( - GRPC_CLOSURE_CREATE( - [](void* arg, grpc_error* /*error*/) { - grpc_completion_queue* cq = - static_cast(arg); - while (true) { - grpc_event event = grpc_completion_queue_next( - cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - if (event.type == GRPC_QUEUE_SHUTDOWN) { - break; - } - GPR_DEBUG_ASSERT(event.type == GRPC_OP_COMPLETE); - // We can always execute the callback inline rather than - // pushing it to another Executor thread because this - // thread is definitely running on an executor, does not - // hold any application locks before executing the callback, - // and cannot be entered recursively. - auto* functor = static_cast< - grpc_experimental_completion_queue_functor*>(event.tag); - functor->functor_run(functor, event.success); - } - if (threads_remaining.FetchSub( - 1, grpc_core::MemoryOrder::ACQ_REL) == 1) { - grpc_completion_queue_destroy(cq); - } - }, - shared_cq_next, nullptr), - GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT, - grpc_core::ExecutorJobType::LONG); - } - } - return shared_cq_next; - } - // Use manually-constructed Mutex to avoid static construction issues - static grpc_core::ManualConstructor shared_cq_next_mu; - static grpc_completion_queue* - shared_cq_next; // GUARDED_BY(shared_cq_next_mu) - static grpc_core::Atomic threads_remaining; -}; - -grpc_core::ManualConstructor - cq_callback_alternative_data::shared_cq_next_mu; -grpc_completion_queue* cq_callback_alternative_data::shared_cq_next = nullptr; -grpc_core::Atomic cq_callback_alternative_data::threads_remaining{0}; - } // namespace /* Completion queue structure */ @@ -425,12 +346,6 @@ struct grpc_completion_queue { const cq_vtable* vtable; const cq_poller_vtable* poller_vtable; - // The pollset entry is allowed to enable proxy CQs like the - // callback_alternative. - // TODO(vjpai): Consider removing pollset and reverting to previous - // calculation of pollset once callback_alternative is no longer needed. - grpc_pollset* pollset; - #ifndef NDEBUG void** outstanding_tags; size_t outstanding_tag_count; @@ -445,17 +360,13 @@ struct grpc_completion_queue { static void cq_finish_shutdown_next(grpc_completion_queue* cq); static void cq_finish_shutdown_pluck(grpc_completion_queue* cq); static void cq_finish_shutdown_callback(grpc_completion_queue* cq); -static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq); static void cq_shutdown_next(grpc_completion_queue* cq); static void cq_shutdown_pluck(grpc_completion_queue* cq); static void cq_shutdown_callback(grpc_completion_queue* cq); -static void cq_shutdown_callback_alternative(grpc_completion_queue* cq); static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); -static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq, - void* tag); // A cq_end_op function is called when an operation on a given CQ with // a given tag has completed. The storage argument is a reference to the @@ -478,20 +389,12 @@ static void cq_end_op_for_callback( void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool internal); -static void cq_end_op_for_callback_alternative( - grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal); - static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); -static grpc_pollset* cq_proxy_pollset_for_callback_alternative( - grpc_completion_queue* cq); - // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback static void cq_init_next( void* data, grpc_experimental_completion_queue_functor* shutdown_callback); @@ -499,39 +402,29 @@ static void cq_init_pluck( void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_init_callback( void* data, grpc_experimental_completion_queue_functor* shutdown_callback); -// poller becomes only option. -static void cq_init_callback_alternative( - void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_destroy_next(void* data); static void cq_destroy_pluck(void* data); static void cq_destroy_callback(void* data); -static void cq_destroy_callback_alternative(void* data); /* Completion queue vtables based on the completion-type */ -// TODO(vjpai): Make this const again once we stop needing callback_alternative -static cq_vtable g_polling_cq_vtable[] = { +static const cq_vtable g_cq_vtable[] = { /* GRPC_CQ_NEXT */ {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next, cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next, - nullptr, nullptr}, + nullptr}, /* GRPC_CQ_PLUCK */ {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck, cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr, - cq_pluck, nullptr}, + cq_pluck}, /* GRPC_CQ_CALLBACK */ {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback, cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback, - cq_end_op_for_callback, nullptr, nullptr, nullptr}, + cq_end_op_for_callback, nullptr, nullptr}, }; -// Separate vtable for non-polling cqs, assign at init -static cq_vtable g_nonpolling_cq_vtable[sizeof(g_polling_cq_vtable) / - sizeof(g_polling_cq_vtable[0])]; - #define DATA_FROM_CQ(cq) ((void*)(cq + 1)) -#define INLINE_POLLSET_FROM_CQ(cq) \ +#define POLLSET_FROM_CQ(cq) \ ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq))) -#define POLLSET_FROM_CQ(cq) (cq->pollset) grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck"); @@ -550,46 +443,6 @@ static void on_pollset_shutdown_done(void* cq, grpc_error* error); void grpc_cq_global_init() { gpr_tls_init(&g_cached_event); gpr_tls_init(&g_cached_cq); - g_nonpolling_cq_vtable[GRPC_CQ_NEXT] = g_polling_cq_vtable[GRPC_CQ_NEXT]; - g_nonpolling_cq_vtable[GRPC_CQ_PLUCK] = g_polling_cq_vtable[GRPC_CQ_PLUCK]; - g_nonpolling_cq_vtable[GRPC_CQ_CALLBACK] = - g_polling_cq_vtable[GRPC_CQ_CALLBACK]; -} - -// TODO(vjpai): Remove when callback_alternative is no longer needed -void grpc_cq_init() { - // If the iomgr runs in the background, we can use the preferred callback CQ. - // If the iomgr is non-polling, we cannot use the alternative callback CQ. - if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) { - cq_callback_alternative_data::shared_cq_next_mu.Init(); - g_polling_cq_vtable[GRPC_CQ_CALLBACK] = { - GRPC_CQ_CALLBACK, - sizeof(cq_callback_alternative_data), - cq_init_callback_alternative, - cq_shutdown_callback_alternative, - cq_destroy_callback_alternative, - cq_begin_op_for_callback_alternative, - cq_end_op_for_callback_alternative, - nullptr, - nullptr, - cq_proxy_pollset_for_callback_alternative}; - } -} - -// TODO(vjpai): Remove when callback_alternative is no longer needed -void grpc_cq_shutdown() { - if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) { - { - grpc_core::MutexLock lock( - &*cq_callback_alternative_data::shared_cq_next_mu); - if (cq_callback_alternative_data::shared_cq_next != nullptr) { - grpc_completion_queue_shutdown( - cq_callback_alternative_data::shared_cq_next); - } - cq_callback_alternative_data::shared_cq_next = nullptr; - } - cq_callback_alternative_data::shared_cq_next_mu.Destroy(); - } } void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) { @@ -668,9 +521,7 @@ grpc_completion_queue* grpc_completion_queue_create_internal( "polling_type=%d)", 2, (completion_type, polling_type)); - const cq_vtable* vtable = (polling_type == GRPC_CQ_NON_POLLING) - ? &g_nonpolling_cq_vtable[completion_type] - : &g_polling_cq_vtable[completion_type]; + const cq_vtable* vtable = &g_cq_vtable[completion_type]; const cq_poller_vtable* poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; @@ -687,18 +538,9 @@ grpc_completion_queue* grpc_completion_queue_create_internal( /* One for destroy(), one for pollset_shutdown */ new (&cq->owning_refs) grpc_core::RefCount(2); + poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu); vtable->init(DATA_FROM_CQ(cq), shutdown_callback); - // TODO(vjpai): When callback_alternative is no longer needed, cq->pollset can - // be removed and the nullptr proxy_pollset value below can be the definition - // of POLLSET_FROM_CQ. - cq->pollset = cq->vtable->proxy_pollset == nullptr - ? INLINE_POLLSET_FROM_CQ(cq) - : cq->vtable->proxy_pollset(cq); - // Init the inline pollset. If a proxy CQ is used, the proxy pollset will be - // init'ed in its CQ init. - cq->poller_vtable->init(INLINE_POLLSET_FROM_CQ(cq), &cq->mu); - GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq, grpc_schedule_on_exec_ctx); return cq; @@ -736,17 +578,6 @@ static void cq_destroy_callback(void* data) { cqd->~cq_callback_data(); } -static void cq_init_callback_alternative( - void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { - new (data) cq_callback_alternative_data(shutdown_callback); -} - -static void cq_destroy_callback_alternative(void* data) { - cq_callback_alternative_data* cqd = - static_cast(data); - cqd->~cq_callback_alternative_data(); -} - grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) { return cq->vtable->cq_completion_type; } @@ -787,9 +618,7 @@ void grpc_cq_internal_unref(grpc_completion_queue* cq) { #endif if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) { cq->vtable->destroy(DATA_FROM_CQ(cq)); - // Only destroy the inlined pollset. If a proxy CQ is used, the proxy - // pollset will be destroyed by the proxy CQ. - cq->poller_vtable->destroy(INLINE_POLLSET_FROM_CQ(cq)); + cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq)); #ifndef NDEBUG gpr_free(cq->outstanding_tags); #endif @@ -840,14 +669,6 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) { return cqd->pending_events.IncrementIfNonzero(); } -static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq, - void* tag) { - cq_callback_alternative_data* cqd = - static_cast DATA_FROM_CQ(cq); - return grpc_cq_begin_op(cqd->implementation, tag) && - cqd->pending_events.IncrementIfNonzero(); -} - bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { #ifndef NDEBUG gpr_mu_lock(cq->mu); @@ -1011,7 +832,7 @@ static void cq_end_op_for_pluck( GRPC_ERROR_UNREF(error); } -void functor_callback(void* arg, grpc_error* error) { +static void functor_callback(void* arg, grpc_error* error) { auto* functor = static_cast(arg); functor->functor_run(functor, error == GRPC_ERROR_NONE); } @@ -1071,40 +892,6 @@ static void cq_end_op_for_callback( GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error); } -static void cq_end_op_for_callback_alternative( - grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal) { - GPR_TIMER_SCOPE("cq_end_op_for_callback_alternative", 0); - - cq_callback_alternative_data* cqd = - static_cast DATA_FROM_CQ(cq); - - if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || - (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && - error != GRPC_ERROR_NONE)) { - const char* errmsg = grpc_error_string(error); - GRPC_API_TRACE( - "cq_end_op_for_callback_alternative(cq=%p, tag=%p, error=%s, " - "done=%p, done_arg=%p, storage=%p)", - 6, (cq, tag, errmsg, done, done_arg, storage)); - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && - error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); - } - } - - // Pass through the actual work to the internal nextable CQ - grpc_cq_end_op(cqd->implementation, tag, error, done, done_arg, storage, - internal); - - cq_check_tag(cq, tag, true); /* Used in debug builds only */ - - if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { - cq_finish_shutdown_callback_alternative(cq); - } -} - void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, @@ -1112,13 +899,6 @@ void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); } -static grpc_pollset* cq_proxy_pollset_for_callback_alternative( - grpc_completion_queue* cq) { - cq_callback_alternative_data* cqd = - static_cast(DATA_FROM_CQ(cq)); - return POLLSET_FROM_CQ(cqd->implementation); -} - struct cq_is_finished_arg { gpr_atm last_seen_things_queued_ever; grpc_completion_queue* cq; @@ -1599,21 +1379,6 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GRPC_ERROR_NONE); } -static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq) { - cq_callback_alternative_data* cqd = - static_cast DATA_FROM_CQ(cq); - auto* callback = cqd->shutdown_callback; - - GPR_ASSERT(cqd->shutdown_called); - - // Shutdown the non-proxy pollset - cq->poller_vtable->shutdown(INLINE_POLLSET_FROM_CQ(cq), - &cq->pollset_shutdown_done); - grpc_core::Executor::Run( - GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr), - GRPC_ERROR_NONE); -} - static void cq_shutdown_callback(grpc_completion_queue* cq) { cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -1640,33 +1405,6 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); } -static void cq_shutdown_callback_alternative(grpc_completion_queue* cq) { - cq_callback_alternative_data* cqd = - static_cast DATA_FROM_CQ(cq); - - /* Need an extra ref for cq here because: - * We call cq_finish_shutdown_callback() below, which calls pollset shutdown. - * Pollset shutdown decrements the cq ref count which can potentially destroy - * the cq (if that happens to be the last ref). - * Creating an extra ref here prevents the cq from getting destroyed while - * this function is still active */ - GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)"); - gpr_mu_lock(cq->mu); - if (cqd->shutdown_called) { - gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); - return; - } - cqd->shutdown_called = true; - if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { - gpr_mu_unlock(cq->mu); - cq_finish_shutdown_callback_alternative(cq); - } else { - gpr_mu_unlock(cq->mu); - } - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); -} - /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue* cq) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 002f7b0728b..4a114be8285 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -69,14 +69,6 @@ void grpc_cq_internal_unref(grpc_completion_queue* cc); /* Initializes global variables used by completion queues */ void grpc_cq_global_init(); -// Completion queue initializations that must be done after iomgr -// TODO(vjpai): Remove when callback_alternative is no longer needed. -void grpc_cq_init(); - -// Completion queue shutdowns that must be done before iomgr shutdown. -// TODO(vjpai): Remove when callback_alternative is no longer needed. -void grpc_cq_shutdown(); - /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. \a tag is currently used only in debug builds. Return true on success, and diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 3bbd7dc53d6..d8bc4e4dd32 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -144,7 +144,6 @@ void grpc_init(void) { grpc_core::ApplicationCallbackExecCtx::GlobalInit(); grpc_core::ExecCtx::GlobalInit(); grpc_iomgr_init(); - grpc_cq_init(); gpr_timers_global_init(); grpc_core::HandshakerRegistry::Init(); grpc_security_init(); @@ -170,7 +169,6 @@ void grpc_shutdown_internal_locked(void) { int i; { grpc_core::ExecCtx exec_ctx(0); - grpc_cq_shutdown(); grpc_iomgr_shutdown_background_closure(); { grpc_timer_manager_set_threading(false); // shutdown timer_manager thread diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 63e726d0cde..5387e5f0af9 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -130,10 +130,10 @@ class ClientCallbackEnd2endTest server_ = builder.BuildAndStart(); is_server_started_ = true; - // if (GetParam().protocol == Protocol::TCP && - // !grpc_iomgr_run_in_background()) { - // do_not_test_ = true; - // } + if (GetParam().protocol == Protocol::TCP && + !grpc_iomgr_run_in_background()) { + do_not_test_ = true; + } } void ResetStub() { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index e6538344b03..f157d20cc6f 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -328,11 +328,11 @@ class End2endTest : public ::testing::TestWithParam { } void SetUp() override { - // if (GetParam().callback_server && !GetParam().inproc && - // !grpc_iomgr_run_in_background()) { - // do_not_test_ = true; - // return; - // } + if (GetParam().callback_server && !GetParam().inproc && + !grpc_iomgr_run_in_background()) { + do_not_test_ = true; + return; + } } void TearDown() override { diff --git a/test/cpp/end2end/message_allocator_end2end_test.cc b/test/cpp/end2end/message_allocator_end2end_test.cc index 95bf7f4faa1..c15066794bc 100644 --- a/test/cpp/end2end/message_allocator_end2end_test.cc +++ b/test/cpp/end2end/message_allocator_end2end_test.cc @@ -119,12 +119,12 @@ class MessageAllocatorEnd2endTestBase protected: MessageAllocatorEnd2endTestBase() { GetParam().Log(); - // if (GetParam().protocol == Protocol::TCP) { - // if (!grpc_iomgr_run_in_background()) { - // do_not_test_ = true; - // return; - // } - // } + if (GetParam().protocol == Protocol::TCP) { + if (!grpc_iomgr_run_in_background()) { + do_not_test_ = true; + return; + } + } } ~MessageAllocatorEnd2endTestBase() = default; diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 1640c4d52f0..c53eb2b9413 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -69,11 +69,6 @@ BENCHMARK(BM_CreateDestroyCore); static void DoneWithCompletionOnStack(void* /*arg*/, grpc_cq_completion* /*completion*/) {} -static void DoneWithCompletionOnHeap(void* /*arg*/, - grpc_cq_completion* completion) { - delete completion; -} - class DummyTag final : public internal::CompletionQueueTag { public: bool FinalizeResult(void** /*tag*/, bool* /*status*/) override { @@ -210,15 +205,8 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { gpr_cv_init(&shutdown_cv); bool got_shutdown = false; ShutdownCallback shutdown_cb(&got_shutdown); - // This test with stack-allocated completions only works for non-polling or - // EM-polling callback core CQs. For generality, test with non-polling. - grpc_completion_queue_attributes attr; - attr.version = 2; - attr.cq_completion_type = GRPC_CQ_CALLBACK; - attr.cq_polling_type = GRPC_CQ_NON_POLLING; - attr.cq_shutdown_cb = &shutdown_cb; - grpc_completion_queue* cc = grpc_completion_queue_create( - grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); + grpc_completion_queue* cc = + grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr); for (auto _ : state) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; @@ -252,53 +240,7 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { gpr_cv_destroy(&shutdown_cv); gpr_mu_destroy(&shutdown_mu); } -static void BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State& state) { - TrackCounters track_counters; - int iteration = 0, current_iterations = 0; - TagCallback tag_cb(&iteration); - gpr_mu_init(&mu); - gpr_cv_init(&cv); - gpr_mu_init(&shutdown_mu); - gpr_cv_init(&shutdown_cv); - bool got_shutdown = false; - ShutdownCallback shutdown_cb(&got_shutdown); - grpc_completion_queue* cc = - grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr); - for (auto _ : state) { - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; - grpc_cq_completion* completion = new grpc_cq_completion; - GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb)); - grpc_cq_end_op(cc, &tag_cb, GRPC_ERROR_NONE, DoneWithCompletionOnHeap, - nullptr, completion); - } - shutdown_and_destroy(cc); - - gpr_mu_lock(&mu); - current_iterations = static_cast(state.iterations()); - while (current_iterations != iteration) { - // Wait for all the callbacks to complete. - gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_unlock(&mu); - - gpr_mu_lock(&shutdown_mu); - while (!got_shutdown) { - // Wait for the shutdown callback to complete. - gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_unlock(&shutdown_mu); - - GPR_ASSERT(got_shutdown); - GPR_ASSERT(iteration == static_cast(state.iterations())); - track_counters.Finish(state); - gpr_cv_destroy(&cv); - gpr_mu_destroy(&mu); - gpr_cv_destroy(&shutdown_cv); - gpr_mu_destroy(&shutdown_mu); -} BENCHMARK(BM_Callback_CQ_Pass1Core); -BENCHMARK(BM_Callback_CQ_Pass1CoreHeapCompletion); } // namespace testing } // namespace grpc