Fix race condition in transport API

Specifically:

Received initial and trailing metadata had to be published in
lock-step.
=> If we asked for trailing metadata, initial metadata might not be processed
   until messages arrived.
=> The compression code then had no idea which codec to use for those messages.

To fix it, publish initial metadata as soon as it's ready (this is a
transport API change).
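
The shape of the change, condensed (a sketch distilled from the
src/core/transport/transport.h hunk below; only the receive-side fields are
shown, the real struct has more):

    typedef struct grpc_transport_stream_op {
      /* ... send-side fields elided ... */
      grpc_metadata_batch *recv_initial_metadata;
      /* NEW: enqueued as soon as initial metadata is ready to be processed,
         without waiting for the rest of the batch */
      grpc_closure *recv_initial_metadata_ready;
      grpc_byte_stream **recv_message;
      grpc_closure *recv_message_ready;
      grpc_metadata_batch *recv_trailing_metadata;
      /* completes everything else in the batch; now excludes
         recv_initial_metadata as well as recv_message */
      grpc_closure *on_complete;
    } grpc_transport_stream_op;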

This requires changes to grpc_call to keep initial metadata and message
processing ordered (a message may now arrive first and has to be delayed).
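
In miniature (condensed from the src/core/surface/call.c hunks below; batch
accounting is elided):

    /* Runs when message data arrives. If initial metadata has not been
       published yet, stash the context; receiving_initial_metadata_ready
       replays it once the metadata (and thus the compression codec) is
       known. */
    static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
                                       bool success) {
      batch_control *bctl = bctlp;
      grpc_call *call = bctl->call;
      gpr_mu_lock(&call->mu);
      if (call->has_initial_md_been_received) {
        gpr_mu_unlock(&call->mu);
        process_data_after_md(exec_ctx, bctlp, success);
      } else {
        /* the message beat initial metadata: save it for later replay */
        call->saved_receiving_stream_ready_ctx.bctlp = bctlp;
        call->saved_receiving_stream_ready_ctx.success = success;
        gpr_mu_unlock(&call->mu);
      }
    }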

This exposed at least some bugs in the C++ wrapping code where we never read
initial metadata.

I expect at least one more similar bug.
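
For filters, the interception point moves accordingly; the same substitution
recurs in the census, http_client, http_server, server_auth and server hunks
below. In generic form (a sketch; mutate_op / on_recv stand in for each
filter's own names):

    static void mutate_op(grpc_call_element *elem,
                          grpc_transport_stream_op *op) {
      call_data *calld = elem->call_data;
      if (op->recv_initial_metadata != NULL) {
        /* remember the closure the caller wanted run... */
        calld->recv_initial_metadata = op->recv_initial_metadata;
        calld->on_done_recv = op->recv_initial_metadata_ready;
        /* ...and substitute our own, which inspects the metadata and then
           invokes the saved one; previously this hijacked op->on_complete */
        op->recv_initial_metadata_ready = &calld->on_recv;
      }
    }
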
Branch: pull/5052/head
Author: Craig Tiller (9 years ago)
Parent: 4253522c76
Commit: a44cbfc11c
Files changed (lines changed):
 1. include/grpc++/impl/codegen/sync_stream.h (20)
 2. src/core/census/grpc_filter.c (4)
 3. src/core/channel/http_client_filter.c (4)
 4. src/core/channel/http_server_filter.c (4)
 5. src/core/channel/subchannel_call_holder.c (6)
 6. src/core/security/server_auth_filter.c (4)
 7. src/core/surface/call.c (154)
 8. src/core/surface/lame_client.c (3)
 9. src/core/surface/server.c (4)
10. src/core/transport/chttp2/internal.h (2)
11. src/core/transport/chttp2_transport.c (15)
12. src/core/transport/transport.c (1)
13. src/core/transport/transport.h (5)
14. test/core/fling/client.c (12)
15. test/cpp/end2end/hybrid_end2end_test.cc (2)

--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -193,6 +193,15 @@ class ClientWriter : public ClientWriterInterface<W> {
     cq_.Pluck(&ops);
   }
 
+  void WaitForInitialMetadata() {
+    GPR_ASSERT(!context_->initial_metadata_received_);
+    CallOpSet<CallOpRecvInitialMetadata> ops;
+    ops.RecvInitialMetadata(context_);
+    call_.PerformOps(&ops);
+    cq_.Pluck(&ops);  // status ignored
+  }
+
   using WriterInterface<W>::Write;
   bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
     CallOpSet<CallOpSendMessage> ops;
@@ -213,6 +222,9 @@ class ClientWriter : public ClientWriterInterface<W> {
   /// Read the final response and wait for the final status.
   Status Finish() GRPC_OVERRIDE {
     Status status;
+    if (!context_->initial_metadata_received_) {
+      finish_ops_.RecvInitialMetadata(context_);
+    }
     finish_ops_.ClientRecvStatus(context_, &status);
     call_.PerformOps(&finish_ops_);
     GPR_ASSERT(cq_.Pluck(&finish_ops_));
@@ -221,7 +233,8 @@ class ClientWriter : public ClientWriterInterface<W> {
  private:
   ClientContext* context_;
-  CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
+            CallOpClientRecvStatus> finish_ops_;
   CompletionQueue cq_;
   Call call_;
 };
@@ -292,7 +305,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
   }
 
   Status Finish() GRPC_OVERRIDE {
-    CallOpSet<CallOpClientRecvStatus> ops;
+    CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> ops;
+    if (!context_->initial_metadata_received_) {
+      ops.RecvInitialMetadata(context_);
+    }
     Status status;
     ops.ClientRecvStatus(context_, &status);
     call_.PerformOps(&ops);

--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -107,8 +107,8 @@ static void server_mutate_op(grpc_call_element *elem,
   if (op->recv_initial_metadata) {
     /* substitute our callback for the op callback */
     calld->recv_initial_metadata = op->recv_initial_metadata;
-    calld->on_done_recv = op->on_complete;
-    op->on_complete = &calld->finish_recv;
+    calld->on_done_recv = op->recv_initial_metadata_ready;
+    op->recv_initial_metadata_ready = &calld->finish_recv;
   }
 }

--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -127,8 +127,8 @@ static void hc_mutate_op(grpc_call_element *elem,
   if (op->recv_initial_metadata != NULL) {
     /* substitute our callback for the higher callback */
     calld->recv_initial_metadata = op->recv_initial_metadata;
-    calld->on_done_recv = op->on_complete;
-    op->on_complete = &calld->hc_on_recv;
+    calld->on_done_recv = op->recv_initial_metadata_ready;
+    op->recv_initial_metadata_ready = &calld->hc_on_recv;
   }
 }

--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -186,8 +186,8 @@ static void hs_mutate_op(grpc_call_element *elem,
   if (op->recv_initial_metadata) {
     /* substitute our callback for the higher callback */
     calld->recv_initial_metadata = op->recv_initial_metadata;
-    calld->on_done_recv = op->on_complete;
-    op->on_complete = &calld->hs_on_recv;
+    calld->on_done_recv = op->recv_initial_metadata_ready;
+    op->recv_initial_metadata_ready = &calld->hs_on_recv;
   }
 }

--- a/src/core/channel/subchannel_call_holder.c
+++ b/src/core/channel/subchannel_call_holder.c
@@ -241,10 +241,8 @@ static void fail_locked(grpc_exec_ctx *exec_ctx,
                         grpc_subchannel_call_holder *holder) {
   size_t i;
   for (i = 0; i < holder->waiting_ops_count; i++) {
-    grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false,
-                          NULL);
-    grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready,
-                          false, NULL);
+    grpc_transport_stream_op_finish_with_failure(exec_ctx,
+                                                 &holder->waiting_ops[i]);
   }
   holder->waiting_ops_count = 0;
 }

--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -176,8 +176,8 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem,
   if (op->recv_initial_metadata != NULL) {
     /* substitute our callback for the higher callback */
     calld->recv_initial_metadata = op->recv_initial_metadata;
-    calld->on_done_recv = op->on_complete;
-    op->on_complete = &calld->auth_on_recv;
+    calld->on_done_recv = op->recv_initial_metadata_ready;
+    op->recv_initial_metadata_ready = &calld->auth_on_recv;
     calld->transport_op = *op;
   }
 }

--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -159,6 +159,9 @@ struct grpc_call {
   uint8_t receiving_message;
   uint8_t received_final_op;
 
+  /* have we received initial metadata */
+  bool has_initial_md_been_received;
+
   batch_control active_batches[MAX_CONCURRENT_BATCHES];
 
   /* first idx: is_receiving, second idx: is_trailing */
@@ -200,6 +203,7 @@ struct grpc_call {
   gpr_slice receiving_slice;
   grpc_closure receiving_slice_ready;
   grpc_closure receiving_stream_ready;
+  grpc_closure receiving_initial_metadata_ready;
   uint32_t test_only_last_message_flags;
 
   union {
@@ -212,6 +216,11 @@ struct grpc_call {
       int *cancelled;
     } server;
   } final_op;
+
+  struct {
+    void *bctlp;
+    bool success;
+  } saved_receiving_stream_ready_ctx;
 };
 
 #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -993,6 +1002,94 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
   }
 }
 
+static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
+                                  bool success) {
+  grpc_call *call = bctl->call;
+  if (call->receiving_stream == NULL) {
+    *call->receiving_buffer = NULL;
+    call->receiving_message = 0;
+    if (gpr_unref(&bctl->steps_to_complete)) {
+      post_batch_completion(exec_ctx, bctl);
+    }
+  } else if (call->receiving_stream->length >
+             grpc_channel_get_max_message_length(call->channel)) {
+    cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
+                       "Max message size exceeded");
+    grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+    call->receiving_stream = NULL;
+    *call->receiving_buffer = NULL;
+    call->receiving_message = 0;
+    if (gpr_unref(&bctl->steps_to_complete)) {
+      post_batch_completion(exec_ctx, bctl);
+    }
+  } else {
+    call->test_only_last_message_flags = call->receiving_stream->flags;
+    if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+        (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+      *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
+          NULL, 0, call->compression_algorithm);
+    } else {
+      *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
+    }
+    grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
+                      bctl);
+    continue_receiving_slices(exec_ctx, bctl);
+    /* early out */
+    return;
+  }
+}
+
+static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
+                                   bool success) {
+  batch_control *bctl = bctlp;
+  grpc_call *call = bctl->call;
+
+  gpr_mu_lock(&bctl->call->mu);
+  if (bctl->call->has_initial_md_been_received) {
+    gpr_mu_unlock(&bctl->call->mu);
+    process_data_after_md(exec_ctx, bctlp, success);
+  } else {
+    call->saved_receiving_stream_ready_ctx.bctlp = bctlp;
+    call->saved_receiving_stream_ready_ctx.success = success;
+    gpr_mu_unlock(&bctl->call->mu);
+  }
+}
+
+static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
+                                             void *bctlp, bool success) {
+  batch_control *bctl = bctlp;
+  grpc_call *call = bctl->call;
+
+  gpr_mu_lock(&call->mu);
+
+  grpc_metadata_batch *md =
+      &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+  grpc_metadata_batch_filter(md, recv_initial_filter, call);
+  call->has_initial_md_been_received = true;
+
+  if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
+          0 &&
+      !call->is_client) {
+    GPR_TIMER_BEGIN("set_deadline_alarm", 0);
+    set_deadline_alarm(exec_ctx, call, md->deadline);
+    GPR_TIMER_END("set_deadline_alarm", 0);
+  }
+
+  if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) {
+    grpc_closure *saved_rsr_closure = grpc_closure_create(
+        receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp);
+    grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure,
+                          call->saved_receiving_stream_ready_ctx.success, NULL);
+    call->saved_receiving_stream_ready_ctx.bctlp = NULL;
+  }
+
+  gpr_mu_unlock(&call->mu);
+
+  if (gpr_unref(&bctl->steps_to_complete)) {
+    post_batch_completion(exec_ctx, bctl);
+  }
+}
+
 static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
   batch_control *bctl = bctlp;
   grpc_call *call = bctl->call;
@@ -1011,19 +1108,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
     grpc_metadata_batch_destroy(
         &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
   }
-  if (bctl->recv_initial_metadata) {
-    grpc_metadata_batch *md =
-        &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
-    grpc_metadata_batch_filter(md, recv_initial_filter, call);
-
-    if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
-            0 &&
-        !call->is_client) {
-      GPR_TIMER_BEGIN("set_deadline_alarm", 0);
-      set_deadline_alarm(exec_ctx, call, md->deadline);
-      GPR_TIMER_END("set_deadline_alarm", 0);
-    }
-  }
   if (bctl->recv_final_op) {
     grpc_metadata_batch *md =
         &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
@@ -1065,45 +1149,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
   }
 }
 
-static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
-                                   bool success) {
-  batch_control *bctl = bctlp;
-  grpc_call *call = bctl->call;
-
-  if (call->receiving_stream == NULL) {
-    *call->receiving_buffer = NULL;
-    call->receiving_message = 0;
-    if (gpr_unref(&bctl->steps_to_complete)) {
-      post_batch_completion(exec_ctx, bctl);
-    }
-  } else if (call->receiving_stream->length >
-             grpc_channel_get_max_message_length(call->channel)) {
-    cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
-                       "Max message size exceeded");
-    grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
-    call->receiving_stream = NULL;
-    *call->receiving_buffer = NULL;
-    call->receiving_message = 0;
-    if (gpr_unref(&bctl->steps_to_complete)) {
-      post_batch_completion(exec_ctx, bctl);
-    }
-  } else {
-    call->test_only_last_message_flags = call->receiving_stream->flags;
-    if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
-        (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
-      *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
-          NULL, 0, call->compression_algorithm);
-    } else {
-      *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
-    }
-    grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
-                      bctl);
-    continue_receiving_slices(exec_ctx, bctl);
-    /* early out */
-    return;
-  }
-}
-
 static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
                                         grpc_call *call, const grpc_op *ops,
                                         size_t nops, void *notify_tag,
@@ -1273,9 +1318,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
         }
         call->received_initial_metadata = 1;
         call->buffered_metadata[0] = op->data.recv_initial_metadata;
+        grpc_closure_init(&call->receiving_initial_metadata_ready,
+                          receiving_initial_metadata_ready, bctl);
         bctl->recv_initial_metadata = 1;
         stream_op.recv_initial_metadata =
            &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+        stream_op.recv_initial_metadata_ready =
+            &call->receiving_initial_metadata_ready;
+        num_completion_callbacks_needed++;
        break;
      case GRPC_OP_RECV_MESSAGE:
        /* Flag validation: currently allow no flags */

--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -78,8 +78,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
   } else if (op->recv_trailing_metadata != NULL) {
     fill_metadata(elem, op->recv_trailing_metadata);
   }
-  grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
-  grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+  grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
 }
 
 static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {

--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -596,8 +596,8 @@ static void server_mutate_op(grpc_call_element *elem,
   if (op->recv_initial_metadata != NULL) {
     calld->recv_initial_metadata = op->recv_initial_metadata;
-    calld->on_done_recv_initial_metadata = op->on_complete;
-    op->on_complete = &calld->server_on_recv_initial_metadata;
+    calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
+    op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata;
   }
 }

--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -385,7 +385,7 @@ typedef struct {
   grpc_closure *send_trailing_metadata_finished;
 
   grpc_metadata_batch *recv_initial_metadata;
-  grpc_closure *recv_initial_metadata_finished;
+  grpc_closure *recv_initial_metadata_ready;
   grpc_byte_stream **recv_message;
   grpc_closure *recv_message_ready;
   grpc_metadata_batch *recv_trailing_metadata;

--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -544,7 +544,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
   GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
   GPR_ASSERT(s->global.send_message_finished == NULL);
   GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL);
-  GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
+  GPR_ASSERT(s->global.recv_initial_metadata_ready == NULL);
   GPR_ASSERT(s->global.recv_message_ready == NULL);
   GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
   grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
@@ -863,9 +863,9 @@ static void perform_stream_op_locked(
   }
 
   if (op->recv_initial_metadata != NULL) {
-    GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL);
-    stream_global->recv_initial_metadata_finished =
-        add_closure_barrier(on_complete);
+    GPR_ASSERT(stream_global->recv_initial_metadata_ready == NULL);
+    stream_global->recv_initial_metadata_ready =
+        op->recv_initial_metadata_ready;
     stream_global->recv_initial_metadata = op->recv_initial_metadata;
     grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
   }
@@ -1009,13 +1009,14 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
   grpc_byte_stream *bs;
   while (
       grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) {
-    if (stream_global->recv_initial_metadata_finished != NULL &&
+    if (stream_global->recv_initial_metadata_ready != NULL &&
         stream_global->published_initial_metadata) {
       grpc_chttp2_incoming_metadata_buffer_publish(
          &stream_global->received_initial_metadata,
          stream_global->recv_initial_metadata);
-      grpc_chttp2_complete_closure_step(
-          exec_ctx, &stream_global->recv_initial_metadata_finished, 1);
+      grpc_exec_ctx_enqueue(
+          exec_ctx, stream_global->recv_initial_metadata_ready, true, NULL);
+      stream_global->recv_initial_metadata_ready = NULL;
     }
     if (stream_global->recv_message_ready != NULL) {
       if (stream_global->incoming_frames.head != NULL) {

--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -126,6 +126,7 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
 void grpc_transport_stream_op_finish_with_failure(
     grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) {
   grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+  grpc_exec_ctx_enqueue(exec_ctx, op->recv_initial_metadata_ready, false, NULL);
   grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
 }

--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -92,6 +92,8 @@ typedef struct grpc_transport_stream_op {
   /** Receive initial metadata from the stream, into provided metadata batch. */
   grpc_metadata_batch *recv_initial_metadata;
+  /** Should be enqueued when initial metadata is ready to be processed. */
+  grpc_closure *recv_initial_metadata_ready;
 
   /** Receive message data from the stream, into provided byte stream. */
   grpc_byte_stream **recv_message;
@@ -103,7 +105,8 @@ typedef struct grpc_transport_stream_op {
   grpc_metadata_batch *recv_trailing_metadata;
 
   /** Should be enqueued when all requested operations (excluding recv_message
-      which has its own closure) in a given batch have been completed. */
+      and recv_initial_metadata which have their own closures) in a given batch
+      have been completed. */
   grpc_closure *on_complete;
 
   /** If != GRPC_STATUS_OK, cancel this stream */

--- a/test/core/fling/client.c
+++ b/test/core/fling/client.c
@@ -51,7 +51,7 @@ static grpc_channel *channel;
 static grpc_completion_queue *cq;
 static grpc_call *call;
 static grpc_op ops[6];
-static grpc_op stream_init_op;
+static grpc_op stream_init_ops[2];
 static grpc_op stream_step_ops[2];
 static grpc_metadata_array initial_metadata_recv;
 static grpc_metadata_array trailing_metadata_recv;
@@ -105,13 +105,17 @@ static void step_ping_pong_request(void) {
 }
 
 static void init_ping_pong_stream(void) {
+  grpc_metadata_array_init(&initial_metadata_recv);
   grpc_call_error error;
   call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
                                   "/Reflector/reflectStream", "localhost",
                                   gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
-  stream_init_op.op = GRPC_OP_SEND_INITIAL_METADATA;
-  stream_init_op.data.send_initial_metadata.count = 0;
-  error = grpc_call_start_batch(call, &stream_init_op, 1, (void *)1, NULL);
+  stream_init_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+  stream_init_ops[0].data.send_initial_metadata.count = 0;
+  stream_init_ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
+  stream_init_ops[1].data.recv_initial_metadata = &initial_metadata_recv;
+  error = grpc_call_start_batch(call, stream_init_ops, 2, (void *)1, NULL);
   GPR_ASSERT(GRPC_CALL_OK == error);
   grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);

--- a/test/cpp/end2end/hybrid_end2end_test.cc
+++ b/test/cpp/end2end/hybrid_end2end_test.cc
@@ -216,7 +216,7 @@ class HybridEnd2endTest : public ::testing::Test {
     }
     // Create a separate cq for each potential handler.
     for (int i = 0; i < 5; i++) {
-      cqs_.push_back(std::move(builder.AddCompletionQueue()));
+      cqs_.push_back(builder.AddCompletionQueue());
     }
     server_ = builder.BuildAndStart();
   }
