|
|
|
@ -69,11 +69,6 @@ typedef struct call_data call_data; |
|
|
|
|
typedef struct channel_data channel_data; |
|
|
|
|
typedef struct registered_method registered_method; |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
call_data *next; |
|
|
|
|
call_data *prev; |
|
|
|
|
} call_link; |
|
|
|
|
|
|
|
|
|
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; |
|
|
|
|
|
|
|
|
|
typedef struct requested_call { |
|
|
|
@ -170,10 +165,9 @@ struct call_data { |
|
|
|
|
|
|
|
|
|
struct request_matcher { |
|
|
|
|
grpc_server *server; |
|
|
|
|
size_t cq_idx; |
|
|
|
|
call_data *pending_head; |
|
|
|
|
call_data *pending_tail; |
|
|
|
|
gpr_stack_lockfree *requests; |
|
|
|
|
gpr_stack_lockfree **requests_per_cq; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
struct registered_method { |
|
|
|
@ -182,7 +176,7 @@ struct registered_method { |
|
|
|
|
grpc_server_register_method_payload_handling payload_handling; |
|
|
|
|
uint32_t flags; |
|
|
|
|
/* one request matcher per method per cq */ |
|
|
|
|
request_matcher *request_matchers; |
|
|
|
|
request_matcher request_matcher; |
|
|
|
|
registered_method *next; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
@ -211,7 +205,7 @@ struct grpc_server { |
|
|
|
|
|
|
|
|
|
registered_method *registered_methods; |
|
|
|
|
/** one request matcher for unregistered methods per cq */ |
|
|
|
|
request_matcher *unregistered_request_matchers; |
|
|
|
|
request_matcher unregistered_request_matcher; |
|
|
|
|
/** free list of available requested_calls indices */ |
|
|
|
|
gpr_stack_lockfree *request_freelist; |
|
|
|
|
/** requested call backing data */ |
|
|
|
@ -317,16 +311,22 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
static void request_matcher_init(request_matcher *rm, size_t entries, |
|
|
|
|
size_t cq_idx, grpc_server *server) { |
|
|
|
|
grpc_server *server) { |
|
|
|
|
memset(rm, 0, sizeof(*rm)); |
|
|
|
|
rm->server = server; |
|
|
|
|
rm->cq_idx = cq_idx; |
|
|
|
|
rm->requests = gpr_stack_lockfree_create(entries); |
|
|
|
|
rm->requests_per_cq = |
|
|
|
|
gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); |
|
|
|
|
for (size_t i = 0; i < server->cq_count; i++) { |
|
|
|
|
rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void request_matcher_destroy(request_matcher *rm) { |
|
|
|
|
GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests) == -1); |
|
|
|
|
gpr_stack_lockfree_destroy(rm->requests); |
|
|
|
|
for (size_t i = 0; i < rm->server->cq_count; i++) { |
|
|
|
|
GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); |
|
|
|
|
gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); |
|
|
|
|
} |
|
|
|
|
gpr_free(rm->requests_per_cq); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, |
|
|
|
@ -355,9 +355,11 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, |
|
|
|
|
request_matcher *rm, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
int request_id; |
|
|
|
|
while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { |
|
|
|
|
fail_call(exec_ctx, server, rm->cq_idx, |
|
|
|
|
&server->requested_calls[request_id], GRPC_ERROR_REF(error)); |
|
|
|
|
for (size_t i = 0; i < server->cq_count; i++) { |
|
|
|
|
while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != |
|
|
|
|
-1) { |
|
|
|
|
fail_call(exec_ctx, server, i, &server->requested_calls[request_id], GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
@ -379,23 +381,19 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { |
|
|
|
|
while ((rm = server->registered_methods) != NULL) { |
|
|
|
|
server->registered_methods = rm->next; |
|
|
|
|
if (server->started) { |
|
|
|
|
for (i = 0; i < server->cq_count; i++) { |
|
|
|
|
request_matcher_destroy(&rm->request_matchers[i]); |
|
|
|
|
} |
|
|
|
|
gpr_free(rm->request_matchers); |
|
|
|
|
request_matcher_destroy(&rm->request_matcher); |
|
|
|
|
} |
|
|
|
|
gpr_free(rm->method); |
|
|
|
|
gpr_free(rm->host); |
|
|
|
|
gpr_free(rm); |
|
|
|
|
} |
|
|
|
|
if (server->started) { |
|
|
|
|
request_matcher_destroy(&server->unregistered_request_matcher); |
|
|
|
|
} |
|
|
|
|
for (i = 0; i < server->cq_count; i++) { |
|
|
|
|
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); |
|
|
|
|
if (server->started) { |
|
|
|
|
request_matcher_destroy(&server->unregistered_request_matchers[i]); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
gpr_stack_lockfree_destroy(server->request_freelist); |
|
|
|
|
gpr_free(server->unregistered_request_matchers); |
|
|
|
|
gpr_free(server->cqs); |
|
|
|
|
gpr_free(server->pollsets); |
|
|
|
|
gpr_free(server->shutdown_tags); |
|
|
|
@ -515,7 +513,9 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, |
|
|
|
|
|
|
|
|
|
static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
call_data *calld = arg; |
|
|
|
|
grpc_call_element *call_elem = arg; |
|
|
|
|
call_data *calld = call_elem->call_data; |
|
|
|
|
channel_data *chand = call_elem->channel_data; |
|
|
|
|
request_matcher *rm = calld->request_matcher; |
|
|
|
|
grpc_server *server = rm->server; |
|
|
|
|
|
|
|
|
@ -530,27 +530,34 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int request_id = gpr_stack_lockfree_pop(rm->requests); |
|
|
|
|
if (request_id == -1) { |
|
|
|
|
gpr_mu_lock(&server->mu_call); |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
calld->state = PENDING; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
if (rm->pending_head == NULL) { |
|
|
|
|
rm->pending_tail = rm->pending_head = calld; |
|
|
|
|
for (size_t i = 0; i < server->cq_count; i++) { |
|
|
|
|
size_t cq_idx = (chand->cq_idx + i) % server->cq_count; |
|
|
|
|
int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); |
|
|
|
|
if (request_id == -1) { |
|
|
|
|
continue; |
|
|
|
|
} else { |
|
|
|
|
rm->pending_tail->pending_next = calld; |
|
|
|
|
rm->pending_tail = calld; |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
calld->state = ACTIVATED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
publish_call(exec_ctx, server, calld, cq_idx, |
|
|
|
|
&server->requested_calls[request_id]); |
|
|
|
|
return; /* early out */ |
|
|
|
|
} |
|
|
|
|
calld->pending_next = NULL; |
|
|
|
|
gpr_mu_unlock(&server->mu_call); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* no cq to take the request found: queue it on the slow list */ |
|
|
|
|
gpr_mu_lock(&server->mu_call); |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
calld->state = PENDING; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
if (rm->pending_head == NULL) { |
|
|
|
|
rm->pending_tail = rm->pending_head = calld; |
|
|
|
|
} else { |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
calld->state = ACTIVATED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
publish_call(exec_ctx, server, calld, rm->cq_idx, |
|
|
|
|
&server->requested_calls[request_id]); |
|
|
|
|
rm->pending_tail->pending_next = calld; |
|
|
|
|
rm->pending_tail = calld; |
|
|
|
|
} |
|
|
|
|
calld->pending_next = NULL; |
|
|
|
|
gpr_mu_unlock(&server->mu_call); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void finish_start_new_rpc( |
|
|
|
@ -573,14 +580,14 @@ static void finish_start_new_rpc( |
|
|
|
|
|
|
|
|
|
switch (payload_handling) { |
|
|
|
|
case GRPC_SRM_PAYLOAD_NONE: |
|
|
|
|
publish_new_rpc(exec_ctx, calld, GRPC_ERROR_NONE); |
|
|
|
|
publish_new_rpc(exec_ctx, elem, GRPC_ERROR_NONE); |
|
|
|
|
break; |
|
|
|
|
case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { |
|
|
|
|
grpc_op op; |
|
|
|
|
memset(&op, 0, sizeof(op)); |
|
|
|
|
op.op = GRPC_OP_RECV_MESSAGE; |
|
|
|
|
op.data.recv_message = &calld->payload; |
|
|
|
|
grpc_closure_init(&calld->publish, publish_new_rpc, calld); |
|
|
|
|
grpc_closure_init(&calld->publish, publish_new_rpc, elem); |
|
|
|
|
grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, |
|
|
|
|
&calld->publish); |
|
|
|
|
break; |
|
|
|
@ -609,10 +616,9 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { |
|
|
|
|
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && |
|
|
|
|
!calld->recv_idempotent_request) |
|
|
|
|
continue; |
|
|
|
|
finish_start_new_rpc( |
|
|
|
|
exec_ctx, server, elem, |
|
|
|
|
&rm->server_registered_method->request_matchers[chand->cq_idx], |
|
|
|
|
rm->server_registered_method->payload_handling); |
|
|
|
|
finish_start_new_rpc(exec_ctx, server, elem, |
|
|
|
|
&rm->server_registered_method->request_matcher, |
|
|
|
|
rm->server_registered_method->payload_handling); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
/* check for a wildcard method definition (no host set) */ |
|
|
|
@ -626,15 +632,14 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { |
|
|
|
|
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && |
|
|
|
|
!calld->recv_idempotent_request) |
|
|
|
|
continue; |
|
|
|
|
finish_start_new_rpc( |
|
|
|
|
exec_ctx, server, elem, |
|
|
|
|
&rm->server_registered_method->request_matchers[chand->cq_idx], |
|
|
|
|
rm->server_registered_method->payload_handling); |
|
|
|
|
finish_start_new_rpc(exec_ctx, server, elem, |
|
|
|
|
&rm->server_registered_method->request_matcher, |
|
|
|
|
rm->server_registered_method->payload_handling); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
finish_start_new_rpc(exec_ctx, server, elem, |
|
|
|
|
&server->unregistered_request_matchers[chand->cq_idx], |
|
|
|
|
&server->unregistered_request_matcher, |
|
|
|
|
GRPC_SRM_PAYLOAD_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -665,19 +670,14 @@ static int num_channels(grpc_server *server) { |
|
|
|
|
static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_server *server, grpc_error *error) { |
|
|
|
|
if (server->started) { |
|
|
|
|
for (size_t i = 0; i < server->cq_count; i++) { |
|
|
|
|
request_matcher_kill_requests(exec_ctx, server, |
|
|
|
|
&server->unregistered_request_matchers[i], |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
request_matcher_zombify_all_pending_calls( |
|
|
|
|
exec_ctx, &server->unregistered_request_matchers[i]); |
|
|
|
|
for (registered_method *rm = server->registered_methods; rm; |
|
|
|
|
rm = rm->next) { |
|
|
|
|
request_matcher_kill_requests( |
|
|
|
|
exec_ctx, server, &rm->request_matchers[i], GRPC_ERROR_REF(error)); |
|
|
|
|
request_matcher_zombify_all_pending_calls(exec_ctx, |
|
|
|
|
&rm->request_matchers[i]); |
|
|
|
|
} |
|
|
|
|
request_matcher_kill_requests(exec_ctx, server, |
|
|
|
|
&server->unregistered_request_matcher, GRPC_ERROR_REF(error)); |
|
|
|
|
request_matcher_zombify_all_pending_calls( |
|
|
|
|
exec_ctx, &server->unregistered_request_matcher); |
|
|
|
|
for (registered_method *rm = server->registered_methods; rm; |
|
|
|
|
rm = rm->next) { |
|
|
|
|
request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher); |
|
|
|
|
request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher, GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
@ -1064,21 +1064,14 @@ void grpc_server_start(grpc_server *server) { |
|
|
|
|
|
|
|
|
|
server->started = true; |
|
|
|
|
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); |
|
|
|
|
server->unregistered_request_matchers = gpr_malloc( |
|
|
|
|
sizeof(*server->unregistered_request_matchers) * server->cq_count); |
|
|
|
|
for (i = 0; i < server->cq_count; i++) { |
|
|
|
|
server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); |
|
|
|
|
request_matcher_init(&server->unregistered_request_matchers[i], |
|
|
|
|
server->max_requested_calls, i, server); |
|
|
|
|
for (registered_method *rm = server->registered_methods; rm; |
|
|
|
|
rm = rm->next) { |
|
|
|
|
if (i == 0) { |
|
|
|
|
rm->request_matchers = |
|
|
|
|
gpr_malloc(sizeof(*rm->request_matchers) * server->cq_count); |
|
|
|
|
} |
|
|
|
|
request_matcher_init(&rm->request_matchers[i], |
|
|
|
|
server->max_requested_calls, i, server); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
request_matcher_init(&server->unregistered_request_matcher, |
|
|
|
|
server->max_requested_calls, server); |
|
|
|
|
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { |
|
|
|
|
request_matcher_init(&rm->request_matcher, server->max_requested_calls, |
|
|
|
|
server); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (l = server->listeners; l; l = l->next) { |
|
|
|
@ -1319,20 +1312,20 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
switch (rc->type) { |
|
|
|
|
case BATCH_CALL: |
|
|
|
|
rm = &server->unregistered_request_matchers[cq_idx]; |
|
|
|
|
rm = &server->unregistered_request_matcher; |
|
|
|
|
break; |
|
|
|
|
case REGISTERED_CALL: |
|
|
|
|
rm = &rc->data.registered.registered_method->request_matchers[cq_idx]; |
|
|
|
|
rm = &rc->data.registered.registered_method->request_matcher; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
server->requested_calls[request_id] = *rc; |
|
|
|
|
gpr_free(rc); |
|
|
|
|
if (gpr_stack_lockfree_push(rm->requests, request_id)) { |
|
|
|
|
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { |
|
|
|
|
/* this was the first queued request: we need to lock and start
|
|
|
|
|
matching calls */ |
|
|
|
|
gpr_mu_lock(&server->mu_call); |
|
|
|
|
while ((calld = rm->pending_head) != NULL) { |
|
|
|
|
request_id = gpr_stack_lockfree_pop(rm->requests); |
|
|
|
|
request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); |
|
|
|
|
if (request_id == -1) break; |
|
|
|
|
rm->pending_head = calld->pending_next; |
|
|
|
|
gpr_mu_unlock(&server->mu_call); |
|
|
|
|