From cce17ac033ac89563141f8c4cad9c842adac37ef Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 20 Jan 2015 09:29:28 -0800 Subject: [PATCH] First compiling version of the new C api. --- README.md | 4 +- include/grpc/grpc.h | 90 +- src/core/channel/connected_channel.c | 8 +- src/core/surface/call.c | 1265 ++++++++--------- src/core/surface/call.h | 17 +- src/core/surface/channel.c | 2 +- src/core/surface/client.c | 10 +- src/core/surface/completion_queue.c | 23 +- src/core/surface/completion_queue.h | 4 + src/core/surface/event_string.c | 6 +- src/core/surface/lame_client.c | 16 +- src/core/surface/server.c | 218 ++- src/core/transport/chttp2/stream_encoder.c | 2 +- src/cpp/client/channel.cc | 1 + test/core/echo/client.c | 2 +- test/core/echo/server.c | 2 +- test/core/end2end/cq_verifier.c | 12 +- test/core/end2end/dualstack_socket_test.c | 4 +- test/core/end2end/no_server_test.c | 2 +- test/core/end2end/tests/cancel_after_accept.c | 5 +- .../cancel_after_accept_and_writes_closed.c | 5 +- test/core/end2end/tests/cancel_after_invoke.c | 3 +- .../core/end2end/tests/cancel_before_invoke.c | 3 +- test/core/end2end/tests/cancel_in_a_vacuum.c | 3 +- .../end2end/tests/census_simple_request.c | 5 +- test/core/end2end/tests/disappearing_server.c | 5 +- ..._server_shutdown_finishes_inflight_calls.c | 5 +- .../early_server_shutdown_finishes_tags.c | 2 +- .../end2end/tests/graceful_server_shutdown.c | 5 +- .../core/end2end/tests/invoke_large_request.c | 5 +- .../end2end/tests/max_concurrent_streams.c | 18 +- test/core/end2end/tests/ping_pong_streaming.c | 5 +- ...esponse_with_binary_metadata_and_payload.c | 5 +- ...quest_response_with_metadata_and_payload.c | 5 +- .../tests/request_response_with_payload.c | 5 +- ...ponse_with_trailing_metadata_and_payload.c | 5 +- .../tests/request_with_large_metadata.c | 5 +- .../core/end2end/tests/request_with_payload.c | 5 +- .../end2end/tests/simple_delayed_request.c | 5 +- test/core/end2end/tests/simple_request.c | 10 +- test/core/end2end/tests/thread_stress.c | 4 +- .../writes_done_hangs_with_pending_read.c | 5 +- test/core/fling/client.c | 4 +- test/core/fling/server.c | 2 +- test/core/surface/completion_queue_test.c | 27 - test/core/surface/lame_client_test.c | 2 +- 46 files changed, 910 insertions(+), 936 deletions(-) diff --git a/README.md b/README.md index fa39d3b3089..dde057e55a2 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ fleshing out the details of each of the required operations. A gRPC RPC comprises of a bidirectional stream of messages, initiated by the client. In the client-to-server direction, this stream begins with a mandatory `Call Header`, followed by optional `Initial-Metadata`, followed by zero or more `Payload Messages`. The server-to-client direction contains an optional `Initial-Metadata`, followed by zero or more `Payload Messages` terminated with a mandatory `Status` and optional `Status-Metadata` (a.k.a.,`Trailing-Metadata`). ## Implementation over HTTP/2 -The abstract protocol defined above is implemented over [HTTP/2](https://http2.github.io/). gRPC bidirectional streams are mapped to HTTP/2 streams. The contents of `Call Header` and `Initial Metadata` are sent as HTTP/2 headers and subject to HPAC compression. `Payload Messages` are serialized into a byte stream of length prefixed gRPC frames which are then fragmented into HTTP/2 frames at the sender and reassembled at the receiver. `Status` and `Trailing-Metadata` are sent as HTTP/2 trailing headers (a.k.a., trailers). 
+The abstract protocol defined above is implemented over [HTTP/2](https://http2.github.io/). gRPC bidirectional streams are mapped to HTTP/2 streams. The contents of `Call Header` and `Initial Metadata` are sent as HTTP/2 headers and subject to HPACK compression. `Payload Messages` are serialized into a byte stream of length prefixed gRPC frames which are then fragmented into HTTP/2 frames at the sender and reassembled at the receiver. `Status` and `Trailing-Metadata` are sent as HTTP/2 trailing headers (a.k.a., trailers). ## Flow Control -gRPC inherits the flow control mchanims in HTTP/2 and uses them to enable fine-grained control of the amount of memory used for buffering in-flight messages. +gRPC inherits the flow control mechanisms in HTTP/2 and uses them to enable fine-grained control of the amount of memory used for buffering in-flight messages. diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index af52dd95f4f..93deeaab0ae 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -183,12 +183,11 @@ typedef struct grpc_metadata { } grpc_metadata; typedef enum grpc_completion_type { - GRPC_QUEUE_SHUTDOWN, /* Shutting down */ - GRPC_READ, /* A read has completed */ - GRPC_INVOKE_ACCEPTED, /* An invoke call has been accepted by flow - control */ - GRPC_WRITE_ACCEPTED, /* A write has been accepted by - flow control */ + GRPC_QUEUE_SHUTDOWN, /* Shutting down */ + GRPC_IOREQ, /* grpc_call_ioreq completion */ + GRPC_READ, /* A read has completed */ + GRPC_WRITE_ACCEPTED, /* A write has been accepted by + flow control */ GRPC_FINISH_ACCEPTED, /* writes_done or write_status has been accepted */ GRPC_CLIENT_METADATA_READ, /* The metadata array sent by server received at client */ @@ -213,6 +212,7 @@ typedef struct grpc_event { grpc_op_error write_accepted; grpc_op_error finish_accepted; grpc_op_error invoke_accepted; + grpc_op_error ioreq; struct { size_t count; grpc_metadata *elements; @@ -233,6 +233,67 @@ typedef struct grpc_event { } data; } grpc_event; +typedef struct { + size_t count; + size_t capacity; + grpc_metadata *metadata; +} grpc_metadata_array; + +typedef struct { + size_t count; + size_t capacity; + grpc_byte_buffer **buffers; +} grpc_byte_buffer_array; + +typedef struct { + grpc_status_code status; + size_t details_length; + size_t details_capacity; + char *details; +} grpc_recv_status; + +typedef struct { + const char *method; + const char *host; + gpr_timespec deadline; +} grpc_call_details; + +typedef enum { + GRPC_IOREQ_SEND_INITIAL_METADATA = 0, + GRPC_IOREQ_SEND_TRAILING_METADATA, + GRPC_IOREQ_SEND_MESSAGES, + GRPC_IOREQ_SEND_CLOSE, + GRPC_IOREQ_RECV_INITIAL_METADATA, + GRPC_IOREQ_RECV_TRAILING_METADATA, + GRPC_IOREQ_RECV_MESSAGES, + GRPC_IOREQ_RECV_STATUS, + GRPC_IOREQ_OP_COUNT +} grpc_ioreq_op; + +typedef union { + struct { + size_t count; + const grpc_metadata *metadata; + } send_metadata; + struct { + size_t count; + grpc_byte_buffer **messages; + } send_messages; + struct { + /* fields only make sense on the server */ + grpc_status_code status; + const char *details; + } send_close; + grpc_metadata_array *recv_metadata; + grpc_byte_buffer_array *recv_messages; + grpc_recv_status *recv_status; +} grpc_ioreq_data; + +typedef struct grpc_ioreq { + grpc_ioreq_op op; + grpc_ioreq_data data; +} grpc_ioreq; + /* Initialize the grpc library */ void grpc_init(void); @@ -275,8 +336,15 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cq); /* Create a call given a grpc_channel, in order to call 'method'. 
The request is not sent until grpc_call_invoke is called. All completions are sent to 'completion_queue'. */ -grpc_call *grpc_channel_create_call(grpc_channel *channel, const char *method, - const char *host, gpr_timespec deadline); + +grpc_call *grpc_channel_create_call_old(grpc_channel *channel, const char *method, const char *host, gpr_timespec deadline); + +grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_completion_queue *cq, + const grpc_call_details *details); + +grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs, + size_t nreqs, void *tag); /* Create a client channel */ grpc_channel *grpc_channel_create(const char *target, @@ -414,7 +482,11 @@ void grpc_call_destroy(grpc_call *call); tag_cancel. REQUIRES: Server must not have been shutdown. NOTE: calling this is the only way to obtain GRPC_SERVER_RPC_NEW events. */ -grpc_call_error grpc_server_request_call(grpc_server *server, void *tag_new); +grpc_call_error grpc_server_request_call_old(grpc_server *server, void *tag_new); + +grpc_call_error grpc_server_request_call( + grpc_server *server, grpc_completion_queue *cq, grpc_call_details *details, + grpc_metadata_array *initial_metadata, void *tag); /* Create a server */ grpc_server *grpc_server_create(grpc_completion_queue *cq, diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index d35cede97b5..61a6caf0328 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -298,10 +298,6 @@ static void recv_error(channel_data *chand, call_data *calld, int line, static void do_nothing(void *calldata, grpc_op_error error) {} -static void done_message(void *user_data, grpc_op_error error) { - grpc_byte_buffer_destroy(user_data); -} - static void finish_message(channel_data *chand, call_data *calld) { grpc_call_element *elem = calld->elem; grpc_call_op call_op; @@ -309,9 +305,9 @@ static void finish_message(channel_data *chand, call_data *calld) { call_op.flags = 0; /* if we got all the bytes for this message, call up the stack */ call_op.type = GRPC_RECV_MESSAGE; - call_op.done_cb = done_message; + call_op.done_cb = do_nothing; /* TODO(ctiller): this could be a lot faster if coded directly */ - call_op.user_data = call_op.data.message = grpc_byte_buffer_create( + call_op.data.message = grpc_byte_buffer_create( calld->incoming_message.slices, calld->incoming_message.count); gpr_slice_buffer_reset_and_unref(&calld->incoming_message); diff --git a/src/core/surface/call.c b/src/core/surface/call.c index e088793f01a..6a39d93f057 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -47,169 +47,78 @@ #define INVALID_TAG ((void *)0xdeadbeef) -/* Pending read queue - - This data structure tracks reads that need to be presented to the completion - queue but are waiting for the application to ask for them. 
*/ - -#define INITIAL_PENDING_READ_COUNT 4 - typedef struct { - grpc_byte_buffer *byte_buffer; - void *user_data; - void (*on_finish)(void *user_data, grpc_op_error error); -} pending_read; - -/* TODO(ctiller): inline an element or two into this struct to avoid per-call - allocations */ -typedef struct { - pending_read *data; - size_t count; - size_t capacity; -} pending_read_array; - -typedef struct { - size_t drain_pos; - pending_read_array filling; - pending_read_array draining; -} pending_read_queue; - -static void pra_init(pending_read_array *array) { - array->data = gpr_malloc(sizeof(pending_read) * INITIAL_PENDING_READ_COUNT); - array->count = 0; - array->capacity = INITIAL_PENDING_READ_COUNT; -} - -static void pra_destroy(pending_read_array *array, - size_t finish_starting_from) { - size_t i; - for (i = finish_starting_from; i < array->count; i++) { - array->data[i].on_finish(array->data[i].user_data, GRPC_OP_ERROR); - } - gpr_free(array->data); -} - -/* Append an operation to an array, expanding as needed */ -static void pra_push(pending_read_array *a, grpc_byte_buffer *buffer, - void (*on_finish)(void *user_data, grpc_op_error error), - void *user_data) { - if (a->count == a->capacity) { - a->capacity *= 2; - a->data = gpr_realloc(a->data, sizeof(pending_read) * a->capacity); - } - a->data[a->count].byte_buffer = buffer; - a->data[a->count].user_data = user_data; - a->data[a->count].on_finish = on_finish; - a->count++; -} - -static void prq_init(pending_read_queue *q) { - q->drain_pos = 0; - pra_init(&q->filling); - pra_init(&q->draining); -} - -static void prq_destroy(pending_read_queue *q) { - pra_destroy(&q->filling, 0); - pra_destroy(&q->draining, q->drain_pos); -} - -static int prq_is_empty(pending_read_queue *q) { - return (q->drain_pos == q->draining.count && q->filling.count == 0); -} - -static void prq_push(pending_read_queue *q, grpc_byte_buffer *buffer, - void (*on_finish)(void *user_data, grpc_op_error error), - void *user_data) { - pra_push(&q->filling, buffer, on_finish, user_data); -} + size_t md_out_count; + size_t md_out_capacity; + grpc_mdelem **md_out; + grpc_byte_buffer *msg_out; + + /* input buffers */ + grpc_metadata_array md_in; + grpc_metadata_array trail_md_in; + grpc_recv_status status_in; + size_t msg_in_read_idx; + grpc_byte_buffer_array msg_in; -/* Take the first queue element and move it to the completion queue. 
Do nothing - if q is empty */ -static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call, - grpc_completion_queue *cq) { - pending_read_array temp_array; - pending_read *pr; - - if (q->drain_pos == q->draining.count) { - if (q->filling.count == 0) { - return 0; - } - q->draining.count = 0; - q->drain_pos = 0; - /* swap arrays */ - temp_array = q->filling; - q->filling = q->draining; - q->draining = temp_array; - } - - pr = q->draining.data + q->drain_pos; - q->drain_pos++; - grpc_cq_end_read(cq, tag, call, pr->on_finish, pr->user_data, - pr->byte_buffer); - return 1; -} - -/* grpc_call proper */ + void *finished_tag; +} legacy_state; -/* the state of a call, based upon which functions have been called against - said call */ -typedef enum { - CALL_CREATED, - CALL_BOUNDCQ, - CALL_STARTED, - CALL_FINISHED -} call_state; +typedef struct reqinfo { + grpc_ioreq_data data; + struct reqinfo *master; + grpc_ioreq_completion_func on_complete; + void *user_data; + gpr_uint32 need_mask; + gpr_uint32 complete_mask; +} reqinfo; struct grpc_call { grpc_completion_queue *cq; grpc_channel *channel; grpc_mdctx *metadata_context; + /* TODO(ctiller): share with cq if possible? */ + gpr_mu mu; - call_state state; gpr_uint8 is_client; - gpr_uint8 have_write; - grpc_metadata_buffer incoming_metadata; - - /* protects variables in this section */ - gpr_mu read_mu; - gpr_uint8 received_start; - gpr_uint8 start_ok; - gpr_uint8 reads_done; - gpr_uint8 received_finish; - gpr_uint8 received_metadata; - gpr_uint8 have_read; + gpr_uint8 got_initial_metadata; gpr_uint8 have_alarm; - gpr_uint8 pending_writes_done; + gpr_uint8 read_closed; + gpr_uint8 stream_closed; gpr_uint8 got_status_code; - /* The current outstanding read message tag (only valid if have_read == 1) */ - void *read_tag; - void *metadata_tag; - void *finished_tag; - pending_read_queue prq; - - grpc_alarm alarm; - /* The current outstanding send message/context/invoke/end tag (only valid if - have_write == 1) */ - void *write_tag; - grpc_byte_buffer *pending_write; - gpr_uint32 pending_write_flags; + reqinfo requests[GRPC_IOREQ_OP_COUNT]; + grpc_byte_buffer_array buffered_messages; + grpc_metadata_array buffered_initial_metadata; + grpc_metadata_array buffered_trailing_metadata; + size_t write_index; - /* The final status of the call */ grpc_status_code status_code; grpc_mdstr *status_details; + grpc_alarm alarm; + gpr_refcount internal_refcount; + + legacy_state *legacy_state; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) +#define SWAP(type, x, y) \ + do { \ + type temp = x; \ + x = y; \ + y = temp; \ + } while (0) + +#define TOMBSTONE_MASTER ((void *)1) +#define IS_LIVE_MASTER(x) ((x) != NULL && (x) != TOMBSTONE_MASTER) + static void do_nothing(void *ignored, grpc_op_error also_ignored) {} grpc_call *grpc_call_create(grpc_channel *channel, @@ -217,79 +126,328 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); - call->cq = NULL; + memset(call, 0, sizeof(grpc_call)); + gpr_mu_init(&call->mu); call->channel = channel; 
+ call->is_client = server_transport_data == NULL; grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); - call->state = CALL_CREATED; - call->is_client = (server_transport_data == NULL); - call->write_tag = INVALID_TAG; - call->read_tag = INVALID_TAG; - call->metadata_tag = INVALID_TAG; - call->finished_tag = INVALID_TAG; - call->have_read = 0; - call->have_write = 0; - call->have_alarm = 0; - call->received_metadata = 0; - call->got_status_code = 0; - call->start_ok = 0; - call->status_code = - server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN; - call->status_details = NULL; - call->received_finish = 0; - call->reads_done = 0; - call->received_start = 0; - call->pending_write = NULL; - call->pending_writes_done = 0; - grpc_metadata_buffer_init(&call->incoming_metadata); gpr_ref_init(&call->internal_refcount, 1); grpc_call_stack_init(channel_stack, server_transport_data, CALL_STACK_FROM_CALL(call)); - prq_init(&call->prq); - gpr_mu_init(&call->read_mu); return call; } +legacy_state *get_legacy_state(grpc_call *call) { + if (call->legacy_state == NULL) { + call->legacy_state = gpr_malloc(sizeof(legacy_state)); + memset(call->legacy_state, 0, sizeof(legacy_state)); + } + return call->legacy_state; +} + void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } void grpc_call_internal_unref(grpc_call *c) { if (gpr_unref(&c->internal_refcount)) { grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); - grpc_metadata_buffer_destroy(&c->incoming_metadata, GRPC_OP_OK); - if (c->status_details) { - grpc_mdstr_unref(c->status_details); - } - prq_destroy(&c->prq); - gpr_mu_destroy(&c->read_mu); grpc_channel_internal_unref(c->channel); + gpr_mu_destroy(&c->mu); + if (c->legacy_state) { + gpr_free(c->legacy_state->md_out); + gpr_free(c->legacy_state->md_in.metadata); + gpr_free(c->legacy_state->trail_md_in.metadata); + gpr_free(c->legacy_state->status_in.details); + gpr_free(c->legacy_state); + } gpr_free(c); } } +static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { + if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED; + call->cq = cq; + return GRPC_CALL_OK; +} + +static void request_more_data(grpc_call *call) { + grpc_call_op op; + + /* call down */ + op.type = GRPC_REQUEST_DATA; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = do_nothing; + op.user_data = NULL; + + grpc_call_execute_op(call, &op); +} + +#define OP_IN_MASK(op, mask) (((1 << (op)) & (mask)) != 0) + +static void start_next_step_and_unlock(grpc_call *call, reqinfo *master); + +static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, + grpc_op_error status) { + reqinfo *master = call->requests[op].master; + size_t i; + if (master == NULL || master == TOMBSTONE_MASTER) { + return; /* inactive */ + } + master->complete_mask |= 1 << op; + if (master->complete_mask == master->need_mask || status == GRPC_OP_ERROR) { + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + if (call->requests[i].master == master) { + call->requests[i].master = + (i == GRPC_IOREQ_SEND_MESSAGES || i == GRPC_IOREQ_RECV_MESSAGES) + ? 
NULL + : TOMBSTONE_MASTER; + } + master->on_complete(call, status, master->user_data); + } + } +} + +static void finish_write_step(void *pc, grpc_op_error error) { + grpc_call *call = pc; + gpr_mu_lock(&call->mu); + if (error == GRPC_OP_OK) { + if (call->write_index == + call->requests[GRPC_IOREQ_SEND_MESSAGES].data.send_messages.count) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_OK); + } + start_next_step_and_unlock(call, + call->requests[GRPC_IOREQ_SEND_MESSAGES].master); + } else { + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR); + gpr_mu_unlock(&call->mu); + } +} + +static void finish_finish_step(void *pc, grpc_op_error error) { + grpc_call *call = pc; + if (error == GRPC_OP_OK) { + gpr_mu_lock(&call->mu); + start_next_step_and_unlock(call, + call->requests[GRPC_IOREQ_SEND_CLOSE].master); + } else { + gpr_log(GPR_ERROR, "not implemented"); + abort(); + } +} + +static void start_next_step_and_unlock(grpc_call *call, reqinfo *master) { + reqinfo *requests = call->requests; + grpc_byte_buffer *send_message = NULL; + size_t i; + gpr_uint32 incomplete = master->need_mask & ~master->complete_mask; + gpr_uint8 send_initial_metadata = 0; + gpr_uint8 send_trailing_metadata = 0; + gpr_uint8 send_blocked = 0; + gpr_uint8 send_finished = 0; + gpr_uint8 completed; + + if (!send_blocked && + OP_IN_MASK(GRPC_IOREQ_SEND_INITIAL_METADATA, incomplete)) { + send_initial_metadata = 1; + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_OK); + master->complete_mask |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; + } + + if (!send_blocked && OP_IN_MASK(GRPC_IOREQ_SEND_MESSAGES, incomplete)) { + grpc_ioreq_data data = requests[GRPC_IOREQ_SEND_MESSAGES].data; + send_message = data.send_messages.messages[call->write_index]; + send_blocked = 1; + call->write_index++; + } + + if (!send_blocked && (OP_IN_MASK(GRPC_IOREQ_SEND_CLOSE, incomplete))) { + send_finished = 1; + send_blocked = 1; + } + + if (!send_blocked && + OP_IN_MASK(GRPC_IOREQ_SEND_TRAILING_METADATA, incomplete)) { + send_trailing_metadata = 1; + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); + } + + completed = !send_blocked && master->complete_mask == master->need_mask; + + if (completed) { + master->on_complete(call, GRPC_OP_OK, master->user_data); + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + if (call->requests[i].master == master) { + call->requests[i].master = + (i == GRPC_IOREQ_SEND_MESSAGES || i == GRPC_IOREQ_RECV_MESSAGES) + ? 
NULL + : TOMBSTONE_MASTER; + } + } + } + + gpr_mu_unlock(&call->mu); + + if (send_initial_metadata) { + grpc_ioreq_data data = requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data; + for (i = 0; i < data.send_metadata.count; i++) { + const grpc_metadata *md = &data.send_metadata.metadata[i]; + grpc_call_element_send_metadata( + CALL_ELEM_FROM_CALL(call, 0), + grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, + (const gpr_uint8 *)md->value, + md->value_length)); + } + } + + if (send_message) { + grpc_call_op op; + op.type = GRPC_SEND_MESSAGE; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.data.message = send_message; + op.done_cb = finish_write_step; + op.user_data = call; + grpc_call_execute_op(call, &op); + } + + if (send_finished) { + grpc_call_op op; + op.type = GRPC_SEND_FINISH; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = finish_finish_step; + op.user_data = call; + grpc_call_execute_op(call, &op); + } + + if (send_trailing_metadata) { + grpc_ioreq_data data = requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data; + for (i = 0; i < data.send_metadata.count; i++) { + const grpc_metadata *md = &data.send_metadata.metadata[i]; + grpc_call_element_send_metadata( + CALL_ELEM_FROM_CALL(call, 0), + grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, + (const gpr_uint8 *)md->value, + md->value_length)); + } + } +} + +static grpc_call_error start_ioreq_error(grpc_call *call, + gpr_uint32 mutated_ops, + grpc_call_error ret) { + size_t i; + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + if (mutated_ops & (1 << i)) { + call->requests[i].master = NULL; + } + } + gpr_mu_unlock(&call->mu); + return ret; +} + +static grpc_call_error start_ioreq_and_unlock( + grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, + grpc_ioreq_completion_func completion, void *user_data) { + size_t i; + gpr_uint32 have_ops = 0; + gpr_uint32 precomplete = 0; + grpc_ioreq_op op; + reqinfo *master = NULL; + reqinfo *requests = call->requests; + grpc_ioreq_data data; + + for (i = 0; i < nreqs; i++) { + op = reqs[i].op; + if (requests[op].master) { + return start_ioreq_error(call, have_ops, + GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); + } + if (master == NULL) { + master = &requests[op]; + } + have_ops |= 1 << op; + data = reqs[i].data; + + switch (op) { + default: + break; + case GRPC_IOREQ_RECV_MESSAGES: + data.recv_messages->count = 0; + if (call->buffered_messages.count > 0) { + SWAP(grpc_byte_buffer_array, *data.recv_messages, + call->buffered_messages); + precomplete |= 1 << op; + } + break; + case GRPC_IOREQ_SEND_MESSAGES: + call->write_index = 0; + break; + } + + requests[op].data = data; + requests[op].master = master; + } + + GPR_ASSERT(master != NULL); + master->need_mask = have_ops; + master->complete_mask = precomplete; + master->on_complete = completion; + master->user_data = user_data; + + start_next_step_and_unlock(call, master); + + if (OP_IN_MASK(GRPC_IOREQ_RECV_MESSAGES, have_ops & ~precomplete)) { + request_more_data(call); + } + + return GRPC_CALL_OK; +} + +static void call_start_ioreq_done(grpc_call *call, grpc_op_error status, + void *user_data) { + grpc_cq_end_ioreq(call->cq, user_data, call, do_nothing, NULL, status); +} + +grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs, + size_t nreqs, void *tag) { + gpr_mu_lock(&call->mu); + return start_ioreq_and_unlock(call, reqs, nreqs, call_start_ioreq_done, tag); +} + +grpc_call_error grpc_call_start_ioreq_and_call_back( + grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, + 
grpc_ioreq_completion_func on_complete, void *user_data) { + gpr_mu_lock(&call->mu); + return start_ioreq_and_unlock(call, reqs, nreqs, on_complete, user_data); +} + void grpc_call_destroy(grpc_call *c) { int cancel; - gpr_mu_lock(&c->read_mu); + gpr_mu_lock(&c->mu); if (c->have_alarm) { grpc_alarm_cancel(&c->alarm); c->have_alarm = 0; } - cancel = !c->received_finish; - gpr_mu_unlock(&c->read_mu); + cancel = !c->stream_closed; + gpr_mu_unlock(&c->mu); if (cancel) grpc_call_cancel(c); grpc_call_internal_unref(c); } static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) { - if (!call->got_status_code) { - call->status_code = status; - call->got_status_code = 1; - } + if (call->got_status_code) return; + call->status_code = status; + call->got_status_code = 1; } static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) { - if (!call->status_details) { - call->status_details = grpc_mdstr_ref(status); + if (call->status_details != NULL) { + grpc_mdstr_unref(status); + return; } + call->status_details = status; } grpc_call_error grpc_call_cancel(grpc_call *c) { @@ -314,12 +472,12 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_mdstr *details = description ? grpc_mdstr_from_string(c->metadata_context, description) : NULL; - gpr_mu_lock(&c->read_mu); + gpr_mu_lock(&c->mu); maybe_set_status_code(c, status); if (details) { maybe_set_status_details(c, details); } - gpr_mu_unlock(&c->read_mu); + gpr_mu_unlock(&c->mu); return grpc_call_cancel(c); } @@ -332,525 +490,259 @@ void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem, gpr_uint32 flags) { - grpc_call_element *elem; - grpc_call_op op; - - GPR_ASSERT(call->state < CALL_FINISHED); - - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.done_cb = do_nothing; - op.user_data = NULL; - op.data.metadata = mdelem; + legacy_state *ls = get_legacy_state(call); - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); + if (ls->md_out_count == ls->md_out_capacity) { + ls->md_out_capacity = + GPR_MAX(ls->md_out_count * 3 / 2, ls->md_out_count + 8); + ls->md_out = + gpr_realloc(ls->md_out, sizeof(grpc_mdelem *) * ls->md_out_capacity); + } + ls->md_out[ls->md_out_count++] = mdelem; } grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, gpr_uint32 flags) { - grpc_mdelem *mdelem; - - if (call->is_client) { - if (call->state >= CALL_STARTED) { - return GRPC_CALL_ERROR_ALREADY_INVOKED; - } - } else { - if (call->state >= CALL_FINISHED) { - return GRPC_CALL_ERROR_ALREADY_FINISHED; - } - } - - mdelem = grpc_mdelem_from_string_and_buffer( - call->metadata_context, metadata->key, (gpr_uint8 *)metadata->value, - metadata->value_length); - grpc_call_add_mdelem(call, mdelem, flags); + grpc_call_add_mdelem( + call, grpc_mdelem_from_string_and_buffer( + call->metadata_context, metadata->key, + (gpr_uint8 *)metadata->value, metadata->value_length), + flags); return GRPC_CALL_OK; } -static void finish_call(grpc_call *call) { - size_t count; - grpc_metadata *elements; - count = grpc_metadata_buffer_count(&call->incoming_metadata); - elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata); - grpc_cq_end_finished( - call->cq, call->finished_tag, call, grpc_metadata_buffer_cleanup_elements, - elements, call->status_code, - call->status_details - ? 
(char *)grpc_mdstr_as_c_string(call->status_details) - : NULL, - elements, count); -} - -static void done_write(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - void *tag = call->write_tag; +static void finish_status(grpc_call *call, grpc_op_error status, void *tag) { + legacy_state *ls; - GPR_ASSERT(call->have_write); - call->have_write = 0; - call->write_tag = INVALID_TAG; - grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error); -} - -static void done_writes_done(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - void *tag = call->write_tag; - - GPR_ASSERT(call->have_write); - call->have_write = 0; - call->write_tag = INVALID_TAG; - grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error); -} - -static void call_started(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - grpc_call_element *elem; - grpc_byte_buffer *pending_write = NULL; - gpr_uint32 pending_write_flags = 0; - gpr_uint8 pending_writes_done = 0; - int ok; - grpc_call_op op; + gpr_mu_lock(&call->mu); + ls = get_legacy_state(call); + gpr_mu_unlock(&call->mu); - gpr_mu_lock(&call->read_mu); - GPR_ASSERT(!call->received_start); - call->received_start = 1; - ok = call->start_ok = (error == GRPC_OP_OK); - pending_write = call->pending_write; - pending_write_flags = call->pending_write_flags; - pending_writes_done = call->pending_writes_done; - gpr_mu_unlock(&call->read_mu); - - if (pending_write) { - if (ok) { - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = pending_write_flags; - op.done_cb = done_write; - op.user_data = call; - op.data.message = pending_write; - - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); - } else { - done_write(call, error); - } - grpc_byte_buffer_destroy(pending_write); - } - if (pending_writes_done) { - if (ok) { - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = done_writes_done; - op.user_data = call; - - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); - } else { - done_writes_done(call, error); + if (status == GRPC_OP_OK) { + grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL, + ls->status_in.status, ls->status_in.details, + ls->trail_md_in.metadata, ls->trail_md_in.count); + } else { + grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL, + GRPC_STATUS_UNKNOWN, "Read status failed", NULL, 0); + } +} + +static void finish_recv_metadata(grpc_call *call, grpc_op_error status, + void *tag) { + grpc_ioreq reqs[2]; + legacy_state *ls; + + gpr_mu_lock(&call->mu); + ls = get_legacy_state(call); + if (status == GRPC_OP_OK) { + grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, + ls->md_in.count, ls->md_in.metadata); + + reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; + reqs[0].data.recv_metadata = &ls->trail_md_in; + reqs[1].op = GRPC_IOREQ_RECV_STATUS; + reqs[1].data.recv_status = &ls->status_in; + if (GRPC_CALL_OK != start_ioreq_and_unlock(call, reqs, GPR_ARRAY_SIZE(reqs), + finish_status, + ls->finished_tag)) { + grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, + GRPC_STATUS_UNKNOWN, + "Failed to start reading status", NULL, 0); } + } else { + gpr_mu_unlock(&call->mu); + grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0, + NULL); + grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, + GRPC_STATUS_UNKNOWN, "Failed to read initial metadata", + NULL, 0); } - - grpc_call_internal_unref(call); } 
grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq, void *metadata_read_tag, void *finished_tag, gpr_uint32 flags) { - grpc_call_element *elem; - grpc_call_op op; + grpc_ioreq req; + legacy_state *ls = get_legacy_state(call); + grpc_call_error err; - /* validate preconditions */ - if (!call->is_client) { - gpr_log(GPR_ERROR, "can only call %s on clients", __FUNCTION__); - return GRPC_CALL_ERROR_NOT_ON_SERVER; - } - - if (call->state >= CALL_STARTED || call->cq) { - gpr_log(GPR_ERROR, "call is already invoked"); - return GRPC_CALL_ERROR_ALREADY_INVOKED; - } - - if (call->have_write) { - gpr_log(GPR_ERROR, "can only have one pending write operation at a time"); - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; - } - - if (call->have_read) { - gpr_log(GPR_ERROR, "can only have one pending read operation at a time"); - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; - } - - if (flags & GRPC_WRITE_NO_COMPRESS) { - return GRPC_CALL_ERROR_INVALID_FLAGS; - } - - /* inform the completion queue of an incoming operation */ - grpc_cq_begin_op(cq, call, GRPC_FINISHED); grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ); + grpc_cq_begin_op(cq, call, GRPC_FINISHED); - gpr_mu_lock(&call->read_mu); - - /* update state */ - call->cq = cq; - call->state = CALL_STARTED; - call->finished_tag = finished_tag; - - if (call->received_finish) { - /* handle early cancellation */ - grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL, - NULL, 0, NULL); - finish_call(call); - - /* early out.. unlock & return */ - gpr_mu_unlock(&call->read_mu); - return GRPC_CALL_OK; - } - - call->metadata_tag = metadata_read_tag; - - gpr_mu_unlock(&call->read_mu); - - /* call down the filter stack */ - op.type = GRPC_SEND_START; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.done_cb = call_started; - op.data.start.pollset = grpc_cq_pollset(cq); - op.user_data = call; - grpc_call_internal_ref(call); - - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); + gpr_mu_lock(&call->mu); + err = bind_cq(call, cq); + if (err != GRPC_CALL_OK) return err; - return GRPC_CALL_OK; + req.op = GRPC_IOREQ_RECV_INITIAL_METADATA; + req.data.recv_metadata = &ls->md_in; + return start_ioreq_and_unlock(call, &req, 1, finish_recv_metadata, + metadata_read_tag); } grpc_call_error grpc_call_server_accept(grpc_call *call, grpc_completion_queue *cq, void *finished_tag) { - /* validate preconditions */ - if (call->is_client) { - gpr_log(GPR_ERROR, "can only call %s on servers", __FUNCTION__); - return GRPC_CALL_ERROR_NOT_ON_CLIENT; - } - - if (call->state >= CALL_BOUNDCQ) { - gpr_log(GPR_ERROR, "call is already accepted"); - return GRPC_CALL_ERROR_ALREADY_ACCEPTED; - } + grpc_ioreq req; /* inform the completion queue of an incoming operation (corresponding to finished_tag) */ grpc_cq_begin_op(cq, call, GRPC_FINISHED); - /* update state */ - gpr_mu_lock(&call->read_mu); - call->state = CALL_BOUNDCQ; - call->cq = cq; - call->finished_tag = finished_tag; - call->received_start = 1; - if (prq_is_empty(&call->prq) && call->received_finish) { - finish_call(call); + bind_cq(call, cq); - /* early out.. 
unlock & return */ - gpr_mu_unlock(&call->read_mu); - return GRPC_CALL_OK; - } - gpr_mu_unlock(&call->read_mu); - - return GRPC_CALL_OK; + req.op = GRPC_IOREQ_RECV_STATUS; + req.data.recv_status = &get_legacy_state(call)->status_in; + return start_ioreq_and_unlock(call, &req, 1, finish_status, finished_tag); } grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, gpr_uint32 flags) { - grpc_call_element *elem; - grpc_call_op op; - - /* validate preconditions */ - if (call->is_client) { - gpr_log(GPR_ERROR, "can only call %s on servers", __FUNCTION__); - return GRPC_CALL_ERROR_NOT_ON_CLIENT; - } - - if (call->state >= CALL_STARTED) { - gpr_log(GPR_ERROR, "call is already started"); - return GRPC_CALL_ERROR_ALREADY_INVOKED; - } - - if (flags & GRPC_WRITE_NO_COMPRESS) { - return GRPC_CALL_ERROR_INVALID_FLAGS; - } - - /* update state */ - call->state = CALL_STARTED; - - /* call down */ - op.type = GRPC_SEND_START; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.done_cb = do_nothing; - op.data.start.pollset = grpc_cq_pollset(call->cq); - op.user_data = NULL; - - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); - return GRPC_CALL_OK; } void grpc_call_client_initial_metadata_complete( grpc_call_element *surface_element) { grpc_call *call = grpc_call_from_top_element(surface_element); - size_t count; - grpc_metadata *elements; - - gpr_mu_lock(&call->read_mu); - count = grpc_metadata_buffer_count(&call->incoming_metadata); - elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata); - - GPR_ASSERT(!call->received_metadata); - grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call, - grpc_metadata_buffer_cleanup_elements, - elements, count, elements); - call->received_metadata = 1; - call->metadata_tag = INVALID_TAG; - gpr_mu_unlock(&call->read_mu); -} - -static void request_more_data(grpc_call *call) { - grpc_call_element *elem; - grpc_call_op op; - - /* call down */ - op.type = GRPC_REQUEST_DATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); + gpr_mu_lock(&call->mu); + call->got_initial_metadata = 1; + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); + gpr_mu_unlock(&call->mu); +} + +static void finish_read(grpc_call *call, grpc_op_error error, void *tag) { + legacy_state *ls; + gpr_mu_lock(&call->mu); + ls = get_legacy_state(call); + if (ls->msg_in.count == 0) { + grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL); + } else { + grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, + ls->msg_in.buffers[ls->msg_in_read_idx++]); + } + gpr_mu_unlock(&call->mu); } grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) { - gpr_uint8 request_more = 0; - - switch (call->state) { - case CALL_CREATED: - return GRPC_CALL_ERROR_NOT_INVOKED; - case CALL_BOUNDCQ: - case CALL_STARTED: - break; - case CALL_FINISHED: - return GRPC_CALL_ERROR_ALREADY_FINISHED; - } - - gpr_mu_lock(&call->read_mu); - - if (call->have_read) { - gpr_mu_unlock(&call->read_mu); - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; - } + legacy_state *ls; + grpc_ioreq req; grpc_cq_begin_op(call->cq, call, GRPC_READ); - if (!prq_pop_to_cq(&call->prq, tag, call, call->cq)) { - if (call->reads_done) { - grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL); - } else { - call->read_tag = tag; - call->have_read = 1; - request_more = call->received_start; - } - } else if 
(prq_is_empty(&call->prq) && call->received_finish) { - finish_call(call); - } - - gpr_mu_unlock(&call->read_mu); + gpr_mu_lock(&call->mu); + ls = get_legacy_state(call); - if (request_more) { - request_more_data(call); + if (ls->msg_in_read_idx == ls->msg_in.count) { + ls->msg_in_read_idx = 0; + req.op = GRPC_IOREQ_RECV_MESSAGES; + req.data.recv_messages = &ls->msg_in; + return start_ioreq_and_unlock(call, &req, 1, finish_read, tag); } + grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, + ls->msg_in.buffers[ls->msg_in_read_idx++]); + gpr_mu_unlock(&call->mu); return GRPC_CALL_OK; } +static void finish_write(grpc_call *call, grpc_op_error status, void *tag) { + grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status); +} + grpc_call_error grpc_call_start_write(grpc_call *call, grpc_byte_buffer *byte_buffer, void *tag, gpr_uint32 flags) { - grpc_call_element *elem; - grpc_call_op op; - - switch (call->state) { - case CALL_CREATED: - case CALL_BOUNDCQ: - return GRPC_CALL_ERROR_NOT_INVOKED; - case CALL_STARTED: - break; - case CALL_FINISHED: - return GRPC_CALL_ERROR_ALREADY_FINISHED; - } - - if (call->have_write) { - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; - } + grpc_ioreq req; + legacy_state *ls; grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED); - /* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a - flush, and that flush should be propogated down from here */ - if (byte_buffer == NULL) { - grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, GRPC_OP_OK); - return GRPC_CALL_OK; - } - - call->write_tag = tag; - call->have_write = 1; - - gpr_mu_lock(&call->read_mu); - if (!call->received_start) { - call->pending_write = grpc_byte_buffer_copy(byte_buffer); - call->pending_write_flags = flags; - - gpr_mu_unlock(&call->read_mu); - } else { - gpr_mu_unlock(&call->read_mu); - - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.done_cb = done_write; - op.user_data = call; - op.data.message = byte_buffer; - - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); - } + gpr_mu_lock(&call->mu); + ls = get_legacy_state(call); + ls->msg_out = byte_buffer; + req.op = GRPC_IOREQ_SEND_MESSAGES; + req.data.send_messages.count = 1; + req.data.send_messages.messages = &ls->msg_out; + return start_ioreq_and_unlock(call, &req, 1, finish_write, tag); +} - return GRPC_CALL_OK; +static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) { + grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status); } grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) { - grpc_call_element *elem; - grpc_call_op op; - - if (!call->is_client) { - return GRPC_CALL_ERROR_NOT_ON_SERVER; - } - - switch (call->state) { - case CALL_CREATED: - case CALL_BOUNDCQ: - return GRPC_CALL_ERROR_NOT_INVOKED; - case CALL_FINISHED: - return GRPC_CALL_ERROR_ALREADY_FINISHED; - case CALL_STARTED: - break; - } - - if (call->have_write) { - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; - } - + grpc_ioreq req; grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - call->write_tag = tag; - call->have_write = 1; - - gpr_mu_lock(&call->read_mu); - if (!call->received_start) { - call->pending_writes_done = 1; - - gpr_mu_unlock(&call->read_mu); - } else { - gpr_mu_unlock(&call->read_mu); - - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = done_writes_done; - op.user_data = call; - - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); 
- } - - return GRPC_CALL_OK; + gpr_mu_lock(&call->mu); + req.op = GRPC_IOREQ_SEND_CLOSE; + return start_ioreq_and_unlock(call, &req, 1, finish_finish, tag); } grpc_call_error grpc_call_start_write_status(grpc_call *call, grpc_status_code status, const char *details, void *tag) { - grpc_call_element *elem; - grpc_call_op op; + grpc_ioreq req; + grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - if (call->is_client) { - return GRPC_CALL_ERROR_NOT_ON_CLIENT; - } + gpr_mu_lock(&call->mu); + req.op = GRPC_IOREQ_SEND_CLOSE; + req.data.send_close.status = status; + req.data.send_close.details = details; + return start_ioreq_and_unlock(call, &req, 1, finish_finish, tag); +} - switch (call->state) { - case CALL_CREATED: - case CALL_BOUNDCQ: - return GRPC_CALL_ERROR_NOT_INVOKED; - case CALL_FINISHED: - return GRPC_CALL_ERROR_ALREADY_FINISHED; - case CALL_STARTED: - break; - } +grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { + return CALL_FROM_TOP_ELEM(elem); +} - if (call->have_write) { - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; +static void call_alarm(void *arg, int success) { + grpc_call *call = arg; + if (success) { + if (call->is_client) { + grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED, + "Deadline Exceeded"); + } else { + grpc_call_cancel(call); + } } + grpc_call_internal_unref(call); +} - elem = CALL_ELEM_FROM_CALL(call, 0); - - if (details && details[0]) { - grpc_mdelem *md = grpc_mdelem_from_strings(call->metadata_context, - "grpc-message", details); +void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - op.data.metadata = md; - elem->filter->call_op(elem, NULL, &op); + if (call->have_alarm) { + gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); } + grpc_call_internal_ref(call); + call->have_alarm = 1; + grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); +} - /* always send status */ - { - grpc_mdelem *md; - char buffer[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(status, buffer); - md = - grpc_mdelem_from_strings(call->metadata_context, "grpc-status", buffer); +void grpc_call_read_closed(grpc_call_element *elem) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + gpr_mu_lock(&call->mu); + GPR_ASSERT(!call->read_closed); + call->read_closed = 1; + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); + gpr_mu_unlock(&call->mu); +} - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - op.data.metadata = md; - elem->filter->call_op(elem, NULL, &op); +void grpc_call_stream_closed(grpc_call_element *elem) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + gpr_mu_lock(&call->mu); + if (!call->read_closed) { + call->read_closed = 1; + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); } - - grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - - call->state = CALL_FINISHED; - call->write_tag = tag; - call->have_write = 1; - - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = done_writes_done; - op.user_data = call; - - 
elem->filter->call_op(elem, NULL, &op); - - return GRPC_CALL_OK; + call->stream_closed = 1; + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); + gpr_mu_unlock(&call->mu); } /* we offset status by a small amount when storing it into transport metadata @@ -863,7 +755,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -876,113 +768,66 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { return status; } -void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) { +void grpc_call_recv_message(grpc_call_element *elem, + grpc_byte_buffer *byte_buffer) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - grpc_mdelem *md = op->data.metadata; - grpc_mdstr *key = md->key; - gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call, - grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); - if (key == grpc_channel_get_status_string(call->channel)) { - maybe_set_status_code(call, decode_status(md)); - grpc_mdelem_unref(md); - op->done_cb(op->user_data, GRPC_OP_OK); - } else if (key == grpc_channel_get_message_string(call->channel)) { - maybe_set_status_details(call, md->value); - grpc_mdelem_unref(md); - op->done_cb(op->user_data, GRPC_OP_OK); + grpc_byte_buffer_array *dest; + gpr_mu_lock(&call->mu); + if (call->requests[GRPC_IOREQ_RECV_MESSAGES].master != NULL) { + dest = call->requests[GRPC_IOREQ_RECV_MESSAGES].data.recv_messages; } else { - grpc_metadata_buffer_queue(&call->incoming_metadata, op); + dest = &call->buffered_messages; } -} - -void grpc_call_recv_finish(grpc_call_element *elem, int is_full_close) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - - gpr_mu_lock(&call->read_mu); - - if (call->have_read) { - grpc_cq_end_read(call->cq, call->read_tag, call, do_nothing, NULL, NULL); - call->read_tag = INVALID_TAG; - call->have_read = 0; + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 1, dest->capacity * 3 / 2); + dest->buffers = + gpr_realloc(dest->buffers, sizeof(grpc_byte_buffer *) * dest->capacity); } - if (call->is_client && !call->received_metadata && call->cq) { - size_t count; - grpc_metadata *elements; - - call->received_metadata = 1; - - count = grpc_metadata_buffer_count(&call->incoming_metadata); - elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata); - grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call, - grpc_metadata_buffer_cleanup_elements, - elements, count, elements); - } - if (is_full_close) { - if (call->have_alarm) { - grpc_alarm_cancel(&call->alarm); - call->have_alarm = 0; - } - call->received_finish = 1; - if (prq_is_empty(&call->prq) && call->cq != NULL) { - finish_call(call); - } - } else { - call->reads_done = 1; - } - gpr_mu_unlock(&call->read_mu); + dest->buffers[dest->count++] = byte_buffer; + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); + gpr_mu_unlock(&call->mu); } -void grpc_call_recv_message(grpc_call_element *elem, grpc_byte_buffer *message, - void (*on_finish)(void *user_data, - grpc_op_error error), - void *user_data) { +void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); + grpc_mdstr *key = md->key; + grpc_metadata_array *dest; + 
grpc_metadata *mdusr; - gpr_mu_lock(&call->read_mu); - if (call->have_read) { - grpc_cq_end_read(call->cq, call->read_tag, call, on_finish, user_data, - message); - call->read_tag = INVALID_TAG; - call->have_read = 0; + gpr_mu_lock(&call->mu); + if (key == grpc_channel_get_status_string(call->channel)) { + maybe_set_status_code(call, decode_status(md)); + grpc_mdelem_unref(md); + } else if (key == grpc_channel_get_message_string(call->channel)) { + maybe_set_status_details(call, md->value); + grpc_mdelem_unref(md); } else { - prq_push(&call->prq, message, on_finish, user_data); - } - gpr_mu_unlock(&call->read_mu); -} - -grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { - return CALL_FROM_TOP_ELEM(elem); -} - -grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) { - return &call->incoming_metadata; -} - -static void call_alarm(void *arg, int success) { - grpc_call *call = arg; - if (success) { - if (call->is_client) { - grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED, - "Deadline Exceeded"); + if (!call->got_initial_metadata) { + dest = IS_LIVE_MASTER( + call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].master) + ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] + .data.recv_metadata + : &call->buffered_initial_metadata; } else { - grpc_call_cancel(call); + dest = IS_LIVE_MASTER( + call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].master) + ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] + .data.recv_metadata + : &call->buffered_trailing_metadata; } + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 1, dest->capacity * 3 / 2); + dest->metadata = + gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = (char *)grpc_mdstr_as_c_string(md->key); + mdusr->value = (char *)grpc_mdstr_as_c_string(md->value); + mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); } - grpc_call_internal_unref(call); -} - -void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - - if (call->have_alarm) { - gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); - } - grpc_call_internal_ref(call); - call->have_alarm = 1; - grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); + gpr_mu_unlock(&call->mu); } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } - diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 804b387cb19..2da40607c5f 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -38,6 +38,9 @@ #include "src/core/channel/metadata_buffer.h" #include +typedef void (*grpc_ioreq_completion_func)(grpc_call *call, grpc_op_error status, + void *user_data); + grpc_call *grpc_call_create(grpc_channel *channel, const void *server_transport_data); @@ -47,14 +50,15 @@ void grpc_call_internal_unref(grpc_call *call); /* Helpers for grpc_client, grpc_server filters to publish received data to the completion queue/surface layer */ void grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_call_op *op); + grpc_mdelem *md); void grpc_call_recv_message( - grpc_call_element *surface_element, grpc_byte_buffer *message, - void (*on_finish)(void *user_data, grpc_op_error error), void *user_data); -void grpc_call_recv_finish(grpc_call_element *surface_element, - int is_full_close); + grpc_call_element *surface_element, grpc_byte_buffer *message); +void grpc_call_read_closed(grpc_call_element 
*surface_element); +void grpc_call_stream_closed(grpc_call_element *surface_element); void grpc_call_execute_op(grpc_call *call, grpc_call_op *op); +grpc_call_error grpc_call_start_ioreq_and_call_back(grpc_call *call, const grpc_ioreq *reqs, + size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); /* Called when it's known that the initial batch of metadata is complete on the client side (must not be called on the server) */ @@ -69,9 +73,6 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); -/* Get the metadata buffer. */ -grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call); - void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem, gpr_uint32 flags); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a1bcea58ddb..d63fb4e141c 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -74,7 +74,7 @@ grpc_channel *grpc_channel_create_from_filters( static void do_nothing(void *ignored, grpc_op_error error) {} -grpc_call *grpc_channel_create_call(grpc_channel *channel, const char *method, +grpc_call *grpc_channel_create_call_old(grpc_channel *channel, const char *method, const char *host, gpr_timespec absolute_deadline) { grpc_call *call; diff --git a/src/core/surface/client.c b/src/core/surface/client.c index a7c9b902ed4..42f46fdaa66 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -56,20 +56,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_next_op(elem, op); break; case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, op); + grpc_call_recv_metadata(elem, op->data.metadata); break; case GRPC_RECV_DEADLINE: gpr_log(GPR_ERROR, "Deadline received by client (ignored)"); break; case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message, op->done_cb, - op->user_data); + grpc_call_recv_message(elem, op->data.message); + op->done_cb(op->user_data, GRPC_OP_OK); break; case GRPC_RECV_HALF_CLOSE: - grpc_call_recv_finish(elem, 0); + grpc_call_read_closed(elem); break; case GRPC_RECV_FINISH: - grpc_call_recv_finish(elem, 1); + grpc_call_stream_closed(elem); break; case GRPC_RECV_END_OF_INITIAL_METADATA: grpc_call_client_initial_metadata_complete(elem); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 2bf31c50a8f..5854afbeefd 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -173,18 +173,6 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } -void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data); - ev->base.data.invoke_accepted = error; - end_op_locked(cc, GRPC_INVOKE_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, @@ -197,6 +185,17 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } +void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, 
void *user_data, + grpc_op_error error) { + event *ev; + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + ev = add_locked(cc, GRPC_IOREQ, tag, call, on_finish, user_data); + ev->base.data.write_accepted = error; + end_op_locked(cc, GRPC_IOREQ); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); +} + void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 85984075f78..fea8336b633 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -97,6 +97,10 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_timespec deadline, size_t metadata_count, grpc_metadata *metadata_elements); +void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_op_error error); + void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag); /* disable polling for some tests */ diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c index 8975d312eec..7c76bf93d7c 100644 --- a/src/core/surface/event_string.c +++ b/src/core/surface/event_string.c @@ -87,10 +87,10 @@ char *grpc_event_string(grpc_event *ev) { gpr_strvec_add(&buf, gpr_strdup(" end-of-stream")); } break; - case GRPC_INVOKE_ACCEPTED: - gpr_strvec_add(&buf, gpr_strdup("INVOKE_ACCEPTED: ")); + case GRPC_IOREQ: + gpr_strvec_add(&buf, gpr_strdup("IOREQ: ")); addhdr(&buf, ev); - adderr(&buf, ev->data.invoke_accepted); + adderr(&buf, ev->data.ioreq); break; case GRPC_WRITE_ACCEPTED: gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: ")); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 6098ac78de4..2f5eff55844 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -50,26 +50,16 @@ typedef struct { grpc_mdelem *message; } channel_data; -static void do_nothing(void *data, grpc_op_error error) {} - static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { channel_data *channeld = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { - case GRPC_SEND_START: { - grpc_call_op set_status_op; - grpc_mdelem_ref(channeld->message); - memset(&set_status_op, 0, sizeof(grpc_call_op)); - set_status_op.dir = GRPC_CALL_UP; - set_status_op.type = GRPC_RECV_METADATA; - set_status_op.done_cb = do_nothing; - set_status_op.data.metadata = channeld->message; - grpc_call_recv_metadata(elem, &set_status_op); - grpc_call_recv_finish(elem, 1); + case GRPC_SEND_START: + grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message)); + grpc_call_stream_closed(elem); break; - } case GRPC_SEND_METADATA: grpc_mdelem_unref(op->data.metadata); break; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 9a001f4c33d..d46e3de85f0 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -44,6 +44,7 @@ #include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" +#include "src/core/transport/metadata.h" #include #include #include @@ -63,11 +64,22 @@ typedef struct channel_data channel_data; struct channel_data { grpc_server *server; grpc_channel *channel; + grpc_mdstr *path_key; + grpc_mdstr *authority_key; /* linked list of all channels on a server */ channel_data *next; channel_data *prev; }; +typedef void (*new_call_cb)(grpc_server *server, 
grpc_completion_queue *cq, grpc_metadata_array *initial_metadata, call_data *calld, void *user_data); + +typedef struct { + void *user_data; + grpc_completion_queue *cq; + grpc_metadata_array *initial_metadata; + new_call_cb cb; +} requested_call; + struct grpc_server { size_t channel_filter_count; const grpc_channel_filter **channel_filters; @@ -76,9 +88,9 @@ struct grpc_server { gpr_mu mu; - void **tags; - size_t ntags; - size_t tag_cap; + requested_call *requested_calls; + size_t requested_call_count; + size_t requested_call_capacity; gpr_uint8 shutdown; gpr_uint8 have_shutdown_tag; @@ -107,11 +119,19 @@ typedef enum { ZOMBIED } call_state; +typedef struct legacy_data { + grpc_metadata_array client_metadata; +} legacy_data; + struct call_data { grpc_call *call; call_state state; gpr_timespec deadline; + grpc_mdstr *path; + grpc_mdstr *host; + + legacy_data *legacy; gpr_uint8 included[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; @@ -179,7 +199,7 @@ static void server_unref(grpc_server *server) { grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu); gpr_free(server->channel_filters); - gpr_free(server->tags); + gpr_free(server->requested_calls); gpr_free(server); } } @@ -210,62 +230,37 @@ static void destroy_channel(channel_data *chand) { grpc_iomgr_add_callback(finish_destroy_channel, chand); } -static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) { - grpc_call *call = calld->call; - grpc_metadata_buffer *mdbuf = grpc_call_get_metadata_buffer(call); - size_t count = grpc_metadata_buffer_count(mdbuf); - grpc_metadata *elements = grpc_metadata_buffer_extract_elements(mdbuf); - const char *host = NULL; - const char *method = NULL; - size_t i; - - for (i = 0; i < count; i++) { - if (0 == strcmp(elements[i].key, ":authority")) { - host = elements[i].value; - } else if (0 == strcmp(elements[i].key, ":path")) { - method = elements[i].value; - } - } - - grpc_call_internal_ref(call); - grpc_cq_end_new_rpc(server->cq, tag, call, - grpc_metadata_buffer_cleanup_elements, elements, method, - host, calld->deadline, count, elements); -} - static void start_new_rpc(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_server *server = chand->server; gpr_mu_lock(&server->mu); - if (server->ntags) { + if (server->requested_call_count > 0) { + requested_call rc = server->requested_calls[--server->requested_call_count]; calld->state = ACTIVATED; - queue_new_rpc(server, calld, server->tags[--server->ntags]); + gpr_mu_unlock(&server->mu); + rc.cb(server, rc.cq, rc.initial_metadata, calld, rc.user_data); } else { calld->state = PENDING; call_list_join(server, calld, PENDING_START); + gpr_mu_unlock(&server->mu); } - gpr_mu_unlock(&server->mu); } static void kill_zombie(void *elem, int success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } -static void finish_rpc(grpc_call_element *elem, int is_full_close) { +static void stream_closed(grpc_call_element *elem) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->server->mu); switch (calld->state) { case ACTIVATED: - grpc_call_recv_finish(elem, is_full_close); + grpc_call_stream_closed(elem); break; case PENDING: - if (!is_full_close) { - grpc_call_recv_finish(elem, is_full_close); - break; - } call_list_remove(chand->server, calld, PENDING_START); /* fallthrough intended */ case NOT_STARTED: @@ -278,25 +273,56 @@ static void finish_rpc(grpc_call_element *elem, int is_full_close) { 
gpr_mu_unlock(&chand->server->mu); } +static void read_closed(grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->server->mu); + switch (calld->state) { + case ACTIVATED: + case PENDING: + grpc_call_read_closed(elem); + break; + case NOT_STARTED: + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + break; + case ZOMBIED: + break; + } + gpr_mu_unlock(&chand->server->mu); +} + static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, grpc_call_op *op) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + grpc_mdelem *md; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, op); + md = op->data.metadata; + if (md->key == chand->path_key) { + calld->path = grpc_mdstr_ref(md->value); + grpc_mdelem_unref(md); + } else if (md->key == chand->authority_key) { + calld->host = grpc_mdstr_ref(md->value); + grpc_mdelem_unref(md); + } else { + grpc_call_recv_metadata(elem, md); + } break; case GRPC_RECV_END_OF_INITIAL_METADATA: start_new_rpc(elem); break; case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message, op->done_cb, - op->user_data); + grpc_call_recv_message(elem, op->data.message); + op->done_cb(op->user_data, GRPC_OP_OK); break; case GRPC_RECV_HALF_CLOSE: - finish_rpc(elem, 0); + read_closed(elem); break; case GRPC_RECV_FINISH: - finish_rpc(elem, 1); + stream_closed(elem); break; case GRPC_RECV_DEADLINE: grpc_call_set_deadline(elem, op->data.deadline); @@ -395,6 +421,8 @@ static void init_channel_elem(grpc_channel_element *elem, GPR_ASSERT(!is_last); chand->server = NULL; chand->channel = NULL; + chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); + chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); chand->next = chand->prev = chand; } @@ -406,6 +434,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { chand->prev->next = chand->next; chand->next = chand->prev = chand; gpr_mu_unlock(&chand->server->mu); + grpc_mdstr_unref(chand->path_key); + grpc_mdstr_unref(chand->authority_key); server_unref(chand->server); } } @@ -415,16 +445,6 @@ static const grpc_channel_filter server_surface_filter = { init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server", }; -static void early_terminate_requested_calls(grpc_completion_queue *cq, - void **tags, size_t ntags) { - size_t i; - - for (i = 0; i < ntags; i++) { - grpc_cq_end_new_rpc(cq, tags[i], NULL, do_nothing, NULL, NULL, NULL, - gpr_inf_past, 0, NULL); - } -} - grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, grpc_channel_filter **filters, size_t filter_count, @@ -517,8 +537,8 @@ grpc_transport_setup_result grpc_server_setup_transport( void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, void *shutdown_tag) { listener *l; - void **tags; - size_t ntags; + requested_call *requested_calls; + size_t requested_call_count; channel_data **channels; channel_data *c; size_t nchannels; @@ -547,10 +567,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, i++; } - tags = server->tags; - ntags = server->ntags; - server->tags = NULL; - server->ntags = 0; + requested_calls = server->requested_calls; + requested_call_count = server->requested_call_count; + server->requested_calls = NULL; + server->requested_call_count = 0; server->shutdown = 1; server->have_shutdown_tag 
= have_shutdown_tag; @@ -579,8 +599,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, gpr_free(channels); /* terminate all the requested calls */ - early_terminate_requested_calls(server->cq, tags, ntags); - gpr_free(tags); + for (i = 0; i < requested_call_count; i++) { + requested_calls[i].cb(server, requested_calls[i].cq, requested_calls[i].initial_metadata, NULL, requested_calls[i].user_data); + } + gpr_free(requested_calls); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { @@ -625,35 +647,81 @@ void grpc_server_add_listener(grpc_server *server, void *arg, server->listeners = l; } -grpc_call_error grpc_server_request_call(grpc_server *server, void *tag_new) { +static grpc_call_error queue_call_request(grpc_server *server, grpc_completion_queue *cq, grpc_metadata_array *initial_metadata, new_call_cb cb, void *user_data) { call_data *calld; - - grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); - + requested_call *rc; gpr_mu_lock(&server->mu); - if (server->shutdown) { gpr_mu_unlock(&server->mu); - early_terminate_requested_calls(server->cq, &tag_new, 1); + cb(server, cq, initial_metadata, NULL, user_data); return GRPC_CALL_OK; } - calld = call_list_remove_head(server, PENDING_START); if (calld) { - GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; - queue_new_rpc(server, calld, tag_new); + GPR_ASSERT(calld->state == PENDING); + gpr_mu_unlock(&server->mu); + cb(server, cq, initial_metadata, calld, user_data); + return GRPC_CALL_OK; } else { - if (server->tag_cap == server->ntags) { - server->tag_cap = GPR_MAX(3 * server->tag_cap / 2, server->tag_cap + 1); - server->tags = - gpr_realloc(server->tags, sizeof(void *) * server->tag_cap); + if (server->requested_call_count == server->requested_call_capacity) { + server->requested_call_capacity = GPR_MAX(server->requested_call_capacity + 8, server->requested_call_capacity * 2); + server->requested_calls = gpr_realloc(server->requested_calls, + sizeof(requested_call) * server->requested_call_capacity); } - server->tags[server->ntags++] = tag_new; + rc = &server->requested_calls[server->requested_call_count++]; + rc->cb = cb; + rc->cq = cq; + rc->user_data = user_data; + rc->initial_metadata = initial_metadata; + gpr_mu_unlock(&server->mu); + return GRPC_CALL_OK; } - gpr_mu_unlock(&server->mu); +} + +static void begin_request(grpc_server *server, grpc_completion_queue *cq, grpc_metadata_array *initial_metadata, call_data *call_data, void *tag) { + abort(); +} + +grpc_call_error grpc_server_request_call( + grpc_server *server, grpc_completion_queue *cq, grpc_call_details *details, + grpc_metadata_array *initial_metadata, void *tag) { + grpc_cq_begin_op(cq, NULL, GRPC_IOREQ); + return queue_call_request(server, cq, initial_metadata, begin_request, tag); +} + +static void publish_legacy_request(grpc_call *call, grpc_op_error status, void *tag) { + grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_server *server = chand->server; - return GRPC_CALL_OK; + if (status == GRPC_OP_OK) { + grpc_cq_end_new_rpc(server->cq, tag, call, + do_nothing, NULL, grpc_mdstr_as_c_string(calld->path), + grpc_mdstr_as_c_string(calld->host), calld->deadline, + calld->legacy->client_metadata.count, calld->legacy->client_metadata.metadata); + } +} + +static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq, grpc_metadata_array *initial_metadata, call_data 
*calld, void *tag) { + grpc_ioreq req; + if (!calld) { + gpr_free(initial_metadata); + grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL, + gpr_inf_past, 0, NULL); + return; + } + req.op = GRPC_IOREQ_RECV_INITIAL_METADATA; + req.data.recv_metadata = initial_metadata; + grpc_call_start_ioreq_and_call_back(calld->call, &req, 1, publish_legacy_request, tag); +} + +grpc_call_error grpc_server_request_call_old(grpc_server *server, void *tag_new) { + grpc_metadata_array *client_metadata = gpr_malloc(sizeof(grpc_metadata_array)); + memset(client_metadata, 0, sizeof(*client_metadata)); + grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); + return queue_call_request(server, server->cq, client_metadata, begin_legacy_request, tag_new); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index c4e3ca516d1..2af18c30358 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -432,7 +432,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, framer_state *st) { - char timeout_str[32]; + char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE]; grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str); hpack_enc(c, grpc_mdelem_from_metadata_strings( c->mdctx, grpc_mdstr_ref(c->timeout_key_str), diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index c8b2bb2cf6e..19943ab8c63 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -102,6 +102,7 @@ Status Channel::StartBlockingRpc(const RpcMethod &method, grpc_call *call = grpc_channel_create_call( c_channel_, method.name(), target_.c_str(), context->RawDeadline()); context->set_call(call); + grpc_event *ev; void *finished_tag = reinterpret_cast(call); void *metadata_read_tag = reinterpret_cast(call) + 2; diff --git a/test/core/echo/client.c b/test/core/echo/client.c index 2ad29df53ca..3c789f84f97 100644 --- a/test/core/echo/client.c +++ b/test/core/echo/client.c @@ -78,7 +78,7 @@ int main(int argc, char **argv) { GPR_ASSERT(argc == 2); channel = grpc_channel_create(argv[1], NULL); - call = grpc_channel_create_call(channel, "/foo", "localhost", gpr_inf_future); + call = grpc_channel_create_call_old(channel, "/foo", "localhost", gpr_inf_future); GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, 0) == GRPC_CALL_OK); diff --git a/test/core/echo/server.c b/test/core/echo/server.c index 6b67334248a..0b9c72162d1 100644 --- a/test/core/echo/server.c +++ b/test/core/echo/server.c @@ -64,7 +64,7 @@ static void request_call(void) { call_state *tag = gpr_malloc(sizeof(*tag)); gpr_ref_init(&tag->pending_ops, 2); tag->bytes_read = 0; - grpc_server_request_call(server, tag); + grpc_server_request_call_old(server, tag); } static void assert_read_ok(call_state *s, grpc_byte_buffer *b) { diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c index 287f83eebca..b5368fffbb8 100644 --- a/test/core/end2end/cq_verifier.c +++ b/test/core/end2end/cq_verifier.c @@ -70,6 +70,7 @@ typedef struct expectation { union { grpc_op_error finish_accepted; grpc_op_error write_accepted; + grpc_op_error ioreq; struct { const char *method; const char *host; @@ -180,9 +181,6 @@ static void verify_matches(expectation *e, grpc_event *ev) { case GRPC_WRITE_ACCEPTED: GPR_ASSERT(e->data.write_accepted == 
ev->data.write_accepted); break; - case GRPC_INVOKE_ACCEPTED: - abort(); - break; case GRPC_SERVER_RPC_NEW: GPR_ASSERT(string_equivalent(e->data.server_rpc_new.method, ev->data.server_rpc_new.method)); @@ -222,6 +220,9 @@ static void verify_matches(expectation *e, grpc_event *ev) { GPR_ASSERT(ev->data.read == NULL); } break; + case GRPC_IOREQ: + GPR_ASSERT(e->data.ioreq == ev->data.ioreq); + break; case GRPC_SERVER_SHUTDOWN: break; case GRPC_COMPLETION_DO_NOT_USE: @@ -261,8 +262,9 @@ static void expectation_to_strvec(gpr_strvec *buf, expectation *e) { e->data.write_accepted); gpr_strvec_add(buf, tmp); break; - case GRPC_INVOKE_ACCEPTED: - gpr_strvec_add(buf, gpr_strdup("GRPC_INVOKE_ACCEPTED")); + case GRPC_IOREQ: + gpr_asprintf(&tmp, "GRPC_IOREQ result=%d", e->data.ioreq); + gpr_strvec_add(buf, tmp); break; case GRPC_SERVER_RPC_NEW: timeout = gpr_time_sub(e->data.server_rpc_new.deadline, gpr_now()); diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 6219f575002..340fa039fd1 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -112,7 +112,7 @@ void test_connect(const char *server_host, const char *client_host, int port, } /* Send a trivial request. */ - c = grpc_channel_create_call(client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(c, client_cq, tag(2), tag(3), 0)); @@ -122,7 +122,7 @@ void test_connect(const char *server_host, const char *client_host, int port, cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); diff --git a/test/core/end2end/no_server_test.c b/test/core/end2end/no_server_test.c index 389a6429c48..e4b4e4bb7d4 100644 --- a/test/core/end2end/no_server_test.c +++ b/test/core/end2end/no_server_test.c @@ -56,7 +56,7 @@ int main(int argc, char **argv) { /* create a call, channel to a non existant server */ chan = grpc_channel_create("nonexistant:54321", NULL); - call = grpc_channel_create_call(chan, "/foo", "nonexistant", deadline); + call = grpc_channel_create_call_old(chan, "/foo", "nonexistant", deadline); GPR_ASSERT(grpc_call_invoke(call, cq, tag(2), tag(3), 0) == GRPC_CALL_OK); /* verify that all tags get completed */ cq_expect_client_metadata_read(cqv, tag(2), NULL); diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index 33aed98c38c..8db0e3e4087 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -113,13 +113,14 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0)); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); 
cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); diff --git a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c index f348488b182..9dbc8dcf790 100644 --- a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c +++ b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c @@ -113,13 +113,14 @@ static void test_cancel_after_accept_and_writes_closed( cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0)); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c index 3bb86723e6e..08b3d69bfd0 100644 --- a/test/core/end2end/tests/cancel_after_invoke.c +++ b/test/core/end2end/tests/cancel_after_invoke.c @@ -111,7 +111,8 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config, gpr_timespec deadline = five_seconds_time(); cq_verifier *v_client = cq_verifier_create(f.client_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/cancel_before_invoke.c b/test/core/end2end/tests/cancel_before_invoke.c index ac816484fd6..7c706029a1c 100644 --- a/test/core/end2end/tests/cancel_before_invoke.c +++ b/test/core/end2end/tests/cancel_before_invoke.c @@ -109,7 +109,8 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config) { gpr_timespec deadline = five_seconds_time(); cq_verifier *v_client = cq_verifier_create(f.client_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == grpc_call_cancel(c)); diff --git a/test/core/end2end/tests/cancel_in_a_vacuum.c b/test/core/end2end/tests/cancel_in_a_vacuum.c index e4f9deecd1b..42294fc4bae 100644 --- a/test/core/end2end/tests/cancel_in_a_vacuum.c +++ b/test/core/end2end/tests/cancel_in_a_vacuum.c @@ -109,7 +109,8 @@ static void test_cancel_in_a_vacuum(grpc_end2end_test_config config, gpr_timespec deadline = five_seconds_time(); cq_verifier *v_client = cq_verifier_create(f.client_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c)); diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c index 86cef437be8..1b442e9e4c3 100644 --- a/test/core/end2end/tests/census_simple_request.c +++ b/test/core/end2end/tests/census_simple_request.c @@ -106,7 +106,8 @@ static void test_body(grpc_end2end_test_fixture f) { cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier 
*v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); tag(1); GPR_ASSERT(GRPC_CALL_OK == @@ -116,7 +117,7 @@ static void test_body(grpc_end2end_test_fixture f) { cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c index 036fdc2501f..eafda6132b5 100644 --- a/test/core/end2end/tests/disappearing_server.c +++ b/test/core/end2end/tests/disappearing_server.c @@ -97,7 +97,8 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, grpc_call *s; gpr_timespec deadline = five_seconds_time(); - c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f->client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == @@ -107,7 +108,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f->server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c index 66e3c44f4b9..2eb56517f7e 100644 --- a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c +++ b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c @@ -111,7 +111,8 @@ static void test_early_server_shutdown_finishes_inflight_calls( cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == @@ -121,7 +122,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c index 88f735c8e0a..123c8bc4153 100644 --- a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c +++ b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c @@ -110,7 +110,7 @@ static void test_early_server_shutdown_finishes_tags( /* upon shutdown, the server should finish all requested calls indicating no new call */ - grpc_server_request_call(f.server, tag(1000)); + grpc_server_request_call_old(f.server, tag(1000)); grpc_server_shutdown(f.server); 
cq_expect_server_rpc_new(v_server, &s, tag(1000), NULL, NULL, gpr_inf_past, NULL); diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c index d9c9dbb8b20..7ebd8e38b03 100644 --- a/test/core/end2end/tests/graceful_server_shutdown.c +++ b/test/core/end2end/tests/graceful_server_shutdown.c @@ -110,7 +110,8 @@ static void test_early_server_shutdown_finishes_inflight_calls( cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == @@ -120,7 +121,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index f187eceadb3..ac7071a04bd 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -120,9 +120,10 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { /* byte buffer holds the slice, we can unref it already */ gpr_slice_unref(request_payload_slice); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index a177a7b2f29..fc180fd9629 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -109,7 +109,8 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == @@ -119,7 +120,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); @@ -181,13 +182,14 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { /* start two requests - ensuring that the second is not accepted until the first completes */ deadline = five_seconds_time(); - c1 = - grpc_channel_create_call(f.client, "/alpha", "test.google.com", deadline); + c1 = grpc_channel_create_call_old(f.client, "/alpha", "test.google.com", + deadline); GPR_ASSERT(c1); - c2 = grpc_channel_create_call(f.client, "/beta", 
"test.google.com", deadline); + c2 = grpc_channel_create_call_old(f.client, "/beta", "test.google.com", + deadline); GPR_ASSERT(c1); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(c1, f.client_cq, tag(301), tag(302), 0)); @@ -204,7 +206,7 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { /* The /alpha or /beta calls started above could be invoked (but NOT both); * check this here */ /* We'll get tag 303 or 403, we want 300, 400 */ - live_call = ((int)(gpr_intptr)ev->tag) - 3; + live_call = ((int)(gpr_intptr) ev->tag) - 3; grpc_event_finish(ev); cq_expect_server_rpc_new(v_server, &s1, tag(100), @@ -232,7 +234,7 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { live_call = (live_call == 300) ? 400 : 300; cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(200))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(200))); cq_expect_server_rpc_new(v_server, &s2, tag(200), live_call == 300 ? "/alpha" : "/beta", "test.google.com", deadline, NULL); diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c index 6768bd8aa9f..e8adb82c19d 100644 --- a/test/core/end2end/tests/ping_pong_streaming.c +++ b/test/core/end2end/tests/ping_pong_streaming.c @@ -118,13 +118,14 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, cq_verifier *v_server = cq_verifier_create(f.server_cq); gpr_log(GPR_INFO, "testing with %d message pairs.", messages); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0)); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c index 1dd798dc8d6..49720a7bea4 100644 --- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c @@ -131,13 +131,14 @@ static void test_request_response_with_metadata_and_payload( cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); /* byte buffer holds the slice, we can unref it already */ gpr_slice_unref(request_payload_slice); gpr_slice_unref(response_payload_slice); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); /* add multiple metadata */ diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c index cfc9b61f56f..14e791ae8ee 100644 --- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c +++ 
b/test/core/end2end/tests/request_response_with_metadata_and_payload.c @@ -122,13 +122,14 @@ static void test_request_response_with_metadata_and_payload( cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); /* byte buffer holds the slice, we can unref it already */ gpr_slice_unref(request_payload_slice); gpr_slice_unref(response_payload_slice); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); /* add multiple metadata */ diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c index 32bf5129ff4..ee1e52e54c4 100644 --- a/test/core/end2end/tests/request_response_with_payload.c +++ b/test/core/end2end/tests/request_response_with_payload.c @@ -119,9 +119,10 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { gpr_slice_unref(request_payload_slice); gpr_slice_unref(response_payload_slice); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c index 4f1de8b466e..65f890b4b98 100644 --- a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c @@ -124,13 +124,14 @@ static void test_request_response_with_metadata_and_payload( cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); /* byte buffer holds the slice, we can unref it already */ gpr_slice_unref(request_payload_slice); gpr_slice_unref(response_payload_slice); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); /* add multiple metadata */ diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c index 83628449a22..438f2ef1d14 100644 --- a/test/core/end2end/tests/request_with_large_metadata.c +++ b/test/core/end2end/tests/request_with_large_metadata.c @@ -113,7 +113,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { cq_verifier *v_server = cq_verifier_create(f.server_cq); const int large_size = 64 * 1024; - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); meta.key = "key"; meta.value = gpr_malloc(large_size + 1); @@ -121,7 +121,8 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { meta.value[large_size] = 0; meta.value_length = large_size; - c = 
grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); /* add the metadata */ diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index a3527839659..31d79df5049 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -116,10 +116,11 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { /* byte buffer holds the slice, we can unref it already */ gpr_slice_unref(payload_slice); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0)); diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index 1e15eaa9cc4..1222d167ded 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -103,7 +103,8 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, config.init_client(f, client_args); - c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f->client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == @@ -117,7 +118,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f->server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index f8894a8ba92..64a8340d54b 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -110,7 +110,8 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == @@ -120,7 +121,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); @@ -156,7 +157,8 @@ static void simple_request_body2(grpc_end2end_test_fixture f) { cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); 
GPR_ASSERT(GRPC_CALL_OK == @@ -166,7 +168,7 @@ static void simple_request_body2(grpc_end2end_test_fixture f) { cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); diff --git a/test/core/end2end/tests/thread_stress.c b/test/core/end2end/tests/thread_stress.c index 8fdb765951c..7e235be4f41 100644 --- a/test/core/end2end/tests/thread_stress.c +++ b/test/core/end2end/tests/thread_stress.c @@ -108,7 +108,7 @@ static void drain_cq(int client, grpc_completion_queue *cq) { static void start_request(void) { gpr_slice slice = gpr_slice_malloc(100); grpc_byte_buffer *buf; - grpc_call *call = grpc_channel_create_call( + grpc_call *call = grpc_channel_create_call_old( g_fixture.client, "/Foo", "test.google.com", g_test_end_time); memset(GPR_SLICE_START_PTR(slice), 1, GPR_SLICE_LENGTH(slice)); @@ -179,7 +179,7 @@ static void client_thread(void *p) { static void request_server_call(void) { gpr_refcount *rc = gpr_malloc(sizeof(gpr_refcount)); gpr_ref_init(rc, 2); - grpc_server_request_call(g_fixture.server, rc); + grpc_server_request_call_old(g_fixture.server, rc); } static void maybe_end_server_call(grpc_call *call, gpr_refcount *rc) { diff --git a/test/core/end2end/tests/writes_done_hangs_with_pending_read.c b/test/core/end2end/tests/writes_done_hangs_with_pending_read.c index eea459459ad..fb2fbdd092b 100644 --- a/test/core/end2end/tests/writes_done_hangs_with_pending_read.c +++ b/test/core/end2end/tests/writes_done_hangs_with_pending_read.c @@ -124,7 +124,8 @@ static void test_writes_done_hangs_with_pending_read( gpr_slice_unref(request_payload_slice); gpr_slice_unref(response_payload_slice); - c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", + deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == @@ -138,7 +139,7 @@ static void test_writes_done_hangs_with_pending_read( cq_expect_write_accepted(v_client, tag(4), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", deadline, NULL); cq_verify(v_server); diff --git a/test/core/fling/client.c b/test/core/fling/client.c index 7947ffecc5c..e1743c3e7ea 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -53,7 +53,7 @@ static grpc_call *call; static void init_ping_pong_request(void) {} static void step_ping_pong_request(void) { - call = grpc_channel_create_call(channel, "/Reflector/reflectUnary", + call = grpc_channel_create_call_old(channel, "/Reflector/reflectUnary", "localhost", gpr_inf_future); GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); @@ -70,7 +70,7 @@ static void step_ping_pong_request(void) { } static void init_ping_pong_stream(void) { - call = grpc_channel_create_call(channel, "/Reflector/reflectStream", + call = grpc_channel_create_call_old(channel, "/Reflector/reflectStream", "localhost", gpr_inf_future); GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, 0) == GRPC_CALL_OK); diff --git a/test/core/fling/server.c b/test/core/fling/server.c index 
f811aac284c..e35597fbea2 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -62,7 +62,7 @@ typedef struct { static void request_call(void) { call_state *s = gpr_malloc(sizeof(call_state)); gpr_ref_init(&s->pending_ops, 2); - grpc_server_request_call(server, s); + grpc_server_request_call_old(server, s); } static void sigint_handler(int x) { got_sigint = 1; } diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index 71f9cc22916..2928623f6de 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -105,32 +105,6 @@ static void test_cq_end_read(void) { shutdown_and_destroy(cc); } -static void test_cq_end_invoke_accepted(void) { - grpc_event *ev; - grpc_completion_queue *cc; - int on_finish_called = 0; - void *tag = create_test_tag(); - - LOG_TEST(); - - cc = grpc_completion_queue_create(); - - grpc_cq_begin_op(cc, NULL, GRPC_INVOKE_ACCEPTED); - grpc_cq_end_invoke_accepted(cc, tag, NULL, increment_int_on_finish, - &on_finish_called, GRPC_OP_OK); - - ev = grpc_completion_queue_next(cc, gpr_inf_past); - GPR_ASSERT(ev != NULL); - GPR_ASSERT(ev->type == GRPC_INVOKE_ACCEPTED); - GPR_ASSERT(ev->tag == tag); - GPR_ASSERT(ev->data.invoke_accepted == GRPC_OP_OK); - GPR_ASSERT(on_finish_called == 0); - grpc_event_finish(ev); - GPR_ASSERT(on_finish_called == 1); - - shutdown_and_destroy(cc); -} - static void test_cq_end_write_accepted(void) { grpc_event *ev; grpc_completion_queue *cc; @@ -421,7 +395,6 @@ int main(int argc, char **argv) { test_no_op(); test_wait_empty(); test_cq_end_read(); - test_cq_end_invoke_accepted(); test_cq_end_write_accepted(); test_cq_end_finish_accepted(); test_cq_end_client_metadata_read(); diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c index 9b9f0202d6d..c43ac7c2428 100644 --- a/test/core/surface/lame_client_test.c +++ b/test/core/surface/lame_client_test.c @@ -51,7 +51,7 @@ int main(int argc, char **argv) { chan = grpc_lame_client_channel_create(); GPR_ASSERT(chan); - call = grpc_channel_create_call( + call = grpc_channel_create_call_old( chan, "/Foo", "anywhere", gpr_time_add(gpr_now(), gpr_time_from_seconds(100))); GPR_ASSERT(call);
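
A minimal sketch of how a caller is expected to drive the new ioreq surface introduced above, modelled on the begin_legacy_request / publish_legacy_request pair in src/core/surface/server.c. The grpc_ioreq_completion_func signature is inferred from publish_legacy_request, and the names on_initial_metadata and request_initial_metadata are hypothetical, chosen only for this sketch; they do not appear in the patch.

#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/surface/call.h" /* declares grpc_call_start_ioreq_and_call_back (core-internal) */

/* Completion callback; parameter list inferred from publish_legacy_request above. */
static void on_initial_metadata(grpc_call *call, grpc_op_error status, void *user_data) {
  /* user_data is the tag handed to grpc_call_start_ioreq_and_call_back below;
     report status back to the application layer here. */
}

/* Ask for the client's initial metadata on an accepted call, as a one-element
   ioreq batch (the same pattern begin_legacy_request uses). */
static void request_initial_metadata(grpc_call *call, grpc_metadata_array *md, void *tag) {
  grpc_ioreq req;
  req.op = GRPC_IOREQ_RECV_INITIAL_METADATA; /* one of the grpc_ioreq_op values */
  req.data.recv_metadata = md;               /* array is filled in when metadata arrives */
  GPR_ASSERT(GRPC_CALL_OK ==
             grpc_call_start_ioreq_and_call_back(call, &req, 1,
                                                 on_initial_metadata, tag));
}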