Merge pull request #16823 from AspirinSJL/rq

Account the memory usage of channel and call by resource quota
pull/17014/head
Juanli Shen 6 years ago committed by GitHub
commit 04c34590c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  2. 29
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  3. 3
      src/core/ext/transport/chttp2/transport/chttp2_transport.h
  4. 2
      src/core/ext/transport/chttp2/transport/frame_rst_stream.cc
  5. 2
      src/core/ext/transport/chttp2/transport/internal.h
  6. 16
      src/core/lib/channel/channel_stack_builder.cc
  7. 8
      src/core/lib/channel/channel_stack_builder.h
  8. 68
      src/core/lib/iomgr/resource_quota.cc
  9. 27
      src/core/lib/iomgr/resource_quota.h
  10. 20
      src/core/lib/surface/channel.cc
  11. 3
      src/core/lib/surface/channel.h
  12. 28
      src/core/lib/surface/server.cc
  13. 5
      src/core/lib/surface/server.h
  14. 3
      tools/run_tests/sanity/core_banned_functions.py

@ -39,6 +39,7 @@
#include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/api_trace.h"
@ -114,9 +115,16 @@ static void on_handshake_done(void* arg, grpc_error* error) {
server_connection_state* connection_state = server_connection_state* connection_state =
static_cast<server_connection_state*>(args->user_data); static_cast<server_connection_state*>(args->user_data);
gpr_mu_lock(&connection_state->svr_state->mu); gpr_mu_lock(&connection_state->svr_state->mu);
grpc_resource_user* resource_user = grpc_server_get_default_resource_user(
connection_state->svr_state->server);
if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) { if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) {
const char* error_str = grpc_error_string(error); const char* error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
grpc_resource_user* resource_user = grpc_server_get_default_resource_user(
connection_state->svr_state->server);
if (resource_user != nullptr) {
grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
}
if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) { if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
// We were shut down after handshaking completed successfully, so // We were shut down after handshaking completed successfully, so
// destroy the endpoint here. // destroy the endpoint here.
@ -134,13 +142,14 @@ static void on_handshake_done(void* arg, grpc_error* error) {
// If the handshaking succeeded but there is no endpoint, then the // If the handshaking succeeded but there is no endpoint, then the
// handshaker may have handed off the connection to some external // handshaker may have handed off the connection to some external
// code, so we can just clean up here without creating a transport. // code, so we can just clean up here without creating a transport.
// TODO(juanlishen): Do we need to free the memory to resource user?
if (args->endpoint != nullptr) { if (args->endpoint != nullptr) {
grpc_transport* transport = grpc_transport* transport = grpc_create_chttp2_transport(
grpc_create_chttp2_transport(args->args, args->endpoint, false); args->args, args->endpoint, false, resource_user);
grpc_server_setup_transport( grpc_server_setup_transport(
connection_state->svr_state->server, transport, connection_state->svr_state->server, transport,
connection_state->accepting_pollset, args->args, connection_state->accepting_pollset, args->args,
grpc_chttp2_transport_get_socket_uuid(transport)); grpc_chttp2_transport_get_socket_uuid(transport), resource_user);
// Use notify_on_receive_settings callback to enforce the // Use notify_on_receive_settings callback to enforce the
// handshake deadline. // handshake deadline.
connection_state->transport = connection_state->transport =
@ -183,6 +192,20 @@ static void on_accept(void* arg, grpc_endpoint* tcp,
gpr_free(acceptor); gpr_free(acceptor);
return; return;
} }
grpc_resource_user* resource_user =
grpc_server_get_default_resource_user(state->server);
if (resource_user != nullptr &&
!grpc_resource_user_safe_alloc(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
gpr_log(
GPR_ERROR,
"Memory quota exhausted, rejecting the connection, no handshaking.");
gpr_mu_unlock(&state->mu);
grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE);
grpc_endpoint_destroy(tcp);
gpr_free(acceptor);
return;
}
grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create(); grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create();
grpc_handshake_manager_pending_list_add(&state->pending_handshake_mgrs, grpc_handshake_manager_pending_list_add(&state->pending_handshake_mgrs,
handshake_mgr); handshake_mgr);

@ -478,7 +478,8 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
static void init_transport(grpc_chttp2_transport* t, static void init_transport(grpc_chttp2_transport* t,
const grpc_channel_args* channel_args, const grpc_channel_args* channel_args,
grpc_endpoint* ep, bool is_client) { grpc_endpoint* ep, bool is_client,
grpc_resource_user* resource_user) {
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
@ -491,6 +492,7 @@ static void init_transport(grpc_chttp2_transport* t,
t->endpoint_reading = 1; t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2; t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client; t->is_client = is_client;
t->resource_user = resource_user;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true; t->is_first_frame = true;
grpc_connectivity_state_init( grpc_connectivity_state_init(
@ -778,6 +780,10 @@ static void destroy_stream_locked(void* sp, grpc_error* error) {
s->flow_control.Destroy(); s->flow_control.Destroy();
if (t->resource_user != nullptr) {
grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
}
GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE);
@ -816,7 +822,21 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
if (t->channel_callback.accept_stream == nullptr) { if (t->channel_callback.accept_stream == nullptr) {
return nullptr; return nullptr;
} }
grpc_chttp2_stream* accepting; // Don't accept the stream if memory quota doesn't allow. Note that we should
// simply refuse the stream here instead of canceling the stream after it's
// accepted since the latter will create the call which costs much memory.
if (t->resource_user != nullptr &&
!grpc_resource_user_safe_alloc(t->resource_user,
GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
grpc_slice_buffer_add(
&t->qbuf,
grpc_chttp2_rst_stream_create(
id, static_cast<uint32_t>(GRPC_HTTP2_REFUSED_STREAM), nullptr));
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
return nullptr;
}
grpc_chttp2_stream* accepting = nullptr;
GPR_ASSERT(t->accepting_stream == nullptr); GPR_ASSERT(t->accepting_stream == nullptr);
t->accepting_stream = &accepting; t->accepting_stream = &accepting;
t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data, t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
@ -3185,10 +3205,11 @@ intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport) {
} }
grpc_transport* grpc_create_chttp2_transport( grpc_transport* grpc_create_chttp2_transport(
const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) { const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
grpc_resource_user* resource_user) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>( grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(
gpr_zalloc(sizeof(grpc_chttp2_transport))); gpr_zalloc(sizeof(grpc_chttp2_transport)));
init_transport(t, channel_args, ep, is_client); init_transport(t, channel_args, ep, is_client, resource_user);
return &t->base; return &t->base;
} }

@ -32,7 +32,8 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount;
extern bool g_flow_control_enabled; extern bool g_flow_control_enabled;
grpc_transport* grpc_create_chttp2_transport( grpc_transport* grpc_create_chttp2_transport(
const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client); const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
grpc_resource_user* resource_user = nullptr);
intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport); intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport);

@ -32,7 +32,7 @@ grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code,
grpc_transport_one_way_stats* stats) { grpc_transport_one_way_stats* stats) {
static const size_t frame_size = 13; static const size_t frame_size = 13;
grpc_slice slice = GRPC_SLICE_MALLOC(frame_size); grpc_slice slice = GRPC_SLICE_MALLOC(frame_size);
stats->framing_bytes += frame_size; if (stats != nullptr) stats->framing_bytes += frame_size;
uint8_t* p = GRPC_SLICE_START_PTR(slice); uint8_t* p = GRPC_SLICE_START_PTR(slice);
// Frame size. // Frame size.

@ -285,6 +285,8 @@ struct grpc_chttp2_transport {
grpc_endpoint* ep; grpc_endpoint* ep;
char* peer_string; char* peer_string;
grpc_resource_user* resource_user;
grpc_combiner* combiner; grpc_combiner* combiner;
grpc_closure* notify_on_receive_settings; grpc_closure* notify_on_receive_settings;

@ -40,6 +40,7 @@ struct grpc_channel_stack_builder {
// various set/get-able parameters // various set/get-able parameters
grpc_channel_args* args; grpc_channel_args* args;
grpc_transport* transport; grpc_transport* transport;
grpc_resource_user* resource_user;
char* target; char* target;
const char* name; const char* name;
}; };
@ -157,6 +158,11 @@ void grpc_channel_stack_builder_set_channel_arguments(
builder->args = grpc_channel_args_copy(args); builder->args = grpc_channel_args_copy(args);
} }
const grpc_channel_args* grpc_channel_stack_builder_get_channel_arguments(
grpc_channel_stack_builder* builder) {
return builder->args;
}
void grpc_channel_stack_builder_set_transport( void grpc_channel_stack_builder_set_transport(
grpc_channel_stack_builder* builder, grpc_transport* transport) { grpc_channel_stack_builder* builder, grpc_transport* transport) {
GPR_ASSERT(builder->transport == nullptr); GPR_ASSERT(builder->transport == nullptr);
@ -168,9 +174,15 @@ grpc_transport* grpc_channel_stack_builder_get_transport(
return builder->transport; return builder->transport;
} }
const grpc_channel_args* grpc_channel_stack_builder_get_channel_arguments( void grpc_channel_stack_builder_set_resource_user(
grpc_channel_stack_builder* builder, grpc_resource_user* resource_user) {
GPR_ASSERT(builder->resource_user == nullptr);
builder->resource_user = resource_user;
}
grpc_resource_user* grpc_channel_stack_builder_get_resource_user(
grpc_channel_stack_builder* builder) { grpc_channel_stack_builder* builder) {
return builder->args; return builder->resource_user;
} }
bool grpc_channel_stack_builder_append_filter( bool grpc_channel_stack_builder_append_filter(

@ -54,6 +54,14 @@ void grpc_channel_stack_builder_set_transport(
grpc_transport* grpc_channel_stack_builder_get_transport( grpc_transport* grpc_channel_stack_builder_get_transport(
grpc_channel_stack_builder* builder); grpc_channel_stack_builder* builder);
/// Attach \a resource_user to the builder (does not take ownership)
void grpc_channel_stack_builder_set_resource_user(
grpc_channel_stack_builder* builder, grpc_resource_user* resource_user);
/// Fetch attached resource user
grpc_resource_user* grpc_channel_stack_builder_get_resource_user(
grpc_channel_stack_builder* builder);
/// Set channel arguments: copies args /// Set channel arguments: copies args
void grpc_channel_stack_builder_set_channel_arguments( void grpc_channel_stack_builder_set_channel_arguments(
grpc_channel_stack_builder* builder, const grpc_channel_args* args); grpc_channel_stack_builder* builder, const grpc_channel_args* args);

@ -90,7 +90,8 @@ struct grpc_resource_user {
grpc_closure_list on_allocated; grpc_closure_list on_allocated;
/* True if we are currently trying to allocate from the quota, false if not */ /* True if we are currently trying to allocate from the quota, false if not */
bool allocating; bool allocating;
/* How many bytes of allocations are outstanding */ /* The amount of memory (in bytes) that has been requested from this user
* asynchronously but hasn't been granted yet. */
int64_t outstanding_allocations; int64_t outstanding_allocations;
/* True if we are currently trying to add ourselves to the non-free quota /* True if we are currently trying to add ourselves to the non-free quota
list, false otherwise */ list, false otherwise */
@ -135,6 +136,9 @@ struct grpc_resource_quota {
int64_t size; int64_t size;
/* Amount of free memory in the resource quota */ /* Amount of free memory in the resource quota */
int64_t free_pool; int64_t free_pool;
/* Used size of memory in the resource quota. Updated as soon as the resource
* users start to allocate or free the memory. */
gpr_atm used;
gpr_atm last_size; gpr_atm last_size;
@ -371,6 +375,7 @@ static bool rq_reclaim_from_per_user_free_pool(
while ((resource_user = rulist_pop_head(resource_quota, while ((resource_user = rulist_pop_head(resource_quota,
GRPC_RULIST_NON_EMPTY_FREE_POOL))) { GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
gpr_mu_lock(&resource_user->mu); gpr_mu_lock(&resource_user->mu);
resource_user->added_to_free_pool = false;
if (resource_user->free_pool > 0) { if (resource_user->free_pool > 0) {
int64_t amt = resource_user->free_pool; int64_t amt = resource_user->free_pool;
resource_user->free_pool = 0; resource_user->free_pool = 0;
@ -386,6 +391,13 @@ static bool rq_reclaim_from_per_user_free_pool(
gpr_mu_unlock(&resource_user->mu); gpr_mu_unlock(&resource_user->mu);
return true; return true;
} else { } else {
if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_INFO,
"RQ %s %s: failed to reclaim_from_per_user_free_pool; "
"free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
resource_quota->name, resource_user->name,
resource_user->free_pool, resource_quota->free_pool);
}
gpr_mu_unlock(&resource_user->mu); gpr_mu_unlock(&resource_user->mu);
} }
} }
@ -622,6 +634,7 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {
resource_quota->combiner = grpc_combiner_create(); resource_quota->combiner = grpc_combiner_create();
resource_quota->free_pool = INT64_MAX; resource_quota->free_pool = INT64_MAX;
resource_quota->size = INT64_MAX; resource_quota->size = INT64_MAX;
resource_quota->used = 0;
gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX); gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
gpr_mu_init(&resource_quota->thread_count_mu); gpr_mu_init(&resource_quota->thread_count_mu);
resource_quota->max_threads = INT_MAX; resource_quota->max_threads = INT_MAX;
@ -712,7 +725,7 @@ size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
*/ */
grpc_resource_quota* grpc_resource_quota_from_channel_args( grpc_resource_quota* grpc_resource_quota_from_channel_args(
const grpc_channel_args* channel_args) { const grpc_channel_args* channel_args, bool create) {
for (size_t i = 0; i < channel_args->num_args; i++) { for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
if (channel_args->args[i].type == GRPC_ARG_POINTER) { if (channel_args->args[i].type == GRPC_ARG_POINTER) {
@ -724,7 +737,7 @@ grpc_resource_quota* grpc_resource_quota_from_channel_args(
} }
} }
} }
return grpc_resource_quota_create(nullptr); return create ? grpc_resource_quota_create(nullptr) : nullptr;
} }
static void* rq_copy(void* rq) { static void* rq_copy(void* rq) {
@ -863,33 +876,68 @@ void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu); gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
} }
void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, static void resource_user_alloc_locked(grpc_resource_user* resource_user,
size_t size,
grpc_closure* optional_on_done) { grpc_closure* optional_on_done) {
gpr_mu_lock(&resource_user->mu);
ru_ref_by(resource_user, static_cast<gpr_atm>(size)); ru_ref_by(resource_user, static_cast<gpr_atm>(size));
resource_user->free_pool -= static_cast<int64_t>(size); resource_user->free_pool -= static_cast<int64_t>(size);
resource_user->outstanding_allocations += static_cast<int64_t>(size);
if (grpc_resource_quota_trace.enabled()) { if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size, resource_user->resource_quota->name, resource_user->name, size,
resource_user->free_pool); resource_user->free_pool);
} }
if (resource_user->free_pool < 0) { if (resource_user->free_pool < 0) {
if (optional_on_done != nullptr) {
resource_user->outstanding_allocations += static_cast<int64_t>(size);
grpc_closure_list_append(&resource_user->on_allocated, optional_on_done, grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
}
if (!resource_user->allocating) { if (!resource_user->allocating) {
resource_user->allocating = true; resource_user->allocating = true;
GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
} }
} else { } else {
resource_user->outstanding_allocations -= static_cast<int64_t>(size);
GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE);
} }
}
bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
size_t size) {
if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
gpr_mu_lock(&resource_user->mu);
grpc_resource_quota* resource_quota = resource_user->resource_quota;
bool cas_success;
do {
gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
gpr_atm new_used = used + size;
if (static_cast<size_t>(new_used) >
grpc_resource_quota_peek_size(resource_quota)) {
gpr_mu_unlock(&resource_user->mu);
return false;
}
cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
} while (!cas_success);
resource_user_alloc_locked(resource_user, size, nullptr);
gpr_mu_unlock(&resource_user->mu);
return true;
}
void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
grpc_closure* optional_on_done) {
// TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
// because some tests become flaky after the change.
gpr_mu_lock(&resource_user->mu);
grpc_resource_quota* resource_quota = resource_user->resource_quota;
gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
resource_user_alloc_locked(resource_user, size, optional_on_done);
gpr_mu_unlock(&resource_user->mu); gpr_mu_unlock(&resource_user->mu);
} }
void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) { void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
gpr_mu_lock(&resource_user->mu); gpr_mu_lock(&resource_user->mu);
grpc_resource_quota* resource_quota = resource_user->resource_quota;
gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
GPR_ASSERT(prior >= static_cast<long>(size));
bool was_zero_or_negative = resource_user->free_pool <= 0; bool was_zero_or_negative = resource_user->free_pool <= 0;
resource_user->free_pool += static_cast<int64_t>(size); resource_user->free_pool += static_cast<int64_t>(size);
if (grpc_resource_quota_trace.enabled()) { if (grpc_resource_quota_trace.enabled()) {
@ -940,6 +988,12 @@ void grpc_resource_user_slice_allocator_init(
void grpc_resource_user_alloc_slices( void grpc_resource_user_alloc_slices(
grpc_resource_user_slice_allocator* slice_allocator, size_t length, grpc_resource_user_slice_allocator* slice_allocator, size_t length,
size_t count, grpc_slice_buffer* dest) { size_t count, grpc_slice_buffer* dest) {
if (gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown)) {
GRPC_CLOSURE_SCHED(
&slice_allocator->on_allocated,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
return;
}
slice_allocator->length = length; slice_allocator->length = length;
slice_allocator->count = count; slice_allocator->count = count;
slice_allocator->dest = dest; slice_allocator->dest = dest;

@ -65,11 +65,16 @@
extern grpc_core::TraceFlag grpc_resource_quota_trace; extern grpc_core::TraceFlag grpc_resource_quota_trace;
// TODO(juanlishen): This is a hack. We need to do real accounting instead of
// hard coding.
constexpr size_t GRPC_RESOURCE_QUOTA_CALL_SIZE = 15 * 1024;
constexpr size_t GRPC_RESOURCE_QUOTA_CHANNEL_SIZE = 50 * 1024;
grpc_resource_quota* grpc_resource_quota_ref_internal( grpc_resource_quota* grpc_resource_quota_ref_internal(
grpc_resource_quota* resource_quota); grpc_resource_quota* resource_quota);
void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota); void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota);
grpc_resource_quota* grpc_resource_quota_from_channel_args( grpc_resource_quota* grpc_resource_quota_from_channel_args(
const grpc_channel_args* channel_args); const grpc_channel_args* channel_args, bool create = true);
/* Return a number indicating current memory pressure: /* Return a number indicating current memory pressure:
0.0 ==> no memory usage 0.0 ==> no memory usage
@ -109,11 +114,21 @@ bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
void grpc_resource_user_free_threads(grpc_resource_user* resource_user, void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
int thread_count); int thread_count);
/* Allocate from the resource user (and its quota). /* Allocates from the resource user 'size' worth of memory if this won't exceed
If optional_on_done is NULL, then allocate immediately. This may push the * the resource quota's total size. Returns whether the allocation is done
quota over-limit, at which point reclamation will kick in. * successfully. If allocated successfully, the memory should be freed by the
If optional_on_done is non-NULL, it will be scheduled when the allocation has * caller eventually. */
been granted by the quota. */ bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
size_t size);
/* Allocates from the resource user 'size' worth of memory.
* If optional_on_done is NULL, then allocate immediately. This may push the
* quota over-limit, at which point reclamation will kick in. The caller is
* always responsible to free the memory eventually.
* If optional_on_done is non-NULL, it will be scheduled without error when the
* allocation has been granted by the quota, and the caller is responsible to
* free the memory eventually. Or it may be scheduled with an error, in which
* case the caller fails to allocate the memory and shouldn't free the memory.
*/
void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
grpc_closure* optional_on_done); grpc_closure* optional_on_done);
/* Release memory back to the quota */ /* Release memory back to the quota */

@ -39,6 +39,7 @@
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call.h"
@ -63,6 +64,7 @@ struct grpc_channel {
grpc_compression_options compression_options; grpc_compression_options compression_options;
gpr_atm call_size_estimate; gpr_atm call_size_estimate;
grpc_resource_user* resource_user;
gpr_mu registered_call_mu; gpr_mu registered_call_mu;
registered_call* registered_calls; registered_call* registered_calls;
@ -82,6 +84,8 @@ grpc_channel* grpc_channel_create_with_builder(
char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder)); char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder));
grpc_channel_args* args = grpc_channel_args_copy( grpc_channel_args* args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder)); grpc_channel_stack_builder_get_channel_arguments(builder));
grpc_resource_user* resource_user =
grpc_channel_stack_builder_get_resource_user(builder);
grpc_channel* channel; grpc_channel* channel;
if (channel_stack_type == GRPC_SERVER_CHANNEL) { if (channel_stack_type == GRPC_SERVER_CHANNEL) {
GRPC_STATS_INC_SERVER_CHANNELS_CREATED(); GRPC_STATS_INC_SERVER_CHANNELS_CREATED();
@ -101,6 +105,7 @@ grpc_channel* grpc_channel_create_with_builder(
} }
channel->target = target; channel->target = target;
channel->resource_user = resource_user;
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT; bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
size_t channel_tracer_max_memory = 0; // default to off size_t channel_tracer_max_memory = 0; // default to off
@ -217,7 +222,8 @@ grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
grpc_channel* grpc_channel_create(const char* target, grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* input_args, const grpc_channel_args* input_args,
grpc_channel_stack_type channel_stack_type, grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport) { grpc_transport* optional_transport,
grpc_resource_user* resource_user) {
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
const grpc_core::UniquePtr<char> default_authority = const grpc_core::UniquePtr<char> default_authority =
get_default_authority(input_args); get_default_authority(input_args);
@ -227,11 +233,17 @@ grpc_channel* grpc_channel_create(const char* target,
grpc_channel_args_destroy(args); grpc_channel_args_destroy(args);
grpc_channel_stack_builder_set_target(builder, target); grpc_channel_stack_builder_set_target(builder, target);
grpc_channel_stack_builder_set_transport(builder, optional_transport); grpc_channel_stack_builder_set_transport(builder, optional_transport);
grpc_channel_stack_builder_set_resource_user(builder, resource_user);
if (!grpc_channel_init_create_stack(builder, channel_stack_type)) { if (!grpc_channel_init_create_stack(builder, channel_stack_type)) {
grpc_channel_stack_builder_destroy(builder); grpc_channel_stack_builder_destroy(builder);
if (resource_user != nullptr) {
grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
}
return nullptr; return nullptr;
} }
return grpc_channel_create_with_builder(builder, channel_stack_type); grpc_channel* channel =
grpc_channel_create_with_builder(builder, channel_stack_type);
return channel;
} }
size_t grpc_channel_get_call_size_estimate(grpc_channel* channel) { size_t grpc_channel_get_call_size_estimate(grpc_channel* channel) {
@ -441,6 +453,10 @@ static void destroy_channel(void* arg, grpc_error* error) {
GRPC_MDELEM_UNREF(rc->authority); GRPC_MDELEM_UNREF(rc->authority);
gpr_free(rc); gpr_free(rc);
} }
if (channel->resource_user != nullptr) {
grpc_resource_user_free(channel->resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
}
gpr_mu_destroy(&channel->registered_call_mu); gpr_mu_destroy(&channel->registered_call_mu);
gpr_free(channel->target); gpr_free(channel->target);
gpr_free(channel); gpr_free(channel);

@ -29,7 +29,8 @@
grpc_channel* grpc_channel_create(const char* target, grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* args, const grpc_channel_args* args,
grpc_channel_stack_type channel_stack_type, grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport); grpc_transport* optional_transport,
grpc_resource_user* resource_user = nullptr);
grpc_channel* grpc_channel_create_with_builder( grpc_channel* grpc_channel_create_with_builder(
grpc_channel_stack_builder* builder, grpc_channel_stack_builder* builder,

@ -189,6 +189,8 @@ typedef struct {
struct grpc_server { struct grpc_server {
grpc_channel_args* channel_args; grpc_channel_args* channel_args;
grpc_resource_user* default_resource_user;
grpc_completion_queue** cqs; grpc_completion_queue** cqs;
grpc_pollset** pollsets; grpc_pollset** pollsets;
size_t cq_count; size_t cq_count;
@ -1024,6 +1026,15 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
grpc_slice_from_static_string("Server created")); grpc_slice_from_static_string("Server created"));
} }
if (args != nullptr) {
grpc_resource_quota* resource_quota =
grpc_resource_quota_from_channel_args(args, false /* create */);
if (resource_quota != nullptr) {
server->default_resource_user =
grpc_resource_user_create(resource_quota, "default");
}
}
return server; return server;
} }
@ -1122,7 +1133,8 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport, void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
grpc_pollset* accepting_pollset, grpc_pollset* accepting_pollset,
const grpc_channel_args* args, const grpc_channel_args* args,
intptr_t socket_uuid) { intptr_t socket_uuid,
grpc_resource_user* resource_user) {
size_t num_registered_methods; size_t num_registered_methods;
size_t alloc; size_t alloc;
registered_method* rm; registered_method* rm;
@ -1135,7 +1147,8 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
uint32_t max_probes = 0; uint32_t max_probes = 0;
grpc_transport_op* op = nullptr; grpc_transport_op* op = nullptr;
channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport); channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport,
resource_user);
chand = static_cast<channel_data*>( chand = static_cast<channel_data*>(
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0) grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
->channel_data); ->channel_data);
@ -1330,6 +1343,13 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */, channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
if (server->default_resource_user != nullptr) {
grpc_resource_quota_unref(
grpc_resource_user_quota(server->default_resource_user));
grpc_resource_user_shutdown(server->default_resource_user);
grpc_resource_user_unref(server->default_resource_user);
}
} }
void grpc_server_cancel_all_calls(grpc_server* server) { void grpc_server_cancel_all_calls(grpc_server* server) {
@ -1546,6 +1566,10 @@ const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
return server->channel_args; return server->channel_args;
} }
grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) {
return server->default_resource_user;
}
int grpc_server_has_open_connections(grpc_server* server) { int grpc_server_has_open_connections(grpc_server* server) {
int r; int r;
gpr_mu_lock(&server->mu_global); gpr_mu_lock(&server->mu_global);

@ -47,7 +47,8 @@ void grpc_server_add_listener(grpc_server* server, void* listener,
void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport, void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset, grpc_pollset* accepting_pollset,
const grpc_channel_args* args, const grpc_channel_args* args,
intptr_t socket_uuid); intptr_t socket_uuid,
grpc_resource_user* resource_user = nullptr);
/* fills in the uuids of all sockets used for connections on this server */ /* fills in the uuids of all sockets used for connections on this server */
void grpc_server_populate_server_sockets( void grpc_server_populate_server_sockets(
@ -63,6 +64,8 @@ grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server); const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server);
grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server);
int grpc_server_has_open_connections(grpc_server* server); int grpc_server_has_open_connections(grpc_server* server);
/* Do not call this before grpc_server_start. Returns the pollsets and the /* Do not call this before grpc_server_start. Returns the pollsets and the

@ -24,7 +24,8 @@ os.chdir(os.path.join(os.path.dirname(sys.argv[0]), '../../..'))
# map of banned function signature to whitelist # map of banned function signature to whitelist
BANNED_EXCEPT = { BANNED_EXCEPT = {
'grpc_resource_quota_ref(': ['src/core/lib/iomgr/resource_quota.cc'], 'grpc_resource_quota_ref(': ['src/core/lib/iomgr/resource_quota.cc'],
'grpc_resource_quota_unref(': ['src/core/lib/iomgr/resource_quota.cc'], 'grpc_resource_quota_unref(':
['src/core/lib/iomgr/resource_quota.cc', 'src/core/lib/surface/server.cc'],
'grpc_slice_buffer_destroy(': ['src/core/lib/slice/slice_buffer.cc'], 'grpc_slice_buffer_destroy(': ['src/core/lib/slice/slice_buffer.cc'],
'grpc_slice_buffer_reset_and_unref(': 'grpc_slice_buffer_reset_and_unref(':
['src/core/lib/slice/slice_buffer.cc'], ['src/core/lib/slice/slice_buffer.cc'],

Loading…
Cancel
Save