Adds gRPC Experimental CQ DoThenAsyncNext lambda API

pull/13084/head
Ken Payson 7 years ago
parent 0d1150855d
commit 42bd87e376
1. grpc.def (2)
2. include/grpc++/impl/codegen/completion_queue.h (39)
3. include/grpc/grpc.h (17)
4. src/core/lib/surface/completion_queue.cc (58)
5. src/core/lib/surface/completion_queue.h (3)
6. src/core/lib/surface/init.cc (1)
7. src/cpp/common/completion_queue_cc.cc (25)
8. src/ruby/ext/grpc/rb_grpc_imports.generated.c (4)
9. src/ruby/ext/grpc/rb_grpc_imports.generated.h (6)
10. test/core/surface/completion_queue_test.c (76)
11. test/cpp/end2end/async_end2end_test.cc (111)
12. test/cpp/qps/client.h (42)
13. test/cpp/qps/client_async.cc (38)
14. test/cpp/qps/client_sync.cc (39)
15. test/cpp/qps/server_async.cc (27)

@@ -54,6 +54,8 @@ EXPORTS
grpc_completion_queue_pluck
grpc_completion_queue_shutdown
grpc_completion_queue_destroy
grpc_completion_queue_thread_local_cache_init
grpc_completion_queue_thread_local_cache_flush
grpc_alarm_create
grpc_alarm_set
grpc_alarm_cancel

@@ -109,6 +109,30 @@ class CompletionQueue : private GrpcLibraryCodegen {
TIMEOUT ///< deadline was reached.
};
/// EXPERIMENTAL
/// First executes \a F, then reads from the queue, blocking up to
/// \a deadline (or the queue's shutdown).
/// Both \a tag and \a ok are updated upon success (if an event is available
/// within the \a deadline). A \a tag points to an arbitrary location usually
/// employed to uniquely identify an event.
///
/// \param F[in] Function to execute before calling AsyncNext on this queue.
/// \param tag[out] Upon success, updated to point to the event's tag.
/// \param ok[out] Upon success, true if a regular event was read, false
/// otherwise.
/// \param deadline[in] How long to block in wait for an event.
///
/// \return The type of event read.
template <typename T, typename F>
NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) {
CompletionQueueTLSCache cache = CompletionQueueTLSCache(this);
f();
if (cache.Flush(tag, ok)) {
return GOT_EVENT;
} else {
return AsyncNext(tag, ok, deadline);
}
}
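A minimal usage sketch of the new call (hedged: cq is an existing grpc::CompletionQueue, and StartOperation / tag() are hypothetical stand-ins for any call that queues a completion on cq from the current thread):

// Start an operation and poll for its completion in one call. If the
// completion lands in this thread's TLS cache while f() runs, it is
// returned directly, without a round trip through the underlying queue.
void* got_tag;
bool ok;
grpc::CompletionQueue::NextStatus status = cq.DoThenAsyncNext(
    [&] { StartOperation(tag(1)); },  // f runs before the queue is read
    &got_tag, &ok, gpr_time_from_seconds(5, GPR_CLOCK_MONOTONIC));
if (status == grpc::CompletionQueue::GOT_EVENT) {
  // got_tag and ok now describe the completed event
}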
/// Read from the queue, blocking up to \a deadline (or the queue's shutdown).
/// Both \a tag and \a ok are updated upon success (if an event is available
/// within the \a deadline). A \a tag points to an arbitrary location usually
@@ -213,6 +237,21 @@ class CompletionQueue : private GrpcLibraryCodegen {
const InputMessage& request,
OutputMessage* result);
/// EXPERIMENTAL
/// Creates a Thread Local cache to store the first event
/// queued on this completion queue from this thread. Once
/// initialized, it must be flushed on the same thread.
class CompletionQueueTLSCache {
public:
CompletionQueueTLSCache(CompletionQueue* cq);
~CompletionQueueTLSCache();
bool Flush(void** tag, bool* ok);
private:
CompletionQueue* cq_;
bool flushed_;
};
NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
/// Wraps \a grpc_completion_queue_pluck.

@@ -143,6 +143,23 @@ GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq);
drained and no threads are executing grpc_completion_queue_next */
GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq);
/*********** EXPERIMENTAL API ************/
/** Initializes a thread local cache for \a cq.
* grpc_completion_queue_thread_local_cache_flush() MUST be called on the
* same thread, with the same cq.
*/
GRPCAPI void grpc_completion_queue_thread_local_cache_init(
grpc_completion_queue *cq);
/*********** EXPERIMENTAL API ************/
/** Flushes the thread local cache for \a cq.
* Returns 1 if there were contents in the cache. If there was an event
* in the \a cq TLS cache, its tag is placed in \a tag, and \a ok is set
* to the event's success.
*/
GRPCAPI int grpc_completion_queue_thread_local_cache_flush(
grpc_completion_queue *cq, void **tag, int *ok);
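The intended call pattern at the C-core level is roughly the following sketch (hedged: start_op is a hypothetical caller-provided function that queues a completion on cq, tagged tag, from this same thread):

// Arm the cache, run the op, then collect the cached event without a
// full grpc_completion_queue_next() poll.
grpc_completion_queue_thread_local_cache_init(cq);
start_op(cq, tag);  // hypothetical: queues a completion tagged `tag`
void *out_tag;
int ok;
if (grpc_completion_queue_thread_local_cache_flush(cq, &out_tag, &ok)) {
  // out_tag == tag; ok carries the event's success bit
} else {
  // nothing was cached on this thread; poll the queue as usual
}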
/** Create a completion queue alarm instance */
GRPCAPI grpc_alarm *grpc_alarm_create(void *reserved);

@@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpc/support/tls.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/pollset.h"
@@ -48,6 +49,14 @@ grpc_tracer_flag grpc_trace_cq_refcount =
GRPC_TRACER_INITIALIZER(false, "cq_refcount");
#endif
// Specifies a cq thread local cache.
// The first event that occurs on a thread
// with a cq cache will go into that cache, and
// will only be returned on the thread that initialized the cache.
// NOTE: Only one event will ever be cached.
GPR_TLS_DECL(g_cached_event);
GPR_TLS_DECL(g_cached_cq);
typedef struct {
grpc_pollset_worker **worker;
void *tag;

@@ -345,6 +354,46 @@ grpc_tracer_flag grpc_cq_event_timeout_trace =
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq,
grpc_error *error);
void grpc_cq_global_init() {
gpr_tls_init(&g_cached_event);
gpr_tls_init(&g_cached_cq);
}
void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq) {
if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == nullptr) {
gpr_tls_set(&g_cached_event, (intptr_t)0);
gpr_tls_set(&g_cached_cq, (intptr_t)cq);
}
}
int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq,
void **tag, int *ok) {
grpc_cq_completion *storage =
(grpc_cq_completion *)gpr_tls_get(&g_cached_event);
int ret = 0;
if (storage != NULL &&
(grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq) {
*tag = storage->tag;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
storage->done(&exec_ctx, storage->done_arg, storage);
*ok = (storage->next & (uintptr_t)(1)) == 1;
ret = 1;
cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
cq_finish_shutdown_next(&exec_ctx, cq);
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down");
}
grpc_exec_ctx_finish(&exec_ctx);
}
gpr_tls_set(&g_cached_event, (intptr_t)0);
gpr_tls_set(&g_cached_cq, (intptr_t)0);
return ret;
}
static void cq_event_queue_init(grpc_cq_event_queue *q) {
gpr_mpscq_init(&q->queue);
q->queue_lock = GPR_SPINLOCK_INITIALIZER;

@@ -617,7 +666,6 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
}
cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
@@ -628,12 +676,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
cq_check_tag(cq, tag, true); /* Used in debug builds only */
if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq &&
(grpc_cq_completion *)gpr_tls_get(&g_cached_event) == nullptr) {
gpr_tls_set(&g_cached_event, (intptr_t)storage);
} else {
/* Add the completion to the queue */
bool is_first = cq_event_queue_push(&cqd->queue, storage);
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
/* Since we do not hold the cq lock here, it is important to do an 'acquire'
load here (instead of a 'no_barrier' load) to match with the release
store
(done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
*/
bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
@@ -667,6 +720,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
}
GPR_TIMER_END("cq_end_op_for_next", 0);

@@ -70,6 +70,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc);
#define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) grpc_cq_internal_unref(ec, cc)
#endif
/* Initializes global variables used by completion queues */
void grpc_cq_global_init();
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corresponding grpc_cq_end_* call is made.
\a tag is currently used only in debug builds. Return true on success, and

@@ -64,6 +64,7 @@ static void do_basic_init(void) {
gpr_log_verbosity_init();
gpr_mu_init(&g_init_mu);
grpc_register_built_in_plugins();
grpc_cq_global_init();
g_initializations = 0;
}

@@ -71,4 +71,29 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
}
}
CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
CompletionQueue* cq)
: cq_(cq), flushed_(false) {
grpc_completion_queue_thread_local_cache_init(cq_->cq_);
}
CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
GPR_ASSERT(flushed_);
}
bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
int res = 0;
void* res_tag;
flushed_ = true;
if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
&res)) {
auto cq_tag = static_cast<CompletionQueueTag*>(res_tag);
*ok = res == 1;
if (cq_tag->FinalizeResult(tag, ok)) {
return true;
}
}
return false;
}
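Two contract details are worth noting in this wrapper. First, the destructor's GPR_ASSERT(flushed_) makes the usage rule explicit: a CompletionQueueTLSCache must be flushed exactly once, on the thread that created it, before it goes out of scope; DoThenAsyncNext satisfies this by construction. Second, a cached completion is still funneled through CompletionQueueTag::FinalizeResult, the same finalization step the normal AsyncNext path applies, so a tag should observe the same semantics whether it arrives via the cache or via the queue.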
} // namespace grpc

@@ -77,6 +77,8 @@ grpc_completion_queue_next_type grpc_completion_queue_next_import;
grpc_completion_queue_pluck_type grpc_completion_queue_pluck_import;
grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import;
grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import;
grpc_completion_queue_thread_local_cache_init_type grpc_completion_queue_thread_local_cache_init_import;
grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue_thread_local_cache_flush_import;
grpc_alarm_create_type grpc_alarm_create_import;
grpc_alarm_set_type grpc_alarm_set_import;
grpc_alarm_cancel_type grpc_alarm_cancel_import;
@@ -385,6 +387,8 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_completion_queue_pluck_import = (grpc_completion_queue_pluck_type) GetProcAddress(library, "grpc_completion_queue_pluck");
grpc_completion_queue_shutdown_import = (grpc_completion_queue_shutdown_type) GetProcAddress(library, "grpc_completion_queue_shutdown");
grpc_completion_queue_destroy_import = (grpc_completion_queue_destroy_type) GetProcAddress(library, "grpc_completion_queue_destroy");
grpc_completion_queue_thread_local_cache_init_import = (grpc_completion_queue_thread_local_cache_init_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_init");
grpc_completion_queue_thread_local_cache_flush_import = (grpc_completion_queue_thread_local_cache_flush_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_flush");
grpc_alarm_create_import = (grpc_alarm_create_type) GetProcAddress(library, "grpc_alarm_create");
grpc_alarm_set_import = (grpc_alarm_set_type) GetProcAddress(library, "grpc_alarm_set");
grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel");

@@ -212,6 +212,12 @@ extern grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import;
typedef void(*grpc_completion_queue_destroy_type)(grpc_completion_queue *cq);
extern grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import;
#define grpc_completion_queue_destroy grpc_completion_queue_destroy_import
typedef void(*grpc_completion_queue_thread_local_cache_init_type)(grpc_completion_queue *cq);
extern grpc_completion_queue_thread_local_cache_init_type grpc_completion_queue_thread_local_cache_init_import;
#define grpc_completion_queue_thread_local_cache_init grpc_completion_queue_thread_local_cache_init_import
typedef int(*grpc_completion_queue_thread_local_cache_flush_type)(grpc_completion_queue *cq, void **tag, int *ok);
extern grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue_thread_local_cache_flush_import;
#define grpc_completion_queue_thread_local_cache_flush grpc_completion_queue_thread_local_cache_flush_import
typedef grpc_alarm *(*grpc_alarm_create_type)(void *reserved);
extern grpc_alarm_create_type grpc_alarm_create_import;
#define grpc_alarm_create grpc_alarm_create_import

@@ -158,6 +158,80 @@ static void test_cq_end_op(void) {
}
}
static void test_cq_tls_cache_full(void) {
grpc_event ev;
grpc_completion_queue *cc;
grpc_cq_completion completion;
grpc_cq_polling_type polling_types[] = {
GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
grpc_completion_queue_attributes attr;
grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx exec_ctx;
void *tag = create_test_tag();
void *res_tag;
int ok;
LOG_TEST("test_cq_tls_cache_full");
attr.version = 1;
attr.cq_completion_type = GRPC_CQ_NEXT;
for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
exec_ctx = init_exec_ctx; // Reset exec_ctx
attr.cq_polling_type = polling_types[i];
cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, NULL);
grpc_completion_queue_thread_local_cache_init(cc);
GPR_ASSERT(grpc_cq_begin_op(cc, tag));
grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE,
do_nothing_end_completion, NULL, &completion);
ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
GPR_ASSERT(
grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1);
GPR_ASSERT(res_tag == tag);
GPR_ASSERT(ok);
ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
shutdown_and_destroy(cc);
grpc_exec_ctx_finish(&exec_ctx);
}
}
static void test_cq_tls_cache_empty(void) {
grpc_completion_queue *cc;
grpc_cq_polling_type polling_types[] = {
GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
grpc_completion_queue_attributes attr;
grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx exec_ctx;
void *res_tag;
int ok;
LOG_TEST("test_cq_tls_cache_empty");
attr.version = 1;
attr.cq_completion_type = GRPC_CQ_NEXT;
for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
exec_ctx = init_exec_ctx; // Reset exec_ctx
attr.cq_polling_type = polling_types[i];
cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, NULL);
GPR_ASSERT(
grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
grpc_completion_queue_thread_local_cache_init(cc);
GPR_ASSERT(
grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
shutdown_and_destroy(cc);
grpc_exec_ctx_finish(&exec_ctx);
}
}
static void test_shutdown_then_next_polling(void) {
grpc_cq_polling_type polling_types[] = {
GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};

@@ -300,6 +374,8 @@ int main(int argc, char **argv) {
test_cq_end_op();
test_pluck();
test_pluck_after_shutdown();
test_cq_tls_cache_full();
test_cq_tls_cache_empty();
grpc_shutdown();
return 0;
}

@@ -99,7 +99,7 @@ class PollingOverrider {
class Verifier {
public:
explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {}
// Expect sets the expected ok value for a specific tag
Verifier& Expect(int i, bool expect_ok) {
return ExpectUnless(i, expect_ok, false);
@@ -142,6 +142,18 @@ class Verifier {
return detag(got_tag);
}
template <typename T>
CompletionQueue::NextStatus DoOnceThenAsyncNext(
CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
std::function<void(void)> lambda) {
if (lambda_run_) {
return cq->AsyncNext(got_tag, ok, deadline);
} else {
lambda_run_ = true;
return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
}
}
// Verify keeps calling Next until all currently set
// expected tags are complete
void Verify(CompletionQueue* cq) { Verify(cq, false); }

@@ -154,6 +166,7 @@ class Verifier {
Next(cq, ignore_ok);
}
}

// This version of Verify stops after a certain deadline
void Verify(CompletionQueue* cq,
std::chrono::system_clock::time_point deadline) {

@@ -193,6 +206,47 @@ class Verifier {
}
}
// This version of Verify stops after a certain deadline, and uses the
// DoThenAsyncNext API to call the lambda
void Verify(CompletionQueue* cq,
std::chrono::system_clock::time_point deadline,
std::function<void(void)> lambda) {
if (expectations_.empty()) {
bool ok;
void* got_tag;
if (spin_) {
while (std::chrono::system_clock::now() < deadline) {
EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
CompletionQueue::TIMEOUT);
}
} else {
EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
CompletionQueue::TIMEOUT);
}
} else {
while (!expectations_.empty()) {
bool ok;
void* got_tag;
if (spin_) {
for (;;) {
GPR_ASSERT(std::chrono::system_clock::now() < deadline);
auto r = DoOnceThenAsyncNext(
cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda);
if (r == CompletionQueue::TIMEOUT) continue;
if (r == CompletionQueue::GOT_EVENT) break;
gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
abort();
}
} else {
EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
CompletionQueue::GOT_EVENT);
}
GotTag(got_tag, ok, false);
}
}
}
private:
void GotTag(void* got_tag, bool ok, bool ignore_ok) {
auto it = expectations_.find(got_tag);

@@ -226,6 +280,7 @@ class Verifier {
std::map<void*, bool> expectations_;
std::map<void*, MaybeExpect> maybe_expectations_;
bool spin_;
bool lambda_run_;
};

bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {

@@ -490,6 +545,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
EXPECT_TRUE(recv_status.ok());
}
// Test a simple RPC using the DoThenAsyncNext API
TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
std::chrono::system_clock::time_point time_now(
std::chrono::system_clock::now());
std::chrono::system_clock::time_point time_limit(
std::chrono::system_clock::now() + std::chrono::seconds(10));
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
auto resp_writer_ptr = &response_writer;
auto lambda_2 = [&, this, resp_writer_ptr]() {
gpr_log(GPR_ERROR, "CALLED");
service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
cq_.get(), tag(2));
};
Verifier(GetParam().disable_blocking)
.Expect(2, true)
.Verify(cq_.get(), time_limit, lambda_2);
EXPECT_EQ(send_request.message(), recv_request.message());
auto recv_resp_ptr = &recv_response;
auto status_ptr = &recv_status;
send_response.set_message(recv_request.message());
auto lambda_3 = [&, this, resp_writer_ptr, send_response]() {
resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
};
response_reader->Finish(recv_resp_ptr, status_ptr, tag(4));
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max(),
lambda_3);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
// Two pings and a final pong.
TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();

@@ -226,8 +226,6 @@ class Client {
}
virtual void DestroyMultithreading() = 0;
virtual void InitThreadFunc(size_t thread_idx) = 0;
virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads

@@ -275,7 +273,6 @@ class Client {
: std::bind(&Client::NextIssueTime, this, thread_idx);
}
private:
class Thread {
public:
Thread(Client* client, size_t idx)
@@ -295,6 +292,16 @@ class Client {
MergeStatusHistogram(statuses_, s);
}
void UpdateHistogram(HistogramEntry* entry) {
std::lock_guard<std::mutex> g(mu_);
if (entry->value_used()) {
histogram_.Add(entry->value());
}
if (entry->status_used()) {
statuses_[entry->status()]++;
}
}
private:
Thread(const Thread&);
Thread& operator=(const Thread&);

@@ -310,29 +317,8 @@ class Client {
wait_loop++;
}
client_->ThreadFunc(idx_, this);
for (;;) {
// run the loop body
HistogramEntry entry;
const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
// lock, update histogram if needed and see if we're done
std::lock_guard<std::mutex> g(mu_);
if (entry.value_used()) {
histogram_.Add(entry.value());
}
if (entry.status_used()) {
statuses_[entry.status()]++;
}
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
}
if (!thread_still_ok ||
static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) {
client_->CompleteThread();
return;
}
}
} }
std::mutex mu_;

@@ -343,6 +329,12 @@ class Client {
std::thread impl_;
};
bool ThreadCompleted() {
return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
}
virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<UsageTimer> timer_;

@@ -236,33 +236,47 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
this->EndThreads(); // this needed for resolution
}
void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {
void* got_tag;
bool ok;

HistogramEntry entry;
HistogramEntry* entry_ptr = &entry;
if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
return;
}
ClientRpcContext* ctx;
std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
do {
t->UpdateHistogram(entry_ptr);
// Got a regular event, so process it
ctx = ClientRpcContext::detag(got_tag);
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
shutdown_mu->lock();
if (shutdown_state_[thread_idx]->shutdown) {
ctx->TryCancel();
delete ctx;
while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
ctx = ClientRpcContext::detag(got_tag);
ctx->TryCancel();
delete ctx;
}
shutdown_mu->unlock();
return;
}
} while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
[&, ctx, ok, entry_ptr, shutdown_mu]() {
bool next_ok = ok;
if (!ctx->RunNextState(next_ok, entry_ptr)) {
// The RPC and callback are done, so clone the ctx
// and kickstart the new one
ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
delete ctx;
}
shutdown_mu->unlock();
},
&got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
}

std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
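The shape of this rewrite is the heart of the commit: handling of event N moves into the lambda passed to DoThenAsyncNext, so any completion that handling produces on the same thread can be served from the TLS cache on the very next call instead of going through the queue. The server loop in server_async.cc below follows the same shape. Distilled to its skeleton (a sketch; ProcessEvent is a hypothetical stand-in for the per-event work above):

void* got_tag;
bool ok;
if (!cq->Next(&got_tag, &ok)) return;  // prime the loop with one event
do {
  // body intentionally empty: event N is handled inside the lambda,
  // which DoThenAsyncNext runs before reading event N+1
} while (cq->DoThenAsyncNext(
    [&]() { ProcessEvent(got_tag, ok); },  // still holds event N's values
    &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));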

@@ -62,6 +62,25 @@ class SynchronousClient
virtual ~SynchronousClient(){};
virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
void ThreadFunc(size_t thread_idx, Thread* t) override {
InitThreadFuncImpl(thread_idx);
for (;;) {
// run the loop body
HistogramEntry entry;
const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
t->UpdateHistogram(&entry);
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
}
if (!thread_still_ok || ThreadCompleted()) {
return;
}
}
}
protected:
// WaitToIssue returns false if we realize that we need to break out
bool WaitToIssue(int thread_idx) {

@@ -103,9 +122,9 @@ class SynchronousUnaryClient final : public SynchronousClient {
}
~SynchronousUnaryClient() {}
void InitThreadFuncImpl(size_t thread_idx) override {}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
}
@@ -192,13 +211,13 @@ class SynchronousStreamingPingPongClient final
}
}
void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
messages_issued_[thread_idx] = 0;
}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
}
@@ -246,14 +265,14 @@ class SynchronousStreamingFromClientClient final
}
}
void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
&responses_[thread_idx]);
last_issue_[thread_idx] = UsageTimer::Now();
}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// Figure out how to make histogram sensible if this is rate-paced
if (!WaitToIssue(thread_idx)) {
return true;
@@ -282,13 +301,13 @@ class SynchronousStreamingFromServerClient final
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_recv_(num_threads_) {}
void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] =
stub->StreamingFromServer(&context_[thread_idx], request_);
last_recv_[thread_idx] = UsageTimer::Now();
}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
double now = UsageTimer::Now();
@@ -328,11 +347,11 @@ class SynchronousStreamingBothWaysClient final
}
}
void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// TODO (vjpai): Do this
return true;
}

@@ -194,23 +194,32 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
return;
}
ServerRpcContext *ctx;
std::mutex *mu_ptr;
do {
ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
mu_ptr = &shutdown_state_[thread_idx]->mutex;
mu_ptr->lock();
if (shutdown_state_[thread_idx]->shutdown) {
mu_ptr->unlock();
return;
}
} while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
[&, ctx, ok, mu_ptr]() {
ctx->lock();
if (!ctx->RunNextState(ok)) {
ctx->Reset();
}
ctx->unlock();
mu_ptr->unlock();
},
&got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
}
class ServerRpcContext {
