Revert "Reduce server memory usage"

This reverts commit 53e6b56e32.
pull/11741/head
Ken Payson 8 years ago
parent a4710a090d
commit b84a489f02
  1. 5
      include/grpc++/server_builder.h
  2. 25
      src/core/lib/support/mpscq.c
  3. 26
      src/core/lib/support/mpscq.h
  4. 110
      src/core/lib/surface/server.c

@ -196,10 +196,7 @@ class ServerBuilder {
struct SyncServerSettings { struct SyncServerSettings {
SyncServerSettings() SyncServerSettings()
: num_cqs(GPR_MAX(1, gpr_cpu_num_cores())), : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
min_pollers(1),
max_pollers(2),
cq_timeout_msec(10000) {}
/// Number of server completion queues to create to listen to incoming RPCs. /// Number of server completion queues to create to listen to incoming RPCs.
int num_cqs; int num_cqs;

@ -31,12 +31,11 @@ void gpr_mpscq_destroy(gpr_mpscq *q) {
GPR_ASSERT(q->tail == &q->stub); GPR_ASSERT(q->tail == &q->stub);
} }
bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
gpr_mpscq_node *prev = gpr_mpscq_node *prev =
(gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
gpr_atm_rel_store(&prev->next, (gpr_atm)n); gpr_atm_rel_store(&prev->next, (gpr_atm)n);
return prev == &q->stub;
} }
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
@ -78,25 +77,3 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) {
*empty = false; *empty = false;
return NULL; return NULL;
} }
void gpr_locked_mpscq_init(gpr_locked_mpscq *q) {
gpr_mpscq_init(&q->queue);
q->read_lock = GPR_SPINLOCK_INITIALIZER;
}
void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) {
gpr_mpscq_destroy(&q->queue);
}
bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) {
return gpr_mpscq_push(&q->queue, n);
}
gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) {
if (gpr_spinlock_trylock(&q->read_lock)) {
gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue);
gpr_spinlock_unlock(&q->read_lock);
return n;
}
return NULL;
}

@ -22,7 +22,6 @@
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
#include <stdbool.h> #include <stdbool.h>
#include <stddef.h> #include <stddef.h>
#include "src/core/lib/support/spinlock.h"
// Multiple-producer single-consumer lock free queue, based upon the // Multiple-producer single-consumer lock free queue, based upon the
// implementation from Dmitry Vyukov here: // implementation from Dmitry Vyukov here:
@ -44,34 +43,11 @@ typedef struct gpr_mpscq {
void gpr_mpscq_init(gpr_mpscq *q); void gpr_mpscq_init(gpr_mpscq *q);
void gpr_mpscq_destroy(gpr_mpscq *q); void gpr_mpscq_destroy(gpr_mpscq *q);
// Push a node // Push a node
// Thread safe - can be called from multiple threads concurrently void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
// Returns true if this was possibly the first node (may return true
// sporadically, will not return false sporadically)
bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that // Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!) // the queue is empty!!)
// Thread compatible - can only be called from one thread at a time
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q);
// Pop a node; sets *empty to true if the queue is empty, or false if it is not // Pop a node; sets *empty to true if the queue is empty, or false if it is not
gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty); gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty);
// An mpscq with a spinlock: it's safe to pop from multiple threads, but doing
// only one thread will succeed concurrently
typedef struct gpr_locked_mpscq {
gpr_mpscq queue;
gpr_spinlock read_lock;
} gpr_locked_mpscq;
void gpr_locked_mpscq_init(gpr_locked_mpscq *q);
void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q);
// Push a node
// Thread safe - can be called from multiple threads concurrently
// Returns true if this was possibly the first node (may return true
// sporadically, will not return false sporadically)
bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
// Thread safe - can be called from multiple threads concurrently
gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q);
#endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ #endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */

@ -32,8 +32,7 @@
#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/mpscq.h" #include "src/core/lib/support/stack_lockfree.h"
#include "src/core/lib/support/spinlock.h"
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call.h"
@ -62,7 +61,6 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false); grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false);
typedef struct requested_call { typedef struct requested_call {
gpr_mpscq_node request_link; /* must be first */
requested_call_type type; requested_call_type type;
size_t cq_idx; size_t cq_idx;
void *tag; void *tag;
@ -162,7 +160,7 @@ struct request_matcher {
grpc_server *server; grpc_server *server;
call_data *pending_head; call_data *pending_head;
call_data *pending_tail; call_data *pending_tail;
gpr_locked_mpscq *requests_per_cq; gpr_stack_lockfree **requests_per_cq;
}; };
struct registered_method { struct registered_method {
@ -207,6 +205,11 @@ struct grpc_server {
registered_method *registered_methods; registered_method *registered_methods;
/** one request matcher for unregistered methods */ /** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher; request_matcher unregistered_request_matcher;
/** free list of available requested_calls_per_cq indices */
gpr_stack_lockfree **request_freelist_per_cq;
/** requested call backing data */
requested_call **requested_calls_per_cq;
int max_requested_calls_per_cq;
gpr_atm shutdown_flag; gpr_atm shutdown_flag;
uint8_t shutdown_published; uint8_t shutdown_published;
@ -306,20 +309,21 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher * request_matcher
*/ */
static void request_matcher_init(request_matcher *rm, grpc_server *server) { static void request_matcher_init(request_matcher *rm, size_t entries,
grpc_server *server) {
memset(rm, 0, sizeof(*rm)); memset(rm, 0, sizeof(*rm));
rm->server = server; rm->server = server;
rm->requests_per_cq = rm->requests_per_cq =
gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count);
for (size_t i = 0; i < server->cq_count; i++) { for (size_t i = 0; i < server->cq_count; i++) {
gpr_locked_mpscq_init(&rm->requests_per_cq[i]); rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
} }
} }
static void request_matcher_destroy(request_matcher *rm) { static void request_matcher_destroy(request_matcher *rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) { for (size_t i = 0; i < rm->server->cq_count; i++) {
GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL); GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
} }
gpr_free(rm->requests_per_cq); gpr_free(rm->requests_per_cq);
} }
@ -349,17 +353,13 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
grpc_server *server, grpc_server *server,
request_matcher *rm, request_matcher *rm,
grpc_error *error) { grpc_error *error) {
requested_call *rc; int request_id;
for (size_t i = 0; i < server->cq_count; i++) { for (size_t i = 0; i < server->cq_count; i++) {
/* Here we know: while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
1. no requests are being added (since the server is shut down) -1) {
2. no other threads are pulling (since the shut down process is single fail_call(exec_ctx, server, i,
threaded) &server->requested_calls_per_cq[i][request_id],
So, we can ignore the queue lock and just pop, with the guarantee that a GRPC_ERROR_REF(error));
NULL returned here truly means that the queue is empty */
while ((rc = (requested_call *)gpr_mpscq_pop(
&rm->requests_per_cq[i].queue)) != NULL) {
fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
} }
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
@ -394,7 +394,13 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
} }
for (i = 0; i < server->cq_count; i++) { for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server");
if (server->started) {
gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
gpr_free(server->requested_calls_per_cq[i]);
} }
}
gpr_free(server->request_freelist_per_cq);
gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs); gpr_free(server->cqs);
gpr_free(server->pollsets); gpr_free(server->pollsets);
gpr_free(server->shutdown_tags); gpr_free(server->shutdown_tags);
@ -452,9 +458,23 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
grpc_cq_completion *c) { grpc_cq_completion *c) {
requested_call *rc = req;
grpc_server *server = rc->server;
if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
rc < server->requested_calls_per_cq[rc->cq_idx] +
server->max_requested_calls_per_cq) {
GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
gpr_stack_lockfree_push(
server->request_freelist_per_cq[rc->cq_idx],
(int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
} else {
gpr_free(req); gpr_free(req);
} }
server_unref(exec_ctx, server);
}
static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
call_data *calld, size_t cq_idx, requested_call *rc) { call_data *calld, size_t cq_idx, requested_call *rc) {
grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call); grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
@ -482,6 +502,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
GPR_UNREACHABLE_CODE(return ); GPR_UNREACHABLE_CODE(return );
} }
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
channel_data *chand = elem->channel_data;
server_ref(chand->server);
grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
done_request_event, rc, &rc->completion); done_request_event, rc, &rc->completion);
} }
@ -509,15 +533,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
for (size_t i = 0; i < server->cq_count; i++) { for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count; size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
requested_call *rc = int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
(requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); if (request_id == -1) {
if (rc == NULL) {
continue; continue;
} else { } else {
gpr_mu_lock(&calld->mu_state); gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED; calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx, rc); publish_call(exec_ctx, server, calld, cq_idx,
&server->requested_calls_per_cq[cq_idx][request_id]);
return; /* early out */ return; /* early out */
} }
} }
@ -992,6 +1016,8 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
server->root_channel_data.next = server->root_channel_data.prev = server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data; &server->root_channel_data;
/* TODO(ctiller): expose a channel_arg for this */
server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args); server->channel_args = grpc_channel_args_copy(args);
return server; return server;
@ -1064,15 +1090,29 @@ void grpc_server_start(grpc_server *server) {
server->started = true; server->started = true;
server->pollset_count = 0; server->pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
server->request_freelist_per_cq =
gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
server->requested_calls_per_cq =
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) { for (i = 0; i < server->cq_count; i++) {
if (grpc_cq_can_listen(server->cqs[i])) { if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] = server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]); grpc_cq_pollset(server->cqs[i]);
} }
server->request_freelist_per_cq[i] =
gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
} }
request_matcher_init(&server->unregistered_request_matcher, server); server->requested_calls_per_cq[i] =
gpr_malloc((size_t)server->max_requested_calls_per_cq *
sizeof(*server->requested_calls_per_cq[i]));
}
request_matcher_init(&server->unregistered_request_matcher,
(size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
request_matcher_init(&rm->request_matcher, server); request_matcher_init(&rm->request_matcher,
(size_t)server->max_requested_calls_per_cq, server);
} }
server_ref(server); server_ref(server);
@ -1326,11 +1366,21 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
requested_call *rc) { requested_call *rc) {
call_data *calld = NULL; call_data *calld = NULL;
request_matcher *rm = NULL; request_matcher *rm = NULL;
int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) { if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(exec_ctx, server, cq_idx, rc, fail_call(exec_ctx, server, cq_idx, rc,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
return GRPC_CALL_OK; return GRPC_CALL_OK;
} }
request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
if (request_id == -1) {
/* out of request ids: just fail this one */
fail_call(exec_ctx, server, cq_idx, rc,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
return GRPC_CALL_OK;
}
switch (rc->type) { switch (rc->type) {
case BATCH_CALL: case BATCH_CALL:
rm = &server->unregistered_request_matcher; rm = &server->unregistered_request_matcher;
@ -1339,13 +1389,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &rc->data.registered.registered_method->request_matcher; rm = &rc->data.registered.registered_method->request_matcher;
break; break;
} }
if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { server->requested_calls_per_cq[cq_idx][request_id] = *rc;
gpr_free(rc);
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
/* this was the first queued request: we need to lock and start /* this was the first queued request: we need to lock and start
matching calls */ matching calls */
gpr_mu_lock(&server->mu_call); gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != NULL) { while ((calld = rm->pending_head) != NULL) {
rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
if (rc == NULL) break; if (request_id == -1) break;
rm->pending_head = calld->pending_next; rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call); gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state); gpr_mu_lock(&calld->mu_state);
@ -1361,7 +1413,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->state == PENDING); GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED; calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx, rc); publish_call(exec_ctx, server, calld, cq_idx,
&server->requested_calls_per_cq[cq_idx][request_id]);
} }
gpr_mu_lock(&server->mu_call); gpr_mu_lock(&server->mu_call);
} }
@ -1468,6 +1521,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
rc->initial_metadata->count = 0; rc->initial_metadata->count = 0;
GPR_ASSERT(error != GRPC_ERROR_NONE); GPR_ASSERT(error != GRPC_ERROR_NONE);
server_ref(server);
grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
done_request_event, rc, &rc->completion); done_request_event, rc, &rc->completion);
} }

Loading…
Cancel
Save