|
|
|
@ -33,6 +33,7 @@ |
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/channel/channel_args.h" |
|
|
|
|
#include "src/core/lib/gpr/useful.h" |
|
|
|
|
#include "src/core/lib/iomgr/combiner.h" |
|
|
|
|
#include "src/core/lib/slice/slice_internal.h" |
|
|
|
@ -484,6 +485,7 @@ static grpc_slice ru_slice_create(grpc_resource_user* resource_user, |
|
|
|
|
* the combiner |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
// TODO(hork): rename all ru variables to resource_user
|
|
|
|
|
static void ru_allocate(void* ru, grpc_error_handle /*error*/) { |
|
|
|
|
grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru); |
|
|
|
|
if (rulist_empty(resource_user->resource_quota, |
|
|
|
@ -590,11 +592,14 @@ static void ru_destroy(void* ru, grpc_error_handle /*error*/) { |
|
|
|
|
} |
|
|
|
|
grpc_resource_quota_unref_internal(resource_user->resource_quota); |
|
|
|
|
gpr_mu_destroy(&resource_user->mu); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "RU '%s' (%p) destroyed", resource_user->name.c_str(), |
|
|
|
|
resource_user); |
|
|
|
|
} |
|
|
|
|
delete resource_user; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void ru_alloc_slices( |
|
|
|
|
grpc_resource_user_slice_allocator* slice_allocator) { |
|
|
|
|
static void ru_alloc_slices(grpc_slice_allocator* slice_allocator) { |
|
|
|
|
for (size_t i = 0; i < slice_allocator->count; i++) { |
|
|
|
|
grpc_slice_buffer_add_indexed( |
|
|
|
|
slice_allocator->dest, ru_slice_create(slice_allocator->resource_user, |
|
|
|
@ -603,8 +608,8 @@ static void ru_alloc_slices( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void ru_allocated_slices(void* arg, grpc_error_handle error) { |
|
|
|
|
grpc_resource_user_slice_allocator* slice_allocator = |
|
|
|
|
static_cast<grpc_resource_user_slice_allocator*>(arg); |
|
|
|
|
grpc_slice_allocator* slice_allocator = |
|
|
|
|
static_cast<grpc_slice_allocator*>(arg); |
|
|
|
|
if (error == GRPC_ERROR_NONE) ru_alloc_slices(slice_allocator); |
|
|
|
|
grpc_core::Closure::Run(DEBUG_LOCATION, &slice_allocator->on_done, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
@ -740,16 +745,10 @@ size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) { |
|
|
|
|
|
|
|
|
|
grpc_resource_quota* grpc_resource_quota_from_channel_args( |
|
|
|
|
const grpc_channel_args* channel_args, bool create) { |
|
|
|
|
for (size_t i = 0; i < channel_args->num_args; i++) { |
|
|
|
|
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { |
|
|
|
|
if (channel_args->args[i].type == GRPC_ARG_POINTER) { |
|
|
|
|
return grpc_resource_quota_ref_internal( |
|
|
|
|
static_cast<grpc_resource_quota*>( |
|
|
|
|
channel_args->args[i].value.pointer.p)); |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
auto* resource_quota = grpc_channel_args_find_pointer<grpc_resource_quota>( |
|
|
|
|
channel_args, GRPC_ARG_RESOURCE_QUOTA); |
|
|
|
|
if (resource_quota != nullptr) { |
|
|
|
|
return grpc_resource_quota_ref_internal(resource_quota); |
|
|
|
|
} |
|
|
|
|
return create ? grpc_resource_quota_create(nullptr) : nullptr; |
|
|
|
|
} |
|
|
|
@ -775,7 +774,7 @@ const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) { |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
grpc_resource_user* grpc_resource_user_create( |
|
|
|
|
grpc_resource_quota* resource_quota, const char* name) { |
|
|
|
|
grpc_resource_quota* resource_quota, absl::string_view name) { |
|
|
|
|
grpc_resource_user* resource_user = new grpc_resource_user; |
|
|
|
|
resource_user->resource_quota = |
|
|
|
|
grpc_resource_quota_ref_internal(resource_quota); |
|
|
|
@ -805,14 +804,16 @@ grpc_resource_user* grpc_resource_user_create( |
|
|
|
|
for (int i = 0; i < GRPC_RULIST_COUNT; i++) { |
|
|
|
|
resource_user->links[i].next = resource_user->links[i].prev = nullptr; |
|
|
|
|
} |
|
|
|
|
// TODO(hork): the RU should own a copy of the name. See Craig's comments on
|
|
|
|
|
// the EventEngine gRFC for justification.
|
|
|
|
|
if (name != nullptr) { |
|
|
|
|
resource_user->name = name; |
|
|
|
|
resource_user->name = std::string(name); |
|
|
|
|
} else { |
|
|
|
|
resource_user->name = absl::StrCat( |
|
|
|
|
"anonymous_resource_user_", reinterpret_cast<intptr_t>(resource_user)); |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "RU '%s' (%p) created", resource_user->name.c_str(), |
|
|
|
|
resource_user); |
|
|
|
|
} |
|
|
|
|
return resource_user; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -823,13 +824,22 @@ grpc_resource_quota* grpc_resource_user_quota( |
|
|
|
|
|
|
|
|
|
static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) { |
|
|
|
|
GPR_ASSERT(amount > 0); |
|
|
|
|
GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0); |
|
|
|
|
gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "RU '%s' (%p) reffing: %" PRIdPTR " -> %" PRIdPTR, |
|
|
|
|
resource_user->name.c_str(), resource_user, prior, prior + amount); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(prior != 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) { |
|
|
|
|
GPR_ASSERT(amount > 0); |
|
|
|
|
gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount); |
|
|
|
|
GPR_ASSERT(old >= amount); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "RU '%s' (%p) unreffing: %" PRIdPTR " -> %" PRIdPTR, |
|
|
|
|
resource_user->name.c_str(), resource_user, old, old - amount); |
|
|
|
|
} |
|
|
|
|
if (old == amount) { |
|
|
|
|
resource_user->resource_quota->combiner->Run( |
|
|
|
|
&resource_user->destroy_closure, GRPC_ERROR_NONE); |
|
|
|
@ -857,9 +867,10 @@ bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user, |
|
|
|
|
GPR_ASSERT(thread_count >= 0); |
|
|
|
|
bool is_success = false; |
|
|
|
|
gpr_mu_lock(&resource_user->resource_quota->thread_count_mu); |
|
|
|
|
grpc_resource_quota* rq = resource_user->resource_quota; |
|
|
|
|
if (rq->num_threads_allocated + thread_count <= rq->max_threads) { |
|
|
|
|
rq->num_threads_allocated += thread_count; |
|
|
|
|
grpc_resource_quota* resource_quota = resource_user->resource_quota; |
|
|
|
|
if (resource_quota->num_threads_allocated + thread_count <= |
|
|
|
|
resource_quota->max_threads) { |
|
|
|
|
resource_quota->num_threads_allocated += thread_count; |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated, |
|
|
|
|
thread_count); |
|
|
|
|
is_success = true; |
|
|
|
@ -872,15 +883,16 @@ void grpc_resource_user_free_threads(grpc_resource_user* resource_user, |
|
|
|
|
int thread_count) { |
|
|
|
|
GPR_ASSERT(thread_count >= 0); |
|
|
|
|
gpr_mu_lock(&resource_user->resource_quota->thread_count_mu); |
|
|
|
|
grpc_resource_quota* rq = resource_user->resource_quota; |
|
|
|
|
rq->num_threads_allocated -= thread_count; |
|
|
|
|
grpc_resource_quota* resource_quota = resource_user->resource_quota; |
|
|
|
|
resource_quota->num_threads_allocated -= thread_count; |
|
|
|
|
int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add( |
|
|
|
|
&resource_user->num_threads_allocated, -thread_count)); |
|
|
|
|
if (old_count < thread_count || rq->num_threads_allocated < 0) { |
|
|
|
|
if (old_count < thread_count || resource_quota->num_threads_allocated < 0) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"Releasing more threads (%d) than currently allocated (rq threads: " |
|
|
|
|
"%d, ru threads: %d)", |
|
|
|
|
thread_count, rq->num_threads_allocated + thread_count, old_count); |
|
|
|
|
"Releasing more threads (%d) than currently allocated " |
|
|
|
|
"(resource_quota threads: %d, ru threads: %d)", |
|
|
|
|
thread_count, resource_quota->num_threads_allocated + thread_count, |
|
|
|
|
old_count); |
|
|
|
|
abort(); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu); |
|
|
|
@ -988,19 +1000,69 @@ void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) { |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_resource_user_slice_allocator_init( |
|
|
|
|
grpc_resource_user_slice_allocator* slice_allocator, |
|
|
|
|
grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) { |
|
|
|
|
grpc_slice_allocator* grpc_slice_allocator_create( |
|
|
|
|
grpc_resource_quota* resource_quota, absl::string_view name, |
|
|
|
|
const grpc_channel_args* args) { |
|
|
|
|
grpc_slice_allocator* slice_allocator = new grpc_slice_allocator; |
|
|
|
|
slice_allocator->min_length = grpc_channel_args_find_integer( |
|
|
|
|
args, GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE, |
|
|
|
|
{GRPC_SLICE_ALLOCATOR_MIN_ALLOCATE_SIZE, -1, INT_MAX}); |
|
|
|
|
slice_allocator->max_length = grpc_channel_args_find_integer( |
|
|
|
|
args, GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE, |
|
|
|
|
{GRPC_SLICE_ALLOCATOR_MAX_ALLOCATE_SIZE, -1, INT_MAX}); |
|
|
|
|
slice_allocator->resource_user = |
|
|
|
|
grpc_resource_user_create(resource_quota, name); |
|
|
|
|
GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices, |
|
|
|
|
slice_allocator, grpc_schedule_on_exec_ctx); |
|
|
|
|
GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
slice_allocator->resource_user = resource_user; |
|
|
|
|
return slice_allocator; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_slice_allocator_destroy(grpc_slice_allocator* slice_allocator) { |
|
|
|
|
ru_unref_by(slice_allocator->resource_user, 1); |
|
|
|
|
delete slice_allocator; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool grpc_resource_user_alloc_slices( |
|
|
|
|
grpc_resource_user_slice_allocator* slice_allocator, size_t length, |
|
|
|
|
size_t count, grpc_slice_buffer* dest) { |
|
|
|
|
static size_t grpc_slice_allocator_adjust_allocation_length( |
|
|
|
|
grpc_slice_allocator* slice_allocator, size_t requested_length, |
|
|
|
|
grpc_slice_allocator_intent intent) { |
|
|
|
|
if (intent == grpc_slice_allocator_intent::kDefault) { |
|
|
|
|
return requested_length; |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(intent == grpc_slice_allocator_intent::kReadBuffer); |
|
|
|
|
double pressure = grpc_resource_quota_get_memory_pressure( |
|
|
|
|
slice_allocator->resource_user->resource_quota); |
|
|
|
|
// Reduce allocation size proportional to the pressure > 80% usage.
|
|
|
|
|
size_t target = |
|
|
|
|
requested_length * (pressure > 0.8 ? (1.0 - pressure) / 0.2 : 1.0); |
|
|
|
|
// Target will be some multiple of 8 bytes, rounded up
|
|
|
|
|
target = ((static_cast<size_t> GPR_CLAMP(target, slice_allocator->min_length, |
|
|
|
|
slice_allocator->max_length)) + |
|
|
|
|
255) & |
|
|
|
|
~static_cast<size_t>(255); |
|
|
|
|
// Don't use more than 1/16th of the overall resource quota for a single
|
|
|
|
|
// read alloc
|
|
|
|
|
size_t rqmax = grpc_resource_quota_peek_size( |
|
|
|
|
slice_allocator->resource_user->resource_quota); |
|
|
|
|
if (target > rqmax / 16 && rqmax > 1024) { |
|
|
|
|
target = rqmax / 16; |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"SliceAllocator(%p) requested %zu bytes for (%s) intent, adjusted " |
|
|
|
|
"allocation size to %zu", |
|
|
|
|
slice_allocator, requested_length, |
|
|
|
|
intent == grpc_slice_allocator_intent::kDefault ? "default" : "read", |
|
|
|
|
target); |
|
|
|
|
} |
|
|
|
|
return target; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool grpc_slice_allocator_allocate(grpc_slice_allocator* slice_allocator, |
|
|
|
|
size_t length, size_t count, |
|
|
|
|
grpc_slice_allocator_intent intent, |
|
|
|
|
grpc_slice_buffer* dest, |
|
|
|
|
grpc_iomgr_cb_func cb, void* p) { |
|
|
|
|
if (GPR_UNLIKELY( |
|
|
|
|
gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown))) { |
|
|
|
|
grpc_core::ExecCtx::Run( |
|
|
|
@ -1008,12 +1070,35 @@ bool grpc_resource_user_alloc_slices( |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown")); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
slice_allocator->length = length; |
|
|
|
|
GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
slice_allocator->length = grpc_slice_allocator_adjust_allocation_length( |
|
|
|
|
slice_allocator, length, intent); |
|
|
|
|
slice_allocator->count = count; |
|
|
|
|
slice_allocator->dest = dest; |
|
|
|
|
const bool ret = |
|
|
|
|
grpc_resource_user_alloc(slice_allocator->resource_user, count * length, |
|
|
|
|
&slice_allocator->on_allocated); |
|
|
|
|
const bool ret = grpc_resource_user_alloc(slice_allocator->resource_user, |
|
|
|
|
count * slice_allocator->length, |
|
|
|
|
&slice_allocator->on_allocated); |
|
|
|
|
if (ret) ru_alloc_slices(slice_allocator); |
|
|
|
|
return ret; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_slice_allocator_factory* grpc_slice_allocator_factory_create( |
|
|
|
|
grpc_resource_quota* resource_quota) { |
|
|
|
|
grpc_slice_allocator_factory* factory = new grpc_slice_allocator_factory; |
|
|
|
|
factory->resource_quota = resource_quota; |
|
|
|
|
return factory; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_slice_allocator* grpc_slice_allocator_factory_create_slice_allocator( |
|
|
|
|
grpc_slice_allocator_factory* slice_allocator_factory, |
|
|
|
|
absl::string_view name, grpc_channel_args* args) { |
|
|
|
|
return grpc_slice_allocator_create(slice_allocator_factory->resource_quota, |
|
|
|
|
name, args); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_slice_allocator_factory_destroy( |
|
|
|
|
grpc_slice_allocator_factory* slice_allocator_factory) { |
|
|
|
|
grpc_resource_quota_unref_internal(slice_allocator_factory->resource_quota); |
|
|
|
|
delete slice_allocator_factory; |
|
|
|
|
} |
|
|
|
|