diff --git a/grpc.def b/grpc.def index 558be60c3c1..e4281f3ab67 100644 --- a/grpc.def +++ b/grpc.def @@ -54,6 +54,8 @@ EXPORTS grpc_completion_queue_pluck grpc_completion_queue_shutdown grpc_completion_queue_destroy + grpc_completion_queue_thread_local_cache_init + grpc_completion_queue_thread_local_cache_flush grpc_alarm_create grpc_alarm_set grpc_alarm_cancel diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index ca757e2a9c5..e2c0c29dca4 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -109,6 +109,30 @@ class CompletionQueue : private GrpcLibraryCodegen { TIMEOUT ///< deadline was reached. }; + /// EXPERIMENTAL + /// First executes \a F, then reads from the queue, blocking up to + /// \a deadline (or the queue's shutdown). + /// Both \a tag and \a ok are updated upon success (if an event is available + /// within the \a deadline). A \a tag points to an arbitrary location usually + /// employed to uniquely identify an event. + /// + /// \param F[in] Function to execute before calling AsyncNext on this queue. + /// \param tag[out] Upon sucess, updated to point to the event's tag. + /// \param ok[out] Upon sucess, true if read a regular event, false otherwise. + /// \param deadline[in] How long to block in wait for an event. + /// + /// \return The type of event read. + template + NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) { + CompletionQueueTLSCache cache = CompletionQueueTLSCache(this); + f(); + if (cache.Flush(tag, ok)) { + return GOT_EVENT; + } else { + return AsyncNext(tag, ok, deadline); + } + } + /// Read from the queue, blocking up to \a deadline (or the queue's shutdown). /// Both \a tag and \a ok are updated upon success (if an event is available /// within the \a deadline). A \a tag points to an arbitrary location usually @@ -213,6 +237,21 @@ class CompletionQueue : private GrpcLibraryCodegen { const InputMessage& request, OutputMessage* result); + /// EXPERIMENTAL + /// Creates a Thread Local cache to store the first event + /// On this completion queue queued from this thread. Once + /// initialized, it must be flushed on the same thread. + class CompletionQueueTLSCache { + public: + CompletionQueueTLSCache(CompletionQueue* cq); + ~CompletionQueueTLSCache(); + bool Flush(void** tag, bool* ok); + + private: + CompletionQueue* cq_; + bool flushed_; + }; + NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 1de289fba45..6df3b8086e8 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -143,6 +143,23 @@ GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq); drained and no threads are executing grpc_completion_queue_next */ GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq); +/*********** EXPERIMENTAL API ************/ +/** Initializes a thread local cache for \a cq. + * grpc_flush_cq_tls_cache() MUST be called on the same thread, + * with the same cq. + */ +GRPCAPI void grpc_completion_queue_thread_local_cache_init( + grpc_completion_queue *cq); + +/*********** EXPERIMENTAL API ************/ +/** Flushes the thread local cache for \a cq. + * Returns 1 if there was contents in the cache. If there was an event + * in \a cq tls cache, its tag is placed in tag, and ok is set to the + * event success. + */ +GRPCAPI int grpc_completion_queue_thread_local_cache_flush( + grpc_completion_queue *cq, void **tag, int *ok); + /** Create a completion queue alarm instance */ GRPCAPI grpc_alarm *grpc_alarm_create(void *reserved); diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index 5aba5078a88..6efcff84b8b 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -452,6 +452,7 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); return &tcp->base; } diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 21664f03c8b..5009f786e68 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/pollset.h" @@ -48,6 +49,14 @@ grpc_tracer_flag grpc_trace_cq_refcount = GRPC_TRACER_INITIALIZER(false, "cq_refcount"); #endif +// Specifies a cq thread local cache. +// The first event that occurs on a thread +// with a cq cache will go into that cache, and +// will only be returned on the thread that initialized the cache. +// NOTE: Only one event will ever be cached. +GPR_TLS_DECL(g_cached_event); +GPR_TLS_DECL(g_cached_cq); + typedef struct { grpc_pollset_worker **worker; void *tag; @@ -345,6 +354,46 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq, grpc_error *error); +void grpc_cq_global_init() { + gpr_tls_init(&g_cached_event); + gpr_tls_init(&g_cached_cq); +} + +void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq) { + if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == nullptr) { + gpr_tls_set(&g_cached_event, (intptr_t)0); + gpr_tls_set(&g_cached_cq, (intptr_t)cq); + } +} + +int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq, + void **tag, int *ok) { + grpc_cq_completion *storage = + (grpc_cq_completion *)gpr_tls_get(&g_cached_event); + int ret = 0; + if (storage != NULL && + (grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq) { + *tag = storage->tag; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + storage->done(&exec_ctx, storage->done_arg, storage); + *ok = (storage->next & (uintptr_t)(1)) == 1; + ret = 1; + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(&exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down"); + } + grpc_exec_ctx_finish(&exec_ctx); + } + gpr_tls_set(&g_cached_event, (intptr_t)0); + gpr_tls_set(&g_cached_cq, (intptr_t)0); + + return ret; +} + static void cq_event_queue_init(grpc_cq_event_queue *q) { gpr_mpscq_init(&q->queue); q->queue_lock = GPR_SPINLOCK_INITIALIZER; @@ -617,7 +666,6 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } } - cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); @@ -628,44 +676,50 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, cq_check_tag(cq, tag, true); /* Used in debug builds only */ - /* Add the completion to the queue */ - bool is_first = cq_event_queue_push(&cqd->queue, storage); - gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - - /* Since we do not hold the cq lock here, it is important to do an 'acquire' - load here (instead of a 'no_barrier' load) to match with the release store - (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next - */ - bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; - - if (!will_definitely_shutdown) { - /* Only kick if this is the first item queued */ - if (is_first) { - gpr_mu_lock(cq->mu); - grpc_error *kick_error = - cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); - gpr_mu_unlock(cq->mu); + if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq && + (grpc_cq_completion *)gpr_tls_get(&g_cached_event) == nullptr) { + gpr_tls_set(&g_cached_event, (intptr_t)storage); + } else { + /* Add the completion to the queue */ + bool is_first = cq_event_queue_push(&cqd->queue, storage); + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + + /* Since we do not hold the cq lock here, it is important to do an 'acquire' + load here (instead of a 'no_barrier' load) to match with the release + store + (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next + */ + bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; + + if (!will_definitely_shutdown) { + /* Only kick if this is the first item queued */ + if (is_first) { + gpr_mu_lock(cq->mu); + grpc_error *kick_error = + cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); + gpr_mu_unlock(cq->mu); - if (kick_error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(kick_error); - gpr_log(GPR_ERROR, "Kick failed: %s", msg); - GRPC_ERROR_UNREF(kick_error); + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + GRPC_ERROR_UNREF(kick_error); + } } - } - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + } + } else { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_atm_rel_store(&cqd->pending_events, 0); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(exec_ctx, cq); gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } - } else { - GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); - gpr_atm_rel_store(&cqd->pending_events, 0); - gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(exec_ctx, cq); - gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } GPR_TIMER_END("cq_end_op_for_next", 0); diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 69d144bd95c..c02bc5da071 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -70,6 +70,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); #define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) grpc_cq_internal_unref(ec, cc) #endif +/* Initializes global variables used by completion queues */ +void grpc_cq_global_init(); + /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. \a tag is currently used only in debug builds. Return true on success, and diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index b089da2c54f..058e88f8048 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -64,6 +64,7 @@ static void do_basic_init(void) { gpr_log_verbosity_init(); gpr_mu_init(&g_init_mu); grpc_register_built_in_plugins(); + grpc_cq_global_init(); g_initializations = 0; } diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index f34b0f3d583..4a2e2be6880 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -71,4 +71,29 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( } } +CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache( + CompletionQueue* cq) + : cq_(cq), flushed_(false) { + grpc_completion_queue_thread_local_cache_init(cq_->cq_); +} + +CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() { + GPR_ASSERT(flushed_); +} + +bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { + int res = 0; + void* res_tag; + flushed_ = true; + if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag, + &res)) { + auto cq_tag = static_cast(res_tag); + *ok = res == 1; + if (cq_tag->FinalizeResult(tag, ok)) { + return true; + } + } + return false; +} + } // namespace grpc diff --git a/src/cpp/util/error_details.cc b/src/cpp/util/error_details.cc index 44bc4d16485..f06b475683a 100644 --- a/src/cpp/util/error_details.cc +++ b/src/cpp/util/error_details.cc @@ -37,7 +37,8 @@ Status SetErrorDetails(const ::google::rpc::Status& from, Status* to) { return Status(StatusCode::FAILED_PRECONDITION, ""); } StatusCode code = StatusCode::UNKNOWN; - if (from.code() >= StatusCode::OK && from.code() <= StatusCode::DATA_LOSS) { + if (from.code() >= StatusCode::OK && + from.code() <= StatusCode::UNAUTHENTICATED) { code = static_cast(from.code()); } *to = Status(code, from.message(), from.SerializeAsString()); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 70831494fad..cd1bd98abcd 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -77,6 +77,8 @@ grpc_completion_queue_next_type grpc_completion_queue_next_import; grpc_completion_queue_pluck_type grpc_completion_queue_pluck_import; grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import; grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import; +grpc_completion_queue_thread_local_cache_init_type grpc_completion_queue_thread_local_cache_init_import; +grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue_thread_local_cache_flush_import; grpc_alarm_create_type grpc_alarm_create_import; grpc_alarm_set_type grpc_alarm_set_import; grpc_alarm_cancel_type grpc_alarm_cancel_import; @@ -385,6 +387,8 @@ void grpc_rb_load_imports(HMODULE library) { grpc_completion_queue_pluck_import = (grpc_completion_queue_pluck_type) GetProcAddress(library, "grpc_completion_queue_pluck"); grpc_completion_queue_shutdown_import = (grpc_completion_queue_shutdown_type) GetProcAddress(library, "grpc_completion_queue_shutdown"); grpc_completion_queue_destroy_import = (grpc_completion_queue_destroy_type) GetProcAddress(library, "grpc_completion_queue_destroy"); + grpc_completion_queue_thread_local_cache_init_import = (grpc_completion_queue_thread_local_cache_init_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_init"); + grpc_completion_queue_thread_local_cache_flush_import = (grpc_completion_queue_thread_local_cache_flush_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_flush"); grpc_alarm_create_import = (grpc_alarm_create_type) GetProcAddress(library, "grpc_alarm_create"); grpc_alarm_set_import = (grpc_alarm_set_type) GetProcAddress(library, "grpc_alarm_set"); grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 868772cfc85..c7e78b70dcb 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -212,6 +212,12 @@ extern grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import typedef void(*grpc_completion_queue_destroy_type)(grpc_completion_queue *cq); extern grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import; #define grpc_completion_queue_destroy grpc_completion_queue_destroy_import +typedef void(*grpc_completion_queue_thread_local_cache_init_type)(grpc_completion_queue *cq); +extern grpc_completion_queue_thread_local_cache_init_type grpc_completion_queue_thread_local_cache_init_import; +#define grpc_completion_queue_thread_local_cache_init grpc_completion_queue_thread_local_cache_init_import +typedef int(*grpc_completion_queue_thread_local_cache_flush_type)(grpc_completion_queue *cq, void **tag, int *ok); +extern grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue_thread_local_cache_flush_import; +#define grpc_completion_queue_thread_local_cache_flush grpc_completion_queue_thread_local_cache_flush_import typedef grpc_alarm *(*grpc_alarm_create_type)(void *reserved); extern grpc_alarm_create_type grpc_alarm_create_import; #define grpc_alarm_create grpc_alarm_create_import diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index e6372a379ca..e4e4c9f1b2d 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -158,6 +158,80 @@ static void test_cq_end_op(void) { } } +static void test_cq_tls_cache_full(void) { + grpc_event ev; + grpc_completion_queue *cc; + grpc_cq_completion completion; + grpc_cq_polling_type polling_types[] = { + GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; + grpc_completion_queue_attributes attr; + grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx exec_ctx; + void *tag = create_test_tag(); + void *res_tag; + int ok; + + LOG_TEST("test_cq_tls_cache_full"); + + attr.version = 1; + attr.cq_completion_type = GRPC_CQ_NEXT; + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + exec_ctx = init_exec_ctx; // Reset exec_ctx + attr.cq_polling_type = polling_types[i]; + cc = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&attr), &attr, NULL); + + grpc_completion_queue_thread_local_cache_init(cc); + GPR_ASSERT(grpc_cq_begin_op(cc, tag)); + grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE, + do_nothing_end_completion, NULL, &completion); + + ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT); + + GPR_ASSERT( + grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1); + GPR_ASSERT(res_tag == tag); + GPR_ASSERT(ok); + + ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT); + + shutdown_and_destroy(cc); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +static void test_cq_tls_cache_empty(void) { + grpc_completion_queue *cc; + grpc_cq_polling_type polling_types[] = { + GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; + grpc_completion_queue_attributes attr; + grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx exec_ctx; + void *res_tag; + int ok; + + LOG_TEST("test_cq_tls_cache_empty"); + + attr.version = 1; + attr.cq_completion_type = GRPC_CQ_NEXT; + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + exec_ctx = init_exec_ctx; // Reset exec_ctx + attr.cq_polling_type = polling_types[i]; + cc = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&attr), &attr, NULL); + + GPR_ASSERT( + grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0); + grpc_completion_queue_thread_local_cache_init(cc); + GPR_ASSERT( + grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0); + shutdown_and_destroy(cc); + grpc_exec_ctx_finish(&exec_ctx); + } +} + static void test_shutdown_then_next_polling(void) { grpc_cq_polling_type polling_types[] = { GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; @@ -300,6 +374,8 @@ int main(int argc, char **argv) { test_cq_end_op(); test_pluck(); test_pluck_after_shutdown(); + test_cq_tls_cache_full(); + test_cq_tls_cache_empty(); grpc_shutdown(); return 0; } diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 2a33e8ae115..b7634d04381 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -99,7 +99,7 @@ class PollingOverrider { class Verifier { public: - explicit Verifier(bool spin) : spin_(spin) {} + explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {} // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { return ExpectUnless(i, expect_ok, false); @@ -142,6 +142,18 @@ class Verifier { return detag(got_tag); } + template + CompletionQueue::NextStatus DoOnceThenAsyncNext( + CompletionQueue* cq, void** got_tag, bool* ok, T deadline, + std::function lambda) { + if (lambda_run_) { + return cq->AsyncNext(got_tag, ok, deadline); + } else { + lambda_run_ = true; + return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline); + } + } + // Verify keeps calling Next until all currently set // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } @@ -154,6 +166,7 @@ class Verifier { Next(cq, ignore_ok); } } + // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { @@ -193,6 +206,47 @@ class Verifier { } } + // This version of Verify stops after a certain deadline, and uses the + // DoThenAsyncNext API + // to call the lambda + void Verify(CompletionQueue* cq, + std::chrono::system_clock::time_point deadline, + std::function lambda) { + if (expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + while (std::chrono::system_clock::now() < deadline) { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + while (!expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + GPR_ASSERT(std::chrono::system_clock::now() < deadline); + auto r = DoOnceThenAsyncNext( + cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::GOT_EVENT); + } + GotTag(got_tag, ok, false); + } + } + } + private: void GotTag(void* got_tag, bool ok, bool ignore_ok) { auto it = expectations_.find(got_tag); @@ -226,6 +280,7 @@ class Verifier { std::map expectations_; std::map maybe_expectations_; bool spin_; + bool lambda_run_; }; bool plugin_has_sync_methods(std::unique_ptr& plugin) { @@ -490,6 +545,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { EXPECT_TRUE(recv_status.ok()); } +// Test a simple RPC using the async version of Next +TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter response_writer(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr> response_reader( + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + + std::chrono::system_clock::time_point time_now( + std::chrono::system_clock::now()); + std::chrono::system_clock::time_point time_limit( + std::chrono::system_clock::now() + std::chrono::seconds(10)); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + + auto resp_writer_ptr = &response_writer; + auto lambda_2 = [&, this, resp_writer_ptr]() { + gpr_log(GPR_ERROR, "CALLED"); + service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(), + cq_.get(), tag(2)); + }; + + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Verify(cq_.get(), time_limit, lambda_2); + EXPECT_EQ(send_request.message(), recv_request.message()); + + auto recv_resp_ptr = &recv_response; + auto status_ptr = &recv_status; + send_response.set_message(recv_request.message()); + auto lambda_3 = [&, this, resp_writer_ptr, send_response]() { + resp_writer_ptr->Finish(send_response, Status::OK, tag(3)); + }; + response_reader->Finish(recv_resp_ptr, status_ptr, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get(), std::chrono::system_clock::time_point::max(), + lambda_3); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + // Two pings and a final pong. TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ResetStub(); diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index a5049e5502b..82c6361abd3 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -230,8 +230,6 @@ class Client { } virtual void DestroyMultithreading() = 0; - virtual void InitThreadFunc(size_t thread_idx) = 0; - virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { // Set up the load distribution based on the number of threads @@ -279,7 +277,6 @@ class Client { : std::bind(&Client::NextIssueTime, this, thread_idx); } - private: class Thread { public: Thread(Client* client, size_t idx) @@ -299,6 +296,16 @@ class Client { MergeStatusHistogram(statuses_, s); } + void UpdateHistogram(HistogramEntry* entry) { + std::lock_guard g(mu_); + if (entry->value_used()) { + histogram_.Add(entry->value()); + } + if (entry->status_used()) { + statuses_[entry->status()]++; + } + } + private: Thread(const Thread&); Thread& operator=(const Thread&); @@ -314,29 +321,8 @@ class Client { wait_loop++; } - client_->InitThreadFunc(idx_); - - for (;;) { - // run the loop body - HistogramEntry entry; - const bool thread_still_ok = client_->ThreadFunc(&entry, idx_); - // lock, update histogram if needed and see if we're done - std::lock_guard g(mu_); - if (entry.value_used()) { - histogram_.Add(entry.value()); - } - if (entry.status_used()) { - statuses_[entry.status()]++; - } - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - } - if (!thread_still_ok || - static_cast(gpr_atm_acq_load(&client_->thread_pool_done_))) { - client_->CompleteThread(); - return; - } - } + client_->ThreadFunc(idx_, this); + client_->CompleteThread(); } std::mutex mu_; @@ -347,6 +333,12 @@ class Client { std::thread impl_; }; + bool ThreadCompleted() { + return static_cast(gpr_atm_acq_load(&thread_pool_done_)); + } + + virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0; + std::vector> threads_; std::unique_ptr timer_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9ed4e0b3552..b5c7208664c 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -236,33 +236,47 @@ class AsyncClient : public ClientImpl { this->EndThreads(); // this needed for resolution } - void InitThreadFunc(size_t thread_idx) override final {} - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final { + void ThreadFunc(size_t thread_idx, Client::Thread* t) override final { void* got_tag; bool ok; - if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + HistogramEntry entry; + HistogramEntry* entry_ptr = &entry; + if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ClientRpcContext* ctx; + std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; + do { + t->UpdateHistogram(entry_ptr); // Got a regular event, so process it - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + ctx = ClientRpcContext::detag(got_tag); // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard l(shutdown_state_[thread_idx]->mutex); + shutdown_mu->lock(); if (shutdown_state_[thread_idx]->shutdown) { ctx->TryCancel(); delete ctx; - return true; - } - if (!ctx->RunNextState(ok, entry)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); - delete ctx; + while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + ctx = ClientRpcContext::detag(got_tag); + ctx->TryCancel(); + delete ctx; + } + shutdown_mu->unlock(); + return; } - return true; - } else { - // queue is shutting down, so we must be done - return true; - } + } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, entry_ptr, shutdown_mu]() { + bool next_ok = ok; + if (!ctx->RunNextState(next_ok, entry_ptr)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); + delete ctx; + } + shutdown_mu->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); } std::vector> cli_cqs_; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 94554a46b20..9f20b148eb3 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -62,6 +62,25 @@ class SynchronousClient virtual ~SynchronousClient(){}; + virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; + + void ThreadFunc(size_t thread_idx, Thread* t) override { + InitThreadFuncImpl(thread_idx); + for (;;) { + // run the loop body + HistogramEntry entry; + const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); + t->UpdateHistogram(&entry); + if (!thread_still_ok) { + gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); + } + if (!thread_still_ok || ThreadCompleted()) { + return; + } + } + } + protected: // WaitToIssue returns false if we realize that we need to break out bool WaitToIssue(int thread_idx) { @@ -103,9 +122,9 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - void InitThreadFunc(size_t thread_idx) override {} + void InitThreadFuncImpl(size_t thread_idx) override {} - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -192,13 +211,13 @@ class SynchronousStreamingPingPongClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -246,14 +265,14 @@ class SynchronousStreamingFromClientClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], &responses_[thread_idx]); last_issue_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // Figure out how to make histogram sensible if this is rate-paced if (!WaitToIssue(thread_idx)) { return true; @@ -282,13 +301,13 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingFromServer(&context_[thread_idx], request_); last_recv_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { double now = UsageTimer::Now(); @@ -328,11 +347,11 @@ class SynchronousStreamingBothWaysClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 776371a2c6e..4576be5bb35 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -202,23 +202,32 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { - ServerRpcContext *ctx = detag(got_tag); + if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ServerRpcContext *ctx; + std::mutex *mu_ptr; + do { + ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard l(shutdown_state_[thread_idx]->mutex); + mu_ptr = &shutdown_state_[thread_idx]->mutex; + mu_ptr->lock(); if (shutdown_state_[thread_idx]->shutdown) { + mu_ptr->unlock(); return; } - std::lock_guard l2(*ctx); - const bool still_going = ctx->RunNextState(ok); - // if this RPC context is done, refresh it - if (!still_going) { - ctx->Reset(); - } - } - return; + } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, mu_ptr]() { + ctx->lock(); + if (!ctx->RunNextState(ok)) { + ctx->Reset(); + } + ctx->unlock(); + mu_ptr->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); } class ServerRpcContext { diff --git a/test/cpp/util/error_details_test.cc b/test/cpp/util/error_details_test.cc index 69a6876a3f9..16a00fb201c 100644 --- a/test/cpp/util/error_details_test.cc +++ b/test/cpp/util/error_details_test.cc @@ -82,7 +82,7 @@ TEST(SetTest, NullInput) { TEST(SetTest, OutOfScopeErrorCode) { google::rpc::Status expected; - expected.set_code(20); // Out of scope (DATA_LOSS is 15). + expected.set_code(17); // Out of scope (UNAUTHENTICATED is 16). expected.set_message("I am an error message"); testing::EchoRequest expected_details; expected_details.set_message(grpc::string(100, '\0')); @@ -96,6 +96,24 @@ TEST(SetTest, OutOfScopeErrorCode) { EXPECT_EQ(expected.SerializeAsString(), to.error_details()); } +TEST(SetTest, ValidScopeErrorCode) { + for (int c = StatusCode::OK; c <= StatusCode::UNAUTHENTICATED; c++) { + google::rpc::Status expected; + expected.set_code(c); + expected.set_message("I am an error message"); + testing::EchoRequest expected_details; + expected_details.set_message(grpc::string(100, '\0')); + expected.add_details()->PackFrom(expected_details); + + Status to; + Status s = SetErrorDetails(expected, &to); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(c, to.error_code()); + EXPECT_EQ(expected.message(), to.error_message()); + EXPECT_EQ(expected.SerializeAsString(), to.error_details()); + } +} + } // namespace } // namespace grpc diff --git a/tools/internal_ci/linux/grpc_sanity.cfg b/tools/internal_ci/linux/grpc_sanity.cfg index 24e7984f3a5..e06a2f42410 100644 --- a/tools/internal_ci/linux/grpc_sanity.cfg +++ b/tools/internal_ci/linux/grpc_sanity.cfg @@ -16,7 +16,7 @@ # Location of the continuous shell script in repository. build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh" -timeout_mins: 20 +timeout_mins: 40 action { define_artifacts { regex: "**/*sponge_log.xml" diff --git a/tools/interop_matrix/run_interop_matrix_tests.py b/tools/interop_matrix/run_interop_matrix_tests.py index d037e139218..bb7a8647c7c 100755 --- a/tools/interop_matrix/run_interop_matrix_tests.py +++ b/tools/interop_matrix/run_interop_matrix_tests.py @@ -122,15 +122,13 @@ def find_all_images_for_lang(lang): return images # caches test cases (list of JobSpec) loaded from file. Keyed by lang and runtime. -_loaded_testcases = {} def find_test_cases(lang, release, suite_name): """Returns the list of test cases from testcase files per lang/release.""" file_tmpl = os.path.join(os.path.dirname(__file__), 'testcases/%s__%s') + testcase_release = release if not os.path.exists(file_tmpl % (lang, release)): - release = 'master' - testcases = file_tmpl % (lang, release) - if lang in _loaded_testcases.keys() and release in _loaded_testcases[lang].keys(): - return _loaded_testcases[lang][release] + testcase_release = 'master' + testcases = file_tmpl % (lang, testcase_release) job_spec_list=[] try: @@ -155,9 +153,6 @@ def find_test_cases(lang, release, suite_name): do_newline=True) except IOError as err: jobset.message('FAILED', err, do_newline=True) - if lang not in _loaded_testcases.keys(): - _loaded_testcases[lang] = {} - _loaded_testcases[lang][release]=job_spec_list return job_spec_list _xml_report_tree = report_utils.new_junit_xml_tree()