Make request freelists per cq, to allow drastically higher requested call counts

reviewable/pr6737/r6
Craig Tiller 9 years ago
parent 1d03e100eb
commit b19dbead67
  1. 70
      src/core/lib/surface/server.c
  2. 2
      test/cpp/qps/server_async.cc

@ -73,6 +73,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct requested_call { typedef struct requested_call {
requested_call_type type; requested_call_type type;
size_t cq_idx;
void *tag; void *tag;
grpc_server *server; grpc_server *server;
grpc_completion_queue *cq_bound_to_call; grpc_completion_queue *cq_bound_to_call;
@ -207,10 +208,10 @@ struct grpc_server {
/** one request matcher for unregistered methods */ /** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher; request_matcher unregistered_request_matcher;
/** free list of available requested_calls indices */ /** free list of available requested_calls indices */
gpr_stack_lockfree *request_freelist; gpr_stack_lockfree **request_freelist_per_cq;
/** requested call backing data */ /** requested call backing data */
requested_call *requested_calls; requested_call **requested_calls_per_cq;
size_t max_requested_calls; int max_requested_calls_per_cq;
gpr_atm shutdown_flag; gpr_atm shutdown_flag;
uint8_t shutdown_published; uint8_t shutdown_published;
@ -357,7 +358,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < server->cq_count; i++) { for (size_t i = 0; i < server->cq_count; i++) {
while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
-1) { -1) {
fail_call(exec_ctx, server, i, &server->requested_calls[request_id], fail_call(exec_ctx, server, i,
&server->requested_calls_per_cq[i][request_id],
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
} }
@ -392,12 +394,16 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
} }
for (i = 0; i < server->cq_count; i++) { for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
if (server->started) {
gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
gpr_free(server->requested_calls_per_cq[i]);
}
} }
gpr_stack_lockfree_destroy(server->request_freelist); gpr_free(server->request_freelist_per_cq);
gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs); gpr_free(server->cqs);
gpr_free(server->pollsets); gpr_free(server->pollsets);
gpr_free(server->shutdown_tags); gpr_free(server->shutdown_tags);
gpr_free(server->requested_calls);
gpr_free(server); gpr_free(server);
} }
@ -460,11 +466,13 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
requested_call *rc = req; requested_call *rc = req;
grpc_server *server = rc->server; grpc_server *server = rc->server;
if (rc >= server->requested_calls && if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
rc < server->requested_calls + server->max_requested_calls) { rc < server->requested_calls_per_cq[rc->cq_idx] +
GPR_ASSERT(rc - server->requested_calls <= INT_MAX); server->max_requested_calls_per_cq) {
gpr_stack_lockfree_push(server->request_freelist, GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
(int)(rc - server->requested_calls)); gpr_stack_lockfree_push(
server->request_freelist_per_cq[rc->cq_idx],
(int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
} else { } else {
gpr_free(req); gpr_free(req);
} }
@ -540,7 +548,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
calld->state = ACTIVATED; calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx, publish_call(exec_ctx, server, calld, cq_idx,
&server->requested_calls[request_id]); &server->requested_calls_per_cq[cq_idx][request_id]);
return; /* early out */ return; /* early out */
} }
} }
@ -979,8 +987,6 @@ void grpc_server_register_non_listening_completion_queue(
} }
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
size_t i;
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server *server = gpr_malloc(sizeof(grpc_server)); grpc_server *server = gpr_malloc(sizeof(grpc_server));
@ -998,15 +1004,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
&server->root_channel_data; &server->root_channel_data;
/* TODO(ctiller): expose a channel_arg for this */ /* TODO(ctiller): expose a channel_arg for this */
server->max_requested_calls = 32768; server->max_requested_calls_per_cq = 32768;
server->request_freelist =
gpr_stack_lockfree_create(server->max_requested_calls);
for (i = 0; i < (size_t)server->max_requested_calls; i++) {
gpr_stack_lockfree_push(server->request_freelist, (int)i);
}
server->requested_calls = gpr_malloc(server->max_requested_calls *
sizeof(*server->requested_calls));
server->channel_args = grpc_channel_args_copy(args); server->channel_args = grpc_channel_args_copy(args);
return server; return server;
@ -1066,16 +1064,28 @@ void grpc_server_start(grpc_server *server) {
server->started = true; server->started = true;
size_t pollset_count = 0; size_t pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
server->request_freelist_per_cq =
gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
server->requested_calls_per_cq =
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) { for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]); server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]);
} }
server->request_freelist_per_cq[i] =
gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
}
server->requested_calls_per_cq[i] =
gpr_malloc((size_t)server->max_requested_calls_per_cq *
sizeof(*server->requested_calls_per_cq[i]));
} }
request_matcher_init(&server->unregistered_request_matcher, request_matcher_init(&server->unregistered_request_matcher,
server->max_requested_calls, server); (size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
request_matcher_init(&rm->request_matcher, server->max_requested_calls, request_matcher_init(&rm->request_matcher,
server); (size_t)server->max_requested_calls_per_cq, server);
} }
for (l = server->listeners; l; l = l->next) { for (l = server->listeners; l; l = l->next) {
@ -1307,7 +1317,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_CREATE("Server Shutdown")); GRPC_ERROR_CREATE("Server Shutdown"));
return GRPC_CALL_OK; return GRPC_CALL_OK;
} }
request_id = gpr_stack_lockfree_pop(server->request_freelist); request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
if (request_id == -1) { if (request_id == -1) {
/* out of request ids: just fail this one */ /* out of request ids: just fail this one */
fail_call(exec_ctx, server, cq_idx, rc, fail_call(exec_ctx, server, cq_idx, rc,
@ -1322,7 +1332,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &rc->data.registered.registered_method->request_matcher; rm = &rc->data.registered.registered_method->request_matcher;
break; break;
} }
server->requested_calls[request_id] = *rc; server->requested_calls_per_cq[cq_idx][request_id] = *rc;
gpr_free(rc); gpr_free(rc);
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
/* this was the first queued request: we need to lock and start /* this was the first queued request: we need to lock and start
@ -1346,7 +1356,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
calld->state = ACTIVATED; calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx, publish_call(exec_ctx, server, calld, cq_idx,
&server->requested_calls[request_id]); &server->requested_calls_per_cq[cq_idx][request_id]);
} }
gpr_mu_lock(&server->mu_call); gpr_mu_lock(&server->mu_call);
} }
@ -1382,6 +1392,7 @@ grpc_call_error grpc_server_request_call(
} }
grpc_cq_begin_op(cq_for_notification, tag); grpc_cq_begin_op(cq_for_notification, tag);
details->reserved = NULL; details->reserved = NULL;
rc->cq_idx = cq_idx;
rc->type = BATCH_CALL; rc->type = BATCH_CALL;
rc->server = server; rc->server = server;
rc->tag = tag; rc->tag = tag;
@ -1430,6 +1441,7 @@ grpc_call_error grpc_server_request_registered_call(
goto done; goto done;
} }
grpc_cq_begin_op(cq_for_notification, tag); grpc_cq_begin_op(cq_for_notification, tag);
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL; rc->type = REGISTERED_CALL;
rc->server = server; rc->server = server;
rc->tag = tag; rc->tag = tag;

@ -102,7 +102,7 @@ class AsyncQpsServerTest : public Server {
auto process_rpc_bound = auto process_rpc_bound =
std::bind(process_rpc, config.payload_config(), _1, _2); std::bind(process_rpc, config.payload_config(), _1, _2);
for (int i = 0; i < 10000 / num_threads; i++) { for (int i = 0; i < 20000; i++) {
for (int j = 0; j < num_threads; j++) { for (int j = 0; j < num_threads; j++) {
if (request_unary_function) { if (request_unary_function) {
auto request_unary = auto request_unary =

Loading…
Cancel
Save