Merge pull request #8641 from ctiller/more-wtfs

Make resource_quota a heap allocated object
pull/8667/head
Craig Tiller 8 years ago committed by GitHub
commit 72e674d1e6
  1. 189
      src/core/lib/iomgr/resource_quota.c
  2. 86
      src/core/lib/iomgr/resource_quota.h
  3. 37
      src/core/lib/iomgr/tcp_posix.c
  4. 53
      src/core/lib/iomgr/tcp_windows.c
  5. 253
      test/core/iomgr/resource_quota_test.c
  6. 33
      test/core/util/mock_endpoint.c
  7. 18
      test/core/util/passthru_endpoint.c

@ -44,6 +44,81 @@
int grpc_resource_quota_trace = 0;
/* Internal linked list pointers for a resource user */
typedef struct {
grpc_resource_user *next;
grpc_resource_user *prev;
} grpc_resource_user_link;
/* Resource users are kept in (potentially) several intrusive linked lists
at once. These are the list names. */
typedef enum {
/* Resource users that are waiting for an allocation */
GRPC_RULIST_AWAITING_ALLOCATION,
/* Resource users that have free memory available for internal reclamation */
GRPC_RULIST_NON_EMPTY_FREE_POOL,
/* Resource users that have published a benign reclamation is available */
GRPC_RULIST_RECLAIMER_BENIGN,
/* Resource users that have published a destructive reclamation is
available */
GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
/* Number of lists: must be last */
GRPC_RULIST_COUNT
} grpc_rulist;
struct grpc_resource_user {
/* The quota this resource user consumes from */
grpc_resource_quota *resource_quota;
/* Closure to schedule an allocation under the resource quota combiner lock */
grpc_closure allocate_closure;
/* Closure to publish a non empty free pool under the resource quota combiner
lock */
grpc_closure add_to_free_pool_closure;
/* one ref for each ref call (released by grpc_resource_user_unref), and one
ref for each byte allocated (released by grpc_resource_user_free) */
gpr_atm refs;
/* is this resource user unlocked? starts at 0, increases for each shutdown
call */
gpr_atm shutdown;
gpr_mu mu;
/* The amount of memory (in bytes) this user has cached for its own use: to
avoid quota contention, each resource user can keep some memory in
addition to what it is immediately using (e.g., for caching), and the quota
can pull it back under memory pressure.
This value can become negative if more memory has been requested than
existed in the free pool, at which point the quota is consulted to bring
this value non-negative (asynchronously). */
int64_t free_pool;
/* A list of closures to call once free_pool becomes non-negative - ie when
all outstanding allocations have been granted. */
grpc_closure_list on_allocated;
/* True if we are currently trying to allocate from the quota, false if not */
bool allocating;
/* True if we are currently trying to add ourselves to the non-free quota
list, false otherwise */
bool added_to_free_pool;
/* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
*/
grpc_closure *reclaimers[2];
/* Trampoline closures to finish reclamation and re-enter the quota combiner
lock */
grpc_closure post_reclaimer_closure[2];
/* Closure to execute under the quota combiner to de-register and shutdown the
resource user */
grpc_closure destroy_closure;
/* Links in the various grpc_rulist lists */
grpc_resource_user_link links[GRPC_RULIST_COUNT];
/* The name of this resource user, for debugging/tracing */
char *name;
};
struct grpc_resource_quota {
/* refcount */
gpr_refcount refs;
@ -373,9 +448,19 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}
static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
grpc_resource_user *resource_user = ru;
grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED, NULL);
grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1],
GRPC_ERROR_CANCELLED, NULL);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
}
static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
grpc_resource_user *resource_user = ru;
GPR_ASSERT(resource_user->allocated == 0);
GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, (grpc_rulist)i);
}
@ -383,13 +468,14 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
GRPC_ERROR_CANCELLED, NULL);
grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1],
GRPC_ERROR_CANCELLED, NULL);
grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load(
&resource_user->on_done_destroy_closure),
GRPC_ERROR_NONE, NULL);
if (resource_user->free_pool != 0) {
resource_user->resource_quota->free_pool += resource_user->free_pool;
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota);
gpr_mu_destroy(&resource_user->mu);
gpr_free(resource_user->name);
gpr_free(resource_user);
}
static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg,
@ -539,9 +625,9 @@ const grpc_arg_pointer_vtable *grpc_resource_quota_arg_vtable(void) {
* grpc_resource_user api
*/
void grpc_resource_user_init(grpc_resource_user *resource_user,
grpc_resource_quota *resource_quota,
const char *name) {
grpc_resource_user *grpc_resource_user_create(
grpc_resource_quota *resource_quota, const char *name) {
grpc_resource_user *resource_user = gpr_malloc(sizeof(*resource_user));
resource_user->resource_quota =
grpc_resource_quota_internal_ref(resource_quota);
grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
@ -555,12 +641,12 @@ void grpc_resource_user_init(grpc_resource_user *resource_user,
grpc_closure_init(&resource_user->destroy_closure, &ru_destroy,
resource_user);
gpr_mu_init(&resource_user->mu);
resource_user->allocated = 0;
gpr_atm_rel_store(&resource_user->refs, 1);
gpr_atm_rel_store(&resource_user->shutdown, 0);
resource_user->free_pool = 0;
grpc_closure_list_init(&resource_user->on_allocated);
resource_user->allocating = false;
resource_user->added_to_free_pool = false;
gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, 0);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
@ -572,56 +658,54 @@ void grpc_resource_user_init(grpc_resource_user *resource_user,
gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
(intptr_t)resource_user);
}
return resource_user;
}
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user,
grpc_closure *on_done) {
gpr_mu_lock(&resource_user->mu);
GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->on_done_destroy_closure) ==
0);
gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure,
(gpr_atm)on_done);
if (resource_user->allocated == 0) {
static void ru_ref_by(grpc_resource_user *resource_user, gpr_atm amount) {
GPR_ASSERT(amount > 0);
GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
}
static void ru_unref_by(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user, gpr_atm amount) {
GPR_ASSERT(amount > 0);
gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
GPR_ASSERT(old >= amount);
if (old == amount) {
grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
&resource_user->destroy_closure, GRPC_ERROR_NONE,
false);
}
gpr_mu_unlock(&resource_user->mu);
}
void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_ref(grpc_resource_user *resource_user) {
ru_ref_by(resource_user, 1);
}
void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota);
gpr_mu_destroy(&resource_user->mu);
gpr_free(resource_user->name);
ru_unref_by(exec_ctx, resource_user, 1);
}
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
grpc_closure_create(ru_shutdown, resource_user),
GRPC_ERROR_NONE, false);
}
}
void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user, size_t size,
grpc_closure *optional_on_done) {
gpr_mu_lock(&resource_user->mu);
grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load(
&resource_user->on_done_destroy_closure);
if (on_done_destroy != NULL) {
/* already shutdown */
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR " after shutdown",
resource_user->resource_quota->name, resource_user->name, size);
}
grpc_exec_ctx_sched(
exec_ctx, optional_on_done,
GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL);
gpr_mu_unlock(&resource_user->mu);
return;
}
resource_user->allocated += (int64_t)size;
ru_ref_by(resource_user, (gpr_atm)size);
resource_user->free_pool -= (int64_t)size;
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
", free_pool -> %" PRId64,
gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size,
resource_user->allocated, resource_user->free_pool);
resource_user->free_pool);
}
if (resource_user->free_pool < 0) {
grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
@ -641,15 +725,12 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user, size_t size) {
gpr_mu_lock(&resource_user->mu);
GPR_ASSERT(resource_user->allocated >= (int64_t)size);
bool was_zero_or_negative = resource_user->free_pool <= 0;
resource_user->free_pool += (int64_t)size;
resource_user->allocated -= (int64_t)size;
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; allocated -> %" PRId64
", free_pool -> %" PRId64,
gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size,
resource_user->allocated, resource_user->free_pool);
resource_user->free_pool);
}
bool is_bigger_than_zero = resource_user->free_pool > 0;
if (is_bigger_than_zero && was_zero_or_negative &&
@ -659,29 +740,23 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
&resource_user->add_to_free_pool_closure,
GRPC_ERROR_NONE, false);
}
grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load(
&resource_user->on_done_destroy_closure);
if (on_done_destroy != NULL && resource_user->allocated == 0) {
grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
&resource_user->destroy_closure, GRPC_ERROR_NONE,
false);
}
gpr_mu_unlock(&resource_user->mu);
ru_unref_by(exec_ctx, resource_user, (gpr_atm)size);
}
void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user,
bool destructive,
grpc_closure *closure) {
if (gpr_atm_acq_load(&resource_user->on_done_destroy_closure) == 0) {
GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
return;
}
resource_user->reclaimers[destructive] = closure;
grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
&resource_user->post_reclaimer_closure[destructive],
GRPC_ERROR_NONE, false);
} else {
grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
}
}
void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,

@ -84,90 +84,14 @@ void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *grpc_resource_quota_from_channel_args(
const grpc_channel_args *channel_args);
/* Resource users are kept in (potentially) several intrusive linked lists
at once. These are the list names. */
typedef enum {
/* Resource users that are waiting for an allocation */
GRPC_RULIST_AWAITING_ALLOCATION,
/* Resource users that have free memory available for internal reclamation */
GRPC_RULIST_NON_EMPTY_FREE_POOL,
/* Resource users that have published a benign reclamation is available */
GRPC_RULIST_RECLAIMER_BENIGN,
/* Resource users that have published a destructive reclamation is
available */
GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
/* Number of lists: must be last */
GRPC_RULIST_COUNT
} grpc_rulist;
typedef struct grpc_resource_user grpc_resource_user;
/* Internal linked list pointers for a resource user */
typedef struct {
grpc_resource_user *next;
grpc_resource_user *prev;
} grpc_resource_user_link;
struct grpc_resource_user {
/* The quota this resource user consumes from */
grpc_resource_quota *resource_quota;
/* Closure to schedule an allocation under the resource quota combiner lock */
grpc_closure allocate_closure;
/* Closure to publish a non empty free pool under the resource quota combiner
lock */
grpc_closure add_to_free_pool_closure;
gpr_mu mu;
/* Total allocated memory outstanding by this resource user in bytes;
always positive */
int64_t allocated;
/* The amount of memory (in bytes) this user has cached for its own use: to
avoid quota contention, each resource user can keep some memory in
addition to what it is immediately using (e.g., for caching), and the quota
can pull it back under memory pressure.
This value can become negative if more memory has been requested than
existed in the free pool, at which point the quota is consulted to bring
this value non-negative (asynchronously). */
int64_t free_pool;
/* A list of closures to call once free_pool becomes non-negative - ie when
all outstanding allocations have been granted. */
grpc_closure_list on_allocated;
/* True if we are currently trying to allocate from the quota, false if not */
bool allocating;
/* True if we are currently trying to add ourselves to the non-free quota
list, false otherwise */
bool added_to_free_pool;
/* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
*/
grpc_closure *reclaimers[2];
/* Trampoline closures to finish reclamation and re-enter the quota combiner
lock */
grpc_closure post_reclaimer_closure[2];
/* Closure to execute under the quota combiner to de-register and shutdown the
resource user */
grpc_closure destroy_closure;
/* User supplied closure to call once the user has finished shutting down AND
all outstanding allocations have been freed. Real type is grpc_closure*,
but it's stored as an atomic to avoid a mutex on some fast paths. */
gpr_atm on_done_destroy_closure;
/* Links in the various grpc_rulist lists */
grpc_resource_user_link links[GRPC_RULIST_COUNT];
/* The name of this resource user, for debugging/tracing */
char *name;
};
void grpc_resource_user_init(grpc_resource_user *resource_user,
grpc_resource_quota *resource_quota,
const char *name);
grpc_resource_user *grpc_resource_user_create(
grpc_resource_quota *resource_quota, const char *name);
void grpc_resource_user_ref(grpc_resource_user *resource_user);
void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user);
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user,
grpc_closure *on_done);
void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user);
/* Allocate from the resource user (and its quota).

@ -102,7 +102,7 @@ typedef struct {
char *peer_string;
grpc_resource_user resource_user;
grpc_resource_user *resource_user;
grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
@ -110,28 +110,18 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void tcp_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx,
grpc_tcp *tcp) {
if (gpr_atm_full_fetch_add(&tcp->shutdown_count, 1) == 0) {
grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user,
grpc_closure_create(tcp_unref_closure, tcp));
}
}
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
grpc_fd_shutdown(exec_ctx, tcp->em_fd);
grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
}
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
gpr_slice_buffer_destroy(&tcp->last_read_buffer);
grpc_resource_user_destroy(exec_ctx, &tcp->resource_user);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
@ -168,15 +158,9 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
TCP_UNREF(exec_ctx, arg, "resource_user");
}
static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
@ -515,7 +499,7 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return &tcp->resource_user;
return tcp->resource_user;
}
static const grpc_endpoint_vtable vtable = {tcp_read,
@ -543,9 +527,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
tcp->slice_size = slice_size;
tcp->iov_size = 1;
tcp->finished_edge = true;
/* paired with unref in grpc_tcp_destroy, and with the shutdown for our
* resource_user */
gpr_ref_init(&tcp->refcount, 2);
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
tcp->read_closure.cb = tcp_handle_read;
@ -553,10 +536,9 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
gpr_slice_buffer_init(&tcp->last_read_buffer);
grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
grpc_resource_user_slice_allocator_init(&tcp->slice_allocator,
&tcp->resource_user,
tcp_read_allocation_done, tcp);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
grpc_resource_user_slice_allocator_init(
&tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
@ -576,7 +558,6 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
GPR_ASSERT(ep->vtable == &vtable);
tcp->release_fd = fd;
tcp->release_fd_cb = done;
tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}

@ -109,46 +109,35 @@ typedef struct grpc_tcp {
gpr_slice_buffer *write_slices;
gpr_slice_buffer *read_slices;
grpc_resource_user resource_user;
grpc_resource_user *resource_user;
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
gpr_atm resource_user_shutdown_count;
char *peer_string;
} grpc_tcp;
static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void win_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx,
grpc_tcp *tcp) {
if (gpr_atm_full_fetch_add(&tcp->resource_user_shutdown_count, 1) == 0) {
grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user,
grpc_closure_create(win_unref_closure, tcp));
}
}
static void tcp_free(grpc_tcp *tcp) {
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp);
}
/*#define GRPC_TCP_REFCOUNT_DEBUG*/
#ifdef GRPC_TCP_REFCOUNT_DEBUG
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
#define TCP_UNREF(exec_ctx, tcp, reason) \
tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
int line) {
static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
const char *reason, const char *file, int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count - 1);
if (gpr_unref(&tcp->refcount)) {
tcp_free(tcp);
tcp_free(exec_ctx, tcp);
}
}
@ -159,22 +148,17 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
gpr_ref(&tcp->refcount);
}
#else
#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp *tcp) {
static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
tcp_free(tcp);
tcp_free(exec_ctx, tcp);
}
}
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
TCP_UNREF(arg, "resource_user");
}
/* Asynchronous callback from the IOCP, or the background thread. */
static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
grpc_tcp *tcp = tcpp;
@ -203,7 +187,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
}
tcp->read_cb = NULL;
TCP_UNREF(tcp, "read");
TCP_UNREF(exec_ctx, tcp, "read");
grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
@ -287,7 +271,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
}
}
TCP_UNREF(tcp, "write");
TCP_UNREF(exec_ctx, tcp, "write");
grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
@ -355,7 +339,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(tcp, "write");
TCP_UNREF(exec_ctx, tcp, "write");
grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"),
NULL);
return;
@ -396,15 +380,14 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1;
grpc_winsocket_shutdown(tcp->socket);
win_maybe_shutdown_resource_user(exec_ctx, tcp);
gpr_mu_unlock(&tcp->mu);
grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
}
static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
win_maybe_shutdown_resource_user(exec_ctx, tcp);
TCP_UNREF(tcp, "destroy");
TCP_UNREF(exec_ctx, tcp, "destroy");
}
static char *win_get_peer(grpc_endpoint *ep) {
@ -416,7 +399,7 @@ static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; }
static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return &tcp->resource_user;
return tcp->resource_user;
}
static grpc_endpoint_vtable vtable = {win_read,
@ -441,7 +424,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_closure_init(&tcp->on_read, on_read, tcp);
grpc_closure_init(&tcp->on_write, on_write, tcp);
tcp->peer_string = gpr_strdup(peer_string);
grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);

@ -81,11 +81,7 @@ grpc_closure *make_unused_reclaimer(grpc_closure *then) {
static void destroy_user(grpc_resource_user *usr) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
bool done = false;
grpc_resource_user_shutdown(&exec_ctx, usr, set_bool(&done));
grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(done);
grpc_resource_user_destroy(&exec_ctx, usr);
grpc_resource_user_unref(&exec_ctx, usr);
grpc_exec_ctx_finish(&exec_ctx);
}
@ -106,10 +102,9 @@ static void test_resource_user_no_op(void) {
gpr_log(GPR_INFO, "** test_resource_user_no_op **");
grpc_resource_quota *q =
grpc_resource_quota_create("test_resource_user_no_op");
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
grpc_resource_quota_unref(q);
destroy_user(&usr);
destroy_user(usr);
}
static void test_instant_alloc_then_free(void) {
@ -117,20 +112,19 @@ static void test_instant_alloc_then_free(void) {
grpc_resource_quota *q =
grpc_resource_quota_create("test_instant_alloc_then_free");
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, NULL);
grpc_resource_user_alloc(&exec_ctx, usr, 1024, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr);
destroy_user(usr);
}
static void test_instant_alloc_free_pair(void) {
@ -138,16 +132,15 @@ static void test_instant_alloc_free_pair(void) {
grpc_resource_quota *q =
grpc_resource_quota_create("test_instant_alloc_free_pair");
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, NULL);
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_alloc(&exec_ctx, usr, 1024, NULL);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr);
destroy_user(usr);
}
static void test_simple_async_alloc(void) {
@ -155,22 +148,21 @@ static void test_simple_async_alloc(void) {
grpc_resource_quota *q =
grpc_resource_quota_create("test_simple_async_alloc");
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr);
destroy_user(usr);
}
static void test_async_alloc_blocked_by_size(void) {
@ -178,12 +170,11 @@ static void test_async_alloc_blocked_by_size(void) {
grpc_resource_quota *q =
grpc_resource_quota_create("test_async_alloc_blocked_by_size");
grpc_resource_quota_resize(q, 1);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
bool done = false;
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!done);
}
@ -191,87 +182,83 @@ static void test_async_alloc_blocked_by_size(void) {
GPR_ASSERT(done);
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr);
destroy_user(usr);
}
static void test_scavenge(void) {
gpr_log(GPR_INFO, "** test_scavenge **");
grpc_resource_quota *q = grpc_resource_quota_create("test_scavenge");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr1;
grpc_resource_user usr2;
grpc_resource_user_init(&usr1, q, "usr1");
grpc_resource_user_init(&usr2, q, "usr2");
grpc_resource_user *usr1 = grpc_resource_user_create(q, "usr1");
grpc_resource_user *usr2 = grpc_resource_user_create(q, "usr2");
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr1, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr1, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr1, 1024);
grpc_resource_user_free(&exec_ctx, usr1, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr2, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr2, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr2, 1024);
grpc_resource_user_free(&exec_ctx, usr2, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr1);
destroy_user(&usr2);
destroy_user(usr1);
destroy_user(usr2);
}
static void test_scavenge_blocked(void) {
gpr_log(GPR_INFO, "** test_scavenge_blocked **");
grpc_resource_quota *q = grpc_resource_quota_create("test_scavenge_blocked");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr1;
grpc_resource_user usr2;
grpc_resource_user_init(&usr1, q, "usr1");
grpc_resource_user_init(&usr2, q, "usr2");
grpc_resource_user *usr1 = grpc_resource_user_create(q, "usr1");
grpc_resource_user *usr2 = grpc_resource_user_create(q, "usr2");
bool done;
{
done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr1, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr1, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
{
done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr2, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr2, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr1, 1024);
grpc_resource_user_free(&exec_ctx, usr1, 1024);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr2, 1024);
grpc_resource_user_free(&exec_ctx, usr2, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr1);
destroy_user(&usr2);
destroy_user(usr1);
destroy_user(usr2);
}
static void test_blocked_until_scheduled_reclaim(void) {
@ -279,12 +266,11 @@ static void test_blocked_until_scheduled_reclaim(void) {
grpc_resource_quota *q =
grpc_resource_quota_create("test_blocked_until_scheduled_reclaim");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
@ -292,25 +278,25 @@ static void test_blocked_until_scheduled_reclaim(void) {
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr, false,
make_reclaimer(&usr, 1024, set_bool(&reclaim_done)));
&exec_ctx, usr, false,
make_reclaimer(usr, 1024, set_bool(&reclaim_done)));
grpc_exec_ctx_finish(&exec_ctx);
}
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(reclaim_done);
GPR_ASSERT(done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr);
destroy_user(usr);
}
static void test_blocked_until_scheduled_reclaim_and_scavenge(void) {
@ -318,14 +304,12 @@ static void test_blocked_until_scheduled_reclaim_and_scavenge(void) {
grpc_resource_quota *q = grpc_resource_quota_create(
"test_blocked_until_scheduled_reclaim_and_scavenge");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr1;
grpc_resource_user usr2;
grpc_resource_user_init(&usr1, q, "usr1");
grpc_resource_user_init(&usr2, q, "usr2");
grpc_resource_user *usr1 = grpc_resource_user_create(q, "usr1");
grpc_resource_user *usr2 = grpc_resource_user_create(q, "usr2");
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr1, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr1, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
@ -333,26 +317,26 @@ static void test_blocked_until_scheduled_reclaim_and_scavenge(void) {
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr1, false,
make_reclaimer(&usr1, 1024, set_bool(&reclaim_done)));
&exec_ctx, usr1, false,
make_reclaimer(usr1, 1024, set_bool(&reclaim_done)));
grpc_exec_ctx_finish(&exec_ctx);
}
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr2, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr2, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(reclaim_done);
GPR_ASSERT(done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr2, 1024);
grpc_resource_user_free(&exec_ctx, usr2, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr1);
destroy_user(&usr2);
destroy_user(usr1);
destroy_user(usr2);
}
static void test_blocked_until_scheduled_destructive_reclaim(void) {
@ -360,12 +344,11 @@ static void test_blocked_until_scheduled_destructive_reclaim(void) {
grpc_resource_quota *q = grpc_resource_quota_create(
"test_blocked_until_scheduled_destructive_reclaim");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
@ -373,25 +356,25 @@ static void test_blocked_until_scheduled_destructive_reclaim(void) {
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr, true,
make_reclaimer(&usr, 1024, set_bool(&reclaim_done)));
&exec_ctx, usr, true,
make_reclaimer(usr, 1024, set_bool(&reclaim_done)));
grpc_exec_ctx_finish(&exec_ctx);
}
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(reclaim_done);
GPR_ASSERT(done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr);
destroy_user(usr);
}
static void test_unused_reclaim_is_cancelled(void) {
@ -399,23 +382,22 @@ static void test_unused_reclaim_is_cancelled(void) {
grpc_resource_quota *q =
grpc_resource_quota_create("test_unused_reclaim_is_cancelled");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
bool benign_done = false;
bool destructive_done = false;
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr, false, make_unused_reclaimer(set_bool(&benign_done)));
&exec_ctx, usr, false, make_unused_reclaimer(set_bool(&benign_done)));
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr, true,
&exec_ctx, usr, true,
make_unused_reclaimer(set_bool(&destructive_done)));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!benign_done);
GPR_ASSERT(!destructive_done);
}
grpc_resource_quota_unref(q);
destroy_user(&usr);
destroy_user(usr);
GPR_ASSERT(benign_done);
GPR_ASSERT(destructive_done);
}
@ -425,24 +407,23 @@ static void test_benign_reclaim_is_preferred(void) {
grpc_resource_quota *q =
grpc_resource_quota_create("test_benign_reclaim_is_preferred");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
bool benign_done = false;
bool destructive_done = false;
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr, false,
make_reclaimer(&usr, 1024, set_bool(&benign_done)));
&exec_ctx, usr, false,
make_reclaimer(usr, 1024, set_bool(&benign_done)));
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr, true,
&exec_ctx, usr, true,
make_unused_reclaimer(set_bool(&destructive_done)));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!benign_done);
@ -451,7 +432,7 @@ static void test_benign_reclaim_is_preferred(void) {
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(benign_done);
GPR_ASSERT(!destructive_done);
@ -459,11 +440,11 @@ static void test_benign_reclaim_is_preferred(void) {
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr);
destroy_user(usr);
GPR_ASSERT(benign_done);
GPR_ASSERT(destructive_done);
}
@ -473,25 +454,24 @@ static void test_multiple_reclaims_can_be_triggered(void) {
grpc_resource_quota *q =
grpc_resource_quota_create("test_multiple_reclaims_can_be_triggered");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
bool benign_done = false;
bool destructive_done = false;
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr, false,
make_reclaimer(&usr, 512, set_bool(&benign_done)));
&exec_ctx, usr, false,
make_reclaimer(usr, 512, set_bool(&benign_done)));
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr, true,
make_reclaimer(&usr, 512, set_bool(&destructive_done)));
&exec_ctx, usr, true,
make_reclaimer(usr, 512, set_bool(&destructive_done)));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!benign_done);
GPR_ASSERT(!destructive_done);
@ -499,7 +479,7 @@ static void test_multiple_reclaims_can_be_triggered(void) {
{
bool done = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&done));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(benign_done);
GPR_ASSERT(destructive_done);
@ -507,11 +487,11 @@ static void test_multiple_reclaims_can_be_triggered(void) {
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(&usr);
destroy_user(usr);
GPR_ASSERT(benign_done);
GPR_ASSERT(destructive_done);
}
@ -522,30 +502,21 @@ static void test_resource_user_stays_allocated_until_memory_released(void) {
grpc_resource_quota *q = grpc_resource_quota_create(
"test_resource_user_stays_allocated_until_memory_released");
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
bool done = false;
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, NULL);
grpc_resource_user_alloc(&exec_ctx, usr, 1024, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_quota_unref(q);
grpc_resource_user_shutdown(&exec_ctx, &usr, set_bool(&done));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_unref(&exec_ctx, usr);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_destroy(&exec_ctx, &usr);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
}
@ -562,14 +533,12 @@ test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_released(
"released");
grpc_resource_quota_resize(q, 1024);
for (int i = 0; i < 10; i++) {
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
bool done = false;
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
bool reclaimer_cancelled = false;
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr, false,
&exec_ctx, usr, false,
make_unused_reclaimer(set_bool(&reclaimer_cancelled)));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!reclaimer_cancelled);
@ -577,30 +546,23 @@ test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_released(
{
bool allocated = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&allocated));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&allocated));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(allocated);
GPR_ASSERT(!reclaimer_cancelled);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_shutdown(&exec_ctx, &usr, set_bool(&done));
grpc_resource_user_unref(&exec_ctx, usr);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!done);
GPR_ASSERT(!reclaimer_cancelled);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(done);
GPR_ASSERT(reclaimer_cancelled);
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_destroy(&exec_ctx, &usr);
grpc_exec_ctx_finish(&exec_ctx);
}
}
grpc_resource_quota_unref(q);
}
@ -610,12 +572,11 @@ static void test_reclaimers_can_be_posted_repeatedly(void) {
grpc_resource_quota *q =
grpc_resource_quota_create("test_reclaimers_can_be_posted_repeatedly");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
{
bool allocated = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&allocated));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&allocated));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(allocated);
}
@ -624,15 +585,15 @@ static void test_reclaimers_can_be_posted_repeatedly(void) {
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
&exec_ctx, &usr, false,
make_reclaimer(&usr, 1024, set_bool(&reclaimer_done)));
&exec_ctx, usr, false,
make_reclaimer(usr, 1024, set_bool(&reclaimer_done)));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!reclaimer_done);
}
{
bool allocated = false;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_alloc(&exec_ctx, &usr, 1024, set_bool(&allocated));
grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_bool(&allocated));
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(allocated);
GPR_ASSERT(reclaimer_done);
@ -640,10 +601,10 @@ static void test_reclaimers_can_be_posted_repeatedly(void) {
}
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_free(&exec_ctx, &usr, 1024);
grpc_resource_user_free(&exec_ctx, usr, 1024);
grpc_exec_ctx_finish(&exec_ctx);
}
destroy_user(&usr);
destroy_user(usr);
grpc_resource_quota_unref(q);
}
@ -653,13 +614,11 @@ static void test_one_slice(void) {
grpc_resource_quota *q = grpc_resource_quota_create("test_one_slice");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
grpc_resource_user_slice_allocator alloc;
int num_allocs = 0;
grpc_resource_user_slice_allocator_init(&alloc, &usr, inc_int_cb,
&num_allocs);
grpc_resource_user_slice_allocator_init(&alloc, usr, inc_int_cb, &num_allocs);
gpr_slice_buffer buffer;
gpr_slice_buffer_init(&buffer);
@ -673,7 +632,7 @@ static void test_one_slice(void) {
}
gpr_slice_buffer_destroy(&buffer);
destroy_user(&usr);
destroy_user(usr);
grpc_resource_quota_unref(q);
}
@ -684,13 +643,11 @@ static void test_one_slice_deleted_late(void) {
grpc_resource_quota_create("test_one_slice_deleted_late");
grpc_resource_quota_resize(q, 1024);
grpc_resource_user usr;
grpc_resource_user_init(&usr, q, "usr");
grpc_resource_user *usr = grpc_resource_user_create(q, "usr");
grpc_resource_user_slice_allocator alloc;
int num_allocs = 0;
grpc_resource_user_slice_allocator_init(&alloc, &usr, inc_int_cb,
&num_allocs);
grpc_resource_user_slice_allocator_init(&alloc, usr, inc_int_cb, &num_allocs);
gpr_slice_buffer buffer;
gpr_slice_buffer_init(&buffer);
@ -703,22 +660,14 @@ static void test_one_slice_deleted_late(void) {
GPR_ASSERT(num_allocs == start_allocs + 1);
}
bool done = false;
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_shutdown(&exec_ctx, &usr, set_bool(&done));
grpc_resource_user_unref(&exec_ctx, usr);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(!done);
}
grpc_resource_quota_unref(q);
gpr_slice_buffer_destroy(&buffer);
GPR_ASSERT(done);
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_destroy(&exec_ctx, &usr);
grpc_exec_ctx_finish(&exec_ctx);
}
}
int main(int argc, char **argv) {

@ -41,12 +41,11 @@
typedef struct grpc_mock_endpoint {
grpc_endpoint base;
gpr_mu mu;
int refs;
void (*on_write)(gpr_slice slice);
gpr_slice_buffer read_buffer;
gpr_slice_buffer *on_read_out;
grpc_closure *on_read;
grpc_resource_user resource_user;
grpc_resource_user *resource_user;
} grpc_mock_endpoint;
static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@ -78,24 +77,6 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void unref(grpc_exec_ctx *exec_ctx, grpc_mock_endpoint *m) {
gpr_mu_lock(&m->mu);
if (0 == --m->refs) {
gpr_mu_unlock(&m->mu);
gpr_slice_buffer_destroy(&m->read_buffer);
grpc_resource_user_destroy(exec_ctx, &m->resource_user);
gpr_free(m);
} else {
gpr_mu_unlock(&m->mu);
}
}
static void me_finish_shutdown(grpc_exec_ctx *exec_ctx, void *me,
grpc_error *error) {
grpc_mock_endpoint *m = me;
unref(exec_ctx, m);
}
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
gpr_mu_lock(&m->mu);
@ -104,14 +85,15 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
GRPC_ERROR_CREATE("Endpoint Shutdown"), NULL);
m->on_read = NULL;
}
grpc_resource_user_shutdown(exec_ctx, &m->resource_user,
grpc_closure_create(me_finish_shutdown, m));
gpr_mu_unlock(&m->mu);
grpc_resource_user_shutdown(exec_ctx, m->resource_user);
}
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
unref(exec_ctx, m);
gpr_slice_buffer_destroy(&m->read_buffer);
grpc_resource_user_unref(exec_ctx, m->resource_user);
gpr_free(m);
}
static char *me_get_peer(grpc_endpoint *ep) {
@ -120,7 +102,7 @@ static char *me_get_peer(grpc_endpoint *ep) {
static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
return &m->resource_user;
return m->resource_user;
}
static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; }
@ -141,10 +123,9 @@ grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice),
grpc_resource_quota *resource_quota) {
grpc_mock_endpoint *m = gpr_malloc(sizeof(*m));
m->base.vtable = &vtable;
m->refs = 2;
char *name;
gpr_asprintf(&name, "mock_endpoint_%" PRIxPTR, (intptr_t)m);
grpc_resource_user_init(&m->resource_user, resource_quota, name);
m->resource_user = grpc_resource_user_create(resource_quota, name);
gpr_free(name);
gpr_slice_buffer_init(&m->read_buffer);
gpr_mu_init(&m->mu);

@ -46,7 +46,7 @@ typedef struct {
gpr_slice_buffer read_buffer;
gpr_slice_buffer *on_read_out;
grpc_closure *on_read;
grpc_resource_user resource_user;
grpc_resource_user *resource_user;
} half;
struct passthru_endpoint {
@ -123,10 +123,10 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
m->on_read = NULL;
}
gpr_mu_unlock(&m->parent->mu);
grpc_resource_user_shutdown(exec_ctx, m->resource_user);
}
static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *ep,
grpc_error *error) {
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
passthru_endpoint *p = ((half *)ep)->parent;
gpr_mu_lock(&p->mu);
if (0 == --p->halves) {
@ -134,18 +134,14 @@ static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *ep,
gpr_mu_destroy(&p->mu);
gpr_slice_buffer_destroy(&p->client.read_buffer);
gpr_slice_buffer_destroy(&p->server.read_buffer);
grpc_resource_user_unref(exec_ctx, p->client.resource_user);
grpc_resource_user_unref(exec_ctx, p->server.resource_user);
gpr_free(p);
} else {
gpr_mu_unlock(&p->mu);
}
}
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
half *m = (half *)ep;
grpc_resource_user_shutdown(exec_ctx, &m->resource_user,
grpc_closure_create(me_really_destroy, m));
}
static char *me_get_peer(grpc_endpoint *ep) {
return gpr_strdup("fake:mock_endpoint");
}
@ -154,7 +150,7 @@ static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; }
static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) {
half *m = (half *)ep;
return &m->resource_user;
return m->resource_user;
}
static const grpc_endpoint_vtable vtable = {
@ -179,7 +175,7 @@ static void half_init(half *m, passthru_endpoint *parent,
char *name;
gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
(intptr_t)parent);
grpc_resource_user_init(&m->resource_user, resource_quota, name);
m->resource_user = grpc_resource_user_create(resource_quota, name);
gpr_free(name);
}

Loading…
Cancel
Save