Merge pull request #23361 from vjpai/em_agnostic_core_callback_cq

EM-agnostic core callback CQ
pull/23122/head^2
Vijay Pai 5 years ago committed by GitHub
commit a46cb5e86a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/lib/iomgr/ev_posix.cc
  2. 14
      src/core/lib/iomgr/iomgr.cc
  3. 10
      src/core/lib/iomgr/iomgr.h
  4. 282
      src/core/lib/surface/completion_queue.cc
  5. 8
      src/core/lib/surface/completion_queue.h
  6. 2
      src/core/lib/surface/init.cc
  7. 8
      test/cpp/end2end/client_callback_end2end_test.cc
  8. 10
      test/cpp/end2end/end2end_test.cc
  9. 12
      test/cpp/end2end/message_allocator_end2end_test.cc
  10. 62
      test/cpp/microbenchmarks/bm_cq.cc

@ -37,6 +37,7 @@
#include "src/core/lib/iomgr/ev_epollex_linux.h" #include "src/core/lib/iomgr/ev_epollex_linux.h"
#include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/iomgr/ev_poll_posix.h"
#include "src/core/lib/iomgr/internal_errqueue.h" #include "src/core/lib/iomgr/internal_errqueue.h"
#include "src/core/lib/iomgr/iomgr.h"
GPR_GLOBAL_CONFIG_DEFINE_STRING( GPR_GLOBAL_CONFIG_DEFINE_STRING(
grpc_poll_strategy, "all", grpc_poll_strategy, "all",
@ -107,6 +108,7 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
auto ret = grpc_init_poll_posix(explicit_request); auto ret = grpc_init_poll_posix(explicit_request);
real_poll_function = grpc_poll_function; real_poll_function = grpc_poll_function;
grpc_poll_function = dummy_poll; grpc_poll_function = dummy_poll;
grpc_iomgr_mark_non_polling_internal();
return ret; return ret;
} }

@ -50,6 +50,7 @@ static gpr_cv g_rcv;
static int g_shutdown; static int g_shutdown;
static grpc_iomgr_object g_root_object; static grpc_iomgr_object g_root_object;
static bool g_grpc_abort_on_leaks; static bool g_grpc_abort_on_leaks;
static bool g_iomgr_non_polling;
void grpc_iomgr_init() { void grpc_iomgr_init() {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
@ -192,3 +193,16 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object* obj) {
} }
bool grpc_iomgr_abort_on_leaks(void) { return g_grpc_abort_on_leaks; } bool grpc_iomgr_abort_on_leaks(void) { return g_grpc_abort_on_leaks; }
bool grpc_iomgr_non_polling() {
gpr_mu_lock(&g_mu);
bool ret = g_iomgr_non_polling;
gpr_mu_unlock(&g_mu);
return ret;
}
void grpc_iomgr_mark_non_polling_internal() {
gpr_mu_lock(&g_mu);
g_iomgr_non_polling = true;
gpr_mu_unlock(&g_mu);
}

@ -45,6 +45,16 @@ void grpc_iomgr_shutdown_background_closure();
*/ */
bool grpc_iomgr_run_in_background(); bool grpc_iomgr_run_in_background();
/* Returns true if polling engine is non-polling, false otherwise.
* Currently only 'none' is non-polling.
*/
bool grpc_iomgr_non_polling();
/* Mark the polling engine as non-polling. For internal use only.
* Currently only 'none' is non-polling.
*/
void grpc_iomgr_mark_non_polling_internal();
/** Returns true if the caller is a worker thread for any background poller. */ /** Returns true if the caller is a worker thread for any background poller. */
bool grpc_iomgr_is_any_background_poller_thread(); bool grpc_iomgr_is_any_background_poller_thread();

@ -39,6 +39,7 @@
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
@ -208,6 +209,9 @@ struct cq_vtable {
void* reserved); void* reserved);
grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved); gpr_timespec deadline, void* reserved);
// TODO(vjpai): Remove proxy_pollset once callback_alternative no longer
// needed.
grpc_pollset* (*proxy_pollset)(grpc_completion_queue* cq);
}; };
namespace { namespace {
@ -309,7 +313,7 @@ struct cq_pluck_data {
}; };
struct cq_callback_data { struct cq_callback_data {
cq_callback_data( explicit cq_callback_data(
grpc_experimental_completion_queue_functor* shutdown_callback) grpc_experimental_completion_queue_functor* shutdown_callback)
: shutdown_callback(shutdown_callback) {} : shutdown_callback(shutdown_callback) {}
@ -334,6 +338,81 @@ struct cq_callback_data {
grpc_experimental_completion_queue_functor* shutdown_callback; grpc_experimental_completion_queue_functor* shutdown_callback;
}; };
// TODO(vjpai): Remove all callback_alternative variants when event manager is
// the only supported poller.
struct cq_callback_alternative_data {
explicit cq_callback_alternative_data(
grpc_experimental_completion_queue_functor* shutdown_callback)
: implementation(SharedNextableCQ()),
shutdown_callback(shutdown_callback) {}
/* This just points to a single shared nextable CQ */
grpc_completion_queue* const implementation;
/** Number of outstanding events (+1 if not shut down)
Initial count is dropped by grpc_completion_queue_shutdown */
grpc_core::Atomic<intptr_t> pending_events{1};
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called = false;
/** A callback that gets invoked when the CQ completes shutdown */
grpc_experimental_completion_queue_functor* shutdown_callback;
static grpc_completion_queue* SharedNextableCQ() {
grpc_core::MutexLock lock(&*shared_cq_next_mu);
if (shared_cq_next == nullptr) {
shared_cq_next = grpc_completion_queue_create_for_next(nullptr);
int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores(), 2, 32);
threads_remaining.Store(num_nexting_threads,
grpc_core::MemoryOrder::RELEASE);
for (int i = 0; i < num_nexting_threads; i++) {
grpc_core::Executor::Run(
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error* /*error*/) {
grpc_completion_queue* cq =
static_cast<grpc_completion_queue*>(arg);
while (true) {
grpc_event event = grpc_completion_queue_next(
cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
if (event.type == GRPC_QUEUE_SHUTDOWN) {
break;
}
GPR_DEBUG_ASSERT(event.type == GRPC_OP_COMPLETE);
// We can always execute the callback inline rather than
// pushing it to another Executor thread because this
// thread is definitely running on an executor, does not
// hold any application locks before executing the callback,
// and cannot be entered recursively.
auto* functor = static_cast<
grpc_experimental_completion_queue_functor*>(event.tag);
functor->functor_run(functor, event.success);
}
if (threads_remaining.FetchSub(
1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
grpc_completion_queue_destroy(cq);
}
},
shared_cq_next, nullptr),
GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT,
grpc_core::ExecutorJobType::LONG);
}
}
return shared_cq_next;
}
// Use manually-constructed Mutex to avoid static construction issues
static grpc_core::ManualConstructor<grpc_core::Mutex> shared_cq_next_mu;
static grpc_completion_queue*
shared_cq_next; // GUARDED_BY(shared_cq_next_mu)
static grpc_core::Atomic<int> threads_remaining;
};
grpc_core::ManualConstructor<grpc_core::Mutex>
cq_callback_alternative_data::shared_cq_next_mu;
grpc_completion_queue* cq_callback_alternative_data::shared_cq_next = nullptr;
grpc_core::Atomic<int> cq_callback_alternative_data::threads_remaining{0};
} // namespace } // namespace
/* Completion queue structure */ /* Completion queue structure */
@ -346,6 +425,12 @@ struct grpc_completion_queue {
const cq_vtable* vtable; const cq_vtable* vtable;
const cq_poller_vtable* poller_vtable; const cq_poller_vtable* poller_vtable;
// The pollset entry is allowed to enable proxy CQs like the
// callback_alternative.
// TODO(vjpai): Consider removing pollset and reverting to previous
// calculation of pollset once callback_alternative is no longer needed.
grpc_pollset* pollset;
#ifndef NDEBUG #ifndef NDEBUG
void** outstanding_tags; void** outstanding_tags;
size_t outstanding_tag_count; size_t outstanding_tag_count;
@ -360,13 +445,17 @@ struct grpc_completion_queue {
static void cq_finish_shutdown_next(grpc_completion_queue* cq); static void cq_finish_shutdown_next(grpc_completion_queue* cq);
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq); static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
static void cq_finish_shutdown_callback(grpc_completion_queue* cq); static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq);
static void cq_shutdown_next(grpc_completion_queue* cq); static void cq_shutdown_next(grpc_completion_queue* cq);
static void cq_shutdown_pluck(grpc_completion_queue* cq); static void cq_shutdown_pluck(grpc_completion_queue* cq);
static void cq_shutdown_callback(grpc_completion_queue* cq); static void cq_shutdown_callback(grpc_completion_queue* cq);
static void cq_shutdown_callback_alternative(grpc_completion_queue* cq);
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq,
void* tag);
// A cq_end_op function is called when an operation on a given CQ with // A cq_end_op function is called when an operation on a given CQ with
// a given tag has completed. The storage argument is a reference to the // a given tag has completed. The storage argument is a reference to the
@ -389,12 +478,20 @@ static void cq_end_op_for_callback(
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal); grpc_cq_completion* storage, bool internal);
static void cq_end_op_for_callback_alternative(
grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal);
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved); void* reserved);
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved); gpr_timespec deadline, void* reserved);
static grpc_pollset* cq_proxy_pollset_for_callback_alternative(
grpc_completion_queue* cq);
// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
static void cq_init_next( static void cq_init_next(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback); void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
@ -402,29 +499,39 @@ static void cq_init_pluck(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback); void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_init_callback( static void cq_init_callback(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback); void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
// poller becomes only option.
static void cq_init_callback_alternative(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data); static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data); static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data); static void cq_destroy_callback(void* data);
static void cq_destroy_callback_alternative(void* data);
/* Completion queue vtables based on the completion-type */ /* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = { // TODO(vjpai): Make this const again once we stop needing callback_alternative
static cq_vtable g_polling_cq_vtable[] = {
/* GRPC_CQ_NEXT */ /* GRPC_CQ_NEXT */
{GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next, {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next, cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
nullptr}, nullptr, nullptr},
/* GRPC_CQ_PLUCK */ /* GRPC_CQ_PLUCK */
{GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck, {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr, cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
cq_pluck}, cq_pluck, nullptr},
/* GRPC_CQ_CALLBACK */ /* GRPC_CQ_CALLBACK */
{GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback, {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback, cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
cq_end_op_for_callback, nullptr, nullptr}, cq_end_op_for_callback, nullptr, nullptr, nullptr},
}; };
// Separate vtable for non-polling cqs, assign at init
static cq_vtable g_nonpolling_cq_vtable[sizeof(g_polling_cq_vtable) /
sizeof(g_polling_cq_vtable[0])];
#define DATA_FROM_CQ(cq) ((void*)(cq + 1)) #define DATA_FROM_CQ(cq) ((void*)(cq + 1))
#define POLLSET_FROM_CQ(cq) \ #define INLINE_POLLSET_FROM_CQ(cq) \
((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq))) ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
#define POLLSET_FROM_CQ(cq) (cq->pollset)
grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck"); grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
@ -443,6 +550,46 @@ static void on_pollset_shutdown_done(void* cq, grpc_error* error);
void grpc_cq_global_init() { void grpc_cq_global_init() {
gpr_tls_init(&g_cached_event); gpr_tls_init(&g_cached_event);
gpr_tls_init(&g_cached_cq); gpr_tls_init(&g_cached_cq);
g_nonpolling_cq_vtable[GRPC_CQ_NEXT] = g_polling_cq_vtable[GRPC_CQ_NEXT];
g_nonpolling_cq_vtable[GRPC_CQ_PLUCK] = g_polling_cq_vtable[GRPC_CQ_PLUCK];
g_nonpolling_cq_vtable[GRPC_CQ_CALLBACK] =
g_polling_cq_vtable[GRPC_CQ_CALLBACK];
}
// TODO(vjpai): Remove when callback_alternative is no longer needed
void grpc_cq_init() {
// If the iomgr runs in the background, we can use the preferred callback CQ.
// If the iomgr is non-polling, we cannot use the alternative callback CQ.
if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) {
cq_callback_alternative_data::shared_cq_next_mu.Init();
g_polling_cq_vtable[GRPC_CQ_CALLBACK] = {
GRPC_CQ_CALLBACK,
sizeof(cq_callback_alternative_data),
cq_init_callback_alternative,
cq_shutdown_callback_alternative,
cq_destroy_callback_alternative,
cq_begin_op_for_callback_alternative,
cq_end_op_for_callback_alternative,
nullptr,
nullptr,
cq_proxy_pollset_for_callback_alternative};
}
}
// TODO(vjpai): Remove when callback_alternative is no longer needed
void grpc_cq_shutdown() {
if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) {
{
grpc_core::MutexLock lock(
&*cq_callback_alternative_data::shared_cq_next_mu);
if (cq_callback_alternative_data::shared_cq_next != nullptr) {
grpc_completion_queue_shutdown(
cq_callback_alternative_data::shared_cq_next);
}
cq_callback_alternative_data::shared_cq_next = nullptr;
}
cq_callback_alternative_data::shared_cq_next_mu.Destroy();
}
} }
void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) { void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
@ -521,7 +668,9 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
"polling_type=%d)", "polling_type=%d)",
2, (completion_type, polling_type)); 2, (completion_type, polling_type));
const cq_vtable* vtable = &g_cq_vtable[completion_type]; const cq_vtable* vtable = (polling_type == GRPC_CQ_NON_POLLING)
? &g_nonpolling_cq_vtable[completion_type]
: &g_polling_cq_vtable[completion_type];
const cq_poller_vtable* poller_vtable = const cq_poller_vtable* poller_vtable =
&g_poller_vtable_by_poller_type[polling_type]; &g_poller_vtable_by_poller_type[polling_type];
@ -538,9 +687,18 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
/* One for destroy(), one for pollset_shutdown */ /* One for destroy(), one for pollset_shutdown */
new (&cq->owning_refs) grpc_core::RefCount(2); new (&cq->owning_refs) grpc_core::RefCount(2);
poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
vtable->init(DATA_FROM_CQ(cq), shutdown_callback); vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
// TODO(vjpai): When callback_alternative is no longer needed, cq->pollset can
// be removed and the nullptr proxy_pollset value below can be the definition
// of POLLSET_FROM_CQ.
cq->pollset = cq->vtable->proxy_pollset == nullptr
? INLINE_POLLSET_FROM_CQ(cq)
: cq->vtable->proxy_pollset(cq);
// Init the inline pollset. If a proxy CQ is used, the proxy pollset will be
// init'ed in its CQ init.
cq->poller_vtable->init(INLINE_POLLSET_FROM_CQ(cq), &cq->mu);
GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq, GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
return cq; return cq;
@ -578,6 +736,17 @@ static void cq_destroy_callback(void* data) {
cqd->~cq_callback_data(); cqd->~cq_callback_data();
} }
static void cq_init_callback_alternative(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
new (data) cq_callback_alternative_data(shutdown_callback);
}
static void cq_destroy_callback_alternative(void* data) {
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*>(data);
cqd->~cq_callback_alternative_data();
}
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) { grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
return cq->vtable->cq_completion_type; return cq->vtable->cq_completion_type;
} }
@ -618,7 +787,9 @@ void grpc_cq_internal_unref(grpc_completion_queue* cq) {
#endif #endif
if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) { if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
cq->vtable->destroy(DATA_FROM_CQ(cq)); cq->vtable->destroy(DATA_FROM_CQ(cq));
cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq)); // Only destroy the inlined pollset. If a proxy CQ is used, the proxy
// pollset will be destroyed by the proxy CQ.
cq->poller_vtable->destroy(INLINE_POLLSET_FROM_CQ(cq));
#ifndef NDEBUG #ifndef NDEBUG
gpr_free(cq->outstanding_tags); gpr_free(cq->outstanding_tags);
#endif #endif
@ -669,6 +840,14 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
return cqd->pending_events.IncrementIfNonzero(); return cqd->pending_events.IncrementIfNonzero();
} }
static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq,
void* tag) {
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
return grpc_cq_begin_op(cqd->implementation, tag) &&
cqd->pending_events.IncrementIfNonzero();
}
bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
#ifndef NDEBUG #ifndef NDEBUG
gpr_mu_lock(cq->mu); gpr_mu_lock(cq->mu);
@ -832,7 +1011,7 @@ static void cq_end_op_for_pluck(
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
static void functor_callback(void* arg, grpc_error* error) { void functor_callback(void* arg, grpc_error* error) {
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg); auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
functor->functor_run(functor, error == GRPC_ERROR_NONE); functor->functor_run(functor, error == GRPC_ERROR_NONE);
} }
@ -892,6 +1071,40 @@ static void cq_end_op_for_callback(
GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error); GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
} }
static void cq_end_op_for_callback_alternative(
grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal) {
GPR_TIMER_SCOPE("cq_end_op_for_callback_alternative", 0);
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
(GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE)) {
const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
"cq_end_op_for_callback_alternative(cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
6, (cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
}
// Pass through the actual work to the internal nextable CQ
grpc_cq_end_op(cqd->implementation, tag, error, done, done_arg, storage,
internal);
cq_check_tag(cq, tag, true); /* Used in debug builds only */
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
cq_finish_shutdown_callback_alternative(cq);
}
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage, void* done_arg, grpc_cq_completion* storage,
@ -899,6 +1112,13 @@ void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
} }
static grpc_pollset* cq_proxy_pollset_for_callback_alternative(
grpc_completion_queue* cq) {
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*>(DATA_FROM_CQ(cq));
return POLLSET_FROM_CQ(cqd->implementation);
}
struct cq_is_finished_arg { struct cq_is_finished_arg {
gpr_atm last_seen_things_queued_ever; gpr_atm last_seen_things_queued_ever;
grpc_completion_queue* cq; grpc_completion_queue* cq;
@ -1379,6 +1599,21 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq) {
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
auto* callback = cqd->shutdown_callback;
GPR_ASSERT(cqd->shutdown_called);
// Shutdown the non-proxy pollset
cq->poller_vtable->shutdown(INLINE_POLLSET_FROM_CQ(cq),
&cq->pollset_shutdown_done);
grpc_core::Executor::Run(
GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
GRPC_ERROR_NONE);
}
static void cq_shutdown_callback(grpc_completion_queue* cq) { static void cq_shutdown_callback(grpc_completion_queue* cq) {
cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
@ -1405,6 +1640,33 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
} }
static void cq_shutdown_callback_alternative(grpc_completion_queue* cq) {
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
* We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
* Pollset shutdown decrements the cq ref count which can potentially destroy
* the cq (if that happens to be the last ref).
* Creating an extra ref here prevents the cq from getting destroyed while
* this function is still active */
GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
return;
}
cqd->shutdown_called = true;
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
gpr_mu_unlock(cq->mu);
cq_finish_shutdown_callback_alternative(cq);
} else {
gpr_mu_unlock(cq->mu);
}
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop /* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */ to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) { void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {

@ -69,6 +69,14 @@ void grpc_cq_internal_unref(grpc_completion_queue* cc);
/* Initializes global variables used by completion queues */ /* Initializes global variables used by completion queues */
void grpc_cq_global_init(); void grpc_cq_global_init();
// Completion queue initializations that must be done after iomgr
// TODO(vjpai): Remove when callback_alternative is no longer needed.
void grpc_cq_init();
// Completion queue shutdowns that must be done before iomgr shutdown.
// TODO(vjpai): Remove when callback_alternative is no longer needed.
void grpc_cq_shutdown();
/* Flag that an operation is beginning: the completion channel will not finish /* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made. shutdown until a corrensponding grpc_cq_end_* call is made.
\a tag is currently used only in debug builds. Return true on success, and \a tag is currently used only in debug builds. Return true on success, and

@ -144,6 +144,7 @@ void grpc_init(void) {
grpc_core::ApplicationCallbackExecCtx::GlobalInit(); grpc_core::ApplicationCallbackExecCtx::GlobalInit();
grpc_core::ExecCtx::GlobalInit(); grpc_core::ExecCtx::GlobalInit();
grpc_iomgr_init(); grpc_iomgr_init();
grpc_cq_init();
gpr_timers_global_init(); gpr_timers_global_init();
grpc_core::HandshakerRegistry::Init(); grpc_core::HandshakerRegistry::Init();
grpc_security_init(); grpc_security_init();
@ -169,6 +170,7 @@ void grpc_shutdown_internal_locked(void) {
int i; int i;
{ {
grpc_core::ExecCtx exec_ctx(0); grpc_core::ExecCtx exec_ctx(0);
grpc_cq_shutdown();
grpc_iomgr_shutdown_background_closure(); grpc_iomgr_shutdown_background_closure();
{ {
grpc_timer_manager_set_threading(false); // shutdown timer_manager thread grpc_timer_manager_set_threading(false); // shutdown timer_manager thread

@ -130,10 +130,10 @@ class ClientCallbackEnd2endTest
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
is_server_started_ = true; is_server_started_ = true;
if (GetParam().protocol == Protocol::TCP && // if (GetParam().protocol == Protocol::TCP &&
!grpc_iomgr_run_in_background()) { // !grpc_iomgr_run_in_background()) {
do_not_test_ = true; // do_not_test_ = true;
} // }
} }
void ResetStub() { void ResetStub() {

@ -328,11 +328,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
} }
void SetUp() override { void SetUp() override {
if (GetParam().callback_server && !GetParam().inproc && // if (GetParam().callback_server && !GetParam().inproc &&
!grpc_iomgr_run_in_background()) { // !grpc_iomgr_run_in_background()) {
do_not_test_ = true; // do_not_test_ = true;
return; // return;
} // }
} }
void TearDown() override { void TearDown() override {

@ -119,12 +119,12 @@ class MessageAllocatorEnd2endTestBase
protected: protected:
MessageAllocatorEnd2endTestBase() { MessageAllocatorEnd2endTestBase() {
GetParam().Log(); GetParam().Log();
if (GetParam().protocol == Protocol::TCP) { // if (GetParam().protocol == Protocol::TCP) {
if (!grpc_iomgr_run_in_background()) { // if (!grpc_iomgr_run_in_background()) {
do_not_test_ = true; // do_not_test_ = true;
return; // return;
} // }
} // }
} }
~MessageAllocatorEnd2endTestBase() = default; ~MessageAllocatorEnd2endTestBase() = default;

@ -69,6 +69,11 @@ BENCHMARK(BM_CreateDestroyCore);
static void DoneWithCompletionOnStack(void* /*arg*/, static void DoneWithCompletionOnStack(void* /*arg*/,
grpc_cq_completion* /*completion*/) {} grpc_cq_completion* /*completion*/) {}
static void DoneWithCompletionOnHeap(void* /*arg*/,
grpc_cq_completion* completion) {
delete completion;
}
class DummyTag final : public internal::CompletionQueueTag { class DummyTag final : public internal::CompletionQueueTag {
public: public:
bool FinalizeResult(void** /*tag*/, bool* /*status*/) override { bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
@ -205,8 +210,15 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
gpr_cv_init(&shutdown_cv); gpr_cv_init(&shutdown_cv);
bool got_shutdown = false; bool got_shutdown = false;
ShutdownCallback shutdown_cb(&got_shutdown); ShutdownCallback shutdown_cb(&got_shutdown);
grpc_completion_queue* cc = // This test with stack-allocated completions only works for non-polling or
grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr); // EM-polling callback core CQs. For generality, test with non-polling.
grpc_completion_queue_attributes attr;
attr.version = 2;
attr.cq_completion_type = GRPC_CQ_CALLBACK;
attr.cq_polling_type = GRPC_CQ_NON_POLLING;
attr.cq_shutdown_cb = &shutdown_cb;
grpc_completion_queue* cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
for (auto _ : state) { for (auto _ : state) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
@ -240,7 +252,53 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
gpr_cv_destroy(&shutdown_cv); gpr_cv_destroy(&shutdown_cv);
gpr_mu_destroy(&shutdown_mu); gpr_mu_destroy(&shutdown_mu);
} }
static void BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State& state) {
TrackCounters track_counters;
int iteration = 0, current_iterations = 0;
TagCallback tag_cb(&iteration);
gpr_mu_init(&mu);
gpr_cv_init(&cv);
gpr_mu_init(&shutdown_mu);
gpr_cv_init(&shutdown_cv);
bool got_shutdown = false;
ShutdownCallback shutdown_cb(&got_shutdown);
grpc_completion_queue* cc =
grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr);
for (auto _ : state) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_cq_completion* completion = new grpc_cq_completion;
GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb));
grpc_cq_end_op(cc, &tag_cb, GRPC_ERROR_NONE, DoneWithCompletionOnHeap,
nullptr, completion);
}
shutdown_and_destroy(cc);
gpr_mu_lock(&mu);
current_iterations = static_cast<int>(state.iterations());
while (current_iterations != iteration) {
// Wait for all the callbacks to complete.
gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&mu);
gpr_mu_lock(&shutdown_mu);
while (!got_shutdown) {
// Wait for the shutdown callback to complete.
gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&shutdown_mu);
GPR_ASSERT(got_shutdown);
GPR_ASSERT(iteration == static_cast<int>(state.iterations()));
track_counters.Finish(state);
gpr_cv_destroy(&cv);
gpr_mu_destroy(&mu);
gpr_cv_destroy(&shutdown_cv);
gpr_mu_destroy(&shutdown_mu);
}
BENCHMARK(BM_Callback_CQ_Pass1Core); BENCHMARK(BM_Callback_CQ_Pass1Core);
BENCHMARK(BM_Callback_CQ_Pass1CoreHeapCompletion);
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc

Loading…
Cancel
Save