diff --git a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h index cc5de5cd03a..161cdc86bfc 100644 --- a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h +++ b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h @@ -120,7 +120,8 @@ class ChaoticGoodTransport : public RefCounted { GRPC_TRACE_LOG(chaotic_good, INFO) << "CHAOTIC_GOOD: DeserializeFrame " << (s.ok() ? frame.ToString() : s.ToString()); - return s; + if (s.ok()) return std::move(frame); + return std::move(s); } private: diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index b1cbfbb9ed2..87fc0506fb5 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -114,7 +114,7 @@ auto ChaoticGoodClientTransport::DispatchFrame(ChaoticGoodTransport* transport, const FrameHeader& header, SliceBuffer payload) { return TrySeq( - [transport, &header, &payload]() { + [transport, header, payload = std::move(payload)]() mutable { return transport->DeserializeFrame(header, std::move(payload)); }, [this](T frame) { diff --git a/src/core/ext/transport/chaotic_good/frame.cc b/src/core/ext/transport/chaotic_good/frame.cc index 9732fe3d758..175d3160faf 100644 --- a/src/core/ext/transport/chaotic_good/frame.cc +++ b/src/core/ext/transport/chaotic_good/frame.cc @@ -57,6 +57,7 @@ void AddFrame(FrameType frame_type, uint32_t stream_id, const SliceBuffer& paylo header.stream_id = stream_id; header.payload_length = payload.Length(); header.payload_connection_id = header.payload_length > 1024 ? 1 : 0; + LOG(INFO) << "Serialize header: " << header; header.Serialize(out->control.AddTiny(FrameHeader::kFrameHeaderSize)); if (header.payload_connection_id == 0) { out->control.Append(payload); @@ -79,6 +80,7 @@ void AddInlineFrame(FrameType frame_type, uint32_t stream_id, F gen_frame, header.stream_id = stream_id; header.payload_length = size_after - size_before; header.payload_connection_id = 0; + LOG(INFO) << "Serialize header: " << header; header.Serialize(const_cast( GRPC_SLICE_START_PTR(out->control.c_slice_at(header_slice)))); } @@ -117,7 +119,7 @@ absl::Status SettingsFrame::Deserialize(const DeserializeContext& ctx, SliceBuffer payload) { CHECK_EQ(header.type, FrameType::kSettings); if (header.stream_id != 0) { - return absl::InvalidArgumentError("Expected stream id 0"); + return absl::InternalError("Expected stream id 0"); } return ReadMetadata(ctx.parser, std::move(payload), header.stream_id, true, true, ctx.bitsrc, &headers); @@ -144,7 +146,7 @@ absl::Status ClientInitialMetadataFrame::Deserialize(const DeserializeContext& c SliceBuffer payload) { CHECK_EQ(header.type, FrameType::kClientInitialMetadata); if (header.stream_id == 0) { - return absl::InvalidArgumentError("Expected non-zero stream id"); + return absl::InternalError("Expected non-zero stream id"); } stream_id = header.stream_id; return ReadMetadata(ctx.parser, std::move(payload), header.stream_id, true, true, @@ -155,7 +157,7 @@ void ClientInitialMetadataFrame::Serialize(const SerializeContext& ctx, BufferPair* out) const { CHECK_NE(stream_id, 0u); AddInlineFrame( - FrameType::kSettings, 0, + FrameType::kClientInitialMetadata, stream_id, [&ctx, this](SliceBuffer& out) { ctx.saw_encoding_errors |= !ctx.encoder->EncodeRawHeaders(*headers, out); }, @@ -168,12 +170,35 @@ std::string ClientInitialMetadataFrame::ToString() const { ", headers=", headers == nullptr ? "" : headers->DebugString(), "}"); } +absl::Status ClientEndOfStream::Deserialize(const DeserializeContext& ctx, + const FrameHeader& header, + SliceBuffer payload) { + CHECK_EQ(header.type, FrameType::kClientEndOfStream); + if (header.stream_id == 0) { + return absl::InternalError("Expected non-zero stream id"); + } + if (header.payload_length != 0) { + return absl::InternalError( + "Expected zero payload length on ClientEndOfStream"); + } + stream_id = header.stream_id; + return absl::OkStatus(); +} + +void ClientEndOfStream::Serialize(const SerializeContext& ctx, + BufferPair* out) const { + AddInlineFrame( + FrameType::kClientEndOfStream, stream_id, [](SliceBuffer&) {}, out); +} + +std::string ClientEndOfStream::ToString() const { return "ClientEndOfStream"; } + absl::Status MessageFrame::Deserialize(const DeserializeContext& ctx, const FrameHeader& header, SliceBuffer payload) { CHECK_EQ(header.type, FrameType::kMessage); if (header.stream_id == 0) { - return absl::InvalidArgumentError("Expected non-zero stream id"); + return absl::InternalError("Expected non-zero stream id"); } stream_id = header.stream_id; message = Arena::MakePooled(std::move(payload), 0); @@ -199,7 +224,7 @@ absl::Status ServerInitialMetadataFrame::Deserialize(const DeserializeContext& c SliceBuffer payload) { CHECK_EQ(header.type, FrameType::kServerInitialMetadata); if (header.stream_id == 0) { - return absl::InvalidArgumentError("Expected non-zero stream id"); + return absl::InternalError("Expected non-zero stream id"); } stream_id = header.stream_id; return ReadMetadata(ctx.parser, std::move(payload), header.stream_id, true, false, @@ -228,7 +253,7 @@ absl::Status ServerTrailingMetadataFrame::Deserialize(const DeserializeContext& SliceBuffer payload) { CHECK_EQ(header.type, FrameType::kServerTrailingMetadata); if (header.stream_id == 0) { - return absl::InvalidArgumentError("Expected non-zero stream id"); + return absl::InternalError("Expected non-zero stream id"); } stream_id = header.stream_id; return ReadMetadata(ctx.parser, std::move(payload), header.stream_id, false, false, @@ -260,12 +285,12 @@ absl::Status CancelFrame::Deserialize(const DeserializeContext& ctx, // Ensure the stream_id is non-zero if (header.stream_id == 0) { - return absl::InvalidArgumentError("Expected non-zero stream id"); + return absl::InternalError("Expected non-zero stream id"); } // Ensure there is no payload if (payload.Length() != 0) { - return absl::InvalidArgumentError("Unexpected payload for Cancel frame"); + return absl::InternalError("Unexpected payload for Cancel frame"); } // Set the stream_id diff --git a/src/core/ext/transport/chaotic_good/frame_header.h b/src/core/ext/transport/chaotic_good/frame_header.h index 5ab4a3d685e..02068212af6 100644 --- a/src/core/ext/transport/chaotic_good/frame_header.h +++ b/src/core/ext/transport/chaotic_good/frame_header.h @@ -28,22 +28,22 @@ namespace chaotic_good { enum class FrameType : uint8_t { kSettings = 0x00, - kPadding = 0x10, kClientInitialMetadata = 0x80, - kMessage = 0x81, - kServerInitialMetadata = 0x82, - kServerTrailingMetadata = 0x83, - kCancel = 0x84, + kClientEndOfStream = 0x81, + kServerInitialMetadata = 0x91, + kServerTrailingMetadata = 0x92, + kMessage = 0xa0, + kCancel = 0xff, }; inline std::ostream& operator<<(std::ostream& out, FrameType type) { switch (type) { case FrameType::kSettings: return out << "Settings"; - case FrameType::kPadding: - return out << "Padding"; case FrameType::kClientInitialMetadata: return out << "ClientInitialMetadata"; + case FrameType::kClientEndOfStream: + return out << "ClientEndOfStream"; case FrameType::kMessage: return out << "Message"; case FrameType::kServerInitialMetadata: diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc index 3481230ee10..35ea02e8e59 100644 --- a/test/core/transport/chaotic_good/client_transport_test.cc +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -103,14 +103,17 @@ ChannelArgs MakeChannelArgs() { TEST_F(TransportTest, AddOneStream) { MockPromiseEndpoint control_endpoint(1000); MockPromiseEndpoint data_endpoint(1001); + static const std::string many_as(1024 * 1024, 'a'); control_endpoint.ExpectRead( - {SerializedFrameHeader(FrameType::kFragment, 7, 1, 26, 8, 56, 15), + {SerializedFrameHeader(FrameType::kServerInitialMetadata, 0, 1, 26), EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep, sizeof(kPathDemoServiceStep)), + SerializedFrameHeader(FrameType::kMessage, 1, 1, many_as.length()), + SerializedFrameHeader(FrameType::kServerTrailingMetadata, 0, 1, 15), EventEngineSlice::FromCopiedBuffer(kGrpcStatus0, sizeof(kGrpcStatus0))}, event_engine().get()); data_endpoint.ExpectRead( - {EventEngineSlice::FromCopiedString("12345678"), Zeros(56)}, nullptr); + {EventEngineSlice::FromCopiedString(many_as), Zeros(56)}, nullptr); EXPECT_CALL(*control_endpoint.endpoint, Read) .InSequence(control_endpoint.read_sequence) .WillOnce(Return(false)); @@ -122,18 +125,17 @@ TEST_F(TransportTest, AddOneStream) { StrictMock> on_done; EXPECT_CALL(on_done, Call()); control_endpoint.ExpectWrite( - {SerializedFrameHeader(FrameType::kFragment, 1, 1, - sizeof(kPathDemoServiceStep), 0, 0, 0), + {SerializedFrameHeader(FrameType::kClientInitialMetadata, 0, 1, + sizeof(kPathDemoServiceStep)), EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep, sizeof(kPathDemoServiceStep))}, nullptr); control_endpoint.ExpectWrite( - {SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)}, + {SerializedFrameHeader(FrameType::kMessage, 0, 1, 1), + EventEngineSlice::FromCopiedString("0")}, nullptr); - data_endpoint.ExpectWrite( - {EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr); control_endpoint.ExpectWrite( - {SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr); + {SerializedFrameHeader(FrameType::kClientEndOfStream, 0, 1, 0)}, nullptr); transport->StartCall(call.handler.StartCall()); call.initiator.SpawnGuarded("test-send", [initiator = call.initiator]() mutable { @@ -157,7 +159,7 @@ TEST_F(TransportTest, AddOneStream) { [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value().payload()->JoinIntoString(), "12345678"); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), many_as); return Empty{}; }, [initiator]() mutable { return initiator.PullMessage(); }, @@ -184,18 +186,17 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) { MockPromiseEndpoint control_endpoint(1000); MockPromiseEndpoint data_endpoint(1001); control_endpoint.ExpectRead( - {SerializedFrameHeader(FrameType::kFragment, 3, 1, 26, 8, 56, 0), + // 3, 1, 26, 8, 56, 0 + {SerializedFrameHeader(FrameType::kServerInitialMetadata, 0, 1, 26), EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep, - sizeof(kPathDemoServiceStep))}, - event_engine().get()); - control_endpoint.ExpectRead( - {SerializedFrameHeader(FrameType::kFragment, 6, 1, 0, 8, 56, 15), + sizeof(kPathDemoServiceStep)), + SerializedFrameHeader(FrameType::kMessage, 0, 1, 8), + EventEngineSlice::FromCopiedString("12345678"), + SerializedFrameHeader(FrameType::kMessage, 0, 1, 8), + EventEngineSlice::FromCopiedString("87654321"), + SerializedFrameHeader(FrameType::kServerTrailingMetadata, 0, 1, 15), EventEngineSlice::FromCopiedBuffer(kGrpcStatus0, sizeof(kGrpcStatus0))}, event_engine().get()); - data_endpoint.ExpectRead( - {EventEngineSlice::FromCopiedString("12345678"), Zeros(56)}, nullptr); - data_endpoint.ExpectRead( - {EventEngineSlice::FromCopiedString("87654321"), Zeros(56)}, nullptr); EXPECT_CALL(*control_endpoint.endpoint, Read) .InSequence(control_endpoint.read_sequence) .WillOnce(Return(false)); @@ -207,23 +208,21 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) { StrictMock> on_done; EXPECT_CALL(on_done, Call()); control_endpoint.ExpectWrite( - {SerializedFrameHeader(FrameType::kFragment, 1, 1, - sizeof(kPathDemoServiceStep), 0, 0, 0), + {SerializedFrameHeader(FrameType::kClientInitialMetadata, 0, 1, + sizeof(kPathDemoServiceStep)), EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep, sizeof(kPathDemoServiceStep))}, nullptr); control_endpoint.ExpectWrite( - {SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)}, + {SerializedFrameHeader(FrameType::kMessage, 0, 1, 1), + EventEngineSlice::FromCopiedString("0")}, nullptr); - data_endpoint.ExpectWrite( - {EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr); control_endpoint.ExpectWrite( - {SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 1, 63, 0)}, + {SerializedFrameHeader(FrameType::kMessage, 0, 1, 1), + EventEngineSlice::FromCopiedString("1")}, nullptr); - data_endpoint.ExpectWrite( - {EventEngineSlice::FromCopiedString("1"), Zeros(63)}, nullptr); control_endpoint.ExpectWrite( - {SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr); + {SerializedFrameHeader(FrameType::kClientEndOfStream, 0, 1, 0)}, nullptr); transport->StartCall(call.handler.StartCall()); call.initiator.SpawnGuarded("test-send", [initiator = call.initiator]() mutable { diff --git a/test/core/transport/chaotic_good/transport_test.cc b/test/core/transport/chaotic_good/transport_test.cc index b43098fa7c7..2a5522b2cc9 100644 --- a/test/core/transport/chaotic_good/transport_test.cc +++ b/test/core/transport/chaotic_good/transport_test.cc @@ -19,34 +19,24 @@ namespace chaotic_good { namespace testing { grpc_event_engine::experimental::Slice SerializedFrameHeader( - FrameType type, uint8_t flags, uint32_t stream_id, uint32_t header_length, - uint32_t message_length, uint32_t message_padding, - uint32_t trailer_length) { - uint8_t buffer[24] = {static_cast(type), - flags, - 0, - 0, - static_cast(stream_id), - static_cast(stream_id >> 8), - static_cast(stream_id >> 16), - static_cast(stream_id >> 24), - static_cast(header_length), - static_cast(header_length >> 8), - static_cast(header_length >> 16), - static_cast(header_length >> 24), - static_cast(message_length), - static_cast(message_length >> 8), - static_cast(message_length >> 16), - static_cast(message_length >> 24), - static_cast(message_padding), - static_cast(message_padding >> 8), - static_cast(message_padding >> 16), - static_cast(message_padding >> 24), - static_cast(trailer_length), - static_cast(trailer_length >> 8), - static_cast(trailer_length >> 16), - static_cast(trailer_length >> 24)}; - return grpc_event_engine::experimental::Slice::FromCopiedBuffer(buffer, 24); + FrameType type, uint16_t payload_connection_id, uint32_t stream_id, + uint32_t payload_length) { + uint8_t buffer[FrameHeader::kFrameHeaderSize] = { + static_cast(payload_connection_id), + static_cast(payload_connection_id >> 16), + static_cast(type), + 0, + static_cast(stream_id), + static_cast(stream_id >> 8), + static_cast(stream_id >> 16), + static_cast(stream_id >> 24), + static_cast(payload_length), + static_cast(payload_length >> 8), + static_cast(payload_length >> 16), + static_cast(payload_length >> 24), + }; + return grpc_event_engine::experimental::Slice::FromCopiedBuffer( + buffer, sizeof(buffer)); } grpc_event_engine::experimental::Slice Zeros(uint32_t length) { diff --git a/test/core/transport/chaotic_good/transport_test.h b/test/core/transport/chaotic_good/transport_test.h index d6269c6e984..1d70051b6dc 100644 --- a/test/core/transport/chaotic_good/transport_test.h +++ b/test/core/transport/chaotic_good/transport_test.h @@ -71,8 +71,8 @@ class TransportTest : public ::testing::Test { }; grpc_event_engine::experimental::Slice SerializedFrameHeader( - FrameType type, uint8_t flags, uint32_t stream_id, uint32_t header_length, - uint32_t message_length, uint32_t message_padding, uint32_t trailer_length); + FrameType type, uint16_t payload_connection_id, uint32_t stream_id, + uint32_t payload_length); grpc_event_engine::experimental::Slice Zeros(uint32_t length);