diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 4d179d0ab15..54b76d8aa5b 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -69,11 +69,6 @@ typedef struct call_data call_data; typedef struct channel_data channel_data; typedef struct registered_method registered_method; -typedef struct { - call_data *next; - call_data *prev; -} call_link; - typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct requested_call { @@ -170,10 +165,9 @@ struct call_data { struct request_matcher { grpc_server *server; - size_t cq_idx; call_data *pending_head; call_data *pending_tail; - gpr_stack_lockfree *requests; + gpr_stack_lockfree **requests_per_cq; }; struct registered_method { @@ -182,7 +176,7 @@ struct registered_method { grpc_server_register_method_payload_handling payload_handling; uint32_t flags; /* one request matcher per method per cq */ - request_matcher *request_matchers; + request_matcher request_matcher; registered_method *next; }; @@ -211,7 +205,7 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods per cq */ - request_matcher *unregistered_request_matchers; + request_matcher unregistered_request_matcher; /** free list of available requested_calls indices */ gpr_stack_lockfree *request_freelist; /** requested call backing data */ @@ -313,16 +307,22 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, */ static void request_matcher_init(request_matcher *rm, size_t entries, - size_t cq_idx, grpc_server *server) { + grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; - rm->cq_idx = cq_idx; - rm->requests = gpr_stack_lockfree_create(entries); + rm->requests_per_cq = + gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); + for (size_t i = 0; i < server->cq_count; i++) { + rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); + } } static void request_matcher_destroy(request_matcher *rm) { - GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests) == -1); - gpr_stack_lockfree_destroy(rm->requests); + for (size_t i = 0; i < rm->server->cq_count; i++) { + GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); + gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); + } + gpr_free(rm->requests_per_cq); } static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) { @@ -348,9 +348,11 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, request_matcher *rm) { int request_id; - while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { - fail_call(exec_ctx, server, rm->cq_idx, - &server->requested_calls[request_id]); + for (size_t i = 0; i < server->cq_count; i++) { + while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != + -1) { + fail_call(exec_ctx, server, i, &server->requested_calls[request_id]); + } } } @@ -371,23 +373,19 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; if (server->started) { - for (i = 0; i < server->cq_count; i++) { - request_matcher_destroy(&rm->request_matchers[i]); - } - gpr_free(rm->request_matchers); + request_matcher_destroy(&rm->request_matcher); } gpr_free(rm->method); gpr_free(rm->host); gpr_free(rm); } + if (server->started) { + request_matcher_destroy(&server->unregistered_request_matcher); + } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); - if (server->started) { - request_matcher_destroy(&server->unregistered_request_matchers[i]); - } } gpr_stack_lockfree_destroy(server->request_freelist); - gpr_free(server->unregistered_request_matchers); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -506,7 +504,9 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, } static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { - call_data *calld = arg; + grpc_call_element *call_elem = arg; + call_data *calld = call_elem->call_data; + channel_data *chand = call_elem->channel_data; request_matcher *rm = calld->request_matcher; grpc_server *server = rm->server; @@ -521,27 +521,34 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { return; } - int request_id = gpr_stack_lockfree_pop(rm->requests); - if (request_id == -1) { - gpr_mu_lock(&server->mu_call); - gpr_mu_lock(&calld->mu_state); - calld->state = PENDING; - gpr_mu_unlock(&calld->mu_state); - if (rm->pending_head == NULL) { - rm->pending_tail = rm->pending_head = calld; + for (size_t i = 0; i < server->cq_count; i++) { + size_t cq_idx = (chand->cq_idx + i) % server->cq_count; + int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) { + continue; } else { - rm->pending_tail->pending_next = calld; - rm->pending_tail = calld; + gpr_mu_lock(&calld->mu_state); + calld->state = ACTIVATED; + gpr_mu_unlock(&calld->mu_state); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls[request_id]); + return; /* early out */ } - calld->pending_next = NULL; - gpr_mu_unlock(&server->mu_call); + } + + /* no cq to take the request found: queue it on the slow list */ + gpr_mu_lock(&server->mu_call); + gpr_mu_lock(&calld->mu_state); + calld->state = PENDING; + gpr_mu_unlock(&calld->mu_state); + if (rm->pending_head == NULL) { + rm->pending_tail = rm->pending_head = calld; } else { - gpr_mu_lock(&calld->mu_state); - calld->state = ACTIVATED; - gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, rm->cq_idx, - &server->requested_calls[request_id]); + rm->pending_tail->pending_next = calld; + rm->pending_tail = calld; } + calld->pending_next = NULL; + gpr_mu_unlock(&server->mu_call); } static void finish_start_new_rpc( @@ -563,14 +570,14 @@ static void finish_start_new_rpc( switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: - publish_new_rpc(exec_ctx, calld, true); + publish_new_rpc(exec_ctx, elem, true); break; case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { grpc_op op; memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; op.data.recv_message = &calld->payload; - grpc_closure_init(&calld->publish, publish_new_rpc, calld); + grpc_closure_init(&calld->publish, publish_new_rpc, elem); grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, &calld->publish); break; @@ -599,10 +606,9 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && !calld->recv_idempotent_request) continue; - finish_start_new_rpc( - exec_ctx, server, elem, - &rm->server_registered_method->request_matchers[chand->cq_idx], - rm->server_registered_method->payload_handling); + finish_start_new_rpc(exec_ctx, server, elem, + &rm->server_registered_method->request_matcher, + rm->server_registered_method->payload_handling); return; } /* check for a wildcard method definition (no host set) */ @@ -616,15 +622,14 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && !calld->recv_idempotent_request) continue; - finish_start_new_rpc( - exec_ctx, server, elem, - &rm->server_registered_method->request_matchers[chand->cq_idx], - rm->server_registered_method->payload_handling); + finish_start_new_rpc(exec_ctx, server, elem, + &rm->server_registered_method->request_matcher, + rm->server_registered_method->payload_handling); return; } } finish_start_new_rpc(exec_ctx, server, elem, - &server->unregistered_request_matchers[chand->cq_idx], + &server->unregistered_request_matcher, GRPC_SRM_PAYLOAD_NONE); } @@ -655,18 +660,14 @@ static int num_channels(grpc_server *server) { static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, grpc_server *server) { if (server->started) { - for (size_t i = 0; i < server->cq_count; i++) { - request_matcher_kill_requests(exec_ctx, server, - &server->unregistered_request_matchers[i]); - request_matcher_zombify_all_pending_calls( - exec_ctx, &server->unregistered_request_matchers[i]); - for (registered_method *rm = server->registered_methods; rm; - rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, - &rm->request_matchers[i]); - request_matcher_zombify_all_pending_calls(exec_ctx, - &rm->request_matchers[i]); - } + request_matcher_kill_requests(exec_ctx, server, + &server->unregistered_request_matcher); + request_matcher_zombify_all_pending_calls( + exec_ctx, &server->unregistered_request_matcher); + for (registered_method *rm = server->registered_methods; rm; + rm = rm->next) { + request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher); + request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); } } } @@ -1046,21 +1047,14 @@ void grpc_server_start(grpc_server *server) { server->started = true; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); - server->unregistered_request_matchers = gpr_malloc( - sizeof(*server->unregistered_request_matchers) * server->cq_count); for (i = 0; i < server->cq_count; i++) { server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); - request_matcher_init(&server->unregistered_request_matchers[i], - server->max_requested_calls, i, server); - for (registered_method *rm = server->registered_methods; rm; - rm = rm->next) { - if (i == 0) { - rm->request_matchers = - gpr_malloc(sizeof(*rm->request_matchers) * server->cq_count); - } - request_matcher_init(&rm->request_matchers[i], - server->max_requested_calls, i, server); - } + } + request_matcher_init(&server->unregistered_request_matcher, + server->max_requested_calls, server); + for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { + request_matcher_init(&rm->request_matcher, server->max_requested_calls, + server); } for (l = server->listeners; l; l = l->next) { @@ -1295,20 +1289,20 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, } switch (rc->type) { case BATCH_CALL: - rm = &server->unregistered_request_matchers[cq_idx]; + rm = &server->unregistered_request_matcher; break; case REGISTERED_CALL: - rm = &rc->data.registered.registered_method->request_matchers[cq_idx]; + rm = &rc->data.registered.registered_method->request_matcher; break; } server->requested_calls[request_id] = *rc; gpr_free(rc); - if (gpr_stack_lockfree_push(rm->requests, request_id)) { + if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - request_id = gpr_stack_lockfree_pop(rm->requests); + request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); if (request_id == -1) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call);