Make combiners refcounted, to facilitate sharing

reviewable/pr9662/r3
Craig Tiller 8 years ago
parent 4638d7ab64
commit 5634ef6e4a
  1. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 12
      src/core/lib/iomgr/combiner.c
  3. 5
      src/core/lib/iomgr/combiner.h
  4. 2
      src/core/lib/iomgr/resource_quota.c
  5. 8
      test/core/iomgr/combiner_test.c

@ -168,7 +168,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
grpc_combiner_destroy(exec_ctx, t->combiner);
grpc_combiner_unref(exec_ctx, t->combiner);
cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));

@ -72,6 +72,7 @@ struct grpc_combiner {
bool final_list_covered_by_poller;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
};
static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx,
@ -126,6 +127,7 @@ static bool is_covered_by_poller(grpc_combiner *lock) {
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
gpr_ref_init(&lock->refs, 1);
lock->next_combiner_on_this_exec_ctx = NULL;
lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue;
@ -152,7 +154,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
gpr_free(lock);
}
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
@ -161,6 +163,14 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
}
void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
if (gpr_unref(&lock->refs)) {
start_destroy(exec_ctx, lock);
}
}
void grpc_combiner_ref(grpc_combiner *lock) { gpr_ref(&lock->refs); }
static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock) {
lock->next_combiner_on_this_exec_ctx = NULL;

@ -48,8 +48,9 @@
// Initialize the lock, with an optional workqueue to shift load to when
// necessary
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
// Destroy the lock
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
// Ref/unref the lock, for when we're sharing the lock ownership
void grpc_combiner_ref(grpc_combiner *lock);
void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
// Fetch a scheduler to schedule closures against
grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock,
bool covered_by_poller);

@ -599,7 +599,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
if (gpr_unref(&resource_quota->refs)) {
grpc_combiner_destroy(exec_ctx, resource_quota->combiner);
grpc_combiner_unref(exec_ctx, resource_quota->combiner);
gpr_free(resource_quota->name);
gpr_free(resource_quota);
}

@ -44,7 +44,7 @@
static void test_no_op(void) {
gpr_log(GPR_DEBUG, "test_no_op");
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_destroy(&exec_ctx, grpc_combiner_create(NULL));
grpc_combiner_unref(&exec_ctx, grpc_combiner_create(NULL));
grpc_exec_ctx_finish(&exec_ctx);
}
@ -65,7 +65,7 @@ static void test_execute_one(void) {
GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(done);
grpc_combiner_destroy(&exec_ctx, lock);
grpc_combiner_unref(&exec_ctx, lock);
grpc_exec_ctx_finish(&exec_ctx);
}
@ -125,7 +125,7 @@ static void test_execute_many(void) {
gpr_thd_join(thds[i]);
}
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_destroy(&exec_ctx, lock);
grpc_combiner_unref(&exec_ctx, lock);
grpc_exec_ctx_finish(&exec_ctx);
}
@ -153,7 +153,7 @@ static void test_execute_finally(void) {
GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(got_in_finally);
grpc_combiner_destroy(&exec_ctx, lock);
grpc_combiner_unref(&exec_ctx, lock);
grpc_exec_ctx_finish(&exec_ctx);
}

Loading…
Cancel
Save