Merge pull request #19868 from yashykt/failwrites

Fail SEND_MESSAGE ops if stream is closed for writes
reviewable/pr19729/r8
Yash Tibrewal 6 years ago committed by GitHub
commit ea5029f5cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 2
      src/core/ext/transport/chttp2/transport/internal.h
  3. 2
      src/core/ext/transport/chttp2/transport/parsing.cc
  4. 6
      src/core/lib/surface/call.cc
  5. 12
      src/core/lib/transport/transport.h

@ -1535,19 +1535,13 @@ static void perform_stream_op_locked(void* stream_op,
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
// Return an error unless the client has already received trailing
// metadata from the server, since an application using a
// streaming call might send another message before getting a
// recv_message failure, breaking out of its loop, and then
// starting recv_trailing_metadata.
op->payload->send_message.stream_write_closed = true;
// We should NOT return an error here, so as to avoid a cancel OP being
// started. The surface layer will notice that the stream has been closed
// for writes and fail the send message op.
op->payload->send_message.send_message.reset();
grpc_chttp2_complete_closure_step(
t, s, &s->fetching_send_message_finished,
t->is_client && s->received_trailing_metadata
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Attempt to send message after stream was closed",
&s->write_closed_error, 1),
t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
"fetching_send_message_finished");
} else {
GPR_ASSERT(s->fetching_send_message == nullptr);

@ -564,8 +564,6 @@ struct grpc_chttp2_stream {
/** Are we buffering writes on this stream? If yes, we won't become writable
until there's enough queued up in the flow_controlled_buffer */
bool write_buffering = false;
/** Has trailing metadata been received. */
bool received_trailing_metadata = false;
/* have we sent or received the EOS bit? */
bool eos_received = false;

@ -648,7 +648,6 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t,
*s->trailing_metadata_available = true;
}
t->hpack_parser.on_header = on_trailing_header;
s->received_trailing_metadata = true;
} else {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing initial_metadata"));
t->hpack_parser.on_header = on_initial_header;
@ -657,7 +656,6 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t,
case 1:
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing trailing_metadata"));
t->hpack_parser.on_header = on_trailing_header;
s->received_trailing_metadata = true;
break;
case 2:
gpr_log(GPR_ERROR, "too many header frames received");

@ -1175,6 +1175,12 @@ static void post_batch_completion(batch_control* bctl) {
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
if (bctl->op.payload->send_message.stream_write_closed &&
error == GRPC_ERROR_NONE) {
error = grpc_error_add_child(
error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to send message after stream was closed."));
}
call->sending_message = false;
}
if (bctl->op.send_trailing_metadata) {

@ -245,6 +245,18 @@ struct grpc_transport_stream_op_batch_payload {
// The batch's on_complete will not be called until after the byte
// stream is orphaned.
grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message;
// Set by the transport if the stream has been closed for writes. If this
// is set and send message op is present, we set the operation to be a
// failure without sending a cancel OP down the stack. This is so that the
// status of the call does not get overwritten by the Cancel OP, which would
// be especially problematic if we had received a valid status from the
// server.
// For send_initial_metadata, it is fine for the status to be overwritten
// because at that point, the client will not have received a status.
// For send_trailing_metadata, we might overwrite the status if we have
// non-zero metadata to send. This is fine because the API does not allow
// the client to send trailing metadata.
bool stream_write_closed = false;
} send_message;
struct {

Loading…
Cancel
Save