[chaotic-good] New frame serialization/deserialization in chaotic-good transport. (#33067)

(This is a re-open PR for https://github.com/grpc/grpc/pull/32999, which
was closed accidentally due to the branch re-base and force-push)

Implement the frame serialization/deserialization method in chaotic-good
transport.

Previous comments from Craig:
- Since messages are not part of the framing system anymore, I think we
should remove ReceiveMessage (and therefore ReceivePadding) from this
type.
(instead we should add some helper functions to get the message lengths)
-- Resolved

- This approach will cause all frame manipulation code to know about
this serialization detail, rather than just the code that's serializing
it - I think it would be better to keep the type, flags separation (even
if we need to change the flags representation)
-- Done, changed back to type, flags separation. 

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/33219/head
nanahpang 2 years ago committed by GitHub
parent b225083b34
commit c684409921
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 90
      src/core/ext/transport/chaotic_good/frame.cc
  3. 5
      src/core/ext/transport/chaotic_good/frame.h
  4. 36
      src/core/ext/transport/chaotic_good/frame_header.cc
  5. 22
      src/core/ext/transport/chaotic_good/frame_header.h
  6. 14
      test/core/transport/chaotic_good/frame_fuzzer.cc
  7. 6
      test/core/transport/chaotic_good/frame_header_fuzzer.cc
  8. 111
      test/core/transport/chaotic_good/frame_header_test.cc
  9. 7
      test/core/transport/chaotic_good/frame_test.cc

@ -5587,7 +5587,6 @@ grpc_cc_library(
"arena",
"bitset",
"chaotic_good_frame_header",
"context",
"no_destruct",
"slice",
"slice_buffer",

@ -18,6 +18,7 @@
#include <string.h>
#include <cstdint>
#include <limits>
#include <utility>
@ -30,7 +31,6 @@
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
@ -39,62 +39,37 @@ namespace chaotic_good {
namespace {
const NoDestruct<Slice> kZeroSlice{[] {
auto slice = GRPC_SLICE_MALLOC(64);
memset(GRPC_SLICE_START_PTR(slice), 0, 64);
// Frame header size is fixed to 24 bytes.
auto slice = GRPC_SLICE_MALLOC(24);
memset(GRPC_SLICE_START_PTR(slice), 0, 24);
return slice;
}()};
class FrameSerializer {
public:
explicit FrameSerializer(FrameType type, uint32_t stream_id)
: header_{type, {}, stream_id, 0, 0, 0} {
: header_{type, {}, stream_id, 0, 0, 0, 0} {
output_.AppendIndexed(kZeroSlice->Copy());
}
// If called, must be called before AddMessage, AddTrailers, Finish
// If called, must be called before AddTrailers, Finish.
SliceBuffer& AddHeaders() {
GPR_ASSERT(last_added_ == nullptr);
header_.flags.set(0);
return Start(&header_.header_length);
}
// If called, must be called before AddTrailers, Finish
SliceBuffer& AddMessage() {
MaybeCommitLast();
header_.flags.set(1);
return Start(&header_.message_length);
return output_;
}
// If called, must be called before Finish
// If called, must be called before Finish.
SliceBuffer& AddTrailers() {
MaybeCommitLast();
header_.flags.set(2);
return Start(&header_.trailer_length);
header_.flags.set(1);
return output_;
}
SliceBuffer Finish() {
MaybeCommitLast();
header_.Serialize(
GRPC_SLICE_START_PTR(output_.c_slice_buffer()->slices[0]));
return std::move(output_);
}
private:
SliceBuffer& Start(uint32_t* length_field) {
last_added_ = length_field;
length_at_last_added_ = output_.Length();
return output_;
}
void MaybeCommitLast() {
if (last_added_ == nullptr) return;
*last_added_ = output_.Length() - length_at_last_added_;
if (output_.Length() % 64 != 0) {
output_.Append(kZeroSlice->RefSubSlice(0, 64 - output_.Length() % 64));
}
}
FrameHeader header_;
uint32_t* last_added_ = nullptr;
size_t length_at_last_added_;
SliceBuffer output_;
};
@ -103,19 +78,20 @@ class FrameDeserializer {
FrameDeserializer(const FrameHeader& header, SliceBuffer& input)
: header_(header), input_(input) {}
const FrameHeader& header() const { return header_; }
// If called, must be called before ReceiveMessage, ReceiveTrailers
// If called, must be called before ReceiveTrailers, Finish.
absl::StatusOr<SliceBuffer> ReceiveHeaders() {
return Take(header_.header_length);
}
// If called, must be called before ReceiveTrailers
absl::StatusOr<SliceBuffer> ReceiveMessage() {
return Take(header_.message_length);
}
// If called, must be called before Finish
// If called, must be called before Finish.
absl::StatusOr<SliceBuffer> ReceiveTrailers() {
return Take(header_.trailer_length);
}
// Return message length to get payload size in data plane.
uint32_t GetMessageLength() const { return header_.message_length; }
// Return message padding to get padding size in data plane.
uint32_t GetMessagePadding() const { return header_.message_padding; }
absl::Status Finish() { return absl::OkStatus(); }
private:
@ -127,20 +103,6 @@ class FrameDeserializer {
}
SliceBuffer out;
input_.MoveFirstNBytesIntoSliceBuffer(length, out);
if (length % 64 != 0) {
const uint32_t padding_length = 64 - length % 64;
if (input_.Length() < padding_length) {
return absl::InvalidArgumentError(
"Frame too short (insufficient padding)");
}
uint8_t padding[64];
input_.MoveFirstNBytesIntoBuffer(padding_length, padding);
for (uint32_t i = 0; i < padding_length; i++) {
if (padding[i] != 0) {
return absl::InvalidArgumentError("Frame padding not zero");
}
}
}
return std::move(out);
}
FrameHeader header_;
@ -207,12 +169,6 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
if (!r.ok()) return r.status();
}
if (header.flags.is_set(1)) {
message = GetContext<Arena>()->MakePooled<Message>();
auto r = deserializer.ReceiveMessage();
if (!r.ok()) return r.status();
r->Swap(message->payload());
}
if (header.flags.is_set(2)) {
if (header.trailer_length != 0) {
return absl::InvalidArgumentError("Unexpected trailer length");
}
@ -229,9 +185,6 @@ SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const {
if (headers.get() != nullptr) {
encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders());
}
if (message.get() != nullptr) {
serializer.AddMessage().Append(*message->payload());
}
if (end_of_stream) {
serializer.AddTrailers();
}
@ -255,12 +208,6 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
if (!r.ok()) return r.status();
}
if (header.flags.is_set(1)) {
message = GetContext<Arena>()->MakePooled<Message>();
auto r = deserializer.ReceiveMessage();
if (!r.ok()) return r.status();
r->Swap(message->payload());
}
if (header.flags.is_set(2)) {
auto r = ReadMetadata<ServerMetadata>(
parser, deserializer.ReceiveTrailers(), header.stream_id, false, false);
}
@ -273,9 +220,6 @@ SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const {
if (headers.get() != nullptr) {
encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders());
}
if (message.get() != nullptr) {
serializer.AddMessage().Append(*message->payload());
}
if (trailers.get() != nullptr) {
encoder->EncodeRawHeaders(*trailers.get(), serializer.AddTrailers());
}

@ -75,12 +75,10 @@ struct ClientFragmentFrame final : public FrameInterface {
uint32_t stream_id;
ClientMetadataHandle headers;
MessageHandle message;
bool end_of_stream = false;
bool operator==(const ClientFragmentFrame& other) const {
return stream_id == other.stream_id && EqHdl(headers, other.headers) &&
EqHdl(message, other.message) &&
end_of_stream == other.end_of_stream;
}
};
@ -92,12 +90,11 @@ struct ServerFragmentFrame final : public FrameInterface {
uint32_t stream_id;
ServerMetadataHandle headers;
MessageHandle message;
ServerMetadataHandle trailers;
bool operator==(const ServerFragmentFrame& other) const {
return stream_id == other.stream_id && EqHdl(headers, other.headers) &&
EqHdl(message, other.message) && EqHdl(trailers, other.trailers);
EqHdl(trailers, other.trailers);
}
};

@ -16,8 +16,6 @@
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include <string.h>
#include <cstdint>
#include "absl/status/status.h"
@ -41,46 +39,38 @@ uint32_t ReadLittleEndianUint32(const uint8_t* data) {
}
} // namespace
// Serializes a frame header into a buffer of 24 bytes.
void FrameHeader::Serialize(uint8_t* data) const {
WriteLittleEndianUint32(
static_cast<uint32_t>(type) | (flags.ToInt<uint32_t>() << 8), data);
WriteLittleEndianUint32(stream_id, data + 4);
WriteLittleEndianUint32(header_length, data + 8);
WriteLittleEndianUint32(message_length, data + 12);
WriteLittleEndianUint32(trailer_length, data + 16);
memset(data + 20, 0, 44);
WriteLittleEndianUint32(message_padding, data + 16);
WriteLittleEndianUint32(trailer_length, data + 20);
}
// Parses a frame header from a buffer of 24 bytes. All 24 bytes are consumed.
absl::StatusOr<FrameHeader> FrameHeader::Parse(const uint8_t* data) {
FrameHeader header;
const uint32_t type_and_flags = ReadLittleEndianUint32(data);
header.type = static_cast<FrameType>(type_and_flags & 0xff);
const uint32_t flags = type_and_flags >> 8;
if (flags > 7) return absl::InvalidArgumentError("Invalid flags");
header.flags = BitSet<3>::FromInt(flags);
if (flags > 3) return absl::InvalidArgumentError("Invalid flags");
header.flags = BitSet<2>::FromInt(flags);
header.stream_id = ReadLittleEndianUint32(data + 4);
header.header_length = ReadLittleEndianUint32(data + 8);
header.message_length = ReadLittleEndianUint32(data + 12);
header.trailer_length = ReadLittleEndianUint32(data + 16);
for (int i = 0; i < 44; i++) {
if (data[20 + i] != 0) return absl::InvalidArgumentError("Invalid padding");
}
header.message_padding = ReadLittleEndianUint32(data + 16);
header.trailer_length = ReadLittleEndianUint32(data + 20);
return header;
}
namespace {
uint64_t RoundUp(uint64_t x) {
if (x % 64 == 0) return x;
return x + 64 - (x % 64);
}
} // namespace
FrameSizes FrameHeader::ComputeFrameSizes() const {
FrameSizes sizes;
sizes.message_offset = RoundUp(header_length);
sizes.trailer_offset = sizes.message_offset + RoundUp(message_length);
sizes.frame_length = sizes.trailer_offset + RoundUp(trailer_length);
return sizes;
uint32_t FrameHeader::GetFrameLength() const {
// In chaotic-good transport design, message and message padding are sent
// through different channel. So not included in the frame length calculation.
uint32_t frame_length = header_length + trailer_length;
return frame_length;
}
} // namespace chaotic_good

@ -32,37 +32,27 @@ enum class FrameType : uint8_t {
kCancel = 0x81,
};
struct FrameSizes {
uint64_t message_offset;
uint64_t trailer_offset;
uint64_t frame_length;
bool operator==(const FrameSizes& other) const {
return message_offset == other.message_offset &&
trailer_offset == other.trailer_offset &&
frame_length == other.frame_length;
}
};
struct FrameHeader {
FrameType type;
BitSet<3> flags;
BitSet<2> flags;
uint32_t stream_id;
uint32_t header_length;
uint32_t message_length;
uint32_t message_padding;
uint32_t trailer_length;
// Parses a frame header from a buffer of 64 bytes. All 64 bytes are consumed.
// Parses a frame header from a buffer of 24 bytes. All 24 bytes are consumed.
static absl::StatusOr<FrameHeader> Parse(const uint8_t* data);
// Serializes a frame header into a buffer of 64 bytes.
// Serializes a frame header into a buffer of 24 bytes.
void Serialize(uint8_t* data) const;
// Compute frame sizes from the header.
FrameSizes ComputeFrameSizes() const;
uint32_t GetFrameLength() const;
bool operator==(const FrameHeader& h) const {
return type == h.type && flags == h.flags && stream_id == h.stream_id &&
header_length == h.header_length &&
message_length == h.message_length &&
message_padding == h.message_padding &&
trailer_length == h.trailer_length;
}
};

@ -44,10 +44,10 @@ template <typename T>
void AssertRoundTrips(const T& input, FrameType expected_frame_type) {
HPackCompressor hpack_compressor;
auto serialized = input.Serialize(&hpack_compressor);
GPR_ASSERT(serialized.Length() >= 64);
GPR_ASSERT(serialized.Length() % 64 == 0);
uint8_t header_bytes[64];
serialized.MoveFirstNBytesIntoBuffer(64, header_bytes);
GPR_ASSERT(serialized.Length() >=
24); // Initial output buffer size is 64 byte.
uint8_t header_bytes[24];
serialized.MoveFirstNBytesIntoBuffer(24, header_bytes);
auto header = FrameHeader::Parse(header_bytes);
GPR_ASSERT(header.ok());
GPR_ASSERT(header->type == expected_frame_type);
@ -76,11 +76,11 @@ int Run(const uint8_t* data, size_t size) {
const bool is_server = (data[0] & 1) != 0;
size--;
data++;
if (size < 64) return 0;
if (size < 24) return 0;
auto r = FrameHeader::Parse(data);
if (!r.ok()) return 0;
size -= 64;
data += 64;
size -= 24;
data += 24;
MemoryAllocator memory_allocator = MemoryAllocator(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));
auto arena = MakeScopedArena(1024, &memory_allocator);

@ -23,12 +23,12 @@
bool squelch = false;
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
if (size != 64) return 0;
if (size != 24) return 0;
auto r = grpc_core::chaotic_good::FrameHeader::Parse(data);
if (!r.ok()) return 0;
uint8_t reserialized[64];
uint8_t reserialized[24];
r->Serialize(reserialized);
// If it parses, we insist that the bytes reserialize to the same thing.
if (memcmp(data, reserialized, 64) != 0) abort();
if (memcmp(data, reserialized, 24) != 0) abort();
return 0;
}

@ -26,88 +26,71 @@ namespace chaotic_good {
namespace {
std::vector<uint8_t> Serialize(FrameHeader h) {
uint8_t buffer[64];
uint8_t buffer[24];
h.Serialize(buffer);
return std::vector<uint8_t>(buffer, buffer + 64);
return std::vector<uint8_t>(buffer, buffer + 24);
}
absl::StatusOr<FrameHeader> Deserialize(std::vector<uint8_t> data) {
if (data.size() != 64) return absl::InvalidArgumentError("bad length");
if (data.size() != 24) return absl::InvalidArgumentError("bad length");
return FrameHeader::Parse(data.data());
}
TEST(FrameHeaderTest, SimpleSerialize) {
EXPECT_EQ(
Serialize(FrameHeader{FrameType::kCancel, BitSet<3>::FromInt(0),
0x01020304, 0x05060708, 0x090a0b0c, 0x0d0e0f10}),
std::vector<uint8_t>({0x81, 0, 0, 0, // type, flags
0x04, 0x03, 0x02, 0x01, // stream_id
0x08, 0x07, 0x06, 0x05, // header_length
0x0c, 0x0b, 0x0a, 0x09, // message_length
0x10, 0x0f, 0x0e, 0x0d, // trailer_length
// padding
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0}));
EXPECT_EQ(Serialize(FrameHeader{FrameType::kCancel, BitSet<2>::FromInt(0),
0x01020304, 0x05060708, 0x090a0b0c,
0x00000034, 0x0d0e0f10}),
std::vector<uint8_t>({
0x81, 0, 0, 0, // type, flags
0x04, 0x03, 0x02, 0x01, // stream_id
0x08, 0x07, 0x06, 0x05, // header_length
0x0c, 0x0b, 0x0a, 0x09, // message_length
0x34, 0x00, 0x00, 0x00, // mesage_padding
0x10, 0x0f, 0x0e, 0x0d // trailer_length
}));
}
TEST(FrameHeaderTest, SimpleDeserialize) {
EXPECT_EQ(
Deserialize(std::vector<uint8_t>(
{0x81, 0, 0, 0, // type, flags
0x04, 0x03, 0x02, 0x01, // stream_id
0x08, 0x07, 0x06, 0x05, // header_length
0x0c, 0x0b, 0x0a, 0x09, // message_length
0x10, 0x0f, 0x0e, 0x0d, // trailer_length
// padding
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})),
absl::StatusOr<FrameHeader>(
FrameHeader{FrameType::kCancel, BitSet<3>::FromInt(0), 0x01020304,
0x05060708, 0x090a0b0c, 0x0d0e0f10}));
EXPECT_EQ(Deserialize(std::vector<uint8_t>(
{0x81, 88, 88, 88, // type, flags
0x04, 0x03, 0x02, 0x01, // stream_id
0x08, 0x07, 0x06, 0x05, // header_length
0x0c, 0x0b, 0x0a, 0x09, // message_length
0x10, 0x0f, 0x0e, 0x0d, // trailer_length
// garbage padding
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0}))
EXPECT_EQ(Deserialize(std::vector<uint8_t>({
0x81, 0, 0, 0, // type, flags
0x04, 0x03, 0x02, 0x01, // stream_id
0x08, 0x07, 0x06, 0x05, // header_length
0x0c, 0x0b, 0x0a, 0x09, // message_length
0x34, 0x00, 0x00, 0x00, // mesage_padding
0x10, 0x0f, 0x0e, 0x0d // trailer_length
})),
absl::StatusOr<FrameHeader>(FrameHeader{
FrameType::kCancel, BitSet<2>::FromInt(0), 0x01020304,
0x05060708, 0x090a0b0c, 0x00000034, 0x0d0e0f10}));
EXPECT_EQ(Deserialize(std::vector<uint8_t>({
0x81, 88, 88, 88, // type, flags
0x04, 0x03, 0x02, 0x01, // stream_id
0x08, 0x07, 0x06, 0x05, // header_length
0x0c, 0x0b, 0x0a, 0x09, // message_length
0x34, 0x00, 0x00, 0x00, // mesage_padding
0x10, 0x0f, 0x0e, 0x0d // trailer_length
}))
.status(),
absl::InvalidArgumentError("Invalid flags"));
EXPECT_EQ(Deserialize(std::vector<uint8_t>(
{0x81, 0, 0, 0, // type, flags
0x04, 0x03, 0x02, 0x01, // stream_id
0x08, 0x07, 0x06, 0x05, // header_length
0x0c, 0x0b, 0x0a, 0x09, // message_length
0x10, 0x0f, 0x0e, 0x0d, // trailer_length
// garbage padding
0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0}))
.status(),
absl::InvalidArgumentError("Invalid padding"));
}
TEST(FrameHeaderTest, ComputeFrameSizes) {
EXPECT_EQ(
(FrameHeader{FrameType::kFragment, BitSet<3>::FromInt(7), 1, 0, 0, 0})
.ComputeFrameSizes(),
(FrameSizes{0, 0, 0}));
TEST(FrameHeaderTest, GetFrameLength) {
EXPECT_EQ(
(FrameHeader{FrameType::kFragment, BitSet<3>::FromInt(7), 1, 14, 0, 0})
.ComputeFrameSizes(),
(FrameSizes{64, 64, 64}));
(FrameHeader{FrameType::kFragment, BitSet<2>::FromInt(3), 1, 0, 0, 0, 0})
.GetFrameLength(),
0);
EXPECT_EQ(
(FrameHeader{FrameType::kFragment, BitSet<3>::FromInt(7), 1, 0, 14, 0})
.ComputeFrameSizes(),
(FrameSizes{0, 64, 64}));
(FrameHeader{FrameType::kFragment, BitSet<2>::FromInt(3), 1, 14, 0, 0, 0})
.GetFrameLength(),
14);
EXPECT_EQ((FrameHeader{FrameType::kFragment, BitSet<2>::FromInt(3), 1, 0, 14,
50, 0})
.GetFrameLength(),
0);
EXPECT_EQ(
(FrameHeader{FrameType::kFragment, BitSet<3>::FromInt(7), 1, 0, 0, 14})
.ComputeFrameSizes(),
(FrameSizes{0, 0, 64}));
(FrameHeader{FrameType::kFragment, BitSet<2>::FromInt(3), 1, 0, 0, 0, 14})
.GetFrameLength(),
14);
}
} // namespace

@ -28,10 +28,9 @@ template <typename T>
void AssertRoundTrips(const T input, FrameType expected_frame_type) {
HPackCompressor hpack_compressor;
auto serialized = input.Serialize(&hpack_compressor);
EXPECT_GE(serialized.Length(), 64);
EXPECT_EQ(serialized.Length() % 64, 0);
uint8_t header_bytes[64];
serialized.MoveFirstNBytesIntoBuffer(64, header_bytes);
EXPECT_GE(serialized.Length(), 24);
uint8_t header_bytes[24];
serialized.MoveFirstNBytesIntoBuffer(24, header_bytes);
auto header = FrameHeader::Parse(header_bytes);
EXPECT_TRUE(header.ok()) << header.status();
EXPECT_EQ(header->type, expected_frame_type);

Loading…
Cancel
Save