From 5634ef6e4a86d284d973c400f49b6370ad6035eb Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Feb 2017 14:25:32 -0800 Subject: [PATCH 1/5] Make combiners refcounted, to facilitate sharing --- .../transport/chttp2/transport/chttp2_transport.c | 2 +- src/core/lib/iomgr/combiner.c | 12 +++++++++++- src/core/lib/iomgr/combiner.h | 5 +++-- src/core/lib/iomgr/resource_quota.c | 2 +- test/core/iomgr/combiner_test.c | 8 ++++---- 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8a9eaa8b6a2..d576f85dd25 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -168,7 +168,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_destroy(&t->stream_map); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - grpc_combiner_destroy(exec_ctx, t->combiner); + grpc_combiner_unref(exec_ctx, t->combiner); cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index ba6c7087a92..93d8c0bd70a 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -72,6 +72,7 @@ struct grpc_combiner { bool final_list_covered_by_poller; grpc_closure_list final_list; grpc_closure offload; + gpr_refcount refs; }; static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, @@ -126,6 +127,7 @@ static bool is_covered_by_poller(grpc_combiner *lock) { grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); + gpr_ref_init(&lock->refs, 1); lock->next_combiner_on_this_exec_ctx = NULL; lock->time_to_execute_final_list = false; lock->optional_workqueue = optional_workqueue; @@ -152,7 +154,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { gpr_free(lock); } -void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { +static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); @@ -161,6 +163,14 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } } +void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + if (gpr_unref(&lock->refs)) { + start_destroy(exec_ctx, lock); + } +} + +void grpc_combiner_ref(grpc_combiner *lock) { gpr_ref(&lock->refs); } + static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { lock->next_combiner_on_this_exec_ctx = NULL; diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 81dff85d402..352e87d050a 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -48,8 +48,9 @@ // Initialize the lock, with an optional workqueue to shift load to when // necessary grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); -// Destroy the lock -void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); +// Ref/unref the lock, for when we're sharing the lock ownership +void grpc_combiner_ref(grpc_combiner *lock); +void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); // Fetch a scheduler to schedule closures against grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock, bool covered_by_poller); diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 2cc979467f6..3c701164ad6 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -599,7 +599,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { if (gpr_unref(&resource_quota->refs)) { - grpc_combiner_destroy(exec_ctx, resource_quota->combiner); + grpc_combiner_unref(exec_ctx, resource_quota->combiner); gpr_free(resource_quota->name); gpr_free(resource_quota); } diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c index 4c9275a6732..08cd2c1e26a 100644 --- a/test/core/iomgr/combiner_test.c +++ b/test/core/iomgr/combiner_test.c @@ -44,7 +44,7 @@ static void test_no_op(void) { gpr_log(GPR_DEBUG, "test_no_op"); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_combiner_destroy(&exec_ctx, grpc_combiner_create(NULL)); + grpc_combiner_unref(&exec_ctx, grpc_combiner_create(NULL)); grpc_exec_ctx_finish(&exec_ctx); } @@ -65,7 +65,7 @@ static void test_execute_one(void) { GRPC_ERROR_NONE); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(done); - grpc_combiner_destroy(&exec_ctx, lock); + grpc_combiner_unref(&exec_ctx, lock); grpc_exec_ctx_finish(&exec_ctx); } @@ -125,7 +125,7 @@ static void test_execute_many(void) { gpr_thd_join(thds[i]); } grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_combiner_destroy(&exec_ctx, lock); + grpc_combiner_unref(&exec_ctx, lock); grpc_exec_ctx_finish(&exec_ctx); } @@ -153,7 +153,7 @@ static void test_execute_finally(void) { GRPC_ERROR_NONE); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(got_in_finally); - grpc_combiner_destroy(&exec_ctx, lock); + grpc_combiner_unref(&exec_ctx, lock); grpc_exec_ctx_finish(&exec_ctx); } From 3845e559816025477e59030b62ad864d5c093499 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Feb 2017 15:19:28 -0800 Subject: [PATCH 2/5] Add debug macros --- .../chttp2/transport/chttp2_transport.c | 2 +- src/core/lib/iomgr/combiner.c | 20 ++++++++++++++++-- src/core/lib/iomgr/combiner.h | 21 +++++++++++++++++-- src/core/lib/iomgr/resource_quota.c | 2 +- test/core/iomgr/combiner_test.c | 8 +++---- 5 files changed, 43 insertions(+), 10 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index d576f85dd25..3ee5e976f84 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -168,7 +168,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_destroy(&t->stream_map); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - grpc_combiner_unref(exec_ctx, t->combiner); + GRPC_COMBINER_UNREF(exec_ctx, t->combiner, "chttp2_transport"); cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 93d8c0bd70a..fa9966c3a69 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -163,13 +163,29 @@ static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } } -void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { +#ifdef GRPC_COMBINER_REFCOUNT_DEBUG +#define GRPC_COMBINER_DEBUG_SPAM(op, delta) \ + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, \ + "combiner[%p] %s %" PRIdPTR " --> %" PRIdPTR " %s", lock, (op), \ + gpr_atm_no_barrier_load(&lock->refs.count), \ + gpr_atm_no_barrier_load(&lock->refs.count) + (delta), reason); +#else +#define GRPC_COMBINER_DEBUG_SPAM(op, delta) +#endif + +void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, + grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { + GRPC_COMBINER_DEBUG_SPAM("UNREF", -1); if (gpr_unref(&lock->refs)) { start_destroy(exec_ctx, lock); } } -void grpc_combiner_ref(grpc_combiner *lock) { gpr_ref(&lock->refs); } +grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { + GRPC_COMBINER_DEBUG_SPAM(" REF", 1); + gpr_ref(&lock->refs); + return lock; +} static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 352e87d050a..065689e38f7 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -48,9 +48,26 @@ // Initialize the lock, with an optional workqueue to shift load to when // necessary grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); + +#define GRPC_COMBINER_REFCOUNT_DEBUG +#ifdef GRPC_COMBINER_REFCOUNT_DEBUG +#define GRPC_COMBINER_DEBUG_ARGS \ + , const char *file, int line, const char *reason +#define GRPC_COMBINER_REF(combiner, reason) \ + grpc_combiner_ref((combiner), __FILE__, __LINE__, (reason)) +#define GRPC_COMBINER_UNREF(exec_ctx, combiner, reason) \ + grpc_combiner_unref((exec_ctx), (combiner), __FILE__, __LINE__, (reason)) +#else +#define GRPC_COMBINER_DEBUG_ARGS +#define GRPC_COMBINER_REF(combiner) grpc_combiner_ref((combiner)) +#define GRPC_COMBINER_UNREF(exec_ctx, combiner) \ + grpc_combiner_unref((exec_ctx), (combiner)) +#endif + // Ref/unref the lock, for when we're sharing the lock ownership -void grpc_combiner_ref(grpc_combiner *lock); -void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); +grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS); +void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, + grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS); // Fetch a scheduler to schedule closures against grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock, bool covered_by_poller); diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 3c701164ad6..511ffdcdf13 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -599,7 +599,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { if (gpr_unref(&resource_quota->refs)) { - grpc_combiner_unref(exec_ctx, resource_quota->combiner); + GRPC_COMBINER_UNREF(exec_ctx, resource_quota->combiner, "resource_quota"); gpr_free(resource_quota->name); gpr_free(resource_quota); } diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c index 08cd2c1e26a..bc4d2af8ac4 100644 --- a/test/core/iomgr/combiner_test.c +++ b/test/core/iomgr/combiner_test.c @@ -44,7 +44,7 @@ static void test_no_op(void) { gpr_log(GPR_DEBUG, "test_no_op"); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_combiner_unref(&exec_ctx, grpc_combiner_create(NULL)); + GRPC_COMBINER_UNREF(&exec_ctx, grpc_combiner_create(NULL), "test_no_op"); grpc_exec_ctx_finish(&exec_ctx); } @@ -65,7 +65,7 @@ static void test_execute_one(void) { GRPC_ERROR_NONE); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(done); - grpc_combiner_unref(&exec_ctx, lock); + GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_one"); grpc_exec_ctx_finish(&exec_ctx); } @@ -125,7 +125,7 @@ static void test_execute_many(void) { gpr_thd_join(thds[i]); } grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_combiner_unref(&exec_ctx, lock); + GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_many"); grpc_exec_ctx_finish(&exec_ctx); } @@ -153,7 +153,7 @@ static void test_execute_finally(void) { GRPC_ERROR_NONE); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(got_in_finally); - grpc_combiner_unref(&exec_ctx, lock); + GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_finally"); grpc_exec_ctx_finish(&exec_ctx); } From f6723896f0370ed1e04d8fff7e50750a574c0093 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Feb 2017 15:21:35 -0800 Subject: [PATCH 3/5] Default debug off --- src/core/lib/iomgr/combiner.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 065689e38f7..f1fa941d643 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -49,7 +49,7 @@ // necessary grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); -#define GRPC_COMBINER_REFCOUNT_DEBUG +//#define GRPC_COMBINER_REFCOUNT_DEBUG #ifdef GRPC_COMBINER_REFCOUNT_DEBUG #define GRPC_COMBINER_DEBUG_ARGS \ , const char *file, int line, const char *reason From b1b2854d3a2beffa8e33d7847bf6072636a38f46 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Feb 2017 15:23:12 -0800 Subject: [PATCH 4/5] Fix non-debug --- src/core/lib/iomgr/combiner.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index f1fa941d643..c7aa0866174 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -59,8 +59,8 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); grpc_combiner_unref((exec_ctx), (combiner), __FILE__, __LINE__, (reason)) #else #define GRPC_COMBINER_DEBUG_ARGS -#define GRPC_COMBINER_REF(combiner) grpc_combiner_ref((combiner)) -#define GRPC_COMBINER_UNREF(exec_ctx, combiner) \ +#define GRPC_COMBINER_REF(combiner, reason) grpc_combiner_ref((combiner)) +#define GRPC_COMBINER_UNREF(exec_ctx, combiner, reason) \ grpc_combiner_unref((exec_ctx), (combiner)) #endif From af1c45197de24b522e6e8fef67caff3965d2a6b6 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Feb 2017 16:38:23 -0800 Subject: [PATCH 5/5] Add comment --- src/core/lib/iomgr/combiner.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index c7aa0866174..75dcb0b70a4 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -65,6 +65,7 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); #endif // Ref/unref the lock, for when we're sharing the lock ownership +// Prefer to use the macros above grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS); void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS);