Merge pull request #17401 from soheilhy/worktree-nolock

Implement a lock-free fast path for queue_call_request()
pull/17432/head
Soheil Hassas Yeganeh 6 years ago committed by GitHub
commit d6d71abdad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 104
      src/core/lib/surface/server.cc

@ -192,10 +192,13 @@ struct call_data {
};
struct request_matcher {
request_matcher(grpc_server* server);
~request_matcher();
grpc_server* server;
call_data* pending_head;
call_data* pending_tail;
gpr_locked_mpscq* requests_per_cq;
std::atomic<call_data*> pending_head{nullptr};
call_data* pending_tail = nullptr;
gpr_locked_mpscq* requests_per_cq = nullptr;
};
struct registered_method {
@ -344,22 +347,30 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb,
* request_matcher
*/
static void request_matcher_init(request_matcher* rm, grpc_server* server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
namespace {
request_matcher::request_matcher(grpc_server* server) : server(server) {
requests_per_cq = static_cast<gpr_locked_mpscq*>(
gpr_malloc(sizeof(*requests_per_cq) * server->cq_count));
for (size_t i = 0; i < server->cq_count; i++) {
gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
gpr_locked_mpscq_init(&requests_per_cq[i]);
}
}
static void request_matcher_destroy(request_matcher* rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) {
GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
request_matcher::~request_matcher() {
for (size_t i = 0; i < server->cq_count; i++) {
GPR_ASSERT(gpr_locked_mpscq_pop(&requests_per_cq[i]) == nullptr);
gpr_locked_mpscq_destroy(&requests_per_cq[i]);
}
gpr_free(rm->requests_per_cq);
gpr_free(requests_per_cq);
}
} // namespace
static void request_matcher_init(request_matcher* rm, grpc_server* server) {
new (rm) request_matcher(server);
}
static void request_matcher_destroy(request_matcher* rm) {
rm->~request_matcher();
}
static void kill_zombie(void* elem, grpc_error* error) {
@ -368,9 +379,10 @@ static void kill_zombie(void* elem, grpc_error* error) {
}
static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
while (rm->pending_head) {
call_data* calld = rm->pending_head;
rm->pending_head = calld->pending_next;
call_data* calld;
while ((calld = rm->pending_head.load(std::memory_order_relaxed)) !=
nullptr) {
rm->pending_head.store(calld->pending_next, std::memory_order_relaxed);
gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
@ -568,8 +580,9 @@ static void publish_new_rpc(void* arg, grpc_error* error) {
}
gpr_atm_no_barrier_store(&calld->state, PENDING);
if (rm->pending_head == nullptr) {
rm->pending_tail = rm->pending_head = calld;
if (rm->pending_head.load(std::memory_order_relaxed) == nullptr) {
rm->pending_head.store(calld, std::memory_order_relaxed);
rm->pending_tail = calld;
} else {
rm->pending_tail->pending_next = calld;
rm->pending_tail = calld;
@ -1433,30 +1446,39 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
rm = &rc->data.registered.method->matcher;
break;
}
if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != nullptr) {
rc = reinterpret_cast<requested_call*>(
gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
if (rc == nullptr) break;
rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);
if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
// Zombied Call
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
} else {
publish_call(server, calld, cq_idx, rc);
}
gpr_mu_lock(&server->mu_call);
}
// Fast path: if there is no pending request to be processed, immediately
// return.
if (!gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link) ||
// Note: We are reading the pending_head without holding the server's call
// mutex. Even if we read a non-null value here due to reordering,
// we will check it below again after grabbing the lock.
rm->pending_head.load(std::memory_order_relaxed) == nullptr) {
return GRPC_CALL_OK;
}
// Slow path: This was the first queued request and there are pendings:
// We need to lock and start matching calls.
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head.load(std::memory_order_relaxed)) !=
nullptr) {
rc = reinterpret_cast<requested_call*>(
gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
if (rc == nullptr) break;
rm->pending_head.store(calld->pending_next, std::memory_order_relaxed);
gpr_mu_unlock(&server->mu_call);
if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
// Zombied Call
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
} else {
publish_call(server, calld, cq_idx, rc);
}
gpr_mu_lock(&server->mu_call);
}
gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}

Loading…
Cancel
Save