Remove _old APIs from the core library

pull/1397/head
Craig Tiller 10 years ago
parent 2288b2d61c
commit 0e91956b49
  1. 106
      include/grpc/grpc.h
  2. 325
      src/core/surface/call.c
  3. 54
      src/core/surface/server.c

@ -413,13 +413,6 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cq);
drained and no threads are executing grpc_completion_queue_next */
void grpc_completion_queue_destroy(grpc_completion_queue *cq);
/* Create a call given a grpc_channel, in order to call 'method'. The request
is not sent until grpc_call_invoke is called. All completions are sent to
'completion_queue'. */
grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
const char *method, const char *host,
gpr_timespec deadline);
/* Create a call given a grpc_channel, in order to call 'method'. The request
is not sent until grpc_call_invoke is called. All completions are sent to
'completion_queue'. */
@ -475,48 +468,6 @@ void grpc_channel_destroy(grpc_channel *channel);
If a grpc_call fails, it's guaranteed that no change to the call state
has been made. */
/* Add a single metadata element to the call, to be sent upon invocation.
flags is a bit-field combination of the write flags defined above.
REQUIRES: grpc_call_start_invoke/grpc_call_server_end_initial_metadata have
not been called on this call.
Produces no events. */
grpc_call_error grpc_call_add_metadata_old(grpc_call *call,
grpc_metadata *metadata,
gpr_uint32 flags);
/* Invoke the RPC. Starts sending metadata and request headers on the wire.
flags is a bit-field combination of the write flags defined above.
REQUIRES: Can be called at most once per call.
Can only be called on the client.
Produces a GRPC_CLIENT_METADATA_READ event with metadata_read_tag when
the server's initial metadata has been read.
Produces a GRPC_FINISHED event with finished_tag when the call has been
completed (there may be other events for the call pending at this
time) */
grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
void *metadata_read_tag,
void *finished_tag, gpr_uint32 flags);
/* Accept an incoming RPC, binding a completion queue to it.
To be called before sending or receiving messages.
REQUIRES: Can be called at most once per call.
Can only be called on the server.
Produces a GRPC_FINISHED event with finished_tag when the call has been
completed (there may be other events for the call pending at this
time) */
grpc_call_error grpc_call_server_accept_old(grpc_call *call,
grpc_completion_queue *cq,
void *finished_tag);
/* Start sending metadata.
To be called before sending messages.
flags is a bit-field combination of the write flags defined above.
REQUIRES: Can be called at most once per call.
Can only be called on the server.
Must be called after grpc_call_server_accept */
grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
gpr_uint32 flags);
/* Called by clients to cancel an RPC on the server.
Can be called multiple times, from any thread. */
grpc_call_error grpc_call_cancel(grpc_call *call);
@ -531,66 +482,9 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
grpc_status_code status,
const char *description);
/* Queue a byte buffer for writing.
flags is a bit-field combination of the write flags defined above.
A write with byte_buffer null is allowed, and will not send any bytes on the
wire. If this is performed without GRPC_WRITE_BUFFER_HINT flag it provides
a mechanism to flush any previously buffered writes to outgoing flow control.
REQUIRES: No other writes are pending on the call. It is only safe to
start the next write after the corresponding write_accepted event
is received.
GRPC_INVOKE_ACCEPTED must have been received by the application
prior to calling this on the client. On the server,
grpc_call_server_end_of_initial_metadata must have been called
successfully.
Produces a GRPC_WRITE_ACCEPTED event. */
grpc_call_error grpc_call_start_write_old(grpc_call *call,
grpc_byte_buffer *byte_buffer,
void *tag, gpr_uint32 flags);
/* Queue a status for writing.
REQUIRES: No other writes are pending on the call.
grpc_call_server_end_initial_metadata must have been called on the
call prior to calling this.
Only callable on the server.
Produces a GRPC_FINISH_ACCEPTED event when the status is sent. */
grpc_call_error grpc_call_start_write_status_old(grpc_call *call,
grpc_status_code status_code,
const char *status_message,
void *tag);
/* No more messages to send.
REQUIRES: No other writes are pending on the call.
Only callable on the client.
Produces a GRPC_FINISH_ACCEPTED event when all bytes for the call have passed
outgoing flow control. */
grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag);
/* Initiate a read on a call. Output event contains a byte buffer with the
result of the read.
REQUIRES: No other reads are pending on the call. It is only safe to start
the next read after the corresponding read event is received.
On the client:
GRPC_INVOKE_ACCEPTED must have been received by the application
prior to calling this.
On the server:
grpc_call_server_accept must be called before calling this.
Produces a single GRPC_READ event. */
grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag);
/* Destroy a call. */
void grpc_call_destroy(grpc_call *call);
/* Request a call on a server.
Allows the server to create a single GRPC_SERVER_RPC_NEW event, with tag
tag_new.
If the call is subsequently cancelled, the cancellation will occur with tag
tag_cancel.
REQUIRES: Server must not have been shutdown.
NOTE: calling this is the only way to obtain GRPC_SERVER_RPC_NEW events. */
grpc_call_error grpc_server_request_call_old(grpc_server *server,
void *tag_new);
/* Request notification of a new call */
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,

@ -46,9 +46,6 @@
#include <stdlib.h>
#include <string.h>
typedef struct legacy_state legacy_state;
static void destroy_legacy_state(legacy_state *ls);
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
typedef enum {
@ -225,10 +222,6 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@ -352,9 +345,6 @@ static void destroy_call(void *call, int ignored_success) {
}
grpc_sopb_destroy(&c->send_ops);
grpc_sopb_destroy(&c->recv_ops);
if (c->legacy_state) {
destroy_legacy_state(c->legacy_state);
}
grpc_bbq_destroy(&c->incoming_queue);
gpr_slice_buffer_destroy(&c->incoming_message);
gpr_free(c);
@ -403,12 +393,6 @@ static void set_status_details(grpc_call *call, status_source source,
call->status[source].details = status;
}
static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED;
call->cq = cq;
return GRPC_CALL_OK;
}
static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
gpr_uint8 set = call->request_set[op];
reqinfo_master *master;
@ -1265,312 +1249,3 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
tag);
}
/*
* LEGACY API IMPLEMENTATION
* All this code will disappear as soon as wrappings are updated
*/
/* Per-call scratch state used only by the deprecated *_old API surface.
   Allocated lazily by get_legacy_state and released in destroy_legacy_state
   when the call is destroyed. */
struct legacy_state {
  gpr_uint8 md_out_buffer;       /* index (0/1) of the active outgoing buffer */
  size_t md_out_count[2];        /* elements used in each md_out buffer */
  size_t md_out_capacity[2];     /* allocated capacity of each md_out buffer */
  grpc_metadata *md_out[2];      /* double-buffered outgoing metadata */
  grpc_byte_buffer *msg_out;     /* private copy of the message being written */
  /* input buffers */
  grpc_metadata_array initial_md_in;  /* metadata received up front */
  grpc_metadata_array trailing_md_in; /* metadata received with the status */
  size_t details_capacity;       /* allocated size of details */
  char *details;                 /* received status details string */
  grpc_status_code status;       /* received status code */
  char *send_details;            /* strdup'd details from start_write_status */
  size_t msg_in_read_idx;        /* read cursor into msg_in */
  grpc_byte_buffer *msg_in;      /* last message received via start_read */
  void *finished_tag;            /* tag to report with the GRPC_FINISHED event */
};
/* Return the call's legacy-API state, allocating it (zero-initialized) on
   first use. Caller must hold the call lock. */
static legacy_state *get_legacy_state(grpc_call *call) {
  legacy_state *ls = call->legacy_state;
  if (ls == NULL) {
    ls = gpr_malloc(sizeof(legacy_state));
    memset(ls, 0, sizeof(*ls));
    call->legacy_state = ls;
  }
  return ls;
}
/* Release everything owned by a legacy_state, then the state itself:
   both outgoing metadata buffers (keys and values were deep-copied),
   the received metadata arrays, and the status-details strings. */
static void destroy_legacy_state(legacy_state *ls) {
  size_t buf;
  for (buf = 0; buf < 2; buf++) {
    size_t idx;
    for (idx = 0; idx < ls->md_out_count[buf]; idx++) {
      gpr_free((char *)ls->md_out[buf][idx].key);
      gpr_free((char *)ls->md_out[buf][idx].value);
    }
    gpr_free(ls->md_out[buf]);
  }
  gpr_free(ls->initial_md_in.metadata);
  gpr_free(ls->trailing_md_in.metadata);
  gpr_free(ls->details);
  gpr_free(ls->send_details);
  gpr_free(ls);
}
/* Legacy API: buffer one metadata element for later sending. The element is
   deep-copied, so the caller keeps ownership of its argument. Elements
   accumulate in the currently-active outgoing buffer; |flags| is accepted
   for API compatibility but not used in this implementation. */
grpc_call_error grpc_call_add_metadata_old(grpc_call *call,
                                           grpc_metadata *metadata,
                                           gpr_uint32 flags) {
  legacy_state *ls;
  grpc_metadata *mdout;
  lock(call);
  ls = get_legacy_state(call);
  /* Grow the active buffer geometrically (1.5x, minimum +8) when full. */
  if (ls->md_out_count[ls->md_out_buffer] ==
      ls->md_out_capacity[ls->md_out_buffer]) {
    ls->md_out_capacity[ls->md_out_buffer] =
        GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2,
                ls->md_out_capacity[ls->md_out_buffer] + 8);
    ls->md_out[ls->md_out_buffer] = gpr_realloc(
        ls->md_out[ls->md_out_buffer],
        sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]);
  }
  /* Deep-copy key and value; freed in destroy_legacy_state. */
  mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++];
  mdout->key = gpr_strdup(metadata->key);
  mdout->value = gpr_malloc(metadata->value_length);
  mdout->value_length = metadata->value_length;
  memcpy((char *)mdout->value, metadata->value, metadata->value_length);
  unlock(call);
  return GRPC_CALL_OK;
}
/* Completion callback for the RECV_STATUS/RECV_CLOSE ioreq batch: emits the
   GRPC_FINISHED event carrying the received status, details and trailing
   metadata, against the tag saved at invoke/accept time. */
static void finish_status(grpc_call *call, grpc_op_error status,
                          void *ignored) {
  legacy_state *ls;
  lock(call);
  ls = get_legacy_state(call);
  grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
                       ls->status, ls->details, ls->trailing_md_in.metadata,
                       ls->trailing_md_in.count);
  unlock(call);
}
/* Completion callback for the RECV_INITIAL_METADATA ioreq: surfaces a
   GRPC_CLIENT_METADATA_READ event carrying the metadata that was received,
   or an empty metadata set if the operation failed. */
static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
                                 void *tag) {
  legacy_state *ls;
  lock(call);
  ls = get_legacy_state(call);
  if (status != GRPC_OP_OK) {
    grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
                                     NULL);
  } else {
    grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
                                     ls->initial_md_in.count,
                                     ls->initial_md_in.metadata);
  }
  unlock(call);
}
/* Completion callback for the client's SEND_INITIAL_METADATA ioreq:
   intentionally a no-op — the application only observes the metadata-read
   and finished events. */
static void finish_send_metadata(grpc_call *call, grpc_op_error status,
                                 void *tag) {}
/* Legacy client API: start the RPC. Flushes metadata buffered via
   grpc_call_add_metadata_old, then queues reads for the server's initial
   metadata (reported via |metadata_read_tag|) and for trailing metadata,
   status, details, and close (reported via |finished_tag|). Binds the call
   to |cq|; fails if a completion queue was already bound. */
grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
                                     void *metadata_read_tag,
                                     void *finished_tag, gpr_uint32 flags) {
  grpc_ioreq reqs[4];
  legacy_state *ls;
  grpc_call_error err;
  /* Reserve the two events we will eventually deliver on cq. */
  grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
  grpc_cq_begin_op(cq, call, GRPC_FINISHED);
  lock(call);
  ls = get_legacy_state(call);
  err = bind_cq(call, cq);
  if (err != GRPC_CALL_OK) goto done;
  ls->finished_tag = finished_tag;
  /* Step 1: send the buffered initial metadata; advance md_out_buffer so
     later add_metadata calls go to the other buffer and do not disturb
     the metadata being sent. */
  reqs[0].op = GRPC_IOREQ_SEND_INITIAL_METADATA;
  reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
  reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
  ls->md_out_buffer++;
  err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL);
  if (err != GRPC_CALL_OK) goto done;
  /* Step 2: receive the server's initial metadata. */
  reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
  reqs[0].data.recv_metadata = &ls->initial_md_in;
  err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag);
  if (err != GRPC_CALL_OK) goto done;
  /* Step 3: receive trailing metadata, status, details, and close. */
  reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
  reqs[0].data.recv_metadata = &ls->trailing_md_in;
  reqs[1].op = GRPC_IOREQ_RECV_STATUS;
  reqs[1].data.recv_status.user_data = &ls->status;
  reqs[1].data.recv_status.set_value = set_status_value_directly;
  reqs[2].op = GRPC_IOREQ_RECV_STATUS_DETAILS;
  reqs[2].data.recv_status_details.details = &ls->details;
  reqs[2].data.recv_status_details.details_capacity = &ls->details_capacity;
  reqs[3].op = GRPC_IOREQ_RECV_CLOSE;
  err = start_ioreq(call, reqs, 4, finish_status, NULL);
  if (err != GRPC_CALL_OK) goto done;
done:
  unlock(call);
  return err;
}
/* Legacy server API: accept an incoming RPC, binding it to |cq| and
   registering |finished_tag| for the eventual GRPC_FINISHED event.
   Queues reads for the peer's status and close notification. */
grpc_call_error grpc_call_server_accept_old(grpc_call *call,
                                            grpc_completion_queue *cq,
                                            void *finished_tag) {
  grpc_ioreq reqs[2];
  grpc_call_error err;
  legacy_state *ls;
  /* inform the completion queue of an incoming operation (corresponding to
     finished_tag) */
  grpc_cq_begin_op(cq, call, GRPC_FINISHED);
  lock(call);
  ls = get_legacy_state(call);
  err = bind_cq(call, cq);
  if (err != GRPC_CALL_OK) {
    unlock(call);
    return err;
  }
  ls->finished_tag = finished_tag;
  reqs[0].op = GRPC_IOREQ_RECV_STATUS;
  reqs[0].data.recv_status.user_data = &ls->status;
  reqs[0].data.recv_status.set_value = set_status_value_directly;
  reqs[1].op = GRPC_IOREQ_RECV_CLOSE;
  err = start_ioreq(call, reqs, 2, finish_status, NULL);
  unlock(call);
  return err;
}
/* Completion callback for the server's SEND_INITIAL_METADATA ioreq:
   intentionally a no-op. */
static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status,
                                         void *tag) {}
/* Legacy server API: flush any metadata buffered via
   grpc_call_add_metadata_old to the wire by starting a
   SEND_INITIAL_METADATA ioreq. |flags| is accepted for API compatibility. */
grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
                                                          gpr_uint32 flags) {
  grpc_call_error result;
  grpc_ioreq send_md;
  legacy_state *state;
  lock(call);
  state = get_legacy_state(call);
  send_md.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
  send_md.data.send_metadata.count = state->md_out_count[state->md_out_buffer];
  send_md.data.send_metadata.metadata = state->md_out[state->md_out_buffer];
  result = start_ioreq(call, &send_md, 1, finish_send_initial_metadata, NULL);
  unlock(call);
  return result;
}
/* Completion-queue finish callback for a GRPC_READ event: destroys the byte
   buffer (if any) once the application has consumed the event. */
static void finish_read_event(void *p, grpc_op_error error) {
  if (p) grpc_byte_buffer_destroy(p);
}
/* Completion callback for a legacy RECV_MESSAGE ioreq: surfaces the received
   byte buffer as a GRPC_READ event. Ownership of the buffer passes to the
   event; finish_read_event destroys it after consumption. */
static void finish_read(grpc_call *call, grpc_op_error error, void *tag) {
  legacy_state *ls;
  grpc_byte_buffer *msg;
  lock(call);
  ls = get_legacy_state(call);
  msg = ls->msg_in;
  grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg);
  unlock(call);
}
/* Legacy API: start reading the next message on the call. The result is
   delivered as a single GRPC_READ event tagged |tag|. */
grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) {
  grpc_call_error result;
  grpc_ioreq read_req;
  legacy_state *state;
  grpc_cq_begin_op(call->cq, call, GRPC_READ);
  lock(call);
  state = get_legacy_state(call);
  read_req.op = GRPC_IOREQ_RECV_MESSAGE;
  read_req.data.recv_message = &state->msg_in;
  result = start_ioreq(call, &read_req, 1, finish_read, tag);
  unlock(call);
  return result;
}
/* Completion callback for a legacy SEND_MESSAGE ioreq: frees the private
   copy of the outgoing message made in grpc_call_start_write_old, then
   reports GRPC_WRITE_ACCEPTED. Note the cq call is made outside the lock. */
static void finish_write(grpc_call *call, grpc_op_error status, void *tag) {
  lock(call);
  grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out);
  unlock(call);
  grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status);
}
/* Legacy API: queue |byte_buffer| for writing. The buffer is copied, so the
   caller keeps ownership of its argument; the copy is released in
   finish_write. Produces a GRPC_WRITE_ACCEPTED event tagged |tag|.
   |flags| is accepted for API compatibility but not used here. */
grpc_call_error grpc_call_start_write_old(grpc_call *call,
                                          grpc_byte_buffer *byte_buffer,
                                          void *tag, gpr_uint32 flags) {
  grpc_ioreq req;
  legacy_state *ls;
  grpc_call_error err;
  grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
  lock(call);
  ls = get_legacy_state(call);
  ls->msg_out = grpc_byte_buffer_copy(byte_buffer);
  req.op = GRPC_IOREQ_SEND_MESSAGE;
  req.data.send_message = ls->msg_out;
  err = start_ioreq(call, &req, 1, finish_write, tag);
  unlock(call);
  return err;
}
/* Completion callback shared by writes_done and start_write_status:
   reports GRPC_FINISH_ACCEPTED with the operation's outcome. */
static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) {
  grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status);
}
/* Legacy client API: half-close the call (no further messages will be
   sent). Produces a GRPC_FINISH_ACCEPTED event tagged |tag| once the
   close has been processed. */
grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) {
  grpc_call_error result;
  grpc_ioreq close_req;
  grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
  lock(call);
  close_req.op = GRPC_IOREQ_SEND_CLOSE;
  result = start_ioreq(call, &close_req, 1, finish_finish, tag);
  unlock(call);
  return result;
}
/* Legacy server API: send any buffered trailing metadata plus the final
   status, then close the call. |details| is copied (kept in legacy_state
   until call teardown). Produces a GRPC_FINISH_ACCEPTED event tagged |tag|. */
grpc_call_error grpc_call_start_write_status_old(grpc_call *call,
                                                 grpc_status_code status,
                                                 const char *details,
                                                 void *tag) {
  grpc_ioreq reqs[3];
  grpc_call_error err;
  legacy_state *ls;
  grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
  lock(call);
  ls = get_legacy_state(call);
  /* Trailing metadata comes from the currently-active outgoing buffer. */
  reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
  reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
  reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
  reqs[1].op = GRPC_IOREQ_SEND_STATUS;
  reqs[1].data.send_status.code = status;
  /* Keep a private copy of details; freed in destroy_legacy_state. */
  reqs[1].data.send_status.details = ls->send_details = gpr_strdup(details);
  reqs[2].op = GRPC_IOREQ_SEND_CLOSE;
  err = start_ioreq(call, reqs, 3, finish_finish, tag);
  unlock(call);
  return err;
}

@ -69,7 +69,7 @@ typedef struct {
call_data *prev;
} call_link;
typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct {
requested_call_type type;
@ -165,10 +165,6 @@ typedef enum {
ZOMBIED
} call_state;
typedef struct legacy_data {
grpc_metadata_array initial_metadata;
} legacy_data;
struct call_data {
grpc_call *call;
@ -178,7 +174,6 @@ struct call_data {
gpr_timespec deadline;
int got_initial_metadata;
legacy_data *legacy;
grpc_completion_queue *cq_new;
grpc_stream_op_buffer *recv_ops;
@ -557,11 +552,6 @@ static void destroy_call_elem(grpc_call_element *elem) {
grpc_mdstr_unref(calld->path);
}
if (calld->legacy) {
gpr_free(calld->legacy->initial_metadata.metadata);
gpr_free(calld->legacy);
}
server_unref(chand->server);
}
@ -998,7 +988,6 @@ static grpc_call_error queue_call_request(grpc_server *server,
return GRPC_CALL_OK;
}
switch (rc->type) {
case LEGACY_CALL:
case BATCH_CALL:
calld =
call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
@ -1057,16 +1046,6 @@ grpc_call_error grpc_server_request_registered_call(
return queue_call_request(server, &rc);
}
/* Legacy server API: request notification of the next incoming call via a
   GRPC_SERVER_RPC_NEW event tagged |tag_new| on the server's unregistered
   completion queue. */
grpc_call_error grpc_server_request_call_old(grpc_server *server,
                                             void *tag_new) {
  requested_call rc;
  grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
  rc.type = LEGACY_CALL;
  rc.tag = tag_new;
  return queue_call_request(server, &rc);
}
static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag);
static void publish_was_not_set(grpc_call *call, grpc_op_error status,
@ -1098,14 +1077,6 @@ static void begin_call(grpc_server *server, call_data *calld,
an ioreq op, that should complete immediately. */
switch (rc->type) {
case LEGACY_CALL:
calld->legacy = gpr_malloc(sizeof(legacy_data));
memset(calld->legacy, 0, sizeof(legacy_data));
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = &calld->legacy->initial_metadata;
r++;
publish = publish_legacy;
break;
case BATCH_CALL:
cpstr(&rc->data.batch.details->host,
&rc->data.batch.details->host_capacity, calld->host);
@ -1144,10 +1115,6 @@ static void begin_call(grpc_server *server, call_data *calld,
static void fail_call(grpc_server *server, requested_call *rc) {
switch (rc->type) {
case LEGACY_CALL:
grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing,
NULL, NULL, NULL, gpr_inf_past, 0, NULL);
break;
case BATCH_CALL:
*rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
@ -1163,25 +1130,6 @@ static void fail_call(grpc_server *server, requested_call *rc) {
}
}
/* Publish a newly-begun legacy call: emits GRPC_SERVER_RPC_NEW carrying the
   method path, host, deadline and the initial metadata. The failure branch
   aborts — per its log message it is expected to be unreachable. */
static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
  grpc_call_element *elem =
      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_server *server = chand->server;
  if (status == GRPC_OP_OK) {
    grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
                        grpc_mdstr_as_c_string(calld->path),
                        grpc_mdstr_as_c_string(calld->host), calld->deadline,
                        calld->legacy->initial_metadata.count,
                        calld->legacy->initial_metadata.metadata);
  } else {
    gpr_log(GPR_ERROR, "should never reach here");
    abort();
  }
}
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag) {
grpc_call_element *elem =

Loading…
Cancel
Save