Rewrite server request startup path

Stub in registered methods, cleanup to the point I understand this code
again.
pull/501/head
Craig Tiller 10 years ago
parent 1d3561e1ee
commit 24be0f79e2
  1. 48
      include/grpc/grpc.h
  2. 4
      src/core/surface/call.c
  3. 1
      src/core/surface/call.h
  4. 398
      src/core/surface/server.c

@ -254,15 +254,18 @@ void grpc_call_details_init(grpc_call_details *details);
void grpc_call_details_destroy(grpc_call_details *details);
typedef enum {
/* Send initial metadata: one and only one instance MUST be sent for each call,
/* Send initial metadata: one and only one instance MUST be sent for each
call,
unless the call was cancelled - in which case this can be skipped */
GRPC_OP_SEND_INITIAL_METADATA = 0,
/* Send a message: 0 or more of these operations can occur for each call */
GRPC_OP_SEND_MESSAGE,
/* Send a close from the server: one and only one instance MUST be sent from the client,
/* Send a close from the server: one and only one instance MUST be sent from
the client,
unless the call was cancelled - in which case this can be skipped */
GRPC_OP_SEND_CLOSE_FROM_CLIENT,
/* Send status from the server: one and only one instance MUST be sent from the server
/* Send status from the server: one and only one instance MUST be sent from
the server
unless the call was cancelled - in which case this can be skipped */
GRPC_OP_SEND_STATUS_FROM_SERVER,
/* Receive initial metadata: one and only one MUST be made on the client, must
@ -270,13 +273,16 @@ typedef enum {
GRPC_OP_RECV_INITIAL_METADATA,
/* Receive a message: 0 or more of these operations can occur for each call */
GRPC_OP_RECV_MESSAGE,
/* Receive status on the client: one and only one must be made on the client */
/* Receive status on the client: one and only one must be made on the client
*/
GRPC_OP_RECV_STATUS_ON_CLIENT,
/* Receive status on the server: one and only one must be made on the server */
/* Receive status on the server: one and only one must be made on the server
*/
GRPC_OP_RECV_CLOSE_ON_SERVER
} grpc_op_type;
/* Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT which has
/* Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT
which has
no arguments) */
typedef struct grpc_op {
grpc_op_type op;
@ -300,29 +306,33 @@ typedef struct grpc_op {
grpc_metadata_array *recv_initial_metadata;
grpc_byte_buffer **recv_message;
struct {
/* ownership of the array is with the caller, but ownership of the elements
/* ownership of the array is with the caller, but ownership of the
elements
stays with the call object (ie key, value members are owned by the call
object, trailing_metadata->array is owned by the caller).
After the operation completes, call grpc_metadata_array_destroy on this
value, or reuse it in a future op. */
grpc_metadata_array *trailing_metadata;
grpc_status_code *status;
/* status_details is a buffer owned by the application before the op completes
and after the op has completed. During the operation status_details may be
reallocated to a size larger than *status_details_capacity, in which case
/* status_details is a buffer owned by the application before the op
completes
and after the op has completed. During the operation status_details may
be
reallocated to a size larger than *status_details_capacity, in which
case
*status_details_capacity will be updated with the new array capacity.
Pre-allocating space:
size_t my_capacity = 8;
char *my_details = gpr_malloc(my_capacity);
x.status_details = &my_details;
x.status_details_capacity = &my_capacity;
x.status_details_capacity = &my_capacity;
Not pre-allocating space:
size_t my_capacity = 0;
char *my_details = NULL;
x.status_details = &my_details;
x.status_details_capacity = &my_capacity;
x.status_details_capacity = &my_capacity;
After the call:
gpr_free(my_details); */
@ -330,7 +340,8 @@ typedef struct grpc_op {
size_t *status_details_capacity;
} recv_status_on_client;
struct {
/* out argument, set to 1 if the call failed in any way (seen as a cancellation
/* out argument, set to 1 if the call failed in any way (seen as a
cancellation
on the server), or 0 if the call succeeded */
int *cancelled;
} recv_close_on_server;
@ -392,7 +403,7 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
gpr_timespec deadline);
/* Start a batch of operations defined in the array ops; when complete, post a
completion of type 'tag' to the completion queue bound to the call.
completion of type 'tag' to the completion queue bound to the call.
The order of ops specified in the batch has no significance.
Only one operation of each type can be active at once in any given
batch. */
@ -544,6 +555,15 @@ grpc_call_error grpc_server_request_call(
grpc_metadata_array *request_metadata,
grpc_completion_queue *completion_queue, void *tag_new);
void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host);
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *request_metadata,
grpc_byte_buffer **optional_payload,
grpc_completion_queue *completion_queue, void *tag_new);
/* Create a server */
grpc_server *grpc_server_create(grpc_completion_queue *cq,
const grpc_channel_args *args);

@ -258,6 +258,10 @@ void grpc_call_set_completion_queue(grpc_call *call,
call->cq = cq;
}
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
return call->cq;
}
void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
static void destroy_call(void *call, int ignored_success) {

@ -89,6 +89,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data);
void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);

@ -60,6 +60,55 @@ typedef struct listener {
typedef struct call_data call_data;
typedef struct channel_data channel_data;
typedef struct registered_method registered_method;
typedef struct {
call_data *next;
call_data *prev;
} call_link;
typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct {
requested_call_type type;
void *tag;
union {
struct {
grpc_completion_queue *cq;
grpc_call **call;
grpc_call_details *details;
grpc_metadata_array *initial_metadata;
} batch;
struct {
grpc_completion_queue *cq;
grpc_call **call;
registered_method *registered_method;
gpr_timespec *deadline;
grpc_metadata_array *initial_metadata;
grpc_byte_buffer **optional_payload;
} registered;
} data;
} requested_call;
typedef struct {
requested_call *calls;
size_t count;
size_t capacity;
} requested_call_array;
struct registered_method {
char *method;
char *host;
call_link pending;
requested_call_array requested;
registered_method *next;
};
typedef struct channel_registered_method {
registered_method *server_registered_method;
grpc_mdstr *method;
grpc_mdstr *host;
} channel_registered_method;
struct channel_data {
grpc_server *server;
@ -71,20 +120,6 @@ struct channel_data {
channel_data *prev;
};
typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq,
grpc_call **call, grpc_call_details *details,
grpc_metadata_array *initial_metadata,
call_data *calld, void *user_data);
typedef struct {
void *user_data;
grpc_completion_queue *cq;
grpc_call **call;
grpc_call_details *details;
grpc_metadata_array *initial_metadata;
new_call_cb cb;
} requested_call;
struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
@ -93,9 +128,8 @@ struct grpc_server {
gpr_mu mu;
requested_call *requested_calls;
size_t requested_call_count;
size_t requested_call_capacity;
registered_method *registered_methods;
requested_call_array requested_calls;
gpr_uint8 shutdown;
gpr_uint8 have_shutdown_tag;
@ -108,11 +142,6 @@ struct grpc_server {
gpr_refcount internal_refcount;
};
typedef struct {
call_data *next;
call_data *prev;
} call_link;
typedef enum {
/* waiting for metadata */
NOT_STARTED,
@ -125,7 +154,7 @@ typedef enum {
} call_state;
typedef struct legacy_data {
grpc_metadata_array *initial_metadata;
grpc_metadata_array initial_metadata;
} legacy_data;
struct call_data {
@ -137,7 +166,6 @@ struct call_data {
grpc_mdstr *host;
legacy_data *legacy;
grpc_call_details *details;
gpr_uint8 included[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
@ -148,6 +176,10 @@ struct call_data {
static void do_nothing(void *unused, grpc_op_error ignored) {}
static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
static int call_list_join(grpc_server *server, call_data *call,
call_list list) {
if (call->included[list]) return 0;
@ -196,6 +228,22 @@ static int call_list_remove(grpc_server *server, call_data *call,
return 1;
}
static void requested_call_array_destroy(requested_call_array *array) {
gpr_free(array->calls);
}
static requested_call *requested_call_array_add(requested_call_array *array) {
requested_call *rc;
if (array->count == array->capacity) {
array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
array->calls =
gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
}
rc = &array->calls[array->count++];
memset(rc, 0, sizeof(*rc));
return rc;
}
static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
@ -205,7 +253,7 @@ static void server_unref(grpc_server *server) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu);
gpr_free(server->channel_filters);
gpr_free(server->requested_calls);
requested_call_array_destroy(&server->requested_calls);
gpr_free(server);
}
}
@ -223,7 +271,6 @@ static void orphan_channel(channel_data *chand) {
static void finish_destroy_channel(void *cd, int success) {
channel_data *chand = cd;
grpc_server *server = chand->server;
/*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/
grpc_channel_destroy(chand->channel);
server_unref(server);
}
@ -242,12 +289,12 @@ static void start_new_rpc(grpc_call_element *elem) {
grpc_server *server = chand->server;
gpr_mu_lock(&server->mu);
if (server->requested_call_count > 0) {
requested_call rc = server->requested_calls[--server->requested_call_count];
if (server->requested_calls.count > 0) {
requested_call rc =
server->requested_calls.calls[--server->requested_calls.count];
calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu);
rc.cb(server, rc.cq, rc.call, rc.details, rc.initial_metadata, calld,
rc.user_data);
begin_call(server, calld, &rc);
} else {
calld->state = PENDING;
call_list_join(server, calld, PENDING_START);
@ -427,8 +474,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
if (calld->legacy) {
gpr_free(calld->legacy->initial_metadata->metadata);
gpr_free(calld->legacy->initial_metadata);
gpr_free(calld->legacy->initial_metadata.metadata);
gpr_free(calld->legacy);
}
@ -464,9 +510,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
call_op, channel_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "server",
call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
};
grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
@ -509,6 +554,36 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
return server;
}
static int streq(const char *a, const char *b) {
if (a == NULL && b == NULL) return 1;
if (a == NULL) return 0;
if (b == NULL) return 0;
return 0 == strcmp(a, b);
}
void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host) {
registered_method *m;
if (!method) {
gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
return NULL;
}
for (m = server->registered_methods; m; m = m->next) {
if (streq(m->method, method) && streq(m->host, host)) {
gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
host ? host : "*");
return NULL;
}
}
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
server->registered_methods = m;
return m;
}
void grpc_server_start(grpc_server *server) {
listener *l;
@ -561,8 +636,7 @@ grpc_transport_setup_result grpc_server_setup_transport(
void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
void *shutdown_tag) {
listener *l;
requested_call *requested_calls;
size_t requested_call_count;
requested_call_array requested_calls;
channel_data **channels;
channel_data *c;
size_t nchannels;
@ -592,9 +666,7 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
}
requested_calls = server->requested_calls;
requested_call_count = server->requested_call_count;
server->requested_calls = NULL;
server->requested_call_count = 0;
memset(&server->requested_calls, 0, sizeof(server->requested_calls));
server->shutdown = 1;
server->have_shutdown_tag = have_shutdown_tag;
@ -623,13 +695,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
gpr_free(channels);
/* terminate all the requested calls */
for (i = 0; i < requested_call_count; i++) {
requested_calls[i].cb(server, requested_calls[i].cq,
requested_calls[i].call, requested_calls[i].details,
requested_calls[i].initial_metadata, NULL,
requested_calls[i].user_data);
for (i = 0; i < requested_calls.count; i++) {
fail_call(server, &requested_calls.calls[i]);
}
gpr_free(requested_calls);
gpr_free(requested_calls.calls);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@ -675,17 +744,12 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
}
static grpc_call_error queue_call_request(grpc_server *server,
grpc_completion_queue *cq,
grpc_call **call,
grpc_call_details *details,
grpc_metadata_array *initial_metadata,
new_call_cb cb, void *user_data) {
requested_call *rc) {
call_data *calld;
requested_call *rc;
gpr_mu_lock(&server->mu);
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
cb(server, cq, call, details, initial_metadata, NULL, user_data);
fail_call(server, rc);
return GRPC_CALL_OK;
}
calld = call_list_remove_head(server, PENDING_START);
@ -693,29 +757,60 @@ static grpc_call_error queue_call_request(grpc_server *server,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu);
cb(server, cq, call, details, initial_metadata, calld, user_data);
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
if (server->requested_call_count == server->requested_call_capacity) {
server->requested_call_capacity =
GPR_MAX(server->requested_call_capacity + 8,
server->requested_call_capacity * 2);
server->requested_calls =
gpr_realloc(server->requested_calls,
sizeof(requested_call) * server->requested_call_capacity);
}
rc = &server->requested_calls[server->requested_call_count++];
rc->cb = cb;
rc->cq = cq;
rc->call = call;
rc->details = details;
rc->user_data = user_data;
rc->initial_metadata = initial_metadata;
*requested_call_array_add(&server->requested_calls) = *rc;
gpr_mu_unlock(&server->mu);
return GRPC_CALL_OK;
}
}
grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_details *details,
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq, void *tag) {
requested_call rc;
grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE);
rc.type = BATCH_CALL;
rc.tag = tag;
rc.data.batch.cq = cq;
rc.data.batch.call = call;
rc.data.batch.details = details;
rc.data.batch.initial_metadata = initial_metadata;
return queue_call_request(server, &rc);
}
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *initial_metadata,
grpc_byte_buffer **optional_payload, grpc_completion_queue *cq, void *tag) {
requested_call rc;
grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE);
rc.type = REGISTERED_CALL;
rc.tag = tag;
rc.data.registered.cq = cq;
rc.data.registered.call = call;
rc.data.registered.registered_method = registered_method;
rc.data.registered.deadline = deadline;
rc.data.registered.initial_metadata = initial_metadata;
rc.data.registered.optional_payload = optional_payload;
return queue_call_request(server, &rc);
}
grpc_call_error grpc_server_request_call_old(grpc_server *server,
void *tag_new) {
requested_call rc;
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
rc.type = LEGACY_CALL;
rc.tag = tag_new;
return queue_call_request(server, &rc);
}
static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag);
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice;
size_t len = GPR_SLICE_LENGTH(slice);
@ -727,6 +822,108 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
}
static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc) {
grpc_ioreq_completion_func publish;
grpc_ioreq req[2];
grpc_ioreq *r = req;
/* called once initial metadata has been read by the call, but BEFORE
the ioreq to fetch it out of the call has been executed.
This means metadata related fields can be relied on in calld, but to
fill in the metadata array passed by the client, we need to perform
an ioreq op, that should complete immediately. */
switch (rc->type) {
case LEGACY_CALL:
calld->legacy = gpr_malloc(sizeof(legacy_data));
memset(calld->legacy, 0, sizeof(legacy_data));
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = &calld->legacy->initial_metadata;
r++;
publish = publish_legacy;
break;
case BATCH_CALL:
cpstr(&rc->data.batch.details->host,
&rc->data.batch.details->host_capacity, calld->host);
cpstr(&rc->data.batch.details->method,
&rc->data.batch.details->method_capacity, calld->path);
grpc_call_set_completion_queue(calld->call, rc->data.batch.cq);
*rc->data.batch.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
r++;
publish = publish_registered_or_batch;
break;
case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline;
grpc_call_set_completion_queue(calld->call, rc->data.registered.cq);
*rc->data.registered.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.registered.initial_metadata;
r++;
if (rc->data.registered.optional_payload) {
r->op = GRPC_IOREQ_RECV_MESSAGE;
r->data.recv_message = rc->data.registered.optional_payload;
r++;
}
publish = publish_registered_or_batch;
break;
}
grpc_call_internal_ref(calld->call);
grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
rc->tag);
}
static void fail_call(grpc_server *server, requested_call *rc) {
switch (rc->type) {
case LEGACY_CALL:
grpc_cq_end_new_rpc(server->cq, rc->tag, NULL, do_nothing, NULL, NULL,
NULL, gpr_inf_past, 0, NULL);
break;
case BATCH_CALL:
*rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
grpc_cq_end_op_complete(rc->data.batch.cq, rc->tag, NULL, do_nothing,
NULL, GRPC_OP_ERROR);
break;
case REGISTERED_CALL:
*rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
grpc_cq_end_op_complete(rc->data.registered.cq, rc->tag, NULL, do_nothing,
NULL, GRPC_OP_ERROR);
break;
}
}
static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_server *server = chand->server;
if (status == GRPC_OP_OK) {
grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL,
grpc_mdstr_as_c_string(calld->path),
grpc_mdstr_as_c_string(calld->host), calld->deadline,
calld->legacy->initial_metadata.count,
calld->legacy->initial_metadata.metadata);
} else {
gpr_log(GPR_ERROR, "should never reach here");
abort();
}
}
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag) {
grpc_cq_end_op_complete(grpc_call_get_completion_queue(call), tag, call,
do_nothing, NULL, status);
}
#if 0
static void publish_request(grpc_call *call, grpc_op_error status, void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
@ -748,9 +945,14 @@ static void publish_request(grpc_call *call, grpc_op_error status, void *tag) {
static void begin_request(grpc_server *server, grpc_completion_queue *cq,
grpc_call **call, grpc_call_details *details,
registered_method *registered_method, gpr_timespec *deadline,
grpc_metadata_array *initial_metadata,
grpc_byte_buffer **optional_payload,
call_data *calld, void *tag) {
grpc_ioreq req;
GPR_ASSERT(registered_method == NULL);
GPR_ASSERT(deadline == NULL);
GPR_ASSERT(optional_payload == NULL);
if (!calld) {
*call = NULL;
initial_metadata->count = 0;
@ -767,13 +969,39 @@ static void begin_request(grpc_server *server, grpc_completion_queue *cq,
tag);
}
grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_details *details,
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq, void *tag) {
grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE);
return queue_call_request(server, cq, call, details, initial_metadata,
begin_request, tag);
static void begin_registered_request(grpc_server *server, grpc_completion_queue *cq,
grpc_call **call, grpc_call_details *details,
registered_method *registered_method, gpr_timespec *deadline,
grpc_metadata_array *initial_metadata,
grpc_byte_buffer **optional_payload,
call_data *calld, void *tag) {
grpc_ioreq req[2];
grpc_ioreq *r;
GPR_ASSERT(registered_method != NULL);
GPR_ASSERT(deadline != NULL);
GPR_ASSERT(optional_payload != NULL);
if (!calld) {
*call = NULL;
initial_metadata->count = 0;
grpc_cq_end_op_complete(cq, tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
return;
}
calld->details = NULL;
calld->registered_method = registered_method;
grpc_call_set_completion_queue(calld->call, cq);
*call = calld->call;
r = req;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = initial_metadata;
r++;
if (optional_payload != NULL) {
r->op = GRPC_IOREQ_RECV_MESSAGE;
r->data.recv_message = optional_payload;
r++;
}
grpc_call_internal_ref(calld->call);
grpc_call_start_ioreq_and_call_back(calld->call, req, r - rq, publish_request,
tag);
}
static void publish_legacy_request(grpc_call *call, grpc_op_error status,
@ -796,9 +1024,11 @@ static void publish_legacy_request(grpc_call *call, grpc_op_error status,
}
static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq,
grpc_call **call, grpc_call_details *details,
grpc_metadata_array *initial_metadata,
call_data *calld, void *tag) {
grpc_call **call, grpc_call_details *details,
registered_method *registered_method, gpr_timespec *deadline,
grpc_metadata_array *initial_metadata,
grpc_byte_buffer **optional_payload,
call_data *calld, void *tag) {
grpc_ioreq req;
GPR_ASSERT(call == NULL);
GPR_ASSERT(details == NULL);
@ -818,15 +1048,7 @@ static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq,
publish_legacy_request, tag);
}
grpc_call_error grpc_server_request_call_old(grpc_server *server,
void *tag_new) {
grpc_metadata_array *client_metadata =
gpr_malloc(sizeof(grpc_metadata_array));
memset(client_metadata, 0, sizeof(*client_metadata));
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
return queue_call_request(server, server->cq, NULL, NULL, client_metadata,
begin_legacy_request, tag_new);
}
#endif
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
return server->channel_args;

Loading…
Cancel
Save