[chaotic-good] Update chaotic good extension to allow setting attributes on event engine endpoints which need to work with the chaotic good transport.

PiperOrigin-RevId: 620089768
pull/36191/head
Vignesh Babu 10 months ago committed by Copybara-Service
parent d1287ad07a
commit 86383a031d
  1. 1
      src/core/BUILD
  2. 5
      src/core/ext/transport/chaotic_good/chaotic_good_transport.h
  3. 2
      src/core/ext/transport/chaotic_good/client/chaotic_good_connector.cc
  4. 6
      src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc
  5. 17
      src/core/lib/event_engine/extensions/chaotic_good_extension.h
  6. 5
      src/core/lib/transport/promise_endpoint.h

@ -68,6 +68,7 @@ grpc_cc_library(
"absl/strings",
],
deps = [
":memory_quota",
"//:event_engine_base_hdrs",
"//:gpr_platform",
],

@ -47,7 +47,10 @@ class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
data_endpoint_(std::move(data_endpoint)),
encoder_(std::move(hpack_encoder)),
parser_(std::move(hpack_parser)) {
data_endpoint_.EnforceRxMemoryAlignment();
// Enable RxMemoryAlignment and RPC receive coalescing after the transport
// setup is complete. At this point all the settings frames should have
// been read.
data_endpoint_.EnforceRxMemoryAlignmentAndCoalescing();
}
auto WriteFrame(const FrameInterface& frame) {

@ -257,6 +257,8 @@ void ChaoticGoodConnector::Connect(const Args& args, Result* result,
endpoint.value().get());
if (chaotic_good_ext != nullptr) {
chaotic_good_ext->EnableStatsCollection(/*is_control_channel=*/true);
chaotic_good_ext->UseMemoryQuota(
ResourceQuota::Default()->memory_quota());
}
p->handshake_mgr_->DoHandshake(
grpc_event_engine_endpoint_create(std::move(endpoint.value())),

@ -59,6 +59,7 @@
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/server.h"
@ -418,6 +419,11 @@ void ChaoticGoodServerListener::ActiveConnection::HandshakingState::
[self, chaotic_good_ext](bool is_control_endpoint) {
if (chaotic_good_ext != nullptr) {
chaotic_good_ext->EnableStatsCollection(is_control_endpoint);
if (is_control_endpoint) {
// Control endpoint should use the default memory quota
chaotic_good_ext->UseMemoryQuota(
ResourceQuota::Default()->memory_quota());
}
}
return EndpointWriteSettingsFrame(self, is_control_endpoint);
});

@ -19,6 +19,8 @@
#include "absl/strings/string_view.h"
#include "src/core/lib/resource_quota/memory_quota.h"
namespace grpc_event_engine {
namespace experimental {
@ -37,6 +39,21 @@ class ChaoticGoodExtension {
/// Otherwise they are grouped into histograms and counters specific to the
/// chaotic good data channel.
virtual void EnableStatsCollection(bool is_control_channel) = 0;
/// Forces the endpoint to use the provided memory quota instead of using the
/// one provided to it through the channel args. It is safe to call this
/// only when there are no outstanding Reads on the Endpoint.
virtual void UseMemoryQuota(grpc_core::MemoryQuotaRefPtr mem_quota) = 0;
/// Forces the endpoint to receive rpcs in one contiguous block of memory.
/// It is safe to call this only when there are no outstanding Reads on
/// the Endpoint.
virtual void EnableRpcReceiveCoalescing() = 0;
/// Disables rpc receive coalescing until it is explicitly enabled again.
/// It is safe to call this only when there are no outstanding Reads on
/// the Endpoint.
virtual void DisableRpcReceiveCoalescing() = 0;
/// If invoked, the endpoint tries to preserve proper order and alignment of
/// any memory that maybe shared across reads.
virtual void EnforceRxMemoryAlignment() = 0;

@ -219,11 +219,14 @@ class PromiseEndpoint {
});
}
void EnforceRxMemoryAlignment() {
// Enables RPC receive coalescing and alignment of memory holding received
// RPCs.
void EnforceRxMemoryAlignmentAndCoalescing() {
auto* chaotic_good_ext = grpc_event_engine::experimental::QueryExtension<
grpc_event_engine::experimental::ChaoticGoodExtension>(endpoint_.get());
if (chaotic_good_ext != nullptr) {
chaotic_good_ext->EnforceRxMemoryAlignment();
chaotic_good_ext->EnableRpcReceiveCoalescing();
if (read_state_->buffer.Length() == 0) {
return;
}

Loading…
Cancel
Save