diff --git a/CMakeLists.txt b/CMakeLists.txt index 90f0b5d3a4a..bef30891364 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12503,8 +12503,8 @@ target_include_directories(frame_header_test target_link_libraries(frame_header_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest - absl::status absl::statusor + gpr ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 1aeb1ad9026..bff13ebb1ee 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -9610,15 +9610,14 @@ targets: language: c++ headers: - src/core/ext/transport/chaotic_good/frame_header.h - - src/core/lib/gpr/useful.h - src/core/lib/gprpp/bitset.h src: - src/core/ext/transport/chaotic_good/frame_header.cc - test/core/transport/chaotic_good/frame_header_test.cc deps: - gtest - - absl/status:status - absl/status:statusor + - gpr - name: fuzzing_event_engine_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index e5ad5240aea..bbdba17e486 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6214,6 +6214,7 @@ grpc_cc_library( ], deps = [ "bitset", + "//:gpr", "//:gpr_platform", ], ) diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 686a024827b..e958b03a7d8 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -16,6 +16,7 @@ #include "src/core/ext/transport/chaotic_good/client_transport.h" +#include #include #include #include @@ -77,8 +78,7 @@ ClientTransport::ClientTransport( control_endpoint_write_buffer_.Append( frame->Serialize(hpack_compressor_.get())); if (frame->message != nullptr) { - std::string message_padding( - frame->frame_header.message_padding, '0'); + std::string message_padding(frame->message_padding, '0'); Slice slice(grpc_slice_from_cpp_string(message_padding)); // Append message padding to data_endpoint_buffer. data_endpoint_write_buffer_.Append(std::move(slice)); @@ -157,11 +157,9 @@ ClientTransport::ClientTransport( // Move message into frame. frame.message = arena_->MakePooled( std::move(data_endpoint_read_buffer_), 0); - auto stream_id = frame.frame_header.stream_id; - { - MutexLock lock(&mu_); - return stream_map_[stream_id]->Push(ServerFrame(std::move(frame))); - } + MutexLock lock(&mu_); + const uint32_t stream_id = frame_header_->stream_id; + return stream_map_[stream_id]->Push(ServerFrame(std::move(frame))); }, // Check if send frame to corresponding stream successfully. [](bool ret) -> LoopCtl { diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h index 4972630cf9c..86f9072fcfc 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.h +++ b/src/core/ext/transport/chaotic_good/client_transport.h @@ -107,40 +107,37 @@ class ClientTransport { return TrySeq( TryJoin( // Continuously send client frame with client to server messages. - ForEach( - std::move(*call_args.client_to_server_messages), - [stream_id, initial_frame = true, - client_initial_metadata = - std::move(call_args.client_initial_metadata), - outgoing_frames = outgoing_frames_.MakeSender(), - this](MessageHandle result) mutable { - ClientFragmentFrame frame; - // Construct frame header (flags, header_length and - // trailer_length will be added in serialization). - uint32_t message_length = result->payload()->Length(); - uint32_t message_padding = message_length % aligned_bytes; - frame.frame_header = FrameHeader{ - FrameType::kFragment, {}, stream_id, 0, message_length, - message_padding, 0}; - frame.message = std::move(result); - if (initial_frame) { - // Send initial frame with client intial metadata. - frame.headers = std::move(client_initial_metadata); - initial_frame = false; - } - return TrySeq( - outgoing_frames.Send(ClientFrame(std::move(frame))), - [](bool success) -> absl::Status { - if (!success) { - // TODO(ladynana): propagate the actual error message - // from EventEngine. - return absl::UnavailableError( - "Transport closed due to endpoint write/read " - "failed."); - } - return absl::OkStatus(); - }); - }), + ForEach(std::move(*call_args.client_to_server_messages), + [stream_id, initial_frame = true, + client_initial_metadata = + std::move(call_args.client_initial_metadata), + outgoing_frames = outgoing_frames_.MakeSender(), + this](MessageHandle result) mutable { + ClientFragmentFrame frame; + // Construct frame header (flags, header_length and + // trailer_length will be added in serialization). + uint32_t message_length = result->payload()->Length(); + frame.stream_id = stream_id; + frame.message_padding = message_length % aligned_bytes; + frame.message = std::move(result); + if (initial_frame) { + // Send initial frame with client intial metadata. + frame.headers = std::move(client_initial_metadata); + initial_frame = false; + } + return TrySeq( + outgoing_frames.Send(ClientFrame(std::move(frame))), + [](bool success) -> absl::Status { + if (!success) { + // TODO(ladynana): propagate the actual error + // message from EventEngine. + return absl::UnavailableError( + "Transport closed due to endpoint write/read " + "failed."); + } + return absl::OkStatus(); + }); + }), // Continuously receive server frames from endpoints and save // results to call_args. Loop([server_initial_metadata = call_args.server_initial_metadata, diff --git a/src/core/ext/transport/chaotic_good/frame.cc b/src/core/ext/transport/chaotic_good/frame.cc index 2f7f938e7c3..f49fa4c4f3b 100644 --- a/src/core/ext/transport/chaotic_good/frame.cc +++ b/src/core/ext/transport/chaotic_good/frame.cc @@ -50,12 +50,13 @@ const NoDestruct kZeroSlice{[] { class FrameSerializer { public: - explicit FrameSerializer(FrameHeader header) : header_(header) { + explicit FrameSerializer(FrameType frame_type, uint32_t stream_id, + uint32_t message_padding) { output_.AppendIndexed(kZeroSlice->Copy()); - // Initialize header flags, header_length, trailer_length to 0. + header_.type = frame_type; + header_.stream_id = stream_id; + header_.message_padding = message_padding; header_.flags.SetAll(false); - header_.header_length = 0; - header_.trailer_length = 0; } // If called, must be called before AddTrailers, Finish. SliceBuffer& AddHeaders() { @@ -173,11 +174,12 @@ absl::Status SettingsFrame::Deserialize(HPackParser*, const FrameHeader& header, } SliceBuffer SettingsFrame::Serialize(HPackCompressor*) const { - FrameSerializer serializer( - FrameHeader{FrameType::kSettings, {}, 0, 0, 0, 0, 0}); + FrameSerializer serializer(FrameType::kSettings, 0, 0); return serializer.Finish(); } +std::string SettingsFrame::ToString() const { return "SettingsFrame{}"; } + absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser, const FrameHeader& header, absl::BitGenRef bitsrc, @@ -185,7 +187,8 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser, if (header.stream_id == 0) { return absl::InvalidArgumentError("Expected non-zero stream id"); } - frame_header = header; + stream_id = header.stream_id; + message_padding = header.message_padding; if (header.type != FrameType::kFragment) { return absl::InvalidArgumentError("Expected fragment frame"); } @@ -197,6 +200,9 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser, if (r.value() != nullptr) { headers = std::move(r.value()); } + } else if (header.header_length != 0) { + return absl::InvalidArgumentError(absl::StrCat( + "Unexpected non-zero header length", header.header_length)); } if (header.flags.is_set(1)) { if (header.trailer_length != 0) { @@ -210,8 +216,8 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser, } SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const { - GPR_ASSERT(frame_header.stream_id != 0); - FrameSerializer serializer(frame_header); + GPR_ASSERT(stream_id != 0); + FrameSerializer serializer(FrameType::kFragment, stream_id, message_padding); if (headers.get() != nullptr) { encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders()); } @@ -221,6 +227,16 @@ SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const { return serializer.Finish(); } +std::string ClientFragmentFrame::ToString() const { + return absl::StrCat( + "ClientFragmentFrame{stream_id=", stream_id, ", headers=", + headers.get() != nullptr ? headers->DebugString().c_str() : "nullptr", + ", message=", + message.get() != nullptr ? message->DebugString().c_str() : "nullptr", + ", message_padding=", message_padding, ", end_of_stream=", end_of_stream, + "}"); +} + absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser, const FrameHeader& header, absl::BitGenRef bitsrc, @@ -228,7 +244,8 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser, if (header.stream_id == 0) { return absl::InvalidArgumentError("Expected non-zero stream id"); } - frame_header = header; + stream_id = header.stream_id; + message_padding = header.message_padding; FrameDeserializer deserializer(header, slice_buffer); if (header.flags.is_set(0)) { auto r = @@ -238,6 +255,9 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser, if (r.value() != nullptr) { headers = std::move(r.value()); } + } else if (header.header_length != 0) { + return absl::InvalidArgumentError(absl::StrCat( + "Unexpected non-zero header length", header.header_length)); } if (header.flags.is_set(1)) { auto r = @@ -247,13 +267,16 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser, if (r.value() != nullptr) { trailers = std::move(r.value()); } + } else if (header.trailer_length != 0) { + return absl::InvalidArgumentError(absl::StrCat( + "Unexpected non-zero trailer length", header.trailer_length)); } return deserializer.Finish(); } SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const { - GPR_ASSERT(frame_header.stream_id != 0); - FrameSerializer serializer(frame_header); + GPR_ASSERT(stream_id != 0); + FrameSerializer serializer(FrameType::kFragment, stream_id, message_padding); if (headers.get() != nullptr) { encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders()); } @@ -263,6 +286,17 @@ SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const { return serializer.Finish(); } +std::string ServerFragmentFrame::ToString() const { + return absl::StrCat( + "ServerFragmentFrame{stream_id=", stream_id, ", headers=", + headers.get() != nullptr ? headers->DebugString().c_str() : "nullptr", + ", message=", + message.get() != nullptr ? message->DebugString().c_str() : "nullptr", + ", message_padding=", message_padding, ", trailers=", + trailers.get() != nullptr ? trailers->DebugString().c_str() : "nullptr", + "}"); +} + absl::Status CancelFrame::Deserialize(HPackParser*, const FrameHeader& header, absl::BitGenRef, SliceBuffer& slice_buffer) { @@ -282,10 +316,13 @@ absl::Status CancelFrame::Deserialize(HPackParser*, const FrameHeader& header, SliceBuffer CancelFrame::Serialize(HPackCompressor*) const { GPR_ASSERT(stream_id != 0); - FrameSerializer serializer( - FrameHeader{FrameType::kCancel, {}, stream_id, 0, 0, 0, 0}); + FrameSerializer serializer(FrameType::kCancel, stream_id, 0); return serializer.Finish(); } +std::string CancelFrame::ToString() const { + return absl::StrCat("CancelFrame{stream_id=", stream_id, "}"); +} + } // namespace chaotic_good } // namespace grpc_core diff --git a/src/core/ext/transport/chaotic_good/frame.h b/src/core/ext/transport/chaotic_good/frame.h index 8e5031802e5..529c89570c7 100644 --- a/src/core/ext/transport/chaotic_good/frame.h +++ b/src/core/ext/transport/chaotic_good/frame.h @@ -43,6 +43,7 @@ class FrameInterface { absl::BitGenRef bitsrc, SliceBuffer& slice_buffer) = 0; virtual SliceBuffer Serialize(HPackCompressor* encoder) const = 0; + virtual std::string ToString() const = 0; protected: static bool EqVal(const Message& a, const Message& b) { @@ -67,6 +68,7 @@ struct SettingsFrame final : public FrameInterface { absl::BitGenRef bitsrc, SliceBuffer& slice_buffer) override; SliceBuffer Serialize(HPackCompressor* encoder) const override; + std::string ToString() const override; bool operator==(const SettingsFrame&) const { return true; } }; @@ -76,15 +78,16 @@ struct ClientFragmentFrame final : public FrameInterface { absl::BitGenRef bitsrc, SliceBuffer& slice_buffer) override; SliceBuffer Serialize(HPackCompressor* encoder) const override; + std::string ToString() const override; - FrameHeader frame_header; + uint32_t stream_id; ClientMetadataHandle headers; MessageHandle message; + uint32_t message_padding; bool end_of_stream = false; bool operator==(const ClientFragmentFrame& other) const { - return frame_header.stream_id == other.frame_header.stream_id && - EqHdl(headers, other.headers) && + return stream_id == other.stream_id && EqHdl(headers, other.headers) && end_of_stream == other.end_of_stream; } }; @@ -94,15 +97,17 @@ struct ServerFragmentFrame final : public FrameInterface { absl::BitGenRef bitsrc, SliceBuffer& slice_buffer) override; SliceBuffer Serialize(HPackCompressor* encoder) const override; + std::string ToString() const override; - FrameHeader frame_header; + uint32_t stream_id; ServerMetadataHandle headers; MessageHandle message; + uint32_t message_padding; ServerMetadataHandle trailers; bool operator==(const ServerFragmentFrame& other) const { - return frame_header.stream_id == other.frame_header.stream_id && - EqHdl(headers, other.headers) && EqHdl(trailers, other.trailers); + return stream_id == other.stream_id && EqHdl(headers, other.headers) && + EqHdl(trailers, other.trailers); } }; @@ -111,6 +116,7 @@ struct CancelFrame final : public FrameInterface { absl::BitGenRef bitsrc, SliceBuffer& slice_buffer) override; SliceBuffer Serialize(HPackCompressor* encoder) const override; + std::string ToString() const override; uint32_t stream_id; diff --git a/src/core/ext/transport/chaotic_good/frame_header.cc b/src/core/ext/transport/chaotic_good/frame_header.cc index 5e0e00d5f7d..e39d6a34b58 100644 --- a/src/core/ext/transport/chaotic_good/frame_header.cc +++ b/src/core/ext/transport/chaotic_good/frame_header.cc @@ -20,6 +20,8 @@ #include "absl/status/status.h" +#include + namespace grpc_core { namespace chaotic_good { @@ -43,6 +45,8 @@ uint32_t ReadLittleEndianUint32(const uint8_t* data) { void FrameHeader::Serialize(uint8_t* data) const { WriteLittleEndianUint32( static_cast(type) | (flags.ToInt() << 8), data); + if (flags.is_set(0)) GPR_ASSERT(header_length > 0); + if (flags.is_set(1)) GPR_ASSERT(trailer_length > 0); WriteLittleEndianUint32(stream_id, data + 4); WriteLittleEndianUint32(header_length, data + 8); WriteLittleEndianUint32(message_length, data + 12); @@ -61,13 +65,15 @@ absl::StatusOr FrameHeader::Parse(const uint8_t* data) { header.stream_id = ReadLittleEndianUint32(data + 4); header.header_length = ReadLittleEndianUint32(data + 8); if (header.flags.is_set(0) && header.header_length <= 0) { - return absl::InvalidArgumentError("Invalid header length"); + return absl::InvalidArgumentError( + absl::StrCat("Invalid header length: ", header.header_length)); } header.message_length = ReadLittleEndianUint32(data + 12); header.message_padding = ReadLittleEndianUint32(data + 16); header.trailer_length = ReadLittleEndianUint32(data + 20); if (header.flags.is_set(1) && header.trailer_length <= 0) { - return absl::InvalidArgumentError("Invalid trailer length"); + return absl::InvalidArgumentError( + absl::StrCat("Invalid trailer length", header.trailer_length)); } return header; } @@ -79,5 +85,13 @@ uint32_t FrameHeader::GetFrameLength() const { return frame_length; } +std::string FrameHeader::ToString() const { + return absl::StrFormat( + "[type=0x%02x, flags=0x%02x, stream_id=%d, header_length=%d, " + "message_length=%d, message_padding=%d, trailer_length=%d]", + static_cast(type), flags.ToInt(), stream_id, + header_length, message_length, message_padding, trailer_length); +} + } // namespace chaotic_good } // namespace grpc_core diff --git a/src/core/ext/transport/chaotic_good/frame_header.h b/src/core/ext/transport/chaotic_good/frame_header.h index 7834c7d0a83..fa236ed3342 100644 --- a/src/core/ext/transport/chaotic_good/frame_header.h +++ b/src/core/ext/transport/chaotic_good/frame_header.h @@ -35,13 +35,13 @@ enum class FrameType : uint8_t { }; struct FrameHeader { - FrameType type; + FrameType type = FrameType::kCancel; BitSet<2> flags; - uint32_t stream_id; - uint32_t header_length; - uint32_t message_length; - uint32_t message_padding; - uint32_t trailer_length; + uint32_t stream_id = 0; + uint32_t header_length = 0; + uint32_t message_length = 0; + uint32_t message_padding = 0; + uint32_t trailer_length = 0; // Parses a frame header from a buffer of 24 bytes. All 24 bytes are consumed. static absl::StatusOr Parse(const uint8_t* data); @@ -49,6 +49,8 @@ struct FrameHeader { void Serialize(uint8_t* data) const; // Compute frame sizes from the header. uint32_t GetFrameLength() const; + // Report contents as a string + std::string ToString() const; bool operator==(const FrameHeader& h) const { return type == h.type && flags == h.flags && stream_id == h.stream_id && diff --git a/test/core/transport/chaotic_good/frame_fuzzer.cc b/test/core/transport/chaotic_good/frame_fuzzer.cc index cf3ef86ac23..57180ce1c20 100644 --- a/test/core/transport/chaotic_good/frame_fuzzer.cc +++ b/test/core/transport/chaotic_good/frame_fuzzer.cc @@ -56,7 +56,13 @@ void AssertRoundTrips(const T& input, FrameType expected_frame_type) { uint8_t header_bytes[24]; serialized.MoveFirstNBytesIntoBuffer(24, header_bytes); auto header = FrameHeader::Parse(header_bytes); - GPR_ASSERT(header.ok()); + if (!header.ok()) { + if (!squelch) { + gpr_log(GPR_ERROR, "Failed to parse header: %s", + header.status().ToString().c_str()); + } + Crash("Failed to parse header"); + } GPR_ASSERT(header->type == expected_frame_type); T output; HPackParser hpack_parser; @@ -79,6 +85,7 @@ void FinishParseAndChecks(const FrameHeader& header, const uint8_t* data, auto deser = parsed.Deserialize(&hpack_parser, header, absl::BitGenRef(bitgen), serialized); if (!deser.ok()) return; + gpr_log(GPR_INFO, "Read frame: %s", parsed.ToString().c_str()); AssertRoundTrips(parsed, header.type); } @@ -90,6 +97,7 @@ int Run(const uint8_t* data, size_t size) { if (size < 24) return 0; auto r = FrameHeader::Parse(data); if (!r.ok()) return 0; + gpr_log(GPR_INFO, "Read frame header: %s", r->ToString().c_str()); size -= 24; data += 24; MemoryAllocator memory_allocator = MemoryAllocator( diff --git a/test/core/transport/chaotic_good/frame_fuzzer_corpus/5072496117219328 b/test/core/transport/chaotic_good/frame_fuzzer_corpus/5072496117219328 new file mode 100644 index 00000000000..16d6e2f4fde Binary files /dev/null and b/test/core/transport/chaotic_good/frame_fuzzer_corpus/5072496117219328 differ