diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index b7d14a521eb..532fd2d157e 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -413,13 +413,6 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cq); drained and no threads are executing grpc_completion_queue_next */ void grpc_completion_queue_destroy(grpc_completion_queue *cq); -/* Create a call given a grpc_channel, in order to call 'method'. The request - is not sent until grpc_call_invoke is called. All completions are sent to - 'completion_queue'. */ -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, - const char *method, const char *host, - gpr_timespec deadline); - /* Create a call given a grpc_channel, in order to call 'method'. The request is not sent until grpc_call_invoke is called. All completions are sent to 'completion_queue'. */ @@ -475,48 +468,6 @@ void grpc_channel_destroy(grpc_channel *channel); If a grpc_call fails, it's guaranteed that no change to the call state has been made. */ -/* Add a single metadata element to the call, to be sent upon invocation. - flags is a bit-field combination of the write flags defined above. - REQUIRES: grpc_call_start_invoke/grpc_call_server_end_initial_metadata have - not been called on this call. - Produces no events. */ -grpc_call_error grpc_call_add_metadata_old(grpc_call *call, - grpc_metadata *metadata, - gpr_uint32 flags); - -/* Invoke the RPC. Starts sending metadata and request headers on the wire. - flags is a bit-field combination of the write flags defined above. - REQUIRES: Can be called at most once per call. - Can only be called on the client. - Produces a GRPC_CLIENT_METADATA_READ event with metadata_read_tag when - the servers initial metadata has been read. - Produces a GRPC_FINISHED event with finished_tag when the call has been - completed (there may be other events for the call pending at this - time) */ -grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, - void *metadata_read_tag, - void *finished_tag, gpr_uint32 flags); - -/* Accept an incoming RPC, binding a completion queue to it. - To be called before sending or receiving messages. - REQUIRES: Can be called at most once per call. - Can only be called on the server. - Produces a GRPC_FINISHED event with finished_tag when the call has been - completed (there may be other events for the call pending at this - time) */ -grpc_call_error grpc_call_server_accept_old(grpc_call *call, - grpc_completion_queue *cq, - void *finished_tag); - -/* Start sending metadata. - To be called before sending messages. - flags is a bit-field combination of the write flags defined above. - REQUIRES: Can be called at most once per call. - Can only be called on the server. - Must be called after grpc_call_server_accept */ -grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call, - gpr_uint32 flags); - /* Called by clients to cancel an RPC on the server. Can be called multiple times, from any thread. */ grpc_call_error grpc_call_cancel(grpc_call *call); @@ -531,66 +482,9 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description); -/* Queue a byte buffer for writing. - flags is a bit-field combination of the write flags defined above. - A write with byte_buffer null is allowed, and will not send any bytes on the - wire. If this is performed without GRPC_WRITE_BUFFER_HINT flag it provides - a mechanism to flush any previously buffered writes to outgoing flow control. - REQUIRES: No other writes are pending on the call. It is only safe to - start the next write after the corresponding write_accepted event - is received. - GRPC_INVOKE_ACCEPTED must have been received by the application - prior to calling this on the client. On the server, - grpc_call_server_end_of_initial_metadata must have been called - successfully. - Produces a GRPC_WRITE_ACCEPTED event. */ -grpc_call_error grpc_call_start_write_old(grpc_call *call, - grpc_byte_buffer *byte_buffer, - void *tag, gpr_uint32 flags); - -/* Queue a status for writing. - REQUIRES: No other writes are pending on the call. - grpc_call_server_end_initial_metadata must have been called on the - call prior to calling this. - Only callable on the server. - Produces a GRPC_FINISH_ACCEPTED event when the status is sent. */ -grpc_call_error grpc_call_start_write_status_old(grpc_call *call, - grpc_status_code status_code, - const char *status_message, - void *tag); - -/* No more messages to send. - REQUIRES: No other writes are pending on the call. - Only callable on the client. - Produces a GRPC_FINISH_ACCEPTED event when all bytes for the call have passed - outgoing flow control. */ -grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag); - -/* Initiate a read on a call. Output event contains a byte buffer with the - result of the read. - REQUIRES: No other reads are pending on the call. It is only safe to start - the next read after the corresponding read event is received. - On the client: - GRPC_INVOKE_ACCEPTED must have been received by the application - prior to calling this. - On the server: - grpc_call_server_accept must be called before calling this. - Produces a single GRPC_READ event. */ -grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag); - /* Destroy a call. */ void grpc_call_destroy(grpc_call *call); -/* Request a call on a server. - Allows the server to create a single GRPC_SERVER_RPC_NEW event, with tag - tag_new. - If the call is subsequently cancelled, the cancellation will occur with tag - tag_cancel. - REQUIRES: Server must not have been shutdown. - NOTE: calling this is the only way to obtain GRPC_SERVER_RPC_NEW events. */ -grpc_call_error grpc_server_request_call_old(grpc_server *server, - void *tag_new); - /* Request notification of a new call */ grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 6ca1b4e9a13..189a2fd2b3e 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -46,9 +46,6 @@ #include #include -typedef struct legacy_state legacy_state; -static void destroy_legacy_state(legacy_state *ls); - typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; typedef enum { @@ -225,10 +222,6 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; - - /* Data that the legacy api needs to track. To be deleted at some point - soon */ - legacy_state *legacy_state; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -352,9 +345,6 @@ static void destroy_call(void *call, int ignored_success) { } grpc_sopb_destroy(&c->send_ops); grpc_sopb_destroy(&c->recv_ops); - if (c->legacy_state) { - destroy_legacy_state(c->legacy_state); - } grpc_bbq_destroy(&c->incoming_queue); gpr_slice_buffer_destroy(&c->incoming_message); gpr_free(c); @@ -403,12 +393,6 @@ static void set_status_details(grpc_call *call, status_source source, call->status[source].details = status; } -static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { - if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED; - call->cq = cq; - return GRPC_CALL_OK; -} - static int is_op_live(grpc_call *call, grpc_ioreq_op op) { gpr_uint8 set = call->request_set[op]; reqinfo_master *master; @@ -1265,312 +1249,3 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, tag); } - -/* - * LEGACY API IMPLEMENTATION - * All this code will disappear as soon as wrappings are updated - */ - -struct legacy_state { - gpr_uint8 md_out_buffer; - size_t md_out_count[2]; - size_t md_out_capacity[2]; - grpc_metadata *md_out[2]; - grpc_byte_buffer *msg_out; - - /* input buffers */ - grpc_metadata_array initial_md_in; - grpc_metadata_array trailing_md_in; - - size_t details_capacity; - char *details; - grpc_status_code status; - - char *send_details; - - size_t msg_in_read_idx; - grpc_byte_buffer *msg_in; - - void *finished_tag; -}; - -static legacy_state *get_legacy_state(grpc_call *call) { - if (call->legacy_state == NULL) { - call->legacy_state = gpr_malloc(sizeof(legacy_state)); - memset(call->legacy_state, 0, sizeof(legacy_state)); - } - return call->legacy_state; -} - -static void destroy_legacy_state(legacy_state *ls) { - size_t i, j; - for (i = 0; i < 2; i++) { - for (j = 0; j < ls->md_out_count[i]; j++) { - gpr_free((char *)ls->md_out[i][j].key); - gpr_free((char *)ls->md_out[i][j].value); - } - gpr_free(ls->md_out[i]); - } - gpr_free(ls->initial_md_in.metadata); - gpr_free(ls->trailing_md_in.metadata); - gpr_free(ls->details); - gpr_free(ls->send_details); - gpr_free(ls); -} - -grpc_call_error grpc_call_add_metadata_old(grpc_call *call, - grpc_metadata *metadata, - gpr_uint32 flags) { - legacy_state *ls; - grpc_metadata *mdout; - - lock(call); - ls = get_legacy_state(call); - - if (ls->md_out_count[ls->md_out_buffer] == - ls->md_out_capacity[ls->md_out_buffer]) { - ls->md_out_capacity[ls->md_out_buffer] = - GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2, - ls->md_out_capacity[ls->md_out_buffer] + 8); - ls->md_out[ls->md_out_buffer] = gpr_realloc( - ls->md_out[ls->md_out_buffer], - sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]); - } - mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++]; - mdout->key = gpr_strdup(metadata->key); - mdout->value = gpr_malloc(metadata->value_length); - mdout->value_length = metadata->value_length; - memcpy((char *)mdout->value, metadata->value, metadata->value_length); - - unlock(call); - - return GRPC_CALL_OK; -} - -static void finish_status(grpc_call *call, grpc_op_error status, - void *ignored) { - legacy_state *ls; - - lock(call); - ls = get_legacy_state(call); - grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, - ls->status, ls->details, ls->trailing_md_in.metadata, - ls->trailing_md_in.count); - unlock(call); -} - -static void finish_recv_metadata(grpc_call *call, grpc_op_error status, - void *tag) { - legacy_state *ls; - - lock(call); - ls = get_legacy_state(call); - if (status == GRPC_OP_OK) { - grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, - ls->initial_md_in.count, - ls->initial_md_in.metadata); - - } else { - grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0, - NULL); - } - unlock(call); -} - -static void finish_send_metadata(grpc_call *call, grpc_op_error status, - void *tag) {} - -grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, - void *metadata_read_tag, - void *finished_tag, gpr_uint32 flags) { - grpc_ioreq reqs[4]; - legacy_state *ls; - grpc_call_error err; - - grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ); - grpc_cq_begin_op(cq, call, GRPC_FINISHED); - - lock(call); - ls = get_legacy_state(call); - err = bind_cq(call, cq); - if (err != GRPC_CALL_OK) goto done; - - ls->finished_tag = finished_tag; - - reqs[0].op = GRPC_IOREQ_SEND_INITIAL_METADATA; - reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; - reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; - ls->md_out_buffer++; - err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL); - if (err != GRPC_CALL_OK) goto done; - - reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA; - reqs[0].data.recv_metadata = &ls->initial_md_in; - err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag); - if (err != GRPC_CALL_OK) goto done; - - reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; - reqs[0].data.recv_metadata = &ls->trailing_md_in; - reqs[1].op = GRPC_IOREQ_RECV_STATUS; - reqs[1].data.recv_status.user_data = &ls->status; - reqs[1].data.recv_status.set_value = set_status_value_directly; - reqs[2].op = GRPC_IOREQ_RECV_STATUS_DETAILS; - reqs[2].data.recv_status_details.details = &ls->details; - reqs[2].data.recv_status_details.details_capacity = &ls->details_capacity; - reqs[3].op = GRPC_IOREQ_RECV_CLOSE; - err = start_ioreq(call, reqs, 4, finish_status, NULL); - if (err != GRPC_CALL_OK) goto done; - -done: - unlock(call); - return err; -} - -grpc_call_error grpc_call_server_accept_old(grpc_call *call, - grpc_completion_queue *cq, - void *finished_tag) { - grpc_ioreq reqs[2]; - grpc_call_error err; - legacy_state *ls; - - /* inform the completion queue of an incoming operation (corresponding to - finished_tag) */ - grpc_cq_begin_op(cq, call, GRPC_FINISHED); - - lock(call); - ls = get_legacy_state(call); - - err = bind_cq(call, cq); - if (err != GRPC_CALL_OK) { - unlock(call); - return err; - } - - ls->finished_tag = finished_tag; - - reqs[0].op = GRPC_IOREQ_RECV_STATUS; - reqs[0].data.recv_status.user_data = &ls->status; - reqs[0].data.recv_status.set_value = set_status_value_directly; - reqs[1].op = GRPC_IOREQ_RECV_CLOSE; - err = start_ioreq(call, reqs, 2, finish_status, NULL); - unlock(call); - return err; -} - -static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status, - void *tag) {} - -grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call, - gpr_uint32 flags) { - grpc_ioreq req; - grpc_call_error err; - legacy_state *ls; - - lock(call); - ls = get_legacy_state(call); - req.op = GRPC_IOREQ_SEND_INITIAL_METADATA; - req.data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; - req.data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; - err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL); - unlock(call); - - return err; -} - -static void finish_read_event(void *p, grpc_op_error error) { - if (p) grpc_byte_buffer_destroy(p); -} - -static void finish_read(grpc_call *call, grpc_op_error error, void *tag) { - legacy_state *ls; - grpc_byte_buffer *msg; - lock(call); - ls = get_legacy_state(call); - msg = ls->msg_in; - grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg); - unlock(call); -} - -grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) { - legacy_state *ls; - grpc_ioreq req; - grpc_call_error err; - - grpc_cq_begin_op(call->cq, call, GRPC_READ); - - lock(call); - ls = get_legacy_state(call); - req.op = GRPC_IOREQ_RECV_MESSAGE; - req.data.recv_message = &ls->msg_in; - err = start_ioreq(call, &req, 1, finish_read, tag); - unlock(call); - return err; -} - -static void finish_write(grpc_call *call, grpc_op_error status, void *tag) { - lock(call); - grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out); - unlock(call); - grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status); -} - -grpc_call_error grpc_call_start_write_old(grpc_call *call, - grpc_byte_buffer *byte_buffer, - void *tag, gpr_uint32 flags) { - grpc_ioreq req; - legacy_state *ls; - grpc_call_error err; - - grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED); - - lock(call); - ls = get_legacy_state(call); - ls->msg_out = grpc_byte_buffer_copy(byte_buffer); - req.op = GRPC_IOREQ_SEND_MESSAGE; - req.data.send_message = ls->msg_out; - err = start_ioreq(call, &req, 1, finish_write, tag); - unlock(call); - - return err; -} - -static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) { - grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status); -} - -grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) { - grpc_ioreq req; - grpc_call_error err; - grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - - lock(call); - req.op = GRPC_IOREQ_SEND_CLOSE; - err = start_ioreq(call, &req, 1, finish_finish, tag); - unlock(call); - - return err; -} - -grpc_call_error grpc_call_start_write_status_old(grpc_call *call, - grpc_status_code status, - const char *details, - void *tag) { - grpc_ioreq reqs[3]; - grpc_call_error err; - legacy_state *ls; - grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - - lock(call); - ls = get_legacy_state(call); - reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA; - reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; - reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; - reqs[1].op = GRPC_IOREQ_SEND_STATUS; - reqs[1].data.send_status.code = status; - reqs[1].data.send_status.details = ls->send_details = gpr_strdup(details); - reqs[2].op = GRPC_IOREQ_SEND_CLOSE; - err = start_ioreq(call, reqs, 3, finish_finish, tag); - unlock(call); - - return err; -} diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 83caefcbc60..ac89589dc8b 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -69,7 +69,7 @@ typedef struct { call_data *prev; } call_link; -typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type; +typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct { requested_call_type type; @@ -165,10 +165,6 @@ typedef enum { ZOMBIED } call_state; -typedef struct legacy_data { - grpc_metadata_array initial_metadata; -} legacy_data; - struct call_data { grpc_call *call; @@ -178,7 +174,6 @@ struct call_data { gpr_timespec deadline; int got_initial_metadata; - legacy_data *legacy; grpc_completion_queue *cq_new; grpc_stream_op_buffer *recv_ops; @@ -557,11 +552,6 @@ static void destroy_call_elem(grpc_call_element *elem) { grpc_mdstr_unref(calld->path); } - if (calld->legacy) { - gpr_free(calld->legacy->initial_metadata.metadata); - gpr_free(calld->legacy); - } - server_unref(chand->server); } @@ -998,7 +988,6 @@ static grpc_call_error queue_call_request(grpc_server *server, return GRPC_CALL_OK; } switch (rc->type) { - case LEGACY_CALL: case BATCH_CALL: calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START); @@ -1057,16 +1046,6 @@ grpc_call_error grpc_server_request_registered_call( return queue_call_request(server, &rc); } -grpc_call_error grpc_server_request_call_old(grpc_server *server, - void *tag_new) { - requested_call rc; - grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW); - rc.type = LEGACY_CALL; - rc.tag = tag_new; - return queue_call_request(server, &rc); -} - -static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag); static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag); static void publish_was_not_set(grpc_call *call, grpc_op_error status, @@ -1098,14 +1077,6 @@ static void begin_call(grpc_server *server, call_data *calld, an ioreq op, that should complete immediately. */ switch (rc->type) { - case LEGACY_CALL: - calld->legacy = gpr_malloc(sizeof(legacy_data)); - memset(calld->legacy, 0, sizeof(legacy_data)); - r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; - r->data.recv_metadata = &calld->legacy->initial_metadata; - r++; - publish = publish_legacy; - break; case BATCH_CALL: cpstr(&rc->data.batch.details->host, &rc->data.batch.details->host_capacity, calld->host); @@ -1144,10 +1115,6 @@ static void begin_call(grpc_server *server, call_data *calld, static void fail_call(grpc_server *server, requested_call *rc) { switch (rc->type) { - case LEGACY_CALL: - grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, - NULL, NULL, NULL, gpr_inf_past, 0, NULL); - break; case BATCH_CALL: *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; @@ -1163,25 +1130,6 @@ static void fail_call(grpc_server *server, requested_call *rc) { } } -static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) { - grpc_call_element *elem = - grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - grpc_server *server = chand->server; - - if (status == GRPC_OP_OK) { - grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL, - grpc_mdstr_as_c_string(calld->path), - grpc_mdstr_as_c_string(calld->host), calld->deadline, - calld->legacy->initial_metadata.count, - calld->legacy->initial_metadata.metadata); - } else { - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - } -} - static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag) { grpc_call_element *elem =