From 72676e6fae17ac6134bcb6cf1b3c1320d8f881e8 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Tue, 26 Mar 2024 18:34:33 -0700 Subject: [PATCH] Enforce correct padding and byte alignment in chaotic good transport by ensuring no inline slices are created and returned by promise_endpoint. The PR also updates the transport to read padding and message in one read instead of two. PiperOrigin-RevId: 619374897 --- src/core/BUILD | 2 + .../chaotic_good/chaotic_good_transport.h | 24 ++++---- .../extensions/chaotic_good_extension.h | 3 + src/core/lib/slice/slice.cc | 36 +++++++++-- src/core/lib/slice/slice_buffer.cc | 60 +++++++++++++++---- src/core/lib/slice/slice_buffer.h | 15 ++++- src/core/lib/slice/slice_internal.h | 7 +++ src/core/lib/transport/promise_endpoint.h | 45 ++++++++++++-- 8 files changed, 161 insertions(+), 31 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index a216c2c7b9d..37d1d4f0e6e 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -7027,6 +7027,8 @@ grpc_cc_library( "activity", "cancel_callback", "event_engine_common", + "event_engine_extensions", + "event_engine_query_extensions", "if", "map", "poll", diff --git a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h index ae944cf198b..294567701e8 100644 --- a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h +++ b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h @@ -17,6 +17,9 @@ #include +#include +#include + #include "absl/random/random.h" #include "src/core/ext/transport/chaotic_good/frame.h" @@ -43,7 +46,9 @@ class ChaoticGoodTransport : public RefCounted { : control_endpoint_(std::move(control_endpoint)), data_endpoint_(std::move(data_endpoint)), encoder_(std::move(hpack_encoder)), - parser_(std::move(hpack_parser)) {} + parser_(std::move(hpack_parser)) { + data_endpoint_.EnforceRxMemoryAlignment(); + } auto WriteFrame(const FrameInterface& frame) { auto buffers = frame.Serialize(&encoder_); @@ -82,25 +87,25 @@ class ChaoticGoodTransport : public RefCounted { return If( frame_header.ok(), [this, &frame_header] { - const uint32_t message_padding = std::exchange( - last_message_padding_, frame_header->message_padding); + const uint32_t message_padding = frame_header->message_padding; const uint32_t message_length = frame_header->message_length; return Map( TryJoin( control_endpoint_.Read(frame_header->GetFrameLength()), - TrySeq(data_endpoint_.Read(message_padding), - [this, message_length]() { - return data_endpoint_.Read(message_length); - })), - [frame_header = *frame_header]( + data_endpoint_.Read(message_length + message_padding)), + [frame_header = *frame_header, message_padding]( absl::StatusOr> buffers) -> absl::StatusOr> { if (!buffers.ok()) return buffers.status(); + SliceBuffer data_read = std::move(std::get<1>(*buffers)); + if (message_padding > 0) { + data_read.RemoveLastNBytesNoInline(message_padding); + } return std::tuple( frame_header, BufferPair{std::move(std::get<0>(*buffers)), - std::move(std::get<1>(*buffers))}); + std::move(data_read)}); }); }, [&frame_header]() { @@ -127,7 +132,6 @@ class ChaoticGoodTransport : public RefCounted { private: PromiseEndpoint control_endpoint_; PromiseEndpoint data_endpoint_; - uint32_t last_message_padding_ = 0; HPackCompressor encoder_; HPackParser parser_; absl::BitGen bitgen_; diff --git a/src/core/lib/event_engine/extensions/chaotic_good_extension.h b/src/core/lib/event_engine/extensions/chaotic_good_extension.h index 10fec67b1d9..b79c2c2d4c3 100644 --- a/src/core/lib/event_engine/extensions/chaotic_good_extension.h +++ b/src/core/lib/event_engine/extensions/chaotic_good_extension.h @@ -37,6 +37,9 @@ class ChaoticGoodExtension { /// Otherwise they are grouped into histograms and counters specific to the /// chaotic good data channel. virtual void EnableStatsCollection(bool is_control_channel) = 0; + /// If invoked, the endpoint tries to preserve proper order and alignment of + /// any memory that maybe shared across reads. + virtual void EnforceRxMemoryAlignment() = 0; }; } // namespace experimental diff --git a/src/core/lib/slice/slice.cc b/src/core/lib/slice/slice.cc index 6180ef10e56..ffce11c2a47 100644 --- a/src/core/lib/slice/slice.cc +++ b/src/core/lib/slice/slice.cc @@ -289,8 +289,10 @@ grpc_slice grpc_slice_sub(grpc_slice source, size_t begin, size_t end) { return subset; } -grpc_slice grpc_slice_split_tail_maybe_ref(grpc_slice* source, size_t split, - grpc_slice_ref_whom ref_whom) { +template +grpc_slice grpc_slice_split_tail_maybe_ref_impl(grpc_slice* source, + size_t split, + grpc_slice_ref_whom ref_whom) { grpc_slice tail; if (source->refcount == nullptr) { @@ -311,7 +313,7 @@ grpc_slice grpc_slice_split_tail_maybe_ref(grpc_slice* source, size_t split, } else { size_t tail_length = source->data.refcounted.length - split; GPR_ASSERT(source->data.refcounted.length >= split); - if (tail_length < sizeof(tail.data.inlined.bytes) && + if (allow_inline && tail_length < sizeof(tail.data.inlined.bytes) && ref_whom != GRPC_SLICE_REF_TAIL) { // Copy out the bytes - it'll be cheaper than refcounting tail.refcount = nullptr; @@ -346,11 +348,27 @@ grpc_slice grpc_slice_split_tail_maybe_ref(grpc_slice* source, size_t split, return tail; } +grpc_slice grpc_slice_split_tail_maybe_ref(grpc_slice* source, size_t split, + grpc_slice_ref_whom ref_whom) { + return grpc_slice_split_tail_maybe_ref_impl(source, split, ref_whom); +} + +grpc_slice grpc_slice_split_tail_maybe_ref_no_inline( + grpc_slice* source, size_t split, grpc_slice_ref_whom ref_whom) { + return grpc_slice_split_tail_maybe_ref_impl(source, split, ref_whom); +} + grpc_slice grpc_slice_split_tail(grpc_slice* source, size_t split) { return grpc_slice_split_tail_maybe_ref(source, split, GRPC_SLICE_REF_BOTH); } -grpc_slice grpc_slice_split_head(grpc_slice* source, size_t split) { +grpc_slice grpc_slice_split_tail_no_inline(grpc_slice* source, size_t split) { + return grpc_slice_split_tail_maybe_ref_no_inline(source, split, + GRPC_SLICE_REF_BOTH); +} + +template +grpc_slice grpc_slice_split_head_impl(grpc_slice* source, size_t split) { grpc_slice head; if (source->refcount == nullptr) { @@ -363,7 +381,7 @@ grpc_slice grpc_slice_split_head(grpc_slice* source, size_t split) { static_cast(source->data.inlined.length - split); memmove(source->data.inlined.bytes, source->data.inlined.bytes + split, source->data.inlined.length); - } else if (split < sizeof(head.data.inlined.bytes)) { + } else if (allow_inline && split < sizeof(head.data.inlined.bytes)) { GPR_ASSERT(source->data.refcounted.length >= split); head.refcount = nullptr; @@ -390,6 +408,14 @@ grpc_slice grpc_slice_split_head(grpc_slice* source, size_t split) { return head; } +grpc_slice grpc_slice_split_head(grpc_slice* source, size_t split) { + return grpc_slice_split_head_impl(source, split); +} + +grpc_slice grpc_slice_split_head_no_inline(grpc_slice* source, size_t split) { + return grpc_slice_split_head_impl(source, split); +} + int grpc_slice_eq(grpc_slice a, grpc_slice b) { if (GRPC_SLICE_LENGTH(a) != GRPC_SLICE_LENGTH(b)) return false; if (GRPC_SLICE_LENGTH(a) == 0) return true; diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index ec93b81f223..14c9b67ab2a 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -324,9 +324,13 @@ void grpc_slice_buffer_move_into(grpc_slice_buffer* src, src->length = 0; } -template +template static void slice_buffer_move_first_maybe_ref(grpc_slice_buffer* src, size_t n, grpc_slice_buffer* dst) { + if (n == 0) { + return; + } + GPR_ASSERT(src->length >= n); if (src->length == n) { grpc_slice_buffer_move_into(src, dst); @@ -346,14 +350,28 @@ static void slice_buffer_move_first_maybe_ref(grpc_slice_buffer* src, size_t n, grpc_slice_buffer_add(dst, slice); break; } else if (incref) { // n < slice_len - grpc_slice_buffer_undo_take_first( - src, grpc_slice_split_tail_maybe_ref(&slice, n, GRPC_SLICE_REF_BOTH)); + if (allow_inline) { + grpc_slice_buffer_undo_take_first( + src, + grpc_slice_split_tail_maybe_ref(&slice, n, GRPC_SLICE_REF_BOTH)); + } else { + grpc_slice_buffer_undo_take_first( + src, grpc_slice_split_tail_maybe_ref_no_inline( + &slice, n, GRPC_SLICE_REF_BOTH)); + } GPR_ASSERT(GRPC_SLICE_LENGTH(slice) == n); grpc_slice_buffer_add(dst, slice); break; } else { // n < slice_len - grpc_slice_buffer_undo_take_first( - src, grpc_slice_split_tail_maybe_ref(&slice, n, GRPC_SLICE_REF_TAIL)); + if (allow_inline) { + grpc_slice_buffer_undo_take_first( + src, + grpc_slice_split_tail_maybe_ref(&slice, n, GRPC_SLICE_REF_TAIL)); + } else { + grpc_slice_buffer_undo_take_first( + src, grpc_slice_split_tail_maybe_ref_no_inline( + &slice, n, GRPC_SLICE_REF_TAIL)); + } GPR_ASSERT(GRPC_SLICE_LENGTH(slice) == n); grpc_slice_buffer_add_indexed(dst, slice); break; @@ -364,14 +382,19 @@ static void slice_buffer_move_first_maybe_ref(grpc_slice_buffer* src, size_t n, GPR_ASSERT(src->count > 0); } +void grpc_slice_buffer_move_first_no_inline(grpc_slice_buffer* src, size_t n, + grpc_slice_buffer* dst) { + slice_buffer_move_first_maybe_ref(src, n, dst); +} + void grpc_slice_buffer_move_first(grpc_slice_buffer* src, size_t n, grpc_slice_buffer* dst) { - slice_buffer_move_first_maybe_ref(src, n, dst); + slice_buffer_move_first_maybe_ref(src, n, dst); } void grpc_slice_buffer_move_first_no_ref(grpc_slice_buffer* src, size_t n, grpc_slice_buffer* dst) { - slice_buffer_move_first_maybe_ref(src, n, dst); + slice_buffer_move_first_maybe_ref(src, n, dst); } void grpc_slice_buffer_move_first_into_buffer(grpc_slice_buffer* src, size_t n, @@ -417,9 +440,9 @@ void grpc_slice_buffer_copy_first_into_buffer(grpc_slice_buffer* src, size_t n, n -= slice_len; } } - -void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n, - grpc_slice_buffer* garbage) { +template +void grpc_slice_buffer_trim_end_impl(grpc_slice_buffer* sb, size_t n, + grpc_slice_buffer* garbage) { GPR_ASSERT(n <= sb->length); sb->length -= n; for (;;) { @@ -427,7 +450,12 @@ void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n, grpc_slice slice = sb->slices[idx]; size_t slice_len = GRPC_SLICE_LENGTH(slice); if (slice_len > n) { - sb->slices[idx] = grpc_slice_split_head(&slice, slice_len - n); + if (allow_inline) { + sb->slices[idx] = grpc_slice_split_head(&slice, slice_len - n); + } else { + sb->slices[idx] = + grpc_slice_split_head_no_inline(&slice, slice_len - n); + } if (garbage) { grpc_slice_buffer_add_indexed(garbage, slice); } else { @@ -454,6 +482,16 @@ void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n, } } +void grpc_slice_buffer_trim_end_no_inline(grpc_slice_buffer* sb, size_t n, + grpc_slice_buffer* garbage) { + return grpc_slice_buffer_trim_end_impl(sb, n, garbage); +} + +void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n, + grpc_slice_buffer* garbage) { + return grpc_slice_buffer_trim_end_impl(sb, n, garbage); +} + grpc_slice grpc_slice_buffer_take_first(grpc_slice_buffer* sb) { grpc_slice slice; GPR_ASSERT(sb->count > 0); diff --git a/src/core/lib/slice/slice_buffer.h b/src/core/lib/slice/slice_buffer.h index 7bf875c8870..c6227da674d 100644 --- a/src/core/lib/slice/slice_buffer.h +++ b/src/core/lib/slice/slice_buffer.h @@ -32,6 +32,12 @@ void grpc_slice_buffer_copy_first_into_buffer(grpc_slice_buffer* src, size_t n, void* dst); +void grpc_slice_buffer_move_first_no_inline(grpc_slice_buffer* src, size_t n, + grpc_slice_buffer* dst); + +void grpc_slice_buffer_trim_end_no_inline(grpc_slice_buffer* sb, size_t n, + grpc_slice_buffer* garbage); + namespace grpc_core { /// A slice buffer holds the memory for a collection of slices. @@ -96,7 +102,14 @@ class SliceBuffer { grpc_slice_buffer_trim_end(&slice_buffer_, n, nullptr); } - /// Move the first n bytes of the SliceBuffer into a memory pointed to by dst. + /// Removes/deletes the last n bytes in the SliceBuffer while avoiding the + /// the creation of inline slices. + void RemoveLastNBytesNoInline(size_t n) { + grpc_slice_buffer_trim_end_no_inline(&slice_buffer_, n, nullptr); + } + + /// Move the first n bytes of the SliceBuffer into a memory pointed to by + /// dst. void MoveFirstNBytesIntoBuffer(size_t n, void* dst) { grpc_slice_buffer_move_first_into_buffer(&slice_buffer_, n, dst); } diff --git a/src/core/lib/slice/slice_internal.h b/src/core/lib/slice/slice_internal.h index ae70241560d..626d06a3b7e 100644 --- a/src/core/lib/slice/slice_internal.h +++ b/src/core/lib/slice/slice_internal.h @@ -67,6 +67,13 @@ grpc_slice grpc_slice_from_cpp_string(std::string str); // 0. All other slices will return the size of the allocated chars. size_t grpc_slice_memory_usage(grpc_slice s); +grpc_slice grpc_slice_split_tail_maybe_ref_no_inline( + grpc_slice* source, size_t split, grpc_slice_ref_whom ref_whom); + +grpc_slice grpc_slice_split_tail_no_inline(grpc_slice* source, size_t split); + +grpc_slice grpc_slice_split_head_no_inline(grpc_slice* source, size_t split); + namespace grpc_core { // Converts grpc_slice to absl::string_view. diff --git a/src/core/lib/transport/promise_endpoint.h b/src/core/lib/transport/promise_endpoint.h index bcbf13faf8e..761dc71c8e3 100644 --- a/src/core/lib/transport/promise_endpoint.h +++ b/src/core/lib/transport/promise_endpoint.h @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -36,6 +37,8 @@ #include #include +#include "src/core/lib/event_engine/extensions/chaotic_good_extension.h" +#include "src/core/lib/event_engine/query_extensions.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/activity.h" @@ -164,8 +167,9 @@ class PromiseEndpoint { complete, [this, num_bytes]() { SliceBuffer ret; - grpc_slice_buffer_move_first(read_state_->buffer.c_slice_buffer(), - num_bytes, ret.c_slice_buffer()); + grpc_slice_buffer_move_first_no_inline( + read_state_->buffer.c_slice_buffer(), num_bytes, + ret.c_slice_buffer()); return [ret = std::move( ret)]() mutable -> Poll> { return std::move(ret); @@ -180,8 +184,9 @@ class PromiseEndpoint { // If read succeeds, return `SliceBuffer` with `num_bytes` bytes. if (read_state->result.ok()) { SliceBuffer ret; - grpc_slice_buffer_move_first(read_state->buffer.c_slice_buffer(), - num_bytes, ret.c_slice_buffer()); + grpc_slice_buffer_move_first_no_inline( + read_state->buffer.c_slice_buffer(), num_bytes, + ret.c_slice_buffer()); read_state->complete.store(false, std::memory_order_relaxed); return std::move(ret); } @@ -214,6 +219,38 @@ class PromiseEndpoint { }); } + void EnforceRxMemoryAlignment() { + auto* chaotic_good_ext = grpc_event_engine::experimental::QueryExtension< + grpc_event_engine::experimental::ChaoticGoodExtension>(endpoint_.get()); + if (chaotic_good_ext != nullptr) { + chaotic_good_ext->EnforceRxMemoryAlignment(); + if (read_state_->buffer.Length() == 0) { + return; + } + + // Copy everything from read_state_->buffer into a single slice and + // replace the contents of read_state_->buffer with that slice. + grpc_slice slice = grpc_slice_malloc_large(read_state_->buffer.Length()); + GPR_ASSERT( + reinterpret_cast(GRPC_SLICE_START_PTR(slice)) % 64 == 0); + size_t ofs = 0; + for (size_t i = 0; i < read_state_->buffer.Count(); i++) { + memcpy( + GRPC_SLICE_START_PTR(slice) + ofs, + GRPC_SLICE_START_PTR( + read_state_->buffer.c_slice_buffer()->slices[i]), + GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i])); + ofs += + GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i]); + } + + read_state_->buffer.Clear(); + read_state_->buffer.AppendIndexed( + grpc_event_engine::experimental::Slice(slice)); + GPR_DEBUG_ASSERT(read_state_->buffer.Length() == ofs); + } + } + const grpc_event_engine::experimental::EventEngine::ResolvedAddress& GetPeerAddress() const; const grpc_event_engine::experimental::EventEngine::ResolvedAddress&