diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 63bc2bd59f5..aeb0c39df92 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -52,7 +52,6 @@ struct call_data { grpc_closure* recv_message_ready; grpc_closure* on_complete; grpc_byte_stream** pp_recv_message; - grpc_slice_buffer read_slice_buffer; grpc_slice_buffer_stream read_stream; /** Receive closures are chained: we inject this closure as the on_done_recv @@ -224,13 +223,15 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, /* decode payload from query and add to the slice buffer to be returned */ const int k_url_safe = 1; + grpc_slice_buffer read_slice_buffer; + grpc_slice_buffer_init(&read_slice_buffer); grpc_slice_buffer_add( - &calld->read_slice_buffer, + &read_slice_buffer, grpc_base64_decode_with_len( reinterpret_cast GRPC_SLICE_START_PTR(query_slice), GRPC_SLICE_LENGTH(query_slice), k_url_safe)); - grpc_slice_buffer_stream_init(&calld->read_stream, - &calld->read_slice_buffer, 0); + grpc_slice_buffer_stream_init(&calld->read_stream, &read_slice_buffer, 0); + grpc_slice_buffer_destroy_internal(&read_slice_buffer); calld->seen_path_with_query = true; grpc_slice_unref_internal(query_slice); } else { @@ -393,7 +394,6 @@ static grpc_error* init_call_elem(grpc_call_element* elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->hs_recv_message_ready, hs_recv_message_ready, elem, grpc_schedule_on_exec_ctx); - grpc_slice_buffer_init(&calld->read_slice_buffer); return GRPC_ERROR_NONE; } @@ -402,7 +402,9 @@ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast(elem->call_data); - grpc_slice_buffer_destroy_internal(&calld->read_slice_buffer); + if (calld->seen_path_with_query && !calld->payload_bin_delivered) { + grpc_byte_stream_destroy(&calld->read_stream.base); + } } /* Constructor for channel_data */ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index ad8da94cb3e..2fc3c4fa41d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1473,6 +1473,7 @@ static void perform_stream_op_locked(void* stream_op, // streaming call might send another message before getting a // recv_message failure, breaking out of its loop, and then // starting recv_trailing_metadata. + grpc_byte_stream_destroy(op->payload->send_message.send_message); grpc_chttp2_complete_closure_step( t, s, &s->fetching_send_message_finished, t->is_client && s->received_trailing_metadata @@ -2092,7 +2093,10 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t, GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); - s->fetching_send_message = nullptr; + if (s->fetching_send_message != nullptr) { + grpc_byte_stream_destroy(s->fetching_send_message); + s->fetching_send_message = nullptr; + } grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 2022eaffe5b..e1d48437859 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -480,6 +480,8 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { s->recv_message_op = nullptr; } if (s->send_message_op) { + grpc_byte_stream_destroy( + s->send_message_op->payload->send_message.send_message); complete_if_batch_end_locked( s, error, s->send_message_op, "fail_helper scheduling send-message-on-complete"); @@ -506,6 +508,14 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { GRPC_ERROR_UNREF(error); } +// TODO(vjpai): It should not be necessary to drain the incoming byte +// stream and create a new one; instead, we should simply pass the byte +// stream from the sender directly to the receiver as-is. +// +// Note that fixing this will also avoid the assumption in this code +// that the incoming byte stream's next() call will always return +// synchronously. That assumption is true today but may not always be +// true in the future. static void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { size_t remaining = @@ -532,6 +542,8 @@ static void message_transfer_locked(inproc_stream* sender, remaining -= GRPC_SLICE_LENGTH(message_slice); grpc_slice_buffer_add(&receiver->recv_message, message_slice); } while (remaining > 0); + grpc_byte_stream_destroy( + sender->send_message_op->payload->send_message.send_message); grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message, 0); @@ -592,6 +604,8 @@ static void op_state_machine(void* arg, grpc_error* error) { (s->trailing_md_sent || other->recv_trailing_md_op)) { // A server send will never be matched if the client is waiting // for trailing metadata already + grpc_byte_stream_destroy( + s->send_message_op->payload->send_message.send_message); complete_if_batch_end_locked( s, GRPC_ERROR_NONE, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); @@ -728,6 +742,8 @@ static void op_state_machine(void* arg, grpc_error* error) { if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) { // Nothing further will try to receive from this stream, so finish off // any outstanding send_message op + grpc_byte_stream_destroy( + s->send_message_op->payload->send_message.send_message); complete_if_batch_end_locked( s, new_err, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); @@ -785,6 +801,8 @@ static void op_state_machine(void* arg, grpc_error* error) { s->send_message_op) { // Nothing further will try to receive from this stream, so finish off // any outstanding send_message op + grpc_byte_stream_destroy( + s->send_message_op->payload->send_message.send_message); complete_if_batch_end_locked( s, new_err, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc index afb55b2f201..b96598c3932 100644 --- a/src/core/lib/transport/byte_stream.cc +++ b/src/core/lib/transport/byte_stream.cc @@ -51,7 +51,7 @@ static bool slice_buffer_stream_next(grpc_byte_stream* byte_stream, grpc_closure* on_complete) { grpc_slice_buffer_stream* stream = reinterpret_cast(byte_stream); - GPR_ASSERT(stream->cursor < stream->backing_buffer->count); + GPR_ASSERT(stream->cursor < stream->backing_buffer.count); return true; } @@ -62,9 +62,9 @@ static grpc_error* slice_buffer_stream_pull(grpc_byte_stream* byte_stream, if (stream->shutdown_error != GRPC_ERROR_NONE) { return GRPC_ERROR_REF(stream->shutdown_error); } - GPR_ASSERT(stream->cursor < stream->backing_buffer->count); + GPR_ASSERT(stream->cursor < stream->backing_buffer.count); *slice = - grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]); + grpc_slice_ref_internal(stream->backing_buffer.slices[stream->cursor]); stream->cursor++; return GRPC_ERROR_NONE; } @@ -80,7 +80,7 @@ static void slice_buffer_stream_shutdown(grpc_byte_stream* byte_stream, static void slice_buffer_stream_destroy(grpc_byte_stream* byte_stream) { grpc_slice_buffer_stream* stream = reinterpret_cast(byte_stream); - grpc_slice_buffer_reset_and_unref_internal(stream->backing_buffer); + grpc_slice_buffer_destroy(&stream->backing_buffer); GRPC_ERROR_UNREF(stream->shutdown_error); } @@ -95,7 +95,8 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream, stream->base.length = static_cast(slice_buffer->length); stream->base.flags = flags; stream->base.vtable = &slice_buffer_stream_vtable; - stream->backing_buffer = slice_buffer; + grpc_slice_buffer_init(&stream->backing_buffer); + grpc_slice_buffer_swap(slice_buffer, &stream->backing_buffer); stream->cursor = 0; stream->shutdown_error = GRPC_ERROR_NONE; } diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 52c7a07f568..fc12e5686d4 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -81,7 +81,7 @@ void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream); typedef struct grpc_slice_buffer_stream { grpc_byte_stream base; - grpc_slice_buffer* backing_buffer; + grpc_slice_buffer backing_buffer; size_t cursor; grpc_error* shutdown_error; } grpc_slice_buffer_stream;