Add new EventEngine::Extension to allow transport to send and receive frames to and from endpoint.

PiperOrigin-RevId: 693029414
pull/37136/head
Alisha Nanda 3 weeks ago committed by Copybara-Service
parent 574b19ec31
commit 86a68b4000
  1. 2
      BUILD
  2. 1
      Package.swift
  3. 2
      build_autogenerated.yaml
  4. 2
      gRPC-C++.podspec
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 1
      package.xml
  8. 17
      src/core/BUILD
  9. 28
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  10. 8
      src/core/ext/transport/chttp2/transport/internal.h
  11. 47
      src/core/lib/transport/transport_framing_endpoint_extension.h
  12. 1
      tools/doxygen/Doxyfile.c++.internal
  13. 1
      tools/doxygen/Doxyfile.core.internal

@ -4723,6 +4723,7 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/container:flat_hash_map",
"absl/functional:bind_front",
"absl/hash",
"absl/log:check",
"absl/log:log",
@ -4798,6 +4799,7 @@ grpc_cc_library(
"//src/core:status_conversion",
"//src/core:status_helper",
"//src/core:time",
"//src/core:transport_framing_endpoint_extension",
"//src/core:useful",
"//src/core:write_size_policy",
],

1
Package.swift generated

@ -1651,6 +1651,7 @@ let package = Package(
"src/core/lib/transport/timeout_encoding.h",
"src/core/lib/transport/transport.cc",
"src/core/lib/transport/transport.h",
"src/core/lib/transport/transport_framing_endpoint_extension.h",
"src/core/lib/transport/transport_fwd.h",
"src/core/lib/transport/transport_op_string.cc",
"src/core/load_balancing/address_filtering.cc",

@ -1084,6 +1084,7 @@ libs:
- src/core/lib/transport/status_conversion.h
- src/core/lib/transport/timeout_encoding.h
- src/core/lib/transport/transport.h
- src/core/lib/transport/transport_framing_endpoint_extension.h
- src/core/lib/transport/transport_fwd.h
- src/core/load_balancing/address_filtering.h
- src/core/load_balancing/backend_metric_data.h
@ -2635,6 +2636,7 @@ libs:
- src/core/lib/transport/status_conversion.h
- src/core/lib/transport/timeout_encoding.h
- src/core/lib/transport/transport.h
- src/core/lib/transport/transport_framing_endpoint_extension.h
- src/core/lib/transport/transport_fwd.h
- src/core/load_balancing/address_filtering.h
- src/core/load_balancing/backend_metric_data.h

2
gRPC-C++.podspec generated

@ -1135,6 +1135,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/status_conversion.h',
'src/core/lib/transport/timeout_encoding.h',
'src/core/lib/transport/transport.h',
'src/core/lib/transport/transport_framing_endpoint_extension.h',
'src/core/lib/transport/transport_fwd.h',
'src/core/load_balancing/address_filtering.h',
'src/core/load_balancing/backend_metric_data.h',
@ -2424,6 +2425,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/status_conversion.h',
'src/core/lib/transport/timeout_encoding.h',
'src/core/lib/transport/transport.h',
'src/core/lib/transport/transport_framing_endpoint_extension.h',
'src/core/lib/transport/transport_fwd.h',
'src/core/load_balancing/address_filtering.h',
'src/core/load_balancing/backend_metric_data.h',

2
gRPC-Core.podspec generated

@ -1767,6 +1767,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/timeout_encoding.h',
'src/core/lib/transport/transport.cc',
'src/core/lib/transport/transport.h',
'src/core/lib/transport/transport_framing_endpoint_extension.h',
'src/core/lib/transport/transport_fwd.h',
'src/core/lib/transport/transport_op_string.cc',
'src/core/load_balancing/address_filtering.cc',
@ -3276,6 +3277,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/status_conversion.h',
'src/core/lib/transport/timeout_encoding.h',
'src/core/lib/transport/transport.h',
'src/core/lib/transport/transport_framing_endpoint_extension.h',
'src/core/lib/transport/transport_fwd.h',
'src/core/load_balancing/address_filtering.h',
'src/core/load_balancing/backend_metric_data.h',

1
grpc.gemspec generated

@ -1653,6 +1653,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/timeout_encoding.h )
s.files += %w( src/core/lib/transport/transport.cc )
s.files += %w( src/core/lib/transport/transport.h )
s.files += %w( src/core/lib/transport/transport_framing_endpoint_extension.h )
s.files += %w( src/core/lib/transport/transport_fwd.h )
s.files += %w( src/core/lib/transport/transport_op_string.cc )
s.files += %w( src/core/load_balancing/address_filtering.cc )

1
package.xml generated

@ -1635,6 +1635,7 @@
<file baseinstalldir="/" name="src/core/lib/transport/timeout_encoding.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/transport.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/transport.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/transport_framing_endpoint_extension.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/transport_fwd.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/transport_op_string.cc" role="src" />
<file baseinstalldir="/" name="src/core/load_balancing/address_filtering.cc" role="src" />

@ -92,6 +92,7 @@ grpc_cc_library(
],
deps = [
":memory_quota",
":slice_buffer",
"//:event_engine_base_hdrs",
"//:gpr_platform",
"//:tcp_tracer",
@ -132,6 +133,22 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "transport_framing_endpoint_extension",
hdrs = [
"lib/transport/transport_framing_endpoint_extension.h",
],
external_deps = [
"absl/functional:any_invocable",
"absl/strings",
],
deps = [
"slice_buffer",
"//:event_engine_base_hdrs",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "latent_see",
srcs = [

@ -98,6 +98,7 @@
#include "src/core/lib/transport/metadata_info.h"
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_framing_endpoint_extension.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/telemetry/stats.h"
#include "src/core/telemetry/stats_data.h"
@ -577,6 +578,24 @@ static void init_keepalive_pings_if_enabled_locked(
}
}
// TODO(alishananda): add unit testing as part of chttp2 promise conversion work
void grpc_chttp2_transport::WriteSecurityFrame(grpc_core::SliceBuffer* data) {
combiner->Run(grpc_core::NewClosure(
[transport = Ref(), data](grpc_error_handle) mutable {
transport->WriteSecurityFrameLocked(data);
}),
absl::OkStatus());
}
void grpc_chttp2_transport::WriteSecurityFrameLocked(
grpc_core::SliceBuffer* data) {
if (data == nullptr) {
return;
}
// TODO(alishananda): create security frame and append to qbuf, initiate write
grpc_core::Crash("unreachable");
}
using grpc_event_engine::experimental::QueryExtension;
using grpc_event_engine::experimental::TcpTraceExtension;
@ -619,6 +638,15 @@ grpc_chttp2_transport::grpc_chttp2_transport(
}
}
transport_framing_endpoint_extension = QueryExtension<
grpc_core::TransportFramingEndpointExtension>(
grpc_event_engine::experimental::grpc_get_wrapped_event_engine_endpoint(
ep.get()));
if (transport_framing_endpoint_extension != nullptr) {
transport_framing_endpoint_extension->SetSendFrameCallback(
[this](grpc_core::SliceBuffer* data) { WriteSecurityFrame(data); });
}
CHECK(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);

@ -70,6 +70,7 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_framing_endpoint_extension.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/telemetry/tcp_tracer.h"
#include "src/core/util/bitset.h"
@ -255,12 +256,19 @@ struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport,
void SetPollsetSet(grpc_stream* stream,
grpc_pollset_set* pollset_set) override;
void PerformOp(grpc_transport_op* op) override;
// Callback for transport framing endpoint extension to send security frames
// received directly from the endpoint on wire.
void WriteSecurityFrame(grpc_core::SliceBuffer* data);
void WriteSecurityFrameLocked(grpc_core::SliceBuffer* data);
grpc_core::OrphanablePtr<grpc_endpoint> ep;
grpc_core::Mutex ep_destroy_mu; // Guards endpoint destruction only.
grpc_core::Slice peer_string;
grpc_core::TransportFramingEndpointExtension*
transport_framing_endpoint_extension = nullptr;
grpc_core::MemoryOwner memory_owner;
const grpc_core::MemoryAllocator::Reservation self_reservation;
grpc_core::ReclamationSweep active_reclamation;

@ -0,0 +1,47 @@
// Copyright 2024 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_TRANSPORT_FRAMING_ENDPOINT_EXTENSION_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_TRANSPORT_FRAMING_ENDPOINT_EXTENSION_H
#include <grpc/support/port_platform.h>
#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/slice/slice_buffer.h"
namespace grpc_core {
/// An Endpoint extension class that will be supported by EventEngine endpoints
/// which can send data to a transport and receive data from it.
class TransportFramingEndpointExtension {
public:
virtual ~TransportFramingEndpointExtension() = default;
static absl::string_view EndpointExtensionName() {
return "io.grpc.transport.extension.transport_framing_endpoint_"
"extension";
}
// Send data to transport through `cb`. The data will be sent in a single
// frame.
virtual void SetSendFrameCallback(
absl::AnyInvocable<void(SliceBuffer* data)> cb) = 0;
/// Receive data from transport. The data will be from a single frame.
virtual void ReceiveFrame(SliceBuffer data) = 0;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_TRANSPORT_FRAMING_ENDPOINT_EXTENSION_H

@ -2617,6 +2617,7 @@ src/core/lib/transport/timeout_encoding.cc \
src/core/lib/transport/timeout_encoding.h \
src/core/lib/transport/transport.cc \
src/core/lib/transport/transport.h \
src/core/lib/transport/transport_framing_endpoint_extension.h \
src/core/lib/transport/transport_fwd.h \
src/core/lib/transport/transport_op_string.cc \
src/core/load_balancing/address_filtering.cc \

@ -2429,6 +2429,7 @@ src/core/lib/transport/timeout_encoding.cc \
src/core/lib/transport/timeout_encoding.h \
src/core/lib/transport/transport.cc \
src/core/lib/transport/transport.h \
src/core/lib/transport/transport_framing_endpoint_extension.h \
src/core/lib/transport/transport_fwd.h \
src/core/lib/transport/transport_op_string.cc \
src/core/load_balancing/address_filtering.cc \

Loading…
Cancel
Save