diff --git a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h index 98e120baceb..55de0213818 100644 --- a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h +++ b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h @@ -50,6 +50,13 @@ inline std::vector OneDataEndpoint(PromiseEndpoint endpoint) { return ep; } +// One received frame: the header, and the serialized bytes of the payload. +// The payload may not yet be received into memory, so the accessor for that +// returns a promise that will need to be resolved prior to inspecting the +// bytes. +// In this way we can pull bytes from various different data connections and +// read them in any order, but still have a trivial reassembly in the receiving +// call promise. class IncomingFrame { public: template @@ -107,14 +114,17 @@ class ChaoticGoodTransport : public RefCounted { .value_or("<>") << " " << frame.ToString(); return If( + // If we have no data endpoints, OR this is a small payload data_endpoints_.empty() || header.payload_length <= options_.inlined_payload_size_threshold, + // ... then write it to the control endpoint [this, &header, &frame]() { SliceBuffer output; header.Serialize(output.AddTiny(FrameHeader::kFrameHeaderSize)); frame.SerializePayload(output); return control_endpoint_.Write(std::move(output)); }, + // ... otherwise write it to a data connection [this, header, &frame]() mutable { SliceBuffer payload; // Temporarily give a bogus connection id to get padding right @@ -141,6 +151,8 @@ class ChaoticGoodTransport : public RefCounted { }); } + // Common outbound loop for both client and server (these vary only over the + // frame type). template auto TransportWriteLoop(MpscReceiver& outgoing_frames) { return Loop([self = Ref(), &outgoing_frames] { @@ -180,7 +192,13 @@ class ChaoticGoodTransport : public RefCounted { }, [this](FrameHeader frame_header) { return If( + // If the payload is on the connection frame frame_header.payload_connection_id == 0, + // ... then read the data immediately and return an IncomingFrame + // that contains the payload. + // We need to do this here so that we do not create head of line + // blocking issues reading later control frames (but waiting for a + // call to get scheduled time to read the payload). [this, frame_header]() { return Map(control_endpoint_.Read(frame_header.payload_length), [frame_header](absl::StatusOr payload) @@ -190,6 +208,10 @@ class ChaoticGoodTransport : public RefCounted { std::move(payload), 0); }); }, + // ... otherwise issue a read to the appropriate data endpoint, + // which will return a read ticket - which can be used later + // in the call promise to asynchronously wait for those bytes + // to be available. [this, frame_header]() -> absl::StatusOr { const auto padding = frame_header.Padding(options_.decode_alignment); diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 8db98bdb48c..476bc018246 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -131,12 +131,14 @@ auto ChaoticGoodClientTransport::DispatchFrame( stream != nullptr, [this, &stream, &incoming_frame, &transport]() { auto& call = stream->call; + // TODO(ctiller): instead of SpawnWaitable here we probably want a + // small queue to push into, so that the call can proceed + // asynchronously to other calls regardless of frame ordering. return call.SpawnWaitable( "push-frame", [this, stream = std::move(stream), incoming_frame = std::move(incoming_frame), transport = std::move(transport)]() mutable { - auto& call = stream->call; - return call.CancelIfFails(TrySeq( + return TrySeq( incoming_frame.Payload(), [transport = std::move(transport), header = incoming_frame.header()](SliceBuffer payload) { @@ -144,10 +146,11 @@ auto ChaoticGoodClientTransport::DispatchFrame( header, std::move(payload)); }, [stream = std::move(stream), this](T frame) mutable { - return PushFrameIntoCall(std::move(frame), - std::move(stream)); + auto& call = stream->call; + return call.CancelIfFails(PushFrameIntoCall( + std::move(frame), std::move(stream))); }, - ImmediateOkStatus())); + ImmediateOkStatus()); }); }, []() { return absl::OkStatus(); })); diff --git a/src/core/ext/transport/chaotic_good/control_endpoint.cc b/src/core/ext/transport/chaotic_good/control_endpoint.cc index b2136286b0c..0a6baad0a55 100644 --- a/src/core/ext/transport/chaotic_good/control_endpoint.cc +++ b/src/core/ext/transport/chaotic_good/control_endpoint.cc @@ -47,7 +47,9 @@ ControlEndpoint::ControlEndpoint( GRPC_LATENT_SEE_PROMISE( "FlushLoop", Loop([endpoint = endpoint_, buffer = buffer_]() { return TrySeq( + // Pull one set of buffered writes buffer->Pull(), + // And write them [endpoint, buffer = buffer.get()](SliceBuffer flushing) { GRPC_TRACE_LOG(chaotic_good, INFO) << "CHAOTIC_GOOD: Flush " << flushing.Length() @@ -56,6 +58,7 @@ ControlEndpoint::ControlEndpoint( .value_or("<>"); return endpoint->Write(std::move(flushing)); }, + // Then repeat []() -> LoopCtl { return Continue{}; }); })), [](absl::Status) {}); diff --git a/src/core/ext/transport/chaotic_good/control_endpoint.h b/src/core/ext/transport/chaotic_good/control_endpoint.h index ce2b4e160fe..b139724b240 100644 --- a/src/core/ext/transport/chaotic_good/control_endpoint.h +++ b/src/core/ext/transport/chaotic_good/control_endpoint.h @@ -23,10 +23,20 @@ namespace grpc_core { namespace chaotic_good { +// Wrapper around PromiseEndpoint. +// Buffers all of the small writes that get enqueued to this endpoint, and then +// uses a separate party to flush them to the wire. +// In doing so we get to batch up effectively all the writes from the transport +// (since party wakeups are sticky), and then flush all the writes in one go. class ControlEndpoint { private: class Buffer : public RefCounted { public: + // Queue some buffer to be written. + // We cap the queue size so that we don't infinitely buffer on one + // connection - if the cap is hit, this queue operation will not resolve + // until it empties. + // Returns a promise that resolves to Empty{} when the data has been queued. auto Queue(SliceBuffer&& buffer) { return [buffer = std::move(buffer), this]() mutable -> Poll { Waker waker; @@ -70,6 +80,7 @@ class ControlEndpoint { // to Empty{} -- it's not possible to see errors from this api. auto Write(SliceBuffer&& bytes) { return buffer_->Queue(std::move(bytes)); } + // Read operations are simply passthroughs to the underlying promise endpoint. auto ReadSlice(size_t length) { return endpoint_->ReadSlice(length); } auto Read(size_t length) { return endpoint_->Read(length); } auto GetPeerAddress() const { return endpoint_->GetPeerAddress(); } diff --git a/src/core/ext/transport/chaotic_good/data_endpoints.h b/src/core/ext/transport/chaotic_good/data_endpoints.h index c12f877389b..c8a897997c8 100644 --- a/src/core/ext/transport/chaotic_good/data_endpoints.h +++ b/src/core/ext/transport/chaotic_good/data_endpoints.h @@ -30,6 +30,7 @@ struct Endpoints : public RefCounted { std::vector endpoints; }; +// Buffered writes for one data endpoint class OutputBuffer { public: bool Accept(SliceBuffer& buffer); @@ -46,6 +47,7 @@ class OutputBuffer { SliceBuffer pending_; }; +// The set of output buffers for all connected data endpoints class OutputBuffers : public RefCounted { public: explicit OutputBuffers(uint32_t num_connections); @@ -71,6 +73,13 @@ class OutputBuffers : public RefCounted { class InputQueues : public RefCounted { public: + // One outstanding read. + // ReadTickets get filed by read requests, and all tickets are fullfilled + // by an endpoint. + // A call may Await a ticket to get the bytes back later (or it may skip that + // step - in which case the bytes are thrown away after reading). + // This decoupling is necessary to ensure that cancelled reads by calls do not + // cause data corruption for other calls. class ReadTicket { public: ReadTicket(absl::StatusOr ticket, @@ -147,6 +156,7 @@ class InputQueues : public RefCounted { }; } // namespace data_endpoints_detail +// Collection of data connections. class DataEndpoints { public: using ReadTicket = data_endpoints_detail::InputQueues::ReadTicket; diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc index 22446bb2b56..e2a705e1df0 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.cc +++ b/src/core/ext/transport/chaotic_good/server_transport.cc @@ -95,11 +95,14 @@ auto ChaoticGoodServerTransport::DispatchFrame( return If( stream != nullptr, [this, &stream, &frame, &transport]() { + // TODO(ctiller): instead of SpawnWaitable here we probably want a + // small queue to push into, so that the call can proceed + // asynchronously to other calls regardless of frame ordering. return stream->call.SpawnWaitable( "push-frame", [this, stream, frame = std::move(frame), transport = std::move(transport)]() mutable { auto& call = stream->call; - return call.CancelIfFails(TrySeq( + return TrySeq( frame.Payload(), [transport = std::move(transport), header = frame.header()](SliceBuffer payload) { @@ -107,10 +110,11 @@ auto ChaoticGoodServerTransport::DispatchFrame( std::move(payload)); }, [stream = std::move(stream), this](T frame) mutable { - return PushFrameIntoCall(std::move(stream), - std::move(frame)); + auto& call = stream->call; + return call.CancelIfFails( + PushFrameIntoCall(std::move(stream), std::move(frame))); }, - ImmediateOkStatus())); + ImmediateOkStatus()); }); }, []() { return absl::OkStatus(); }); @@ -250,8 +254,7 @@ auto ChaoticGoodServerTransport::ReadOneFrame( "ReadOneFrame", TrySeq( transport->ReadFrameBytes(), - [this, - transport = std::move(transport)](IncomingFrame incoming_frame) { + [this, transport](IncomingFrame incoming_frame) mutable { // CHECK_EQ(header.payload_length, payload.Length()); return Switch( incoming_frame.header().type, diff --git a/src/core/util/http_client/httpcli.cc b/src/core/util/http_client/httpcli.cc index 0eb2292d9b1..20e77682c81 100644 --- a/src/core/util/http_client/httpcli.cc +++ b/src/core/util/http_client/httpcli.cc @@ -79,8 +79,9 @@ OrphanablePtr HttpRequest::Get( } std::string name = absl::StrFormat("HTTP:GET:%s:%s", uri.authority(), uri.path()); - const grpc_slice request_text = grpc_httpcli_format_get_request( - request, uri.authority().c_str(), uri.path().c_str()); + const grpc_slice request_text = + grpc_httpcli_format_get_request(request, uri.authority().c_str(), + uri.EncodedPathAndQueryParams().c_str()); return MakeOrphanable( std::move(uri), request_text, response, deadline, channel_args, on_done, pollent, name.c_str(), std::move(test_only_generate_response), @@ -103,8 +104,9 @@ OrphanablePtr HttpRequest::Post( } std::string name = absl::StrFormat("HTTP:POST:%s:%s", uri.authority(), uri.path()); - const grpc_slice request_text = grpc_httpcli_format_post_request( - request, uri.authority().c_str(), uri.path().c_str()); + const grpc_slice request_text = + grpc_httpcli_format_post_request(request, uri.authority().c_str(), + uri.EncodedPathAndQueryParams().c_str()); return MakeOrphanable( std::move(uri), request_text, response, deadline, channel_args, on_done, pollent, name.c_str(), std::move(test_only_generate_response), @@ -127,8 +129,9 @@ OrphanablePtr HttpRequest::Put( } std::string name = absl::StrFormat("HTTP:PUT:%s:%s", uri.authority(), uri.path()); - const grpc_slice request_text = grpc_httpcli_format_put_request( - request, uri.authority().c_str(), uri.path().c_str()); + const grpc_slice request_text = + grpc_httpcli_format_put_request(request, uri.authority().c_str(), + uri.EncodedPathAndQueryParams().c_str()); return MakeOrphanable( std::move(uri), request_text, response, deadline, channel_args, on_done, pollent, name.c_str(), std::move(test_only_generate_response), @@ -241,6 +244,8 @@ void HttpRequest::AppendError(grpc_error_handle error) { void HttpRequest::OnReadInternal(grpc_error_handle error) { for (size_t i = 0; i < incoming_.count; i++) { + GRPC_TRACE_LOG(http1, INFO) + << "HTTP response data: " << StringViewFromSlice(incoming_.slices[i]); if (GRPC_SLICE_LENGTH(incoming_.slices[i])) { have_read_byte_ = 1; grpc_error_handle err = @@ -275,6 +280,8 @@ void HttpRequest::ContinueDoneWriteAfterScheduleOnExecCtx( } void HttpRequest::StartWrite() { + GRPC_TRACE_LOG(http1, INFO) + << "Sending HTTP1 request: " << StringViewFromSlice(request_text_); CSliceRef(request_text_); grpc_slice_buffer_add(&outgoing_, request_text_); Ref().release(); // ref held by pending write diff --git a/src/core/util/uri.cc b/src/core/util/uri.cc index e6a94e57173..e7b82242abc 100644 --- a/src/core/util/uri.cc +++ b/src/core/util/uri.cc @@ -352,6 +352,16 @@ std::string URI::ToString() const { parts.emplace_back("//"); parts.emplace_back(PercentEncode(authority_, IsAuthorityChar)); } + parts.emplace_back(EncodedPathAndQueryParams()); + if (!fragment_.empty()) { + parts.push_back("#"); + parts.push_back(PercentEncode(fragment_, IsQueryOrFragmentChar)); + } + return absl::StrJoin(parts, ""); +} + +std::string URI::EncodedPathAndQueryParams() const { + std::vector parts; if (!path_.empty()) { parts.emplace_back(PercentEncode(path_, IsPathChar)); } @@ -360,10 +370,6 @@ std::string URI::ToString() const { parts.push_back( absl::StrJoin(query_parameter_pairs_, "&", QueryParameterFormatter())); } - if (!fragment_.empty()) { - parts.push_back("#"); - parts.push_back(PercentEncode(fragment_, IsQueryOrFragmentChar)); - } return absl::StrJoin(parts, ""); } diff --git a/src/core/util/uri.h b/src/core/util/uri.h index 14e9274eaa8..3f9dc7d1b25 100644 --- a/src/core/util/uri.h +++ b/src/core/util/uri.h @@ -76,7 +76,7 @@ class URI { return query_parameter_map_; } // A vector of key:value query parameter pairs, kept in order of appearance - // within the URI search string. Repeated keys are represented as separate + // within the URI string. Repeated keys are represented as separate // key:value elements. const std::vector& query_parameter_pairs() const { return query_parameter_pairs_; @@ -85,6 +85,10 @@ class URI { std::string ToString() const; + // Returns the encoded path and query params, such as would be used on + // the wire in an HTTP request. + std::string EncodedPathAndQueryParams() const; + private: URI(std::string scheme, std::string authority, std::string path, std::vector query_parameter_pairs, std::string fragment); diff --git a/test/core/http/httpcli_test.cc b/test/core/http/httpcli_test.cc index 0a3c508460d..ba3dc9d92f1 100644 --- a/test/core/http/httpcli_test.cc +++ b/test/core/http/httpcli_test.cc @@ -193,8 +193,10 @@ TEST_F(HttpRequestTest, Get) { std::string host = absl::StrFormat("localhost:%d", g_server_port); LOG(INFO) << "requesting from " << host; memset(&req, 0, sizeof(req)); - auto uri = grpc_core::URI::Create("http", host, "/get", {} /* query params */, - "" /* fragment */); + auto uri = grpc_core::URI::Create( + "http", host, "/get", + /*query_parameter_pairs=*/{{"foo", "bar"}, {"baz", "quux"}}, + /*fragment=*/""); CHECK(uri.ok()); grpc_core::OrphanablePtr http_request = grpc_core::HttpRequest::Get( @@ -219,8 +221,10 @@ TEST_F(HttpRequestTest, Post) { memset(&req, 0, sizeof(req)); req.body = const_cast("hello"); req.body_length = 5; - auto uri = grpc_core::URI::Create("http", host, "/post", - {} /* query params */, "" /* fragment */); + auto uri = grpc_core::URI::Create( + "http", host, "/post", + /*query_parameter_pairs=*/{{"foo", "bar"}, {"mumble", "frotz"}}, + /*fragment=*/""); CHECK(uri.ok()); grpc_core::OrphanablePtr http_request = grpc_core::HttpRequest::Post( diff --git a/test/core/http/httpscli_test.cc b/test/core/http/httpscli_test.cc index 88c764bae1c..8c8c8bf47fd 100644 --- a/test/core/http/httpscli_test.cc +++ b/test/core/http/httpscli_test.cc @@ -191,8 +191,10 @@ TEST_F(HttpsCliTest, Get) { const_cast(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG), const_cast("foo.test.google.fr")); grpc_channel_args args = {1, &ssl_override_arg}; - auto uri = grpc_core::URI::Create("https", host, "/get", - {} /* query params */, "" /* fragment */); + auto uri = grpc_core::URI::Create( + "https", host, "/get", + /*query_parameter_pairs=*/{{"foo", "bar"}, {"baz", "quux"}}, + /*fragment=*/""); CHECK(uri.ok()); grpc_core::OrphanablePtr http_request = grpc_core::HttpRequest::Get( @@ -219,8 +221,10 @@ TEST_F(HttpsCliTest, Post) { const_cast(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG), const_cast("foo.test.google.fr")); grpc_channel_args args = {1, &ssl_override_arg}; - auto uri = grpc_core::URI::Create("https", host, "/post", - {} /* query params */, "" /* fragment */); + auto uri = grpc_core::URI::Create( + "https", host, "/post", + /*query_parameter_pairs=*/{{"foo", "bar"}, {"mumble", "frotz"}}, + /*fragment=*/""); CHECK(uri.ok()); grpc_core::OrphanablePtr http_request = grpc_core::HttpRequest::Post( diff --git a/test/core/http/test_server.py b/test/core/http/test_server.py index 6da64fc84e9..34cb717bcbe 100755 --- a/test/core/http/test_server.py +++ b/test/core/http/test_server.py @@ -59,13 +59,13 @@ class Handler(BaseHTTPRequestHandler): ) def do_GET(self): - if self.path == "/get": + if self.path == "/get?foo=bar&baz=quux": self.good() def do_POST(self): content_len = self.headers.get("content-length") content = self.rfile.read(int(content_len)).decode("ascii") - if self.path == "/post" and content == "hello": + if self.path == "/post?foo=bar&mumble=frotz" and content == "hello": self.good() diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD index 96232af3600..cf0c6055dd3 100644 --- a/test/core/transport/chaotic_good/BUILD +++ b/test/core/transport/chaotic_good/BUILD @@ -254,9 +254,9 @@ grpc_yodel_simple_test( "no_windows", ], deps = [ - "//test/core/transport/util:mock_promise_endpoint", "//src/core:chaotic_good_control_endpoint", "//test/core/call/yodel:yodel_test", + "//test/core/transport/util:mock_promise_endpoint", ], ) @@ -269,8 +269,8 @@ grpc_yodel_simple_test( "no_windows", ], deps = [ - "//test/core/transport/util:mock_promise_endpoint", "//src/core:chaotic_good_data_endpoints", "//test/core/call/yodel:yodel_test", + "//test/core/transport/util:mock_promise_endpoint", ], ) diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc index b743bbedc0d..cf764c8cb65 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -502,7 +502,7 @@ void XdsEnd2endTest::InitClient( if (xds_resource_does_not_exist_timeout_ms > 0) { xds_channel_args_to_add_.emplace_back(grpc_channel_arg_integer_create( const_cast(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS), - xds_resource_does_not_exist_timeout_ms)); + xds_resource_does_not_exist_timeout_ms * grpc_test_slowdown_factor())); } if (!lb_expected_authority.empty()) { constexpr char authority_const[] = "localhost:%d";