diff --git a/src/core/BUILD b/src/core/BUILD index ee0fa611cfc..cd0a5ee47bb 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6183,7 +6183,6 @@ grpc_cc_library( external_deps = [ "absl/base:core_headers", "absl/status", - "absl/status:statusor", "absl/types:variant", ], language = "c++", diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 88364499d2c..6eb6e4b86ba 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -20,10 +20,7 @@ #include #include -#include "absl/status/statusor.h" - #include -#include #include #include "src/core/ext/transport/chaotic_good/frame.h" @@ -65,16 +62,10 @@ ClientTransport::ClientTransport( control_endpoint_write_buffer_.Append( frame->Serialize(hpack_compressor_.get())); if (frame->message != nullptr) { - auto frame_header = - FrameHeader::Parse( - reinterpret_cast(GRPC_SLICE_START_PTR( - control_endpoint_write_buffer_.c_slice_buffer() - ->slices[0]))) - .value(); - std::string message_padding(frame_header.message_padding, - '0'); + std::string message_padding( + frame->frame_header.message_padding, '0'); Slice slice(grpc_slice_from_cpp_string(message_padding)); - // Append message payload to data_endpoint_buffer. + // Append message padding to data_endpoint_buffer. data_endpoint_write_buffer_.Append(std::move(slice)); // Append message payload to data_endpoint_buffer. frame->message->payload()->MoveFirstNBytesIntoSliceBuffer( diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h index 4bd5a132b0c..9b2129c083f 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.h +++ b/src/core/ext/transport/chaotic_good/client_transport.h @@ -17,6 +17,7 @@ #include +#include #include #include // IWYU pragma: keep @@ -31,6 +32,7 @@ #include #include "src/core/ext/transport/chaotic_good/frame.h" +#include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/activity.h" @@ -60,7 +62,7 @@ class ClientTransport { auto AddStream(CallArgs call_args) { // At this point, the connection is set up. // Start sending data frames. - uint64_t stream_id; + uint32_t stream_id; { MutexLock lock(&mu_); stream_id = next_stream_id_++; @@ -71,10 +73,16 @@ class ClientTransport { [stream_id, initial_frame = true, client_initial_metadata = std::move(call_args.client_initial_metadata), - outgoing_frames = outgoing_frames_.MakeSender()]( - MessageHandle result) mutable { + outgoing_frames = outgoing_frames_.MakeSender(), + this](MessageHandle result) mutable { ClientFragmentFrame frame; - frame.stream_id = stream_id; + // Construct frame header (flags, header_length and + // trailer_length will be added in serialization). + uint32_t message_length = result->payload()->Length(); + uint32_t message_padding = message_length % aligned_bytes; + frame.frame_header = FrameHeader{ + FrameType::kFragment, {}, stream_id, 0, message_length, + message_padding, 0}; frame.message = std::move(result); if (initial_frame) { // Send initial frame with client intial metadata. @@ -99,6 +107,8 @@ class ClientTransport { MpscReceiver outgoing_frames_; Mutex mu_; uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; + // Assigned aligned bytes from setting frame. + size_t aligned_bytes = 64; ActivityPtr writer_; ActivityPtr reader_; std::unique_ptr control_endpoint_; diff --git a/src/core/ext/transport/chaotic_good/frame.cc b/src/core/ext/transport/chaotic_good/frame.cc index ac353d6bd7c..0f8302c54bf 100644 --- a/src/core/ext/transport/chaotic_good/frame.cc +++ b/src/core/ext/transport/chaotic_good/frame.cc @@ -28,6 +28,7 @@ #include #include +#include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/gprpp/no_destruct.h" #include "src/core/lib/gprpp/status_helper.h" @@ -40,15 +41,14 @@ namespace chaotic_good { namespace { const NoDestruct kZeroSlice{[] { // Frame header size is fixed to 24 bytes. - auto slice = GRPC_SLICE_MALLOC(24); - memset(GRPC_SLICE_START_PTR(slice), 0, 24); + auto slice = GRPC_SLICE_MALLOC(FrameHeader::frame_header_size_); + memset(GRPC_SLICE_START_PTR(slice), 0, FrameHeader::frame_header_size_); return slice; }()}; class FrameSerializer { public: - explicit FrameSerializer(FrameType type, uint32_t stream_id) - : header_{type, {}, stream_id, 0, 0, 0, 0} { + explicit FrameSerializer(FrameHeader header) : header_(header) { output_.AppendIndexed(kZeroSlice->Copy()); } // If called, must be called before AddTrailers, Finish. @@ -59,10 +59,24 @@ class FrameSerializer { // If called, must be called before Finish. SliceBuffer& AddTrailers() { header_.flags.set(1); + header_.header_length = output_.Length() - FrameHeader::frame_header_size_; return output_; } SliceBuffer Finish() { + // Calculate frame header_length or trailer_length if available. + if (header_.flags.is_set(1)) { + // Header length is already known in AddTrailers(). + header_.trailer_length = output_.Length() - header_.header_length - + FrameHeader::frame_header_size_; + } else { + if (header_.flags.is_set(0)) { + // Calculate frame header length in Finish() since AddTrailers() isn't + // called. + header_.header_length = + output_.Length() - FrameHeader::frame_header_size_; + } + } header_.Serialize( GRPC_SLICE_START_PTR(output_.c_slice_buffer()->slices[0])); return std::move(output_); @@ -151,7 +165,8 @@ absl::Status SettingsFrame::Deserialize(HPackParser*, const FrameHeader& header, } SliceBuffer SettingsFrame::Serialize(HPackCompressor*) const { - FrameSerializer serializer(FrameType::kSettings, 0); + FrameSerializer serializer( + FrameHeader{FrameType::kSettings, {}, 0, 0, 0, 0, 0}); return serializer.Finish(); } @@ -162,7 +177,7 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser, if (header.stream_id == 0) { return absl::InvalidArgumentError("Expected non-zero stream id"); } - stream_id = header.stream_id; + frame_header = header; if (header.type != FrameType::kFragment) { return absl::InvalidArgumentError("Expected fragment frame"); } @@ -171,6 +186,9 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser, auto r = ReadMetadata(parser, deserializer.ReceiveHeaders(), header.stream_id, true, true, bitsrc); if (!r.ok()) return r.status(); + if (r.value() != nullptr) { + headers = std::move(r.value()); + } } if (header.flags.is_set(1)) { if (header.trailer_length != 0) { @@ -184,8 +202,8 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser, } SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const { - GPR_ASSERT(stream_id != 0); - FrameSerializer serializer(FrameType::kFragment, stream_id); + GPR_ASSERT(frame_header.stream_id != 0); + FrameSerializer serializer(frame_header); if (headers.get() != nullptr) { encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders()); } @@ -202,28 +220,32 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser, if (header.stream_id == 0) { return absl::InvalidArgumentError("Expected non-zero stream id"); } - stream_id = header.stream_id; - if (header.type != FrameType::kFragment) { - return absl::InvalidArgumentError("Expected fragment frame"); - } + frame_header = header; FrameDeserializer deserializer(header, slice_buffer); if (header.flags.is_set(0)) { auto r = ReadMetadata(parser, deserializer.ReceiveHeaders(), header.stream_id, true, false, bitsrc); if (!r.ok()) return r.status(); + if (r.value() != nullptr) { + headers = std::move(r.value()); + } } if (header.flags.is_set(1)) { auto r = ReadMetadata(parser, deserializer.ReceiveTrailers(), header.stream_id, false, false, bitsrc); + if (!r.ok()) return r.status(); + if (r.value() != nullptr) { + trailers = std::move(r.value()); + } } return deserializer.Finish(); } SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const { - GPR_ASSERT(stream_id != 0); - FrameSerializer serializer(FrameType::kFragment, stream_id); + GPR_ASSERT(frame_header.stream_id != 0); + FrameSerializer serializer(frame_header); if (headers.get() != nullptr) { encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders()); } @@ -252,7 +274,8 @@ absl::Status CancelFrame::Deserialize(HPackParser*, const FrameHeader& header, SliceBuffer CancelFrame::Serialize(HPackCompressor*) const { GPR_ASSERT(stream_id != 0); - FrameSerializer serializer(FrameType::kCancel, stream_id); + FrameSerializer serializer( + FrameHeader{FrameType::kCancel, {}, stream_id, 0, 0, 0, 0}); return serializer.Finish(); } diff --git a/src/core/ext/transport/chaotic_good/frame.h b/src/core/ext/transport/chaotic_good/frame.h index 9f3538af71f..eca8200a1a8 100644 --- a/src/core/ext/transport/chaotic_good/frame.h +++ b/src/core/ext/transport/chaotic_good/frame.h @@ -77,13 +77,14 @@ struct ClientFragmentFrame final : public FrameInterface { SliceBuffer& slice_buffer) override; SliceBuffer Serialize(HPackCompressor* encoder) const override; - uint32_t stream_id; + FrameHeader frame_header; ClientMetadataHandle headers; MessageHandle message; bool end_of_stream = false; bool operator==(const ClientFragmentFrame& other) const { - return stream_id == other.stream_id && EqHdl(headers, other.headers) && + return frame_header.stream_id == other.frame_header.stream_id && + EqHdl(headers, other.headers) && end_of_stream == other.end_of_stream; } }; @@ -94,13 +95,13 @@ struct ServerFragmentFrame final : public FrameInterface { SliceBuffer& slice_buffer) override; SliceBuffer Serialize(HPackCompressor* encoder) const override; - uint32_t stream_id; + FrameHeader frame_header; ServerMetadataHandle headers; ServerMetadataHandle trailers; bool operator==(const ServerFragmentFrame& other) const { - return stream_id == other.stream_id && EqHdl(headers, other.headers) && - EqHdl(trailers, other.trailers); + return frame_header.stream_id == other.frame_header.stream_id && + EqHdl(headers, other.headers) && EqHdl(trailers, other.trailers); } }; diff --git a/src/core/ext/transport/chaotic_good/frame_header.h b/src/core/ext/transport/chaotic_good/frame_header.h index a8210cf2574..7834c7d0a83 100644 --- a/src/core/ext/transport/chaotic_good/frame_header.h +++ b/src/core/ext/transport/chaotic_good/frame_header.h @@ -17,6 +17,8 @@ #include +#include + #include #include "absl/status/statusor.h" @@ -55,6 +57,8 @@ struct FrameHeader { message_padding == h.message_padding && trailer_length == h.trailer_length; } + // Frame header size is fixed to 24 bytes. + static constexpr size_t frame_header_size_ = 24; }; } // namespace chaotic_good diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc index b922eaa037a..96889082a2c 100644 --- a/test/core/transport/chaotic_good/client_transport_test.cc +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -16,8 +16,6 @@ // IWYU pragma: no_include -#include - #include // IWYU pragma: keep #include #include