Merge branch 'combiner_refs' into c3+r

reviewable/pr9662/r1
Craig Tiller 8 years ago
commit 5e83dd9a44
  1. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 17
      src/core/lib/iomgr/combiner.c
  3. 21
      src/core/lib/iomgr/combiner.h
  4. 2
      src/core/lib/iomgr/resource_quota.c
  5. 8
      test/core/iomgr/combiner_test.c

@ -168,7 +168,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->stream_map); grpc_chttp2_stream_map_destroy(&t->stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
grpc_combiner_unref(exec_ctx, t->combiner); GRPC_COMBINER_UNREF(exec_ctx, t->combiner, "chttp2_transport");
cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));

@ -163,13 +163,26 @@ static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
} }
} }
void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { #ifdef GRPC_COMBINER_REFCOUNT_DEBUG
#define GRPC_COMBINER_DEBUG_SPAM(op, delta) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, \
"combiner[%p] %s %" PRIdPTR " --> %" PRIdPTR " %s", lock, (op), \
gpr_atm_no_barrier_load(&lock->refs.count), \
gpr_atm_no_barrier_load(&lock->refs.count) + (delta), reason);
#else
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)
#endif
void grpc_combiner_unref(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM("UNREF", -1);
if (gpr_unref(&lock->refs)) { if (gpr_unref(&lock->refs)) {
start_destroy(exec_ctx, lock); start_destroy(exec_ctx, lock);
} }
} }
grpc_combiner *grpc_combiner_ref(grpc_combiner *lock) { grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM(" REF", 1);
gpr_ref(&lock->refs); gpr_ref(&lock->refs);
return lock; return lock;
} }

@ -48,9 +48,26 @@
// Initialize the lock, with an optional workqueue to shift load to when // Initialize the lock, with an optional workqueue to shift load to when
// necessary // necessary
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
//#define GRPC_COMBINER_REFCOUNT_DEBUG
#ifdef GRPC_COMBINER_REFCOUNT_DEBUG
#define GRPC_COMBINER_DEBUG_ARGS \
, const char *file, int line, const char *reason
#define GRPC_COMBINER_REF(combiner, reason) \
grpc_combiner_ref((combiner), __FILE__, __LINE__, (reason))
#define GRPC_COMBINER_UNREF(exec_ctx, combiner, reason) \
grpc_combiner_unref((exec_ctx), (combiner), __FILE__, __LINE__, (reason))
#else
#define GRPC_COMBINER_DEBUG_ARGS
#define GRPC_COMBINER_REF(combiner) grpc_combiner_ref((combiner))
#define GRPC_COMBINER_UNREF(exec_ctx, combiner) \
grpc_combiner_unref((exec_ctx), (combiner))
#endif
// Ref/unref the lock, for when we're sharing the lock ownership // Ref/unref the lock, for when we're sharing the lock ownership
grpc_combiner *grpc_combiner_ref(grpc_combiner *lock); grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS);
void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); void grpc_combiner_unref(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS);
// Fetch a scheduler to schedule closures against // Fetch a scheduler to schedule closures against
grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock, grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock,
bool covered_by_poller); bool covered_by_poller);

@ -599,7 +599,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx, void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) { grpc_resource_quota *resource_quota) {
if (gpr_unref(&resource_quota->refs)) { if (gpr_unref(&resource_quota->refs)) {
grpc_combiner_unref(exec_ctx, resource_quota->combiner); GRPC_COMBINER_UNREF(exec_ctx, resource_quota->combiner, "resource_quota");
gpr_free(resource_quota->name); gpr_free(resource_quota->name);
gpr_free(resource_quota); gpr_free(resource_quota);
} }

@ -44,7 +44,7 @@
static void test_no_op(void) { static void test_no_op(void) {
gpr_log(GPR_DEBUG, "test_no_op"); gpr_log(GPR_DEBUG, "test_no_op");
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_unref(&exec_ctx, grpc_combiner_create(NULL)); GRPC_COMBINER_UNREF(&exec_ctx, grpc_combiner_create(NULL), "test_no_op");
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
@ -65,7 +65,7 @@ static void test_execute_one(void) {
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(done); GPR_ASSERT(done);
grpc_combiner_unref(&exec_ctx, lock); GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_one");
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
@ -125,7 +125,7 @@ static void test_execute_many(void) {
gpr_thd_join(thds[i]); gpr_thd_join(thds[i]);
} }
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_unref(&exec_ctx, lock); GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_many");
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
@ -153,7 +153,7 @@ static void test_execute_finally(void) {
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(got_in_finally); GPR_ASSERT(got_in_finally);
grpc_combiner_unref(&exec_ctx, lock); GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_finally");
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }

Loading…
Cancel
Save