Enforce correct padding and byte alignment in chaotic good transport by ensuring no inline slices are created and returned by promise_endpoint.

The PR also updates the transport to read padding and message in one read instead of two.

PiperOrigin-RevId: 619374897
pull/36163/head
Vignesh Babu 10 months ago committed by Copybara-Service
parent cde8d03da5
commit 72676e6fae
  1. 2
      src/core/BUILD
  2. 24
      src/core/ext/transport/chaotic_good/chaotic_good_transport.h
  3. 3
      src/core/lib/event_engine/extensions/chaotic_good_extension.h
  4. 36
      src/core/lib/slice/slice.cc
  5. 60
      src/core/lib/slice/slice_buffer.cc
  6. 15
      src/core/lib/slice/slice_buffer.h
  7. 7
      src/core/lib/slice/slice_internal.h
  8. 45
      src/core/lib/transport/promise_endpoint.h

@ -7027,6 +7027,8 @@ grpc_cc_library(
"activity",
"cancel_callback",
"event_engine_common",
"event_engine_extensions",
"event_engine_query_extensions",
"if",
"map",
"poll",

@ -17,6 +17,9 @@
#include <grpc/support/port_platform.h>
#include <cstdint>
#include <utility>
#include "absl/random/random.h"
#include "src/core/ext/transport/chaotic_good/frame.h"
@ -43,7 +46,9 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
: control_endpoint_(std::move(control_endpoint)),
data_endpoint_(std::move(data_endpoint)),
encoder_(std::move(hpack_encoder)),
parser_(std::move(hpack_parser)) {}
parser_(std::move(hpack_parser)) {
data_endpoint_.EnforceRxMemoryAlignment();
}
auto WriteFrame(const FrameInterface& frame) {
auto buffers = frame.Serialize(&encoder_);
@ -82,25 +87,25 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
return If(
frame_header.ok(),
[this, &frame_header] {
const uint32_t message_padding = std::exchange(
last_message_padding_, frame_header->message_padding);
const uint32_t message_padding = frame_header->message_padding;
const uint32_t message_length = frame_header->message_length;
return Map(
TryJoin<absl::StatusOr>(
control_endpoint_.Read(frame_header->GetFrameLength()),
TrySeq(data_endpoint_.Read(message_padding),
[this, message_length]() {
return data_endpoint_.Read(message_length);
})),
[frame_header = *frame_header](
data_endpoint_.Read(message_length + message_padding)),
[frame_header = *frame_header, message_padding](
absl::StatusOr<std::tuple<SliceBuffer, SliceBuffer>>
buffers)
-> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> {
if (!buffers.ok()) return buffers.status();
SliceBuffer data_read = std::move(std::get<1>(*buffers));
if (message_padding > 0) {
data_read.RemoveLastNBytesNoInline(message_padding);
}
return std::tuple<FrameHeader, BufferPair>(
frame_header,
BufferPair{std::move(std::get<0>(*buffers)),
std::move(std::get<1>(*buffers))});
std::move(data_read)});
});
},
[&frame_header]() {
@ -127,7 +132,6 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
private:
PromiseEndpoint control_endpoint_;
PromiseEndpoint data_endpoint_;
uint32_t last_message_padding_ = 0;
HPackCompressor encoder_;
HPackParser parser_;
absl::BitGen bitgen_;

@ -37,6 +37,9 @@ class ChaoticGoodExtension {
/// Otherwise they are grouped into histograms and counters specific to the
/// chaotic good data channel.
virtual void EnableStatsCollection(bool is_control_channel) = 0;
/// If invoked, the endpoint tries to preserve proper order and alignment of
/// any memory that may be shared across reads.
virtual void EnforceRxMemoryAlignment() = 0;
};
} // namespace experimental

@ -289,8 +289,10 @@ grpc_slice grpc_slice_sub(grpc_slice source, size_t begin, size_t end) {
return subset;
}
grpc_slice grpc_slice_split_tail_maybe_ref(grpc_slice* source, size_t split,
grpc_slice_ref_whom ref_whom) {
template <bool allow_inline>
grpc_slice grpc_slice_split_tail_maybe_ref_impl(grpc_slice* source,
size_t split,
grpc_slice_ref_whom ref_whom) {
grpc_slice tail;
if (source->refcount == nullptr) {
@ -311,7 +313,7 @@ grpc_slice grpc_slice_split_tail_maybe_ref(grpc_slice* source, size_t split,
} else {
size_t tail_length = source->data.refcounted.length - split;
GPR_ASSERT(source->data.refcounted.length >= split);
if (tail_length < sizeof(tail.data.inlined.bytes) &&
if (allow_inline && tail_length < sizeof(tail.data.inlined.bytes) &&
ref_whom != GRPC_SLICE_REF_TAIL) {
// Copy out the bytes - it'll be cheaper than refcounting
tail.refcount = nullptr;
@ -346,11 +348,27 @@ grpc_slice grpc_slice_split_tail_maybe_ref(grpc_slice* source, size_t split,
return tail;
}
// Splits off and returns the tail of *source starting at byte offset `split`,
// taking refs per `ref_whom`. Short tails may be copied into an inlined slice
// (allow_inline = true) when that is cheaper than refcounting.
grpc_slice grpc_slice_split_tail_maybe_ref(grpc_slice* source, size_t split,
                                           grpc_slice_ref_whom ref_whom) {
  return grpc_slice_split_tail_maybe_ref_impl<true>(source, split, ref_whom);
}
// As grpc_slice_split_tail_maybe_ref, but never materializes the tail as an
// inlined (copied) slice, so the returned slice always aliases the original
// backing memory — needed by callers relying on memory alignment of that
// backing store.
grpc_slice grpc_slice_split_tail_maybe_ref_no_inline(
    grpc_slice* source, size_t split, grpc_slice_ref_whom ref_whom) {
  return grpc_slice_split_tail_maybe_ref_impl<false>(source, split, ref_whom);
}
// Splits off the tail of *source at byte offset `split`, refcounting both
// resulting halves. Inlined copies of short tails are permitted.
grpc_slice grpc_slice_split_tail(grpc_slice* source, size_t split) {
  return grpc_slice_split_tail_maybe_ref_impl<true>(source, split,
                                                    GRPC_SLICE_REF_BOTH);
}
grpc_slice grpc_slice_split_head(grpc_slice* source, size_t split) {
// Splits off the tail of *source at byte offset `split`, refcounting both
// halves, without ever creating an inlined (copied) tail slice.
grpc_slice grpc_slice_split_tail_no_inline(grpc_slice* source, size_t split) {
  return grpc_slice_split_tail_maybe_ref_impl<false>(source, split,
                                                     GRPC_SLICE_REF_BOTH);
}
template <bool allow_inline>
grpc_slice grpc_slice_split_head_impl(grpc_slice* source, size_t split) {
grpc_slice head;
if (source->refcount == nullptr) {
@ -363,7 +381,7 @@ grpc_slice grpc_slice_split_head(grpc_slice* source, size_t split) {
static_cast<uint8_t>(source->data.inlined.length - split);
memmove(source->data.inlined.bytes, source->data.inlined.bytes + split,
source->data.inlined.length);
} else if (split < sizeof(head.data.inlined.bytes)) {
} else if (allow_inline && split < sizeof(head.data.inlined.bytes)) {
GPR_ASSERT(source->data.refcounted.length >= split);
head.refcount = nullptr;
@ -390,6 +408,14 @@ grpc_slice grpc_slice_split_head(grpc_slice* source, size_t split) {
return head;
}
// Splits off and returns the head (first `split` bytes) of *source. Short
// heads may be copied into an inlined slice when cheaper than refcounting.
grpc_slice grpc_slice_split_head(grpc_slice* source, size_t split) {
  return grpc_slice_split_head_impl<true>(source, split);
}
// As grpc_slice_split_head, but never creates an inlined (copied) head slice;
// the returned slice always aliases the original backing memory.
grpc_slice grpc_slice_split_head_no_inline(grpc_slice* source, size_t split) {
  return grpc_slice_split_head_impl<false>(source, split);
}
int grpc_slice_eq(grpc_slice a, grpc_slice b) {
if (GRPC_SLICE_LENGTH(a) != GRPC_SLICE_LENGTH(b)) return false;
if (GRPC_SLICE_LENGTH(a) == 0) return true;

@ -324,9 +324,13 @@ void grpc_slice_buffer_move_into(grpc_slice_buffer* src,
src->length = 0;
}
template <bool incref>
template <bool incref, bool allow_inline>
static void slice_buffer_move_first_maybe_ref(grpc_slice_buffer* src, size_t n,
grpc_slice_buffer* dst) {
if (n == 0) {
return;
}
GPR_ASSERT(src->length >= n);
if (src->length == n) {
grpc_slice_buffer_move_into(src, dst);
@ -346,14 +350,28 @@ static void slice_buffer_move_first_maybe_ref(grpc_slice_buffer* src, size_t n,
grpc_slice_buffer_add(dst, slice);
break;
} else if (incref) { // n < slice_len
grpc_slice_buffer_undo_take_first(
src, grpc_slice_split_tail_maybe_ref(&slice, n, GRPC_SLICE_REF_BOTH));
if (allow_inline) {
grpc_slice_buffer_undo_take_first(
src,
grpc_slice_split_tail_maybe_ref(&slice, n, GRPC_SLICE_REF_BOTH));
} else {
grpc_slice_buffer_undo_take_first(
src, grpc_slice_split_tail_maybe_ref_no_inline(
&slice, n, GRPC_SLICE_REF_BOTH));
}
GPR_ASSERT(GRPC_SLICE_LENGTH(slice) == n);
grpc_slice_buffer_add(dst, slice);
break;
} else { // n < slice_len
grpc_slice_buffer_undo_take_first(
src, grpc_slice_split_tail_maybe_ref(&slice, n, GRPC_SLICE_REF_TAIL));
if (allow_inline) {
grpc_slice_buffer_undo_take_first(
src,
grpc_slice_split_tail_maybe_ref(&slice, n, GRPC_SLICE_REF_TAIL));
} else {
grpc_slice_buffer_undo_take_first(
src, grpc_slice_split_tail_maybe_ref_no_inline(
&slice, n, GRPC_SLICE_REF_TAIL));
}
GPR_ASSERT(GRPC_SLICE_LENGTH(slice) == n);
grpc_slice_buffer_add_indexed(dst, slice);
break;
@ -364,14 +382,19 @@ static void slice_buffer_move_first_maybe_ref(grpc_slice_buffer* src, size_t n,
GPR_ASSERT(src->count > 0);
}
// Moves the first n bytes of *src into *dst, taking refs where needed, but
// never creating inlined (copied) slices when a slice must be split — the
// moved bytes keep aliasing the original backing memory.
void grpc_slice_buffer_move_first_no_inline(grpc_slice_buffer* src, size_t n,
                                            grpc_slice_buffer* dst) {
  slice_buffer_move_first_maybe_ref<true, false>(src, n, dst);
}
// Moves the first n bytes of *src into *dst, taking refs where needed.
// Inlined (copied) slices may be created for short splits.
void grpc_slice_buffer_move_first(grpc_slice_buffer* src, size_t n,
                                  grpc_slice_buffer* dst) {
  // incref = true, allow_inline = true: the default move behavior.
  slice_buffer_move_first_maybe_ref<true, true>(src, n, dst);
}
// Moves the first n bytes of *src into *dst WITHOUT taking additional refs on
// split slices (the tail keeps the ref). Inlined slices may be created.
void grpc_slice_buffer_move_first_no_ref(grpc_slice_buffer* src, size_t n,
                                         grpc_slice_buffer* dst) {
  // incref = false, allow_inline = true.
  slice_buffer_move_first_maybe_ref<false, true>(src, n, dst);
}
void grpc_slice_buffer_move_first_into_buffer(grpc_slice_buffer* src, size_t n,
@ -417,9 +440,9 @@ void grpc_slice_buffer_copy_first_into_buffer(grpc_slice_buffer* src, size_t n,
n -= slice_len;
}
}
void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n,
grpc_slice_buffer* garbage) {
template <bool allow_inline>
void grpc_slice_buffer_trim_end_impl(grpc_slice_buffer* sb, size_t n,
grpc_slice_buffer* garbage) {
GPR_ASSERT(n <= sb->length);
sb->length -= n;
for (;;) {
@ -427,7 +450,12 @@ void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n,
grpc_slice slice = sb->slices[idx];
size_t slice_len = GRPC_SLICE_LENGTH(slice);
if (slice_len > n) {
sb->slices[idx] = grpc_slice_split_head(&slice, slice_len - n);
if (allow_inline) {
sb->slices[idx] = grpc_slice_split_head(&slice, slice_len - n);
} else {
sb->slices[idx] =
grpc_slice_split_head_no_inline(&slice, slice_len - n);
}
if (garbage) {
grpc_slice_buffer_add_indexed(garbage, slice);
} else {
@ -454,6 +482,16 @@ void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n,
}
}
// Deletes the trailing n bytes of *sb without creating inlined slices for the
// retained portion; trimmed slices are appended to *garbage when non-null.
void grpc_slice_buffer_trim_end_no_inline(grpc_slice_buffer* sb, size_t n,
                                          grpc_slice_buffer* garbage) {
  grpc_slice_buffer_trim_end_impl<false>(sb, n, garbage);
}
// Deletes the trailing n bytes of *sb; trimmed slices are appended to
// *garbage when non-null. The retained head may become an inlined slice.
void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n,
                                grpc_slice_buffer* garbage) {
  grpc_slice_buffer_trim_end_impl<true>(sb, n, garbage);
}
grpc_slice grpc_slice_buffer_take_first(grpc_slice_buffer* sb) {
grpc_slice slice;
GPR_ASSERT(sb->count > 0);

@ -32,6 +32,12 @@
void grpc_slice_buffer_copy_first_into_buffer(grpc_slice_buffer* src, size_t n,
void* dst);
void grpc_slice_buffer_move_first_no_inline(grpc_slice_buffer* src, size_t n,
grpc_slice_buffer* dst);
void grpc_slice_buffer_trim_end_no_inline(grpc_slice_buffer* sb, size_t n,
grpc_slice_buffer* garbage);
namespace grpc_core {
/// A slice buffer holds the memory for a collection of slices.
@ -96,7 +102,14 @@ class SliceBuffer {
grpc_slice_buffer_trim_end(&slice_buffer_, n, nullptr);
}
/// Move the first n bytes of the SliceBuffer into a memory pointed to by dst.
/// Removes/deletes the last n bytes in the SliceBuffer while avoiding the
/// creation of inline slices.
void RemoveLastNBytesNoInline(size_t n) {
  grpc_slice_buffer_trim_end_no_inline(&slice_buffer_, n, nullptr);
}
/// Move the first n bytes of the SliceBuffer into the memory pointed to by
/// dst.
void MoveFirstNBytesIntoBuffer(size_t n, void* dst) {
  grpc_slice_buffer_move_first_into_buffer(&slice_buffer_, n, dst);
}

@ -67,6 +67,13 @@ grpc_slice grpc_slice_from_cpp_string(std::string str);
// 0. All other slices will return the size of the allocated chars.
size_t grpc_slice_memory_usage(grpc_slice s);
grpc_slice grpc_slice_split_tail_maybe_ref_no_inline(
grpc_slice* source, size_t split, grpc_slice_ref_whom ref_whom);
grpc_slice grpc_slice_split_tail_no_inline(grpc_slice* source, size_t split);
grpc_slice grpc_slice_split_head_no_inline(grpc_slice* source, size_t split);
namespace grpc_core {
// Converts grpc_slice to absl::string_view.

@ -21,6 +21,7 @@
#include <stdint.h>
#include <atomic>
#include <cstring>
#include <functional>
#include <memory>
#include <utility>
@ -36,6 +37,8 @@
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/extensions/chaotic_good_extension.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
@ -164,8 +167,9 @@ class PromiseEndpoint {
complete,
[this, num_bytes]() {
SliceBuffer ret;
grpc_slice_buffer_move_first(read_state_->buffer.c_slice_buffer(),
num_bytes, ret.c_slice_buffer());
grpc_slice_buffer_move_first_no_inline(
read_state_->buffer.c_slice_buffer(), num_bytes,
ret.c_slice_buffer());
return [ret = std::move(
ret)]() mutable -> Poll<absl::StatusOr<SliceBuffer>> {
return std::move(ret);
@ -180,8 +184,9 @@ class PromiseEndpoint {
// If read succeeds, return `SliceBuffer` with `num_bytes` bytes.
if (read_state->result.ok()) {
SliceBuffer ret;
grpc_slice_buffer_move_first(read_state->buffer.c_slice_buffer(),
num_bytes, ret.c_slice_buffer());
grpc_slice_buffer_move_first_no_inline(
read_state->buffer.c_slice_buffer(), num_bytes,
ret.c_slice_buffer());
read_state->complete.store(false, std::memory_order_relaxed);
return std::move(ret);
}
@ -214,6 +219,38 @@ class PromiseEndpoint {
});
}
// Asks the underlying endpoint (when it implements ChaoticGoodExtension) to
// enforce alignment of receive memory, then compacts any bytes already
// pending in read_state_->buffer into a single contiguous slice so that data
// read before the switch does not violate the alignment assumption. No-op if
// the endpoint does not expose the extension.
void EnforceRxMemoryAlignment() {
  auto* chaotic_good_ext = grpc_event_engine::experimental::QueryExtension<
      grpc_event_engine::experimental::ChaoticGoodExtension>(endpoint_.get());
  if (chaotic_good_ext != nullptr) {
    chaotic_good_ext->EnforceRxMemoryAlignment();
    // Nothing buffered yet: no compaction needed.
    if (read_state_->buffer.Length() == 0) {
      return;
    }
    // Copy everything from read_state_->buffer into a single slice and
    // replace the contents of read_state_->buffer with that slice.
    grpc_slice slice = grpc_slice_malloc_large(read_state_->buffer.Length());
    // The compaction relies on grpc_slice_malloc_large handing back
    // 64-byte-aligned memory; assert it rather than assume silently.
    GPR_ASSERT(
        reinterpret_cast<uintptr_t>(GRPC_SLICE_START_PTR(slice)) % 64 == 0);
    size_t ofs = 0;
    for (size_t i = 0; i < read_state_->buffer.Count(); i++) {
      memcpy(
          GRPC_SLICE_START_PTR(slice) + ofs,
          GRPC_SLICE_START_PTR(
              read_state_->buffer.c_slice_buffer()->slices[i]),
          GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i]));
      ofs +=
          GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i]);
    }
    read_state_->buffer.Clear();
    // AppendIndexed keeps the slice refcounted (never inlined), preserving
    // the alignment of the backing memory just established.
    read_state_->buffer.AppendIndexed(
        grpc_event_engine::experimental::Slice(slice));
    // Sanity check: total length is unchanged by the compaction.
    GPR_DEBUG_ASSERT(read_state_->buffer.Length() == ofs);
  }
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetPeerAddress() const;
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&

Loading…
Cancel
Save