diff --git a/src/core/BUILD b/src/core/BUILD index 37d1d4f0e6e..b50b8bcae8c 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -68,6 +68,7 @@ grpc_cc_library( "absl/strings", ], deps = [ + ":memory_quota", "//:event_engine_base_hdrs", "//:gpr_platform", ], diff --git a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h index 294567701e8..40a216c1d9f 100644 --- a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h +++ b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h @@ -47,7 +47,10 @@ class ChaoticGoodTransport : public RefCounted { data_endpoint_(std::move(data_endpoint)), encoder_(std::move(hpack_encoder)), parser_(std::move(hpack_parser)) { - data_endpoint_.EnforceRxMemoryAlignment(); + // Enable RxMemoryAlignment and RPC receive coalescing after the transport + // setup is complete. At this point all the settings frames should have + // been read. + data_endpoint_.EnforceRxMemoryAlignmentAndCoalescing(); } auto WriteFrame(const FrameInterface& frame) { diff --git a/src/core/ext/transport/chaotic_good/client/chaotic_good_connector.cc b/src/core/ext/transport/chaotic_good/client/chaotic_good_connector.cc index 9be72346484..7f3d85e4530 100644 --- a/src/core/ext/transport/chaotic_good/client/chaotic_good_connector.cc +++ b/src/core/ext/transport/chaotic_good/client/chaotic_good_connector.cc @@ -257,6 +257,8 @@ void ChaoticGoodConnector::Connect(const Args& args, Result* result, endpoint.value().get()); if (chaotic_good_ext != nullptr) { chaotic_good_ext->EnableStatsCollection(/*is_control_channel=*/true); + chaotic_good_ext->UseMemoryQuota( + ResourceQuota::Default()->memory_quota()); } p->handshake_mgr_->DoHandshake( grpc_event_engine_endpoint_create(std::move(endpoint.value())), diff --git a/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc b/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc index 70bf644c2a9..65912c07b82 100644 --- a/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc +++ b/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc @@ -59,6 +59,7 @@ #include "src/core/lib/promise/sleep.h" #include "src/core/lib/promise/try_seq.h" #include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/surface/server.h" @@ -418,6 +419,11 @@ void ChaoticGoodServerListener::ActiveConnection::HandshakingState:: [self, chaotic_good_ext](bool is_control_endpoint) { if (chaotic_good_ext != nullptr) { chaotic_good_ext->EnableStatsCollection(is_control_endpoint); + if (is_control_endpoint) { + // Control endpoint should use the default memory quota + chaotic_good_ext->UseMemoryQuota( + ResourceQuota::Default()->memory_quota()); + } } return EndpointWriteSettingsFrame(self, is_control_endpoint); }); diff --git a/src/core/lib/event_engine/extensions/chaotic_good_extension.h b/src/core/lib/event_engine/extensions/chaotic_good_extension.h index b79c2c2d4c3..b6b1699875e 100644 --- a/src/core/lib/event_engine/extensions/chaotic_good_extension.h +++ b/src/core/lib/event_engine/extensions/chaotic_good_extension.h @@ -19,6 +19,8 @@ #include "absl/strings/string_view.h" +#include "src/core/lib/resource_quota/memory_quota.h" + namespace grpc_event_engine { namespace experimental { @@ -37,6 +39,21 @@ class ChaoticGoodExtension { /// Otherwise they are grouped into histograms and counters specific to the /// chaotic good data channel. virtual void EnableStatsCollection(bool is_control_channel) = 0; + + /// Forces the endpoint to use the provided memory quota instead of using the + /// one provided to it through the channel args. It is safe to call this + /// only when there are no outstanding Reads on the Endpoint. + virtual void UseMemoryQuota(grpc_core::MemoryQuotaRefPtr mem_quota) = 0; + + /// Forces the endpoint to receive rpcs in one contiguous block of memory. + /// It is safe to call this only when there are no outstanding Reads on + /// the Endpoint. + virtual void EnableRpcReceiveCoalescing() = 0; + + /// Disables rpc receive coalescing until it is explicitly enabled again. + /// It is safe to call this only when there are no outstanding Reads on + /// the Endpoint. + virtual void DisableRpcReceiveCoalescing() = 0; /// If invoked, the endpoint tries to preserve proper order and alignment of /// any memory that maybe shared across reads. virtual void EnforceRxMemoryAlignment() = 0; diff --git a/src/core/lib/transport/promise_endpoint.h b/src/core/lib/transport/promise_endpoint.h index 761dc71c8e3..8fe3c5c9877 100644 --- a/src/core/lib/transport/promise_endpoint.h +++ b/src/core/lib/transport/promise_endpoint.h @@ -219,11 +219,14 @@ class PromiseEndpoint { }); } - void EnforceRxMemoryAlignment() { + // Enables RPC receive coalescing and alignment of memory holding received + // RPCs. + void EnforceRxMemoryAlignmentAndCoalescing() { auto* chaotic_good_ext = grpc_event_engine::experimental::QueryExtension< grpc_event_engine::experimental::ChaoticGoodExtension>(endpoint_.get()); if (chaotic_good_ext != nullptr) { chaotic_good_ext->EnforceRxMemoryAlignment(); + chaotic_good_ext->EnableRpcReceiveCoalescing(); if (read_state_->buffer.Length() == 0) { return; }