pull/35278/head
Craig Tiller 1 year ago
parent 645d847885
commit c803ad1f1a
  1. 11
      src/core/ext/transport/chaotic_good/client_transport.cc
  2. 72
      src/core/ext/transport/chaotic_good/client_transport.h
  3. 49
      src/core/ext/transport/chaotic_good/frame.cc
  4. 7
      src/core/ext/transport/chaotic_good/frame.h
  5. 1
      test/core/transport/chaotic_good/frame_fuzzer.cc

@ -77,8 +77,7 @@ ClientTransport::ClientTransport(
control_endpoint_write_buffer_.Append(
frame->Serialize(hpack_compressor_.get()));
if (frame->message != nullptr) {
std::string message_padding(
frame->frame_header.message_padding, '0');
std::string message_padding(frame->message_padding, '0');
Slice slice(grpc_slice_from_cpp_string(message_padding));
// Append message padding to data_endpoint_buffer.
data_endpoint_write_buffer_.Append(std::move(slice));
@ -157,11 +156,9 @@ ClientTransport::ClientTransport(
// Move message into frame.
frame.message = arena_->MakePooled<Message>(
std::move(data_endpoint_read_buffer_), 0);
auto stream_id = frame.frame_header.stream_id;
{
MutexLock lock(&mu_);
return stream_map_[stream_id]->Push(ServerFrame(std::move(frame)));
}
MutexLock lock(&mu_);
return stream_map_[frame.stream_id]->Push(
ServerFrame(std::move(frame)));
},
// Check if send frame to corresponding stream successfully.
[](bool ret) -> LoopCtl<absl::Status> {

@ -107,40 +107,37 @@ class ClientTransport {
return TrySeq(
TryJoin(
// Continuously send client frame with client to server messages.
ForEach(
std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
client_initial_metadata =
std::move(call_args.client_initial_metadata),
outgoing_frames = outgoing_frames_.MakeSender(),
this](MessageHandle result) mutable {
ClientFragmentFrame frame;
// Construct frame header (flags, header_length and
// trailer_length will be added in serialization).
uint32_t message_length = result->payload()->Length();
uint32_t message_padding = message_length % aligned_bytes;
frame.frame_header = FrameHeader{
FrameType::kFragment, {}, stream_id, 0, message_length,
message_padding, 0};
frame.message = std::move(result);
if (initial_frame) {
// Send initial frame with client intial metadata.
frame.headers = std::move(client_initial_metadata);
initial_frame = false;
}
return TrySeq(
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
// TODO(ladynana): propagate the actual error message
// from EventEngine.
return absl::UnavailableError(
"Transport closed due to endpoint write/read "
"failed.");
}
return absl::OkStatus();
});
}),
ForEach(std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
client_initial_metadata =
std::move(call_args.client_initial_metadata),
outgoing_frames = outgoing_frames_.MakeSender(),
this](MessageHandle result) mutable {
ClientFragmentFrame frame;
// Construct frame header (flags, header_length and
// trailer_length will be added in serialization).
uint32_t message_length = result->payload()->Length();
frame.stream_id = stream_id;
frame.message_padding = message_length % aligned_bytes;
frame.message = std::move(result);
if (initial_frame) {
// Send initial frame with client intial metadata.
frame.headers = std::move(client_initial_metadata);
initial_frame = false;
}
return TrySeq(
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
// TODO(ladynana): propagate the actual error
// message from EventEngine.
return absl::UnavailableError(
"Transport closed due to endpoint write/read "
"failed.");
}
return absl::OkStatus();
});
}),
// Continuously receive server frames from endpoints and save
// results to call_args.
Loop([server_initial_metadata = call_args.server_initial_metadata,
@ -217,9 +214,10 @@ class ClientTransport {
Mutex mu_;
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
// Map of stream incoming server frames, key is stream_id.
std::map<uint32_t, std::shared_ptr<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>>
stream_map_ ABSL_GUARDED_BY(mu_);
std::map<uint32_t,
std::shared_ptr<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>> stream_map_
ABSL_GUARDED_BY(mu_);
ActivityPtr writer_;
ActivityPtr reader_;
std::unique_ptr<PromiseEndpoint> control_endpoint_;

@ -50,7 +50,8 @@ const NoDestruct<Slice> kZeroSlice{[] {
class FrameSerializer {
public:
explicit FrameSerializer(FrameType frame_type, uint32_t stream_id) {
explicit FrameSerializer(FrameType frame_type, uint32_t stream_id,
uint32_t message_padding) {
output_.AppendIndexed(kZeroSlice->Copy());
header_.type = frame_type;
header_.stream_id = stream_id;
@ -172,10 +173,12 @@ absl::Status SettingsFrame::Deserialize(HPackParser*, const FrameHeader& header,
}
SliceBuffer SettingsFrame::Serialize(HPackCompressor*) const {
FrameSerializer serializer(FrameType::kSettings, 0);
FrameSerializer serializer(FrameType::kSettings, 0, 0);
return serializer.Finish();
}
std::string SettingsFrame::ToString() const { return "SettingsFrame{}"; }
absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
const FrameHeader& header,
absl::BitGenRef bitsrc,
@ -184,6 +187,7 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
return absl::InvalidArgumentError("Expected non-zero stream id");
}
stream_id = header.stream_id;
message_padding = header.message_padding;
if (header.type != FrameType::kFragment) {
return absl::InvalidArgumentError("Expected fragment frame");
}
@ -195,6 +199,9 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
if (r.value() != nullptr) {
headers = std::move(r.value());
}
} else if (header.header_length != 0) {
return absl::InvalidArgumentError(absl::StrCat(
"Unexpected non-zero header length", header.header_length));
}
if (header.flags.is_set(1)) {
if (header.trailer_length != 0) {
@ -209,7 +216,7 @@ absl::Status ClientFragmentFrame::Deserialize(HPackParser* parser,
SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const {
GPR_ASSERT(stream_id != 0);
FrameSerializer serializer(FrameType::kFragment, stream_id);
FrameSerializer serializer(FrameType::kFragment, stream_id, message_padding);
if (headers.get() != nullptr) {
encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders());
}
@ -219,6 +226,16 @@ SliceBuffer ClientFragmentFrame::Serialize(HPackCompressor* encoder) const {
return serializer.Finish();
}
std::string ClientFragmentFrame::ToString() const {
return absl::StrCat(
"ClientFragmentFrame{stream_id=", stream_id, ", headers=",
headers.get() != nullptr ? headers->DebugString().c_str() : "nullptr",
", message=",
message.get() != nullptr ? message->DebugString().c_str() : "nullptr",
", message_padding=", message_padding, ", end_of_stream=", end_of_stream,
"}");
}
absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
const FrameHeader& header,
absl::BitGenRef bitsrc,
@ -227,6 +244,7 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
return absl::InvalidArgumentError("Expected non-zero stream id");
}
stream_id = header.stream_id;
message_padding = header.message_padding;
FrameDeserializer deserializer(header, slice_buffer);
if (header.flags.is_set(0)) {
auto r =
@ -236,6 +254,9 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
if (r.value() != nullptr) {
headers = std::move(r.value());
}
} else if (header.header_length != 0) {
return absl::InvalidArgumentError(absl::StrCat(
"Unexpected non-zero header length", header.header_length));
}
if (header.flags.is_set(1)) {
auto r =
@ -245,13 +266,16 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
if (r.value() != nullptr) {
trailers = std::move(r.value());
}
} else if (header.trailer_length != 0) {
return absl::InvalidArgumentError(absl::StrCat(
"Unexpected non-zero trailer length", header.trailer_length));
}
return deserializer.Finish();
}
SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const {
GPR_ASSERT(stream_id != 0);
FrameSerializer serializer(FrameType::kFragment, stream_id);
FrameSerializer serializer(FrameType::kFragment, stream_id, message_padding);
if (headers.get() != nullptr) {
encoder->EncodeRawHeaders(*headers.get(), serializer.AddHeaders());
}
@ -261,6 +285,17 @@ SliceBuffer ServerFragmentFrame::Serialize(HPackCompressor* encoder) const {
return serializer.Finish();
}
std::string ServerFragmentFrame::ToString() const {
return absl::StrCat(
"ServerFragmentFrame{stream_id=", stream_id, ", headers=",
headers.get() != nullptr ? headers->DebugString().c_str() : "nullptr",
", message=",
message.get() != nullptr ? message->DebugString().c_str() : "nullptr",
", message_padding=", message_padding, ", trailers=",
trailers.get() != nullptr ? trailers->DebugString().c_str() : "nullptr",
"}");
}
absl::Status CancelFrame::Deserialize(HPackParser*, const FrameHeader& header,
absl::BitGenRef,
SliceBuffer& slice_buffer) {
@ -280,9 +315,13 @@ absl::Status CancelFrame::Deserialize(HPackParser*, const FrameHeader& header,
SliceBuffer CancelFrame::Serialize(HPackCompressor*) const {
GPR_ASSERT(stream_id != 0);
FrameSerializer serializer(FrameType::kCancel, stream_id);
FrameSerializer serializer(FrameType::kCancel, stream_id, 0);
return serializer.Finish();
}
std::string CancelFrame::ToString() const {
return absl::StrCat("CancelFrame{stream_id=", stream_id, "}");
}
} // namespace chaotic_good
} // namespace grpc_core

@ -43,6 +43,7 @@ class FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) = 0;
virtual SliceBuffer Serialize(HPackCompressor* encoder) const = 0;
virtual std::string ToString() const = 0;
protected:
static bool EqVal(const Message& a, const Message& b) {
@ -67,6 +68,7 @@ struct SettingsFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;
bool operator==(const SettingsFrame&) const { return true; }
};
@ -76,10 +78,12 @@ struct ClientFragmentFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;
uint32_t stream_id;
ClientMetadataHandle headers;
MessageHandle message;
uint32_t message_padding;
bool end_of_stream = false;
bool operator==(const ClientFragmentFrame& other) const {
@ -93,10 +97,12 @@ struct ServerFragmentFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;
uint32_t stream_id;
ServerMetadataHandle headers;
MessageHandle message;
uint32_t message_padding;
ServerMetadataHandle trailers;
bool operator==(const ServerFragmentFrame& other) const {
@ -110,6 +116,7 @@ struct CancelFrame final : public FrameInterface {
absl::BitGenRef bitsrc,
SliceBuffer& slice_buffer) override;
SliceBuffer Serialize(HPackCompressor* encoder) const override;
std::string ToString() const override;
uint32_t stream_id;

@ -85,6 +85,7 @@ void FinishParseAndChecks(const FrameHeader& header, const uint8_t* data,
auto deser = parsed.Deserialize(&hpack_parser, header,
absl::BitGenRef(bitgen), serialized);
if (!deser.ok()) return;
gpr_log(GPR_INFO, "Read frame: %s", parsed.ToString().c_str());
AssertRoundTrips(parsed, header.type);
}

Loading…
Cancel
Save