From dd339ea915ca62950659aa1e9924077f127a75b6 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 27 Sep 2016 13:21:31 -0700 Subject: [PATCH] Cleanup of some buffer pool implementation --- .../chttp2/transport/chttp2_transport.c | 5 ++- src/core/lib/iomgr/buffer_pool.c | 40 +++++++++++------- src/core/lib/iomgr/buffer_pool.h | 2 +- test/core/iomgr/buffer_pool_test.c | 42 +++++++++++++++++++ 4 files changed, 72 insertions(+), 17 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index e5bc89c0301..3ebb4673327 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -256,8 +256,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->read_action_locked, read_action_locked, t); grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t); grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t); - grpc_closure_init(&t->benign_reclaimer, benign_reclaimer_locked, t); - grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer_locked, t); + grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t); + grpc_closure_init(&t->destructive_reclaimer_locked, destructive_reclaimer_locked, t); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser); @@ -379,6 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); + post_benign_reclaimer(exec_ctx, t); } static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c index cb9a76b3dca..c0fd7cc45f3 100644 --- a/src/core/lib/iomgr/buffer_pool.c +++ b/src/core/lib/iomgr/buffer_pool.c @@ -218,9 +218,9 @@ static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool, grpc_buffer_user *buffer_user = bulist_pop(buffer_pool, list); if (buffer_user == NULL) return false; buffer_pool->reclaiming = true; - grpc_exec_ctx_sched(exec_ctx, buffer_user->reclaimers[destructive], - GRPC_ERROR_NONE, NULL); + grpc_closure *c = buffer_user->reclaimers[destructive]; buffer_user->reclaimers[destructive] = NULL; + grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE); return true; } @@ -330,8 +330,9 @@ static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { GRPC_ERROR_CANCELLED, NULL); grpc_exec_ctx_sched(exec_ctx, buffer_user->reclaimers[1], GRPC_ERROR_CANCELLED, NULL); - grpc_exec_ctx_sched(exec_ctx, buffer_user->on_done_destroy, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load( + &buffer_user->on_done_destroy_closure), + GRPC_ERROR_NONE, NULL); if (buffer_user->free_pool != 0) { buffer_user->buffer_pool->free_pool += buffer_user->free_pool; bpstep_sched(exec_ctx, buffer_user->buffer_pool); @@ -340,6 +341,7 @@ static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { gpr_free(buffer_user->asan_canary); #endif grpc_buffer_pool_internal_unref(exec_ctx, buffer_user->buffer_pool); + gpr_mu_destroy(&buffer_user->mu); } static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts, @@ -492,7 +494,7 @@ void grpc_buffer_user_init(grpc_buffer_user *buffer_user, grpc_closure_list_init(&buffer_user->on_allocated); buffer_user->allocating = false; buffer_user->added_to_free_pool = false; - buffer_user->on_done_destroy = NULL; + gpr_atm_no_barrier_store(&buffer_user->on_done_destroy_closure, 0); buffer_user->reclaimers[0] = NULL; buffer_user->reclaimers[1] = NULL; for (int i = 0; i < GRPC_BULIST_COUNT; i++) { @@ -507,8 +509,10 @@ void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user, grpc_closure *on_done) { gpr_mu_lock(&buffer_user->mu); - GPR_ASSERT(buffer_user->on_done_destroy == NULL); - buffer_user->on_done_destroy = on_done; + GPR_ASSERT(gpr_atm_no_barrier_load(&buffer_user->on_done_destroy_closure) == + 0); + gpr_atm_no_barrier_store(&buffer_user->on_done_destroy_closure, + (gpr_atm)on_done); if (buffer_user->allocated == 0) { grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, &buffer_user->destroy_closure, GRPC_ERROR_NONE, @@ -521,7 +525,9 @@ void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user, size_t size, grpc_closure *optional_on_done) { gpr_mu_lock(&buffer_user->mu); - if (buffer_user->on_done_destroy != NULL) { + grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( + &buffer_user->on_done_destroy_closure); + if (on_done_destroy != NULL) { /* already shutdown */ grpc_exec_ctx_sched( exec_ctx, optional_on_done, @@ -561,7 +567,9 @@ void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx, &buffer_user->add_to_free_pool_closure, GRPC_ERROR_NONE, false); } - if (buffer_user->on_done_destroy != NULL && buffer_user->allocated == 0) { + grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( + &buffer_user->on_done_destroy_closure); + if (on_done_destroy != NULL && buffer_user->allocated == 0) { grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, &buffer_user->destroy_closure, GRPC_ERROR_NONE, false); @@ -572,11 +580,15 @@ void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx, void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user, bool destructive, grpc_closure *closure) { - GPR_ASSERT(buffer_user->reclaimers[destructive] == NULL); - buffer_user->reclaimers[destructive] = closure; - grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, - &buffer_user->post_reclaimer_closure[destructive], - GRPC_ERROR_NONE, false); + if (gpr_atm_acq_load(&buffer_user->on_done_destroy_closure) != 0) { + GPR_ASSERT(buffer_user->reclaimers[destructive] == NULL); + buffer_user->reclaimers[destructive] = closure; + grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, + &buffer_user->post_reclaimer_closure[destructive], + GRPC_ERROR_NONE, false); + } else { + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); + } } void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h index 2095be05d8c..ca4c39f76d2 100644 --- a/src/core/lib/iomgr/buffer_pool.h +++ b/src/core/lib/iomgr/buffer_pool.h @@ -80,7 +80,7 @@ struct grpc_buffer_user { grpc_closure post_reclaimer_closure[2]; grpc_closure destroy_closure; - grpc_closure *on_done_destroy; + gpr_atm on_done_destroy_closure; grpc_buffer_user_link links[GRPC_BULIST_COUNT]; }; diff --git a/test/core/iomgr/buffer_pool_test.c b/test/core/iomgr/buffer_pool_test.c index 23fac2f70db..a15e02e3c61 100644 --- a/test/core/iomgr/buffer_pool_test.c +++ b/test/core/iomgr/buffer_pool_test.c @@ -573,6 +573,47 @@ static void test_pools_merged_on_buffer_user_deletion(void) { grpc_buffer_pool_unref(p); } +static void test_reclaimers_can_be_posted_repeatedly(void) { + gpr_log(GPR_INFO, "** test_reclaimers_can_be_posted_repeatedly **"); + grpc_buffer_pool *p = grpc_buffer_pool_create(); + grpc_buffer_pool_resize(p, 1024); + grpc_buffer_user usr; + grpc_buffer_user_init(&usr, p); + { + bool allocated = false; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_buffer_user_alloc(&exec_ctx, &usr, 1024, set_bool(&allocated)); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(allocated); + } + for (int i = 0; i < 10; i++) { + bool reclaimer_done = false; + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_buffer_user_post_reclaimer( + &exec_ctx, &usr, false, + make_reclaimer(&usr, 1024, set_bool(&reclaimer_done))); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(!reclaimer_done); + } + { + bool allocated = false; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_buffer_user_alloc(&exec_ctx, &usr, 1024, set_bool(&allocated)); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(allocated); + GPR_ASSERT(reclaimer_done); + } + } + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_buffer_user_free(&exec_ctx, &usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); + } + destroy_user(&usr); + grpc_buffer_pool_unref(p); +} + static void test_one_slice(void) { gpr_log(GPR_INFO, "** test_one_slice **"); @@ -659,6 +700,7 @@ int main(int argc, char **argv) { test_multiple_reclaims_can_be_triggered(); test_buffer_user_stays_allocated_until_memory_released(); test_pools_merged_on_buffer_user_deletion(); + test_reclaimers_can_be_posted_repeatedly(); test_one_slice(); test_one_slice_deleted_late(); grpc_shutdown();