Review feedback

reviewable/pr8239/r5
Craig Tiller 8 years ago
parent fe8c5012d3
commit 6b5d682c98
  1. 8
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 117
      src/core/lib/iomgr/resource_quota.c
  3. 28
      src/core/lib/iomgr/resource_quota.h
  4. 353
      test/core/end2end/tests/buffer_pool_server.c
  5. 6
      test/core/end2end/tests/resource_quota_server.c
  6. 162
      test/core/iomgr/resource_quota_test.c

@ -2108,7 +2108,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
}
/*******************************************************************************
* BUFFER POOLS
* RESOURCE QUOTAS
*/
static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
@ -2152,6 +2152,8 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_transport *t = arg;
if (error == GRPC_ERROR_NONE &&
grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
/* Channel with no active streams: send a goaway to try and make it
* disconnect cleanly */
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory",
t->peer_string);
@ -2188,6 +2190,10 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_INT_HTTP2_ERROR,
GRPC_CHTTP2_ENHANCE_YOUR_CALM));
if (n > 1) {
/* Since we cancel one stream per destructive reclaimation, if
there are more streams left, we can immediately post a new
reclaimer in case the resource quota needs to free more
memory */
post_destructive_reclaimer(exec_ctx, t);
}
}

@ -49,6 +49,7 @@ struct grpc_resource_quota {
gpr_refcount refs;
/* Master combiner lock: all activity on a quota executes under this combiner
* (so no mutex is needed for this data structure)
*/
grpc_combiner *combiner;
/* Size of the resource quota */
@ -75,7 +76,7 @@ struct grpc_resource_quota {
* list management
*/
static void rulist_add_tail(grpc_resource_user *resource_user,
static void rulist_add_head(grpc_resource_user *resource_user,
grpc_rulist list) {
grpc_resource_quota *resource_quota = resource_user->resource_quota;
grpc_resource_user **root = &resource_quota->roots[list];
@ -91,7 +92,7 @@ static void rulist_add_tail(grpc_resource_user *resource_user,
}
}
static void rulist_add_head(grpc_resource_user *resource_user,
static void rulist_add_tail(grpc_resource_user *resource_user,
grpc_rulist list) {
grpc_resource_quota *resource_quota = resource_user->resource_quota;
grpc_resource_user **root = &resource_quota->roots[list];
@ -113,8 +114,8 @@ static bool rulist_empty(grpc_resource_quota *resource_quota,
return resource_quota->roots[list] == NULL;
}
static grpc_resource_user *rulist_pop(grpc_resource_quota *resource_quota,
grpc_rulist list) {
static grpc_resource_user *rulist_pop_tail(grpc_resource_quota *resource_quota,
grpc_rulist list) {
grpc_resource_user **root = &resource_quota->roots[list];
grpc_resource_user *resource_user = *root;
if (resource_user == NULL) {
@ -149,22 +150,22 @@ static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) {
}
/*******************************************************************************
* buffer pool state machine
* resource quota state machine
*/
static bool rq_alloc(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota);
static bool rq_scavenge(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota);
static bool rq_reclaim_from_per_user_free_pool(
grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota);
static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota, bool destructive);
static void rq_step(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) {
grpc_resource_quota *resource_quota = bp;
static void rq_step(grpc_exec_ctx *exec_ctx, void *rq, grpc_error *error) {
grpc_resource_quota *resource_quota = rq;
resource_quota->step_scheduled = false;
do {
if (rq_alloc(exec_ctx, resource_quota)) goto done;
} while (rq_scavenge(exec_ctx, resource_quota));
} while (rq_reclaim_from_per_user_free_pool(exec_ctx, resource_quota));
rq_reclaim(exec_ctx, resource_quota, false) ||
rq_reclaim(exec_ctx, resource_quota, true);
done:
@ -185,8 +186,8 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx,
static bool rq_alloc(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
grpc_resource_user *resource_user;
while ((resource_user =
rulist_pop(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) {
while ((resource_user = rulist_pop_tail(resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION))) {
gpr_mu_lock(&resource_user->mu);
if (resource_user->free_pool < 0 &&
-resource_user->free_pool <= resource_quota->free_pool) {
@ -194,13 +195,13 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
resource_user->free_pool = 0;
resource_quota->free_pool -= amt;
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64
gpr_log(GPR_DEBUG, "RQ %s %s: grant alloc %" PRId64
" bytes; rq_free_pool -> %" PRId64,
resource_quota->name, resource_user->name, amt,
resource_quota->free_pool);
}
} else if (grpc_resource_quota_trace && resource_user->free_pool >= 0) {
gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request",
gpr_log(GPR_DEBUG, "RQ %s %s: discard already satisfied alloc request",
resource_quota->name, resource_user->name);
}
if (resource_user->free_pool >= 0) {
@ -208,7 +209,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL);
gpr_mu_unlock(&resource_user->mu);
} else {
rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
gpr_mu_unlock(&resource_user->mu);
return false;
}
@ -217,18 +218,18 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
}
/* returns true if any memory could be reclaimed from buffers */
static bool rq_scavenge(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
static bool rq_reclaim_from_per_user_free_pool(
grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) {
grpc_resource_user *resource_user;
while ((resource_user =
rulist_pop(resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
while ((resource_user = rulist_pop_tail(resource_quota,
GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
gpr_mu_lock(&resource_user->mu);
if (resource_user->free_pool > 0) {
int64_t amt = resource_user->free_pool;
resource_user->free_pool = 0;
resource_quota->free_pool += amt;
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64
gpr_log(GPR_DEBUG, "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
" bytes; rq_free_pool -> %" PRId64,
resource_quota->name, resource_user->name, amt,
resource_quota->free_pool);
@ -248,10 +249,10 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
if (resource_quota->reclaiming) return true;
grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
: GRPC_RULIST_RECLAIMER_BENIGN;
grpc_resource_user *resource_user = rulist_pop(resource_quota, list);
grpc_resource_user *resource_user = rulist_pop_tail(resource_quota, list);
if (resource_user == NULL) return false;
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclamation",
gpr_log(GPR_DEBUG, "RQ %s %s: initiate %s reclamation",
resource_quota->name, resource_user->name,
destructive ? "destructive" : "benign");
}
@ -314,33 +315,34 @@ static gpr_slice ru_slice_create(grpc_resource_user *resource_user,
}
/*******************************************************************************
* grpc_resource_quota internal implementation
* grpc_resource_quota internal implementation: resource user manipulation under
* the combiner
*/
static void ru_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
grpc_resource_user *resource_user = bu;
static void ru_allocate(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
grpc_resource_user *resource_user = ru;
if (rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION)) {
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
}
static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu,
static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru,
grpc_error *error) {
grpc_resource_user *resource_user = bu;
grpc_resource_user *resource_user = ru;
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
rulist_add_head(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}
static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
grpc_error *error) {
grpc_resource_user *resource_user = bu;
grpc_resource_user *resource_user = ru;
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
@ -349,12 +351,12 @@ static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
GRPC_RULIST_RECLAIMER_BENIGN)) {
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
rulist_add_head(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
}
static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
grpc_error *error) {
grpc_resource_user *resource_user = bu;
grpc_resource_user *resource_user = ru;
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
@ -365,11 +367,11 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
rulist_add_head(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}
static void ru_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
grpc_resource_user *resource_user = bu;
static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
grpc_resource_user *resource_user = ru;
GPR_ASSERT(resource_user->allocated == 0);
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, (grpc_rulist)i);
@ -387,9 +389,9 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
}
}
static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts,
static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_resource_user_slice_allocator *slice_allocator = ts;
grpc_resource_user_slice_allocator *slice_allocator = arg;
if (error == GRPC_ERROR_NONE) {
for (size_t i = 0; i < slice_allocator->count; i++) {
gpr_slice_buffer_add_indexed(
@ -400,6 +402,11 @@ static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts,
grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
}
/*******************************************************************************
* grpc_resource_quota internal implementation: quota manipulation under the
* combiner
*/
typedef struct {
int64_t size;
grpc_resource_quota *resource_quota;
@ -411,20 +418,14 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
int64_t delta = a->size - a->resource_quota->size;
a->resource_quota->size += delta;
a->resource_quota->free_pool += delta;
if (delta < 0 && a->resource_quota->free_pool < 0) {
rq_step_sched(exec_ctx, a->resource_quota);
} else if (delta > 0 &&
!rulist_empty(a->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION)) {
rq_step_sched(exec_ctx, a->resource_quota);
}
rq_step_sched(exec_ctx, a->resource_quota);
grpc_resource_quota_internal_unref(exec_ctx, a->resource_quota);
gpr_free(a);
}
static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *bp,
static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq,
grpc_error *error) {
grpc_resource_quota *resource_quota = bp;
grpc_resource_quota *resource_quota = rq;
resource_quota->reclaiming = false;
rq_step_sched(exec_ctx, resource_quota);
grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
@ -434,6 +435,7 @@ static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *bp,
* grpc_resource_quota api
*/
/* Public API */
grpc_resource_quota *grpc_resource_quota_create(const char *name) {
grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota));
gpr_ref_init(&resource_quota->refs, 1);
@ -466,6 +468,7 @@ void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
}
}
/* Public API */
void grpc_resource_quota_unref(grpc_resource_quota *resource_quota) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
@ -478,10 +481,12 @@ grpc_resource_quota *grpc_resource_quota_internal_ref(
return resource_quota;
}
/* Public API */
void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) {
grpc_resource_quota_internal_ref(resource_quota);
}
/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
size_t size) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -513,12 +518,12 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args(
return grpc_resource_quota_create(NULL);
}
static void *rq_copy(void *bp) {
grpc_resource_quota_ref(bp);
return bp;
static void *rq_copy(void *rq) {
grpc_resource_quota_ref(rq);
return rq;
}
static void rq_destroy(void *bp) { grpc_resource_quota_unref(bp); }
static void rq_destroy(void *rq) { grpc_resource_quota_unref(rq); }
static int rq_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
@ -604,7 +609,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
if (on_done_destroy != NULL) {
/* already shutdown */
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown",
gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR " after shutdown",
resource_user->resource_quota->name, resource_user->name, size);
}
grpc_exec_ctx_sched(
@ -616,7 +621,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
resource_user->allocated += (int64_t)size;
resource_user->free_pool -= (int64_t)size;
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
", free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size,
resource_user->allocated, resource_user->free_pool);
@ -644,7 +649,7 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
resource_user->free_pool += (int64_t)size;
resource_user->allocated -= (int64_t)size;
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64
gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; allocated -> %" PRId64
", free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size,
resource_user->allocated, resource_user->free_pool);
@ -685,7 +690,7 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "BP %s %s: reclamation complete",
gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete",
resource_user->resource_quota->name, resource_user->name);
}
grpc_combiner_execute(

@ -106,7 +106,7 @@ struct grpc_resource_user {
/* The quota this resource user consumes from */
grpc_resource_quota *resource_quota;
/* Closure to schedule an allocation onder the resource quota combiner lock */
/* Closure to schedule an allocation under the resource quota combiner lock */
grpc_closure allocate_closure;
/* Closure to publish a non empty free pool under the resource quota combiner
lock */
@ -118,12 +118,13 @@ struct grpc_resource_user {
#endif
gpr_mu mu;
/* Total allocated memory outstanding by this resource user;
/* Total allocated memory outstanding by this resource user in bytes;
always positive */
int64_t allocated;
/* The amount of memory this user has cached for its own use: to avoid quota
contention, each resource user can keep some memory aside from the quota,
and the quota can pull it back under memory pressure.
/* The amount of memory (in bytes) this user has cached for its own use: to
avoid quota contention, each resource user can keep some memory in
addition to what it is immediately using (e.g., for caching), and the quota
can pull it back under memory pressure.
This value can become negative if more memory has been requested than
existed in the free pool, at which point the quota is consulted to bring
this value non-negative (asynchronously). */
@ -148,7 +149,8 @@ struct grpc_resource_user {
resource user */
grpc_closure destroy_closure;
/* User supplied closure to call once the user has finished shutting down AND
all outstanding allocations have been freed */
all outstanding allocations have been freed. Real type is grpc_closure*,
but it's stored as an atomic to avoid a mutex on some fast paths. */
gpr_atm on_done_destroy_closure;
/* Links in the various grpc_rulist lists */
@ -167,7 +169,7 @@ void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user);
/* Allocate from the resource user (and it's quota).
/* Allocate from the resource user (and its quota).
If optional_on_done is NULL, then allocate immediately. This may push the
quota over-limit, at which point reclamation will kick in.
If optional_on_done is non-NULL, it will be scheduled when the allocation has
@ -191,20 +193,28 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
/* Helper to allocate slices from a resource user */
typedef struct grpc_resource_user_slice_allocator {
/* Closure for when a resource user allocation completes */
grpc_closure on_allocated;
/* Closure to call when slices have been allocated */
grpc_closure on_done;
/* Length of slices to allocate on the current request */
size_t length;
/* Number of slices to allocate on the current request */
size_t count;
/* Destination for slices to allocate on the current request */
gpr_slice_buffer *dest;
/* Parent resource user */
grpc_resource_user *resource_user;
} grpc_resource_user_slice_allocator;
/* Initialize a slice allocator */
/* Initialize a slice allocator.
When an allocation is completed, calls \a cb with arg \p. */
void grpc_resource_user_slice_allocator_init(
grpc_resource_user_slice_allocator *slice_allocator,
grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p);
/* Allocate \a count slices of length \a length into \a dest. */
/* Allocate \a count slices of length \a length into \a dest. Only one request
can be outstanding at a time. */
void grpc_resource_user_alloc_slices(
grpc_exec_ctx *exec_ctx,
grpc_resource_user_slice_allocator *slice_allocator, size_t length,

@ -1,353 +0,0 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "test/core/end2end/end2end_tests.h"
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
static void *tag(intptr_t t) { return (void *)t; }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char *test_name,
grpc_channel_args *client_args,
grpc_channel_args *server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_time(int n) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
}
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(
f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}
static void shutdown_client(grpc_end2end_test_fixture *f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = NULL;
}
static void end_test(grpc_end2end_test_fixture *f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
}
/* Creates and returns a gpr_slice containing random alphanumeric characters. */
static gpr_slice generate_random_slice() {
size_t i;
static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890";
char output[1024 * 1024];
for (i = 0; i < GPR_ARRAY_SIZE(output) - 1; ++i) {
output[i] = chars[rand() % (int)(sizeof(chars) - 1)];
}
output[GPR_ARRAY_SIZE(output) - 1] = '\0';
return gpr_slice_from_copied_string(output);
}
void resource_quota_server(grpc_end2end_test_config config) {
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("test_server");
grpc_resource_quota_resize(resource_quota, 5 * 1024 * 1024);
#define NUM_CALLS 100
#define CLIENT_BASE_TAG 1000
#define SERVER_START_BASE_TAG 2000
#define SERVER_RECV_BASE_TAG 3000
#define SERVER_END_BASE_TAG 4000
grpc_arg arg;
arg.key = GRPC_ARG_RESOURCE_QUOTA;
arg.type = GRPC_ARG_POINTER;
arg.value.pointer.p = resource_quota;
arg.value.pointer.vtable = grpc_resource_quota_arg_vtable();
grpc_channel_args args = {1, &arg};
grpc_end2end_test_fixture f =
begin_test(config, "resource_quota_server", NULL, &args);
/* Create large request and response bodies. These are big enough to require
* multiple round trips to deliver to the peer, and their exact contents of
* will be verified on completion. */
gpr_slice request_payload_slice = generate_random_slice();
grpc_call *client_calls[NUM_CALLS];
grpc_call *server_calls[NUM_CALLS];
grpc_metadata_array initial_metadata_recv[NUM_CALLS];
grpc_metadata_array trailing_metadata_recv[NUM_CALLS];
grpc_metadata_array request_metadata_recv[NUM_CALLS];
grpc_call_details call_details[NUM_CALLS];
grpc_status_code status[NUM_CALLS];
char *details[NUM_CALLS];
size_t details_capacity[NUM_CALLS];
grpc_byte_buffer *request_payload_recv[NUM_CALLS];
int was_cancelled[NUM_CALLS];
grpc_call_error error;
int pending_client_calls = 0;
int pending_server_start_calls = 0;
int pending_server_recv_calls = 0;
int pending_server_end_calls = 0;
int cancelled_calls_on_client = 0;
int cancelled_calls_on_server = 0;
grpc_byte_buffer *request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_op ops[6];
grpc_op *op;
for (int i = 0; i < NUM_CALLS; i++) {
grpc_metadata_array_init(&initial_metadata_recv[i]);
grpc_metadata_array_init(&trailing_metadata_recv[i]);
grpc_metadata_array_init(&request_metadata_recv[i]);
grpc_call_details_init(&call_details[i]);
details[i] = NULL;
details_capacity[i] = 0;
request_payload_recv[i] = NULL;
was_cancelled[i] = 0;
}
for (int i = 0; i < NUM_CALLS; i++) {
error = grpc_server_request_call(
f.server, &server_calls[i], &call_details[i], &request_metadata_recv[i],
f.cq, f.cq, tag(SERVER_START_BASE_TAG + i));
GPR_ASSERT(GRPC_CALL_OK == error);
pending_server_start_calls++;
}
for (int i = 0; i < NUM_CALLS; i++) {
client_calls[i] = grpc_channel_create_call(
f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, "/foo",
"foo.test.google.fr", n_seconds_time(60), NULL);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = request_payload;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &initial_metadata_recv[i];
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata =
&trailing_metadata_recv[i];
op->data.recv_status_on_client.status = &status[i];
op->data.recv_status_on_client.status_details = &details[i];
op->data.recv_status_on_client.status_details_capacity =
&details_capacity[i];
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(client_calls[i], ops, (size_t)(op - ops),
tag(CLIENT_BASE_TAG + i), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
pending_client_calls++;
}
while (pending_client_calls + pending_server_recv_calls +
pending_server_end_calls >
0) {
grpc_event ev = grpc_completion_queue_next(f.cq, n_seconds_time(10), NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
int ev_tag = (int)(intptr_t)ev.tag;
if (ev_tag < CLIENT_BASE_TAG) {
abort(); /* illegal tag */
} else if (ev_tag < SERVER_START_BASE_TAG) {
/* client call finished */
int call_id = ev_tag - CLIENT_BASE_TAG;
GPR_ASSERT(call_id >= 0);
GPR_ASSERT(call_id < NUM_CALLS);
switch (status[call_id]) {
case GRPC_STATUS_RESOURCE_EXHAUSTED:
cancelled_calls_on_client++;
break;
case GRPC_STATUS_OK:
break;
default:
gpr_log(GPR_ERROR, "Unexpected status code: %d", status[call_id]);
abort();
}
GPR_ASSERT(pending_client_calls > 0);
grpc_metadata_array_destroy(&initial_metadata_recv[call_id]);
grpc_metadata_array_destroy(&trailing_metadata_recv[call_id]);
grpc_call_destroy(client_calls[call_id]);
gpr_free(details[call_id]);
pending_client_calls--;
} else if (ev_tag < SERVER_RECV_BASE_TAG) {
/* new incoming call to the server */
int call_id = ev_tag - SERVER_START_BASE_TAG;
GPR_ASSERT(call_id >= 0);
GPR_ASSERT(call_id < NUM_CALLS);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv[call_id];
op->flags = 0;
op->reserved = NULL;
op++;
error =
grpc_call_start_batch(server_calls[call_id], ops, (size_t)(op - ops),
tag(SERVER_RECV_BASE_TAG + call_id), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
GPR_ASSERT(pending_server_start_calls > 0);
pending_server_start_calls--;
pending_server_recv_calls++;
grpc_call_details_destroy(&call_details[call_id]);
grpc_metadata_array_destroy(&request_metadata_recv[call_id]);
} else if (ev_tag < SERVER_END_BASE_TAG) {
/* finished read on the server */
int call_id = ev_tag - SERVER_RECV_BASE_TAG;
GPR_ASSERT(call_id >= 0);
GPR_ASSERT(call_id < NUM_CALLS);
if (ev.success) {
if (request_payload_recv[call_id] != NULL) {
grpc_byte_buffer_destroy(request_payload_recv[call_id]);
request_payload_recv[call_id] = NULL;
}
} else {
GPR_ASSERT(request_payload_recv[call_id] == NULL);
}
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled[call_id];
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op->flags = 0;
op->reserved = NULL;
op++;
error =
grpc_call_start_batch(server_calls[call_id], ops, (size_t)(op - ops),
tag(SERVER_END_BASE_TAG + call_id), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
GPR_ASSERT(pending_server_recv_calls > 0);
pending_server_recv_calls--;
pending_server_end_calls++;
} else {
int call_id = ev_tag - SERVER_END_BASE_TAG;
GPR_ASSERT(call_id >= 0);
GPR_ASSERT(call_id < NUM_CALLS);
if (was_cancelled[call_id]) {
cancelled_calls_on_server++;
}
GPR_ASSERT(pending_server_end_calls > 0);
pending_server_end_calls--;
grpc_call_destroy(server_calls[call_id]);
}
}
gpr_log(
GPR_INFO,
"Done. %d total calls: %d cancelled at server, %d cancelled at client.",
NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client);
GPR_ASSERT(cancelled_calls_on_client >= cancelled_calls_on_server);
GPR_ASSERT(cancelled_calls_on_server >= 0.9 * cancelled_calls_on_client);
grpc_byte_buffer_destroy(request_payload);
gpr_slice_unref(request_payload_slice);
grpc_resource_quota_unref(resource_quota);
end_test(&f);
config.tear_down_data(&f);
}
void resource_quota_server_pre_init(void) {}

@ -339,7 +339,13 @@ void resource_quota_server(grpc_end2end_test_config config) {
"Done. %d total calls: %d cancelled at server, %d cancelled at client.",
NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client);
/* The server call may be cancelled after it's received it's status, but
* before the client does: this means that we should see strictly more
* failures on the client than on the server */
GPR_ASSERT(cancelled_calls_on_client >= cancelled_calls_on_server);
/* However, we shouldn't see radically more... 0.9 is a guessed bound on what
* we'd want that ratio to be... to at least trigger some investigation should
* that ratio become much higher. */
GPR_ASSERT(cancelled_calls_on_server >= 0.9 * cancelled_calls_on_client);
grpc_byte_buffer_destroy(request_payload);

@ -96,29 +96,29 @@ static void test_no_op(void) {
static void test_resize_then_destroy(void) {
gpr_log(GPR_INFO, "** test_resize_then_destroy **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_resize_then_destroy");
grpc_resource_quota_resize(p, 1024 * 1024);
grpc_resource_quota_unref(p);
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_quota_unref(q);
}
static void test_resource_user_no_op(void) {
gpr_log(GPR_INFO, "** test_resource_user_no_op **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_resource_user_no_op");
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_quota_unref(p);
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_quota_unref(q);
destroy_user(&usr);
}
static void test_instant_alloc_then_free(void) {
gpr_log(GPR_INFO, "** test_instant_alloc_then_free **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_instant_alloc_then_free");
grpc_resource_quota_resize(p, 1024 * 1024);
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, NULL);
@ -129,34 +129,34 @@ static void test_instant_alloc_then_free(void) {
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr);
}
static void test_instant_alloc_free_pair(void) {
gpr_log(GPR_INFO, "** test_instant_alloc_free_pair **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_instant_alloc_free_pair");
grpc_resource_quota_resize(p, 1024 * 1024);
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, NULL);
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr);
}
static void test_simple_async_alloc(void) {
gpr_log(GPR_INFO, "** test_simple_async_alloc **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_simple_async_alloc");
grpc_resource_quota_resize(p, 1024 * 1024);
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -169,17 +169,17 @@ static void test_simple_async_alloc(void) {
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr);
}
static void test_async_alloc_blocked_by_size(void) {
gpr_log(GPR_INFO, "** test_async_alloc_blocked_by_size **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_async_alloc_blocked_by_size");
grpc_resource_quota_resize(p, 1);
grpc_resource_quota_resize(q, 1);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
bool done = false;
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -187,25 +187,25 @@ static void test_async_alloc_blocked_by_size(void) {
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!done);
}
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota_resize(q, 1024);
GPR_ASSERT(done);
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr);
}
static void test_scavenge(void) {
gpr_log(GPR_INFO, "** test_scavenge **");
grpc_resource_quota *p = grpc_resource_quota_create("test_scavenge");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota *q = grpc_resource_quota_create("test_scavenge");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr1;
grpc_resource_user usr2;
grpc_resource_user_init(&usr1, p, "usr1");
grpc_resource_user_init(&usr2, p, "usr2");
grpc_resource_user_init(&usr1, q, "usr1");
grpc_resource_user_init(&usr2, q, "usr2");
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -230,19 +230,19 @@ static void test_scavenge(void) {
grpc_resource_user_free(&exec_ctx, &usr2, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr1);
destroy_user(&usr2);
}
static void test_scavenge_blocked(void) {
gpr_log(GPR_INFO, "** test_scavenge_blocked **");
grpc_resource_quota *p = grpc_resource_quota_create("test_scavenge_blocked");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota *q = grpc_resource_quota_create("test_scavenge_blocked");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr1;
grpc_resource_user usr2;
grpc_resource_user_init(&usr1, p, "usr1");
grpc_resource_user_init(&usr2, p, "usr2");
grpc_resource_user_init(&usr1, q, "usr1");
grpc_resource_user_init(&usr2, q, "usr2");
bool done;
{
done = false;
@ -269,18 +269,18 @@ static void test_scavenge_blocked(void) {
grpc_resource_user_free(&exec_ctx, &usr2, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr1);
destroy_user(&usr2);
}
static void test_blocked_until_scheduled_reclaim(void) {
gpr_log(GPR_INFO, "** test_blocked_until_scheduled_reclaim **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_blocked_until_scheduled_reclaim");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -309,19 +309,19 @@ static void test_blocked_until_scheduled_reclaim(void) {
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr);
}
static void test_blocked_until_scheduled_reclaim_and_scavenge(void) {
gpr_log(GPR_INFO, "** test_blocked_until_scheduled_reclaim_and_scavenge **");
grpc_resource_quota *p = grpc_resource_quota_create(
grpc_resource_quota *q = grpc_resource_quota_create(
"test_blocked_until_scheduled_reclaim_and_scavenge");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr1;
grpc_resource_user usr2;
grpc_resource_user_init(&usr1, p, "usr1");
grpc_resource_user_init(&usr2, p, "usr2");
grpc_resource_user_init(&usr1, q, "usr1");
grpc_resource_user_init(&usr2, q, "usr2");
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -350,18 +350,18 @@ static void test_blocked_until_scheduled_reclaim_and_scavenge(void) {
grpc_resource_user_free(&exec_ctx, &usr2, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr1);
destroy_user(&usr2);
}
static void test_blocked_until_scheduled_destructive_reclaim(void) {
gpr_log(GPR_INFO, "** test_blocked_until_scheduled_destructive_reclaim **");
grpc_resource_quota *p = grpc_resource_quota_create(
grpc_resource_quota *q = grpc_resource_quota_create(
"test_blocked_until_scheduled_destructive_reclaim");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -390,17 +390,17 @@ static void test_blocked_until_scheduled_destructive_reclaim(void) {
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr);
}
static void test_unused_reclaim_is_cancelled(void) {
gpr_log(GPR_INFO, "** test_unused_reclaim_is_cancelled **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_unused_reclaim_is_cancelled");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
bool benign_done = false;
bool destructive_done = false;
{
@ -414,7 +414,7 @@ static void test_unused_reclaim_is_cancelled(void) {
GPR_ASSERT(!benign_done);
GPR_ASSERT(!destructive_done);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr);
GPR_ASSERT(benign_done);
GPR_ASSERT(destructive_done);
@ -422,11 +422,11 @@ static void test_unused_reclaim_is_cancelled(void) {
static void test_benign_reclaim_is_preferred(void) {
gpr_log(GPR_INFO, "** test_benign_reclaim_is_preferred **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_benign_reclaim_is_preferred");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
bool benign_done = false;
bool destructive_done = false;
{
@ -462,7 +462,7 @@ static void test_benign_reclaim_is_preferred(void) {
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr);
GPR_ASSERT(benign_done);
GPR_ASSERT(destructive_done);
@ -470,11 +470,11 @@ static void test_benign_reclaim_is_preferred(void) {
static void test_multiple_reclaims_can_be_triggered(void) {
gpr_log(GPR_INFO, "** test_multiple_reclaims_can_be_triggered **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_multiple_reclaims_can_be_triggered");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
bool benign_done = false;
bool destructive_done = false;
{
@ -510,7 +510,7 @@ static void test_multiple_reclaims_can_be_triggered(void) {
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
destroy_user(&usr);
GPR_ASSERT(benign_done);
GPR_ASSERT(destructive_done);
@ -519,11 +519,11 @@ static void test_multiple_reclaims_can_be_triggered(void) {
static void test_resource_user_stays_allocated_until_memory_released(void) {
gpr_log(GPR_INFO,
"** test_resource_user_stays_allocated_until_memory_released **");
grpc_resource_quota *p = grpc_resource_quota_create(
grpc_resource_quota *q = grpc_resource_quota_create(
"test_resource_user_stays_allocated_until_memory_released");
grpc_resource_quota_resize(p, 1024 * 1024);
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
bool done = false;
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -532,7 +532,7 @@ static void test_resource_user_stays_allocated_until_memory_released(void) {
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
grpc_resource_user_shutdown(&exec_ctx, &usr, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!done);
@ -550,14 +550,16 @@ static void test_resource_user_stays_allocated_until_memory_released(void) {
}
}
static void test_pools_merged_on_resource_user_deletion(void) {
static void
test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_released(
void) {
gpr_log(GPR_INFO, "** test_pools_merged_on_resource_user_deletion **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_pools_merged_on_resource_user_deletion");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota_resize(q, 1024);
for (int i = 0; i < 10; i++) {
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
bool done = false;
bool reclaimer_cancelled = false;
{
@ -596,16 +598,16 @@ static void test_pools_merged_on_resource_user_deletion(void) {
grpc_exec_ctx_finish(&exec_ctx);
}
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
}
static void test_reclaimers_can_be_posted_repeatedly(void) {
gpr_log(GPR_INFO, "** test_reclaimers_can_be_posted_repeatedly **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_reclaimers_can_be_posted_repeatedly");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
{
bool allocated = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -638,17 +640,17 @@ static void test_reclaimers_can_be_posted_repeatedly(void) {
grpc_exec_ctx_finish(&exec_ctx);
}
destroy_user(&usr);
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
}
static void test_one_slice(void) {
gpr_log(GPR_INFO, "** test_one_slice **");
grpc_resource_quota *p = grpc_resource_quota_create("test_one_slice");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota *q = grpc_resource_quota_create("test_one_slice");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user_slice_allocator alloc;
int num_allocs = 0;
@ -668,18 +670,18 @@ static void test_one_slice(void) {
gpr_slice_buffer_destroy(&buffer);
destroy_user(&usr);
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
}
static void test_one_slice_deleted_late(void) {
gpr_log(GPR_INFO, "** test_one_slice_deleted_late **");
grpc_resource_quota *p =
grpc_resource_quota *q =
grpc_resource_quota_create("test_one_slice_deleted_late");
grpc_resource_quota_resize(p, 1024);
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, p, "usr");
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user_slice_allocator alloc;
int num_allocs = 0;
@ -705,7 +707,7 @@ static void test_one_slice_deleted_late(void) {
GPR_ASSERT(!done);
}
grpc_resource_quota_unref(p);
grpc_resource_quota_unref(q);
gpr_slice_buffer_destroy(&buffer);
GPR_ASSERT(done);
{
@ -734,7 +736,7 @@ int main(int argc, char **argv) {
test_benign_reclaim_is_preferred();
test_multiple_reclaims_can_be_triggered();
test_resource_user_stays_allocated_until_memory_released();
test_pools_merged_on_resource_user_deletion();
test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_released();
test_reclaimers_can_be_posted_repeatedly();
test_one_slice();
test_one_slice_deleted_late();

Loading…
Cancel
Save