Split mu_call into a server-wide and a per-call lock

pull/2315/head
Craig Tiller 10 years ago
parent e822963e8b
commit 76d2c3b951
  1. 66
      src/core/surface/server.c

@ -182,7 +182,11 @@ typedef enum {
struct call_data { struct call_data {
grpc_call *call; grpc_call *call;
/** protects state */
gpr_mu mu_state;
/** the current state of a call - see call_state */
call_state state; call_state state;
grpc_mdstr *path; grpc_mdstr *path;
grpc_mdstr *host; grpc_mdstr *host;
gpr_timespec deadline; gpr_timespec deadline;
@ -403,19 +407,23 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure); grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
} }
static void finish_start_new_rpc_and_unlock(grpc_server *server, static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
grpc_call_element *elem, call_data **pending_root,
call_data **pending_root, requested_call_array *array) {
requested_call_array *array) {
requested_call rc; requested_call rc;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
gpr_mu_lock(&server->mu_call);
if (array->count == 0) { if (array->count == 0) {
gpr_mu_lock(&calld->mu_state);
calld->state = PENDING; calld->state = PENDING;
gpr_mu_unlock(&calld->mu_state);
call_list_join(pending_root, calld, PENDING_START); call_list_join(pending_root, calld, PENDING_START);
gpr_mu_unlock(&server->mu_call); gpr_mu_unlock(&server->mu_call);
} else { } else {
rc = array->calls[--array->count]; rc = array->calls[--array->count];
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED; calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
gpr_mu_unlock(&server->mu_call); gpr_mu_unlock(&server->mu_call);
begin_call(server, calld, &rc); begin_call(server, calld, &rc);
} }
@ -429,7 +437,6 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_uint32 hash; gpr_uint32 hash;
channel_registered_method *rm; channel_registered_method *rm;
gpr_mu_lock(&server->mu_call);
if (chand->registered_methods && calld->path && calld->host) { if (chand->registered_methods && calld->path && calld->host) {
/* TODO(ctiller): unify these two searches */ /* TODO(ctiller): unify these two searches */
/* check for an exact match with host */ /* check for an exact match with host */
@ -440,9 +447,8 @@ static void start_new_rpc(grpc_call_element *elem) {
if (!rm) break; if (!rm) break;
if (rm->host != calld->host) continue; if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue; if (rm->method != calld->path) continue;
finish_start_new_rpc_and_unlock(server, elem, finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
&rm->server_registered_method->pending, &rm->server_registered_method->requested);
&rm->server_registered_method->requested);
return; return;
} }
/* check for a wildcard method definition (no host set) */ /* check for a wildcard method definition (no host set) */
@ -453,14 +459,13 @@ static void start_new_rpc(grpc_call_element *elem) {
if (!rm) break; if (!rm) break;
if (rm->host != NULL) continue; if (rm->host != NULL) continue;
if (rm->method != calld->path) continue; if (rm->method != calld->path) continue;
finish_start_new_rpc_and_unlock(server, elem, finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
&rm->server_registered_method->pending, &rm->server_registered_method->requested);
&rm->server_registered_method->requested);
return; return;
} }
} }
finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], finish_start_new_rpc(server, elem, &server->lists[PENDING_START],
&server->requested_calls); &server->requested_calls);
} }
static void kill_zombie(void *elem, int success) { static void kill_zombie(void *elem, int success) {
@ -541,27 +546,34 @@ static void server_on_recv(void *ptr, int success) {
case GRPC_STREAM_SEND_CLOSED: case GRPC_STREAM_SEND_CLOSED:
break; break;
case GRPC_STREAM_RECV_CLOSED: case GRPC_STREAM_RECV_CLOSED:
gpr_mu_lock(&chand->server->mu_call); gpr_mu_lock(&calld->mu_state);
if (calld->state == NOT_STARTED) { if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED; calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} else {
gpr_mu_unlock(&calld->mu_state);
} }
gpr_mu_unlock(&chand->server->mu_call);
break; break;
case GRPC_STREAM_CLOSED: case GRPC_STREAM_CLOSED:
gpr_mu_lock(&chand->server->mu_call); gpr_mu_lock(&calld->mu_state);
if (calld->state == NOT_STARTED) { if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED; calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} else if (calld->state == PENDING) { } else if (calld->state == PENDING) {
call_list_remove(calld, PENDING_START);
calld->state = ZOMBIED; calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
gpr_mu_lock(&chand->server->mu_call);
call_list_remove(calld, PENDING_START);
gpr_mu_unlock(&chand->server->mu_call);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} else {
gpr_mu_unlock(&calld->mu_state);
} }
gpr_mu_unlock(&chand->server->mu_call);
break; break;
} }
@ -623,6 +635,7 @@ static void init_call_elem(grpc_call_element *elem,
memset(calld, 0, sizeof(call_data)); memset(calld, 0, sizeof(call_data));
calld->deadline = gpr_inf_future; calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem); calld->call = grpc_call_from_top_element(elem);
gpr_mu_init(&calld->mu_state);
grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
@ -634,13 +647,12 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) { static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
size_t i;
gpr_mu_lock(&chand->server->mu_call); if (calld->state == PENDING) {
for (i = 0; i < CALL_LIST_COUNT; i++) { gpr_mu_lock(&chand->server->mu_call);
call_list_remove(elem->call_data, i); call_list_remove(elem->call_data, PENDING_START);
gpr_mu_unlock(&chand->server->mu_call);
} }
gpr_mu_unlock(&chand->server->mu_call);
if (calld->host) { if (calld->host) {
grpc_mdstr_unref(calld->host); grpc_mdstr_unref(calld->host);
@ -649,6 +661,8 @@ static void destroy_call_elem(grpc_call_element *elem) {
grpc_mdstr_unref(calld->path); grpc_mdstr_unref(calld->path);
} }
gpr_mu_destroy(&calld->mu_state);
server_unref(chand->server); server_unref(chand->server);
} }
@ -1043,10 +1057,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
requested_calls = &rc->data.registered.registered_method->requested; requested_calls = &rc->data.registered.registered_method->requested;
break; break;
} }
if (calld) { if (calld != NULL) {
gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
GPR_ASSERT(calld->state == PENDING); GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED; calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu_call); gpr_mu_unlock(&calld->mu_state);
begin_call(server, calld, rc); begin_call(server, calld, rc);
return GRPC_CALL_OK; return GRPC_CALL_OK;
} else { } else {

Loading…
Cancel
Save