[chaotic-good] Fix frame serialization (#34711)

This PR aims to fix the issue found in frame_fuzzer after #34191 was
merged, where frame serialization is missing the frame header info and
causes mismatch with the original frame.

This PR needs to be merged before #34657.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/34755/head
nanahpang 1 year ago committed by GitHub
parent e81d181fd7
commit 16c214629b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 15
      src/core/ext/transport/chaotic_good/client_transport.cc
  3. 18
      src/core/ext/transport/chaotic_good/client_transport.h
  4. 53
      src/core/ext/transport/chaotic_good/frame.cc
  5. 11
      src/core/ext/transport/chaotic_good/frame.h
  6. 4
      src/core/ext/transport/chaotic_good/frame_header.h
  7. 2
      test/core/transport/chaotic_good/client_transport_test.cc

@ -6183,7 +6183,6 @@ grpc_cc_library(
external_deps = [ external_deps = [
"absl/base:core_headers", "absl/base:core_headers",
"absl/status", "absl/status",
"absl/status:statusor",
"absl/types:variant", "absl/types:variant",
], ],
language = "c++", language = "c++",

@ -20,10 +20,7 @@
#include <string> #include <string>
#include <tuple> #include <tuple>
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame.h"
@ -65,16 +62,10 @@ ClientTransport::ClientTransport(
control_endpoint_write_buffer_.Append( control_endpoint_write_buffer_.Append(
frame->Serialize(hpack_compressor_.get())); frame->Serialize(hpack_compressor_.get()));
if (frame->message != nullptr) { if (frame->message != nullptr) {
auto frame_header = std::string message_padding(
FrameHeader::Parse( frame->frame_header.message_padding, '0');
reinterpret_cast<const uint8_t*>(GRPC_SLICE_START_PTR(
control_endpoint_write_buffer_.c_slice_buffer()
->slices[0])))
.value();
std::string message_padding(frame_header.message_padding,
'0');
Slice slice(grpc_slice_from_cpp_string(message_padding)); Slice slice(grpc_slice_from_cpp_string(message_padding));
// Append message payload to data_endpoint_buffer. // Append message padding to data_endpoint_buffer.
data_endpoint_write_buffer_.Append(std::move(slice)); data_endpoint_write_buffer_.Append(std::move(slice));
// Append message payload to data_endpoint_buffer. // Append message payload to data_endpoint_buffer.
frame->message->payload()->MoveFirstNBytesIntoSliceBuffer( frame->message->payload()->MoveFirstNBytesIntoSliceBuffer(

@ -17,6 +17,7 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <initializer_list> // IWYU pragma: keep #include <initializer_list> // IWYU pragma: keep
@ -31,6 +32,7 @@
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/activity.h"
@ -60,7 +62,7 @@ class ClientTransport {
auto AddStream(CallArgs call_args) { auto AddStream(CallArgs call_args) {
// At this point, the connection is set up. // At this point, the connection is set up.
// Start sending data frames. // Start sending data frames.
uint64_t stream_id; uint32_t stream_id;
{ {
MutexLock lock(&mu_); MutexLock lock(&mu_);
stream_id = next_stream_id_++; stream_id = next_stream_id_++;
@ -71,10 +73,16 @@ class ClientTransport {
[stream_id, initial_frame = true, [stream_id, initial_frame = true,
client_initial_metadata = client_initial_metadata =
std::move(call_args.client_initial_metadata), std::move(call_args.client_initial_metadata),
outgoing_frames = outgoing_frames_.MakeSender()]( outgoing_frames = outgoing_frames_.MakeSender(),
MessageHandle result) mutable { this](MessageHandle result) mutable {
ClientFragmentFrame frame; ClientFragmentFrame frame;
frame.stream_id = stream_id; // Construct frame header (flags, header_length and
// trailer_length will be added in serialization).
uint32_t message_length = result->payload()->Length();
uint32_t message_padding = message_length % aligned_bytes;
frame.frame_header = FrameHeader{
FrameType::kFragment, {}, stream_id, 0, message_length,
message_padding, 0};
frame.message = std::move(result); frame.message = std::move(result);
if (initial_frame) { if (initial_frame) {
// Send initial frame with client intial metadata. // Send initial frame with client intial metadata.
@ -99,6 +107,8 @@ class ClientTransport {
MpscReceiver<ClientFrame> outgoing_frames_; MpscReceiver<ClientFrame> outgoing_frames_;
Mutex mu_; Mutex mu_;
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
// Assigned aligned bytes from setting frame.
size_t aligned_bytes = 64;
ActivityPtr writer_; ActivityPtr writer_;
ActivityPtr reader_; ActivityPtr reader_;
std::unique_ptr<PromiseEndpoint> control_endpoint_; std::unique_ptr<PromiseEndpoint> control_endpoint_;

@ -28,6 +28,7 @@
#include <grpc/slice.h> #include <grpc/slice.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/no_destruct.h" #include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/status_helper.h"
@ -40,15 +41,14 @@ namespace chaotic_good {
namespace { namespace {
const NoDestruct<Slice> kZeroSlice{[] { const NoDestruct<Slice> kZeroSlice{[] {
// Frame header size is fixed to 24 bytes. // Frame header size is fixed to 24 bytes.
auto slice = GRPC_SLICE_MALLOC(24); auto slice = GRPC_SLICE_MALLOC(FrameHeader::frame_header_size_);
memset(GRPC_SLICE_START_PTR(slice), 0, 24); memset(GRPC_SLICE_START_PTR(slice), 0, FrameHeader::frame_header_size_);
return slice; return slice;
}()}; }()};
class FrameSerializer { class FrameSerializer {
public: public:
explicit FrameSerializer(FrameType type, uint32_t stream_id) explicit FrameSerializer(FrameHeader header) : header_(header) {
: header_{type, {}, stream_id, 0, 0, 0, 0} {
output_.AppendIndexed(kZeroSlice->Copy()); output_.AppendIndexed(kZeroSlice->Copy());
} }
// If called, must be called before AddTrailers, Finish. // If called, must be called before AddTrailers, Finish.
@ -59,10 +59,24 @@ class FrameSerializer {
// If called, must be called before Finish. // If called, must be called before Finish.
SliceBuffer& AddTrailers() { SliceBuffer& AddTrailers() {
header_.flags.set(1); header_.flags.set(1);
header_.header_length = output_.Length() - FrameHeader::frame_header_size_;
return output_; return output_;
} }
SliceBuffer Finish() { SliceBuffer Finish() {
// Calculate frame header_length or trailer_length if available.
if (header_.flags.is_set(1)) {
// Header length is already known in AddTrailers().
header_.trailer_length = output_.Length() - header_.header_length -
FrameHeader::frame_header_size_;
} else {
if (header_.flags.is_set(0)) {
// Calculate frame header length in Finish() since AddTrailers() isn't
// called.
header_.header_length =
output_.Length() - FrameHeader::frame_header_size_;
}
}
header_.Serialize( header_.Serialize(
GRPC_SLICE_START_PTR(output_.c_slice_buffer()->slices[0])); GRPC_SLICE_START_PTR(output_.c_slice_buffer()->slices[0]));
return std::move(output_); return std::move(output_);
@ -151,7 +165,8 @@ absl::Status SettingsFrame::Deserialize(HPackParser*, const FrameHeader& header,
} }
SliceBuffer SettingsFrame::Serialize(HPackCompressor*) const { SliceBuffer SettingsFrame::Serialize(HPackCompressor*) const {
FrameSerializer serializer(FrameType::kSettings, 0); FrameSerializer serializer(
FrameHeader{FrameType::kSettings, {}, 0, 0, 0, 0, 0});
return serializer.Finish(); return serializer.Finish();
} }
@ -162,7 +177,7 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
if (header.stream_id == 0) { if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id"); return absl::InvalidArgumentError("Expected non-zero stream id");
} }
stream_id = header.stream_id; frame_header = header;
if (header.type != FrameType::kFragment) { if (header.type != FrameType::kFragment) {
return absl::InvalidArgumentError("Expected fragment frame"); return absl::InvalidArgumentError("Expected fragment frame");
} }
@ -171,6 +186,9 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
auto r = ReadMetadata<ClientMetadata>(parser, deserializer.ReceiveHeaders(), auto r = ReadMetadata<ClientMetadata>(parser, deserializer.ReceiveHeaders(),
header.stream_id, true, true, bitsrc); header.stream_id, true, true, bitsrc);
if (!r.ok()) return r.status(); if (!r.ok()) return r.status();
if (r.value() != nullptr) {
headers = std::move(r.value());
}
} }
if (header.flags.is_set(1)) { if (header.flags.is_set(1)) {
if (header.trailer_length != 0) { if (header.trailer_length != 0) {
@ -184,8 +202,8 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
} }
SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const { SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const {
GPR_ASSERT(stream_id != 0); GPR_ASSERT(frame_header.stream_id != 0);
FrameSerializer serializer(FrameType::kFragment, stream_id); FrameSerializer serializer(frame_header);
if (headers.get() != nullptr) { if (headers.get() != nullptr) {
encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders()); encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders());
} }
@ -202,28 +220,32 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
if (header.stream_id == 0) { if (header.stream_id == 0) {
return absl::InvalidArgumentError("Expected non-zero stream id"); return absl::InvalidArgumentError("Expected non-zero stream id");
} }
stream_id = header.stream_id; frame_header = header;
if (header.type != FrameType::kFragment) {
return absl::InvalidArgumentError("Expected fragment frame");
}
FrameDeserializer deserializer(header, slice_buffer); FrameDeserializer deserializer(header, slice_buffer);
if (header.flags.is_set(0)) { if (header.flags.is_set(0)) {
auto r = auto r =
ReadMetadata<ServerMetadata>(parser, deserializer.ReceiveHeaders(), ReadMetadata<ServerMetadata>(parser, deserializer.ReceiveHeaders(),
header.stream_id, true, false, bitsrc); header.stream_id, true, false, bitsrc);
if (!r.ok()) return r.status(); if (!r.ok()) return r.status();
if (r.value() != nullptr) {
headers = std::move(r.value());
}
} }
if (header.flags.is_set(1)) { if (header.flags.is_set(1)) {
auto r = auto r =
ReadMetadata<ServerMetadata>(parser, deserializer.ReceiveTrailers(), ReadMetadata<ServerMetadata>(parser, deserializer.ReceiveTrailers(),
header.stream_id, false, false, bitsrc); header.stream_id, false, false, bitsrc);
if (!r.ok()) return r.status();
if (r.value() != nullptr) {
trailers = std::move(r.value());
}
} }
return deserializer.Finish(); return deserializer.Finish();
} }
SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const { SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const {
GPR_ASSERT(stream_id != 0); GPR_ASSERT(frame_header.stream_id != 0);
FrameSerializer serializer(FrameType::kFragment, stream_id); FrameSerializer serializer(frame_header);
if (headers.get() != nullptr) { if (headers.get() != nullptr) {
encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders()); encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders());
} }
@ -252,7 +274,8 @@ absl::Status CancelFrame::Deserialize(HPackParser*, const FrameHeader& header,
SliceBuffer CancelFrame::Serialize(HPackCompressor*) const { SliceBuffer CancelFrame::Serialize(HPackCompressor*) const {
GPR_ASSERT(stream_id != 0); GPR_ASSERT(stream_id != 0);
FrameSerializer serializer(FrameType::kCancel, stream_id); FrameSerializer serializer(
FrameHeader{FrameType::kCancel, {}, stream_id, 0, 0, 0, 0});
return serializer.Finish(); return serializer.Finish();
} }

@ -77,13 +77,14 @@ struct ClientFragmentFrame final : public FrameInterface {
SliceBuffer& slice_buffer) override; SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override; SliceBuffer Serialize(HPackCompressor* encoder) const override;
uint32_t stream_id; FrameHeader frame_header;
ClientMetadataHandle headers; ClientMetadataHandle headers;
MessageHandle message; MessageHandle message;
bool end_of_stream = false; bool end_of_stream = false;
bool operator==(const ClientFragmentFrame& other) const { bool operator==(const ClientFragmentFrame& other) const {
return stream_id == other.stream_id && EqHdl(headers, other.headers) && return frame_header.stream_id == other.frame_header.stream_id &&
EqHdl(headers, other.headers) &&
end_of_stream == other.end_of_stream; end_of_stream == other.end_of_stream;
} }
}; };
@ -94,13 +95,13 @@ struct ServerFragmentFrame final : public FrameInterface {
SliceBuffer& slice_buffer) override; SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override; SliceBuffer Serialize(HPackCompressor* encoder) const override;
uint32_t stream_id; FrameHeader frame_header;
ServerMetadataHandle headers; ServerMetadataHandle headers;
ServerMetadataHandle trailers; ServerMetadataHandle trailers;
bool operator==(const ServerFragmentFrame& other) const { bool operator==(const ServerFragmentFrame& other) const {
return stream_id == other.stream_id && EqHdl(headers, other.headers) && return frame_header.stream_id == other.frame_header.stream_id &&
EqHdl(trailers, other.trailers); EqHdl(headers, other.headers) && EqHdl(trailers, other.trailers);
} }
}; };

@ -17,6 +17,8 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <stddef.h>
#include <cstdint> #include <cstdint>
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
@ -55,6 +57,8 @@ struct FrameHeader {
message_padding == h.message_padding && message_padding == h.message_padding &&
trailer_length == h.trailer_length; trailer_length == h.trailer_length;
} }
// Frame header size is fixed to 24 bytes.
static constexpr size_t frame_header_size_ = 24;
}; };
} // namespace chaotic_good } // namespace chaotic_good

@ -16,8 +16,6 @@
// IWYU pragma: no_include <sys/socket.h> // IWYU pragma: no_include <sys/socket.h>
#include <stdio.h>
#include <algorithm> // IWYU pragma: keep #include <algorithm> // IWYU pragma: keep
#include <memory> #include <memory>
#include <tuple> #include <tuple>

Loading…
Cancel
Save