diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 33d25e837c6..9ae48bd23d6 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -193,6 +193,15 @@ class ClientWriter : public ClientWriterInterface { cq_.Pluck(&ops); } + void WaitForInitialMetadata() { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpSet ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + using WriterInterface::Write; bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet ops; @@ -213,6 +222,9 @@ class ClientWriter : public ClientWriterInterface { /// Read the final response and wait for the final status. Status Finish() GRPC_OVERRIDE { Status status; + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } finish_ops_.ClientRecvStatus(context_, &status); call_.PerformOps(&finish_ops_); GPR_ASSERT(cq_.Pluck(&finish_ops_)); @@ -221,7 +233,8 @@ class ClientWriter : public ClientWriterInterface { private: ClientContext* context_; - CallOpSet finish_ops_; + CallOpSet finish_ops_; CompletionQueue cq_; Call call_; }; @@ -292,7 +305,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { } Status Finish() GRPC_OVERRIDE { - CallOpSet ops; + CallOpSet ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index a8db32b9d54..c8aaf31e2d3 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -107,8 +107,8 @@ static void server_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata) { /* substitute our callback for the op callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->finish_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->finish_recv; } } diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 43eee046b88..1aa27208c2e 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -127,8 +127,8 @@ static void hc_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->hc_on_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->hc_on_recv; } } diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index bb75323933c..370f8dbe423 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -186,8 +186,8 @@ static void hs_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->hs_on_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->hs_on_recv; } } diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 3ad9fd9efb1..81297c8d449 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -241,10 +241,8 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { size_t i; for (i = 0; i < holder->waiting_ops_count; i++) { - grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false, - NULL); - grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready, - false, NULL); + grpc_transport_stream_op_finish_with_failure(exec_ctx, + &holder->waiting_ops[i]); } holder->waiting_ops_count = 0; } diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 4c78711387e..3d8e5e8d35d 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -176,8 +176,8 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->auth_on_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->auth_on_recv; calld->transport_op = *op; } } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 9495e748b5f..1b117aa6b8c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -159,6 +159,9 @@ struct grpc_call { uint8_t receiving_message; uint8_t received_final_op; + /* have we received initial metadata */ + bool has_initial_md_been_received; + batch_control active_batches[MAX_CONCURRENT_BATCHES]; /* first idx: is_receiving, second idx: is_trailing */ @@ -200,6 +203,7 @@ struct grpc_call { gpr_slice receiving_slice; grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; + grpc_closure receiving_initial_metadata_ready; uint32_t test_only_last_message_flags; union { @@ -212,6 +216,11 @@ struct grpc_call { int *cancelled; } server; } final_op; + + struct { + void *bctlp; + bool success; + } saved_receiving_stream_ready_ctx; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -993,6 +1002,94 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } +static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, + bool success) { + grpc_call *call = bctl->call; + if (call->receiving_stream == NULL) { + *call->receiving_buffer = NULL; + call->receiving_message = 0; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else if (call->receiving_stream->length > + grpc_channel_get_max_message_length(call->channel)) { + cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, + "Max message size exceeded"); + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + call->receiving_stream = NULL; + *call->receiving_buffer = NULL; + call->receiving_message = 0; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else { + call->test_only_last_message_flags = call->receiving_stream->flags; + if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm > GRPC_COMPRESS_NONE)) { + *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( + NULL, 0, call->compression_algorithm); + } else { + *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); + } + grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, + bctl); + continue_receiving_slices(exec_ctx, bctl); + /* early out */ + return; + } +} + +static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, + bool success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + gpr_mu_lock(&bctl->call->mu); + if (bctl->call->has_initial_md_been_received) { + gpr_mu_unlock(&bctl->call->mu); + process_data_after_md(exec_ctx, bctlp, success); + } else { + call->saved_receiving_stream_ready_ctx.bctlp = bctlp; + call->saved_receiving_stream_ready_ctx.success = success; + gpr_mu_unlock(&bctl->call->mu); + } +} + +static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, + void *bctlp, bool success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + gpr_mu_lock(&call->mu); + + grpc_metadata_batch *md = + &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; + grpc_metadata_batch_filter(md, recv_initial_filter, call); + call->has_initial_md_been_received = true; + + if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != + 0 && + !call->is_client) { + GPR_TIMER_BEGIN("set_deadline_alarm", 0); + set_deadline_alarm(exec_ctx, call, md->deadline); + GPR_TIMER_END("set_deadline_alarm", 0); + } + + if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) { + grpc_closure *saved_rsr_closure = grpc_closure_create( + receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp); + grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, + call->saved_receiving_stream_ready_ctx.success, NULL); + call->saved_receiving_stream_ready_ctx.bctlp = NULL; + } + + gpr_mu_unlock(&call->mu); + + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } +} + static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; @@ -1011,19 +1108,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { grpc_metadata_batch_destroy( &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } - if (bctl->recv_initial_metadata) { - grpc_metadata_batch *md = - &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - grpc_metadata_batch_filter(md, recv_initial_filter, call); - - if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != - 0 && - !call->is_client) { - GPR_TIMER_BEGIN("set_deadline_alarm", 0); - set_deadline_alarm(exec_ctx, call, md->deadline); - GPR_TIMER_END("set_deadline_alarm", 0); - } - } if (bctl->recv_final_op) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; @@ -1065,45 +1149,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { } } -static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - bool success) { - batch_control *bctl = bctlp; - grpc_call *call = bctl->call; - - if (call->receiving_stream == NULL) { - *call->receiving_buffer = NULL; - call->receiving_message = 0; - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } - } else if (call->receiving_stream->length > - grpc_channel_get_max_message_length(call->channel)) { - cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, - "Max message size exceeded"); - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); - call->receiving_stream = NULL; - *call->receiving_buffer = NULL; - call->receiving_message = 0; - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } - } else { - call->test_only_last_message_flags = call->receiving_stream->flags; - if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && - (call->compression_algorithm > GRPC_COMPRESS_NONE)) { - *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( - NULL, 0, call->compression_algorithm); - } else { - *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); - } - grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, - bctl); - continue_receiving_slices(exec_ctx, bctl); - /* early out */ - return; - } -} - static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, @@ -1273,9 +1318,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; + grpc_closure_init(&call->receiving_initial_metadata_ready, + receiving_initial_metadata_ready, bctl); bctl->recv_initial_metadata = 1; stream_op.recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; + stream_op.recv_initial_metadata_ready = + &call->receiving_initial_metadata_ready; + num_completion_callbacks_needed++; break; case GRPC_OP_RECV_MESSAGE: /* Flag validation: currently allow no flags */ diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 705996cad35..537069e9848 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -78,8 +78,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } else if (op->recv_trailing_metadata != NULL) { fill_metadata(elem, op->recv_trailing_metadata); } - grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); - grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 42cffccb4c9..fb5e0d4b9e7 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -596,8 +596,8 @@ static void server_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv_initial_metadata = op->on_complete; - op->on_complete = &calld->server_on_recv_initial_metadata; + calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata; } } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index c611496e7e5..0e1e2c42650 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -385,7 +385,7 @@ typedef struct { grpc_closure *send_trailing_metadata_finished; grpc_metadata_batch *recv_initial_metadata; - grpc_closure *recv_initial_metadata_finished; + grpc_closure *recv_initial_metadata_ready; grpc_byte_stream **recv_message; grpc_closure *recv_message_ready; grpc_metadata_batch *recv_trailing_metadata; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 9298573c7f4..617d98875c3 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -544,7 +544,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_ASSERT(s->global.send_initial_metadata_finished == NULL); GPR_ASSERT(s->global.send_message_finished == NULL); GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL); - GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL); + GPR_ASSERT(s->global.recv_initial_metadata_ready == NULL); GPR_ASSERT(s->global.recv_message_ready == NULL); GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL); grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser); @@ -863,9 +863,9 @@ static void perform_stream_op_locked( } if (op->recv_initial_metadata != NULL) { - GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL); - stream_global->recv_initial_metadata_finished = - add_closure_barrier(on_complete); + GPR_ASSERT(stream_global->recv_initial_metadata_ready == NULL); + stream_global->recv_initial_metadata_ready = + op->recv_initial_metadata_ready; stream_global->recv_initial_metadata = op->recv_initial_metadata; grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } @@ -1009,13 +1009,14 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs; while ( grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) { - if (stream_global->recv_initial_metadata_finished != NULL && + if (stream_global->recv_initial_metadata_ready != NULL && stream_global->published_initial_metadata) { grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_initial_metadata, stream_global->recv_initial_metadata); - grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->recv_initial_metadata_finished, 1); + grpc_exec_ctx_enqueue( + exec_ctx, stream_global->recv_initial_metadata_ready, true, NULL); + stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { if (stream_global->incoming_frames.head != NULL) { diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 08d685668cc..6e154b629ab 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -126,6 +126,7 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); + grpc_exec_ctx_enqueue(exec_ctx, op->recv_initial_metadata_ready, false, NULL); grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f5cac77adcc..8902c5d2f65 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -92,6 +92,8 @@ typedef struct grpc_transport_stream_op { /** Receive initial metadata from the stream, into provided metadata batch. */ grpc_metadata_batch *recv_initial_metadata; + /** Should be enqueued when initial metadata is ready to be processed. */ + grpc_closure *recv_initial_metadata_ready; /** Receive message data from the stream, into provided byte stream. */ grpc_byte_stream **recv_message; @@ -103,7 +105,8 @@ typedef struct grpc_transport_stream_op { grpc_metadata_batch *recv_trailing_metadata; /** Should be enqueued when all requested operations (excluding recv_message - which has its own closure) in a given batch have been completed. */ + and recv_initial_metadata which have their own closures) in a given batch + have been completed. */ grpc_closure *on_complete; /** If != GRPC_STATUS_OK, cancel this stream */ diff --git a/test/core/fling/client.c b/test/core/fling/client.c index 02db681cfd3..b36aef30935 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -51,7 +51,7 @@ static grpc_channel *channel; static grpc_completion_queue *cq; static grpc_call *call; static grpc_op ops[6]; -static grpc_op stream_init_op; +static grpc_op stream_init_ops[2]; static grpc_op stream_step_ops[2]; static grpc_metadata_array initial_metadata_recv; static grpc_metadata_array trailing_metadata_recv; @@ -105,13 +105,17 @@ static void step_ping_pong_request(void) { } static void init_ping_pong_stream(void) { + grpc_metadata_array_init(&initial_metadata_recv); + grpc_call_error error; call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq, "/Reflector/reflectStream", "localhost", gpr_inf_future(GPR_CLOCK_REALTIME), NULL); - stream_init_op.op = GRPC_OP_SEND_INITIAL_METADATA; - stream_init_op.data.send_initial_metadata.count = 0; - error = grpc_call_start_batch(call, &stream_init_op, 1, (void *)1, NULL); + stream_init_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + stream_init_ops[0].data.send_initial_metadata.count = 0; + stream_init_ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; + stream_init_ops[1].data.recv_initial_metadata = &initial_metadata_recv; + error = grpc_call_start_batch(call, stream_init_ops, 2, (void *)1, NULL); GPR_ASSERT(GRPC_CALL_OK == error); grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index f8405627f9f..c72e20628f9 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -216,7 +216,7 @@ class HybridEnd2endTest : public ::testing::Test { } // Create a separate cq for each potential handler. for (int i = 0; i < 5; i++) { - cqs_.push_back(std::move(builder.AddCompletionQueue())); + cqs_.push_back(builder.AddCompletionQueue()); } server_ = builder.BuildAndStart(); }