|
|
|
@ -179,7 +179,11 @@ typedef enum { |
|
|
|
|
struct call_data { |
|
|
|
|
grpc_call *call; |
|
|
|
|
|
|
|
|
|
/** protects state */ |
|
|
|
|
gpr_mu mu_state; |
|
|
|
|
/** the current state of a call - see call_state */ |
|
|
|
|
call_state state; |
|
|
|
|
|
|
|
|
|
grpc_mdstr *path; |
|
|
|
|
grpc_mdstr *host; |
|
|
|
|
gpr_timespec deadline; |
|
|
|
@ -382,19 +386,24 @@ static void destroy_channel(channel_data *chand) { |
|
|
|
|
grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void finish_start_new_rpc_and_unlock(grpc_server *server, |
|
|
|
|
grpc_call_element *elem, |
|
|
|
|
call_data **pending_root, |
|
|
|
|
requested_call **requests) { |
|
|
|
|
requested_call *rc = *requests; |
|
|
|
|
static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, |
|
|
|
|
call_data **pending_root, |
|
|
|
|
requested_call **requests) { |
|
|
|
|
requested_call *rc; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
gpr_mu_lock(&server->mu_call); |
|
|
|
|
rc = *requests; |
|
|
|
|
if (rc == NULL) { |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
calld->state = PENDING; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
call_list_join(pending_root, calld, PENDING_START); |
|
|
|
|
gpr_mu_unlock(&server->mu_call); |
|
|
|
|
} else { |
|
|
|
|
*requests = rc->next; |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
calld->state = ACTIVATED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
gpr_mu_unlock(&server->mu_call); |
|
|
|
|
begin_call(server, calld, rc); |
|
|
|
|
} |
|
|
|
@ -408,7 +417,6 @@ static void start_new_rpc(grpc_call_element *elem) { |
|
|
|
|
gpr_uint32 hash; |
|
|
|
|
channel_registered_method *rm; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&server->mu_call); |
|
|
|
|
if (chand->registered_methods && calld->path && calld->host) { |
|
|
|
|
/* TODO(ctiller): unify these two searches */ |
|
|
|
|
/* check for an exact match with host */ |
|
|
|
@ -419,9 +427,8 @@ static void start_new_rpc(grpc_call_element *elem) { |
|
|
|
|
if (!rm) break; |
|
|
|
|
if (rm->host != calld->host) continue; |
|
|
|
|
if (rm->method != calld->path) continue; |
|
|
|
|
finish_start_new_rpc_and_unlock(server, elem, |
|
|
|
|
&rm->server_registered_method->pending, |
|
|
|
|
&rm->server_registered_method->requests); |
|
|
|
|
finish_start_new_rpc(server, elem, &rm->server_registered_method->pending, |
|
|
|
|
&rm->server_registered_method->requests); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
/* check for a wildcard method definition (no host set) */ |
|
|
|
@ -432,14 +439,13 @@ static void start_new_rpc(grpc_call_element *elem) { |
|
|
|
|
if (!rm) break; |
|
|
|
|
if (rm->host != NULL) continue; |
|
|
|
|
if (rm->method != calld->path) continue; |
|
|
|
|
finish_start_new_rpc_and_unlock(server, elem, |
|
|
|
|
&rm->server_registered_method->pending, |
|
|
|
|
&rm->server_registered_method->requests); |
|
|
|
|
finish_start_new_rpc(server, elem, &rm->server_registered_method->pending, |
|
|
|
|
&rm->server_registered_method->requests); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], |
|
|
|
|
&server->requests); |
|
|
|
|
finish_start_new_rpc(server, elem, &server->lists[PENDING_START], |
|
|
|
|
&server->requests); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void kill_zombie(void *elem, int success) { |
|
|
|
@ -530,27 +536,34 @@ static void server_on_recv(void *ptr, int success) { |
|
|
|
|
case GRPC_STREAM_SEND_CLOSED: |
|
|
|
|
break; |
|
|
|
|
case GRPC_STREAM_RECV_CLOSED: |
|
|
|
|
gpr_mu_lock(&chand->server->mu_call); |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
if (calld->state == NOT_STARTED) { |
|
|
|
|
calld->state = ZOMBIED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); |
|
|
|
|
grpc_iomgr_add_callback(&calld->kill_zombie_closure); |
|
|
|
|
} else { |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&chand->server->mu_call); |
|
|
|
|
break; |
|
|
|
|
case GRPC_STREAM_CLOSED: |
|
|
|
|
gpr_mu_lock(&chand->server->mu_call); |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
if (calld->state == NOT_STARTED) { |
|
|
|
|
calld->state = ZOMBIED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); |
|
|
|
|
grpc_iomgr_add_callback(&calld->kill_zombie_closure); |
|
|
|
|
} else if (calld->state == PENDING) { |
|
|
|
|
call_list_remove(calld, PENDING_START); |
|
|
|
|
calld->state = ZOMBIED; |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
gpr_mu_lock(&chand->server->mu_call); |
|
|
|
|
call_list_remove(calld, PENDING_START); |
|
|
|
|
gpr_mu_unlock(&chand->server->mu_call); |
|
|
|
|
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); |
|
|
|
|
grpc_iomgr_add_callback(&calld->kill_zombie_closure); |
|
|
|
|
} else { |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&chand->server->mu_call); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -612,6 +625,7 @@ static void init_call_elem(grpc_call_element *elem, |
|
|
|
|
memset(calld, 0, sizeof(call_data)); |
|
|
|
|
calld->deadline = gpr_inf_future; |
|
|
|
|
calld->call = grpc_call_from_top_element(elem); |
|
|
|
|
gpr_mu_init(&calld->mu_state); |
|
|
|
|
|
|
|
|
|
grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); |
|
|
|
|
|
|
|
|
@ -623,13 +637,12 @@ static void init_call_elem(grpc_call_element *elem, |
|
|
|
|
static void destroy_call_elem(grpc_call_element *elem) { |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
size_t i; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&chand->server->mu_call); |
|
|
|
|
for (i = 0; i < CALL_LIST_COUNT; i++) { |
|
|
|
|
call_list_remove(elem->call_data, i); |
|
|
|
|
if (calld->state == PENDING) { |
|
|
|
|
gpr_mu_lock(&chand->server->mu_call); |
|
|
|
|
call_list_remove(elem->call_data, PENDING_START); |
|
|
|
|
gpr_mu_unlock(&chand->server->mu_call); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&chand->server->mu_call); |
|
|
|
|
|
|
|
|
|
if (calld->host) { |
|
|
|
|
grpc_mdstr_unref(calld->host); |
|
|
|
@ -638,6 +651,8 @@ static void destroy_call_elem(grpc_call_element *elem) { |
|
|
|
|
grpc_mdstr_unref(calld->path); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_destroy(&calld->mu_state); |
|
|
|
|
|
|
|
|
|
server_unref(chand->server); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1024,10 +1039,12 @@ static grpc_call_error queue_call_request(grpc_server *server, |
|
|
|
|
requests = &rc->data.registered.registered_method->requests; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
if (calld) { |
|
|
|
|
if (calld != NULL) { |
|
|
|
|
gpr_mu_unlock(&server->mu_call); |
|
|
|
|
gpr_mu_lock(&calld->mu_state); |
|
|
|
|
GPR_ASSERT(calld->state == PENDING); |
|
|
|
|
calld->state = ACTIVATED; |
|
|
|
|
gpr_mu_unlock(&server->mu_call); |
|
|
|
|
gpr_mu_unlock(&calld->mu_state); |
|
|
|
|
begin_call(server, calld, rc); |
|
|
|
|
return GRPC_CALL_OK; |
|
|
|
|
} else { |
|
|
|
|