|
|
|
@ -74,16 +74,15 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; |
|
|
|
|
typedef struct { |
|
|
|
|
requested_call_type type; |
|
|
|
|
void *tag; |
|
|
|
|
grpc_completion_queue *cq_bound_to_call; |
|
|
|
|
grpc_completion_queue *cq_for_notification; |
|
|
|
|
grpc_call **call; |
|
|
|
|
union { |
|
|
|
|
struct { |
|
|
|
|
grpc_completion_queue *cq_bind; |
|
|
|
|
grpc_call **call; |
|
|
|
|
grpc_call_details *details; |
|
|
|
|
grpc_metadata_array *initial_metadata; |
|
|
|
|
} batch; |
|
|
|
|
struct { |
|
|
|
|
grpc_completion_queue *cq_bind; |
|
|
|
|
grpc_call **call; |
|
|
|
|
registered_method *registered_method; |
|
|
|
|
gpr_timespec *deadline; |
|
|
|
|
grpc_metadata_array *initial_metadata; |
|
|
|
@ -103,7 +102,6 @@ struct registered_method { |
|
|
|
|
char *host; |
|
|
|
|
call_data *pending; |
|
|
|
|
requested_call_array requested; |
|
|
|
|
grpc_completion_queue *cq; |
|
|
|
|
registered_method *next; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
@ -130,7 +128,6 @@ struct grpc_server { |
|
|
|
|
size_t channel_filter_count; |
|
|
|
|
const grpc_channel_filter **channel_filters; |
|
|
|
|
grpc_channel_args *channel_args; |
|
|
|
|
grpc_completion_queue *unregistered_cq; |
|
|
|
|
|
|
|
|
|
grpc_completion_queue **cqs; |
|
|
|
|
grpc_pollset **pollsets; |
|
|
|
@ -602,7 +599,8 @@ static const grpc_channel_filter server_surface_filter = { |
|
|
|
|
destroy_channel_elem, "server", |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
static void addcq(grpc_server *server, grpc_completion_queue *cq) { |
|
|
|
|
void grpc_server_register_completion_queue(grpc_server *server, |
|
|
|
|
grpc_completion_queue *cq) { |
|
|
|
|
size_t i, n; |
|
|
|
|
for (i = 0; i < server->cq_count; i++) { |
|
|
|
|
if (server->cqs[i] == cq) return; |
|
|
|
@ -614,8 +612,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) { |
|
|
|
|
server->cqs[n] = cq; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, |
|
|
|
|
grpc_channel_filter **filters, |
|
|
|
|
grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, |
|
|
|
|
size_t filter_count, |
|
|
|
|
const grpc_channel_args *args) { |
|
|
|
|
size_t i; |
|
|
|
@ -626,12 +623,10 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, |
|
|
|
|
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); |
|
|
|
|
|
|
|
|
|
memset(server, 0, sizeof(grpc_server)); |
|
|
|
|
if (cq) addcq(server, cq); |
|
|
|
|
|
|
|
|
|
gpr_mu_init(&server->mu); |
|
|
|
|
gpr_cv_init(&server->cv); |
|
|
|
|
|
|
|
|
|
server->unregistered_cq = cq; |
|
|
|
|
/* decremented by grpc_server_destroy */ |
|
|
|
|
gpr_ref_init(&server->internal_refcount, 1); |
|
|
|
|
server->root_channel_data.next = server->root_channel_data.prev = |
|
|
|
@ -667,8 +662,7 @@ static int streq(const char *a, const char *b) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void *grpc_server_register_method(grpc_server *server, const char *method, |
|
|
|
|
const char *host, |
|
|
|
|
grpc_completion_queue *cq_new_rpc) { |
|
|
|
|
const char *host) { |
|
|
|
|
registered_method *m; |
|
|
|
|
if (!method) { |
|
|
|
|
gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); |
|
|
|
@ -681,13 +675,11 @@ void *grpc_server_register_method(grpc_server *server, const char *method, |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
addcq(server, cq_new_rpc); |
|
|
|
|
m = gpr_malloc(sizeof(registered_method)); |
|
|
|
|
memset(m, 0, sizeof(*m)); |
|
|
|
|
m->method = gpr_strdup(method); |
|
|
|
|
m->host = gpr_strdup(host); |
|
|
|
|
m->next = server->registered_methods; |
|
|
|
|
m->cq = cq_new_rpc; |
|
|
|
|
server->registered_methods = m; |
|
|
|
|
return m; |
|
|
|
|
} |
|
|
|
@ -1012,17 +1004,18 @@ static grpc_call_error queue_call_request(grpc_server *server, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, |
|
|
|
|
grpc_call_details *details, |
|
|
|
|
grpc_metadata_array *initial_metadata, |
|
|
|
|
grpc_completion_queue *cq_bind, |
|
|
|
|
void *tag) { |
|
|
|
|
grpc_call_error grpc_server_request_call( |
|
|
|
|
grpc_server *server, grpc_call **call, grpc_call_details *details, |
|
|
|
|
grpc_metadata_array *initial_metadata, |
|
|
|
|
grpc_completion_queue *cq_bound_to_call, |
|
|
|
|
grpc_completion_queue *cq_for_notification, void *tag) { |
|
|
|
|
requested_call rc; |
|
|
|
|
grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE); |
|
|
|
|
grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE); |
|
|
|
|
rc.type = BATCH_CALL; |
|
|
|
|
rc.tag = tag; |
|
|
|
|
rc.data.batch.cq_bind = cq_bind; |
|
|
|
|
rc.data.batch.call = call; |
|
|
|
|
rc.cq_bound_to_call = cq_bound_to_call; |
|
|
|
|
rc.cq_for_notification = cq_for_notification; |
|
|
|
|
rc.call = call; |
|
|
|
|
rc.data.batch.details = details; |
|
|
|
|
rc.data.batch.initial_metadata = initial_metadata; |
|
|
|
|
return queue_call_request(server, &rc); |
|
|
|
@ -1031,14 +1024,16 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, |
|
|
|
|
grpc_call_error grpc_server_request_registered_call( |
|
|
|
|
grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline, |
|
|
|
|
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, |
|
|
|
|
grpc_completion_queue *cq_bind, void *tag) { |
|
|
|
|
grpc_completion_queue *cq_bound_to_call, |
|
|
|
|
grpc_completion_queue *cq_for_notification, void *tag) { |
|
|
|
|
requested_call rc; |
|
|
|
|
registered_method *registered_method = rm; |
|
|
|
|
grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE); |
|
|
|
|
grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE); |
|
|
|
|
rc.type = REGISTERED_CALL; |
|
|
|
|
rc.tag = tag; |
|
|
|
|
rc.data.registered.cq_bind = cq_bind; |
|
|
|
|
rc.data.registered.call = call; |
|
|
|
|
rc.cq_bound_to_call = cq_bound_to_call; |
|
|
|
|
rc.cq_for_notification = cq_for_notification; |
|
|
|
|
rc.call = call; |
|
|
|
|
rc.data.registered.registered_method = registered_method; |
|
|
|
|
rc.data.registered.deadline = deadline; |
|
|
|
|
rc.data.registered.initial_metadata = initial_metadata; |
|
|
|
@ -1076,6 +1071,9 @@ static void begin_call(grpc_server *server, call_data *calld, |
|
|
|
|
fill in the metadata array passed by the client, we need to perform |
|
|
|
|
an ioreq op, that should complete immediately. */ |
|
|
|
|
|
|
|
|
|
grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call); |
|
|
|
|
*rc->call = calld->call; |
|
|
|
|
calld->cq_new = rc->cq_for_notification; |
|
|
|
|
switch (rc->type) { |
|
|
|
|
case BATCH_CALL: |
|
|
|
|
cpstr(&rc->data.batch.details->host, |
|
|
|
@ -1083,18 +1081,13 @@ static void begin_call(grpc_server *server, call_data *calld, |
|
|
|
|
cpstr(&rc->data.batch.details->method, |
|
|
|
|
&rc->data.batch.details->method_capacity, calld->path); |
|
|
|
|
rc->data.batch.details->deadline = calld->deadline; |
|
|
|
|
grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind); |
|
|
|
|
*rc->data.batch.call = calld->call; |
|
|
|
|
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; |
|
|
|
|
r->data.recv_metadata = rc->data.batch.initial_metadata; |
|
|
|
|
r++; |
|
|
|
|
calld->cq_new = server->unregistered_cq; |
|
|
|
|
publish = publish_registered_or_batch; |
|
|
|
|
break; |
|
|
|
|
case REGISTERED_CALL: |
|
|
|
|
*rc->data.registered.deadline = calld->deadline; |
|
|
|
|
grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind); |
|
|
|
|
*rc->data.registered.call = calld->call; |
|
|
|
|
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; |
|
|
|
|
r->data.recv_metadata = rc->data.registered.initial_metadata; |
|
|
|
|
r++; |
|
|
|
@ -1103,7 +1096,6 @@ static void begin_call(grpc_server *server, call_data *calld, |
|
|
|
|
r->data.recv_message = rc->data.registered.optional_payload; |
|
|
|
|
r++; |
|
|
|
|
} |
|
|
|
|
calld->cq_new = rc->data.registered.registered_method->cq; |
|
|
|
|
publish = publish_registered_or_batch; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
@ -1114,20 +1106,17 @@ static void begin_call(grpc_server *server, call_data *calld, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fail_call(grpc_server *server, requested_call *rc) { |
|
|
|
|
*rc->call = NULL; |
|
|
|
|
switch (rc->type) { |
|
|
|
|
case BATCH_CALL: |
|
|
|
|
*rc->data.batch.call = NULL; |
|
|
|
|
rc->data.batch.initial_metadata->count = 0; |
|
|
|
|
grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, |
|
|
|
|
GRPC_OP_ERROR); |
|
|
|
|
break; |
|
|
|
|
case REGISTERED_CALL: |
|
|
|
|
*rc->data.registered.call = NULL; |
|
|
|
|
rc->data.registered.initial_metadata->count = 0; |
|
|
|
|
grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL, |
|
|
|
|
do_nothing, NULL, GRPC_OP_ERROR); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, do_nothing, NULL, |
|
|
|
|
GRPC_OP_ERROR); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, |
|
|
|
|