Revert "Merge pull request #23361 from vjpai/em_agnostic_core_callback_cq"

This reverts commit a46cb5e86a, reversing
changes made to b5d42e75fb.
pull/23651/head
Vijay Pai 4 years ago
parent bcb6530717
commit 9c5a39c6db
  1. src/core/lib/iomgr/ev_posix.cc (2)
  2. src/core/lib/iomgr/iomgr.cc (14)
  3. src/core/lib/iomgr/iomgr.h (10)
  4. src/core/lib/surface/completion_queue.cc (282)
  5. src/core/lib/surface/completion_queue.h (8)
  6. src/core/lib/surface/init.cc (2)
  7. test/cpp/end2end/client_callback_end2end_test.cc (8)
  8. test/cpp/end2end/end2end_test.cc (10)
  9. test/cpp/end2end/message_allocator_end2end_test.cc (12)
  10. test/cpp/microbenchmarks/bm_cq.cc (62)
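For reference, a revert of a merge commit like this one is typically produced by naming the mainline parent, e.g. git revert -m 1 a46cb5e86a; git then applies the inverse of the merge's changes relative to its first parent (b5d42e75fb), which matches the shape of the auto-generated message above.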

@@ -37,7 +37,6 @@
#include "src/core/lib/iomgr/ev_epollex_linux.h"
#include "src/core/lib/iomgr/ev_poll_posix.h"
#include "src/core/lib/iomgr/internal_errqueue.h"
#include "src/core/lib/iomgr/iomgr.h"
GPR_GLOBAL_CONFIG_DEFINE_STRING(
grpc_poll_strategy, "all",
@@ -108,7 +107,6 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
auto ret = grpc_init_poll_posix(explicit_request);
real_poll_function = grpc_poll_function;
grpc_poll_function = dummy_poll;
grpc_iomgr_mark_non_polling_internal();
return ret;
}
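For context on this hunk: init_non_polling keeps the real poll function and installs dummy_poll in its place, and the revert drops the extra call that flagged the engine as non-polling. A minimal hedged sketch of that override pattern (my_dummy_poll and saved_poll_function are illustrative names, not grpc's exact code):

// Sketch only, assuming grpc_poll_function_type from ev_poll_posix.h.
// Zero-timeout polls are forwarded; any blocking poll is a bug here.
static grpc_poll_function_type saved_poll_function;

static int my_dummy_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
  if (timeout == 0) return saved_poll_function(fds, nfds, 0);
  GPR_ASSERT(false);  // non-polling engines must never block in poll()
  return -1;
}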

@@ -50,7 +50,6 @@ static gpr_cv g_rcv;
static int g_shutdown;
static grpc_iomgr_object g_root_object;
static bool g_grpc_abort_on_leaks;
static bool g_iomgr_non_polling;
void grpc_iomgr_init() {
grpc_core::ExecCtx exec_ctx;
@@ -193,16 +192,3 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object* obj) {
}
bool grpc_iomgr_abort_on_leaks(void) { return g_grpc_abort_on_leaks; }
bool grpc_iomgr_non_polling() {
gpr_mu_lock(&g_mu);
bool ret = g_iomgr_non_polling;
gpr_mu_unlock(&g_mu);
return ret;
}
void grpc_iomgr_mark_non_polling_internal() {
gpr_mu_lock(&g_mu);
g_iomgr_non_polling = true;
gpr_mu_unlock(&g_mu);
}

@@ -45,16 +45,6 @@ void grpc_iomgr_shutdown_background_closure();
*/
bool grpc_iomgr_run_in_background();
/* Returns true if polling engine is non-polling, false otherwise.
* Currently only 'none' is non-polling.
*/
bool grpc_iomgr_non_polling();
/* Mark the polling engine as non-polling. For internal use only.
* Currently only 'none' is non-polling.
*/
void grpc_iomgr_mark_non_polling_internal();
/** Returns true if the caller is a worker thread for any background poller. */
bool grpc_iomgr_is_any_background_poller_thread();

@@ -39,7 +39,6 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/timer.h"
@@ -209,9 +208,6 @@ struct cq_vtable {
void* reserved);
grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
// TODO(vjpai): Remove proxy_pollset once callback_alternative no longer
// needed.
grpc_pollset* (*proxy_pollset)(grpc_completion_queue* cq);
};
namespace {
@@ -313,7 +309,7 @@ struct cq_pluck_data {
};
struct cq_callback_data {
explicit cq_callback_data(
cq_callback_data(
grpc_experimental_completion_queue_functor* shutdown_callback)
: shutdown_callback(shutdown_callback) {}
@@ -338,81 +334,6 @@ struct cq_callback_data {
grpc_experimental_completion_queue_functor* shutdown_callback;
};
// TODO(vjpai): Remove all callback_alternative variants when event manager is
// the only supported poller.
struct cq_callback_alternative_data {
explicit cq_callback_alternative_data(
grpc_experimental_completion_queue_functor* shutdown_callback)
: implementation(SharedNextableCQ()),
shutdown_callback(shutdown_callback) {}
/* This just points to a single shared nextable CQ */
grpc_completion_queue* const implementation;
/** Number of outstanding events (+1 if not shut down)
Initial count is dropped by grpc_completion_queue_shutdown */
grpc_core::Atomic<intptr_t> pending_events{1};
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called = false;
/** A callback that gets invoked when the CQ completes shutdown */
grpc_experimental_completion_queue_functor* shutdown_callback;
static grpc_completion_queue* SharedNextableCQ() {
grpc_core::MutexLock lock(&*shared_cq_next_mu);
if (shared_cq_next == nullptr) {
shared_cq_next = grpc_completion_queue_create_for_next(nullptr);
int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores(), 2, 32);
threads_remaining.Store(num_nexting_threads,
grpc_core::MemoryOrder::RELEASE);
for (int i = 0; i < num_nexting_threads; i++) {
grpc_core::Executor::Run(
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error* /*error*/) {
grpc_completion_queue* cq =
static_cast<grpc_completion_queue*>(arg);
while (true) {
grpc_event event = grpc_completion_queue_next(
cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
if (event.type == GRPC_QUEUE_SHUTDOWN) {
break;
}
GPR_DEBUG_ASSERT(event.type == GRPC_OP_COMPLETE);
// We can always execute the callback inline rather than
// pushing it to another Executor thread because this
// thread is definitely running on an executor, does not
// hold any application locks before executing the callback,
// and cannot be entered recursively.
auto* functor = static_cast<
grpc_experimental_completion_queue_functor*>(event.tag);
functor->functor_run(functor, event.success);
}
if (threads_remaining.FetchSub(
1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
grpc_completion_queue_destroy(cq);
}
},
shared_cq_next, nullptr),
GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT,
grpc_core::ExecutorJobType::LONG);
}
}
return shared_cq_next;
}
// Use manually-constructed Mutex to avoid static construction issues
static grpc_core::ManualConstructor<grpc_core::Mutex> shared_cq_next_mu;
static grpc_completion_queue*
shared_cq_next; // GUARDED_BY(shared_cq_next_mu)
static grpc_core::Atomic<int> threads_remaining;
};
grpc_core::ManualConstructor<grpc_core::Mutex>
cq_callback_alternative_data::shared_cq_next_mu;
grpc_completion_queue* cq_callback_alternative_data::shared_cq_next = nullptr;
grpc_core::Atomic<int> cq_callback_alternative_data::threads_remaining{0};
} // namespace
/* Completion queue structure */
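To summarize the block removed above: each callback CQ proxied its completions onto one process-wide next-style CQ, drained by a pool of Executor threads clamped to [2, 32] (so a 16-core machine gets 16 nexting threads, while a single-core machine still gets 2), each running completion functors inline since it holds no application locks. A standalone hedged sketch of that drain pattern, with illustrative names (MyFunctor, DrainLoop):

// Sketch only: service callback functors from a next-based CQ,
// as the removed alternative implementation did.
#include <grpc/grpc.h>

struct MyFunctor : grpc_experimental_completion_queue_functor {
  MyFunctor() {
    functor_run = Run;  // invoked with the completion's success bit
    inlineable = false;
  }
  static void Run(grpc_experimental_completion_queue_functor* /*f*/, int ok) {
    // application callback body; ok != 0 means the op succeeded
  }
};

void DrainLoop(grpc_completion_queue* cq) {
  for (;;) {
    grpc_event ev = grpc_completion_queue_next(
        cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    if (ev.type == GRPC_QUEUE_SHUTDOWN) break;  // queue fully drained
    auto* f = static_cast<grpc_experimental_completion_queue_functor*>(ev.tag);
    f->functor_run(f, ev.success);  // safe to run inline on this thread
  }
}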
@@ -425,12 +346,6 @@ struct grpc_completion_queue {
const cq_vtable* vtable;
const cq_poller_vtable* poller_vtable;
// The pollset entry is allowed to enable proxy CQs like the
// callback_alternative.
// TODO(vjpai): Consider removing pollset and reverting to previous
// calculation of pollset once callback_alternative is no longer needed.
grpc_pollset* pollset;
#ifndef NDEBUG
void** outstanding_tags;
size_t outstanding_tag_count;
@@ -445,17 +360,13 @@ struct grpc_completion_queue {
static void cq_finish_shutdown_next(grpc_completion_queue* cq);
static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq);
static void cq_shutdown_next(grpc_completion_queue* cq);
static void cq_shutdown_pluck(grpc_completion_queue* cq);
static void cq_shutdown_callback(grpc_completion_queue* cq);
static void cq_shutdown_callback_alternative(grpc_completion_queue* cq);
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq,
void* tag);
// A cq_end_op function is called when an operation on a given CQ with
// a given tag has completed. The storage argument is a reference to the
@@ -478,20 +389,12 @@ static void cq_end_op_for_callback(
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal);
static void cq_end_op_for_callback_alternative(
grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal);
static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved);
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
static grpc_pollset* cq_proxy_pollset_for_callback_alternative(
grpc_completion_queue* cq);
// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
static void cq_init_next(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
@@ -499,39 +402,29 @@ static void cq_init_pluck(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_init_callback(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
// TODO(vjpai): Remove when callback_alternative is no longer needed and the EM
// poller becomes only option.
static void cq_init_callback_alternative(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);
static void cq_destroy_callback_alternative(void* data);
/* Completion queue vtables based on the completion-type */
// TODO(vjpai): Make this const again once we stop needing callback_alternative
static cq_vtable g_polling_cq_vtable[] = {
static const cq_vtable g_cq_vtable[] = {
/* GRPC_CQ_NEXT */
{GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
nullptr, nullptr},
nullptr},
/* GRPC_CQ_PLUCK */
{GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
cq_pluck, nullptr},
cq_pluck},
/* GRPC_CQ_CALLBACK */
{GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
cq_end_op_for_callback, nullptr, nullptr, nullptr},
cq_end_op_for_callback, nullptr, nullptr},
};
// Separate vtable for non-polling cqs, assigned at init
static cq_vtable g_nonpolling_cq_vtable[sizeof(g_polling_cq_vtable) /
sizeof(g_polling_cq_vtable[0])];
#define DATA_FROM_CQ(cq) ((void*)(cq + 1))
#define INLINE_POLLSET_FROM_CQ(cq) \
#define POLLSET_FROM_CQ(cq) \
((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
#define POLLSET_FROM_CQ(cq) (cq->pollset)
grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
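The restored POLLSET_FROM_CQ depends on the CQ's single-allocation layout: the grpc_completion_queue header, then the vtable-specific data, then the pollset. A hedged illustration of the pointer arithmetic behind the two macros:

// Layout sketch (illustrative, not allocation code):
//   [grpc_completion_queue][vtable-specific data][pollset]
void* data = cq + 1;  // DATA_FROM_CQ: first byte past the header
grpc_pollset* pollset = reinterpret_cast<grpc_pollset*>(
    static_cast<char*>(data) + cq->vtable->data_size);  // POLLSET_FROM_CQ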
@@ -550,46 +443,6 @@ static void on_pollset_shutdown_done(void* cq, grpc_error* error);
void grpc_cq_global_init() {
gpr_tls_init(&g_cached_event);
gpr_tls_init(&g_cached_cq);
g_nonpolling_cq_vtable[GRPC_CQ_NEXT] = g_polling_cq_vtable[GRPC_CQ_NEXT];
g_nonpolling_cq_vtable[GRPC_CQ_PLUCK] = g_polling_cq_vtable[GRPC_CQ_PLUCK];
g_nonpolling_cq_vtable[GRPC_CQ_CALLBACK] =
g_polling_cq_vtable[GRPC_CQ_CALLBACK];
}
// TODO(vjpai): Remove when callback_alternative is no longer needed
void grpc_cq_init() {
// If the iomgr runs in the background, we can use the preferred callback CQ.
// If the iomgr is non-polling, we cannot use the alternative callback CQ.
if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) {
cq_callback_alternative_data::shared_cq_next_mu.Init();
g_polling_cq_vtable[GRPC_CQ_CALLBACK] = {
GRPC_CQ_CALLBACK,
sizeof(cq_callback_alternative_data),
cq_init_callback_alternative,
cq_shutdown_callback_alternative,
cq_destroy_callback_alternative,
cq_begin_op_for_callback_alternative,
cq_end_op_for_callback_alternative,
nullptr,
nullptr,
cq_proxy_pollset_for_callback_alternative};
}
}
// TODO(vjpai): Remove when callback_alternative is no longer needed
void grpc_cq_shutdown() {
if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) {
{
grpc_core::MutexLock lock(
&*cq_callback_alternative_data::shared_cq_next_mu);
if (cq_callback_alternative_data::shared_cq_next != nullptr) {
grpc_completion_queue_shutdown(
cq_callback_alternative_data::shared_cq_next);
}
cq_callback_alternative_data::shared_cq_next = nullptr;
}
cq_callback_alternative_data::shared_cq_next_mu.Destroy();
}
}
void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
@@ -668,9 +521,7 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
"polling_type=%d)",
2, (completion_type, polling_type));
const cq_vtable* vtable = (polling_type == GRPC_CQ_NON_POLLING)
? &g_nonpolling_cq_vtable[completion_type]
: &g_polling_cq_vtable[completion_type];
const cq_vtable* vtable = &g_cq_vtable[completion_type];
const cq_poller_vtable* poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
@@ -687,18 +538,9 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
/* One for destroy(), one for pollset_shutdown */
new (&cq->owning_refs) grpc_core::RefCount(2);
poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
// TODO(vjpai): When callback_alternative is no longer needed, cq->pollset can
// be removed and the nullptr proxy_pollset value below can be the definition
// of POLLSET_FROM_CQ.
cq->pollset = cq->vtable->proxy_pollset == nullptr
? INLINE_POLLSET_FROM_CQ(cq)
: cq->vtable->proxy_pollset(cq);
// Init the inline pollset. If a proxy CQ is used, the proxy pollset will be
// init'ed in its CQ init.
cq->poller_vtable->init(INLINE_POLLSET_FROM_CQ(cq), &cq->mu);
GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
grpc_schedule_on_exec_ctx);
return cq;
@@ -736,17 +578,6 @@ static void cq_destroy_callback(void* data) {
cqd->~cq_callback_data();
}
static void cq_init_callback_alternative(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
new (data) cq_callback_alternative_data(shutdown_callback);
}
static void cq_destroy_callback_alternative(void* data) {
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*>(data);
cqd->~cq_callback_alternative_data();
}
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
return cq->vtable->cq_completion_type;
}
@@ -787,9 +618,7 @@ void grpc_cq_internal_unref(grpc_completion_queue* cq) {
#endif
if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
cq->vtable->destroy(DATA_FROM_CQ(cq));
// Only destroy the inlined pollset. If a proxy CQ is used, the proxy
// pollset will be destroyed by the proxy CQ.
cq->poller_vtable->destroy(INLINE_POLLSET_FROM_CQ(cq));
cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
gpr_free(cq->outstanding_tags);
#endif
@@ -840,14 +669,6 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
return cqd->pending_events.IncrementIfNonzero();
}
static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq,
void* tag) {
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
return grpc_cq_begin_op(cqd->implementation, tag) &&
cqd->pending_events.IncrementIfNonzero();
}
bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
#ifndef NDEBUG
gpr_mu_lock(cq->mu);
@@ -1011,7 +832,7 @@ static void cq_end_op_for_pluck(
GRPC_ERROR_UNREF(error);
}
void functor_callback(void* arg, grpc_error* error) {
static void functor_callback(void* arg, grpc_error* error) {
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
functor->functor_run(functor, error == GRPC_ERROR_NONE);
}
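functor_callback (now static again) adapts a completion functor to grpc's closure interface; dispatching one through the Executor looks roughly like the call sites later in this file:

// Sketch: hand a completion functor to the executor as a closure.
grpc_core::Executor::Run(
    GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr),
    GRPC_ERROR_NONE);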
@@ -1071,40 +892,6 @@ static void cq_end_op_for_callback(
GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
}
static void cq_end_op_for_callback_alternative(
grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal) {
GPR_TIMER_SCOPE("cq_end_op_for_callback_alternative", 0);
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
(GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE)) {
const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
"cq_end_op_for_callback_alternative(cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
6, (cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
}
// Pass through the actual work to the internal nextable CQ
grpc_cq_end_op(cqd->implementation, tag, error, done, done_arg, storage,
internal);
cq_check_tag(cq, tag, true); /* Used in debug builds only */
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
cq_finish_shutdown_callback_alternative(cq);
}
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage,
@@ -1112,13 +899,6 @@ void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
}
static grpc_pollset* cq_proxy_pollset_for_callback_alternative(
grpc_completion_queue* cq) {
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*>(DATA_FROM_CQ(cq));
return POLLSET_FROM_CQ(cqd->implementation);
}
struct cq_is_finished_arg {
gpr_atm last_seen_things_queued_ever;
grpc_completion_queue* cq;
@@ -1599,21 +1379,6 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GRPC_ERROR_NONE);
}
static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq) {
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
auto* callback = cqd->shutdown_callback;
GPR_ASSERT(cqd->shutdown_called);
// Shutdown the non-proxy pollset
cq->poller_vtable->shutdown(INLINE_POLLSET_FROM_CQ(cq),
&cq->pollset_shutdown_done);
grpc_core::Executor::Run(
GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr),
GRPC_ERROR_NONE);
}
static void cq_shutdown_callback(grpc_completion_queue* cq) {
cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
@@ -1640,33 +1405,6 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}
static void cq_shutdown_callback_alternative(grpc_completion_queue* cq) {
cq_callback_alternative_data* cqd =
static_cast<cq_callback_alternative_data*> DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
* We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
* Pollset shutdown decrements the cq ref count which can potentially destroy
* the cq (if that happens to be the last ref).
* Creating an extra ref here prevents the cq from getting destroyed while
* this function is still active */
GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
return;
}
cqd->shutdown_called = true;
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
gpr_mu_unlock(cq->mu);
cq_finish_shutdown_callback_alternative(cq);
} else {
gpr_mu_unlock(cq->mu);
}
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
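The extra-ref sequence deleted above is a general keep-alive guard: pin the object before any call that might drop its last reference, release the pin afterward. In hedged sketch form:

// Keep-alive sketch: pollset shutdown inside finish_shutdown may unref
// the CQ, so hold one ref across the call.
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
cq_finish_shutdown_callback(cq);  // may drop other refs
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");  // safe: our pin survived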

@@ -69,14 +69,6 @@ void grpc_cq_internal_unref(grpc_completion_queue* cc);
/* Initializes global variables used by completion queues */
void grpc_cq_global_init();
// Completion queue initializations that must be done after iomgr
// TODO(vjpai): Remove when callback_alternative is no longer needed.
void grpc_cq_init();
// Completion queue shutdowns that must be done before iomgr shutdown.
// TODO(vjpai): Remove when callback_alternative is no longer needed.
void grpc_cq_shutdown();
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corresponding grpc_cq_end_* call is made.
\a tag is currently used only in debug builds. Return true on success, and
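As the restored header comment says, each successful grpc_cq_begin_op must eventually be matched by an end-op call, or shutdown never finishes. A hedged usage sketch (PostCompletion and OnDone are illustrative names):

// Pairing sketch: begin_op reserves a pending event, end_op retires it.
static void OnDone(void* /*arg*/, grpc_cq_completion* /*storage*/) {}

void PostCompletion(grpc_completion_queue* cq, void* tag,
                    grpc_cq_completion* completion) {
  if (grpc_cq_begin_op(cq, tag)) {            // false once shutdown began
    grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE,  // report success
                   OnDone, /*done_arg=*/nullptr, completion);
  }
}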

@@ -144,7 +144,6 @@ void grpc_init(void) {
grpc_core::ApplicationCallbackExecCtx::GlobalInit();
grpc_core::ExecCtx::GlobalInit();
grpc_iomgr_init();
grpc_cq_init();
gpr_timers_global_init();
grpc_core::HandshakerRegistry::Init();
grpc_security_init();
@@ -170,7 +169,6 @@ void grpc_shutdown_internal_locked(void) {
int i;
{
grpc_core::ExecCtx exec_ctx(0);
grpc_cq_shutdown();
grpc_iomgr_shutdown_background_closure();
{
grpc_timer_manager_set_threading(false); // shutdown timer_manager thread

@@ -130,10 +130,10 @@ class ClientCallbackEnd2endTest
server_ = builder.BuildAndStart();
is_server_started_ = true;
// if (GetParam().protocol == Protocol::TCP &&
// !grpc_iomgr_run_in_background()) {
// do_not_test_ = true;
// }
if (GetParam().protocol == Protocol::TCP &&
!grpc_iomgr_run_in_background()) {
do_not_test_ = true;
}
}
void ResetStub() {

@@ -328,11 +328,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
}
void SetUp() override {
// if (GetParam().callback_server && !GetParam().inproc &&
// !grpc_iomgr_run_in_background()) {
// do_not_test_ = true;
// return;
// }
if (GetParam().callback_server && !GetParam().inproc &&
!grpc_iomgr_run_in_background()) {
do_not_test_ = true;
return;
}
}
void TearDown() override {

@@ -119,12 +119,12 @@ class MessageAllocatorEnd2endTestBase
protected:
MessageAllocatorEnd2endTestBase() {
GetParam().Log();
// if (GetParam().protocol == Protocol::TCP) {
// if (!grpc_iomgr_run_in_background()) {
// do_not_test_ = true;
// return;
// }
// }
if (GetParam().protocol == Protocol::TCP) {
if (!grpc_iomgr_run_in_background()) {
do_not_test_ = true;
return;
}
}
}
~MessageAllocatorEnd2endTestBase() = default;

@@ -69,11 +69,6 @@ BENCHMARK(BM_CreateDestroyCore);
static void DoneWithCompletionOnStack(void* /*arg*/,
grpc_cq_completion* /*completion*/) {}
static void DoneWithCompletionOnHeap(void* /*arg*/,
grpc_cq_completion* completion) {
delete completion;
}
class DummyTag final : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
@@ -210,15 +205,8 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
gpr_cv_init(&shutdown_cv);
bool got_shutdown = false;
ShutdownCallback shutdown_cb(&got_shutdown);
// This test with stack-allocated completions only works for non-polling or
// EM-polling callback core CQs. For generality, test with non-polling.
grpc_completion_queue_attributes attr;
attr.version = 2;
attr.cq_completion_type = GRPC_CQ_CALLBACK;
attr.cq_polling_type = GRPC_CQ_NON_POLLING;
attr.cq_shutdown_cb = &shutdown_cb;
grpc_completion_queue* cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
grpc_completion_queue* cc =
grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr);
for (auto _ : state) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
@@ -252,53 +240,7 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
gpr_cv_destroy(&shutdown_cv);
gpr_mu_destroy(&shutdown_mu);
}
static void BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State& state) {
TrackCounters track_counters;
int iteration = 0, current_iterations = 0;
TagCallback tag_cb(&iteration);
gpr_mu_init(&mu);
gpr_cv_init(&cv);
gpr_mu_init(&shutdown_mu);
gpr_cv_init(&shutdown_cv);
bool got_shutdown = false;
ShutdownCallback shutdown_cb(&got_shutdown);
grpc_completion_queue* cc =
grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr);
for (auto _ : state) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_cq_completion* completion = new grpc_cq_completion;
GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb));
grpc_cq_end_op(cc, &tag_cb, GRPC_ERROR_NONE, DoneWithCompletionOnHeap,
nullptr, completion);
}
shutdown_and_destroy(cc);
gpr_mu_lock(&mu);
current_iterations = static_cast<int>(state.iterations());
while (current_iterations != iteration) {
// Wait for all the callbacks to complete.
gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&mu);
gpr_mu_lock(&shutdown_mu);
while (!got_shutdown) {
// Wait for the shutdown callback to complete.
gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&shutdown_mu);
GPR_ASSERT(got_shutdown);
GPR_ASSERT(iteration == static_cast<int>(state.iterations()));
track_counters.Finish(state);
gpr_cv_destroy(&cv);
gpr_mu_destroy(&mu);
gpr_cv_destroy(&shutdown_cv);
gpr_mu_destroy(&shutdown_mu);
}
BENCHMARK(BM_Callback_CQ_Pass1Core);
BENCHMARK(BM_Callback_CQ_Pass1CoreHeapCompletion);
} // namespace testing
} // namespace grpc
