Merge pull request #9661 from ctiller/combiner_refs

Make combiners refcounted, to facilitate sharing
pull/9666/head
Craig Tiller 8 years ago committed by GitHub
commit bdf3f00bb5
  1. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 28
      src/core/lib/iomgr/combiner.c
  3. 23
      src/core/lib/iomgr/combiner.h
  4. 2
      src/core/lib/iomgr/resource_quota.c
  5. 8
      test/core/iomgr/combiner_test.c

@ -168,7 +168,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
grpc_combiner_destroy(exec_ctx, t->combiner);
GRPC_COMBINER_UNREF(exec_ctx, t->combiner, "chttp2_transport");
cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));

@ -72,6 +72,7 @@ struct grpc_combiner {
bool final_list_covered_by_poller;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
};
static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx,
@ -126,6 +127,7 @@ static bool is_covered_by_poller(grpc_combiner *lock) {
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
gpr_ref_init(&lock->refs, 1);
lock->next_combiner_on_this_exec_ctx = NULL;
lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue;
@ -152,7 +154,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
gpr_free(lock);
}
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
@ -161,6 +163,30 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
}
#ifdef GRPC_COMBINER_REFCOUNT_DEBUG
#define GRPC_COMBINER_DEBUG_SPAM(op, delta) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, \
"combiner[%p] %s %" PRIdPTR " --> %" PRIdPTR " %s", lock, (op), \
gpr_atm_no_barrier_load(&lock->refs.count), \
gpr_atm_no_barrier_load(&lock->refs.count) + (delta), reason);
#else
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)
#endif
void grpc_combiner_unref(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM("UNREF", -1);
if (gpr_unref(&lock->refs)) {
start_destroy(exec_ctx, lock);
}
}
grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM(" REF", 1);
gpr_ref(&lock->refs);
return lock;
}
static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock) {
lock->next_combiner_on_this_exec_ctx = NULL;

@ -48,8 +48,27 @@
// Initialize the lock, with an optional workqueue to shift load to when
// necessary
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
// Destroy the lock
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
//#define GRPC_COMBINER_REFCOUNT_DEBUG
#ifdef GRPC_COMBINER_REFCOUNT_DEBUG
#define GRPC_COMBINER_DEBUG_ARGS \
, const char *file, int line, const char *reason
#define GRPC_COMBINER_REF(combiner, reason) \
grpc_combiner_ref((combiner), __FILE__, __LINE__, (reason))
#define GRPC_COMBINER_UNREF(exec_ctx, combiner, reason) \
grpc_combiner_unref((exec_ctx), (combiner), __FILE__, __LINE__, (reason))
#else
#define GRPC_COMBINER_DEBUG_ARGS
#define GRPC_COMBINER_REF(combiner, reason) grpc_combiner_ref((combiner))
#define GRPC_COMBINER_UNREF(exec_ctx, combiner, reason) \
grpc_combiner_unref((exec_ctx), (combiner))
#endif
// Ref/unref the lock, for when we're sharing the lock ownership
// Prefer to use the macros above
grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS);
void grpc_combiner_unref(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS);
// Fetch a scheduler to schedule closures against
grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock,
bool covered_by_poller);

@ -599,7 +599,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
if (gpr_unref(&resource_quota->refs)) {
grpc_combiner_destroy(exec_ctx, resource_quota->combiner);
GRPC_COMBINER_UNREF(exec_ctx, resource_quota->combiner, "resource_quota");
gpr_free(resource_quota->name);
gpr_free(resource_quota);
}

@ -44,7 +44,7 @@
static void test_no_op(void) {
gpr_log(GPR_DEBUG, "test_no_op");
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_destroy(&exec_ctx, grpc_combiner_create(NULL));
GRPC_COMBINER_UNREF(&exec_ctx, grpc_combiner_create(NULL), "test_no_op");
grpc_exec_ctx_finish(&exec_ctx);
}
@ -65,7 +65,7 @@ static void test_execute_one(void) {
GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(done);
grpc_combiner_destroy(&exec_ctx, lock);
GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_one");
grpc_exec_ctx_finish(&exec_ctx);
}
@ -125,7 +125,7 @@ static void test_execute_many(void) {
gpr_thd_join(thds[i]);
}
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_destroy(&exec_ctx, lock);
GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_many");
grpc_exec_ctx_finish(&exec_ctx);
}
@ -153,7 +153,7 @@ static void test_execute_finally(void) {
GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(got_in_finally);
grpc_combiner_destroy(&exec_ctx, lock);
GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_finally");
grpc_exec_ctx_finish(&exec_ctx);
}

Loading…
Cancel
Save