From 24be0f79e2a1a0b8beeb6141aa5dc83804d36a0d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 10 Feb 2015 14:04:22 -0800 Subject: [PATCH] Rewrite server request startup path Stub in registered methods, cleanup to the point I understand this code again. --- include/grpc/grpc.h | 48 +++-- src/core/surface/call.c | 4 + src/core/surface/call.h | 1 + src/core/surface/server.c | 398 +++++++++++++++++++++++++++++--------- 4 files changed, 349 insertions(+), 102 deletions(-) diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 7b33a4d8619..4ccb5a4dd59 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -254,15 +254,18 @@ void grpc_call_details_init(grpc_call_details *details); void grpc_call_details_destroy(grpc_call_details *details); typedef enum { - /* Send initial metadata: one and only one instance MUST be sent for each call, + /* Send initial metadata: one and only one instance MUST be sent for each + call, unless the call was cancelled - in which case this can be skipped */ GRPC_OP_SEND_INITIAL_METADATA = 0, /* Send a message: 0 or more of these operations can occur for each call */ GRPC_OP_SEND_MESSAGE, - /* Send a close from the server: one and only one instance MUST be sent from the client, + /* Send a close from the server: one and only one instance MUST be sent from + the client, unless the call was cancelled - in which case this can be skipped */ GRPC_OP_SEND_CLOSE_FROM_CLIENT, - /* Send status from the server: one and only one instance MUST be sent from the server + /* Send status from the server: one and only one instance MUST be sent from + the server unless the call was cancelled - in which case this can be skipped */ GRPC_OP_SEND_STATUS_FROM_SERVER, /* Receive initial metadata: one and only one MUST be made on the client, must @@ -270,13 +273,16 @@ typedef enum { GRPC_OP_RECV_INITIAL_METADATA, /* Receive a message: 0 or more of these operations can occur for each call */ GRPC_OP_RECV_MESSAGE, - /* Receive status on the client: one and only one must be made on the client */ + /* Receive status on the client: one and only one must be made on the client + */ GRPC_OP_RECV_STATUS_ON_CLIENT, - /* Receive status on the server: one and only one must be made on the server */ + /* Receive status on the server: one and only one must be made on the server + */ GRPC_OP_RECV_CLOSE_ON_SERVER } grpc_op_type; -/* Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT which has +/* Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT + which has no arguments) */ typedef struct grpc_op { grpc_op_type op; @@ -300,29 +306,33 @@ typedef struct grpc_op { grpc_metadata_array *recv_initial_metadata; grpc_byte_buffer **recv_message; struct { - /* ownership of the array is with the caller, but ownership of the elements + /* ownership of the array is with the caller, but ownership of the + elements stays with the call object (ie key, value members are owned by the call object, trailing_metadata->array is owned by the caller). After the operation completes, call grpc_metadata_array_destroy on this value, or reuse it in a future op. */ grpc_metadata_array *trailing_metadata; grpc_status_code *status; - /* status_details is a buffer owned by the application before the op completes - and after the op has completed. During the operation status_details may be - reallocated to a size larger than *status_details_capacity, in which case + /* status_details is a buffer owned by the application before the op + completes + and after the op has completed. During the operation status_details may + be + reallocated to a size larger than *status_details_capacity, in which + case *status_details_capacity will be updated with the new array capacity. Pre-allocating space: size_t my_capacity = 8; char *my_details = gpr_malloc(my_capacity); x.status_details = &my_details; - x.status_details_capacity = &my_capacity; + x.status_details_capacity = &my_capacity; Not pre-allocating space: size_t my_capacity = 0; char *my_details = NULL; x.status_details = &my_details; - x.status_details_capacity = &my_capacity; + x.status_details_capacity = &my_capacity; After the call: gpr_free(my_details); */ @@ -330,7 +340,8 @@ typedef struct grpc_op { size_t *status_details_capacity; } recv_status_on_client; struct { - /* out argument, set to 1 if the call failed in any way (seen as a cancellation + /* out argument, set to 1 if the call failed in any way (seen as a + cancellation on the server), or 0 if the call succeeded */ int *cancelled; } recv_close_on_server; @@ -392,7 +403,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, gpr_timespec deadline); /* Start a batch of operations defined in the array ops; when complete, post a - completion of type 'tag' to the completion queue bound to the call. + completion of type 'tag' to the completion queue bound to the call. The order of ops specified in the batch has no significance. Only one operation of each type can be active at once in any given batch. */ @@ -544,6 +555,15 @@ grpc_call_error grpc_server_request_call( grpc_metadata_array *request_metadata, grpc_completion_queue *completion_queue, void *tag_new); +void *grpc_server_register_method(grpc_server *server, const char *method, + const char *host); + +grpc_call_error grpc_server_request_registered_call( + grpc_server *server, void *registered_method, grpc_call **call, + gpr_timespec *deadline, grpc_metadata_array *request_metadata, + grpc_byte_buffer **optional_payload, + grpc_completion_queue *completion_queue, void *tag_new); + /* Create a server */ grpc_server *grpc_server_create(grpc_completion_queue *cq, const grpc_channel_args *args); diff --git a/src/core/surface/call.c b/src/core/surface/call.c index ee8e8588c70..cc7094a0ce2 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -258,6 +258,10 @@ void grpc_call_set_completion_queue(grpc_call *call, call->cq = cq; } +grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { + return call->cq; +} + void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } static void destroy_call(void *call, int ignored_success) { diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 05014c631c7..55e434433d3 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -89,6 +89,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data); void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); +grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); void grpc_call_internal_ref(grpc_call *call); void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index ee0f96a5803..0415949dacd 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -60,6 +60,55 @@ typedef struct listener { typedef struct call_data call_data; typedef struct channel_data channel_data; +typedef struct registered_method registered_method; + +typedef struct { + call_data *next; + call_data *prev; +} call_link; + +typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type; + +typedef struct { + requested_call_type type; + void *tag; + union { + struct { + grpc_completion_queue *cq; + grpc_call **call; + grpc_call_details *details; + grpc_metadata_array *initial_metadata; + } batch; + struct { + grpc_completion_queue *cq; + grpc_call **call; + registered_method *registered_method; + gpr_timespec *deadline; + grpc_metadata_array *initial_metadata; + grpc_byte_buffer **optional_payload; + } registered; + } data; +} requested_call; + +typedef struct { + requested_call *calls; + size_t count; + size_t capacity; +} requested_call_array; + +struct registered_method { + char *method; + char *host; + call_link pending; + requested_call_array requested; + registered_method *next; +}; + +typedef struct channel_registered_method { + registered_method *server_registered_method; + grpc_mdstr *method; + grpc_mdstr *host; +} channel_registered_method; struct channel_data { grpc_server *server; @@ -71,20 +120,6 @@ struct channel_data { channel_data *prev; }; -typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq, - grpc_call **call, grpc_call_details *details, - grpc_metadata_array *initial_metadata, - call_data *calld, void *user_data); - -typedef struct { - void *user_data; - grpc_completion_queue *cq; - grpc_call **call; - grpc_call_details *details; - grpc_metadata_array *initial_metadata; - new_call_cb cb; -} requested_call; - struct grpc_server { size_t channel_filter_count; const grpc_channel_filter **channel_filters; @@ -93,9 +128,8 @@ struct grpc_server { gpr_mu mu; - requested_call *requested_calls; - size_t requested_call_count; - size_t requested_call_capacity; + registered_method *registered_methods; + requested_call_array requested_calls; gpr_uint8 shutdown; gpr_uint8 have_shutdown_tag; @@ -108,11 +142,6 @@ struct grpc_server { gpr_refcount internal_refcount; }; -typedef struct { - call_data *next; - call_data *prev; -} call_link; - typedef enum { /* waiting for metadata */ NOT_STARTED, @@ -125,7 +154,7 @@ typedef enum { } call_state; typedef struct legacy_data { - grpc_metadata_array *initial_metadata; + grpc_metadata_array initial_metadata; } legacy_data; struct call_data { @@ -137,7 +166,6 @@ struct call_data { grpc_mdstr *host; legacy_data *legacy; - grpc_call_details *details; gpr_uint8 included[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; @@ -148,6 +176,10 @@ struct call_data { static void do_nothing(void *unused, grpc_op_error ignored) {} +static void begin_call(grpc_server *server, call_data *calld, + requested_call *rc); +static void fail_call(grpc_server *server, requested_call *rc); + static int call_list_join(grpc_server *server, call_data *call, call_list list) { if (call->included[list]) return 0; @@ -196,6 +228,22 @@ static int call_list_remove(grpc_server *server, call_data *call, return 1; } +static void requested_call_array_destroy(requested_call_array *array) { + gpr_free(array->calls); +} + +static requested_call *requested_call_array_add(requested_call_array *array) { + requested_call *rc; + if (array->count == array->capacity) { + array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2); + array->calls = + gpr_realloc(array->calls, sizeof(requested_call) * array->capacity); + } + rc = &array->calls[array->count++]; + memset(rc, 0, sizeof(*rc)); + return rc; +} + static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } @@ -205,7 +253,7 @@ static void server_unref(grpc_server *server) { grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu); gpr_free(server->channel_filters); - gpr_free(server->requested_calls); + requested_call_array_destroy(&server->requested_calls); gpr_free(server); } } @@ -223,7 +271,6 @@ static void orphan_channel(channel_data *chand) { static void finish_destroy_channel(void *cd, int success) { channel_data *chand = cd; grpc_server *server = chand->server; - /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/ grpc_channel_destroy(chand->channel); server_unref(server); } @@ -242,12 +289,12 @@ static void start_new_rpc(grpc_call_element *elem) { grpc_server *server = chand->server; gpr_mu_lock(&server->mu); - if (server->requested_call_count > 0) { - requested_call rc = server->requested_calls[--server->requested_call_count]; + if (server->requested_calls.count > 0) { + requested_call rc = + server->requested_calls.calls[--server->requested_calls.count]; calld->state = ACTIVATED; gpr_mu_unlock(&server->mu); - rc.cb(server, rc.cq, rc.call, rc.details, rc.initial_metadata, calld, - rc.user_data); + begin_call(server, calld, &rc); } else { calld->state = PENDING; call_list_join(server, calld, PENDING_START); @@ -427,8 +474,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } if (calld->legacy) { - gpr_free(calld->legacy->initial_metadata->metadata); - gpr_free(calld->legacy->initial_metadata); + gpr_free(calld->legacy->initial_metadata.metadata); gpr_free(calld->legacy); } @@ -464,9 +510,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "server", + call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server", }; grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, @@ -509,6 +554,36 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, return server; } +static int streq(const char *a, const char *b) { + if (a == NULL && b == NULL) return 1; + if (a == NULL) return 0; + if (b == NULL) return 0; + return 0 == strcmp(a, b); +} + +void *grpc_server_register_method(grpc_server *server, const char *method, + const char *host) { + registered_method *m; + if (!method) { + gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); + return NULL; + } + for (m = server->registered_methods; m; m = m->next) { + if (streq(m->method, method) && streq(m->host, host)) { + gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method, + host ? host : "*"); + return NULL; + } + } + m = gpr_malloc(sizeof(registered_method)); + memset(m, 0, sizeof(*m)); + m->method = gpr_strdup(method); + m->host = gpr_strdup(host); + m->next = server->registered_methods; + server->registered_methods = m; + return m; +} + void grpc_server_start(grpc_server *server) { listener *l; @@ -561,8 +636,7 @@ grpc_transport_setup_result grpc_server_setup_transport( void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, void *shutdown_tag) { listener *l; - requested_call *requested_calls; - size_t requested_call_count; + requested_call_array requested_calls; channel_data **channels; channel_data *c; size_t nchannels; @@ -592,9 +666,7 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, } requested_calls = server->requested_calls; - requested_call_count = server->requested_call_count; - server->requested_calls = NULL; - server->requested_call_count = 0; + memset(&server->requested_calls, 0, sizeof(server->requested_calls)); server->shutdown = 1; server->have_shutdown_tag = have_shutdown_tag; @@ -623,13 +695,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, gpr_free(channels); /* terminate all the requested calls */ - for (i = 0; i < requested_call_count; i++) { - requested_calls[i].cb(server, requested_calls[i].cq, - requested_calls[i].call, requested_calls[i].details, - requested_calls[i].initial_metadata, NULL, - requested_calls[i].user_data); + for (i = 0; i < requested_calls.count; i++) { + fail_call(server, &requested_calls.calls[i]); } - gpr_free(requested_calls); + gpr_free(requested_calls.calls); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { @@ -675,17 +744,12 @@ void grpc_server_add_listener(grpc_server *server, void *arg, } static grpc_call_error queue_call_request(grpc_server *server, - grpc_completion_queue *cq, - grpc_call **call, - grpc_call_details *details, - grpc_metadata_array *initial_metadata, - new_call_cb cb, void *user_data) { + requested_call *rc) { call_data *calld; - requested_call *rc; gpr_mu_lock(&server->mu); if (server->shutdown) { gpr_mu_unlock(&server->mu); - cb(server, cq, call, details, initial_metadata, NULL, user_data); + fail_call(server, rc); return GRPC_CALL_OK; } calld = call_list_remove_head(server, PENDING_START); @@ -693,29 +757,60 @@ static grpc_call_error queue_call_request(grpc_server *server, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&server->mu); - cb(server, cq, call, details, initial_metadata, calld, user_data); + begin_call(server, calld, rc); return GRPC_CALL_OK; } else { - if (server->requested_call_count == server->requested_call_capacity) { - server->requested_call_capacity = - GPR_MAX(server->requested_call_capacity + 8, - server->requested_call_capacity * 2); - server->requested_calls = - gpr_realloc(server->requested_calls, - sizeof(requested_call) * server->requested_call_capacity); - } - rc = &server->requested_calls[server->requested_call_count++]; - rc->cb = cb; - rc->cq = cq; - rc->call = call; - rc->details = details; - rc->user_data = user_data; - rc->initial_metadata = initial_metadata; + *requested_call_array_add(&server->requested_calls) = *rc; gpr_mu_unlock(&server->mu); return GRPC_CALL_OK; } } +grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *initial_metadata, + grpc_completion_queue *cq, void *tag) { + requested_call rc; + grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE); + rc.type = BATCH_CALL; + rc.tag = tag; + rc.data.batch.cq = cq; + rc.data.batch.call = call; + rc.data.batch.details = details; + rc.data.batch.initial_metadata = initial_metadata; + return queue_call_request(server, &rc); +} + +grpc_call_error grpc_server_request_registered_call( + grpc_server *server, void *registered_method, grpc_call **call, + gpr_timespec *deadline, grpc_metadata_array *initial_metadata, + grpc_byte_buffer **optional_payload, grpc_completion_queue *cq, void *tag) { + requested_call rc; + grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE); + rc.type = REGISTERED_CALL; + rc.tag = tag; + rc.data.registered.cq = cq; + rc.data.registered.call = call; + rc.data.registered.registered_method = registered_method; + rc.data.registered.deadline = deadline; + rc.data.registered.initial_metadata = initial_metadata; + rc.data.registered.optional_payload = optional_payload; + return queue_call_request(server, &rc); +} + +grpc_call_error grpc_server_request_call_old(grpc_server *server, + void *tag_new) { + requested_call rc; + grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); + rc.type = LEGACY_CALL; + rc.tag = tag_new; + return queue_call_request(server, &rc); +} + +static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag); +static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, + void *tag); + static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { gpr_slice slice = value->slice; size_t len = GPR_SLICE_LENGTH(slice); @@ -727,6 +822,108 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1); } +static void begin_call(grpc_server *server, call_data *calld, + requested_call *rc) { + grpc_ioreq_completion_func publish; + grpc_ioreq req[2]; + grpc_ioreq *r = req; + + /* called once initial metadata has been read by the call, but BEFORE + the ioreq to fetch it out of the call has been executed. + This means metadata related fields can be relied on in calld, but to + fill in the metadata array passed by the client, we need to perform + an ioreq op, that should complete immediately. */ + + switch (rc->type) { + case LEGACY_CALL: + calld->legacy = gpr_malloc(sizeof(legacy_data)); + memset(calld->legacy, 0, sizeof(legacy_data)); + r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; + r->data.recv_metadata = &calld->legacy->initial_metadata; + r++; + publish = publish_legacy; + break; + case BATCH_CALL: + cpstr(&rc->data.batch.details->host, + &rc->data.batch.details->host_capacity, calld->host); + cpstr(&rc->data.batch.details->method, + &rc->data.batch.details->method_capacity, calld->path); + grpc_call_set_completion_queue(calld->call, rc->data.batch.cq); + *rc->data.batch.call = calld->call; + r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; + r->data.recv_metadata = rc->data.batch.initial_metadata; + r++; + publish = publish_registered_or_batch; + break; + case REGISTERED_CALL: + *rc->data.registered.deadline = calld->deadline; + grpc_call_set_completion_queue(calld->call, rc->data.registered.cq); + *rc->data.registered.call = calld->call; + r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; + r->data.recv_metadata = rc->data.registered.initial_metadata; + r++; + if (rc->data.registered.optional_payload) { + r->op = GRPC_IOREQ_RECV_MESSAGE; + r->data.recv_message = rc->data.registered.optional_payload; + r++; + } + publish = publish_registered_or_batch; + break; + } + + grpc_call_internal_ref(calld->call); + grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, + rc->tag); +} + +static void fail_call(grpc_server *server, requested_call *rc) { + switch (rc->type) { + case LEGACY_CALL: + grpc_cq_end_new_rpc(server->cq, rc->tag, NULL, do_nothing, NULL, NULL, + NULL, gpr_inf_past, 0, NULL); + break; + case BATCH_CALL: + *rc->data.batch.call = NULL; + rc->data.batch.initial_metadata->count = 0; + grpc_cq_end_op_complete(rc->data.batch.cq, rc->tag, NULL, do_nothing, + NULL, GRPC_OP_ERROR); + break; + case REGISTERED_CALL: + *rc->data.registered.call = NULL; + rc->data.registered.initial_metadata->count = 0; + grpc_cq_end_op_complete(rc->data.registered.cq, rc->tag, NULL, do_nothing, + NULL, GRPC_OP_ERROR); + break; + } +} + +static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) { + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_server *server = chand->server; + + if (status == GRPC_OP_OK) { + grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL, + grpc_mdstr_as_c_string(calld->path), + grpc_mdstr_as_c_string(calld->host), calld->deadline, + calld->legacy->initial_metadata.count, + calld->legacy->initial_metadata.metadata); + } else { + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + } +} + +static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, + void *tag) { + grpc_cq_end_op_complete(grpc_call_get_completion_queue(call), tag, call, + do_nothing, NULL, status); +} + +#if 0 + static void publish_request(grpc_call *call, grpc_op_error status, void *tag) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); @@ -748,9 +945,14 @@ static void publish_request(grpc_call *call, grpc_op_error status, void *tag) { static void begin_request(grpc_server *server, grpc_completion_queue *cq, grpc_call **call, grpc_call_details *details, + registered_method *registered_method, gpr_timespec *deadline, grpc_metadata_array *initial_metadata, + grpc_byte_buffer **optional_payload, call_data *calld, void *tag) { grpc_ioreq req; + GPR_ASSERT(registered_method == NULL); + GPR_ASSERT(deadline == NULL); + GPR_ASSERT(optional_payload == NULL); if (!calld) { *call = NULL; initial_metadata->count = 0; @@ -767,13 +969,39 @@ static void begin_request(grpc_server *server, grpc_completion_queue *cq, tag); } -grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, - grpc_call_details *details, - grpc_metadata_array *initial_metadata, - grpc_completion_queue *cq, void *tag) { - grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE); - return queue_call_request(server, cq, call, details, initial_metadata, - begin_request, tag); +static void begin_registered_request(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, grpc_call_details *details, + registered_method *registered_method, gpr_timespec *deadline, + grpc_metadata_array *initial_metadata, + grpc_byte_buffer **optional_payload, + call_data *calld, void *tag) { + grpc_ioreq req[2]; + grpc_ioreq *r; + GPR_ASSERT(registered_method != NULL); + GPR_ASSERT(deadline != NULL); + GPR_ASSERT(optional_payload != NULL); + if (!calld) { + *call = NULL; + initial_metadata->count = 0; + grpc_cq_end_op_complete(cq, tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); + return; + } + calld->details = NULL; + calld->registered_method = registered_method; + grpc_call_set_completion_queue(calld->call, cq); + *call = calld->call; + r = req; + r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; + r->data.recv_metadata = initial_metadata; + r++; + if (optional_payload != NULL) { + r->op = GRPC_IOREQ_RECV_MESSAGE; + r->data.recv_message = optional_payload; + r++; + } + grpc_call_internal_ref(calld->call); + grpc_call_start_ioreq_and_call_back(calld->call, req, r - rq, publish_request, + tag); } static void publish_legacy_request(grpc_call *call, grpc_op_error status, @@ -796,9 +1024,11 @@ static void publish_legacy_request(grpc_call *call, grpc_op_error status, } static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq, - grpc_call **call, grpc_call_details *details, - grpc_metadata_array *initial_metadata, - call_data *calld, void *tag) { + grpc_call **call, grpc_call_details *details, + registered_method *registered_method, gpr_timespec *deadline, + grpc_metadata_array *initial_metadata, + grpc_byte_buffer **optional_payload, + call_data *calld, void *tag) { grpc_ioreq req; GPR_ASSERT(call == NULL); GPR_ASSERT(details == NULL); @@ -818,15 +1048,7 @@ static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq, publish_legacy_request, tag); } -grpc_call_error grpc_server_request_call_old(grpc_server *server, - void *tag_new) { - grpc_metadata_array *client_metadata = - gpr_malloc(sizeof(grpc_metadata_array)); - memset(client_metadata, 0, sizeof(*client_metadata)); - grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); - return queue_call_request(server, server->cq, NULL, NULL, client_metadata, - begin_legacy_request, tag_new); -} +#endif const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { return server->channel_args;