pull/37765/head
Craig Tiller 4 months ago
parent 7b38350b75
commit 383068f94e
  1. 3
      src/core/ext/transport/chaotic_good/chaotic_good_transport.h
  2. 2
      src/core/ext/transport/chaotic_good/client_transport.cc
  3. 41
      src/core/ext/transport/chaotic_good/frame.cc
  4. 14
      src/core/ext/transport/chaotic_good/frame_header.h
  5. 53
      test/core/transport/chaotic_good/client_transport_test.cc
  6. 46
      test/core/transport/chaotic_good/transport_test.cc
  7. 4
      test/core/transport/chaotic_good/transport_test.h

@ -120,7 +120,8 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: DeserializeFrame "
<< (s.ok() ? frame.ToString() : s.ToString());
return s;
if (s.ok()) return std::move(frame);
return std::move(s);
}
private:

@ -114,7 +114,7 @@ auto ChaoticGoodClientTransport::DispatchFrame(ChaoticGoodTransport* transport,
const FrameHeader& header,
SliceBuffer payload) {
return TrySeq(
[transport, &header, &payload]() {
[transport, header, payload = std::move(payload)]() mutable {
return transport->DeserializeFrame<T>(header, std::move(payload));
},
[this](T frame) {

@ -57,6 +57,7 @@ void AddFrame(FrameType frame_type, uint32_t stream_id, const SliceBuffer& paylo
header.stream_id = stream_id;
header.payload_length = payload.Length();
header.payload_connection_id = header.payload_length > 1024 ? 1 : 0;
LOG(INFO) << "Serialize header: " << header;
header.Serialize(out->control.AddTiny(FrameHeader::kFrameHeaderSize));
if (header.payload_connection_id == 0) {
out->control.Append(payload);
@ -79,6 +80,7 @@ void AddInlineFrame(FrameType frame_type, uint32_t stream_id, F gen_frame,
header.stream_id = stream_id;
header.payload_length = size_after - size_before;
header.payload_connection_id = 0;
LOG(INFO) << "Serialize header: " << header;
header.Serialize(const_cast<uint8_t*>(
GRPC_SLICE_START_PTR(out->control.c_slice_at(header_slice))));
}
@ -117,7 +119,7 @@ absl::Status SettingsFrame::Deserialize(const DeserializeContext& ctx,
SliceBuffer payload) {
CHECK_EQ(header.type, FrameType::kSettings);
if (header.stream_id != 0) {
return absl::InvalidArgumentError("Expected stream id 0");
return absl::InternalError("Expected stream id 0");
}
return ReadMetadata(ctx.parser, std::move(payload), header.stream_id, true, true,
ctx.bitsrc, &headers);
@ -144,7 +146,7 @@ absl::Status ClientInitialMetadataFrame::Deserialize(const DeserializeContext& c
SliceBuffer payload) {
CHECK_EQ(header.type, FrameType::kClientInitialMetadata);
if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id");
return absl::InternalError("Expected non-zero stream id");
}
stream_id = header.stream_id;
return ReadMetadata(ctx.parser, std::move(payload), header.stream_id, true, true,
@ -155,7 +157,7 @@ void ClientInitialMetadataFrame::Serialize(const SerializeContext& ctx,
BufferPair* out) const {
CHECK_NE(stream_id, 0u);
AddInlineFrame(
FrameType::kSettings, 0,
FrameType::kClientInitialMetadata, stream_id,
[&ctx, this](SliceBuffer& out) {
ctx.saw_encoding_errors |= !ctx.encoder->EncodeRawHeaders(*headers, out);
},
@ -168,12 +170,35 @@ std::string ClientInitialMetadataFrame::ToString() const {
", headers=", headers == nullptr ? "" : headers->DebugString(), "}");
}
absl::Status ClientEndOfStream::Deserialize(const DeserializeContext& ctx,
const FrameHeader& header,
SliceBuffer payload) {
CHECK_EQ(header.type, FrameType::kClientEndOfStream);
if (header.stream_id == 0) {
return absl::InternalError("Expected non-zero stream id");
}
if (header.payload_length != 0) {
return absl::InternalError(
"Expected zero payload length on ClientEndOfStream");
}
stream_id = header.stream_id;
return absl::OkStatus();
}
void ClientEndOfStream::Serialize(const SerializeContext& ctx,
BufferPair* out) const {
AddInlineFrame(
FrameType::kClientEndOfStream, stream_id, [](SliceBuffer&) {}, out);
}
std::string ClientEndOfStream::ToString() const { return "ClientEndOfStream"; }
absl::Status MessageFrame::Deserialize(const DeserializeContext& ctx,
const FrameHeader& header,
SliceBuffer payload) {
CHECK_EQ(header.type, FrameType::kMessage);
if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id");
return absl::InternalError("Expected non-zero stream id");
}
stream_id = header.stream_id;
message = Arena::MakePooled<Message>(std::move(payload), 0);
@ -199,7 +224,7 @@ absl::Status ServerInitialMetadataFrame::Deserialize(const DeserializeContext& c
SliceBuffer payload) {
CHECK_EQ(header.type, FrameType::kServerInitialMetadata);
if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id");
return absl::InternalError("Expected non-zero stream id");
}
stream_id = header.stream_id;
return ReadMetadata(ctx.parser, std::move(payload), header.stream_id, true, false,
@ -228,7 +253,7 @@ absl::Status ServerTrailingMetadataFrame::Deserialize(const DeserializeContext&
SliceBuffer payload) {
CHECK_EQ(header.type, FrameType::kServerTrailingMetadata);
if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id");
return absl::InternalError("Expected non-zero stream id");
}
stream_id = header.stream_id;
return ReadMetadata(ctx.parser, std::move(payload), header.stream_id, false, false,
@ -260,12 +285,12 @@ absl::Status CancelFrame::Deserialize(const DeserializeContext& ctx,
// Ensure the stream_id is non-zero
if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id");
return absl::InternalError("Expected non-zero stream id");
}
// Ensure there is no payload
if (payload.Length() != 0) {
return absl::InvalidArgumentError("Unexpected payload for Cancel frame");
return absl::InternalError("Unexpected payload for Cancel frame");
}
// Set the stream_id

@ -28,22 +28,22 @@ namespace chaotic_good {
enum class FrameType : uint8_t {
kSettings = 0x00,
kPadding = 0x10,
kClientInitialMetadata = 0x80,
kMessage = 0x81,
kServerInitialMetadata = 0x82,
kServerTrailingMetadata = 0x83,
kCancel = 0x84,
kClientEndOfStream = 0x81,
kServerInitialMetadata = 0x91,
kServerTrailingMetadata = 0x92,
kMessage = 0xa0,
kCancel = 0xff,
};
inline std::ostream& operator<<(std::ostream& out, FrameType type) {
switch (type) {
case FrameType::kSettings:
return out << "Settings";
case FrameType::kPadding:
return out << "Padding";
case FrameType::kClientInitialMetadata:
return out << "ClientInitialMetadata";
case FrameType::kClientEndOfStream:
return out << "ClientEndOfStream";
case FrameType::kMessage:
return out << "Message";
case FrameType::kServerInitialMetadata:

@ -103,14 +103,17 @@ ChannelArgs MakeChannelArgs() {
TEST_F(TransportTest, AddOneStream) {
MockPromiseEndpoint control_endpoint(1000);
MockPromiseEndpoint data_endpoint(1001);
static const std::string many_as(1024 * 1024, 'a');
control_endpoint.ExpectRead(
{SerializedFrameHeader(FrameType::kFragment, 7, 1, 26, 8, 56, 15),
{SerializedFrameHeader(FrameType::kServerInitialMetadata, 0, 1, 26),
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
sizeof(kPathDemoServiceStep)),
SerializedFrameHeader(FrameType::kMessage, 1, 1, many_as.length()),
SerializedFrameHeader(FrameType::kServerTrailingMetadata, 0, 1, 15),
EventEngineSlice::FromCopiedBuffer(kGrpcStatus0, sizeof(kGrpcStatus0))},
event_engine().get());
data_endpoint.ExpectRead(
{EventEngineSlice::FromCopiedString("12345678"), Zeros(56)}, nullptr);
{EventEngineSlice::FromCopiedString(many_as), Zeros(56)}, nullptr);
EXPECT_CALL(*control_endpoint.endpoint, Read)
.InSequence(control_endpoint.read_sequence)
.WillOnce(Return(false));
@ -122,18 +125,17 @@ TEST_F(TransportTest, AddOneStream) {
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 1, 1,
sizeof(kPathDemoServiceStep), 0, 0, 0),
{SerializedFrameHeader(FrameType::kClientInitialMetadata, 0, 1,
sizeof(kPathDemoServiceStep)),
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
sizeof(kPathDemoServiceStep))},
nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
{SerializedFrameHeader(FrameType::kMessage, 0, 1, 1),
EventEngineSlice::FromCopiedString("0")},
nullptr);
data_endpoint.ExpectWrite(
{EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr);
{SerializedFrameHeader(FrameType::kClientEndOfStream, 0, 1, 0)}, nullptr);
transport->StartCall(call.handler.StartCall());
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
@ -157,7 +159,7 @@ TEST_F(TransportTest, AddOneStream) {
[](ServerToClientNextMessage msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value().payload()->JoinIntoString(), "12345678");
EXPECT_EQ(msg.value().payload()->JoinIntoString(), many_as);
return Empty{};
},
[initiator]() mutable { return initiator.PullMessage(); },
@ -184,18 +186,17 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
MockPromiseEndpoint control_endpoint(1000);
MockPromiseEndpoint data_endpoint(1001);
control_endpoint.ExpectRead(
{SerializedFrameHeader(FrameType::kFragment, 3, 1, 26, 8, 56, 0),
// 3, 1, 26, 8, 56, 0
{SerializedFrameHeader(FrameType::kServerInitialMetadata, 0, 1, 26),
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
sizeof(kPathDemoServiceStep))},
event_engine().get());
control_endpoint.ExpectRead(
{SerializedFrameHeader(FrameType::kFragment, 6, 1, 0, 8, 56, 15),
sizeof(kPathDemoServiceStep)),
SerializedFrameHeader(FrameType::kMessage, 0, 1, 8),
EventEngineSlice::FromCopiedString("12345678"),
SerializedFrameHeader(FrameType::kMessage, 0, 1, 8),
EventEngineSlice::FromCopiedString("87654321"),
SerializedFrameHeader(FrameType::kServerTrailingMetadata, 0, 1, 15),
EventEngineSlice::FromCopiedBuffer(kGrpcStatus0, sizeof(kGrpcStatus0))},
event_engine().get());
data_endpoint.ExpectRead(
{EventEngineSlice::FromCopiedString("12345678"), Zeros(56)}, nullptr);
data_endpoint.ExpectRead(
{EventEngineSlice::FromCopiedString("87654321"), Zeros(56)}, nullptr);
EXPECT_CALL(*control_endpoint.endpoint, Read)
.InSequence(control_endpoint.read_sequence)
.WillOnce(Return(false));
@ -207,23 +208,21 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 1, 1,
sizeof(kPathDemoServiceStep), 0, 0, 0),
{SerializedFrameHeader(FrameType::kClientInitialMetadata, 0, 1,
sizeof(kPathDemoServiceStep)),
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
sizeof(kPathDemoServiceStep))},
nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
{SerializedFrameHeader(FrameType::kMessage, 0, 1, 1),
EventEngineSlice::FromCopiedString("0")},
nullptr);
data_endpoint.ExpectWrite(
{EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)},
{SerializedFrameHeader(FrameType::kMessage, 0, 1, 1),
EventEngineSlice::FromCopiedString("1")},
nullptr);
data_endpoint.ExpectWrite(
{EventEngineSlice::FromCopiedString("1"), Zeros(63)}, nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr);
{SerializedFrameHeader(FrameType::kClientEndOfStream, 0, 1, 0)}, nullptr);
transport->StartCall(call.handler.StartCall());
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {

@ -19,34 +19,24 @@ namespace chaotic_good {
namespace testing {
grpc_event_engine::experimental::Slice SerializedFrameHeader(
FrameType type, uint8_t flags, uint32_t stream_id, uint32_t header_length,
uint32_t message_length, uint32_t message_padding,
uint32_t trailer_length) {
uint8_t buffer[24] = {static_cast<uint8_t>(type),
flags,
0,
0,
static_cast<uint8_t>(stream_id),
static_cast<uint8_t>(stream_id >> 8),
static_cast<uint8_t>(stream_id >> 16),
static_cast<uint8_t>(stream_id >> 24),
static_cast<uint8_t>(header_length),
static_cast<uint8_t>(header_length >> 8),
static_cast<uint8_t>(header_length >> 16),
static_cast<uint8_t>(header_length >> 24),
static_cast<uint8_t>(message_length),
static_cast<uint8_t>(message_length >> 8),
static_cast<uint8_t>(message_length >> 16),
static_cast<uint8_t>(message_length >> 24),
static_cast<uint8_t>(message_padding),
static_cast<uint8_t>(message_padding >> 8),
static_cast<uint8_t>(message_padding >> 16),
static_cast<uint8_t>(message_padding >> 24),
static_cast<uint8_t>(trailer_length),
static_cast<uint8_t>(trailer_length >> 8),
static_cast<uint8_t>(trailer_length >> 16),
static_cast<uint8_t>(trailer_length >> 24)};
return grpc_event_engine::experimental::Slice::FromCopiedBuffer(buffer, 24);
FrameType type, uint16_t payload_connection_id, uint32_t stream_id,
uint32_t payload_length) {
uint8_t buffer[FrameHeader::kFrameHeaderSize] = {
static_cast<uint8_t>(payload_connection_id),
static_cast<uint8_t>(payload_connection_id >> 16),
static_cast<uint8_t>(type),
0,
static_cast<uint8_t>(stream_id),
static_cast<uint8_t>(stream_id >> 8),
static_cast<uint8_t>(stream_id >> 16),
static_cast<uint8_t>(stream_id >> 24),
static_cast<uint8_t>(payload_length),
static_cast<uint8_t>(payload_length >> 8),
static_cast<uint8_t>(payload_length >> 16),
static_cast<uint8_t>(payload_length >> 24),
};
return grpc_event_engine::experimental::Slice::FromCopiedBuffer(
buffer, sizeof(buffer));
}
grpc_event_engine::experimental::Slice Zeros(uint32_t length) {

@ -71,8 +71,8 @@ class TransportTest : public ::testing::Test {
};
grpc_event_engine::experimental::Slice SerializedFrameHeader(
FrameType type, uint8_t flags, uint32_t stream_id, uint32_t header_length,
uint32_t message_length, uint32_t message_padding, uint32_t trailer_length);
FrameType type, uint16_t payload_connection_id, uint32_t stream_id,
uint32_t payload_length);
grpc_event_engine::experimental::Slice Zeros(uint32_t length);

Loading…
Cancel
Save