Merge branch 'tiefling-buffer' into tiefling-chunky-monkey

pull/38052/head
Craig Tiller 2 weeks ago
commit d72550f0ca
  1. 22
      src/core/ext/transport/chaotic_good/chaotic_good_transport.h
  2. 13
      src/core/ext/transport/chaotic_good/client_transport.cc
  3. 3
      src/core/ext/transport/chaotic_good/control_endpoint.cc
  4. 11
      src/core/ext/transport/chaotic_good/control_endpoint.h
  5. 10
      src/core/ext/transport/chaotic_good/data_endpoints.h
  6. 15
      src/core/ext/transport/chaotic_good/server_transport.cc
  7. 19
      src/core/util/http_client/httpcli.cc
  8. 14
      src/core/util/uri.cc
  9. 6
      src/core/util/uri.h
  10. 12
      test/core/http/httpcli_test.cc
  11. 12
      test/core/http/httpscli_test.cc
  12. 4
      test/core/http/test_server.py
  13. 4
      test/core/transport/chaotic_good/BUILD
  14. 2
      test/cpp/end2end/xds/xds_end2end_test_lib.cc

@ -50,6 +50,13 @@ inline std::vector<PromiseEndpoint> OneDataEndpoint(PromiseEndpoint endpoint) {
return ep;
}
// One received frame: the header, and the serialized bytes of the payload.
// The payload may not yet be received into memory, so the accessor for that
// returns a promise that will need to be resolved prior to inspecting the
// bytes.
// In this way we can pull bytes from various different data connections and
// read them in any order, but still have a trivial reassembly in the receiving
// call promise.
class IncomingFrame {
public:
template <typename T>
@ -107,14 +114,17 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
.value_or("<<unknown peer address>>")
<< " " << frame.ToString();
return If(
// If we have no data endpoints, OR this is a small payload
data_endpoints_.empty() ||
header.payload_length <= options_.inlined_payload_size_threshold,
// ... then write it to the control endpoint
[this, &header, &frame]() {
SliceBuffer output;
header.Serialize(output.AddTiny(FrameHeader::kFrameHeaderSize));
frame.SerializePayload(output);
return control_endpoint_.Write(std::move(output));
},
// ... otherwise write it to a data connection
[this, header, &frame]() mutable {
SliceBuffer payload;
// Temporarily give a bogus connection id to get padding right
@ -141,6 +151,8 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
});
}
// Common outbound loop for both client and server (these vary only over the
// frame type).
template <typename Frame>
auto TransportWriteLoop(MpscReceiver<Frame>& outgoing_frames) {
return Loop([self = Ref(), &outgoing_frames] {
@ -180,7 +192,13 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
},
[this](FrameHeader frame_header) {
return If(
// If the payload is on the connection frame
frame_header.payload_connection_id == 0,
// ... then read the data immediately and return an IncomingFrame
// that contains the payload.
// We need to do this here so that we do not create head of line
// blocking issues reading later control frames (but waiting for a
// call to get scheduled time to read the payload).
[this, frame_header]() {
return Map(control_endpoint_.Read(frame_header.payload_length),
[frame_header](absl::StatusOr<SliceBuffer> payload)
@ -190,6 +208,10 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
std::move(payload), 0);
});
},
// ... otherwise issue a read to the appropriate data endpoint,
// which will return a read ticket - which can be used later
// in the call promise to asynchronously wait for those bytes
// to be available.
[this, frame_header]() -> absl::StatusOr<IncomingFrame> {
const auto padding =
frame_header.Padding(options_.decode_alignment);

@ -131,12 +131,14 @@ auto ChaoticGoodClientTransport::DispatchFrame(
stream != nullptr,
[this, &stream, &incoming_frame, &transport]() {
auto& call = stream->call;
// TODO(ctiller): instead of SpawnWaitable here we probably want a
// small queue to push into, so that the call can proceed
// asynchronously to other calls regardless of frame ordering.
return call.SpawnWaitable(
"push-frame", [this, stream = std::move(stream),
incoming_frame = std::move(incoming_frame),
transport = std::move(transport)]() mutable {
auto& call = stream->call;
return call.CancelIfFails(TrySeq(
return TrySeq(
incoming_frame.Payload(),
[transport = std::move(transport),
header = incoming_frame.header()](SliceBuffer payload) {
@ -144,10 +146,11 @@ auto ChaoticGoodClientTransport::DispatchFrame(
header, std::move(payload));
},
[stream = std::move(stream), this](T frame) mutable {
return PushFrameIntoCall(std::move(frame),
std::move(stream));
auto& call = stream->call;
return call.CancelIfFails(PushFrameIntoCall(
std::move(frame), std::move(stream)));
},
ImmediateOkStatus()));
ImmediateOkStatus());
});
},
[]() { return absl::OkStatus(); }));

@ -47,7 +47,9 @@ ControlEndpoint::ControlEndpoint(
GRPC_LATENT_SEE_PROMISE(
"FlushLoop", Loop([endpoint = endpoint_, buffer = buffer_]() {
return TrySeq(
// Pull one set of buffered writes
buffer->Pull(),
// And write them
[endpoint, buffer = buffer.get()](SliceBuffer flushing) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Flush " << flushing.Length()
@ -56,6 +58,7 @@ ControlEndpoint::ControlEndpoint(
.value_or("<<unknown peer address>>");
return endpoint->Write(std::move(flushing));
},
// Then repeat
[]() -> LoopCtl<absl::Status> { return Continue{}; });
})),
[](absl::Status) {});

@ -23,10 +23,20 @@
namespace grpc_core {
namespace chaotic_good {
// Wrapper around PromiseEndpoint.
// Buffers all of the small writes that get enqueued to this endpoint, and then
// uses a separate party to flush them to the wire.
// In doing so we get to batch up effectively all the writes from the transport
// (since party wakeups are sticky), and then flush all the writes in one go.
class ControlEndpoint {
private:
class Buffer : public RefCounted<Buffer> {
public:
// Queue some buffer to be written.
// We cap the queue size so that we don't infinitely buffer on one
// connection - if the cap is hit, this queue operation will not resolve
// until it empties.
// Returns a promise that resolves to Empty{} when the data has been queued.
auto Queue(SliceBuffer&& buffer) {
return [buffer = std::move(buffer), this]() mutable -> Poll<Empty> {
Waker waker;
@ -70,6 +80,7 @@ class ControlEndpoint {
// to Empty{} -- it's not possible to see errors from this api.
auto Write(SliceBuffer&& bytes) { return buffer_->Queue(std::move(bytes)); }
// Read operations are simply passthroughs to the underlying promise endpoint.
auto ReadSlice(size_t length) { return endpoint_->ReadSlice(length); }
auto Read(size_t length) { return endpoint_->Read(length); }
auto GetPeerAddress() const { return endpoint_->GetPeerAddress(); }

@ -30,6 +30,7 @@ struct Endpoints : public RefCounted<Endpoints> {
std::vector<PromiseEndpoint> endpoints;
};
// Buffered writes for one data endpoint
class OutputBuffer {
public:
bool Accept(SliceBuffer& buffer);
@ -46,6 +47,7 @@ class OutputBuffer {
SliceBuffer pending_;
};
// The set of output buffers for all connected data endpoints
class OutputBuffers : public RefCounted<OutputBuffers> {
public:
explicit OutputBuffers(uint32_t num_connections);
@ -71,6 +73,13 @@ class OutputBuffers : public RefCounted<OutputBuffers> {
class InputQueues : public RefCounted<InputQueues> {
public:
// One outstanding read.
// ReadTickets get filed by read requests, and all tickets are fullfilled
// by an endpoint.
// A call may Await a ticket to get the bytes back later (or it may skip that
// step - in which case the bytes are thrown away after reading).
// This decoupling is necessary to ensure that cancelled reads by calls do not
// cause data corruption for other calls.
class ReadTicket {
public:
ReadTicket(absl::StatusOr<uint64_t> ticket,
@ -147,6 +156,7 @@ class InputQueues : public RefCounted<InputQueues> {
};
} // namespace data_endpoints_detail
// Collection of data connections.
class DataEndpoints {
public:
using ReadTicket = data_endpoints_detail::InputQueues::ReadTicket;

@ -95,11 +95,14 @@ auto ChaoticGoodServerTransport::DispatchFrame(
return If(
stream != nullptr,
[this, &stream, &frame, &transport]() {
// TODO(ctiller): instead of SpawnWaitable here we probably want a
// small queue to push into, so that the call can proceed
// asynchronously to other calls regardless of frame ordering.
return stream->call.SpawnWaitable(
"push-frame", [this, stream, frame = std::move(frame),
transport = std::move(transport)]() mutable {
auto& call = stream->call;
return call.CancelIfFails(TrySeq(
return TrySeq(
frame.Payload(),
[transport = std::move(transport),
header = frame.header()](SliceBuffer payload) {
@ -107,10 +110,11 @@ auto ChaoticGoodServerTransport::DispatchFrame(
std::move(payload));
},
[stream = std::move(stream), this](T frame) mutable {
return PushFrameIntoCall(std::move(stream),
std::move(frame));
auto& call = stream->call;
return call.CancelIfFails(
PushFrameIntoCall(std::move(stream), std::move(frame)));
},
ImmediateOkStatus()));
ImmediateOkStatus());
});
},
[]() { return absl::OkStatus(); });
@ -250,8 +254,7 @@ auto ChaoticGoodServerTransport::ReadOneFrame(
"ReadOneFrame",
TrySeq(
transport->ReadFrameBytes(),
[this,
transport = std::move(transport)](IncomingFrame incoming_frame) {
[this, transport](IncomingFrame incoming_frame) mutable {
// CHECK_EQ(header.payload_length, payload.Length());
return Switch(
incoming_frame.header().type,

@ -79,8 +79,9 @@ OrphanablePtr<HttpRequest> HttpRequest::Get(
}
std::string name =
absl::StrFormat("HTTP:GET:%s:%s", uri.authority(), uri.path());
const grpc_slice request_text = grpc_httpcli_format_get_request(
request, uri.authority().c_str(), uri.path().c_str());
const grpc_slice request_text =
grpc_httpcli_format_get_request(request, uri.authority().c_str(),
uri.EncodedPathAndQueryParams().c_str());
return MakeOrphanable<HttpRequest>(
std::move(uri), request_text, response, deadline, channel_args, on_done,
pollent, name.c_str(), std::move(test_only_generate_response),
@ -103,8 +104,9 @@ OrphanablePtr<HttpRequest> HttpRequest::Post(
}
std::string name =
absl::StrFormat("HTTP:POST:%s:%s", uri.authority(), uri.path());
const grpc_slice request_text = grpc_httpcli_format_post_request(
request, uri.authority().c_str(), uri.path().c_str());
const grpc_slice request_text =
grpc_httpcli_format_post_request(request, uri.authority().c_str(),
uri.EncodedPathAndQueryParams().c_str());
return MakeOrphanable<HttpRequest>(
std::move(uri), request_text, response, deadline, channel_args, on_done,
pollent, name.c_str(), std::move(test_only_generate_response),
@ -127,8 +129,9 @@ OrphanablePtr<HttpRequest> HttpRequest::Put(
}
std::string name =
absl::StrFormat("HTTP:PUT:%s:%s", uri.authority(), uri.path());
const grpc_slice request_text = grpc_httpcli_format_put_request(
request, uri.authority().c_str(), uri.path().c_str());
const grpc_slice request_text =
grpc_httpcli_format_put_request(request, uri.authority().c_str(),
uri.EncodedPathAndQueryParams().c_str());
return MakeOrphanable<HttpRequest>(
std::move(uri), request_text, response, deadline, channel_args, on_done,
pollent, name.c_str(), std::move(test_only_generate_response),
@ -241,6 +244,8 @@ void HttpRequest::AppendError(grpc_error_handle error) {
void HttpRequest::OnReadInternal(grpc_error_handle error) {
for (size_t i = 0; i < incoming_.count; i++) {
GRPC_TRACE_LOG(http1, INFO)
<< "HTTP response data: " << StringViewFromSlice(incoming_.slices[i]);
if (GRPC_SLICE_LENGTH(incoming_.slices[i])) {
have_read_byte_ = 1;
grpc_error_handle err =
@ -275,6 +280,8 @@ void HttpRequest::ContinueDoneWriteAfterScheduleOnExecCtx(
}
void HttpRequest::StartWrite() {
GRPC_TRACE_LOG(http1, INFO)
<< "Sending HTTP1 request: " << StringViewFromSlice(request_text_);
CSliceRef(request_text_);
grpc_slice_buffer_add(&outgoing_, request_text_);
Ref().release(); // ref held by pending write

@ -352,6 +352,16 @@ std::string URI::ToString() const {
parts.emplace_back("//");
parts.emplace_back(PercentEncode(authority_, IsAuthorityChar));
}
parts.emplace_back(EncodedPathAndQueryParams());
if (!fragment_.empty()) {
parts.push_back("#");
parts.push_back(PercentEncode(fragment_, IsQueryOrFragmentChar));
}
return absl::StrJoin(parts, "");
}
std::string URI::EncodedPathAndQueryParams() const {
std::vector<std::string> parts;
if (!path_.empty()) {
parts.emplace_back(PercentEncode(path_, IsPathChar));
}
@ -360,10 +370,6 @@ std::string URI::ToString() const {
parts.push_back(
absl::StrJoin(query_parameter_pairs_, "&", QueryParameterFormatter()));
}
if (!fragment_.empty()) {
parts.push_back("#");
parts.push_back(PercentEncode(fragment_, IsQueryOrFragmentChar));
}
return absl::StrJoin(parts, "");
}

@ -76,7 +76,7 @@ class URI {
return query_parameter_map_;
}
// A vector of key:value query parameter pairs, kept in order of appearance
// within the URI search string. Repeated keys are represented as separate
// within the URI string. Repeated keys are represented as separate
// key:value elements.
const std::vector<QueryParam>& query_parameter_pairs() const {
return query_parameter_pairs_;
@ -85,6 +85,10 @@ class URI {
std::string ToString() const;
// Returns the encoded path and query params, such as would be used on
// the wire in an HTTP request.
std::string EncodedPathAndQueryParams() const;
private:
URI(std::string scheme, std::string authority, std::string path,
std::vector<QueryParam> query_parameter_pairs, std::string fragment);

@ -193,8 +193,10 @@ TEST_F(HttpRequestTest, Get) {
std::string host = absl::StrFormat("localhost:%d", g_server_port);
LOG(INFO) << "requesting from " << host;
memset(&req, 0, sizeof(req));
auto uri = grpc_core::URI::Create("http", host, "/get", {} /* query params */,
"" /* fragment */);
auto uri = grpc_core::URI::Create(
"http", host, "/get",
/*query_parameter_pairs=*/{{"foo", "bar"}, {"baz", "quux"}},
/*fragment=*/"");
CHECK(uri.ok());
grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
grpc_core::HttpRequest::Get(
@ -219,8 +221,10 @@ TEST_F(HttpRequestTest, Post) {
memset(&req, 0, sizeof(req));
req.body = const_cast<char*>("hello");
req.body_length = 5;
auto uri = grpc_core::URI::Create("http", host, "/post",
{} /* query params */, "" /* fragment */);
auto uri = grpc_core::URI::Create(
"http", host, "/post",
/*query_parameter_pairs=*/{{"foo", "bar"}, {"mumble", "frotz"}},
/*fragment=*/"");
CHECK(uri.ok());
grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
grpc_core::HttpRequest::Post(

@ -191,8 +191,10 @@ TEST_F(HttpsCliTest, Get) {
const_cast<char*>(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG),
const_cast<char*>("foo.test.google.fr"));
grpc_channel_args args = {1, &ssl_override_arg};
auto uri = grpc_core::URI::Create("https", host, "/get",
{} /* query params */, "" /* fragment */);
auto uri = grpc_core::URI::Create(
"https", host, "/get",
/*query_parameter_pairs=*/{{"foo", "bar"}, {"baz", "quux"}},
/*fragment=*/"");
CHECK(uri.ok());
grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
grpc_core::HttpRequest::Get(
@ -219,8 +221,10 @@ TEST_F(HttpsCliTest, Post) {
const_cast<char*>(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG),
const_cast<char*>("foo.test.google.fr"));
grpc_channel_args args = {1, &ssl_override_arg};
auto uri = grpc_core::URI::Create("https", host, "/post",
{} /* query params */, "" /* fragment */);
auto uri = grpc_core::URI::Create(
"https", host, "/post",
/*query_parameter_pairs=*/{{"foo", "bar"}, {"mumble", "frotz"}},
/*fragment=*/"");
CHECK(uri.ok());
grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
grpc_core::HttpRequest::Post(

@ -59,13 +59,13 @@ class Handler(BaseHTTPRequestHandler):
)
def do_GET(self):
if self.path == "/get":
if self.path == "/get?foo=bar&baz=quux":
self.good()
def do_POST(self):
content_len = self.headers.get("content-length")
content = self.rfile.read(int(content_len)).decode("ascii")
if self.path == "/post" and content == "hello":
if self.path == "/post?foo=bar&mumble=frotz" and content == "hello":
self.good()

@ -254,9 +254,9 @@ grpc_yodel_simple_test(
"no_windows",
],
deps = [
"//test/core/transport/util:mock_promise_endpoint",
"//src/core:chaotic_good_control_endpoint",
"//test/core/call/yodel:yodel_test",
"//test/core/transport/util:mock_promise_endpoint",
],
)
@ -269,8 +269,8 @@ grpc_yodel_simple_test(
"no_windows",
],
deps = [
"//test/core/transport/util:mock_promise_endpoint",
"//src/core:chaotic_good_data_endpoints",
"//test/core/call/yodel:yodel_test",
"//test/core/transport/util:mock_promise_endpoint",
],
)

@ -502,7 +502,7 @@ void XdsEnd2endTest::InitClient(
if (xds_resource_does_not_exist_timeout_ms > 0) {
xds_channel_args_to_add_.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS),
xds_resource_does_not_exist_timeout_ms));
xds_resource_does_not_exist_timeout_ms * grpc_test_slowdown_factor()));
}
if (!lb_expected_authority.empty()) {
constexpr char authority_const[] = "localhost:%d";

Loading…
Cancel
Save