mirror of https://github.com/grpc/grpc.git
commit
6e3e39fe3b
215 changed files with 8879 additions and 1377 deletions
@ -0,0 +1,70 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2016, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPCXX_RESOURCE_QUOTA_H |
||||
#define GRPCXX_RESOURCE_QUOTA_H |
||||
|
||||
struct grpc_resource_quota; |
||||
|
||||
#include <grpc++/impl/codegen/config.h> |
||||
|
||||
namespace grpc { |
||||
|
||||
/// ResourceQuota represents a bound on memory usage by the gRPC library.
/// A ResourceQuota can be attached to a server (via ServerBuilder), or a client
/// channel (via ChannelArguments). gRPC will attempt to keep memory used by
/// all attached entities below the ResourceQuota bound.
class ResourceQuota GRPC_FINAL {
 public:
  /// \param name a unique name for this quota; used in debug/trace output.
  explicit ResourceQuota(const grpc::string& name);
  ResourceQuota();
  ~ResourceQuota();

  /// Resize this ResourceQuota to a new size. If new_size is smaller than the
  /// current size of the pool, memory usage will be monotonically decreased
  /// until it falls under new_size. No time bound is given for this to occur
  /// however.
  ResourceQuota& Resize(size_t new_size);

  /// Returns the underlying C resource quota object (borrowed pointer; this
  /// wrapper retains ownership for its lifetime).
  grpc_resource_quota* c_resource_quota() const { return impl_; }

 private:
  // Non-copyable: copy operations declared but never defined
  // (pre-C++11 idiom used throughout this codebase).
  ResourceQuota(const ResourceQuota& rhs);
  ResourceQuota& operator=(const ResourceQuota& rhs);

  // Owned C object, set once at construction (const pointer).
  // NOTE(review): presumably released in ~ResourceQuota() — implementation
  // is not in view here; confirm against resource_quota_cc.cc.
  grpc_resource_quota* const impl_;
};
||||
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPCXX_RESOURCE_QUOTA_H
|
@ -0,0 +1,717 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2016, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/lib/iomgr/resource_quota.h" |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/string_util.h> |
||||
#include <grpc/support/useful.h> |
||||
|
||||
#include "src/core/lib/iomgr/combiner.h" |
||||
|
||||
int grpc_resource_quota_trace = 0; |
||||
|
||||
/* A quota of memory shared by one or more resource users. All mutable state
   is manipulated only under `combiner`. */
struct grpc_resource_quota {
  /* refcount */
  gpr_refcount refs;

  /* Master combiner lock: all activity on a quota executes under this combiner
   * (so no mutex is needed for this data structure)
   */
  grpc_combiner *combiner;
  /* Size of the resource quota */
  int64_t size;
  /* Amount of free memory in the resource quota */
  int64_t free_pool;

  /* Has rq_step been scheduled to occur? */
  bool step_scheduled;
  /* Are we currently reclaiming memory */
  bool reclaiming;
  /* Closure around rq_step */
  grpc_closure rq_step_closure;
  /* Closure around rq_reclamation_done */
  grpc_closure rq_reclamation_done_closure;

  /* Roots of all resource user lists (circular doubly-linked lists;
     NULL root == empty list) */
  grpc_resource_user *roots[GRPC_RULIST_COUNT];

  /* Quota name for debug/trace logging; heap-allocated, freed on last unref */
  char *name;
};
||||
|
||||
/*******************************************************************************
|
||||
* list management |
||||
*/ |
||||
|
||||
/* Push resource_user onto the FRONT of its quota's list `list`. Lists are
   circular and doubly linked: the root points at the head, and head->prev is
   the tail. Must be called under the quota's combiner. */
static void rulist_add_head(grpc_resource_user *resource_user,
                            grpc_rulist list) {
  grpc_resource_quota *resource_quota = resource_user->resource_quota;
  grpc_resource_user **root = &resource_quota->roots[list];
  if (*root == NULL) {
    /* empty list: the single element links to itself */
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    /* splice in before the current head, then advance root to us */
    resource_user->links[list].next = *root;
    resource_user->links[list].prev = (*root)->links[list].prev;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
    *root = resource_user;
  }
}
||||
|
||||
static void rulist_add_tail(grpc_resource_user *resource_user, |
||||
grpc_rulist list) { |
||||
grpc_resource_quota *resource_quota = resource_user->resource_quota; |
||||
grpc_resource_user **root = &resource_quota->roots[list]; |
||||
if (*root == NULL) { |
||||
*root = resource_user; |
||||
resource_user->links[list].next = resource_user->links[list].prev = |
||||
resource_user; |
||||
} else { |
||||
resource_user->links[list].next = (*root)->links[list].next; |
||||
resource_user->links[list].prev = *root; |
||||
resource_user->links[list].next->links[list].prev = |
||||
resource_user->links[list].prev->links[list].next = resource_user; |
||||
} |
||||
} |
||||
|
||||
static bool rulist_empty(grpc_resource_quota *resource_quota, |
||||
grpc_rulist list) { |
||||
return resource_quota->roots[list] == NULL; |
||||
} |
||||
|
||||
/* Pop and return the head of the quota's list `list`, or NULL if the list is
   empty. The popped user's links are reset to NULL so a later rulist_remove
   on it is a no-op. Must be called under the quota's combiner. */
static grpc_resource_user *rulist_pop_head(grpc_resource_quota *resource_quota,
                                           grpc_rulist list) {
  grpc_resource_user **root = &resource_quota->roots[list];
  grpc_resource_user *resource_user = *root;
  if (resource_user == NULL) {
    return NULL;
  }
  if (resource_user->links[list].next == resource_user) {
    /* sole element: list becomes empty */
    *root = NULL;
  } else {
    /* unlink the head and advance root to its successor */
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev;
    resource_user->links[list].prev->links[list].next =
        resource_user->links[list].next;
    *root = resource_user->links[list].next;
  }
  resource_user->links[list].next = resource_user->links[list].prev = NULL;
  return resource_user;
}
||||
|
||||
/* Unlink resource_user from list `list` if it is present (NULL next means
   "not on the list", making this safe to call unconditionally). Must be
   called under the quota's combiner.
   NOTE(review): unlike rulist_pop_head, this does NOT reset the node's own
   next/prev to NULL afterwards — appears fine for the single ru_destroy
   sweep, but confirm before calling this twice on the same node. */
static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) {
  if (resource_user->links[list].next == NULL) return;
  grpc_resource_quota *resource_quota = resource_user->resource_quota;
  if (resource_quota->roots[list] == resource_user) {
    /* removing the head: advance root; if it wraps straight back to us the
       list had one element and is now empty */
    resource_quota->roots[list] = resource_user->links[list].next;
    if (resource_quota->roots[list] == resource_user) {
      resource_quota->roots[list] = NULL;
    }
  }
  resource_user->links[list].next->links[list].prev =
      resource_user->links[list].prev;
  resource_user->links[list].prev->links[list].next =
      resource_user->links[list].next;
}
||||
|
||||
/*******************************************************************************
|
||||
* resource quota state machine |
||||
*/ |
||||
|
||||
static bool rq_alloc(grpc_exec_ctx *exec_ctx, |
||||
grpc_resource_quota *resource_quota); |
||||
static bool rq_reclaim_from_per_user_free_pool( |
||||
grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota); |
||||
static bool rq_reclaim(grpc_exec_ctx *exec_ctx, |
||||
grpc_resource_quota *resource_quota, bool destructive); |
||||
|
||||
static void rq_step(grpc_exec_ctx *exec_ctx, void *rq, grpc_error *error) { |
||||
grpc_resource_quota *resource_quota = rq; |
||||
resource_quota->step_scheduled = false; |
||||
do { |
||||
if (rq_alloc(exec_ctx, resource_quota)) goto done; |
||||
} while (rq_reclaim_from_per_user_free_pool(exec_ctx, resource_quota)); |
||||
|
||||
if (!rq_reclaim(exec_ctx, resource_quota, false)) { |
||||
rq_reclaim(exec_ctx, resource_quota, true); |
||||
} |
||||
|
||||
done: |
||||
grpc_resource_quota_internal_unref(exec_ctx, resource_quota); |
||||
} |
||||
|
||||
static void rq_step_sched(grpc_exec_ctx *exec_ctx, |
||||
grpc_resource_quota *resource_quota) { |
||||
if (resource_quota->step_scheduled) return; |
||||
resource_quota->step_scheduled = true; |
||||
grpc_resource_quota_internal_ref(resource_quota); |
||||
grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner, |
||||
&resource_quota->rq_step_closure, |
||||
GRPC_ERROR_NONE, false); |
||||
} |
||||
|
||||
/* returns true if all allocations are completed */
static bool rq_alloc(grpc_exec_ctx *exec_ctx,
                     grpc_resource_quota *resource_quota) {
  grpc_resource_user *resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_AWAITING_ALLOCATION))) {
    gpr_mu_lock(&resource_user->mu);
    /* a negative per-user free_pool encodes an outstanding request for
       -free_pool bytes; grant it if the quota-wide pool covers it */
    if (resource_user->free_pool < 0 &&
        -resource_user->free_pool <= resource_quota->free_pool) {
      int64_t amt = -resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool -= amt;
      if (grpc_resource_quota_trace) {
        gpr_log(GPR_DEBUG, "RQ %s %s: grant alloc %" PRId64
                           " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name, resource_user->name, amt,
                resource_quota->free_pool);
      }
    } else if (grpc_resource_quota_trace && resource_user->free_pool >= 0) {
      gpr_log(GPR_DEBUG, "RQ %s %s: discard already satisfied alloc request",
              resource_quota->name, resource_user->name);
    }
    if (resource_user->free_pool >= 0) {
      /* request satisfied: fire every queued on-allocated callback */
      resource_user->allocating = false;
      grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL);
      gpr_mu_unlock(&resource_user->mu);
    } else {
      /* insufficient quota: requeue at the head so we retry this user first,
         and report failure so the caller can attempt reclamation */
      rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
  }
  return true;
}
||||
|
||||
/* returns true if any memory could be reclaimed from buffers */
static bool rq_reclaim_from_per_user_free_pool(
    grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) {
  grpc_resource_user *resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
    gpr_mu_lock(&resource_user->mu);
    if (resource_user->free_pool > 0) {
      /* move this user's surplus back into the quota-wide pool */
      int64_t amt = resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool += amt;
      if (grpc_resource_quota_trace) {
        gpr_log(GPR_DEBUG, "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
                           " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name, resource_user->name, amt,
                resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
      return true;
    } else {
      /* raced: this user's pool was drained before we got to it */
      gpr_mu_unlock(&resource_user->mu);
    }
  }
  return false;
}
||||
|
||||
/* returns true if reclamation is proceeding */
static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
                       grpc_resource_quota *resource_quota, bool destructive) {
  /* only one reclamation may be in flight at a time */
  if (resource_quota->reclaiming) return true;
  grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
                                 : GRPC_RULIST_RECLAIMER_BENIGN;
  grpc_resource_user *resource_user = rulist_pop_head(resource_quota, list);
  if (resource_user == NULL) return false;
  if (grpc_resource_quota_trace) {
    gpr_log(GPR_DEBUG, "RQ %s %s: initiate %s reclamation",
            resource_quota->name, resource_user->name,
            destructive ? "destructive" : "benign");
  }
  resource_quota->reclaiming = true;
  /* ref is dropped by rq_reclamation_done after the reclaimer reports back */
  grpc_resource_quota_internal_ref(resource_quota);
  /* hand the registered reclaimer exactly one shot: clear the slot before
     running it */
  grpc_closure *c = resource_user->reclaimers[destructive];
  resource_user->reclaimers[destructive] = NULL;
  grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE);
  return true;
}
||||
|
||||
/*******************************************************************************
|
||||
* ru_slice: a slice implementation that is backed by a grpc_resource_user |
||||
*/ |
||||
|
||||
/* Refcount header for slices carved out of a resource user's accounted
   memory; the slice payload lives immediately after this struct in the same
   allocation (see ru_slice_create). */
typedef struct {
  gpr_slice_refcount base; /* must be first: slice.refcount points here */
  gpr_refcount refs;
  grpc_resource_user *resource_user; /* where bytes are returned on unref */
  size_t size;                       /* bytes accounted against the user */
} ru_slice_refcount;
||||
|
||||
static void ru_slice_ref(void *p) { |
||||
ru_slice_refcount *rc = p; |
||||
gpr_ref(&rc->refs); |
||||
} |
||||
|
||||
/* gpr_slice_refcount `unref` callback: on the last ref, return the accounted
   bytes to the owning resource user and free the combined header+payload
   allocation. */
static void ru_slice_unref(void *p) {
  ru_slice_refcount *rc = p;
  if (gpr_unref(&rc->refs)) {
    /* TODO(ctiller): this is dangerous, but I think safe for now:
       we have no guarantee here that we're at a safe point for creating an
       execution context, but we have no way of writing this code otherwise.
       In the future: consider lifting gpr_slice to grpc, and offering an
       internal_{ref,unref} pair that is execution context aware.
       Alternatively,
       make exec_ctx be thread local and 'do the right thing' (whatever that
       is) if NULL */
    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    grpc_resource_user_free(&exec_ctx, rc->resource_user, rc->size);
    grpc_exec_ctx_finish(&exec_ctx);
    gpr_free(rc);
  }
}
||||
|
||||
/* Create a slice of `size` bytes whose backing memory has already been
   accounted against resource_user; unreffing the last slice ref returns the
   bytes via grpc_resource_user_free. */
static gpr_slice ru_slice_create(grpc_resource_user *resource_user,
                                 size_t size) {
  /* single allocation: refcount header immediately followed by the payload */
  ru_slice_refcount *rc = gpr_malloc(sizeof(ru_slice_refcount) + size);
  rc->base.ref = ru_slice_ref;
  rc->base.unref = ru_slice_unref;
  gpr_ref_init(&rc->refs, 1);
  rc->resource_user = resource_user;
  rc->size = size;
  gpr_slice slice;
  slice.refcount = &rc->base;
  slice.data.refcounted.bytes = (uint8_t *)(rc + 1); /* payload after header */
  slice.data.refcounted.length = size;
  return slice;
}
||||
|
||||
/*******************************************************************************
|
||||
* grpc_resource_quota internal implementation: resource user manipulation under |
||||
* the combiner |
||||
*/ |
||||
|
||||
static void ru_allocate(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { |
||||
grpc_resource_user *resource_user = ru; |
||||
if (rulist_empty(resource_user->resource_quota, |
||||
GRPC_RULIST_AWAITING_ALLOCATION)) { |
||||
rq_step_sched(exec_ctx, resource_user->resource_quota); |
||||
} |
||||
rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); |
||||
} |
||||
|
||||
static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru, |
||||
grpc_error *error) { |
||||
grpc_resource_user *resource_user = ru; |
||||
if (!rulist_empty(resource_user->resource_quota, |
||||
GRPC_RULIST_AWAITING_ALLOCATION) && |
||||
rulist_empty(resource_user->resource_quota, |
||||
GRPC_RULIST_NON_EMPTY_FREE_POOL)) { |
||||
rq_step_sched(exec_ctx, resource_user->resource_quota); |
||||
} |
||||
rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL); |
||||
} |
||||
|
||||
/* Combiner callback: queue this user's benign reclaimer. The state machine
   is kicked only when allocations are pending and no cheaper recovery path
   (per-user free pools, an earlier benign reclaimer) exists. */
static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
                                     grpc_error *error) {
  grpc_resource_user *resource_user = ru;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN)) {
    rq_step_sched(exec_ctx, resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
}
||||
|
||||
/* Combiner callback: queue this user's destructive reclaimer. The state
   machine is kicked only when allocations are pending and every cheaper
   recovery path (free pools, benign reclaimers, an earlier destructive
   reclaimer) is exhausted. */
static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
                                          grpc_error *error) {
  grpc_resource_user *resource_user = ru;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
    rq_step_sched(exec_ctx, resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}
||||
|
||||
/* Combiner callback run once a shut-down resource user's allocation count
   hits zero: detach from every list, cancel any unfired reclaimers, invoke
   the caller's shutdown closure, and return leftover free-pool bytes to the
   quota. */
static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
  grpc_resource_user *resource_user = ru;
  GPR_ASSERT(resource_user->allocated == 0);
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    rulist_remove(resource_user, (grpc_rulist)i);
  }
  /* NOTE(review): reclaimers[0]/[1] may be NULL here — assumes
     grpc_exec_ctx_sched tolerates a NULL closure; confirm. */
  grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0],
                      GRPC_ERROR_CANCELLED, NULL);
  grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1],
                      GRPC_ERROR_CANCELLED, NULL);
  /* fire the on_done closure stored by grpc_resource_user_shutdown */
  grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load(
                                    &resource_user->on_done_destroy_closure),
                      GRPC_ERROR_NONE, NULL);
  if (resource_user->free_pool != 0) {
    resource_user->resource_quota->free_pool += resource_user->free_pool;
    rq_step_sched(exec_ctx, resource_user->resource_quota);
  }
}
||||
|
||||
/* Callback fired when the slice allocator's quota allocation completes: on
   success, materialize `count` slices of `length` bytes each into `dest`,
   then run the user's on_done closure with the same error. */
static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg,
                                grpc_error *error) {
  grpc_resource_user_slice_allocator *slice_allocator = arg;
  if (error == GRPC_ERROR_NONE) {
    for (size_t i = 0; i < slice_allocator->count; i++) {
      gpr_slice_buffer_add_indexed(
          slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
                                                 slice_allocator->length));
    }
  }
  grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
}
||||
|
||||
/*******************************************************************************
|
||||
* grpc_resource_quota internal implementation: quota manipulation under the |
||||
* combiner |
||||
*/ |
||||
|
||||
/* Arguments marshalled from grpc_resource_quota_resize (callable from any
   thread) to rq_resize (runs under the quota's combiner). */
typedef struct {
  int64_t size;                        /* requested new quota size */
  grpc_resource_quota *resource_quota; /* ref held until rq_resize runs */
  grpc_closure closure;                /* wraps rq_resize */
} rq_resize_args;
||||
|
||||
static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { |
||||
rq_resize_args *a = args; |
||||
int64_t delta = a->size - a->resource_quota->size; |
||||
a->resource_quota->size += delta; |
||||
a->resource_quota->free_pool += delta; |
||||
rq_step_sched(exec_ctx, a->resource_quota); |
||||
grpc_resource_quota_internal_unref(exec_ctx, a->resource_quota); |
||||
gpr_free(a); |
||||
} |
||||
|
||||
static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq, |
||||
grpc_error *error) { |
||||
grpc_resource_quota *resource_quota = rq; |
||||
resource_quota->reclaiming = false; |
||||
rq_step_sched(exec_ctx, resource_quota); |
||||
grpc_resource_quota_internal_unref(exec_ctx, resource_quota); |
||||
} |
||||
|
||||
/*******************************************************************************
|
||||
* grpc_resource_quota api |
||||
*/ |
||||
|
||||
/* Public API */
/* Create a resource quota named `name` (or an auto-generated anonymous name
   when NULL). The quota starts effectively unlimited (size == free_pool ==
   INT64_MAX); the returned ref is owned by the caller. */
grpc_resource_quota *grpc_resource_quota_create(const char *name) {
  grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota));
  gpr_ref_init(&resource_quota->refs, 1);
  resource_quota->combiner = grpc_combiner_create(NULL);
  resource_quota->free_pool = INT64_MAX;
  resource_quota->size = INT64_MAX;
  resource_quota->step_scheduled = false;
  resource_quota->reclaiming = false;
  if (name != NULL) {
    resource_quota->name = gpr_strdup(name);
  } else {
    /* derive a unique-ish debug name from the object's address */
    gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
                 (intptr_t)resource_quota);
  }
  grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota);
  grpc_closure_init(&resource_quota->rq_reclamation_done_closure,
                    rq_reclamation_done, resource_quota);
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_quota->roots[i] = NULL;
  }
  return resource_quota;
}
||||
|
||||
void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx, |
||||
grpc_resource_quota *resource_quota) { |
||||
if (gpr_unref(&resource_quota->refs)) { |
||||
grpc_combiner_destroy(exec_ctx, resource_quota->combiner); |
||||
gpr_free(resource_quota->name); |
||||
gpr_free(resource_quota); |
||||
} |
||||
} |
||||
|
||||
/* Public API */
/* Drop a user-visible ref, supplying a temporary exec_ctx in case this is
   the last ref and cleanup work must run. */
void grpc_resource_quota_unref(grpc_resource_quota *resource_quota) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
  grpc_exec_ctx_finish(&exec_ctx);
}
||||
|
||||
/* Add an internal ref and return the quota (convenience for chaining at
   call sites). */
grpc_resource_quota *grpc_resource_quota_internal_ref(
    grpc_resource_quota *resource_quota) {
  gpr_ref(&resource_quota->refs);
  return resource_quota;
}
||||
|
||||
/* Public API */
/* Public refcounting wrapper around the internal ref. */
void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) {
  grpc_resource_quota_internal_ref(resource_quota);
}
||||
|
||||
/* Public API */
/* Request an asynchronous resize of the quota to `size` bytes. The actual
   adjustment happens on the quota's combiner (rq_resize); a ref is held on
   the quota until then. Safe to call from any thread. */
void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
                                size_t size) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  rq_resize_args *a = gpr_malloc(sizeof(*a));
  a->resource_quota = grpc_resource_quota_internal_ref(resource_quota);
  a->size = (int64_t)size;
  grpc_closure_init(&a->closure, rq_resize, a);
  grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure,
                        GRPC_ERROR_NONE, false);
  grpc_exec_ctx_finish(&exec_ctx);
}
||||
|
||||
/*******************************************************************************
|
||||
* grpc_resource_user channel args api |
||||
*/ |
||||
|
||||
/* Extract the resource quota stashed in channel args (adding a ref), or
   create a fresh anonymous quota if none is present. The caller owns the
   returned ref either way. */
grpc_resource_quota *grpc_resource_quota_from_channel_args(
    const grpc_channel_args *channel_args) {
  for (size_t i = 0; i < channel_args->num_args; i++) {
    if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
      if (channel_args->args[i].type == GRPC_ARG_POINTER) {
        return grpc_resource_quota_internal_ref(
            channel_args->args[i].value.pointer.p);
      } else {
        /* mistyped arg: warn and fall through to the anonymous quota */
        gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
      }
    }
  }
  return grpc_resource_quota_create(NULL);
}
||||
|
||||
/* Channel-arg vtable `copy`: copying the arg just adds a ref. */
static void *rq_copy(void *rq) {
  grpc_resource_quota_ref(rq);
  return rq;
}
||||
|
||||
/* Channel-arg vtable `destroy`: drop the ref held by the arg. */
static void rq_destroy(void *rq) { grpc_resource_quota_unref(rq); }
||||
|
||||
/* Channel-arg vtable `cmp`: order quotas by pointer identity only
   (three-way compare of the raw addresses, as GPR_ICMP does). */
static int rq_cmp(void *a, void *b) { return a < b ? -1 : (a > b ? 1 : 0); }
||||
|
||||
/* Returns the channel-arg vtable used to carry a resource quota in channel
   args (copy == ref, destroy == unref, cmp == pointer identity). */
const grpc_arg_pointer_vtable *grpc_resource_quota_arg_vtable(void) {
  static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
  return &vtable;
}
||||
|
||||
/*******************************************************************************
|
||||
* grpc_resource_user api |
||||
*/ |
||||
|
||||
/* Initialize a resource user attached to resource_quota (takes a quota ref
   released by grpc_resource_user_destroy). `name` may be NULL, in which case
   an anonymous debug name is generated. */
void grpc_resource_user_init(grpc_resource_user *resource_user,
                             grpc_resource_quota *resource_quota,
                             const char *name) {
  resource_user->resource_quota =
      grpc_resource_quota_internal_ref(resource_quota);
  /* combiner callbacks used by the quota state machine */
  grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
                    resource_user);
  grpc_closure_init(&resource_user->add_to_free_pool_closure,
                    &ru_add_to_free_pool, resource_user);
  grpc_closure_init(&resource_user->post_reclaimer_closure[0],
                    &ru_post_benign_reclaimer, resource_user);
  grpc_closure_init(&resource_user->post_reclaimer_closure[1],
                    &ru_post_destructive_reclaimer, resource_user);
  grpc_closure_init(&resource_user->destroy_closure, &ru_destroy,
                    resource_user);
  gpr_mu_init(&resource_user->mu);
  resource_user->allocated = 0;
  resource_user->free_pool = 0;
  grpc_closure_list_init(&resource_user->on_allocated);
  resource_user->allocating = false;
  resource_user->added_to_free_pool = false;
  /* 0 == not shut down; grpc_resource_user_shutdown stores the caller's
     closure here */
  gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, 0);
  resource_user->reclaimers[0] = NULL;
  resource_user->reclaimers[1] = NULL;
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_user->links[i].next = resource_user->links[i].prev = NULL;
  }
  if (name != NULL) {
    resource_user->name = gpr_strdup(name);
  } else {
    gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
                 (intptr_t)resource_user);
  }
}
||||
|
||||
/* Begin shutting down a resource user. `on_done` is invoked (via ru_destroy
   on the combiner) once all allocated memory has been returned; if nothing
   is currently allocated, destruction is scheduled immediately. May be
   called at most once per user (asserted). */
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
                                 grpc_resource_user *resource_user,
                                 grpc_closure *on_done) {
  gpr_mu_lock(&resource_user->mu);
  GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->on_done_destroy_closure) ==
             0);
  gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure,
                           (gpr_atm)on_done);
  if (resource_user->allocated == 0) {
    grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
                          &resource_user->destroy_closure, GRPC_ERROR_NONE,
                          false);
  }
  gpr_mu_unlock(&resource_user->mu);
}
||||
|
||||
void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx, |
||||
grpc_resource_user *resource_user) { |
||||
grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota); |
||||
gpr_mu_destroy(&resource_user->mu); |
||||
gpr_free(resource_user->name); |
||||
} |
||||
|
||||
/* Allocate `size` bytes against resource_user. If the user's local free pool
   cannot cover the request, the request is queued with the quota and
   optional_on_done fires when (if ever) the quota grants the memory;
   otherwise optional_on_done is scheduled immediately with success. After
   shutdown the request fails with an error. */
void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
                              grpc_resource_user *resource_user, size_t size,
                              grpc_closure *optional_on_done) {
  gpr_mu_lock(&resource_user->mu);
  grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load(
      &resource_user->on_done_destroy_closure);
  if (on_done_destroy != NULL) {
    /* already shutdown */
    if (grpc_resource_quota_trace) {
      gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR " after shutdown",
              resource_user->resource_quota->name, resource_user->name, size);
    }
    grpc_exec_ctx_sched(
        exec_ctx, optional_on_done,
        GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL);
    gpr_mu_unlock(&resource_user->mu);
    return;
  }
  /* a negative free_pool encodes an outstanding request to the quota */
  resource_user->allocated += (int64_t)size;
  resource_user->free_pool -= (int64_t)size;
  if (grpc_resource_quota_trace) {
    gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
                       ", free_pool -> %" PRId64,
            resource_user->resource_quota->name, resource_user->name, size,
            resource_user->allocated, resource_user->free_pool);
  }
  if (resource_user->free_pool < 0) {
    /* insufficient local pool: queue the callback, and ask the quota for
       more memory via the combiner (coalesced by `allocating`) */
    grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
                             GRPC_ERROR_NONE);
    if (!resource_user->allocating) {
      resource_user->allocating = true;
      grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
                            &resource_user->allocate_closure, GRPC_ERROR_NONE,
                            false);
    }
  } else {
    grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL);
  }
  gpr_mu_unlock(&resource_user->mu);
}
||||
|
||||
/* Return `size` bytes to resource_user's local free pool. When the pool
   crosses from <= 0 to > 0, advertise the surplus to the quota (at most once
   until consumed, via added_to_free_pool). If the user is shutting down and
   this was its last outstanding allocation, schedule its destruction. */
void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
                             grpc_resource_user *resource_user, size_t size) {
  gpr_mu_lock(&resource_user->mu);
  GPR_ASSERT(resource_user->allocated >= (int64_t)size);
  bool was_zero_or_negative = resource_user->free_pool <= 0;
  resource_user->free_pool += (int64_t)size;
  resource_user->allocated -= (int64_t)size;
  if (grpc_resource_quota_trace) {
    gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; allocated -> %" PRId64
                       ", free_pool -> %" PRId64,
            resource_user->resource_quota->name, resource_user->name, size,
            resource_user->allocated, resource_user->free_pool);
  }
  bool is_bigger_than_zero = resource_user->free_pool > 0;
  if (is_bigger_than_zero && was_zero_or_negative &&
      !resource_user->added_to_free_pool) {
    resource_user->added_to_free_pool = true;
    grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
                          &resource_user->add_to_free_pool_closure,
                          GRPC_ERROR_NONE, false);
  }
  grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load(
      &resource_user->on_done_destroy_closure);
  if (on_done_destroy != NULL && resource_user->allocated == 0) {
    /* shutdown was requested and we just hit zero: destroy on the combiner */
    grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
                          &resource_user->destroy_closure, GRPC_ERROR_NONE,
                          false);
  }
  gpr_mu_unlock(&resource_user->mu);
}
||||
|
||||
/* Register `closure` as this user's benign (destructive == false) or
   destructive reclaimer; at most one of each may be outstanding (asserted).
   If the user has already begun shutdown, the closure is cancelled
   immediately instead. */
void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
                                       grpc_resource_user *resource_user,
                                       bool destructive,
                                       grpc_closure *closure) {
  if (gpr_atm_acq_load(&resource_user->on_done_destroy_closure) == 0) {
    GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
    resource_user->reclaimers[destructive] = closure;
    grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
                          &resource_user->post_reclaimer_closure[destructive],
                          GRPC_ERROR_NONE, false);
  } else {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
  }
}
||||
|
||||
/* Called by a reclaimer once it has finished freeing memory: notifies the
   quota (on its combiner, via rq_reclamation_done_closure) so the next
   state-machine step can run. */
void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
                                           grpc_resource_user *resource_user) {
  if (grpc_resource_quota_trace) {
    gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete",
            resource_user->resource_quota->name, resource_user->name);
  }
  grpc_combiner_execute(
      exec_ctx, resource_user->resource_quota->combiner,
      &resource_user->resource_quota->rq_reclamation_done_closure,
      GRPC_ERROR_NONE, false);
}
||||
|
||||
void grpc_resource_user_slice_allocator_init( |
||||
grpc_resource_user_slice_allocator *slice_allocator, |
||||
grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) { |
||||
grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices, |
||||
slice_allocator); |
||||
grpc_closure_init(&slice_allocator->on_done, cb, p); |
||||
slice_allocator->resource_user = resource_user; |
||||
} |
||||
|
||||
void grpc_resource_user_alloc_slices( |
||||
grpc_exec_ctx *exec_ctx, |
||||
grpc_resource_user_slice_allocator *slice_allocator, size_t length, |
||||
size_t count, gpr_slice_buffer *dest) { |
||||
slice_allocator->length = length; |
||||
slice_allocator->count = count; |
||||
slice_allocator->dest = dest; |
||||
grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user, |
||||
count * length, &slice_allocator->on_allocated); |
||||
} |
@ -0,0 +1,224 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2016, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H |
||||
#define GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
|
||||
/** \file Tracks resource usage against a pool.
|
||||
|
||||
The current implementation tracks only memory usage, but in the future |
||||
this may be extended to (for example) threads and file descriptors. |
||||
|
||||
A grpc_resource_quota represents the pooled resources, and |
||||
grpc_resource_user instances attach to the quota and consume those |
||||
resources. They also offer a vector for reclamation: if we become |
||||
resource constrained, grpc_resource_user instances are asked (in turn) to |
||||
free up whatever they can so that the system as a whole can make progress. |
||||
|
||||
There are three kinds of reclamation that take place, in order of increasing |
||||
invasiveness: |
||||
- an internal reclamation, where cached resource at the resource user level |
||||
is returned to the quota |
||||
- a benign reclamation phase, whereby resources that are in use but are not |
||||
helping anything make progress are reclaimed |
||||
- a destructive reclamation, whereby resources that are helping something |
||||
make progress may be enacted so that at least one part of the system can |
||||
complete. |
||||
|
||||
Only one reclamation will be outstanding for a given quota at a given time. |
||||
On each reclamation attempt, the kinds of reclamation are tried in order of |
||||
increasing invasiveness, stopping at the first one that succeeds. Thus, on a |
||||
given reclamation attempt, if internal and benign reclamation both fail, it |
||||
will wind up doing a destructive reclamation. However, the next reclamation |
||||
attempt may then be able to get what it needs via internal or benign |
||||
reclamation, due to resources that may have been freed up by the destructive |
||||
reclamation in the previous attempt. |
||||
|
||||
Future work will be to expose the current resource pressure so that back |
||||
pressure can be applied to avoid reclamation phases starting. |
||||
|
||||
Resource users own references to resource quotas, and resource quotas |
||||
maintain lists of users (which users arrange to leave before they are |
||||
destroyed) */ |
||||
|
||||
/* Non-zero when resource-quota tracing is enabled. */
extern int grpc_resource_quota_trace;

/* Take an additional internal (core) reference to \a resource_quota;
   returns the same pointer for convenience. */
grpc_resource_quota *grpc_resource_quota_internal_ref(
    grpc_resource_quota *resource_quota);
/* Drop an internal reference taken with grpc_resource_quota_internal_ref. */
void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
                                        grpc_resource_quota *resource_quota);
/* Obtain the resource quota configured in \a channel_args (presumably
   creating or referencing one as needed -- see implementation). */
grpc_resource_quota *grpc_resource_quota_from_channel_args(
    const grpc_channel_args *channel_args);
||||
|
||||
/* Resource users are kept in (potentially) several intrusive linked lists
|
||||
at once. These are the list names. */ |
||||
typedef enum {
  /* Resource users that are waiting for an allocation */
  GRPC_RULIST_AWAITING_ALLOCATION,
  /* Resource users that have free memory available for internal reclamation */
  GRPC_RULIST_NON_EMPTY_FREE_POOL,
  /* Resource users that have published that a benign reclamation is
     available */
  GRPC_RULIST_RECLAIMER_BENIGN,
  /* Resource users that have published that a destructive reclamation is
     available */
  GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
  /* Number of lists: must be last */
  GRPC_RULIST_COUNT
} grpc_rulist;
||||
|
||||
typedef struct grpc_resource_user grpc_resource_user;

/* Internal linked list pointers for a resource user: one pair per grpc_rulist
   membership, embedded in the resource user itself (intrusive list). */
typedef struct {
  grpc_resource_user *next;
  grpc_resource_user *prev;
} grpc_resource_user_link;
||||
|
||||
struct grpc_resource_user {
  /* The quota this resource user consumes from */
  grpc_resource_quota *resource_quota;

  /* Closure to schedule an allocation under the resource quota combiner lock */
  grpc_closure allocate_closure;
  /* Closure to publish a non-empty free pool under the resource quota combiner
     lock */
  grpc_closure add_to_free_pool_closure;

  /* Guards the accounting fields below (allocated/free_pool bookkeeping is
     manipulated under this lock). */
  gpr_mu mu;
  /* Total allocated memory outstanding by this resource user in bytes;
     always positive */
  int64_t allocated;
  /* The amount of memory (in bytes) this user has cached for its own use: to
     avoid quota contention, each resource user can keep some memory in
     addition to what it is immediately using (e.g., for caching), and the quota
     can pull it back under memory pressure.
     This value can become negative if more memory has been requested than
     existed in the free pool, at which point the quota is consulted to bring
     this value non-negative (asynchronously). */
  int64_t free_pool;
  /* A list of closures to call once free_pool becomes non-negative - i.e. when
     all outstanding allocations have been granted. */
  grpc_closure_list on_allocated;
  /* True if we are currently trying to allocate from the quota, false if not */
  bool allocating;
  /* True if we are currently trying to add ourselves to the non-free quota
     list, false otherwise */
  bool added_to_free_pool;

  /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive
     reclaimer */
  grpc_closure *reclaimers[2];
  /* Trampoline closures to finish reclamation and re-enter the quota combiner
     lock */
  grpc_closure post_reclaimer_closure[2];

  /* Closure to execute under the quota combiner to de-register and shutdown
     the resource user */
  grpc_closure destroy_closure;
  /* User supplied closure to call once the user has finished shutting down AND
     all outstanding allocations have been freed. Real type is grpc_closure*,
     but it's stored as an atomic to avoid a mutex on some fast paths. */
  gpr_atm on_done_destroy_closure;

  /* Links in the various grpc_rulist lists */
  grpc_resource_user_link links[GRPC_RULIST_COUNT];

  /* The name of this resource user, for debugging/tracing */
  char *name;
};
||||
|
||||
/* Initialize \a resource_user to draw from \a resource_quota; \a name is
   used for debugging/tracing. */
void grpc_resource_user_init(grpc_resource_user *resource_user,
                             grpc_resource_quota *resource_quota,
                             const char *name);
/* Begin shutting down \a resource_user; \a on_done runs once shutdown has
   completed AND all outstanding allocations have been freed. */
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
                                 grpc_resource_user *resource_user,
                                 grpc_closure *on_done);
/* Release the resources held by \a resource_user itself (call after
   shutdown has completed). */
void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
                                grpc_resource_user *resource_user);

/* Allocate from the resource user (and its quota).
   If optional_on_done is NULL, then allocate immediately. This may push the
   quota over-limit, at which point reclamation will kick in.
   If optional_on_done is non-NULL, it will be scheduled when the allocation
   has been granted by the quota. */
void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
                              grpc_resource_user *resource_user, size_t size,
                              grpc_closure *optional_on_done);
/* Release memory back to the quota */
void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
                             grpc_resource_user *resource_user, size_t size);
/* Post a memory reclaimer to the resource user. Only one benign and one
   destructive reclaimer can be posted at once. When executed, the reclaimer
   MUST call grpc_resource_user_finish_reclamation before it completes, to
   return control to the resource quota. */
void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
                                       grpc_resource_user *resource_user,
                                       bool destructive, grpc_closure *closure);
/* Finish a reclamation step */
void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
                                           grpc_resource_user *resource_user);
||||
|
||||
/* Helper to allocate slices from a resource user */
typedef struct grpc_resource_user_slice_allocator {
  /* Closure for when a resource user allocation completes (internal step) */
  grpc_closure on_allocated;
  /* Closure to call when slices have been allocated (user-supplied) */
  grpc_closure on_done;
  /* Length of slices to allocate on the current request */
  size_t length;
  /* Number of slices to allocate on the current request */
  size_t count;
  /* Destination for slices to allocate on the current request */
  gpr_slice_buffer *dest;
  /* Parent resource user */
  grpc_resource_user *resource_user;
} grpc_resource_user_slice_allocator;
||||
|
||||
/* Initialize a slice allocator bound to \a resource_user.
   When an allocation is completed, calls \a cb with arg \a p. */
void grpc_resource_user_slice_allocator_init(
    grpc_resource_user_slice_allocator *slice_allocator,
    grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p);

/* Allocate \a count slices of length \a length into \a dest. Only one request
   can be outstanding at a time. */
void grpc_resource_user_alloc_slices(
    grpc_exec_ctx *exec_ctx,
    grpc_resource_user_slice_allocator *slice_allocator, size_t length,
    size_t count, gpr_slice_buffer *dest);
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */ |
@ -0,0 +1,45 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H |
||||
#define GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H |
||||
|
||||
#include "src/core/lib/iomgr/endpoint.h" |
||||
#include "src/core/lib/iomgr/ev_posix.h" |
||||
#include "src/core/lib/iomgr/tcp_client.h" |
||||
|
||||
/* Wrap file descriptor \a fd as a grpc_endpoint, configured per
   \a channel_args; \a addr_str is used to label the endpoint.
   NOTE(review): presumably \a fd is an already-connected TCP socket --
   confirm against the implementation. */
grpc_endpoint *grpc_tcp_client_create_from_fd(
    grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
    const char *addr_str);
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */ |
@ -0,0 +1,51 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2016, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc++/resource_quota.h> |
||||
#include <grpc/grpc.h> |
||||
|
||||
namespace grpc { |
||||
|
||||
// Creates an unnamed resource quota wrapping a core grpc_resource_quota.
ResourceQuota::ResourceQuota() : impl_(grpc_resource_quota_create(nullptr)) {}

// Creates a named resource quota; the name is passed through to the core
// quota (useful for debugging/tracing).
ResourceQuota::ResourceQuota(const grpc::string& name)
    : impl_(grpc_resource_quota_create(name.c_str())) {}

// Releases this wrapper's reference on the underlying core quota.
ResourceQuota::~ResourceQuota() { grpc_resource_quota_unref(impl_); }

// Sets the quota's size to new_size bytes; returns *this to allow chaining.
ResourceQuota& ResourceQuota::Resize(size_t new_size) {
  grpc_resource_quota_resize(impl_, new_size);
  return *this;
}
||||
|
||||
} // namespace grpc
|
@ -0,0 +1,181 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2016, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc++/impl/sync.h> |
||||
#include <grpc++/impl/thd.h> |
||||
#include <grpc/support/log.h> |
||||
#include <climits> |
||||
|
||||
#include "src/cpp/thread_manager/thread_manager.h" |
||||
|
||||
namespace grpc { |
||||
|
||||
// Spawns the underlying thread immediately; it runs Run() below.
ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr)
    : thd_mgr_(thd_mgr), thd_(&ThreadManager::WorkerThread::Run, this) {}

// Thread entry point: execute the manager's main work loop, then hand this
// object to the manager for deferred deletion via MarkAsCompleted().
void ThreadManager::WorkerThread::Run() {
  thd_mgr_->MainWorkLoop();
  thd_mgr_->MarkAsCompleted(this);
}

// Joins the thread; safe because deletion only happens after the thread has
// been marked completed.
ThreadManager::WorkerThread::~WorkerThread() { thd_.join(); }
||||
|
||||
// A max_pollers of -1 means "unbounded" and is stored as INT_MAX.
ThreadManager::ThreadManager(int min_pollers, int max_pollers)
    : shutdown_(false),
      num_pollers_(0),
      min_pollers_(min_pollers),
      max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
      num_threads_(0) {}
||||
|
||||
ThreadManager::~ThreadManager() {
  {
    // All worker threads must have exited (see Wait()) before destruction.
    std::unique_lock<grpc::mutex> lock(mu_);
    GPR_ASSERT(num_threads_ == 0);
  }

  // Delete any WorkerThread objects still queued on completed_threads_.
  CleanupCompletedThreads();
}
||||
|
||||
// Blocks until every worker thread has deregistered itself (num_threads_
// reaches 0); MarkAsCompleted() signals shutdown_cv_ on the last exit.
void ThreadManager::Wait() {
  std::unique_lock<grpc::mutex> lock(mu_);
  while (num_threads_ != 0) {
    shutdown_cv_.wait(lock);
  }
}
||||
|
||||
// Non-blocking: flips the shutdown flag; worker threads observe it in
// MaybeContinueAsPoller()/MaybeCreatePoller() and drain out.
void ThreadManager::Shutdown() {
  std::unique_lock<grpc::mutex> lock(mu_);
  shutdown_ = true;
}
||||
|
||||
// Returns whether Shutdown() has been called (read under the lock).
bool ThreadManager::IsShutdown() {
  std::unique_lock<grpc::mutex> lock(mu_);
  return shutdown_;
}
||||
|
||||
// Called by a WorkerThread when its work loop finishes: queues the object
// for later deletion and decrements the live-thread count, waking Wait()
// when the last thread exits. Note list_mu_ and mu_ are taken in disjoint
// scopes, never nested.
void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
  {
    std::unique_lock<grpc::mutex> list_lock(list_mu_);
    completed_threads_.push_back(thd);
  }

  grpc::unique_lock<grpc::mutex> lock(mu_);
  num_threads_--;
  if (num_threads_ == 0) {
    shutdown_cv_.notify_one();
  }
}
||||
|
||||
void ThreadManager::CleanupCompletedThreads() { |
||||
std::unique_lock<grpc::mutex> lock(list_mu_); |
||||
for (auto thd = completed_threads_.begin(); thd != completed_threads_.end(); |
||||
thd = completed_threads_.erase(thd)) { |
||||
delete *thd; |
||||
} |
||||
} |
||||
|
||||
// Starts the initial set of poller threads (min_pollers_ of them); each
// creation goes through MaybeCreatePoller() so the accounting stays uniform.
void ThreadManager::Initialize() {
  for (int i = 0; i < min_pollers_; i++) {
    MaybeCreatePoller();
  }
}
||||
|
||||
// If the number of pollers (i.e threads currently blocked in PollForWork()) is
// less than max threshold (i.e max_pollers_) and the ThreadManager is not
// shutting down, the current thread continues as a poller.
// Returns false (thread should exit) on shutdown or when there are already
// more than max_pollers_ pollers.
bool ThreadManager::MaybeContinueAsPoller() {
  std::unique_lock<grpc::mutex> lock(mu_);
  if (shutdown_ || num_pollers_ > max_pollers_) {
    return false;
  }

  num_pollers_++;
  return true;
}
||||
|
||||
// Create a new poller if the current number of pollers i.e num_pollers_ (i.e
// threads currently blocked in PollForWork()) is below the threshold (i.e
// min_pollers_) and the ThreadManager is not shutting down.
void ThreadManager::MaybeCreatePoller() {
  grpc::unique_lock<grpc::mutex> lock(mu_);
  if (!shutdown_ && num_pollers_ < min_pollers_) {
    num_pollers_++;
    num_threads_++;

    // Create a new thread (which ends up calling the MainWorkLoop() function).
    // The pointer is deliberately not stored here: the thread registers itself
    // via MarkAsCompleted() and is deleted in CleanupCompletedThreads().
    new WorkerThread(this);
  }
}
||||
|
||||
void ThreadManager::MainWorkLoop() {
  void* tag;
  bool ok;

  /*
   1. Poll for work (i.e PollForWork())
   2. After returning from PollForWork, reduce the number of pollers by 1. If
      PollForWork() returned a TIMEOUT, then it may indicate that we have more
      polling threads than needed. Check if the number of pollers is greater
      than min_pollers and if so, terminate the thread.
   3. Since we are short of one poller now, see if a new poller has to be
      created (i.e see MaybeCreatePoller() for more details)
   4. Do the actual work (DoWork())
   5. After doing the work, see if this thread can resume polling work (i.e
      see MaybeContinueAsPoller() for more details) */
  do {
    WorkStatus work_status = PollForWork(&tag, &ok);

    {
      grpc::unique_lock<grpc::mutex> lock(mu_);
      num_pollers_--;

      // Surplus poller timed out: let this thread exit the loop.
      if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
        break;
      }
    }

    // Note that MaybeCreatePoller does check for shutdown and creates a new
    // thread only if ThreadManager is not shutdown
    if (work_status == WORK_FOUND) {
      MaybeCreatePoller();
      DoWork(tag, ok);
    }
  } while (MaybeContinueAsPoller());

  CleanupCompletedThreads();

  // If we are here, either ThreadManager is shutting down or it already has
  // enough threads.
}
||||
|
||||
} // namespace grpc
|
@ -0,0 +1,159 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2016, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_INTERNAL_CPP_THREAD_MANAGER_H |
||||
#define GRPC_INTERNAL_CPP_THREAD_MANAGER_H |
||||
|
||||
#include <list> |
||||
#include <memory> |
||||
|
||||
#include <grpc++/impl/sync.h> |
||||
#include <grpc++/impl/thd.h> |
||||
#include <grpc++/support/config.h> |
||||
|
||||
namespace grpc { |
||||
|
||||
class ThreadManager {
 public:
  explicit ThreadManager(int min_pollers, int max_pollers);
  virtual ~ThreadManager();

  // Initializes and starts the ThreadManager threads
  void Initialize();

  // The return type of PollForWork() function
  enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT };

  // "Polls" for new work.
  // If the return value is WORK_FOUND:
  //  - The implementation of PollForWork() MAY set some opaque identifier
  //    (to identify the work item found) via the '*tag' parameter
  //  - The implementation MUST set the value of 'ok' to 'true' or 'false'. A
  //    value of 'false' indicates some implementation specific error (that is
  //    neither SHUTDOWN nor TIMEOUT)
  //  - ThreadManager does not interpret the values of 'tag' and 'ok'
  //  - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to
  //    DoWork()
  //
  // If the return value is SHUTDOWN:
  //  - ThreadManager WILL NOT call DoWork() and terminates the thread
  //
  // If the return value is TIMEOUT:
  //  - ThreadManager WILL NOT call DoWork()
  //  - ThreadManager MAY terminate the thread depending on the current number
  //    of active poller threads and min_pollers/max_pollers settings
  //  - Also, the value of timeout is specific to the derived class
  //    implementation
  virtual WorkStatus PollForWork(void** tag, bool* ok) = 0;

  // The implementation of DoWork() is supposed to perform the work found by
  // PollForWork(). The tag and ok parameters are the same as returned by
  // PollForWork()
  //
  // The implementation of DoWork() should also do any setup needed to ensure
  // that the next call to PollForWork() (not necessarily by the current thread)
  // actually finds some work
  virtual void DoWork(void* tag, bool ok) = 0;

  // Mark the ThreadManager as shutdown and begin draining the work. This is a
  // non-blocking call and the caller should call Wait(), a blocking call which
  // returns only once the shutdown is complete
  void Shutdown();

  // Has Shutdown() been called
  bool IsShutdown();

  // A blocking call that returns only after the ThreadManager has shutdown and
  // all the threads have drained all the outstanding work
  void Wait();

 private:
  // Helper wrapper class around std::thread. This takes a ThreadManager object
  // and starts a new std::thread to call the Run() function.
  //
  // The Run() function calls ThreadManager::MainWorkLoop() function and once
  // that completes, it marks the WorkerThread completed by calling
  // ThreadManager::MarkAsCompleted()
  class WorkerThread {
   public:
    WorkerThread(ThreadManager* thd_mgr);
    ~WorkerThread();

   private:
    // Calls thd_mgr_->MainWorkLoop() and once that completes, calls
    // thd_mgr_->MarkAsCompleted(this) to mark the thread as completed
    void Run();

    ThreadManager* thd_mgr_;
    grpc::thread thd_;
  };

  // The main function in ThreadManager
  void MainWorkLoop();

  // Create a new poller if the number of current pollers is less than the
  // minimum number of pollers needed (i.e min_pollers).
  void MaybeCreatePoller();

  // Returns true if the current thread can resume as a poller. i.e if the
  // current number of pollers is less than the max_pollers.
  bool MaybeContinueAsPoller();

  void MarkAsCompleted(WorkerThread* thd);
  void CleanupCompletedThreads();

  // Protects shutdown_, num_pollers_ and num_threads_
  // TODO: sreek - Change num_pollers and num_threads_ to atomics
  grpc::mutex mu_;

  bool shutdown_;
  grpc::condition_variable shutdown_cv_;

  // Number of threads doing polling
  int num_pollers_;

  // The minimum and maximum number of threads that should be doing polling
  int min_pollers_;
  int max_pollers_;

  // The total number of threads (including the threads that are currently
  // polling i.e num_pollers_)
  int num_threads_;

  // Protects completed_threads_ (never held together with mu_)
  grpc::mutex list_mu_;
  std::list<WorkerThread*> completed_threads_;
};
||||
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPC_INTERNAL_CPP_THREAD_MANAGER_H
|
@ -0,0 +1,291 @@ |
||||
/* |
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
/** |
||||
* Benchmark client module |
||||
* @module |
||||
*/ |
||||
|
||||
'use strict'; |
||||
|
||||
var fs = require('fs'); |
||||
var path = require('path'); |
||||
var util = require('util'); |
||||
var EventEmitter = require('events'); |
||||
var http = require('http'); |
||||
var https = require('https'); |
||||
|
||||
var async = require('async'); |
||||
var _ = require('lodash'); |
||||
var PoissonProcess = require('poisson-process'); |
||||
var Histogram = require('./histogram'); |
||||
|
||||
/**
 * Convert a time difference, as returned by process.hrtime, to a number of
 * nanoseconds.
 * @param {Array.<number>} time_diff The time diff, represented as
 *     [seconds, nanoseconds]
 * @return {number} The total number of nanoseconds
 */
function timeDiffToNanos(time_diff) {
  var seconds = time_diff[0];
  var nanos = time_diff[1];
  return seconds * 1e9 + nanos;
}
||||
|
||||
/**
 * The BenchmarkClient class. Opens HTTP(S) channels to the given servers and
 * issues benchmark RPCs, recording latency statistics in a histogram.
 * @param {Array.<string>} server_targets List of "host:port" server addresses
 * @param {number} channels Number of channels (connection option sets) to open
 * @param {Object} histogram_params resolution/max_possible for the histogram
 * @param {Object=} security_params TLS options; falsy means plain HTTP
 */
function BenchmarkClient(server_targets, channels, histogram_params,
    security_params) {
  var options = {
    method: 'PUT',
    headers: {
      'Content-Type': 'application/json'
    }
  };
  var protocol;
  if (security_params) {
    protocol = https;
    if (security_params.use_test_ca) {
      // Trust the repository's test CA so the client can talk to servers
      // using the self-signed test certificates.
      var ca_path = path.join(__dirname, '../test/data/ca.pem');
      options.ca = fs.readFileSync(ca_path);
    }
    if (security_params.server_host_override) {
      // Name used for SNI/certificate verification when the certificate's
      // name does not match the address we dial.
      options.servername = security_params.server_host_override;
    }
  } else {
    protocol = http;
  }

  // Bind once so callers can invoke this.request without caring whether the
  // underlying module is http or https. (The old code also did a redundant
  // early bind to https.request that this line always overwrote.)
  this.request = _.bind(protocol.request, protocol);

  this.client_options = [];

  for (var i = 0; i < channels; i++) {
    // Spread the channels round-robin across the provided targets.
    var host_port = server_targets[i % server_targets.length].split(':');
    var new_options = _.assign({hostname: host_port[0],
                                port: +host_port[1]}, options);
    // A dedicated agent gives each channel its own connection pool.
    new_options.agent = new protocol.Agent(new_options);
    this.client_options[i] = new_options;
  }

  this.histogram = new Histogram(histogram_params.resolution,
                                 histogram_params.max_possible);

  // Whether a load test is currently in progress.
  this.running = false;

  // Number of RPCs currently in flight; used to delay 'finished' until drain.
  this.pending_calls = 0;
}
||||
|
||||
// BenchmarkClient reports progress via 'error' and 'finished' events, so it
// inherits the EventEmitter interface.
util.inherits(BenchmarkClient, EventEmitter);
||||
|
||||
/**
 * Kick off outstanding_rpcs_per_channel concurrent calls on every channel.
 * @param {Array.<Object>} client_options_list Request options, one per channel
 * @param {number} outstanding_rpcs_per_channel Calls to start per channel
 * @param {function(Object)} makeCall Starts a single call with given options
 * @param {EventEmitter} emitter Owning client; unused here, kept for the
 *     existing call signature
 */
function startAllClients(client_options_list, outstanding_rpcs_per_channel,
    makeCall, emitter) {
  client_options_list.forEach(function(client_options) {
    for (var i = 0; i < outstanding_rpcs_per_channel; i++) {
      makeCall(client_options);
    }
  });
}
||||
|
||||
/**
 * Start a closed-loop load test: on each channel,
 * outstanding_rpcs_per_channel RPCs are kept in flight, and each completed
 * call immediately starts the next one.
 * @param {number} outstanding_rpcs_per_channel Concurrent RPCs per channel
 * @param {string} rpc_type Type of RPC; only 'UNARY' is supported
 * @param {number} req_size Size (in bytes) of the request payload
 * @param {number} resp_size Requested size (in bytes) of the response payload
 * @param {boolean} generic Whether to use a generic (byte-oriented) client;
 *     not supported by this implementation
 */
BenchmarkClient.prototype.startClosedLoop = function(
    outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) {
  var self = this;

  var options = {};

  self.running = true;

  if (rpc_type == 'UNARY') {
    options.path = '/serviceProto.BenchmarkService.service/unaryCall';
  } else {
    self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
    // Bug fix: previously execution fell through after reporting the error
    // and issued calls with an undefined path.
    return;
  }

  if (generic) {
    self.emit('error', new Error('Generic client not supported'));
    return;
  }

  self.last_wall_time = process.hrtime();

  var argument = {
    response_size: resp_size,
    payload: {
      body: '0'.repeat(req_size)
    }
  };

  /**
   * Make one call, record its latency, and immediately start the next so
   * that the configured number of calls stays outstanding.
   * @param {Object} client_options Request options for the target channel
   */
  function makeCall(client_options) {
    if (self.running) {
      self.pending_calls++;
      var start_time = process.hrtime();
      var req = self.request(client_options, function(res) {
        var res_data = '';
        res.on('data', function(data) {
          res_data += data;
        });
        res.on('end', function() {
          // Parse only to validate the response body; the content is unused.
          JSON.parse(res_data);
          var time_diff = process.hrtime(start_time);
          self.histogram.add(timeDiffToNanos(time_diff));
          makeCall(client_options);
          self.pending_calls--;
          if ((!self.running) && self.pending_calls == 0) {
            // The last in-flight call drained after stop() was requested.
            self.emit('finished');
          }
        });
      });
      req.write(JSON.stringify(argument));
      req.end();
      req.on('error', function(error) {
        self.emit('error', new Error('Client error: ' + error.message));
        self.running = false;
      });
    }
  }

  startAllClients(_.map(self.client_options, _.partial(_.assign, options)),
                  outstanding_rpcs_per_channel, makeCall, self);
};
||||
|
||||
/**
 * Start a Poisson-process load test: each channel runs
 * outstanding_rpcs_per_channel independent Poisson processes that issue
 * calls at random intervals so the aggregate rate approximates offered_load.
 * @param {number} outstanding_rpcs_per_channel Poisson processes per channel
 * @param {string} rpc_type Type of RPC; only 'UNARY' is supported
 * @param {number} req_size Size (in bytes) of the request payload
 * @param {number} resp_size Requested size (in bytes) of the response payload
 * @param {number} offered_load Target aggregate call rate (calls/second)
 * @param {boolean} generic Whether to use a generic (byte-oriented) client;
 *     not supported by this implementation
 */
BenchmarkClient.prototype.startPoisson = function(
    outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load,
    generic) {
  var self = this;

  var options = {};

  self.running = true;

  if (rpc_type == 'UNARY') {
    options.path = '/serviceProto.BenchmarkService.service/unaryCall';
  } else {
    self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
    // Bug fix: previously execution fell through after reporting the error
    // and issued calls with an undefined path.
    return;
  }

  if (generic) {
    self.emit('error', new Error('Generic client not supported'));
    return;
  }

  self.last_wall_time = process.hrtime();

  var argument = {
    response_size: resp_size,
    payload: {
      body: '0'.repeat(req_size)
    }
  };

  /**
   * Make one call and record its latency. When the test has been stopped,
   * shut down the owning Poisson process instead.
   * @param {Object} client_options Request options for the target channel
   * @param {Object} poisson The Poisson process that scheduled this call
   */
  function makeCall(client_options, poisson) {
    if (self.running) {
      self.pending_calls++;
      var start_time = process.hrtime();
      var req = self.request(client_options, function(res) {
        var res_data = '';
        res.on('data', function(data) {
          res_data += data;
        });
        res.on('end', function() {
          // Parse only to validate the response body; the content is unused.
          JSON.parse(res_data);
          var time_diff = process.hrtime(start_time);
          self.histogram.add(timeDiffToNanos(time_diff));
          self.pending_calls--;
          if ((!self.running) && self.pending_calls == 0) {
            // The last in-flight call drained after stop() was requested.
            self.emit('finished');
          }
        });
      });
      req.write(JSON.stringify(argument));
      req.end();
      req.on('error', function(error) {
        self.emit('error', new Error('Client error: ' + error.message));
        self.running = false;
      });
    } else {
      poisson.stop();
    }
  }

  // Mean inter-arrival time per Poisson process, in milliseconds.
  var averageIntervalMs = (1 / offered_load) * 1000;

  startAllClients(_.map(self.client_options, _.partial(_.assign, options)),
                  outstanding_rpcs_per_channel, function(opts) {
    var p = PoissonProcess.create(averageIntervalMs, function() {
      makeCall(opts, p);
    });
    p.start();
  }, self);
};
||||
|
||||
/**
 * Return current statistics for the client. If reset is set, restart
 * statistic collection.
 * @param {boolean} reset Indicates that statistics should be reset
 * @return {object} Client statistics
 */
BenchmarkClient.prototype.mark = function(reset) {
  var elapsed = process.hrtime(this.last_wall_time);
  var snapshot = this.histogram;
  if (reset) {
    // Start a fresh measurement window with an identically-shaped histogram;
    // the snapshot captured above is still reported below.
    this.last_wall_time = process.hrtime();
    this.histogram = new Histogram(snapshot.resolution,
                                   snapshot.max_possible);
  }

  var latencies = {
    bucket: snapshot.getContents(),
    min_seen: snapshot.minimum(),
    max_seen: snapshot.maximum(),
    sum: snapshot.getSum(),
    sum_of_squares: snapshot.sumOfSquares(),
    count: snapshot.getCount()
  };

  return {
    latencies: latencies,
    time_elapsed: elapsed[0] + elapsed[1] / 1e9,
    // User/system CPU time are not measured by this implementation.
    time_user: 0,
    time_system: 0
  };
};
||||
|
||||
/**
 * Stop the clients.
 * @param {function} callback Called when the clients have finished shutting
 *     down
 */
BenchmarkClient.prototype.stop = function(callback) {
  this.running = false;
  this.on('finished', callback);
  if (this.pending_calls === 0) {
    // Bug fix: with no RPCs in flight (e.g. a Poisson client stopped between
    // calls) no completion handler will ever fire 'finished', so the callback
    // would never run. Emit it now.
    this.emit('finished');
  }
};
||||
|
||||
// Expose the benchmark client implementation to the benchmark worker.
module.exports = BenchmarkClient;
@ -0,0 +1,109 @@ |
||||
/* |
||||
* |
||||
* Copyright 2016, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
/** |
||||
* Benchmark server module |
||||
* @module |
||||
*/ |
||||
|
||||
'use strict'; |
||||
|
||||
var fs = require('fs'); |
||||
var path = require('path'); |
||||
var http = require('http'); |
||||
var https = require('https'); |
||||
var EventEmitter = require('events'); |
||||
var util = require('util'); |
||||
|
||||
var express = require('express'); |
||||
var bodyParser = require('body-parser') |
||||
|
||||
/**
 * Express handler for a benchmark unary call. Replies with a JSON payload
 * whose body is a string of zeroes of the length requested by the client.
 * @param {Object} req Express request; req.body.response_size gives the
 *     desired response body length
 * @param {Object} res Express response used to send the JSON payload
 */
function unaryCall(req, res) {
  var size = req.body.response_size;
  res.json({body: '0'.repeat(size)});
}
||||
|
||||
/**
 * The BenchmarkServer class. Builds an Express application that answers
 * benchmark unary calls over HTTP or HTTPS.
 * @param {string} host Host to bind the server to
 * @param {number} port Port to listen on
 * @param {boolean} tls Whether to serve over TLS using the repository's
 *     test credentials
 * @param {boolean} generic Unused by this implementation
 * @param {number} response_size Unused by this implementation
 */
function BenchmarkServer(host, port, tls, generic, response_size) {
  var app = express();
  app.use(bodyParser.json());
  app.put('/serviceProto.BenchmarkService.service/unaryCall', unaryCall);
  this.input_host = host;
  this.input_port = port;
  if (tls) {
    // Serve TLS with the self-signed test key/certificate pair shipped in
    // the repository's test data directory.
    var credentials = {
      key: fs.readFileSync(path.join(__dirname, '../test/data/server1.key')),
      cert: fs.readFileSync(path.join(__dirname, '../test/data/server1.pem'))
    };
    this.server = https.createServer(credentials, app);
  } else {
    this.server = http.createServer(app);
  }
}
||||
|
||||
// BenchmarkServer emits a 'started' event once it is listening, so it
// inherits the EventEmitter interface.
util.inherits(BenchmarkServer, EventEmitter);
||||
|
||||
/**
 * Start the benchmark server listening on the configured host and port.
 * Emits 'started' once the server is accepting connections.
 */
BenchmarkServer.prototype.start = function() {
  var self = this;
  // Bug fix: the constructor stores the host as this.input_host; the old
  // name this.input_hostname was always undefined, so the host argument was
  // silently ignored and the server bound to the default interface.
  this.server.listen(this.input_port, this.input_host, function() {
    self.last_wall_time = process.hrtime();
    self.emit('started');
  });
};
||||
|
||||
/**
 * Return the port the server is actually bound to (useful when the server
 * was started with port 0).
 * @return {number} The bound port
 */
BenchmarkServer.prototype.getPort = function() {
  var address = this.server.address();
  return address.port;
};
||||
|
||||
/**
 * Return current statistics for the server. If reset is set, restart
 * statistic collection.
 * @param {boolean} reset Indicates that statistics should be reset
 * @return {object} Server statistics
 */
BenchmarkServer.prototype.mark = function(reset) {
  var elapsed = process.hrtime(this.last_wall_time);
  if (reset) {
    // Restart the measurement window.
    this.last_wall_time = process.hrtime();
  }
  var seconds = elapsed[0] + elapsed[1] / 1e9;
  return {
    time_elapsed: seconds,
    // User/system CPU time are not measured by this implementation.
    time_user: 0,
    time_system: 0
  };
};
||||
|
||||
/**
 * Stop the server.
 * @param {function} callback Called once the server has stopped accepting
 *     connections and existing connections have closed
 */
BenchmarkServer.prototype.stop = function(callback) {
  this.server.close(callback);
};
||||
|
||||
// Expose the benchmark server implementation to the benchmark worker.
module.exports = BenchmarkServer;
@ -0,0 +1,37 @@ |
||||
|
||||
// Copyright 2016, Google Inc. |
||||
// All rights reserved. |
||||
// |
||||
// Redistribution and use in source and binary forms, with or without |
||||
// modification, are permitted provided that the following conditions are |
||||
// met: |
||||
// |
||||
// * Redistributions of source code must retain the above copyright |
||||
// notice, this list of conditions and the following disclaimer. |
||||
// * Redistributions in binary form must reproduce the above |
||||
// copyright notice, this list of conditions and the following disclaimer |
||||
// in the documentation and/or other materials provided with the |
||||
// distribution. |
||||
// * Neither the name of Google Inc. nor the names of its |
||||
// contributors may be used to endorse or promote products derived from |
||||
// this software without specific prior written permission. |
||||
// |
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
syntax = "proto2";

package grpc.testing.proto2;

// An otherwise-empty message that reserves an extension range so that other
// proto files can attach fields to it (used to exercise proto2 extension
// handling in tests).
message EmptyWithExtensions {
  extensions 100 to 999;
}
@ -0,0 +1,43 @@ |
||||
// Copyright 2016, Google Inc. |
||||
// All rights reserved. |
||||
// |
||||
// Redistribution and use in source and binary forms, with or without |
||||
// modification, are permitted provided that the following conditions are |
||||
// met: |
||||
// |
||||
// * Redistributions of source code must retain the above copyright |
||||
// notice, this list of conditions and the following disclaimer. |
||||
// * Redistributions in binary form must reproduce the above |
||||
// copyright notice, this list of conditions and the following disclaimer |
||||
// in the documentation and/or other materials provided with the |
||||
// distribution. |
||||
// * Neither the name of Google Inc. nor the names of its |
||||
// contributors may be used to endorse or promote products derived from |
||||
// this software without specific prior written permission. |
||||
// |
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
syntax = "proto2";

import "src/proto/grpc/testing/proto2/empty2.proto";

package grpc.testing.proto2;

// Fill emptiness with music.
// Test-only extensions of EmptyWithExtensions; the field numbers all fall
// inside the 100-999 range that message reserves for extensions.
extend grpc.testing.proto2.EmptyWithExtensions {
  optional int64 Deadmau5 = 124;
  optional float Madeon = 125;
  optional string AboveAndBeyond = 126;
  optional bool Tycho = 127;
  optional fixed64 Pendulum = 128;
}
@ -1 +1,3 @@ |
||||
gens/ |
||||
*_pb2.py |
||||
*_pb2_grpc.py |
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue