diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc index 5bd7884e287..b4ae8f312cd 100644 --- a/src/core/lib/debug/stats_data.cc +++ b/src/core/lib/debug/stats_data.cc @@ -112,6 +112,9 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "executor_push_retries", "server_requested_calls", "server_slowpath_requests_queued", + "cq_ev_queue_trylock_failures", + "cq_ev_queue_trylock_successes", + "cq_ev_queue_transient_pop_failures", }; const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of client side calls created by this process", @@ -222,6 +225,12 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "How many calls were requested (not necessarily received) by the server", "How many times was the server slow path taken (indicates too few " "outstanding requests)", + "Number of lock (trylock) acquisition failures on completion queue event " + "queue. High value here indicates high contention on completion queues", + "Number of lock (trylock) acquisition successes on completion queue event " + "queue.", + "Number of times NULL was popped out of completion queue's event queue " + "even though the event queue was not empty", }; const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { "call_initial_size", diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index d8e4e7d264b..d4c4437ca07 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -118,6 +118,9 @@ typedef enum { GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES, GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS, GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES, GRPC_STATS_COUNTER_COUNT } grpc_stats_counters; extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT]; @@ -425,6 +428,15 @@ typedef enum { #define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES) #define GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, value) \ grpc_stats_inc_call_initial_size((exec_ctx), (int)(value)) void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int x); diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 5c0ab2262e5..00e81e74e20 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -272,4 +272,14 @@ - counter: server_slowpath_requests_queued doc: How many times was the server slow path taken (indicates too few outstanding requests) +# cq +- counter: cq_ev_queue_trylock_failures + doc: Number of lock (trylock) acquisition failures on completion queue event + queue. High value here indicates high contention on completion queues +- counter: cq_ev_queue_trylock_successes + doc: Number of lock (trylock) acquisition successes on completion queue event + queue. +- counter: cq_ev_queue_transient_pop_failures + doc: Number of times NULL was popped out of completion queue's event queue + even though the event queue was not empty diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 54869977b0b..f34b0c25a03 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -86,4 +86,7 @@ executor_wakeup_initiated_per_iteration:FLOAT, executor_queue_drained_per_iteration:FLOAT, executor_push_retries_per_iteration:FLOAT, server_requested_calls_per_iteration:FLOAT, -server_slowpath_requests_queued_per_iteration:FLOAT +server_slowpath_requests_queued_per_iteration:FLOAT, +cq_ev_queue_trylock_failures_per_iteration:FLOAT, +cq_ev_queue_trylock_successes_per_iteration:FLOAT, +cq_ev_queue_transient_pop_failures_per_iteration:FLOAT diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 36b4b835f82..21664f03c8b 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -362,11 +362,24 @@ static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { grpc_cq_completion *c = NULL; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + if (gpr_spinlock_trylock(&q->queue_lock)) { - c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx); + + bool is_empty = false; + c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty); gpr_spinlock_unlock(&q->queue_lock); + + if (c == NULL && !is_empty) { + GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx); + } + } else { + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx); } + grpc_exec_ctx_finish(&exec_ctx); + if (c) { gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); } diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index 087b6859639..f59989655ec 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -36,7 +36,21 @@ namespace Grpc.Core readonly Func getTrailersFunc; readonly Action disposeAction; - internal AsyncClientStreamingCall(IClientStreamWriter requestStream, Task responseAsync, Task responseHeadersAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) + /// + /// Creates a new AsyncClientStreamingCall object with the specified properties. + /// + /// Stream of request values. + /// The response of the asynchronous call. + /// Response headers of the asynchronous call. + /// Delegate returning the status of the call. + /// Delegate returning the trailing metadata of the call. + /// Delegate to invoke when Dispose is called on the call object. + public AsyncClientStreamingCall(IClientStreamWriter requestStream, + Task responseAsync, + Task responseHeadersAsync, + Func getStatusFunc, + Func getTrailersFunc, + Action disposeAction) { this.requestStream = requestStream; this.responseAsync = responseAsync; diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index ce49fb1596d..1cb1a918595 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -35,7 +35,21 @@ namespace Grpc.Core readonly Func getTrailersFunc; readonly Action disposeAction; - internal AsyncDuplexStreamingCall(IClientStreamWriter requestStream, IAsyncStreamReader responseStream, Task responseHeadersAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) + /// + /// Creates a new AsyncDuplexStreamingCall object with the specified properties. + /// + /// Stream of request values. + /// Stream of response values. + /// Response headers of the asynchronous call. + /// Delegate returning the status of the call. + /// Delegate returning the trailing metadata of the call. + /// Delegate to invoke when Dispose is called on the call object. + public AsyncDuplexStreamingCall(IClientStreamWriter requestStream, + IAsyncStreamReader responseStream, + Task responseHeadersAsync, + Func getStatusFunc, + Func getTrailersFunc, + Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index fbc97b81481..4303b0b1b02 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -33,7 +33,19 @@ namespace Grpc.Core readonly Func getTrailersFunc; readonly Action disposeAction; - internal AsyncServerStreamingCall(IAsyncStreamReader responseStream, Task responseHeadersAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) + /// + /// Creates a new AsyncDuplexStreamingCall object with the specified properties. + /// + /// Stream of response values. + /// Response headers of the asynchronous call. + /// Delegate returning the status of the call. + /// Delegate returning the trailing metadata of the call. + /// Delegate to invoke when Dispose is called on the call object. + public AsyncServerStreamingCall(IAsyncStreamReader responseStream, + Task responseHeadersAsync, + Func getStatusFunc, + Func getTrailersFunc, + Action disposeAction) { this.responseStream = responseStream; this.responseHeadersAsync = responseHeadersAsync; diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs index 6348f3c5fd4..17747f86caa 100644 --- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs +++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs @@ -34,7 +34,20 @@ namespace Grpc.Core readonly Func getTrailersFunc; readonly Action disposeAction; - internal AsyncUnaryCall(Task responseAsync, Task responseHeadersAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) + + /// + /// Creates a new AsyncUnaryCall object with the specified properties. + /// + /// The response of the asynchronous call. + /// Response headers of the asynchronous call. + /// Delegate returning the status of the call. + /// Delegate returning the trailing metadata of the call. + /// Delegate to invoke when Dispose is called on the call object. + public AsyncUnaryCall(Task responseAsync, + Task responseHeadersAsync, + Func getStatusFunc, + Func getTrailersFunc, + Action disposeAction) { this.responseAsync = responseAsync; this.responseHeadersAsync = responseHeadersAsync; diff --git a/tools/run_tests/performance/massage_qps_stats.py b/tools/run_tests/performance/massage_qps_stats.py index ebbfe6c26cd..e0b6ce6ba63 100644 --- a/tools/run_tests/performance/massage_qps_stats.py +++ b/tools/run_tests/performance/massage_qps_stats.py @@ -109,6 +109,9 @@ def massage_qps_stats(scenario_result): stats["core_executor_push_retries"] = massage_qps_stats_helpers.counter(core_stats, "executor_push_retries") stats["core_server_requested_calls"] = massage_qps_stats_helpers.counter(core_stats, "server_requested_calls") stats["core_server_slowpath_requests_queued"] = massage_qps_stats_helpers.counter(core_stats, "server_slowpath_requests_queued") + stats["core_cq_ev_queue_trylock_failures"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_trylock_failures") + stats["core_cq_ev_queue_trylock_successes"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_trylock_successes") + stats["core_cq_ev_queue_transient_pop_failures"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_transient_pop_failures") h = massage_qps_stats_helpers.histogram(core_stats, "call_initial_size") stats["core_call_initial_size"] = ",".join("%f" % x for x in h.buckets) stats["core_call_initial_size_bkts"] = ",".join("%f" % x for x in h.boundaries) diff --git a/tools/run_tests/performance/scenario_result_schema.json b/tools/run_tests/performance/scenario_result_schema.json index 169221d18c5..f11e6359f65 100644 --- a/tools/run_tests/performance/scenario_result_schema.json +++ b/tools/run_tests/performance/scenario_result_schema.json @@ -555,6 +555,21 @@ "name": "core_server_slowpath_requests_queued", "type": "INTEGER" }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_trylock_failures", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_trylock_successes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_transient_pop_failures", + "type": "INTEGER" + }, { "mode": "NULLABLE", "name": "core_call_initial_size", @@ -1352,6 +1367,21 @@ "name": "core_server_slowpath_requests_queued", "type": "INTEGER" }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_trylock_failures", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_trylock_successes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_transient_pop_failures", + "type": "INTEGER" + }, { "mode": "NULLABLE", "name": "core_call_initial_size",