diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index c04c2cfa59c..d2dce0d5113 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -203,7 +203,7 @@ typedef struct { #define GRPC_ARG_ALLOW_REUSEPORT "grpc.so_reuseport" /** If non-zero, a pointer to a buffer pool (use grpc_resource_quota_arg_vtable to fetch an appropriate pointer arg vtable */ -#define GRPC_ARG_BUFFER_POOL "grpc.resource_quota" +#define GRPC_ARG_RESOURCE_QUOTA "grpc.resource_quota" /** Service config data, to be passed to subchannels. Not intended for external use. */ #define GRPC_ARG_SERVICE_CONFIG "grpc.service_config" diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index d74ab0e7688..11f12941eb8 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -2266,13 +2266,13 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_slice_from_static_string("Buffers full")); } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, - "HTTP2: %s - skip benign reclaimation, there are still %" PRIdPTR + "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR " streams", t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); } t->benign_reclaimer_registered = false; if (error != GRPC_ERROR_CANCELLED) { - grpc_resource_user_finish_reclaimation( + grpc_resource_user_finish_reclamation( exec_ctx, grpc_endpoint_get_resource_user(t->ep)); } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer"); @@ -2298,7 +2298,7 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, } } if (error != GRPC_ERROR_CANCELLED) { - grpc_resource_user_finish_reclaimation( + grpc_resource_user_finish_reclamation( exec_ctx, grpc_endpoint_get_resource_user(t->ep)); } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer"); diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index bdc18ac4bff..26baae1eaba 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -227,7 +227,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, addr = &req->addresses->addrs[req->next_address++]; grpc_closure_init(&req->connected, on_connected, req); grpc_arg arg; - arg.key = GRPC_ARG_BUFFER_POOL; + arg.key = GRPC_ARG_RESOURCE_QUOTA; arg.type = GRPC_ARG_POINTER; arg.value.pointer.p = req->resource_quota; arg.value.pointer.vtable = grpc_resource_quota_arg_vtable(); diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 89c795c0ec1..54669734081 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -44,27 +44,29 @@ int grpc_resource_quota_trace = 0; -typedef bool (*bpstate_func)(grpc_exec_ctx *exec_ctx, - grpc_resource_quota *resource_quota); - -typedef struct { - grpc_resource_user *head; - grpc_resource_user *tail; -} grpc_resource_user_list; - struct grpc_resource_quota { + /* refcount */ gpr_refcount refs; + /* Master combiner lock: all activity on a quota executes under this combiner + */ grpc_combiner *combiner; + /* Size of the resource quota */ int64_t size; + /* Amount of free memory in the resource quota */ int64_t free_pool; + /* Has rq_step been scheduled to occur? */ bool step_scheduled; + /* Are we currently reclaiming memory */ bool reclaiming; - grpc_closure bpstep_closure; - grpc_closure bpreclaimation_done_closure; + /* Closure around rq_step */ + grpc_closure rq_step_closure; + /* Closure around rq_reclamation_done */ + grpc_closure rq_reclamation_done_closure; - grpc_resource_user *roots[GRPC_BULIST_COUNT]; + /* Roots of all resource user lists */ + grpc_resource_user *roots[GRPC_RULIST_COUNT]; char *name; }; @@ -73,8 +75,8 @@ struct grpc_resource_quota { * list management */ -static void bulist_add_tail(grpc_resource_user *resource_user, - grpc_bulist list) { +static void rulist_add_tail(grpc_resource_user *resource_user, + grpc_rulist list) { grpc_resource_quota *resource_quota = resource_user->resource_quota; grpc_resource_user **root = &resource_quota->roots[list]; if (*root == NULL) { @@ -89,8 +91,8 @@ static void bulist_add_tail(grpc_resource_user *resource_user, } } -static void bulist_add_head(grpc_resource_user *resource_user, - grpc_bulist list) { +static void rulist_add_head(grpc_resource_user *resource_user, + grpc_rulist list) { grpc_resource_quota *resource_quota = resource_user->resource_quota; grpc_resource_user **root = &resource_quota->roots[list]; if (*root == NULL) { @@ -106,13 +108,13 @@ static void bulist_add_head(grpc_resource_user *resource_user, } } -static bool bulist_empty(grpc_resource_quota *resource_quota, - grpc_bulist list) { +static bool rulist_empty(grpc_resource_quota *resource_quota, + grpc_rulist list) { return resource_quota->roots[list] == NULL; } -static grpc_resource_user *bulist_pop(grpc_resource_quota *resource_quota, - grpc_bulist list) { +static grpc_resource_user *rulist_pop(grpc_resource_quota *resource_quota, + grpc_rulist list) { grpc_resource_user **root = &resource_quota->roots[list]; grpc_resource_user *resource_user = *root; if (resource_user == NULL) { @@ -131,7 +133,7 @@ static grpc_resource_user *bulist_pop(grpc_resource_quota *resource_quota, return resource_user; } -static void bulist_remove(grpc_resource_user *resource_user, grpc_bulist list) { +static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) { if (resource_user->links[list].next == NULL) return; grpc_resource_quota *resource_quota = resource_user->resource_quota; if (resource_quota->roots[list] == resource_user) { @@ -150,41 +152,41 @@ static void bulist_remove(grpc_resource_user *resource_user, grpc_bulist list) { * buffer pool state machine */ -static bool bpalloc(grpc_exec_ctx *exec_ctx, - grpc_resource_quota *resource_quota); -static bool bpscavenge(grpc_exec_ctx *exec_ctx, - grpc_resource_quota *resource_quota); -static bool bpreclaim(grpc_exec_ctx *exec_ctx, - grpc_resource_quota *resource_quota, bool destructive); +static bool rq_alloc(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota); +static bool rq_scavenge(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota); +static bool rq_reclaim(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota, bool destructive); -static void bpstep(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) { +static void rq_step(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) { grpc_resource_quota *resource_quota = bp; resource_quota->step_scheduled = false; do { - if (bpalloc(exec_ctx, resource_quota)) goto done; - } while (bpscavenge(exec_ctx, resource_quota)); - bpreclaim(exec_ctx, resource_quota, false) || - bpreclaim(exec_ctx, resource_quota, true); + if (rq_alloc(exec_ctx, resource_quota)) goto done; + } while (rq_scavenge(exec_ctx, resource_quota)); + rq_reclaim(exec_ctx, resource_quota, false) || + rq_reclaim(exec_ctx, resource_quota, true); done: grpc_resource_quota_internal_unref(exec_ctx, resource_quota); } -static void bpstep_sched(grpc_exec_ctx *exec_ctx, - grpc_resource_quota *resource_quota) { +static void rq_step_sched(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota) { if (resource_quota->step_scheduled) return; resource_quota->step_scheduled = true; grpc_resource_quota_internal_ref(resource_quota); grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner, - &resource_quota->bpstep_closure, + &resource_quota->rq_step_closure, GRPC_ERROR_NONE, false); } /* returns true if all allocations are completed */ -static bool bpalloc(grpc_exec_ctx *exec_ctx, - grpc_resource_quota *resource_quota) { +static bool rq_alloc(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota) { grpc_resource_user *resource_user; while ((resource_user = - bulist_pop(resource_quota, GRPC_BULIST_AWAITING_ALLOCATION))) { + rulist_pop(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) { gpr_mu_lock(&resource_user->mu); if (resource_user->free_pool < 0 && -resource_user->free_pool <= resource_quota->free_pool) { @@ -193,7 +195,7 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx, resource_quota->free_pool -= amt; if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64 - " bytes; bp_free_pool -> %" PRId64, + " bytes; rq_free_pool -> %" PRId64, resource_quota->name, resource_user->name, amt, resource_quota->free_pool); } @@ -206,7 +208,7 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL); gpr_mu_unlock(&resource_user->mu); } else { - bulist_add_head(resource_user, GRPC_BULIST_AWAITING_ALLOCATION); + rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); gpr_mu_unlock(&resource_user->mu); return false; } @@ -215,11 +217,11 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx, } /* returns true if any memory could be reclaimed from buffers */ -static bool bpscavenge(grpc_exec_ctx *exec_ctx, - grpc_resource_quota *resource_quota) { +static bool rq_scavenge(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota) { grpc_resource_user *resource_user; while ((resource_user = - bulist_pop(resource_quota, GRPC_BULIST_NON_EMPTY_FREE_POOL))) { + rulist_pop(resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL))) { gpr_mu_lock(&resource_user->mu); if (resource_user->free_pool > 0) { int64_t amt = resource_user->free_pool; @@ -227,7 +229,7 @@ static bool bpscavenge(grpc_exec_ctx *exec_ctx, resource_quota->free_pool += amt; if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64 - " bytes; bp_free_pool -> %" PRId64, + " bytes; rq_free_pool -> %" PRId64, resource_quota->name, resource_user->name, amt, resource_quota->free_pool); } @@ -240,16 +242,16 @@ static bool bpscavenge(grpc_exec_ctx *exec_ctx, return false; } -/* returns true if reclaimation is proceeding */ -static bool bpreclaim(grpc_exec_ctx *exec_ctx, - grpc_resource_quota *resource_quota, bool destructive) { +/* returns true if reclamation is proceeding */ +static bool rq_reclaim(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota, bool destructive) { if (resource_quota->reclaiming) return true; - grpc_bulist list = destructive ? GRPC_BULIST_RECLAIMER_DESTRUCTIVE - : GRPC_BULIST_RECLAIMER_BENIGN; - grpc_resource_user *resource_user = bulist_pop(resource_quota, list); + grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE + : GRPC_RULIST_RECLAIMER_BENIGN; + grpc_resource_user *resource_user = rulist_pop(resource_quota, list); if (resource_user == NULL) return false; if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", + gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclamation", resource_quota->name, resource_user->name, destructive ? "destructive" : "benign"); } @@ -262,7 +264,7 @@ static bool bpreclaim(grpc_exec_ctx *exec_ctx, } /******************************************************************************* - * bu_slice: a slice implementation that is backed by a grpc_resource_user + * ru_slice: a slice implementation that is backed by a grpc_resource_user */ typedef struct { @@ -270,22 +272,24 @@ typedef struct { gpr_refcount refs; grpc_resource_user *resource_user; size_t size; -} bu_slice_refcount; +} ru_slice_refcount; -static void bu_slice_ref(void *p) { - bu_slice_refcount *rc = p; +static void ru_slice_ref(void *p) { + ru_slice_refcount *rc = p; gpr_ref(&rc->refs); } -static void bu_slice_unref(void *p) { - bu_slice_refcount *rc = p; +static void ru_slice_unref(void *p) { + ru_slice_refcount *rc = p; if (gpr_unref(&rc->refs)) { /* TODO(ctiller): this is dangerous, but I think safe for now: we have no guarantee here that we're at a safe point for creating an execution context, but we have no way of writing this code otherwise. In the future: consider lifting gpr_slice to grpc, and offering an - internal_{ref,unref} pair that is execution context aware. Alternatively, - make exec_ctx be thread local and 'do the right thing' (whatever that is) + internal_{ref,unref} pair that is execution context aware. + Alternatively, + make exec_ctx be thread local and 'do the right thing' (whatever that + is) if NULL */ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_user_free(&exec_ctx, rc->resource_user, rc->size); @@ -294,11 +298,11 @@ static void bu_slice_unref(void *p) { } } -static gpr_slice bu_slice_create(grpc_resource_user *resource_user, +static gpr_slice ru_slice_create(grpc_resource_user *resource_user, size_t size) { - bu_slice_refcount *rc = gpr_malloc(sizeof(bu_slice_refcount) + size); - rc->base.ref = bu_slice_ref; - rc->base.unref = bu_slice_unref; + ru_slice_refcount *rc = gpr_malloc(sizeof(ru_slice_refcount) + size); + rc->base.ref = ru_slice_ref; + rc->base.unref = ru_slice_unref; gpr_ref_init(&rc->refs, 1); rc->resource_user = resource_user; rc->size = size; @@ -313,62 +317,62 @@ static gpr_slice bu_slice_create(grpc_resource_user *resource_user, * grpc_resource_quota internal implementation */ -static void bu_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { +static void ru_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { grpc_resource_user *resource_user = bu; - if (bulist_empty(resource_user->resource_quota, - GRPC_BULIST_AWAITING_ALLOCATION)) { - bpstep_sched(exec_ctx, resource_user->resource_quota); + if (rulist_empty(resource_user->resource_quota, + GRPC_RULIST_AWAITING_ALLOCATION)) { + rq_step_sched(exec_ctx, resource_user->resource_quota); } - bulist_add_tail(resource_user, GRPC_BULIST_AWAITING_ALLOCATION); + rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); } -static void bu_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu, +static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { grpc_resource_user *resource_user = bu; - if (!bulist_empty(resource_user->resource_quota, - GRPC_BULIST_AWAITING_ALLOCATION) && - bulist_empty(resource_user->resource_quota, - GRPC_BULIST_NON_EMPTY_FREE_POOL)) { - bpstep_sched(exec_ctx, resource_user->resource_quota); + if (!rulist_empty(resource_user->resource_quota, + GRPC_RULIST_AWAITING_ALLOCATION) && + rulist_empty(resource_user->resource_quota, + GRPC_RULIST_NON_EMPTY_FREE_POOL)) { + rq_step_sched(exec_ctx, resource_user->resource_quota); } - bulist_add_tail(resource_user, GRPC_BULIST_NON_EMPTY_FREE_POOL); + rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL); } -static void bu_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, +static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { grpc_resource_user *resource_user = bu; - if (!bulist_empty(resource_user->resource_quota, - GRPC_BULIST_AWAITING_ALLOCATION) && - bulist_empty(resource_user->resource_quota, - GRPC_BULIST_NON_EMPTY_FREE_POOL) && - bulist_empty(resource_user->resource_quota, - GRPC_BULIST_RECLAIMER_BENIGN)) { - bpstep_sched(exec_ctx, resource_user->resource_quota); + if (!rulist_empty(resource_user->resource_quota, + GRPC_RULIST_AWAITING_ALLOCATION) && + rulist_empty(resource_user->resource_quota, + GRPC_RULIST_NON_EMPTY_FREE_POOL) && + rulist_empty(resource_user->resource_quota, + GRPC_RULIST_RECLAIMER_BENIGN)) { + rq_step_sched(exec_ctx, resource_user->resource_quota); } - bulist_add_tail(resource_user, GRPC_BULIST_RECLAIMER_BENIGN); + rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); } -static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, +static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { grpc_resource_user *resource_user = bu; - if (!bulist_empty(resource_user->resource_quota, - GRPC_BULIST_AWAITING_ALLOCATION) && - bulist_empty(resource_user->resource_quota, - GRPC_BULIST_NON_EMPTY_FREE_POOL) && - bulist_empty(resource_user->resource_quota, - GRPC_BULIST_RECLAIMER_BENIGN) && - bulist_empty(resource_user->resource_quota, - GRPC_BULIST_RECLAIMER_DESTRUCTIVE)) { - bpstep_sched(exec_ctx, resource_user->resource_quota); + if (!rulist_empty(resource_user->resource_quota, + GRPC_RULIST_AWAITING_ALLOCATION) && + rulist_empty(resource_user->resource_quota, + GRPC_RULIST_NON_EMPTY_FREE_POOL) && + rulist_empty(resource_user->resource_quota, + GRPC_RULIST_RECLAIMER_BENIGN) && + rulist_empty(resource_user->resource_quota, + GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) { + rq_step_sched(exec_ctx, resource_user->resource_quota); } - bulist_add_tail(resource_user, GRPC_BULIST_RECLAIMER_DESTRUCTIVE); + rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); } -static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { +static void ru_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { grpc_resource_user *resource_user = bu; GPR_ASSERT(resource_user->allocated == 0); - for (int i = 0; i < GRPC_BULIST_COUNT; i++) { - bulist_remove(resource_user, (grpc_bulist)i); + for (int i = 0; i < GRPC_RULIST_COUNT; i++) { + rulist_remove(resource_user, (grpc_rulist)i); } grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0], GRPC_ERROR_CANCELLED, NULL); @@ -379,17 +383,17 @@ static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { GRPC_ERROR_NONE, NULL); if (resource_user->free_pool != 0) { resource_user->resource_quota->free_pool += resource_user->free_pool; - bpstep_sched(exec_ctx, resource_user->resource_quota); + rq_step_sched(exec_ctx, resource_user->resource_quota); } } -static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts, +static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts, grpc_error *error) { grpc_resource_user_slice_allocator *slice_allocator = ts; if (error == GRPC_ERROR_NONE) { for (size_t i = 0; i < slice_allocator->count; i++) { gpr_slice_buffer_add_indexed( - slice_allocator->dest, bu_slice_create(slice_allocator->resource_user, + slice_allocator->dest, ru_slice_create(slice_allocator->resource_user, slice_allocator->length)); } } @@ -400,29 +404,29 @@ typedef struct { int64_t size; grpc_resource_quota *resource_quota; grpc_closure closure; -} bp_resize_args; +} rq_resize_args; -static void bp_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { - bp_resize_args *a = args; +static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { + rq_resize_args *a = args; int64_t delta = a->size - a->resource_quota->size; a->resource_quota->size += delta; a->resource_quota->free_pool += delta; if (delta < 0 && a->resource_quota->free_pool < 0) { - bpstep_sched(exec_ctx, a->resource_quota); + rq_step_sched(exec_ctx, a->resource_quota); } else if (delta > 0 && - !bulist_empty(a->resource_quota, - GRPC_BULIST_AWAITING_ALLOCATION)) { - bpstep_sched(exec_ctx, a->resource_quota); + !rulist_empty(a->resource_quota, + GRPC_RULIST_AWAITING_ALLOCATION)) { + rq_step_sched(exec_ctx, a->resource_quota); } grpc_resource_quota_internal_unref(exec_ctx, a->resource_quota); gpr_free(a); } -static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp, - grpc_error *error) { +static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *bp, + grpc_error *error) { grpc_resource_quota *resource_quota = bp; resource_quota->reclaiming = false; - bpstep_sched(exec_ctx, resource_quota); + rq_step_sched(exec_ctx, resource_quota); grpc_resource_quota_internal_unref(exec_ctx, resource_quota); } @@ -444,10 +448,10 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR, (intptr_t)resource_quota); } - grpc_closure_init(&resource_quota->bpstep_closure, bpstep, resource_quota); - grpc_closure_init(&resource_quota->bpreclaimation_done_closure, - bp_reclaimation_done, resource_quota); - for (int i = 0; i < GRPC_BULIST_COUNT; i++) { + grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota); + grpc_closure_init(&resource_quota->rq_reclamation_done_closure, + rq_reclamation_done, resource_quota); + for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_quota->roots[i] = NULL; } return resource_quota; @@ -481,10 +485,10 @@ void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) { void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, size_t size) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - bp_resize_args *a = gpr_malloc(sizeof(*a)); + rq_resize_args *a = gpr_malloc(sizeof(*a)); a->resource_quota = grpc_resource_quota_internal_ref(resource_quota); a->size = (int64_t)size; - grpc_closure_init(&a->closure, bp_resize, a); + grpc_closure_init(&a->closure, rq_resize, a); grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure, GRPC_ERROR_NONE, false); grpc_exec_ctx_finish(&exec_ctx); @@ -497,29 +501,29 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, grpc_resource_quota *grpc_resource_quota_from_channel_args( const grpc_channel_args *channel_args) { for (size_t i = 0; i < channel_args->num_args; i++) { - if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { if (channel_args->args[i].type == GRPC_ARG_POINTER) { return grpc_resource_quota_internal_ref( channel_args->args[i].value.pointer.p); } else { - gpr_log(GPR_DEBUG, GRPC_ARG_BUFFER_POOL " should be a pointer"); + gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer"); } } } return grpc_resource_quota_create(NULL); } -static void *bp_copy(void *bp) { +static void *rq_copy(void *bp) { grpc_resource_quota_ref(bp); return bp; } -static void bp_destroy(void *bp) { grpc_resource_quota_unref(bp); } +static void rq_destroy(void *bp) { grpc_resource_quota_unref(bp); } -static int bp_cmp(void *a, void *b) { return GPR_ICMP(a, b); } +static int rq_cmp(void *a, void *b) { return GPR_ICMP(a, b); } const grpc_arg_pointer_vtable *grpc_resource_quota_arg_vtable(void) { - static const grpc_arg_pointer_vtable vtable = {bp_copy, bp_destroy, bp_cmp}; + static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp}; return &vtable; } @@ -532,15 +536,15 @@ void grpc_resource_user_init(grpc_resource_user *resource_user, const char *name) { resource_user->resource_quota = grpc_resource_quota_internal_ref(resource_quota); - grpc_closure_init(&resource_user->allocate_closure, &bu_allocate, + grpc_closure_init(&resource_user->allocate_closure, &ru_allocate, resource_user); grpc_closure_init(&resource_user->add_to_free_pool_closure, - &bu_add_to_free_pool, resource_user); + &ru_add_to_free_pool, resource_user); grpc_closure_init(&resource_user->post_reclaimer_closure[0], - &bu_post_benign_reclaimer, resource_user); + &ru_post_benign_reclaimer, resource_user); grpc_closure_init(&resource_user->post_reclaimer_closure[1], - &bu_post_destructive_reclaimer, resource_user); - grpc_closure_init(&resource_user->destroy_closure, &bu_destroy, + &ru_post_destructive_reclaimer, resource_user); + grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user); gpr_mu_init(&resource_user->mu); resource_user->allocated = 0; @@ -551,7 +555,7 @@ void grpc_resource_user_init(grpc_resource_user *resource_user, gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, 0); resource_user->reclaimers[0] = NULL; resource_user->reclaimers[1] = NULL; - for (int i = 0; i < GRPC_BULIST_COUNT; i++) { + for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_user->links[i].next = resource_user->links[i].prev = NULL; } #ifndef NDEBUG @@ -678,22 +682,22 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, } } -void grpc_resource_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user) { +void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete", + gpr_log(GPR_DEBUG, "BP %s %s: reclamation complete", resource_user->resource_quota->name, resource_user->name); } grpc_combiner_execute( exec_ctx, resource_user->resource_quota->combiner, - &resource_user->resource_quota->bpreclaimation_done_closure, + &resource_user->resource_quota->rq_reclamation_done_closure, GRPC_ERROR_NONE, false); } void grpc_resource_user_slice_allocator_init( grpc_resource_user_slice_allocator *slice_allocator, grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) { - grpc_closure_init(&slice_allocator->on_allocated, bu_allocated_slices, + grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices, slice_allocator); grpc_closure_init(&slice_allocator->on_done, cb, p); slice_allocator->resource_user = resource_user; diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index c4015b42cc4..af94a199115 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -38,6 +38,37 @@ #include "src/core/lib/iomgr/exec_ctx.h" +/** \file Tracks resource usage against a pool. + + The current implementation tracks only memory usage, but in the future + this may be extended to (for example) threads and file descriptors. + + A grpc_resource_quota represents the pooled resources, and + grpc_resource_user instances attach to the quota and consume those + resources. They also offer a vector for reclamation: if we become + resource constrained, grpc_resource_user instances are asked (in turn) to + free up whatever they can so that the system as a whole can make progress. + + There are three kinds of reclamation that take place: + - an internal reclamation, where cached resource at the resource user level + is returned to the quota + - a benign reclamation phase, whereby resources that are in use but are not + helping anything make progress are reclaimed + - a destructive reclamation, whereby resources that are helping something + make progress may be enacted so that at least one part of the system can + complete. + + These reclamations are tried in priority order, and only one reclamation + is outstanding for a quota at any given time (meaning that if a destructive + reclamation makes progress, we may follow up with a benign reclamation). + + Future work will be to expose the current resource pressure so that back + pressure can be applied to avoid reclamation phases starting. + + Resource users own references to resource quotas, and resource quotas + maintain lists of users (which users arrange to leave before they are + destroyed) */ + extern int grpc_resource_quota_trace; grpc_resource_quota *grpc_resource_quota_internal_ref( @@ -47,46 +78,83 @@ void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx, grpc_resource_quota *grpc_resource_quota_from_channel_args( const grpc_channel_args *channel_args); +/* Resource users are kept in (potentially) several intrusive linked lists + at once. These are the list names. */ typedef enum { - GRPC_BULIST_AWAITING_ALLOCATION, - GRPC_BULIST_NON_EMPTY_FREE_POOL, - GRPC_BULIST_RECLAIMER_BENIGN, - GRPC_BULIST_RECLAIMER_DESTRUCTIVE, - GRPC_BULIST_COUNT -} grpc_bulist; + /* Resource users that are waiting for an allocation */ + GRPC_RULIST_AWAITING_ALLOCATION, + /* Resource users that have free memory available for internal reclamation */ + GRPC_RULIST_NON_EMPTY_FREE_POOL, + /* Resource users that have published a benign reclamation is available */ + GRPC_RULIST_RECLAIMER_BENIGN, + /* Resource users that have published a destructive reclamation is + available */ + GRPC_RULIST_RECLAIMER_DESTRUCTIVE, + /* Number of lists: must be last */ + GRPC_RULIST_COUNT +} grpc_rulist; typedef struct grpc_resource_user grpc_resource_user; +/* Internal linked list pointers for a resource user */ typedef struct { grpc_resource_user *next; grpc_resource_user *prev; } grpc_resource_user_link; struct grpc_resource_user { + /* The quota this resource user consumes from */ grpc_resource_quota *resource_quota; + /* Closure to schedule an allocation onder the resource quota combiner lock */ grpc_closure allocate_closure; + /* Closure to publish a non empty free pool under the resource quota combiner + lock */ grpc_closure add_to_free_pool_closure; #ifndef NDEBUG + /* Canary object to detect leaked resource users with ASAN */ void *asan_canary; #endif gpr_mu mu; + /* Total allocated memory outstanding by this resource user; + always positive */ int64_t allocated; + /* The amount of memory this user has cached for its own use: to avoid quota + contention, each resource user can keep some memory aside from the quota, + and the quota can pull it back under memory pressure. + This value can become negative if more memory has been requested than + existed in the free pool, at which point the quota is consulted to bring + this value non-negative (asynchronously). */ int64_t free_pool; + /* A list of closures to call once free_pool becomes non-negative - ie when + all outstanding allocations have been granted. */ grpc_closure_list on_allocated; + /* True if we are currently trying to allocate from the quota, false if not */ bool allocating; + /* True if we are currently trying to add ourselves to the non-free quota + list, false otherwise */ bool added_to_free_pool; + /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer + */ grpc_closure *reclaimers[2]; + /* Trampoline closures to finish reclamation and re-enter the quota combiner + lock */ grpc_closure post_reclaimer_closure[2]; + /* Closure to execute under the quota combiner to de-register and shutdown the + resource user */ grpc_closure destroy_closure; + /* User supplied closure to call once the user has finished shutting down AND + all outstanding allocations have been freed */ gpr_atm on_done_destroy_closure; - grpc_resource_user_link links[GRPC_BULIST_COUNT]; + /* Links in the various grpc_rulist lists */ + grpc_resource_user_link links[GRPC_RULIST_COUNT]; + /* The name of this resource user, for debugging/tracing */ char *name; }; @@ -99,17 +167,29 @@ void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user); +/* Allocate from the resource user (and it's quota). + If optional_on_done is NULL, then allocate immediately. This may push the + quota over-limit, at which point reclamation will kick in. + If optional_on_done is non-NULL, it will be scheduled when the allocation has + been granted by the quota. */ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, size_t size, grpc_closure *optional_on_done); +/* Release memory back to the quota */ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, size_t size); +/* Post a memory reclaimer to the resource user. Only one benign and one + destructive reclaimer can be posted at once. When executed, the reclaimer + MUST call grpc_resource_user_finish_reclamation before it completes, to + return control to the resource quota. */ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, bool destructive, grpc_closure *closure); -void grpc_resource_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user); +/* Finish a reclamation step */ +void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user); +/* Helper to allocate slices from a resource user */ typedef struct grpc_resource_user_slice_allocator { grpc_closure on_allocated; grpc_closure on_done; @@ -119,10 +199,12 @@ typedef struct grpc_resource_user_slice_allocator { grpc_resource_user *resource_user; } grpc_resource_user_slice_allocator; +/* Initialize a slice allocator */ void grpc_resource_user_slice_allocator_init( grpc_resource_user_slice_allocator *slice_allocator, grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p); +/* Allocate \a count slices of length \a length into \a dest. */ void grpc_resource_user_alloc_slices( grpc_exec_ctx *exec_ctx, grpc_resource_user_slice_allocator *slice_allocator, size_t length, diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index b854e5aadc6..d1b0e9cd19f 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -39,6 +39,8 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/sockaddr.h" +/* Channel arg (integer) setting how large a slice to try and read from the wire + each time recvmsg (or equivalent) is called */ #define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size" /* Asynchronously connect to an address (specified as (addr, len)), and call diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index e74a696c2fd..500c9881460 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -134,7 +134,8 @@ grpc_endpoint *grpc_tcp_client_create_from_fd( 8 * 1024 * 1024}; tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer( &channel_args->args[i], options); - } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) { + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { grpc_resource_quota_internal_unref(exec_ctx, resource_quota); resource_quota = grpc_resource_quota_internal_ref( channel_args->args[i].value.pointer.p); diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index b2eb89f429a..648736caa9b 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -175,7 +175,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT " must be an integer"); } - } else if (0 == strcmp(GRPC_ARG_BUFFER_POOL, args->args[i].key)) { + } else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_POINTER) { grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); s->resource_quota = @@ -183,7 +183,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } else { grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); gpr_free(s); - return GRPC_ERROR_CREATE(GRPC_ARG_BUFFER_POOL + return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); } } diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc index 5462e0d72ab..d136d49c89f 100644 --- a/src/cpp/common/channel_arguments.cc +++ b/src/cpp/common/channel_arguments.cc @@ -116,7 +116,8 @@ void ChannelArguments::SetUserAgentPrefix( void ChannelArguments::SetResourceQuota( const grpc::ResourceQuota& resource_quota) { - SetPointerWithVtable(GRPC_ARG_BUFFER_POOL, resource_quota.c_resource_quota(), + SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, + resource_quota.c_resource_quota(), grpc_resource_quota_arg_vtable()); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 68df1d11e1e..953a4337ece 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -197,7 +197,7 @@ std::unique_ptr ServerBuilder::BuildAndStart() { maybe_default_compression_algorithm_.algorithm); } if (resource_quota_ != nullptr) { - args.SetPointerWithVtable(GRPC_ARG_BUFFER_POOL, resource_quota_, + args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_, grpc_resource_quota_arg_vtable()); } std::unique_ptr server(new Server(thread_pool.release(), true, diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py index cc04b14577a..b9acdc1d4a7 100755 --- a/test/core/end2end/gen_build_yaml.py +++ b/test/core/end2end/gen_build_yaml.py @@ -90,7 +90,7 @@ END2END_TESTS = { 'bad_hostname': default_test_options, 'binary_metadata': default_test_options, 'resource_quota_server': default_test_options._replace(large_writes=True, - proxyable=False), + proxyable=False), 'call_creds': default_test_options._replace(secure=True), 'cancel_after_accept': default_test_options._replace(cpu_cost=LOWCPU), 'cancel_after_client_done': default_test_options, diff --git a/test/core/end2end/tests/buffer_pool_server.c b/test/core/end2end/tests/buffer_pool_server.c index 81850aea58f..beda4f7487a 100644 --- a/test/core/end2end/tests/buffer_pool_server.c +++ b/test/core/end2end/tests/buffer_pool_server.c @@ -119,7 +119,7 @@ void resource_quota_server(grpc_end2end_test_config config) { #define SERVER_END_BASE_TAG 4000 grpc_arg arg; - arg.key = GRPC_ARG_BUFFER_POOL; + arg.key = GRPC_ARG_RESOURCE_QUOTA; arg.type = GRPC_ARG_POINTER; arg.value.pointer.p = resource_quota; arg.value.pointer.vtable = grpc_resource_quota_arg_vtable(); diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c index 02fef94f677..a2431eed7e5 100644 --- a/test/core/end2end/tests/resource_quota_server.c +++ b/test/core/end2end/tests/resource_quota_server.c @@ -119,7 +119,7 @@ void resource_quota_server(grpc_end2end_test_config config) { #define SERVER_END_BASE_TAG 4000 grpc_arg arg; - arg.key = GRPC_ARG_BUFFER_POOL; + arg.key = GRPC_ARG_RESOURCE_QUOTA; arg.type = GRPC_ARG_POINTER; arg.value.pointer.p = resource_quota; arg.value.pointer.vtable = grpc_resource_quota_arg_vtable(); diff --git a/test/core/iomgr/resource_quota_test.c b/test/core/iomgr/resource_quota_test.c index 5963ed089b4..0fb94a495d9 100644 --- a/test/core/iomgr/resource_quota_test.c +++ b/test/core/iomgr/resource_quota_test.c @@ -57,7 +57,7 @@ static void reclaimer_cb(grpc_exec_ctx *exec_ctx, void *args, GPR_ASSERT(error == GRPC_ERROR_NONE); reclaimer_args *a = args; grpc_resource_user_free(exec_ctx, a->resource_user, a->size); - grpc_resource_user_finish_reclaimation(exec_ctx, a->resource_user); + grpc_resource_user_finish_reclamation(exec_ctx, a->resource_user); grpc_closure_run(exec_ctx, a->then, GRPC_ERROR_NONE); gpr_free(a); }