Changes from feedback on gRFC L82: gRPC Core EventEngine API (#26733)

reviewable/pr26751/r1
AJ Heller 4 years ago committed by GitHub
parent 53701640c6
commit a10a5bf655
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      BUILD
  2. 3
      BUILD.gn
  3. 4
      CMakeLists.txt
  4. 4
      Makefile
  5. 6
      build_autogenerated.yaml
  6. 2
      config.m4
  7. 2
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 4
      gRPC-Core.podspec
  10. 3
      grpc.gemspec
  11. 4
      grpc.gyp
  12. 24
      include/grpc/event_engine/event_engine.h
  13. 41
      include/grpc/event_engine/slice_allocator.h
  14. 3
      package.xml
  15. 53
      src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
  16. 74
      src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h
  17. 14
      src/core/lib/iomgr/event_engine/endpoint.cc
  18. 40
      src/core/lib/iomgr/event_engine/tcp.cc
  19. 2
      src/core/lib/iomgr/resource_quota.cc
  20. 2
      src/python/grpcio/grpc_core_dependencies.py
  21. 3
      tools/doxygen/Doxyfile.c++.internal
  22. 3
      tools/doxygen/Doxyfile.core.internal

@ -855,7 +855,6 @@ grpc_cc_library(
"src/core/lib/debug/stats_data.cc",
"src/core/lib/event_engine/endpoint_config.cc",
"src/core/lib/event_engine/event_engine.cc",
"src/core/lib/event_engine/slice_allocator.cc",
"src/core/lib/event_engine/sockaddr.cc",
"src/core/lib/http/format_request.cc",
"src/core/lib/http/httpcli.cc",
@ -2447,6 +2446,7 @@ grpc_cc_library(
"src/core/ext/transport/chttp2/transport/bin_decoder.cc",
"src/core/ext/transport/chttp2/transport/bin_encoder.cc",
"src/core/ext/transport/chttp2/transport/chttp2_plugin.cc",
"src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc",
"src/core/ext/transport/chttp2/transport/chttp2_transport.cc",
"src/core/ext/transport/chttp2/transport/context_list.cc",
"src/core/ext/transport/chttp2/transport/flow_control.cc",
@ -2471,6 +2471,7 @@ grpc_cc_library(
hdrs = [
"src/core/ext/transport/chttp2/transport/bin_decoder.h",
"src/core/ext/transport/chttp2/transport/bin_encoder.h",
"src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/context_list.h",
"src/core/ext/transport/chttp2/transport/flow_control.h",
@ -2492,6 +2493,8 @@ grpc_cc_library(
"src/core/ext/transport/chttp2/transport/varint.h",
],
external_deps = [
"absl/memory",
"absl/status",
"absl/strings:str_format",
"absl/strings",
],

@ -397,6 +397,8 @@ config("grpc_config") {
"src/core/ext/transport/chttp2/transport/bin_encoder.cc",
"src/core/ext/transport/chttp2/transport/bin_encoder.h",
"src/core/ext/transport/chttp2/transport/chttp2_plugin.cc",
"src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc",
"src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h",
"src/core/ext/transport/chttp2/transport/chttp2_transport.cc",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/context_list.cc",
@ -868,7 +870,6 @@ config("grpc_config") {
"src/core/lib/event_engine/endpoint_config.cc",
"src/core/lib/event_engine/endpoint_config_internal.h",
"src/core/lib/event_engine/event_engine.cc",
"src/core/lib/event_engine/slice_allocator.cc",
"src/core/lib/event_engine/sockaddr.cc",
"src/core/lib/event_engine/sockaddr.h",
"src/core/lib/gprpp/atomic.h",

@ -1599,6 +1599,7 @@ add_library(grpc
src/core/ext/transport/chttp2/transport/bin_decoder.cc
src/core/ext/transport/chttp2/transport/bin_encoder.cc
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
src/core/ext/transport/chttp2/transport/chttp2_transport.cc
src/core/ext/transport/chttp2/transport/context_list.cc
src/core/ext/transport/chttp2/transport/flow_control.cc
@ -1834,7 +1835,6 @@ add_library(grpc
src/core/lib/debug/trace.cc
src/core/lib/event_engine/endpoint_config.cc
src/core/lib/event_engine/event_engine.cc
src/core/lib/event_engine/slice_allocator.cc
src/core/lib/event_engine/sockaddr.cc
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
@ -2419,6 +2419,7 @@ add_library(grpc_unsecure
src/core/ext/transport/chttp2/transport/bin_decoder.cc
src/core/ext/transport/chttp2/transport/bin_encoder.cc
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
src/core/ext/transport/chttp2/transport/chttp2_transport.cc
src/core/ext/transport/chttp2/transport/context_list.cc
src/core/ext/transport/chttp2/transport/flow_control.cc
@ -2471,7 +2472,6 @@ add_library(grpc_unsecure
src/core/lib/debug/trace.cc
src/core/lib/event_engine/endpoint_config.cc
src/core/lib/event_engine/event_engine.cc
src/core/lib/event_engine/slice_allocator.cc
src/core/lib/event_engine/sockaddr.cc
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc

@ -1136,6 +1136,7 @@ LIBGRPC_SRC = \
src/core/ext/transport/chttp2/transport/bin_decoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/context_list.cc \
src/core/ext/transport/chttp2/transport/flow_control.cc \
@ -1371,7 +1372,6 @@ LIBGRPC_SRC = \
src/core/lib/debug/trace.cc \
src/core/lib/event_engine/endpoint_config.cc \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
@ -1807,6 +1807,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/transport/chttp2/transport/bin_decoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/context_list.cc \
src/core/ext/transport/chttp2/transport/flow_control.cc \
@ -1859,7 +1860,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/debug/trace.cc \
src/core/lib/event_engine/endpoint_config.cc \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \

@ -489,6 +489,7 @@ libs:
- src/core/ext/transport/chttp2/server/chttp2_server.h
- src/core/ext/transport/chttp2/transport/bin_decoder.h
- src/core/ext/transport/chttp2/transport/bin_encoder.h
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h
- src/core/ext/transport/chttp2/transport/chttp2_transport.h
- src/core/ext/transport/chttp2/transport/context_list.h
- src/core/ext/transport/chttp2/transport/flow_control.h
@ -1016,6 +1017,7 @@ libs:
- src/core/ext/transport/chttp2/transport/bin_decoder.cc
- src/core/ext/transport/chttp2/transport/bin_encoder.cc
- src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
- src/core/ext/transport/chttp2/transport/chttp2_transport.cc
- src/core/ext/transport/chttp2/transport/context_list.cc
- src/core/ext/transport/chttp2/transport/flow_control.cc
@ -1251,7 +1253,6 @@ libs:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/endpoint_config.cc
- src/core/lib/event_engine/event_engine.cc
- src/core/lib/event_engine/slice_allocator.cc
- src/core/lib/event_engine/sockaddr.cc
- src/core/lib/http/format_request.cc
- src/core/lib/http/httpcli.cc
@ -1699,6 +1700,7 @@ libs:
- src/core/ext/transport/chttp2/server/chttp2_server.h
- src/core/ext/transport/chttp2/transport/bin_decoder.h
- src/core/ext/transport/chttp2/transport/bin_encoder.h
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h
- src/core/ext/transport/chttp2/transport/chttp2_transport.h
- src/core/ext/transport/chttp2/transport/context_list.h
- src/core/ext/transport/chttp2/transport/flow_control.h
@ -1960,6 +1962,7 @@ libs:
- src/core/ext/transport/chttp2/transport/bin_decoder.cc
- src/core/ext/transport/chttp2/transport/bin_encoder.cc
- src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
- src/core/ext/transport/chttp2/transport/chttp2_transport.cc
- src/core/ext/transport/chttp2/transport/context_list.cc
- src/core/ext/transport/chttp2/transport/flow_control.cc
@ -2012,7 +2015,6 @@ libs:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/endpoint_config.cc
- src/core/lib/event_engine/event_engine.cc
- src/core/lib/event_engine/slice_allocator.cc
- src/core/lib/event_engine/sockaddr.cc
- src/core/lib/http/format_request.cc
- src/core/lib/http/httpcli.cc

@ -130,6 +130,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/transport/chttp2/transport/bin_decoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/context_list.cc \
src/core/ext/transport/chttp2/transport/flow_control.cc \
@ -378,7 +379,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/debug/trace.cc \
src/core/lib/event_engine/endpoint_config.cc \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/gpr/alloc.cc \
src/core/lib/gpr/atm.cc \

@ -96,6 +96,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\transport\\chttp2\\transport\\bin_decoder.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\bin_encoder.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_plugin.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_slice_allocator.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_transport.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\context_list.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\flow_control.cc " +
@ -344,7 +345,6 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\debug\\trace.cc " +
"src\\core\\lib\\event_engine\\endpoint_config.cc " +
"src\\core\\lib\\event_engine\\event_engine.cc " +
"src\\core\\lib\\event_engine\\slice_allocator.cc " +
"src\\core\\lib\\event_engine\\sockaddr.cc " +
"src\\core\\lib\\gpr\\alloc.cc " +
"src\\core\\lib\\gpr\\atm.cc " +

@ -269,6 +269,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',
@ -922,6 +923,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',

@ -350,6 +350,8 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.cc',
@ -847,7 +849,6 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/endpoint_config.cc',
'src/core/lib/event_engine/endpoint_config_internal.h',
'src/core/lib/event_engine/event_engine.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/event_engine/sockaddr.h',
'src/core/lib/gpr/alloc.cc',
@ -1502,6 +1503,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',

@ -265,6 +265,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/transport/chttp2/transport/bin_encoder.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/bin_encoder.h )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_plugin.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_transport.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_transport.h )
s.files += %w( src/core/ext/transport/chttp2/transport/context_list.cc )
@ -762,7 +764,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/endpoint_config.cc )
s.files += %w( src/core/lib/event_engine/endpoint_config_internal.h )
s.files += %w( src/core/lib/event_engine/event_engine.cc )
s.files += %w( src/core/lib/event_engine/slice_allocator.cc )
s.files += %w( src/core/lib/event_engine/sockaddr.cc )
s.files += %w( src/core/lib/event_engine/sockaddr.h )
s.files += %w( src/core/lib/gpr/alloc.cc )

@ -563,6 +563,7 @@
'src/core/ext/transport/chttp2/transport/bin_decoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/context_list.cc',
'src/core/ext/transport/chttp2/transport/flow_control.cc',
@ -798,7 +799,6 @@
'src/core/lib/debug/trace.cc',
'src/core/lib/event_engine/endpoint_config.cc',
'src/core/lib/event_engine/event_engine.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
@ -1206,6 +1206,7 @@
'src/core/ext/transport/chttp2/transport/bin_decoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/context_list.cc',
'src/core/ext/transport/chttp2/transport/flow_control.cc',
@ -1258,7 +1259,6 @@
'src/core/lib/debug/trace.cc',
'src/core/lib/event_engine/endpoint_config.cc',
'src/core/lib/event_engine/event_engine.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',

@ -80,7 +80,7 @@ class EventEngine {
using Callback = std::function<void(absl::Status)>;
/// A callback handle, used to cancel a callback.
struct TaskHandle {
intptr_t key;
intptr_t keys[2];
};
/// A thin wrapper around a platform-specific sockaddr type. A sockaddr struct
/// exists on all platforms that gRPC supports.
@ -127,10 +127,8 @@ class EventEngine {
///
/// For failed read operations, implementations should pass the appropriate
/// statuses to \a on_read. For example, callbacks might expect to receive
/// DEADLINE_EXCEEDED when the deadline is exceeded, and CANCELLED on
/// endpoint shutdown.
virtual void Read(Callback on_read, SliceBuffer* buffer,
absl::Time deadline) = 0;
/// CANCELLED on endpoint shutdown.
virtual void Read(Callback on_read, SliceBuffer* buffer) = 0;
/// Write data out on the connection.
///
/// \a on_writable is called when the connection is ready for more data. The
@ -140,15 +138,13 @@ class EventEngine {
///
/// For failed write operations, implementations should pass the appropriate
/// statuses to \a on_writable. For example, callbacks might expect to
/// receive DEADLINE_EXCEEDED when the deadline is exceeded, and CANCELLED
/// on endpoint shutdown.
virtual void Write(Callback on_writable, SliceBuffer* data,
absl::Time deadline) = 0;
/// receive CANCELLED on endpoint shutdown.
virtual void Write(Callback on_writable, SliceBuffer* data) = 0;
/// These methods return an address in the format described in DNSResolver.
/// The returned values are owned by the Endpoint and are expected to remain
/// valid for the life of the Endpoint.
virtual const ResolvedAddress* GetPeerAddress() const = 0;
virtual const ResolvedAddress* GetLocalAddress() const = 0;
virtual const ResolvedAddress& GetPeerAddress() const = 0;
virtual const ResolvedAddress& GetLocalAddress() const = 0;
};
/// Called when a new connection is established.
@ -197,7 +193,7 @@ class EventEngine {
virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept, Callback on_shutdown,
const EndpointConfig& args,
SliceAllocatorFactory slice_allocator_factory) = 0;
std::unique_ptr<SliceAllocatorFactory> slice_allocator_factory) = 0;
/// Creates a client network connection to a remote network listener.
///
/// \a Connect may return an error status immediately if there was a failure
@ -214,7 +210,7 @@ class EventEngine {
virtual absl::Status Connect(OnConnectCallback on_connect,
const ResolvedAddress& addr,
const EndpointConfig& args,
SliceAllocator slice_allocator,
std::unique_ptr<SliceAllocator> slice_allocator,
absl::Time deadline) = 0;
/// The DNSResolver that provides asynchronous resolution.
@ -222,7 +218,7 @@ class EventEngine {
public:
/// A task handle for DNS Resolution requests.
struct LookupTaskHandle {
intptr_t key;
intptr_t key[2];
};
/// A DNS SRV record type.
struct SRVRecord {

@ -37,18 +37,8 @@ class SliceBuffer {
class SliceAllocator {
public:
// gRPC-internal constructor
explicit SliceAllocator(grpc_resource_user* user);
// Not copyable
SliceAllocator(SliceAllocator& other) = delete;
SliceAllocator& operator=(const SliceAllocator& other) = delete;
// Moveable
SliceAllocator(SliceAllocator&& other) noexcept;
SliceAllocator& operator=(SliceAllocator&& other) noexcept;
~SliceAllocator();
using AllocateCallback =
std::function<void(absl::Status, SliceBuffer* buffer)>;
using AllocateCallback = std::function<void(absl::Status)>;
virtual ~SliceAllocator() = default;
/// Requests \a size bytes from gRPC, and populates \a dest with the allocated
/// slices. Ownership of the \a SliceBuffer is not transferred.
///
@ -57,32 +47,17 @@ class SliceAllocator {
/// interrupted to attempt to reclaim memory from participating gRPC
/// internals. When there is sufficient memory available, slice allocation
/// proceeds as normal.
absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb);
private:
grpc_resource_user* resource_user_;
virtual absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) = 0;
};
class SliceAllocatorFactory {
public:
// gRPC-internal constructor
explicit SliceAllocatorFactory(grpc_resource_quota* quota);
// Not copyable
SliceAllocatorFactory(SliceAllocatorFactory& other) = delete;
SliceAllocatorFactory& operator=(const SliceAllocatorFactory& other) = delete;
// Moveable
SliceAllocatorFactory(SliceAllocatorFactory&& other) noexcept;
SliceAllocatorFactory& operator=(SliceAllocatorFactory&& other) noexcept;
~SliceAllocatorFactory();
virtual ~SliceAllocatorFactory() = default;
/// On Endpoint creation, call \a CreateSliceAllocator with the name of the
/// endpoint peer (a URI string, most likely). Note: \a peer_name must outlive
/// the Endpoint.
SliceAllocator CreateSliceAllocator(absl::string_view peer_name);
private:
grpc_resource_quota* resource_quota_;
/// endpoint peer (a URI string, most likely).
virtual std::unique_ptr<SliceAllocator> CreateSliceAllocator(
absl::string_view peer_name) = 0;
};
} // namespace experimental

@ -245,6 +245,8 @@
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_encoder.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_encoder.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_transport.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_transport.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/context_list.cc" role="src" />
@ -742,7 +744,6 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/endpoint_config.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/endpoint_config_internal.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/event_engine.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/slice_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/sockaddr.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/sockaddr.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/alloc.cc" role="src" />

@ -17,71 +17,48 @@
#include <functional>
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h"
#include "src/core/lib/iomgr/resource_quota.h"
namespace grpc_event_engine {
namespace experimental {
SliceAllocator::SliceAllocator(grpc_resource_user* user)
: resource_user_(user) {
grpc_resource_user_ref(resource_user_);
};
Chttp2SliceAllocator::Chttp2SliceAllocator(grpc_resource_user* user)
: resource_user_(user) {}
SliceAllocator::~SliceAllocator() {
Chttp2SliceAllocator::~Chttp2SliceAllocator() {
if (resource_user_ != nullptr) {
grpc_resource_user_unref(resource_user_);
}
};
SliceAllocator::SliceAllocator(SliceAllocator&& other) noexcept
: resource_user_(other.resource_user_) {
other.resource_user_ = nullptr;
}
SliceAllocator& SliceAllocator::operator=(SliceAllocator&& other) noexcept {
resource_user_ = other.resource_user_;
other.resource_user_ = nullptr;
return *this;
}
absl::Status SliceAllocator::Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) {
absl::Status Chttp2SliceAllocator::Allocate(
size_t size, SliceBuffer* dest, SliceAllocator::AllocateCallback cb) {
// TODO(hork): merge the implementation from the uv-ee branch.
(void)size;
(void)dest;
(void)cb;
return absl::OkStatus();
};
}
SliceAllocatorFactory::SliceAllocatorFactory(grpc_resource_quota* quota)
Chttp2SliceAllocatorFactory::Chttp2SliceAllocatorFactory(
grpc_resource_quota* quota)
: resource_quota_(quota) {
grpc_resource_quota_ref_internal(resource_quota_);
};
}
SliceAllocatorFactory::~SliceAllocatorFactory() {
Chttp2SliceAllocatorFactory::~Chttp2SliceAllocatorFactory() {
if (resource_quota_ != nullptr) {
grpc_resource_quota_unref_internal(resource_quota_);
}
}
SliceAllocatorFactory::SliceAllocatorFactory(
SliceAllocatorFactory&& other) noexcept
: resource_quota_(other.resource_quota_) {
other.resource_quota_ = nullptr;
}
SliceAllocatorFactory& SliceAllocatorFactory::operator=(
SliceAllocatorFactory&& other) noexcept {
resource_quota_ = other.resource_quota_;
other.resource_quota_ = nullptr;
return *this;
}
SliceAllocator SliceAllocatorFactory::CreateSliceAllocator(
absl::string_view peer_name) {
return SliceAllocator(
std::unique_ptr<SliceAllocator>
Chttp2SliceAllocatorFactory::CreateSliceAllocator(absl::string_view peer_name) {
return absl::make_unique<Chttp2SliceAllocator>(
grpc_resource_user_create(resource_quota_, peer_name.data()));
}

@ -0,0 +1,74 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_SLICE_ALLOCATOR_H
#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_SLICE_ALLOCATOR_H
#include <grpc/support/port_platform.h>
#include "grpc/event_engine/slice_allocator.h"
#include <functional>
#include "absl/status/status.h"
#include "src/core/lib/iomgr/resource_quota.h"
namespace grpc_event_engine {
namespace experimental {
class Chttp2SliceAllocator
: public grpc_event_engine::experimental::SliceAllocator {
public:
/// gRPC-internal constructor. Takes ownership of a resource_user ref from the
/// caller.
explicit Chttp2SliceAllocator(grpc_resource_user* user);
// Not copyable
Chttp2SliceAllocator(Chttp2SliceAllocator& other) = delete;
Chttp2SliceAllocator& operator=(const Chttp2SliceAllocator& other) = delete;
// Not Moveable
Chttp2SliceAllocator(Chttp2SliceAllocator&& other) = delete;
Chttp2SliceAllocator& operator=(Chttp2SliceAllocator&& other) = delete;
~Chttp2SliceAllocator() override;
absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) override;
private:
grpc_resource_user* resource_user_;
};
class Chttp2SliceAllocatorFactory
: public grpc_event_engine::experimental::SliceAllocatorFactory {
public:
// gRPC-internal constructor
explicit Chttp2SliceAllocatorFactory(grpc_resource_quota* quota);
// Not copyable
Chttp2SliceAllocatorFactory(Chttp2SliceAllocatorFactory& other) = delete;
Chttp2SliceAllocatorFactory& operator=(
const Chttp2SliceAllocatorFactory& other) = delete;
// Not Moveable
Chttp2SliceAllocatorFactory(Chttp2SliceAllocatorFactory&& other) = delete;
Chttp2SliceAllocatorFactory& operator=(Chttp2SliceAllocatorFactory&& other) =
delete;
~Chttp2SliceAllocatorFactory() override;
std::unique_ptr<SliceAllocator> CreateSliceAllocator(
absl::string_view peer_name) override;
private:
grpc_resource_quota* resource_quota_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_SLICE_ALLOCATOR_H

@ -59,7 +59,7 @@ void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
exec_ctx.Flush();
grpc_pollset_ee_broadcast_event();
},
read_buffer, absl::InfiniteFuture());
read_buffer);
}
void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
@ -83,7 +83,7 @@ void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
exec_ctx.Flush();
grpc_pollset_ee_broadcast_event();
},
write_buffer, absl::InfiniteFuture());
write_buffer);
}
void endpoint_add_to_pollset(grpc_endpoint* /* ep */,
grpc_pollset* /* pollset */) {}
@ -123,9 +123,8 @@ absl::string_view endpoint_get_peer(grpc_endpoint* ep) {
return "";
}
if (eeep->peer_address.empty()) {
const EventEngine::ResolvedAddress* addr = eeep->endpoint->GetPeerAddress();
GPR_ASSERT(addr != nullptr);
eeep->peer_address = ResolvedAddressToURI(*addr);
const EventEngine::ResolvedAddress& addr = eeep->endpoint->GetPeerAddress();
eeep->peer_address = ResolvedAddressToURI(addr);
}
return eeep->peer_address;
}
@ -136,10 +135,9 @@ absl::string_view endpoint_get_local_address(grpc_endpoint* ep) {
return "";
}
if (eeep->local_address.empty()) {
const EventEngine::ResolvedAddress* addr =
const EventEngine::ResolvedAddress& addr =
eeep->endpoint->GetLocalAddress();
GPR_ASSERT(addr != nullptr);
eeep->local_address = ResolvedAddressToURI(*addr);
eeep->local_address = ResolvedAddressToURI(addr);
}
return eeep->local_address;
}

@ -38,20 +38,36 @@ using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GrpcClosureToCallback;
using ::grpc_event_engine::experimental::SliceAllocator;
using ::grpc_event_engine::experimental::SliceAllocatorFactory;
using ::grpc_event_engine::experimental::SliceBuffer;
} // namespace
// TODO(hork): remove these classes in PR #26643, when the iomgr APIs change to
// accept SliceAllocators and SliceAllocatorFactory(ie)s. In the meantime, the
// libuv work has temporary implementations as well.
class NoopSliceAllocator : public SliceAllocator {
public:
absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) {
return absl::OkStatus();
}
};
class NoopSliceAllocatorFactory : public SliceAllocatorFactory {
public:
std::unique_ptr<SliceAllocator> CreateSliceAllocator(
absl::string_view peer_name) {
return absl::make_unique<NoopSliceAllocator>();
};
};
struct grpc_tcp_server {
grpc_tcp_server(std::unique_ptr<EventEngine::Listener> listener,
grpc_resource_quota* rq)
explicit grpc_tcp_server(std::unique_ptr<EventEngine::Listener> listener)
: refcount(1, GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace) ? "tcp" : nullptr),
listener(std::move(listener)),
resource_quota(rq) {
listener(std::move(listener)) {
shutdown_starting.head = nullptr;
shutdown_starting.tail = nullptr;
};
~grpc_tcp_server() {
// TODO(nnoble): see if we can handle this in ~SliceAllocatorFactory
grpc_resource_quota_unref_internal(resource_quota);
grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &shutdown_starting);
grpc_core::ExecCtx::Get()->Flush();
}
@ -59,7 +75,6 @@ struct grpc_tcp_server {
grpc_core::Mutex mu;
std::unique_ptr<EventEngine::Listener> listener;
grpc_closure_list shutdown_starting ABSL_GUARDED_BY(mu);
grpc_resource_quota* resource_quota;
grpc_tcp_server_cb on_accept_internal;
void* on_accept_internal_arg;
};
@ -99,7 +114,9 @@ void tcp_connect(grpc_closure* on_connect, grpc_endpoint** endpoint,
*endpoint = &ee_endpoint->base;
EventEngine::OnConnectCallback ee_on_connect =
GrpcClosureToOnConnectCallback(on_connect, endpoint);
SliceAllocator sa(ee_endpoint->ru);
// TODO(hork): tcp_connect will change to accept a SliceAllocator. This is
// temporary.
auto sa = absl::make_unique<NoopSliceAllocator>();
EventEngine::ResolvedAddress ra(reinterpret_cast<const sockaddr*>(addr->addr),
addr->len);
absl::Time ee_deadline = grpc_core::ToAbslTime(
@ -124,6 +141,9 @@ grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
if (rq == nullptr) {
rq = grpc_resource_quota_create(nullptr);
}
// TODO(hork): tcp_server_create will change to accept a
// SliceAllocatorFactory. This is temporary.
auto saf = absl::make_unique<NoopSliceAllocatorFactory>();
EventEngine* event_engine = grpc_iomgr_event_engine();
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> listener =
event_engine->CreateListener(
@ -144,11 +164,11 @@ grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
grpc_pollset_ee_broadcast_event();
},
GrpcClosureToCallback(shutdown_complete, GRPC_ERROR_NONE),
endpoint_config, SliceAllocatorFactory(rq));
endpoint_config, std::move(saf));
if (!listener.ok()) {
return absl_status_to_grpc_error(listener.status());
}
*server = new grpc_tcp_server(std::move(*listener), rq);
*server = new grpc_tcp_server(std::move(*listener));
return GRPC_ERROR_NONE;
}

@ -805,6 +805,8 @@ grpc_resource_user* grpc_resource_user_create(
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_user->links[i].next = resource_user->links[i].prev = nullptr;
}
// TODO(hork): the RU should own a copy of the name. See Craig's comments on
// the EventEngine gRFC for justification.
if (name != nullptr) {
resource_user->name = name;
} else {

@ -105,6 +105,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/transport/bin_decoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/context_list.cc',
'src/core/ext/transport/chttp2/transport/flow_control.cc',
@ -353,7 +354,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/debug/trace.cc',
'src/core/lib/event_engine/endpoint_config.cc',
'src/core/lib/event_engine/event_engine.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/gpr/alloc.cc',
'src/core/lib/gpr/atm.cc',

@ -1202,6 +1202,8 @@ src/core/ext/transport/chttp2/transport/bin_decoder.h \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.h \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.h \
src/core/ext/transport/chttp2/transport/context_list.cc \
@ -1695,7 +1697,6 @@ src/core/lib/debug/trace.h \
src/core/lib/event_engine/endpoint_config.cc \
src/core/lib/event_engine/endpoint_config_internal.h \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/event_engine/sockaddr.h \
src/core/lib/gpr/alloc.cc \

@ -1037,6 +1037,8 @@ src/core/ext/transport/chttp2/transport/bin_decoder.h \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.h \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.h \
src/core/ext/transport/chttp2/transport/context_list.cc \
@ -1532,7 +1534,6 @@ src/core/lib/debug/trace.h \
src/core/lib/event_engine/endpoint_config.cc \
src/core/lib/event_engine/endpoint_config_internal.h \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/event_engine/sockaddr.h \
src/core/lib/gpr/README.md \

Loading…
Cancel
Save