TCP buffer pool integration done

reviewable/pr8239/r2
Craig Tiller 9 years ago
parent 1f8d1d5afd
commit 61ecb9259b
  1. 30
      src/core/lib/iomgr/buffer_pool.c
  2. 6
      src/core/lib/iomgr/buffer_pool.h
  3. 4
      src/core/lib/iomgr/tcp_posix.c
  4. 31
      test/core/iomgr/buffer_pool_test.c
  5. 2
      test/core/util/mock_endpoint.c
  6. 2
      test/core/util/passthru_endpoint.c

@ -115,6 +115,7 @@ static grpc_buffer_user *bulist_pop(grpc_buffer_pool *buffer_pool,
buffer_user->links[list].prev; buffer_user->links[list].prev;
buffer_user->links[list].prev->links[list].next = buffer_user->links[list].prev->links[list].next =
buffer_user->links[list].next; buffer_user->links[list].next;
*root = buffer_user->links[list].next;
} }
buffer_user->links[list].next = buffer_user->links[list].prev = NULL; buffer_user->links[list].next = buffer_user->links[list].prev = NULL;
return buffer_user; return buffer_user;
@ -365,8 +366,8 @@ static void bp_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
gpr_free(a); gpr_free(a);
} }
static void bpreclaimation_done_closure(grpc_exec_ctx *exec_ctx, void *bp, static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp,
grpc_error *error) { grpc_error *error) {
grpc_buffer_pool *buffer_pool = bp; grpc_buffer_pool *buffer_pool = bp;
buffer_pool->reclaiming = false; buffer_pool->reclaiming = false;
bpstep_sched(exec_ctx, buffer_pool); bpstep_sched(exec_ctx, buffer_pool);
@ -386,7 +387,7 @@ grpc_buffer_pool *grpc_buffer_pool_create(void) {
buffer_pool->reclaiming = false; buffer_pool->reclaiming = false;
grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool); grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool);
grpc_closure_init(&buffer_pool->bpreclaimation_done_closure, grpc_closure_init(&buffer_pool->bpreclaimation_done_closure,
bpreclaimation_done_closure, buffer_pool); bp_reclaimation_done, buffer_pool);
for (int i = 0; i < GRPC_BULIST_COUNT; i++) { for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
buffer_pool->roots[i] = NULL; buffer_pool->roots[i] = NULL;
} }
@ -481,6 +482,7 @@ void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
grpc_closure_list_init(&buffer_user->on_allocated); grpc_closure_list_init(&buffer_user->on_allocated);
buffer_user->allocating = false; buffer_user->allocating = false;
buffer_user->added_to_free_pool = false; buffer_user->added_to_free_pool = false;
buffer_user->on_done_destroy = NULL;
buffer_user->reclaimers[0] = NULL; buffer_user->reclaimers[0] = NULL;
buffer_user->reclaimers[1] = NULL; buffer_user->reclaimers[1] = NULL;
for (int i = 0; i < GRPC_BULIST_COUNT; i++) { for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
@ -488,18 +490,25 @@ void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
} }
} }
void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx, void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_buffer_user *buffer_user, grpc_buffer_user *buffer_user,
grpc_closure *on_done) { grpc_closure *on_done) {
gpr_mu_lock(&buffer_user->mu);
GPR_ASSERT(buffer_user->on_done_destroy == NULL);
buffer_user->on_done_destroy = on_done; buffer_user->on_done_destroy = on_done;
grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, if (buffer_user->allocated == 0) {
&buffer_user->destroy_closure, GRPC_ERROR_NONE, false); grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
&buffer_user->destroy_closure, GRPC_ERROR_NONE,
false);
}
gpr_mu_unlock(&buffer_user->mu);
} }
void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx,
grpc_buffer_user *buffer_user, size_t size, grpc_buffer_user *buffer_user, size_t size,
grpc_closure *optional_on_done) { grpc_closure *optional_on_done) {
gpr_mu_lock(&buffer_user->mu); gpr_mu_lock(&buffer_user->mu);
GPR_ASSERT(buffer_user->on_done_destroy == NULL);
buffer_user->allocated += (int64_t)size; buffer_user->allocated += (int64_t)size;
buffer_user->free_pool -= (int64_t)size; buffer_user->free_pool -= (int64_t)size;
if (buffer_user->free_pool < 0) { if (buffer_user->free_pool < 0) {
@ -532,6 +541,11 @@ void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx,
&buffer_user->add_to_free_pool_closure, &buffer_user->add_to_free_pool_closure,
GRPC_ERROR_NONE, false); GRPC_ERROR_NONE, false);
} }
if (buffer_user->on_done_destroy != NULL && buffer_user->allocated == 0) {
grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
&buffer_user->destroy_closure, GRPC_ERROR_NONE,
false);
}
gpr_mu_unlock(&buffer_user->mu); gpr_mu_unlock(&buffer_user->mu);
} }

@ -83,9 +83,9 @@ struct grpc_buffer_user {
void grpc_buffer_user_init(grpc_buffer_user *buffer_user, void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
grpc_buffer_pool *buffer_pool); grpc_buffer_pool *buffer_pool);
void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx, void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_buffer_user *buffer_user, grpc_buffer_user *buffer_user,
grpc_closure *on_done); grpc_closure *on_done);
void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx,
grpc_buffer_user *buffer_user, size_t size, grpc_buffer_user *buffer_user, size_t size,

@ -125,8 +125,8 @@ static void tcp_begin_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
"tcp_unref_orphan"); "tcp_unref_orphan");
gpr_slice_buffer_destroy(&tcp->last_read_buffer); gpr_slice_buffer_destroy(&tcp->last_read_buffer);
gpr_free(tcp->peer_string); gpr_free(tcp->peer_string);
grpc_buffer_user_destroy(exec_ctx, &tcp->buffer_user, grpc_buffer_user_shutdown(exec_ctx, &tcp->buffer_user,
grpc_closure_create(tcp_end_free, tcp)); grpc_closure_create(tcp_end_free, tcp));
} }
/*#define GRPC_TCP_REFCOUNT_DEBUG*/ /*#define GRPC_TCP_REFCOUNT_DEBUG*/

@ -78,7 +78,7 @@ grpc_closure *make_unused_reclaimer(grpc_closure *then) {
static void destroy_user(grpc_buffer_user *usr) { static void destroy_user(grpc_buffer_user *usr) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
bool done = false; bool done = false;
grpc_buffer_user_destroy(&exec_ctx, usr, set_bool(&done)); grpc_buffer_user_shutdown(&exec_ctx, usr, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done); GPR_ASSERT(done);
} }
@ -498,6 +498,34 @@ static void test_multiple_reclaims_can_be_triggered(void) {
GPR_ASSERT(destructive_done); GPR_ASSERT(destructive_done);
} }
static void test_buffer_user_stays_allocated_until_memory_released(void) {
gpr_log(GPR_INFO,
"** test_buffer_user_stays_allocated_until_memory_released **");
grpc_buffer_pool *p = grpc_buffer_pool_create();
grpc_buffer_pool_resize(p, 1024 * 1024);
grpc_buffer_user usr;
grpc_buffer_user_init(&usr, p);
bool done = false;
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_buffer_user_alloc(&exec_ctx, &usr, 1024, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_buffer_pool_unref(p);
grpc_buffer_user_shutdown(&exec_ctx, &usr, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_buffer_user_free(&exec_ctx, &usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
}
int main(int argc, char **argv) { int main(int argc, char **argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
grpc_init(); grpc_init();
@ -516,6 +544,7 @@ int main(int argc, char **argv) {
test_unused_reclaim_is_cancelled(); test_unused_reclaim_is_cancelled();
test_benign_reclaim_is_preferred(); test_benign_reclaim_is_preferred();
test_multiple_reclaims_can_be_triggered(); test_multiple_reclaims_can_be_triggered();
test_buffer_user_stays_allocated_until_memory_released();
grpc_shutdown(); grpc_shutdown();
return 0; return 0;
} }

@ -95,7 +95,7 @@ static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *mp,
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
grpc_buffer_user_destroy(exec_ctx, &m->buffer_user, grpc_buffer_user_shutdown(exec_ctx, &m->buffer_user,
grpc_closure_create(me_really_destroy, m)); grpc_closure_create(me_really_destroy, m));
} }

@ -140,7 +140,7 @@ static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *ep,
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
half *m = (half *)ep; half *m = (half *)ep;
grpc_buffer_user_destroy(exec_ctx, &m->buffer_user, grpc_buffer_user_shutdown(exec_ctx, &m->buffer_user,
grpc_closure_create(me_really_destroy, m)); grpc_closure_create(me_really_destroy, m));
} }

Loading…
Cancel
Save