Merge branch 'buffer_pools_for_realsies' into bwest

pull/9511/head
Craig Tiller 8 years ago
commit c7f1a6d5fc
  1. 2
      include/grpc/impl/codegen/grpc_types.h
  2. 6
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  3. 2
      src/core/lib/http/httpcli.c
  4. 270
      src/core/lib/iomgr/resource_quota.c
  5. 100
      src/core/lib/iomgr/resource_quota.h
  6. 2
      src/core/lib/iomgr/tcp_client.h
  7. 3
      src/core/lib/iomgr/tcp_client_posix.c
  8. 4
      src/core/lib/iomgr/tcp_server_posix.c
  9. 3
      src/cpp/common/channel_arguments.cc
  10. 2
      src/cpp/server/server_builder.cc
  11. 2
      test/core/end2end/gen_build_yaml.py
  12. 2
      test/core/end2end/tests/buffer_pool_server.c
  13. 2
      test/core/end2end/tests/resource_quota_server.c
  14. 2
      test/core/iomgr/resource_quota_test.c

@ -203,7 +203,7 @@ typedef struct {
#define GRPC_ARG_ALLOW_REUSEPORT "grpc.so_reuseport"
/** If non-zero, a pointer to a buffer pool (use grpc_resource_quota_arg_vtable
to fetch an appropriate pointer arg vtable */
#define GRPC_ARG_BUFFER_POOL "grpc.resource_quota"
#define GRPC_ARG_RESOURCE_QUOTA "grpc.resource_quota"
/** Service config data, to be passed to subchannels.
Not intended for external use. */
#define GRPC_ARG_SERVICE_CONFIG "grpc.service_config"

@ -2266,13 +2266,13 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_slice_from_static_string("Buffers full"));
} else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG,
"HTTP2: %s - skip benign reclaimation, there are still %" PRIdPTR
"HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
" streams",
t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
}
t->benign_reclaimer_registered = false;
if (error != GRPC_ERROR_CANCELLED) {
grpc_resource_user_finish_reclaimation(
grpc_resource_user_finish_reclamation(
exec_ctx, grpc_endpoint_get_resource_user(t->ep));
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer");
@ -2298,7 +2298,7 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
}
if (error != GRPC_ERROR_CANCELLED) {
grpc_resource_user_finish_reclaimation(
grpc_resource_user_finish_reclamation(
exec_ctx, grpc_endpoint_get_resource_user(t->ep));
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer");

@ -227,7 +227,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req,
addr = &req->addresses->addrs[req->next_address++];
grpc_closure_init(&req->connected, on_connected, req);
grpc_arg arg;
arg.key = GRPC_ARG_BUFFER_POOL;
arg.key = GRPC_ARG_RESOURCE_QUOTA;
arg.type = GRPC_ARG_POINTER;
arg.value.pointer.p = req->resource_quota;
arg.value.pointer.vtable = grpc_resource_quota_arg_vtable();

@ -44,27 +44,29 @@
int grpc_resource_quota_trace = 0;
typedef bool (*bpstate_func)(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota);
typedef struct {
grpc_resource_user *head;
grpc_resource_user *tail;
} grpc_resource_user_list;
struct grpc_resource_quota {
/* refcount */
gpr_refcount refs;
/* Master combiner lock: all activity on a quota executes under this combiner
*/
grpc_combiner *combiner;
/* Size of the resource quota */
int64_t size;
/* Amount of free memory in the resource quota */
int64_t free_pool;
/* Has rq_step been scheduled to occur? */
bool step_scheduled;
/* Are we currently reclaiming memory */
bool reclaiming;
grpc_closure bpstep_closure;
grpc_closure bpreclaimation_done_closure;
/* Closure around rq_step */
grpc_closure rq_step_closure;
/* Closure around rq_reclamation_done */
grpc_closure rq_reclamation_done_closure;
grpc_resource_user *roots[GRPC_BULIST_COUNT];
/* Roots of all resource user lists */
grpc_resource_user *roots[GRPC_RULIST_COUNT];
char *name;
};
@ -73,8 +75,8 @@ struct grpc_resource_quota {
* list management
*/
static void bulist_add_tail(grpc_resource_user *resource_user,
grpc_bulist list) {
static void rulist_add_tail(grpc_resource_user *resource_user,
grpc_rulist list) {
grpc_resource_quota *resource_quota = resource_user->resource_quota;
grpc_resource_user **root = &resource_quota->roots[list];
if (*root == NULL) {
@ -89,8 +91,8 @@ static void bulist_add_tail(grpc_resource_user *resource_user,
}
}
static void bulist_add_head(grpc_resource_user *resource_user,
grpc_bulist list) {
static void rulist_add_head(grpc_resource_user *resource_user,
grpc_rulist list) {
grpc_resource_quota *resource_quota = resource_user->resource_quota;
grpc_resource_user **root = &resource_quota->roots[list];
if (*root == NULL) {
@ -106,13 +108,13 @@ static void bulist_add_head(grpc_resource_user *resource_user,
}
}
static bool bulist_empty(grpc_resource_quota *resource_quota,
grpc_bulist list) {
static bool rulist_empty(grpc_resource_quota *resource_quota,
grpc_rulist list) {
return resource_quota->roots[list] == NULL;
}
static grpc_resource_user *bulist_pop(grpc_resource_quota *resource_quota,
grpc_bulist list) {
static grpc_resource_user *rulist_pop(grpc_resource_quota *resource_quota,
grpc_rulist list) {
grpc_resource_user **root = &resource_quota->roots[list];
grpc_resource_user *resource_user = *root;
if (resource_user == NULL) {
@ -131,7 +133,7 @@ static grpc_resource_user *bulist_pop(grpc_resource_quota *resource_quota,
return resource_user;
}
static void bulist_remove(grpc_resource_user *resource_user, grpc_bulist list) {
static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) {
if (resource_user->links[list].next == NULL) return;
grpc_resource_quota *resource_quota = resource_user->resource_quota;
if (resource_quota->roots[list] == resource_user) {
@ -150,41 +152,41 @@ static void bulist_remove(grpc_resource_user *resource_user, grpc_bulist list) {
* buffer pool state machine
*/
static bool bpalloc(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota);
static bool bpscavenge(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota);
static bool bpreclaim(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota, bool destructive);
static bool rq_alloc(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota);
static bool rq_scavenge(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota);
static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota, bool destructive);
static void bpstep(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) {
static void rq_step(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) {
grpc_resource_quota *resource_quota = bp;
resource_quota->step_scheduled = false;
do {
if (bpalloc(exec_ctx, resource_quota)) goto done;
} while (bpscavenge(exec_ctx, resource_quota));
bpreclaim(exec_ctx, resource_quota, false) ||
bpreclaim(exec_ctx, resource_quota, true);
if (rq_alloc(exec_ctx, resource_quota)) goto done;
} while (rq_scavenge(exec_ctx, resource_quota));
rq_reclaim(exec_ctx, resource_quota, false) ||
rq_reclaim(exec_ctx, resource_quota, true);
done:
grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
}
static void bpstep_sched(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
static void rq_step_sched(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
if (resource_quota->step_scheduled) return;
resource_quota->step_scheduled = true;
grpc_resource_quota_internal_ref(resource_quota);
grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner,
&resource_quota->bpstep_closure,
&resource_quota->rq_step_closure,
GRPC_ERROR_NONE, false);
}
/* returns true if all allocations are completed */
static bool bpalloc(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
static bool rq_alloc(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
grpc_resource_user *resource_user;
while ((resource_user =
bulist_pop(resource_quota, GRPC_BULIST_AWAITING_ALLOCATION))) {
rulist_pop(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) {
gpr_mu_lock(&resource_user->mu);
if (resource_user->free_pool < 0 &&
-resource_user->free_pool <= resource_quota->free_pool) {
@ -193,7 +195,7 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx,
resource_quota->free_pool -= amt;
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64
" bytes; bp_free_pool -> %" PRId64,
" bytes; rq_free_pool -> %" PRId64,
resource_quota->name, resource_user->name, amt,
resource_quota->free_pool);
}
@ -206,7 +208,7 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL);
gpr_mu_unlock(&resource_user->mu);
} else {
bulist_add_head(resource_user, GRPC_BULIST_AWAITING_ALLOCATION);
rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
gpr_mu_unlock(&resource_user->mu);
return false;
}
@ -215,11 +217,11 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx,
}
/* returns true if any memory could be reclaimed from buffers */
static bool bpscavenge(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
static bool rq_scavenge(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
grpc_resource_user *resource_user;
while ((resource_user =
bulist_pop(resource_quota, GRPC_BULIST_NON_EMPTY_FREE_POOL))) {
rulist_pop(resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
gpr_mu_lock(&resource_user->mu);
if (resource_user->free_pool > 0) {
int64_t amt = resource_user->free_pool;
@ -227,7 +229,7 @@ static bool bpscavenge(grpc_exec_ctx *exec_ctx,
resource_quota->free_pool += amt;
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64
" bytes; bp_free_pool -> %" PRId64,
" bytes; rq_free_pool -> %" PRId64,
resource_quota->name, resource_user->name, amt,
resource_quota->free_pool);
}
@ -240,16 +242,16 @@ static bool bpscavenge(grpc_exec_ctx *exec_ctx,
return false;
}
/* returns true if reclaimation is proceeding */
static bool bpreclaim(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota, bool destructive) {
/* returns true if reclamation is proceeding */
static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota, bool destructive) {
if (resource_quota->reclaiming) return true;
grpc_bulist list = destructive ? GRPC_BULIST_RECLAIMER_DESTRUCTIVE
: GRPC_BULIST_RECLAIMER_BENIGN;
grpc_resource_user *resource_user = bulist_pop(resource_quota, list);
grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
: GRPC_RULIST_RECLAIMER_BENIGN;
grpc_resource_user *resource_user = rulist_pop(resource_quota, list);
if (resource_user == NULL) return false;
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation",
gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclamation",
resource_quota->name, resource_user->name,
destructive ? "destructive" : "benign");
}
@ -262,7 +264,7 @@ static bool bpreclaim(grpc_exec_ctx *exec_ctx,
}
/*******************************************************************************
* bu_slice: a slice implementation that is backed by a grpc_resource_user
* ru_slice: a slice implementation that is backed by a grpc_resource_user
*/
typedef struct {
@ -270,22 +272,24 @@ typedef struct {
gpr_refcount refs;
grpc_resource_user *resource_user;
size_t size;
} bu_slice_refcount;
} ru_slice_refcount;
static void bu_slice_ref(void *p) {
bu_slice_refcount *rc = p;
static void ru_slice_ref(void *p) {
ru_slice_refcount *rc = p;
gpr_ref(&rc->refs);
}
static void bu_slice_unref(void *p) {
bu_slice_refcount *rc = p;
static void ru_slice_unref(void *p) {
ru_slice_refcount *rc = p;
if (gpr_unref(&rc->refs)) {
/* TODO(ctiller): this is dangerous, but I think safe for now:
we have no guarantee here that we're at a safe point for creating an
execution context, but we have no way of writing this code otherwise.
In the future: consider lifting gpr_slice to grpc, and offering an
internal_{ref,unref} pair that is execution context aware. Alternatively,
make exec_ctx be thread local and 'do the right thing' (whatever that is)
internal_{ref,unref} pair that is execution context aware.
Alternatively,
make exec_ctx be thread local and 'do the right thing' (whatever that
is)
if NULL */
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, rc->resource_user, rc->size);
@ -294,11 +298,11 @@ static void bu_slice_unref(void *p) {
}
}
static gpr_slice bu_slice_create(grpc_resource_user *resource_user,
static gpr_slice ru_slice_create(grpc_resource_user *resource_user,
size_t size) {
bu_slice_refcount *rc = gpr_malloc(sizeof(bu_slice_refcount) + size);
rc->base.ref = bu_slice_ref;
rc->base.unref = bu_slice_unref;
ru_slice_refcount *rc = gpr_malloc(sizeof(ru_slice_refcount) + size);
rc->base.ref = ru_slice_ref;
rc->base.unref = ru_slice_unref;
gpr_ref_init(&rc->refs, 1);
rc->resource_user = resource_user;
rc->size = size;
@ -313,62 +317,62 @@ static gpr_slice bu_slice_create(grpc_resource_user *resource_user,
* grpc_resource_quota internal implementation
*/
static void bu_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
static void ru_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
grpc_resource_user *resource_user = bu;
if (bulist_empty(resource_user->resource_quota,
GRPC_BULIST_AWAITING_ALLOCATION)) {
bpstep_sched(exec_ctx, resource_user->resource_quota);
if (rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION)) {
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
bulist_add_tail(resource_user, GRPC_BULIST_AWAITING_ALLOCATION);
rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
}
static void bu_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu,
static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu,
grpc_error *error) {
grpc_resource_user *resource_user = bu;
if (!bulist_empty(resource_user->resource_quota,
GRPC_BULIST_AWAITING_ALLOCATION) &&
bulist_empty(resource_user->resource_quota,
GRPC_BULIST_NON_EMPTY_FREE_POOL)) {
bpstep_sched(exec_ctx, resource_user->resource_quota);
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
bulist_add_tail(resource_user, GRPC_BULIST_NON_EMPTY_FREE_POOL);
rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}
static void bu_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
grpc_error *error) {
grpc_resource_user *resource_user = bu;
if (!bulist_empty(resource_user->resource_quota,
GRPC_BULIST_AWAITING_ALLOCATION) &&
bulist_empty(resource_user->resource_quota,
GRPC_BULIST_NON_EMPTY_FREE_POOL) &&
bulist_empty(resource_user->resource_quota,
GRPC_BULIST_RECLAIMER_BENIGN)) {
bpstep_sched(exec_ctx, resource_user->resource_quota);
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
rulist_empty(resource_user->resource_quota,
GRPC_RULIST_RECLAIMER_BENIGN)) {
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
bulist_add_tail(resource_user, GRPC_BULIST_RECLAIMER_BENIGN);
rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
}
static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
grpc_error *error) {
grpc_resource_user *resource_user = bu;
if (!bulist_empty(resource_user->resource_quota,
GRPC_BULIST_AWAITING_ALLOCATION) &&
bulist_empty(resource_user->resource_quota,
GRPC_BULIST_NON_EMPTY_FREE_POOL) &&
bulist_empty(resource_user->resource_quota,
GRPC_BULIST_RECLAIMER_BENIGN) &&
bulist_empty(resource_user->resource_quota,
GRPC_BULIST_RECLAIMER_DESTRUCTIVE)) {
bpstep_sched(exec_ctx, resource_user->resource_quota);
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
rulist_empty(resource_user->resource_quota,
GRPC_RULIST_RECLAIMER_BENIGN) &&
rulist_empty(resource_user->resource_quota,
GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
bulist_add_tail(resource_user, GRPC_BULIST_RECLAIMER_DESTRUCTIVE);
rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}
static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
static void ru_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
grpc_resource_user *resource_user = bu;
GPR_ASSERT(resource_user->allocated == 0);
for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
bulist_remove(resource_user, (grpc_bulist)i);
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, (grpc_rulist)i);
}
grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED, NULL);
@ -379,17 +383,17 @@ static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
GRPC_ERROR_NONE, NULL);
if (resource_user->free_pool != 0) {
resource_user->resource_quota->free_pool += resource_user->free_pool;
bpstep_sched(exec_ctx, resource_user->resource_quota);
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
}
static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts,
static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts,
grpc_error *error) {
grpc_resource_user_slice_allocator *slice_allocator = ts;
if (error == GRPC_ERROR_NONE) {
for (size_t i = 0; i < slice_allocator->count; i++) {
gpr_slice_buffer_add_indexed(
slice_allocator->dest, bu_slice_create(slice_allocator->resource_user,
slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
slice_allocator->length));
}
}
@ -400,29 +404,29 @@ typedef struct {
int64_t size;
grpc_resource_quota *resource_quota;
grpc_closure closure;
} bp_resize_args;
} rq_resize_args;
static void bp_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
bp_resize_args *a = args;
static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
rq_resize_args *a = args;
int64_t delta = a->size - a->resource_quota->size;
a->resource_quota->size += delta;
a->resource_quota->free_pool += delta;
if (delta < 0 && a->resource_quota->free_pool < 0) {
bpstep_sched(exec_ctx, a->resource_quota);
rq_step_sched(exec_ctx, a->resource_quota);
} else if (delta > 0 &&
!bulist_empty(a->resource_quota,
GRPC_BULIST_AWAITING_ALLOCATION)) {
bpstep_sched(exec_ctx, a->resource_quota);
!rulist_empty(a->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION)) {
rq_step_sched(exec_ctx, a->resource_quota);
}
grpc_resource_quota_internal_unref(exec_ctx, a->resource_quota);
gpr_free(a);
}
static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp,
grpc_error *error) {
static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *bp,
grpc_error *error) {
grpc_resource_quota *resource_quota = bp;
resource_quota->reclaiming = false;
bpstep_sched(exec_ctx, resource_quota);
rq_step_sched(exec_ctx, resource_quota);
grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
}
@ -444,10 +448,10 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
(intptr_t)resource_quota);
}
grpc_closure_init(&resource_quota->bpstep_closure, bpstep, resource_quota);
grpc_closure_init(&resource_quota->bpreclaimation_done_closure,
bp_reclaimation_done, resource_quota);
for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota);
grpc_closure_init(&resource_quota->rq_reclamation_done_closure,
rq_reclamation_done, resource_quota);
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_quota->roots[i] = NULL;
}
return resource_quota;
@ -481,10 +485,10 @@ void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) {
void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
size_t size) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
bp_resize_args *a = gpr_malloc(sizeof(*a));
rq_resize_args *a = gpr_malloc(sizeof(*a));
a->resource_quota = grpc_resource_quota_internal_ref(resource_quota);
a->size = (int64_t)size;
grpc_closure_init(&a->closure, bp_resize, a);
grpc_closure_init(&a->closure, rq_resize, a);
grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure,
GRPC_ERROR_NONE, false);
grpc_exec_ctx_finish(&exec_ctx);
@ -497,29 +501,29 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
grpc_resource_quota *grpc_resource_quota_from_channel_args(
const grpc_channel_args *channel_args) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
if (channel_args->args[i].type == GRPC_ARG_POINTER) {
return grpc_resource_quota_internal_ref(
channel_args->args[i].value.pointer.p);
} else {
gpr_log(GPR_DEBUG, GRPC_ARG_BUFFER_POOL " should be a pointer");
gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
}
}
}
return grpc_resource_quota_create(NULL);
}
static void *bp_copy(void *bp) {
static void *rq_copy(void *bp) {
grpc_resource_quota_ref(bp);
return bp;
}
static void bp_destroy(void *bp) { grpc_resource_quota_unref(bp); }
static void rq_destroy(void *bp) { grpc_resource_quota_unref(bp); }
static int bp_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
static int rq_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
const grpc_arg_pointer_vtable *grpc_resource_quota_arg_vtable(void) {
static const grpc_arg_pointer_vtable vtable = {bp_copy, bp_destroy, bp_cmp};
static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
return &vtable;
}
@ -532,15 +536,15 @@ void grpc_resource_user_init(grpc_resource_user *resource_user,
const char *name) {
resource_user->resource_quota =
grpc_resource_quota_internal_ref(resource_quota);
grpc_closure_init(&resource_user->allocate_closure, &bu_allocate,
grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
resource_user);
grpc_closure_init(&resource_user->add_to_free_pool_closure,
&bu_add_to_free_pool, resource_user);
&ru_add_to_free_pool, resource_user);
grpc_closure_init(&resource_user->post_reclaimer_closure[0],
&bu_post_benign_reclaimer, resource_user);
&ru_post_benign_reclaimer, resource_user);
grpc_closure_init(&resource_user->post_reclaimer_closure[1],
&bu_post_destructive_reclaimer, resource_user);
grpc_closure_init(&resource_user->destroy_closure, &bu_destroy,
&ru_post_destructive_reclaimer, resource_user);
grpc_closure_init(&resource_user->destroy_closure, &ru_destroy,
resource_user);
gpr_mu_init(&resource_user->mu);
resource_user->allocated = 0;
@ -551,7 +555,7 @@ void grpc_resource_user_init(grpc_resource_user *resource_user,
gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, 0);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_user->links[i].next = resource_user->links[i].prev = NULL;
}
#ifndef NDEBUG
@ -678,22 +682,22 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
}
}
void grpc_resource_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete",
gpr_log(GPR_DEBUG, "BP %s %s: reclamation complete",
resource_user->resource_quota->name, resource_user->name);
}
grpc_combiner_execute(
exec_ctx, resource_user->resource_quota->combiner,
&resource_user->resource_quota->bpreclaimation_done_closure,
&resource_user->resource_quota->rq_reclamation_done_closure,
GRPC_ERROR_NONE, false);
}
void grpc_resource_user_slice_allocator_init(
grpc_resource_user_slice_allocator *slice_allocator,
grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) {
grpc_closure_init(&slice_allocator->on_allocated, bu_allocated_slices,
grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices,
slice_allocator);
grpc_closure_init(&slice_allocator->on_done, cb, p);
slice_allocator->resource_user = resource_user;

@ -38,6 +38,37 @@
#include "src/core/lib/iomgr/exec_ctx.h"
/** \file Tracks resource usage against a pool.
The current implementation tracks only memory usage, but in the future
this may be extended to (for example) threads and file descriptors.
A grpc_resource_quota represents the pooled resources, and
grpc_resource_user instances attach to the quota and consume those
resources. They also offer a vector for reclamation: if we become
resource constrained, grpc_resource_user instances are asked (in turn) to
free up whatever they can so that the system as a whole can make progress.
There are three kinds of reclamation that take place:
- an internal reclamation, where cached resource at the resource user level
is returned to the quota
- a benign reclamation phase, whereby resources that are in use but are not
helping anything make progress are reclaimed
- a destructive reclamation, whereby resources that are helping something
make progress may be enacted so that at least one part of the system can
complete.
These reclamations are tried in priority order, and only one reclamation
is outstanding for a quota at any given time (meaning that if a destructive
reclamation makes progress, we may follow up with a benign reclamation).
Future work will be to expose the current resource pressure so that back
pressure can be applied to avoid reclamation phases starting.
Resource users own references to resource quotas, and resource quotas
maintain lists of users (which users arrange to leave before they are
destroyed) */
extern int grpc_resource_quota_trace;
grpc_resource_quota *grpc_resource_quota_internal_ref(
@ -47,46 +78,83 @@ void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *grpc_resource_quota_from_channel_args(
const grpc_channel_args *channel_args);
/* Resource users are kept in (potentially) several intrusive linked lists
at once. These are the list names. */
typedef enum {
GRPC_BULIST_AWAITING_ALLOCATION,
GRPC_BULIST_NON_EMPTY_FREE_POOL,
GRPC_BULIST_RECLAIMER_BENIGN,
GRPC_BULIST_RECLAIMER_DESTRUCTIVE,
GRPC_BULIST_COUNT
} grpc_bulist;
/* Resource users that are waiting for an allocation */
GRPC_RULIST_AWAITING_ALLOCATION,
/* Resource users that have free memory available for internal reclamation */
GRPC_RULIST_NON_EMPTY_FREE_POOL,
/* Resource users that have published a benign reclamation is available */
GRPC_RULIST_RECLAIMER_BENIGN,
/* Resource users that have published a destructive reclamation is
available */
GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
/* Number of lists: must be last */
GRPC_RULIST_COUNT
} grpc_rulist;
typedef struct grpc_resource_user grpc_resource_user;
/* Internal linked list pointers for a resource user */
typedef struct {
grpc_resource_user *next;
grpc_resource_user *prev;
} grpc_resource_user_link;
struct grpc_resource_user {
/* The quota this resource user consumes from */
grpc_resource_quota *resource_quota;
/* Closure to schedule an allocation onder the resource quota combiner lock */
grpc_closure allocate_closure;
/* Closure to publish a non empty free pool under the resource quota combiner
lock */
grpc_closure add_to_free_pool_closure;
#ifndef NDEBUG
/* Canary object to detect leaked resource users with ASAN */
void *asan_canary;
#endif
gpr_mu mu;
/* Total allocated memory outstanding by this resource user;
always positive */
int64_t allocated;
/* The amount of memory this user has cached for its own use: to avoid quota
contention, each resource user can keep some memory aside from the quota,
and the quota can pull it back under memory pressure.
This value can become negative if more memory has been requested than
existed in the free pool, at which point the quota is consulted to bring
this value non-negative (asynchronously). */
int64_t free_pool;
/* A list of closures to call once free_pool becomes non-negative - ie when
all outstanding allocations have been granted. */
grpc_closure_list on_allocated;
/* True if we are currently trying to allocate from the quota, false if not */
bool allocating;
/* True if we are currently trying to add ourselves to the non-free quota
list, false otherwise */
bool added_to_free_pool;
/* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
*/
grpc_closure *reclaimers[2];
/* Trampoline closures to finish reclamation and re-enter the quota combiner
lock */
grpc_closure post_reclaimer_closure[2];
/* Closure to execute under the quota combiner to de-register and shutdown the
resource user */
grpc_closure destroy_closure;
/* User supplied closure to call once the user has finished shutting down AND
all outstanding allocations have been freed */
gpr_atm on_done_destroy_closure;
grpc_resource_user_link links[GRPC_BULIST_COUNT];
/* Links in the various grpc_rulist lists */
grpc_resource_user_link links[GRPC_RULIST_COUNT];
/* The name of this resource user, for debugging/tracing */
char *name;
};
@ -99,17 +167,29 @@ void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user);
/* Allocate from the resource user (and it's quota).
If optional_on_done is NULL, then allocate immediately. This may push the
quota over-limit, at which point reclamation will kick in.
If optional_on_done is non-NULL, it will be scheduled when the allocation has
been granted by the quota. */
void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user, size_t size,
grpc_closure *optional_on_done);
/* Release memory back to the quota */
void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user, size_t size);
/* Post a memory reclaimer to the resource user. Only one benign and one
destructive reclaimer can be posted at once. When executed, the reclaimer
MUST call grpc_resource_user_finish_reclamation before it completes, to
return control to the resource quota. */
void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user,
bool destructive, grpc_closure *closure);
void grpc_resource_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user);
/* Finish a reclamation step */
void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user);
/* Helper to allocate slices from a resource user */
typedef struct grpc_resource_user_slice_allocator {
grpc_closure on_allocated;
grpc_closure on_done;
@ -119,10 +199,12 @@ typedef struct grpc_resource_user_slice_allocator {
grpc_resource_user *resource_user;
} grpc_resource_user_slice_allocator;
/* Initialize a slice allocator */
void grpc_resource_user_slice_allocator_init(
grpc_resource_user_slice_allocator *slice_allocator,
grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p);
/* Allocate \a count slices of length \a length into \a dest. */
void grpc_resource_user_alloc_slices(
grpc_exec_ctx *exec_ctx,
grpc_resource_user_slice_allocator *slice_allocator, size_t length,

@ -39,6 +39,8 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/sockaddr.h"
/* Channel arg (integer) setting how large a slice to try and read from the wire
each time recvmsg (or equivalent) is called */
#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size"
/* Asynchronously connect to an address (specified as (addr, len)), and call

@ -134,7 +134,8 @@ grpc_endpoint *grpc_tcp_client_create_from_fd(
8 * 1024 * 1024};
tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer(
&channel_args->args[i], options);
} else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) {
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_internal_ref(
channel_args->args[i].value.pointer.p);

@ -175,7 +175,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
} else if (0 == strcmp(GRPC_ARG_BUFFER_POOL, args->args[i].key)) {
} else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
s->resource_quota =
@ -183,7 +183,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
} else {
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_BUFFER_POOL
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
}
}

@ -116,7 +116,8 @@ void ChannelArguments::SetUserAgentPrefix(
void ChannelArguments::SetResourceQuota(
const grpc::ResourceQuota& resource_quota) {
SetPointerWithVtable(GRPC_ARG_BUFFER_POOL, resource_quota.c_resource_quota(),
SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA,
resource_quota.c_resource_quota(),
grpc_resource_quota_arg_vtable());
}

@ -197,7 +197,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
maybe_default_compression_algorithm_.algorithm);
}
if (resource_quota_ != nullptr) {
args.SetPointerWithVtable(GRPC_ARG_BUFFER_POOL, resource_quota_,
args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_,
grpc_resource_quota_arg_vtable());
}
std::unique_ptr<Server> server(new Server(thread_pool.release(), true,

@ -90,7 +90,7 @@ END2END_TESTS = {
'bad_hostname': default_test_options,
'binary_metadata': default_test_options,
'resource_quota_server': default_test_options._replace(large_writes=True,
proxyable=False),
proxyable=False),
'call_creds': default_test_options._replace(secure=True),
'cancel_after_accept': default_test_options._replace(cpu_cost=LOWCPU),
'cancel_after_client_done': default_test_options,

@ -119,7 +119,7 @@ void resource_quota_server(grpc_end2end_test_config config) {
#define SERVER_END_BASE_TAG 4000
grpc_arg arg;
arg.key = GRPC_ARG_BUFFER_POOL;
arg.key = GRPC_ARG_RESOURCE_QUOTA;
arg.type = GRPC_ARG_POINTER;
arg.value.pointer.p = resource_quota;
arg.value.pointer.vtable = grpc_resource_quota_arg_vtable();

@ -119,7 +119,7 @@ void resource_quota_server(grpc_end2end_test_config config) {
#define SERVER_END_BASE_TAG 4000
grpc_arg arg;
arg.key = GRPC_ARG_BUFFER_POOL;
arg.key = GRPC_ARG_RESOURCE_QUOTA;
arg.type = GRPC_ARG_POINTER;
arg.value.pointer.p = resource_quota;
arg.value.pointer.vtable = grpc_resource_quota_arg_vtable();

@ -57,7 +57,7 @@ static void reclaimer_cb(grpc_exec_ctx *exec_ctx, void *args,
GPR_ASSERT(error == GRPC_ERROR_NONE);
reclaimer_args *a = args;
grpc_resource_user_free(exec_ctx, a->resource_user, a->size);
grpc_resource_user_finish_reclaimation(exec_ctx, a->resource_user);
grpc_resource_user_finish_reclamation(exec_ctx, a->resource_user);
grpc_closure_run(exec_ctx, a->then, GRPC_ERROR_NONE);
gpr_free(a);
}

Loading…
Cancel
Save