From b84a489f02a01f73ae7cc91c91b437b156a44b14 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Mon, 10 Jul 2017 16:54:47 -0700 Subject: [PATCH] Revert "Reduce server memory usage" This reverts commit 53e6b56e32c79ba401a67cb349519c12991539cc. --- include/grpc++/server_builder.h | 5 +- src/core/lib/support/mpscq.c | 25 +------ src/core/lib/support/mpscq.h | 26 +------- src/core/lib/surface/server.c | 112 +++++++++++++++++++++++--------- 4 files changed, 86 insertions(+), 82 deletions(-) diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 50f947d4c77..eafd63619d2 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -196,10 +196,7 @@ class ServerBuilder { struct SyncServerSettings { SyncServerSettings() - : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())), - min_pollers(1), - max_pollers(2), - cq_timeout_msec(10000) {} + : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} /// Number of server completion queues to create to listen to incoming RPCs. int num_cqs; diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c index 58c4c435d3f..e9f893988df 100644 --- a/src/core/lib/support/mpscq.c +++ b/src/core/lib/support/mpscq.c @@ -31,12 +31,11 @@ void gpr_mpscq_destroy(gpr_mpscq *q) { GPR_ASSERT(q->tail == &q->stub); } -bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); gpr_mpscq_node *prev = (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); gpr_atm_rel_store(&prev->next, (gpr_atm)n); - return prev == &q->stub; } gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { @@ -78,25 +77,3 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) { *empty = false; return NULL; } - -void gpr_locked_mpscq_init(gpr_locked_mpscq *q) { - gpr_mpscq_init(&q->queue); - q->read_lock = GPR_SPINLOCK_INITIALIZER; -} - -void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) { - gpr_mpscq_destroy(&q->queue); -} - -bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) { - return gpr_mpscq_push(&q->queue, n); -} - -gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) { - if (gpr_spinlock_trylock(&q->read_lock)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue); - gpr_spinlock_unlock(&q->read_lock); - return n; - } - return NULL; -} diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h index 2f4739d7f81..daa51768f7f 100644 --- a/src/core/lib/support/mpscq.h +++ b/src/core/lib/support/mpscq.h @@ -22,7 +22,6 @@ #include #include #include -#include "src/core/lib/support/spinlock.h" // Multiple-producer single-consumer lock free queue, based upon the // implementation from Dmitry Vyukov here: @@ -44,34 +43,11 @@ typedef struct gpr_mpscq { void gpr_mpscq_init(gpr_mpscq *q); void gpr_mpscq_destroy(gpr_mpscq *q); // Push a node -// Thread safe - can be called from multiple threads concurrently -// Returns true if this was possibly the first node (may return true -// sporadically, will not return false sporadically) -bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); // Pop a node (returns NULL if no node is ready - which doesn't indicate that // the queue is empty!!) -// Thread compatible - can only be called from one thread at a time gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); // Pop a node; sets *empty to true if the queue is empty, or false if it is not gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty); -// An mpscq with a spinlock: it's safe to pop from multiple threads, but doing -// only one thread will succeed concurrently -typedef struct gpr_locked_mpscq { - gpr_mpscq queue; - gpr_spinlock read_lock; -} gpr_locked_mpscq; - -void gpr_locked_mpscq_init(gpr_locked_mpscq *q); -void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q); -// Push a node -// Thread safe - can be called from multiple threads concurrently -// Returns true if this was possibly the first node (may return true -// sporadically, will not return false sporadically) -bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n); -// Pop a node (returns NULL if no node is ready - which doesn't indicate that -// the queue is empty!!) -// Thread safe - can be called from multiple threads concurrently -gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q); - #endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 84ddf74ab9c..0cd436883a0 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -32,8 +32,7 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/mpscq.h" -#include "src/core/lib/support/spinlock.h" +#include "src/core/lib/support/stack_lockfree.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -62,7 +61,6 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false); typedef struct requested_call { - gpr_mpscq_node request_link; /* must be first */ requested_call_type type; size_t cq_idx; void *tag; @@ -162,7 +160,7 @@ struct request_matcher { grpc_server *server; call_data *pending_head; call_data *pending_tail; - gpr_locked_mpscq *requests_per_cq; + gpr_stack_lockfree **requests_per_cq; }; struct registered_method { @@ -207,6 +205,11 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; + /** free list of available requested_calls_per_cq indices */ + gpr_stack_lockfree **request_freelist_per_cq; + /** requested call backing data */ + requested_call **requested_calls_per_cq; + int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -306,20 +309,21 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, * request_matcher */ -static void request_matcher_init(request_matcher *rm, grpc_server *server) { +static void request_matcher_init(request_matcher *rm, size_t entries, + grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; rm->requests_per_cq = gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); for (size_t i = 0; i < server->cq_count; i++) { - gpr_locked_mpscq_init(&rm->requests_per_cq[i]); + rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); } } static void request_matcher_destroy(request_matcher *rm) { for (size_t i = 0; i < rm->server->cq_count; i++) { - GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL); - gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); + GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); + gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); } gpr_free(rm->requests_per_cq); } @@ -349,17 +353,13 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, request_matcher *rm, grpc_error *error) { - requested_call *rc; + int request_id; for (size_t i = 0; i < server->cq_count; i++) { - /* Here we know: - 1. no requests are being added (since the server is shut down) - 2. no other threads are pulling (since the shut down process is single - threaded) - So, we can ignore the queue lock and just pop, with the guarantee that a - NULL returned here truly means that the queue is empty */ - while ((rc = (requested_call *)gpr_mpscq_pop( - &rm->requests_per_cq[i].queue)) != NULL) { - fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); + while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != + -1) { + fail_call(exec_ctx, server, i, + &server->requested_calls_per_cq[i][request_id], + GRPC_ERROR_REF(error)); } } GRPC_ERROR_UNREF(error); @@ -394,7 +394,13 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); + if (server->started) { + gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); + gpr_free(server->requested_calls_per_cq[i]); + } } + gpr_free(server->request_freelist_per_cq); + gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -452,7 +458,21 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, grpc_cq_completion *c) { - gpr_free(req); + requested_call *rc = req; + grpc_server *server = rc->server; + + if (rc >= server->requested_calls_per_cq[rc->cq_idx] && + rc < server->requested_calls_per_cq[rc->cq_idx] + + server->max_requested_calls_per_cq) { + GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); + gpr_stack_lockfree_push( + server->request_freelist_per_cq[rc->cq_idx], + (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); + } else { + gpr_free(req); + } + + server_unref(exec_ctx, server); } static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -482,6 +502,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, GPR_UNREACHABLE_CODE(return ); } + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + channel_data *chand = elem->channel_data; + server_ref(chand->server); grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, rc, &rc->completion); } @@ -509,15 +533,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, for (size_t i = 0; i < server->cq_count; i++) { size_t cq_idx = (chand->cq_idx + i) % server->cq_count; - requested_call *rc = - (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); - if (rc == NULL) { + int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) { continue; } else { gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls_per_cq[cq_idx][request_id]); return; /* early out */ } } @@ -992,6 +1016,8 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; + /* TODO(ctiller): expose a channel_arg for this */ + server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1064,15 +1090,29 @@ void grpc_server_start(grpc_server *server) { server->started = true; server->pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->request_freelist_per_cq = + gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); + server->requested_calls_per_cq = + gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } + server->request_freelist_per_cq[i] = + gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); + for (int j = 0; j < server->max_requested_calls_per_cq; j++) { + gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); + } + server->requested_calls_per_cq[i] = + gpr_malloc((size_t)server->max_requested_calls_per_cq * + sizeof(*server->requested_calls_per_cq[i])); } - request_matcher_init(&server->unregistered_request_matcher, server); + request_matcher_init(&server->unregistered_request_matcher, + (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, server); + request_matcher_init(&rm->request_matcher, + (size_t)server->max_requested_calls_per_cq, server); } server_ref(server); @@ -1326,11 +1366,21 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; + int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { fail_call(exec_ctx, server, cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } + request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); + if (request_id == -1) { + /* out of request ids: just fail this one */ + fail_call(exec_ctx, server, cq_idx, rc, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), + GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); + return GRPC_CALL_OK; + } switch (rc->type) { case BATCH_CALL: rm = &server->unregistered_request_matcher; @@ -1339,13 +1389,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { + server->requested_calls_per_cq[cq_idx][request_id] = *rc; + gpr_free(rc); + if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); - if (rc == NULL) break; + request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); gpr_mu_lock(&calld->mu_state); @@ -1361,7 +1413,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls_per_cq[cq_idx][request_id]); } gpr_mu_lock(&server->mu_call); } @@ -1468,6 +1521,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); + server_ref(server); grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, done_request_event, rc, &rc->completion); }