Merge pull request #12648 from kpayson64/attempt_2

Add back mpscq request matcher implementation
pull/13300/merge
kpayson64 7 years ago committed by GitHub
commit c03867ff22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      include/grpc++/server_builder.h
  2. 37
      src/core/lib/support/mpscq.cc
  3. 30
      src/core/lib/support/mpscq.h
  4. 2
      src/core/lib/surface/completion_queue.cc
  5. 181
      src/core/lib/surface/server.cc

@ -202,7 +202,10 @@ class ServerBuilder {
struct SyncServerSettings { struct SyncServerSettings {
SyncServerSettings() SyncServerSettings()
: num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())),
min_pollers(1),
max_pollers(2),
cq_timeout_msec(10000) {}
/// Number of server completion queues to create to listen to incoming RPCs. /// Number of server completion queues to create to listen to incoming RPCs.
int num_cqs; int num_cqs;

@ -31,11 +31,12 @@ void gpr_mpscq_destroy(gpr_mpscq* q) {
GPR_ASSERT(q->tail == &q->stub); GPR_ASSERT(q->tail == &q->stub);
} }
void gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n) { bool gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n) {
gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
gpr_mpscq_node* prev = gpr_mpscq_node* prev =
(gpr_mpscq_node*)gpr_atm_full_xchg(&q->head, (gpr_atm)n); (gpr_mpscq_node*)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
gpr_atm_rel_store(&prev->next, (gpr_atm)n); gpr_atm_rel_store(&prev->next, (gpr_atm)n);
return prev == &q->stub;
} }
gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q) { gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q) {
@ -77,3 +78,37 @@ gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty) {
*empty = false; *empty = false;
return NULL; return NULL;
} }
// Initialize a locked mpscq: sets up the consumer mutex and the underlying
// lock-free queue. Must be paired with gpr_locked_mpscq_destroy.
void gpr_locked_mpscq_init(gpr_locked_mpscq* q) {
  gpr_mu_init(&q->mu);
  gpr_mpscq_init(&q->queue);
}
// Tear down a locked mpscq. The caller must guarantee no concurrent
// pushers or poppers remain; the queue itself asserts emptiness.
void gpr_locked_mpscq_destroy(gpr_locked_mpscq* q) {
  gpr_mu_destroy(&q->mu);
  gpr_mpscq_destroy(&q->queue);
}
// Push a node onto the queue. Pushing is lock-free and never takes q->mu:
// the mutex exists only to serialize poppers. Returns true if this push
// possibly made the queue transition from empty to non-empty (the signal
// callers use to kick off matching work).
bool gpr_locked_mpscq_push(gpr_locked_mpscq* q, gpr_mpscq_node* n) {
  const bool possibly_first = gpr_mpscq_push(&q->queue, n);
  return possibly_first;
}
// Attempt a non-blocking pop. If another popper currently holds the lock,
// give up immediately and return NULL (a NULL here does NOT mean the queue
// is empty). Otherwise perform a single underlying pop under the lock.
gpr_mpscq_node* gpr_locked_mpscq_try_pop(gpr_locked_mpscq* q) {
  if (!gpr_mu_trylock(&q->mu)) {
    // Lost the race for the consumer lock; caller may retry or fall back
    // to the blocking gpr_locked_mpscq_pop.
    return NULL;
  }
  gpr_mpscq_node* node = gpr_mpscq_pop(&q->queue);
  gpr_mu_unlock(&q->mu);
  return node;
}
// Blocking pop: acquires the consumer lock and retries the underlying pop
// until it either yields a node or positively reports the queue empty.
// (A plain gpr_mpscq_pop may return NULL transiently while a pusher is
// mid-flight, so we loop on the pop_and_check_end variant.) Returns NULL
// only if the queue was genuinely empty at some point during the call.
gpr_mpscq_node* gpr_locked_mpscq_pop(gpr_locked_mpscq* q) {
  gpr_mu_lock(&q->mu);
  gpr_mpscq_node* node;
  bool is_empty = false;
  for (;;) {
    node = gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
    if (node != NULL || is_empty) {
      break;  // got a node, or the queue is definitively empty
    }
    // NULL with !is_empty means a concurrent push is in progress: spin.
  }
  gpr_mu_unlock(&q->mu);
  return node;
}

@ -20,6 +20,7 @@
#define GRPC_CORE_LIB_SUPPORT_MPSCQ_H #define GRPC_CORE_LIB_SUPPORT_MPSCQ_H
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include <stdbool.h> #include <stdbool.h>
#include <stddef.h> #include <stddef.h>
@ -49,13 +50,40 @@ typedef struct gpr_mpscq {
void gpr_mpscq_init(gpr_mpscq* q); void gpr_mpscq_init(gpr_mpscq* q);
void gpr_mpscq_destroy(gpr_mpscq* q); void gpr_mpscq_destroy(gpr_mpscq* q);
// Push a node // Push a node
void gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n); // Thread safe - can be called from multiple threads concurrently
// Returns true if this was possibly the first node (may return true
// sporadically, will not return false sporadically)
bool gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that // Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!) // the queue is empty!!)
// Thread compatible - can only be called from one thread at a time
gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q); gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q);
// Pop a node; sets *empty to true if the queue is empty, or false if it is not // Pop a node; sets *empty to true if the queue is empty, or false if it is not
gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty); gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty);
// An mpscq with a lock: it's safe to pop from multiple threads, but
// only one thread's pop will succeed at a time.
// A multi-producer single-consumer queue paired with a mutex so that
// multiple threads may safely attempt to pop; pushing never takes the lock.
typedef struct gpr_locked_mpscq {
gpr_mpscq queue;  // underlying lock-free mpsc queue
gpr_mu mu;  // serializes poppers; producers do not acquire it
} gpr_locked_mpscq;
void gpr_locked_mpscq_init(gpr_locked_mpscq* q);
void gpr_locked_mpscq_destroy(gpr_locked_mpscq* q);
// Push a node
// Thread safe - can be called from multiple threads concurrently
// Returns true if this was possibly the first node (may return true
// sporadically, will not return false sporadically)
bool gpr_locked_mpscq_push(gpr_locked_mpscq* q, gpr_mpscq_node* n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
// Thread safe - can be called from multiple threads concurrently
gpr_mpscq_node* gpr_locked_mpscq_try_pop(gpr_locked_mpscq* q);
// Pop a node. Returns NULL only if the queue was empty at some point after
// calling this function
gpr_mpscq_node* gpr_locked_mpscq_pop(gpr_locked_mpscq* q);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

@ -376,8 +376,8 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
(grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) { (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
*tag = storage->tag; *tag = storage->tag;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
storage->done(&exec_ctx, storage->done_arg, storage);
*ok = (storage->next & (uintptr_t)(1)) == 1; *ok = (storage->next & (uintptr_t)(1)) == 1;
storage->done(&exec_ctx, storage->done_arg, storage);
ret = 1; ret = 1;
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {

@ -33,7 +33,8 @@
#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/stack_lockfree.h" #include "src/core/lib/support/mpscq.h"
#include "src/core/lib/support/spinlock.h"
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call.h"
@ -63,6 +64,7 @@ grpc_tracer_flag grpc_server_channel_trace =
GRPC_TRACER_INITIALIZER(false, "server_channel"); GRPC_TRACER_INITIALIZER(false, "server_channel");
typedef struct requested_call { typedef struct requested_call {
gpr_mpscq_node request_link; /* must be first */
requested_call_type type; requested_call_type type;
size_t cq_idx; size_t cq_idx;
void* tag; void* tag;
@ -128,10 +130,7 @@ typedef struct request_matcher request_matcher;
struct call_data { struct call_data {
grpc_call* call; grpc_call* call;
/** protects state */ gpr_atm state;
gpr_mu mu_state;
/** the current state of a call - see call_state */
call_state state;
bool path_set; bool path_set;
bool host_set; bool host_set;
@ -162,7 +161,7 @@ struct request_matcher {
grpc_server* server; grpc_server* server;
call_data* pending_head; call_data* pending_head;
call_data* pending_tail; call_data* pending_tail;
gpr_stack_lockfree** requests_per_cq; gpr_locked_mpscq* requests_per_cq;
}; };
struct registered_method { struct registered_method {
@ -207,11 +206,6 @@ struct grpc_server {
registered_method* registered_methods; registered_method* registered_methods;
/** one request matcher for unregistered methods */ /** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher; request_matcher unregistered_request_matcher;
/** free list of available requested_calls_per_cq indices */
gpr_stack_lockfree** request_freelist_per_cq;
/** requested call backing data */
requested_call** requested_calls_per_cq;
int max_requested_calls_per_cq;
gpr_atm shutdown_flag; gpr_atm shutdown_flag;
uint8_t shutdown_published; uint8_t shutdown_published;
@ -313,21 +307,20 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx* exec_ctx,
* request_matcher * request_matcher
*/ */
static void request_matcher_init(request_matcher* rm, size_t entries, static void request_matcher_init(request_matcher* rm, grpc_server* server) {
grpc_server* server) {
memset(rm, 0, sizeof(*rm)); memset(rm, 0, sizeof(*rm));
rm->server = server; rm->server = server;
rm->requests_per_cq = (gpr_stack_lockfree**)gpr_malloc( rm->requests_per_cq = (gpr_locked_mpscq*)gpr_malloc(
sizeof(*rm->requests_per_cq) * server->cq_count); sizeof(*rm->requests_per_cq) * server->cq_count);
for (size_t i = 0; i < server->cq_count; i++) { for (size_t i = 0; i < server->cq_count; i++) {
rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
} }
} }
static void request_matcher_destroy(request_matcher* rm) { static void request_matcher_destroy(request_matcher* rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) { for (size_t i = 0; i < rm->server->cq_count; i++) {
GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL);
gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
} }
gpr_free(rm->requests_per_cq); gpr_free(rm->requests_per_cq);
} }
@ -342,9 +335,7 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx* exec_ctx,
while (rm->pending_head) { while (rm->pending_head) {
call_data* calld = rm->pending_head; call_data* calld = rm->pending_head;
rm->pending_head = calld->pending_next; rm->pending_head = calld->pending_next;
gpr_mu_lock(&calld->mu_state); gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie, &calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
@ -357,13 +348,17 @@ static void request_matcher_kill_requests(grpc_exec_ctx* exec_ctx,
grpc_server* server, grpc_server* server,
request_matcher* rm, request_matcher* rm,
grpc_error* error) { grpc_error* error) {
int request_id; requested_call* rc;
for (size_t i = 0; i < server->cq_count; i++) { for (size_t i = 0; i < server->cq_count; i++) {
while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != /* Here we know:
-1) { 1. no requests are being added (since the server is shut down)
fail_call(exec_ctx, server, i, 2. no other threads are pulling (since the shut down process is single
&server->requested_calls_per_cq[i][request_id], threaded)
GRPC_ERROR_REF(error)); So, we can ignore the queue lock and just pop, with the guarantee that a
NULL returned here truly means that the queue is empty */
while ((rc = (requested_call*)gpr_mpscq_pop(
&rm->requests_per_cq[i].queue)) != NULL) {
fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
} }
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
@ -398,13 +393,7 @@ static void server_delete(grpc_exec_ctx* exec_ctx, grpc_server* server) {
} }
for (i = 0; i < server->cq_count; i++) { for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server");
if (server->started) {
gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
gpr_free(server->requested_calls_per_cq[i]);
}
} }
gpr_free(server->request_freelist_per_cq);
gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs); gpr_free(server->cqs);
gpr_free(server->pollsets); gpr_free(server->pollsets);
gpr_free(server->shutdown_tags); gpr_free(server->shutdown_tags);
@ -462,21 +451,7 @@ static void destroy_channel(grpc_exec_ctx* exec_ctx, channel_data* chand,
static void done_request_event(grpc_exec_ctx* exec_ctx, void* req, static void done_request_event(grpc_exec_ctx* exec_ctx, void* req,
grpc_cq_completion* c) { grpc_cq_completion* c) {
requested_call* rc = (requested_call*)req; gpr_free(req);
grpc_server* server = rc->server;
if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
rc < server->requested_calls_per_cq[rc->cq_idx] +
server->max_requested_calls_per_cq) {
GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
gpr_stack_lockfree_push(
server->request_freelist_per_cq[rc->cq_idx],
(int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
} else {
gpr_free(req);
}
server_unref(exec_ctx, server);
} }
static void publish_call(grpc_exec_ctx* exec_ctx, grpc_server* server, static void publish_call(grpc_exec_ctx* exec_ctx, grpc_server* server,
@ -508,10 +483,6 @@ static void publish_call(grpc_exec_ctx* exec_ctx, grpc_server* server,
GPR_UNREACHABLE_CODE(return ); GPR_UNREACHABLE_CODE(return );
} }
grpc_call_element* elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
channel_data* chand = (channel_data*)elem->channel_data;
server_ref(chand->server);
grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
done_request_event, rc, &rc->completion); done_request_event, rc, &rc->completion);
} }
@ -525,9 +496,7 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
grpc_server* server = rm->server; grpc_server* server = rm->server;
if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) { if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_lock(&calld->mu_state); gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie, &calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
@ -539,16 +508,14 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
for (size_t i = 0; i < server->cq_count; i++) { for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count; size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); requested_call* rc =
if (request_id == -1) { (requested_call*)gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]);
if (rc == NULL) {
continue; continue;
} else { } else {
GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i); GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i);
gpr_mu_lock(&calld->mu_state); gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
calld->state = ACTIVATED; publish_call(exec_ctx, server, calld, cq_idx, rc);
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
&server->requested_calls_per_cq[cq_idx][request_id]);
return; /* early out */ return; /* early out */
} }
} }
@ -556,9 +523,27 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
/* no cq to take the request found: queue it on the slow list */ /* no cq to take the request found: queue it on the slow list */
GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx); GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx);
gpr_mu_lock(&server->mu_call); gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
calld->state = PENDING; // We need to ensure that all the queues are empty. We do this under
gpr_mu_unlock(&calld->mu_state); // the server mu_call lock to ensure that if something is added to
// an empty request queue, it will block until the call is actually
// added to the pending list.
for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
requested_call* rc =
(requested_call*)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
if (rc == NULL) {
continue;
} else {
gpr_mu_unlock(&server->mu_call);
GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i + server->cq_count);
gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
publish_call(exec_ctx, server, calld, cq_idx, rc);
return; /* early out */
}
}
gpr_atm_no_barrier_store(&calld->state, PENDING);
if (rm->pending_head == NULL) { if (rm->pending_head == NULL) {
rm->pending_tail = rm->pending_head = calld; rm->pending_tail = rm->pending_head = calld;
} else { } else {
@ -576,9 +561,7 @@ static void finish_start_new_rpc(
call_data* calld = (call_data*)elem->call_data; call_data* calld = (call_data*)elem->call_data;
if (gpr_atm_acq_load(&server->shutdown_flag)) { if (gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_lock(&calld->mu_state); gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
@ -807,21 +790,14 @@ static void got_initial_metadata(grpc_exec_ctx* exec_ctx, void* ptr,
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
start_new_rpc(exec_ctx, elem); start_new_rpc(exec_ctx, elem);
} else { } else {
gpr_mu_lock(&calld->mu_state); if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} else if (calld->state == PENDING) { } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
/* zombied call will be destroyed when it's removed from the pending /* zombied call will be destroyed when it's removed from the pending
queue... later */ queue... later */
} else {
gpr_mu_unlock(&calld->mu_state);
} }
} }
} }
@ -885,7 +861,6 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
memset(calld, 0, sizeof(call_data)); memset(calld, 0, sizeof(call_data));
calld->deadline = GRPC_MILLIS_INF_FUTURE; calld->deadline = GRPC_MILLIS_INF_FUTURE;
calld->call = grpc_call_from_top_element(elem); calld->call = grpc_call_from_top_element(elem);
gpr_mu_init(&calld->mu_state);
GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata, GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
server_on_recv_initial_metadata, elem, server_on_recv_initial_metadata, elem,
@ -912,8 +887,6 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_metadata_array_destroy(&calld->initial_metadata); grpc_metadata_array_destroy(&calld->initial_metadata);
grpc_byte_buffer_destroy(calld->payload); grpc_byte_buffer_destroy(calld->payload);
gpr_mu_destroy(&calld->mu_state);
server_unref(exec_ctx, chand->server); server_unref(exec_ctx, chand->server);
} }
@ -1020,8 +993,6 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
server->root_channel_data.next = server->root_channel_data.prev = server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data; &server->root_channel_data;
/* TODO(ctiller): expose a channel_arg for this */
server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args); server->channel_args = grpc_channel_args_copy(args);
return server; return server;
@ -1095,29 +1066,15 @@ void grpc_server_start(grpc_server* server) {
server->pollset_count = 0; server->pollset_count = 0;
server->pollsets = server->pollsets =
(grpc_pollset**)gpr_malloc(sizeof(grpc_pollset*) * server->cq_count); (grpc_pollset**)gpr_malloc(sizeof(grpc_pollset*) * server->cq_count);
server->request_freelist_per_cq = (gpr_stack_lockfree**)gpr_malloc(
sizeof(*server->request_freelist_per_cq) * server->cq_count);
server->requested_calls_per_cq = (requested_call**)gpr_malloc(
sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) { for (i = 0; i < server->cq_count; i++) {
if (grpc_cq_can_listen(server->cqs[i])) { if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] = server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]); grpc_cq_pollset(server->cqs[i]);
} }
server->request_freelist_per_cq[i] =
gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
}
server->requested_calls_per_cq[i] =
(requested_call*)gpr_malloc((size_t)server->max_requested_calls_per_cq *
sizeof(*server->requested_calls_per_cq[i]));
} }
request_matcher_init(&server->unregistered_request_matcher, request_matcher_init(&server->unregistered_request_matcher, server);
(size_t)server->max_requested_calls_per_cq, server);
for (registered_method* rm = server->registered_methods; rm; rm = rm->next) { for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
request_matcher_init(&rm->matcher, request_matcher_init(&rm->matcher, server);
(size_t)server->max_requested_calls_per_cq, server);
} }
server_ref(server); server_ref(server);
@ -1373,21 +1330,11 @@ static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx,
requested_call* rc) { requested_call* rc) {
call_data* calld = NULL; call_data* calld = NULL;
request_matcher* rm = NULL; request_matcher* rm = NULL;
int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) { if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(exec_ctx, server, cq_idx, rc, fail_call(exec_ctx, server, cq_idx, rc,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
return GRPC_CALL_OK; return GRPC_CALL_OK;
} }
request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
if (request_id == -1) {
/* out of request ids: just fail this one */
fail_call(exec_ctx, server, cq_idx, rc,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
return GRPC_CALL_OK;
}
switch (rc->type) { switch (rc->type) {
case BATCH_CALL: case BATCH_CALL:
rm = &server->unregistered_request_matcher; rm = &server->unregistered_request_matcher;
@ -1396,20 +1343,17 @@ static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx,
rm = &rc->data.registered.method->matcher; rm = &rc->data.registered.method->matcher;
break; break;
} }
server->requested_calls_per_cq[cq_idx][request_id] = *rc; if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
gpr_free(rc);
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
/* this was the first queued request: we need to lock and start /* this was the first queued request: we need to lock and start
matching calls */ matching calls */
gpr_mu_lock(&server->mu_call); gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != NULL) { while ((calld = rm->pending_head) != NULL) {
request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); rc = (requested_call*)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
if (request_id == -1) break; if (rc == NULL) break;
rm->pending_head = calld->pending_next; rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call); gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state); if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
if (calld->state == ZOMBIED) { // Zombied Call
gpr_mu_unlock(&calld->mu_state);
GRPC_CLOSURE_INIT( GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie, &calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
@ -1417,11 +1361,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx,
GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} else { } else {
GPR_ASSERT(calld->state == PENDING); publish_call(exec_ctx, server, calld, cq_idx, rc);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
&server->requested_calls_per_cq[cq_idx][request_id]);
} }
gpr_mu_lock(&server->mu_call); gpr_mu_lock(&server->mu_call);
} }
@ -1540,7 +1480,6 @@ static void fail_call(grpc_exec_ctx* exec_ctx, grpc_server* server,
rc->initial_metadata->count = 0; rc->initial_metadata->count = 0;
GPR_ASSERT(error != GRPC_ERROR_NONE); GPR_ASSERT(error != GRPC_ERROR_NONE);
server_ref(server);
grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
done_request_event, rc, &rc->completion); done_request_event, rc, &rc->completion);
} }

Loading…
Cancel
Save