gRPC EventEngine Interface (#25795)

pull/25879/head
AJ Heller 4 years ago committed by GitHub
parent a483e6ca35
commit 4d4ee609c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      BUILD
  2. 5
      BUILD.gn
  3. 10
      CMakeLists.txt
  4. 10
      Makefile
  5. 10
      build_autogenerated.yaml
  6. 3
      config.m4
  7. 3
      config.w32
  8. 5
      grpc.gemspec
  9. 4
      grpc.gyp
  10. 38
      include/grpc/event_engine/README.md
  11. 28
      include/grpc/event_engine/channel_args.h
  12. 281
      include/grpc/event_engine/event_engine.h
  13. 81
      include/grpc/event_engine/slice_allocator.h
  14. 20
      include/grpc/impl/codegen/port_platform.h
  15. 28
      include/grpc/module.modulemap
  16. 5
      package.xml
  17. 59
      src/core/lib/event_engine/slice_allocator.cc
  18. 37
      src/core/lib/event_engine/sockaddr.cc
  19. 2
      src/core/lib/iomgr/port.h
  20. 2
      src/core/tsi/alts/crypt/gsec.h
  21. 2
      src/python/grpcio/grpc_core_dependencies.py
  22. 23
      templates/gRPC-Core.podspec.template
  23. 47
      templates/include/grpc/module.modulemap.template
  24. 3
      templates/test/core/surface/public_headers_must_be_c89.c.template
  25. 2
      tools/buildgen/_mako_renderer.py
  26. 10
      tools/distrib/check_include_guards.py
  27. 3
      tools/doxygen/Doxyfile.c++
  28. 5
      tools/doxygen/Doxyfile.c++.internal
  29. 3
      tools/doxygen/Doxyfile.core
  30. 5
      tools/doxygen/Doxyfile.core.internal

10
BUILD

@ -128,6 +128,12 @@ GRPC_PUBLIC_HDRS = [
"include/grpc/support/workaround_list.h",
]
GRPC_PUBLIC_EVENT_ENGINE_HDRS = [
"include/grpc/event_engine/channel_args.h",
"include/grpc/event_engine/event_engine.h",
"include/grpc/event_engine/slice_allocator.h",
]
GRPC_SECURE_PUBLIC_HDRS = [
"include/grpc/grpc_security.h",
]
@ -767,6 +773,8 @@ grpc_cc_library(
"src/core/lib/compression/stream_compression_identity.cc",
"src/core/lib/debug/stats.cc",
"src/core/lib/debug/stats_data.cc",
"src/core/lib/event_engine/slice_allocator.cc",
"src/core/lib/event_engine/sockaddr.cc",
"src/core/lib/http/format_request.cc",
"src/core/lib/http/httpcli.cc",
"src/core/lib/http/parser.cc",
@ -1057,7 +1065,7 @@ grpc_cc_library(
"absl/container:flat_hash_map",
],
language = "c++",
public_hdrs = GRPC_PUBLIC_HDRS,
public_hdrs = GRPC_PUBLIC_HDRS + GRPC_PUBLIC_EVENT_ENGINE_HDRS,
deps = [
"dual_ref_counted",
"eventmanager_libuv",

@ -199,6 +199,9 @@ config("grpc_config") {
"include/grpc/byte_buffer_reader.h",
"include/grpc/census.h",
"include/grpc/compression.h",
"include/grpc/event_engine/channel_args.h",
"include/grpc/event_engine/event_engine.h",
"include/grpc/event_engine/slice_allocator.h",
"include/grpc/fork.h",
"include/grpc/grpc.h",
"include/grpc/grpc_posix.h",
@ -849,6 +852,8 @@ config("grpc_config") {
"src/core/lib/debug/stats_data.h",
"src/core/lib/debug/trace.cc",
"src/core/lib/debug/trace.h",
"src/core/lib/event_engine/slice_allocator.cc",
"src/core/lib/event_engine/sockaddr.cc",
"src/core/lib/gprpp/atomic.h",
"src/core/lib/gprpp/debug_location.h",
"src/core/lib/gprpp/dual_ref_counted.h",

@ -1800,6 +1800,8 @@ add_library(grpc
src/core/lib/debug/stats.cc
src/core/lib/debug/stats_data.cc
src/core/lib/debug/trace.cc
src/core/lib/event_engine/slice_allocator.cc
src/core/lib/event_engine/sockaddr.cc
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/httpcli_security_connector.cc
@ -2082,6 +2084,9 @@ foreach(_hdr
include/grpc/byte_buffer_reader.h
include/grpc/census.h
include/grpc/compression.h
include/grpc/event_engine/channel_args.h
include/grpc/event_engine/event_engine.h
include/grpc/event_engine/slice_allocator.h
include/grpc/fork.h
include/grpc/grpc.h
include/grpc/grpc_posix.h
@ -2430,6 +2435,8 @@ add_library(grpc_unsecure
src/core/lib/debug/stats.cc
src/core/lib/debug/stats_data.cc
src/core/lib/debug/trace.cc
src/core/lib/event_engine/slice_allocator.cc
src/core/lib/event_engine/sockaddr.cc
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
@ -2629,6 +2636,9 @@ foreach(_hdr
include/grpc/byte_buffer_reader.h
include/grpc/census.h
include/grpc/compression.h
include/grpc/event_engine/channel_args.h
include/grpc/event_engine/event_engine.h
include/grpc/event_engine/slice_allocator.h
include/grpc/fork.h
include/grpc/grpc.h
include/grpc/grpc_posix.h

@ -1364,6 +1364,8 @@ LIBGRPC_SRC = \
src/core/lib/debug/stats.cc \
src/core/lib/debug/stats_data.cc \
src/core/lib/debug/trace.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/httpcli_security_connector.cc \
@ -1595,6 +1597,9 @@ PUBLIC_HEADERS_C += \
include/grpc/byte_buffer_reader.h \
include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/channel_args.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \
@ -1842,6 +1847,8 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/debug/stats.cc \
src/core/lib/debug/stats_data.cc \
src/core/lib/debug/trace.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
@ -1992,6 +1999,9 @@ PUBLIC_HEADERS_C += \
include/grpc/byte_buffer_reader.h \
include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/channel_args.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \

@ -368,6 +368,9 @@ libs:
- include/grpc/byte_buffer_reader.h
- include/grpc/census.h
- include/grpc/compression.h
- include/grpc/event_engine/channel_args.h
- include/grpc/event_engine/event_engine.h
- include/grpc/event_engine/slice_allocator.h
- include/grpc/fork.h
- include/grpc/grpc.h
- include/grpc/grpc_posix.h
@ -1214,6 +1217,8 @@ libs:
- src/core/lib/debug/stats.cc
- src/core/lib/debug/stats_data.cc
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/slice_allocator.cc
- src/core/lib/event_engine/sockaddr.cc
- src/core/lib/http/format_request.cc
- src/core/lib/http/httpcli.cc
- src/core/lib/http/httpcli_security_connector.cc
@ -1575,6 +1580,9 @@ libs:
- include/grpc/byte_buffer_reader.h
- include/grpc/census.h
- include/grpc/compression.h
- include/grpc/event_engine/channel_args.h
- include/grpc/event_engine/event_engine.h
- include/grpc/event_engine/slice_allocator.h
- include/grpc/fork.h
- include/grpc/grpc.h
- include/grpc/grpc_posix.h
@ -1968,6 +1976,8 @@ libs:
- src/core/lib/debug/stats.cc
- src/core/lib/debug/stats_data.cc
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/slice_allocator.cc
- src/core/lib/event_engine/sockaddr.cc
- src/core/lib/http/format_request.cc
- src/core/lib/http/httpcli.cc
- src/core/lib/http/parser.cc

@ -372,6 +372,8 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/debug/stats.cc \
src/core/lib/debug/stats_data.cc \
src/core/lib/debug/trace.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/gpr/alloc.cc \
src/core/lib/gpr/atm.cc \
src/core/lib/gpr/cpu_iphone.cc \
@ -1158,6 +1160,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/channel)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/compression)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/debug)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/gpr)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/gprpp)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/http)

@ -338,6 +338,8 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\debug\\stats.cc " +
"src\\core\\lib\\debug\\stats_data.cc " +
"src\\core\\lib\\debug\\trace.cc " +
"src\\core\\lib\\event_engine\\slice_allocator.cc " +
"src\\core\\lib\\event_engine\\sockaddr.cc " +
"src\\core\\lib\\gpr\\alloc.cc " +
"src\\core\\lib\\gpr\\atm.cc " +
"src\\core\\lib\\gpr\\cpu_iphone.cc " +
@ -1258,6 +1260,7 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\channel");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\compression");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\debug");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\gpr");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\gprpp");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\http");

@ -53,6 +53,9 @@ Gem::Specification.new do |s|
s.files += %w( include/grpc/byte_buffer_reader.h )
s.files += %w( include/grpc/census.h )
s.files += %w( include/grpc/compression.h )
s.files += %w( include/grpc/event_engine/channel_args.h )
s.files += %w( include/grpc/event_engine/event_engine.h )
s.files += %w( include/grpc/event_engine/slice_allocator.h )
s.files += %w( include/grpc/fork.h )
s.files += %w( include/grpc/grpc.h )
s.files += %w( include/grpc/grpc_posix.h )
@ -748,6 +751,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/debug/stats_data.h )
s.files += %w( src/core/lib/debug/trace.cc )
s.files += %w( src/core/lib/debug/trace.h )
s.files += %w( src/core/lib/event_engine/slice_allocator.cc )
s.files += %w( src/core/lib/event_engine/sockaddr.cc )
s.files += %w( src/core/lib/gpr/alloc.cc )
s.files += %w( src/core/lib/gpr/alloc.h )
s.files += %w( src/core/lib/gpr/arena.h )

@ -778,6 +778,8 @@
'src/core/lib/debug/stats.cc',
'src/core/lib/debug/stats_data.cc',
'src/core/lib/debug/trace.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/httpcli_security_connector.cc',
@ -1234,6 +1236,8 @@
'src/core/lib/debug/stats.cc',
'src/core/lib/debug/stats_data.cc',
'src/core/lib/debug/trace.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',

@ -0,0 +1,38 @@
# gRPC EventEngine
An EventEngine handles all cross-platform I/O, task execution, and DNS
resolution for gRPC. A default, cross-platform implementation is provided with
gRPC, but part of the intent here is to provide an interface for external
integrators to bring their own functionality. This allows for integration with
external event loops, siloing I/O and task execution between channels or
servers, and other custom integrations that were previously unsupported.
*WARNING*: This is experimental code and is subject to change.
## High level expectations of an EventEngine implementation
### Provide their own I/O threads
EventEngines are expected to internally create whatever threads are required to
perform I/O and execute callbacks. For example, an EventEngine implementation
may want to spawn separate thread pools for polling and callback execution.
### Provisioning data buffers via Slice allocation
At a high level, gRPC provides a `ResourceQuota` system that allows gRPC to
reclaim memory and degrade gracefully when memory reaches application-defined
thresholds. To enable this feature, the memory allocation of read/write buffers
within an EventEngine must be acquired in the form of Slices from
SliceAllocators. This is covered more fully in the gRFC and code.
### Documentating expectations around callback execution
Some callbacks may be expensive to run. EventEngines should decide on and
document whether callback execution might block polling operations. This way,
application developers can plan accordingly (e.g., run their expensive callbacks
on a separate thread if necessary).
### Handling concurrent usage
Assume that gRPC may use an EventEngine concurrently across multiple threads.
## TODO: documentation
* Example usage
* Link to gRFC

@ -0,0 +1,28 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_CHANNEL_ARGS_H
#define GRPC_EVENT_ENGINE_CHANNEL_ARGS_H
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {
// TODO(hork): define
class ChannelArgs;
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_EVENT_ENGINE_CHANNEL_ARGS_H

@ -0,0 +1,281 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H
#define GRPC_EVENT_ENGINE_EVENT_ENGINE_H
#include <grpc/support/port_platform.h>
#include <functional>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/time/time.h"
#include "grpc/event_engine/channel_args.h"
#include "grpc/event_engine/slice_allocator.h"
// TODO(hork): explicitly define lifetimes and ownership of all objects.
// TODO(hork): Define the Endpoint::Write metrics collection system
namespace grpc_event_engine {
namespace experimental {
////////////////////////////////////////////////////////////////////////////////
/// The EventEngine encapsulates all platform-specific behaviors related to low
/// level network I/O, timers, asynchronous execution, and DNS resolution.
///
/// This interface allows developers to provide their own event management and
/// network stacks. Motivating uses cases for supporting custom EventEngines
/// include the ability to hook into external event loops, and using different
/// EventEngine instances for each channel to better insulate network I/O and
/// callback processing from other channels.
///
/// A default cross-platform EventEngine instance is provided by gRPC.
///
/// LIFESPAN AND OWNERSHIP
///
/// gRPC takes shared ownership of EventEngines via std::shared_ptrs to ensure
/// that the engines remain available until they are no longer needed. Depending
/// on the use case, engines may live until gRPC is shut down.
///
/// EXAMPLE USAGE (Not yet implemented)
///
/// Custom EventEngines can be specified per channel, and allow configuration
/// for both clients and servers. To set a custom EventEngine for a client
/// channel, you can do something like the following:
///
/// ChannelArguments args;
/// std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
/// args.SetEventEngine(engine);
/// MyAppClient client(grpc::CreateCustomChannel(
/// "localhost:50051", grpc::InsecureChannelCredentials(), args));
///
/// A gRPC server can use a custom EventEngine by calling the
/// ServerBuilder::SetEventEngine method:
///
/// ServerBuilder builder;
/// std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
/// builder.SetEventEngine(engine);
/// std::unique_ptr<Server> server(builder.BuildAndStart());
/// server->Wait();
///
////////////////////////////////////////////////////////////////////////////////
class EventEngine {
public:
/// A basic callable function. The first argument to all callbacks is an
/// absl::Status indicating the status of the operation associated with this
/// callback. Each EventEngine method that takes a callback parameter, defines
/// the expected sets and meanings of statuses for that use case.
using Callback = std::function<void(absl::Status)>;
struct TaskHandle {
intptr_t key;
};
/// A thin wrapper around a platform-specific sockaddr type. A sockaddr struct
/// exists on all platforms that gRPC supports.
///
/// Platforms are expected to provide definitions for:
/// * sockaddr
/// * sockaddr_in
/// * sockaddr_in6
class ResolvedAddress {
public:
static constexpr socklen_t MAX_SIZE_BYTES = 128;
ResolvedAddress(const sockaddr* address, socklen_t size);
const struct sockaddr* address() const;
socklen_t size() const;
private:
char address_[MAX_SIZE_BYTES];
socklen_t size_;
};
/// An Endpoint represents one end of a connection between a gRPC client and
/// server. Endpoints are created when connections are established, and
/// Endpoint operations are gRPC's primary means of communication.
///
/// Endpoints must use the provided SliceAllocator for all data buffer memory
/// allocations. gRPC allows applications to set memory constraints per
/// Channel or Server, and the implementation depends on all dynamic memory
/// allocation being handled by the quota system.
class Endpoint {
public:
virtual ~Endpoint() = 0;
// TODO(hork): define status codes for the callback
/// Read data from the Endpoint.
///
/// When data is available on the connection, that data is moved into the
/// \a buffer, and the \a on_read callback is called. The caller must ensure
/// that the callback has access to the buffer when executed later.
/// Ownership of the buffer is not transferred. Valid slices *may* be placed
/// into the buffer even if the callback is invoked with Status != OK.
virtual void Read(Callback on_read, SliceBuffer* buffer,
absl::Time deadline) = 0;
// TODO(hork): define status codes for the callback
/// Write data out on the connection.
///
/// \a on_writable is called when the connection is ready for more data. The
/// Slices within the \a data buffer may be mutated at will by the Endpoint
/// until \a on_writable is called. The \a data SliceBuffer will remain
/// valid after calling \a Write, but its state is otherwise undefined.
virtual void Write(Callback on_writable, SliceBuffer* data,
absl::Time deadline) = 0;
// TODO(hork): define status codes for the callback
// TODO(hork): define cleanup operations, lifetimes, responsibilities.
virtual void Close(Callback on_close) = 0;
/// These methods return an address in the format described in DNSResolver.
/// The returned values are owned by the Endpoint and are expected to remain
/// valid for the life of the Endpoint.
virtual const ResolvedAddress* GetPeerAddress() const = 0;
virtual const ResolvedAddress* GetLocalAddress() const = 0;
};
/// Called when a new connection is established. This callback takes ownership
/// of the Endpoint and is responsible for its destruction.
using OnConnectCallback = std::function<void(absl::Status, Endpoint*)>;
/// An EventEngine Listener listens for incoming connection requests from gRPC
/// clients and initiates request processing once connections are established.
class Listener {
public:
/// A callback handle, used to cancel a callback. Called when the listener
/// has accepted a new client connection. This callback takes ownership of
/// the Endpoint and is responsible its destruction.
using AcceptCallback = std::function<void(absl::Status, Endpoint*)>;
virtual ~Listener() = 0;
// TODO(hork): define return status codes
// TODO(hork): requires output port argument, return value, or callback
/// Bind an address/port to this Listener. It is expected that multiple
/// addresses/ports can be bound to this Listener before Listener::Start has
/// been called.
virtual absl::Status Bind(const ResolvedAddress& addr) = 0;
virtual absl::Status Start() = 0;
virtual absl::Status Shutdown() = 0;
};
// TODO(hork): define status codes for the callback
// TODO(hork): define return status codes
// TODO(hork): document status arg meanings for on_accept and on_shutdown
/// Factory method to create a network listener.
virtual absl::StatusOr<Listener> CreateListener(
Listener::AcceptCallback on_accept, Callback on_shutdown,
const ChannelArgs& args,
SliceAllocatorFactory slice_allocator_factory) = 0;
// TODO(hork): define status codes for the callback
// TODO(hork): define return status codes
/// Creates a network connection to a remote network listener.
virtual absl::Status Connect(OnConnectCallback on_connect,
const ResolvedAddress& addr,
const ChannelArgs& args,
SliceAllocator slice_allocator,
absl::Time deadline) = 0;
/// The DNSResolver that provides asynchronous resolution.
class DNSResolver {
public:
/// A task handle for DNS Resolution requests.
struct LookupTaskHandle {
intptr_t key;
};
/// A DNS SRV record type.
struct SRVRecord {
std::string host;
int port = 0;
int priority = 0;
int weight = 0;
};
/// Called with the collection of sockaddrs that were resolved from a given
/// target address.
using LookupHostnameCallback =
std::function<void(absl::Status, std::vector<ResolvedAddress>)>;
/// Called with a collection of SRV records.
using LookupSRVCallback =
std::function<void(absl::Status, std::vector<SRVRecord>)>;
/// Called with the result of a TXT record lookup
using LookupTXTCallback = std::function<void(absl::Status, std::string)>;
virtual ~DNSResolver() = 0;
// TODO(hork): define status codes for the callback
/// Asynchronously resolve an address. \a default_port may be a non-numeric
/// named service port, and will only be used if \a address does not already
/// contain a port component.
virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
absl::string_view address,
absl::string_view default_port,
absl::Time deadline) = 0;
// TODO(hork): define status codes for the callback
virtual LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
absl::string_view name,
absl::Time deadline) = 0;
// TODO(hork): define status codes for the callback
virtual LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
absl::string_view name,
absl::Time deadline) = 0;
/// Cancel an asynchronous lookup operation.
virtual void TryCancelLookup(LookupTaskHandle handle) = 0;
};
virtual ~EventEngine() = 0;
// TODO(hork): define return status codes
/// Retrieves an instance of a DNSResolver.
virtual absl::StatusOr<DNSResolver> GetDNSResolver() = 0;
/// Intended for future expansion of Task run functionality.
struct RunOptions {};
// TODO(hork): define status codes for the callback
// TODO(hork): consider recommendation to make TaskHandle an output arg
/// Run a callback as soon as possible.
virtual TaskHandle Run(Callback fn, RunOptions opts) = 0;
// TODO(hork): define status codes for the callback
/// Synonymous with scheduling an alarm to run at time \a when.
virtual TaskHandle RunAt(absl::Time when, Callback fn, RunOptions opts) = 0;
/// Immediately tries to cancel a callback.
/// Note that this is a "best effort" cancellation. No guarantee is made that
/// the callback will be cancelled, the call could be in any stage.
///
/// There are three scenarios in which we may cancel a scheduled function:
/// 1. We cancel the execution before it has run.
/// 2. The callback has already run.
/// 3. We can't cancel it because it is "in flight".
///
/// In all cases, the cancellation is still considered successful, the
/// callback will be run exactly once from either cancellation or from its
/// activation.
virtual void TryCancel(TaskHandle handle) = 0;
// TODO(hork): define return status codes
// TODO(hork): Carefully evaluate shutdown requirements, determine if we need
// a callback parameter to be added to this method.
/// Immediately run all callbacks with status indicating the shutdown. Every
/// EventEngine is expected to shut down exactly once. No new callbacks/tasks
/// should be scheduled after shutdown has begun. Any registered callbacks
/// must be executed.
virtual absl::Status Shutdown() = 0;
};
// Lazily instantiate and return a default global EventEngine instance if no
// custom instance is provided. If a custom EventEngine is provided for every
// channel/server via ChannelArgs, this method should never be called, and the
// default instance will never be instantiated.
std::shared_ptr<EventEngine> GetDefaultEventEngine();
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_EVENT_ENGINE_EVENT_ENGINE_H

@ -0,0 +1,81 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_SLICE_ALLOCATOR_H
#define GRPC_EVENT_ENGINE_SLICE_ALLOCATOR_H
#include <grpc/support/port_platform.h>
#include <functional>
#include "absl/status/status.h"
// forward-declaring an internal struct, not used publicly.
struct grpc_resource_quota;
struct grpc_resource_user;
namespace grpc_event_engine {
namespace experimental {
// TODO(nnoble): forward declared here, needs definition.
class SliceBuffer;
class SliceAllocator {
public:
// gRPC-internal constructor
explicit SliceAllocator(grpc_resource_user* user);
// Not copyable
SliceAllocator(SliceAllocator& other) = delete;
SliceAllocator& operator=(const SliceAllocator& other) = delete;
// Moveable
SliceAllocator(SliceAllocator&& other) = default;
SliceAllocator& operator=(SliceAllocator&& other) = default;
~SliceAllocator();
using AllocateCallback =
std::function<void(absl::Status, SliceBuffer* buffer)>;
// TODO(hork): explain what happens under resource exhaustion.
/// Requests \a size bytes from gRPC, and populates \a dest with the allocated
/// slices. Ownership of the \a SliceBuffer is not transferred.
absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb);
private:
grpc_resource_user* resource_user_;
};
class SliceAllocatorFactory {
public:
// gRPC-internal constructor
explicit SliceAllocatorFactory(grpc_resource_quota* quota);
// Not copyable
SliceAllocatorFactory(SliceAllocatorFactory& other) = delete;
SliceAllocatorFactory& operator=(const SliceAllocatorFactory& other) = delete;
// Moveable
SliceAllocatorFactory(SliceAllocatorFactory&& other) = default;
SliceAllocatorFactory& operator=(SliceAllocatorFactory&& other) = default;
~SliceAllocatorFactory();
/// On Endpoint creation, call \a CreateSliceAllocator with the name of the
/// endpoint peer (a URI string, most likely). Note: \a peer_name must outlive
/// the Endpoint.
SliceAllocator CreateSliceAllocator(absl::string_view peer_name);
private:
grpc_resource_quota* resource_quota_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_EVENT_ENGINE_SLICE_ALLOCATOR_H

@ -659,4 +659,24 @@ typedef unsigned __int64 uint64_t;
#define __STDC_FORMAT_MACROS
#endif
// Platform-specific sockaddr includes
#ifdef GRPC_UV
#include <uv.h>
#elif defined(GPR_ANDROID) || defined(GPR_LINUX) || defined(GPR_APPLE) || \
defined(GPR_FREEBSD) || defined(GPR_OPENBSD) || defined(GPR_SOLARIS) || \
defined(GPR_AIX) || defined(GPR_NACL) || defined(GPR_FUCHSIA) || \
defined(GRPC_POSIX_SOCKET)
#define GRPC_EVENT_ENGINE_POSIX
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#elif defined(GPR_WINDOWS)
#include <winsock2.h>
#include <ws2tcpip.h>
// must be included after the above
#include <mswsock.h>
#else
#error UNKNOWN PLATFORM
#endif
#endif /* GRPC_IMPL_CODEGEN_PORT_PLATFORM_H */

@ -2,6 +2,15 @@
framework module grpc {
umbrella header "grpc.h"
header "byte_buffer.h"
header "byte_buffer_reader.h"
header "census.h"
header "compression.h"
header "fork.h"
header "grpc.h"
header "grpc_posix.h"
header "grpc_security.h"
header "grpc_security_constants.h"
header "impl/codegen/atm.h"
header "impl/codegen/byte_buffer.h"
header "impl/codegen/byte_buffer_reader.h"
@ -19,6 +28,10 @@ framework module grpc {
header "impl/codegen/sync.h"
header "impl/codegen/sync_abseil.h"
header "impl/codegen/sync_generic.h"
header "load_reporting.h"
header "slice.h"
header "slice_buffer.h"
header "status.h"
header "support/alloc.h"
header "support/atm.h"
header "support/cpu.h"
@ -31,22 +44,9 @@ framework module grpc {
header "support/sync_generic.h"
header "support/thd_id.h"
header "support/time.h"
header "byte_buffer.h"
header "byte_buffer_reader.h"
header "census.h"
header "compression.h"
header "fork.h"
header "grpc.h"
header "grpc_posix.h"
header "grpc_security.h"
header "grpc_security_constants.h"
header "load_reporting.h"
header "slice.h"
header "slice_buffer.h"
header "status.h"
header "support/workaround_list.h"
textual header "impl/codegen/atm_gcc_atomic.h"
textual header "impl/codegen/atm_gcc_atomic.h"
textual header "impl/codegen/atm_gcc_sync.h"
textual header "impl/codegen/atm_windows.h"
textual header "impl/codegen/sync_custom.h"

@ -33,6 +33,9 @@
<file baseinstalldir="/" name="include/grpc/byte_buffer_reader.h" role="src" />
<file baseinstalldir="/" name="include/grpc/census.h" role="src" />
<file baseinstalldir="/" name="include/grpc/compression.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/channel_args.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/event_engine.h" role="src" />
<file baseinstalldir="/" name="include/grpc/event_engine/slice_allocator.h" role="src" />
<file baseinstalldir="/" name="include/grpc/fork.h" role="src" />
<file baseinstalldir="/" name="include/grpc/grpc.h" role="src" />
<file baseinstalldir="/" name="include/grpc/grpc_posix.h" role="src" />
@ -728,6 +731,8 @@
<file baseinstalldir="/" name="src/core/lib/debug/stats_data.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/debug/trace.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/debug/trace.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/slice_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/sockaddr.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/alloc.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/alloc.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/arena.h" role="src" />

@ -0,0 +1,59 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "grpc/event_engine/slice_allocator.h"
#include <functional>
#include "absl/status/status.h"
#include "src/core/lib/iomgr/resource_quota.h"
namespace grpc_event_engine {
namespace experimental {
SliceAllocator::SliceAllocator(grpc_resource_user* user)
: resource_user_(user) {
grpc_resource_user_ref(resource_user_);
};
SliceAllocator::~SliceAllocator() { grpc_resource_user_unref(resource_user_); };
absl::Status SliceAllocator::Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) {
// TODO(hork): implement
(void)size;
(void)dest;
(void)cb;
return absl::OkStatus();
};
SliceAllocatorFactory::SliceAllocatorFactory(grpc_resource_quota* quota)
: resource_quota_(quota) {
grpc_resource_quota_ref_internal(resource_quota_);
};
SliceAllocatorFactory::~SliceAllocatorFactory() {
grpc_resource_quota_unref_internal(resource_quota_);
}
SliceAllocator SliceAllocatorFactory::CreateSliceAllocator(
absl::string_view peer_name) {
return SliceAllocator(
grpc_resource_user_create(resource_quota_, peer_name.data()));
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,37 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <string.h>
#include "grpc/event_engine/event_engine.h"
#include "grpc/support/log.h"
namespace grpc_event_engine {
namespace experimental {
EventEngine::ResolvedAddress::ResolvedAddress(const sockaddr* address,
socklen_t size) {
GPR_ASSERT(size <= sizeof(address_));
memcpy(&address_, address, size);
}
const struct sockaddr* EventEngine::ResolvedAddress::address() const {
return reinterpret_cast<const struct sockaddr*>(address_);
}
socklen_t EventEngine::ResolvedAddress::size() const { return size_; }
} // namespace experimental
} // namespace grpc_event_engine

@ -192,7 +192,7 @@
defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_CFSTREAM) != \
1
#error \
"Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GRPC_CUSTOM_SOCKET"
"Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GRPC_CUSTOM_SOCKET, GRPC_CFSTREAM"
#endif
#ifdef GRPC_POSIX_SOCKET

@ -27,10 +27,12 @@
#include <grpc/grpc.h>
#ifndef GRPC_EVENT_ENGINE_POSIX
struct iovec {
void* iov_base;
size_t iov_len;
};
#endif // GRPC_EVENT_ENGINE_POSIX
/**
* A gsec interface for AEAD encryption schemes. The API is thread-compatible.

@ -347,6 +347,8 @@ CORE_SOURCE_FILES = [
'src/core/lib/debug/stats.cc',
'src/core/lib/debug/stats_data.cc',
'src/core/lib/debug/trace.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/gpr/alloc.cc',
'src/core/lib/gpr/atm.cc',
'src/core/lib/gpr/cpu_iphone.cc',

@ -59,13 +59,29 @@
files.update(lib.get(field, []))
return list(sorted(files))
# Wrapped languages don't need to access EventEngine APIs.
event_engine_files = [
file
for file in list_lib_files("grpc", ("public_headers", "headers", "src"))
if '/event_engine/' in file
]
# ObjectiveC doesn't use c-ares so we don't need address_sorting files at all
address_sorting_unwanted_files = list_lib_files("address_sorting", ("public_headers", "headers", "src"))
# ObjectiveC needs to obtain re2 explicitly unlike other languages; TODO @donnadionne make ObjC more consistent with others
grpc_private_files = list(sorted((set(list_lib_files("grpc", ("headers", "src"))) - set(address_sorting_unwanted_files)) | set(list_lib_files("re2", ("headers", "src")))))
grpc_public_headers = list(sorted((set(list_lib_files("grpc", ("public_headers",))) - set(address_sorting_unwanted_files)) | set(list_lib_files("re2", ("public_headers",)))))
grpc_private_headers = list(sorted((set(list_lib_files("grpc", ("headers",))) - set(address_sorting_unwanted_files)) | set(list_lib_files("re2", ("headers",)))))
grpc_private_files = list(
sorted((set(list_lib_files("grpc", ("headers", "src"))) -
set(address_sorting_unwanted_files) - set(event_engine_files))
| set(list_lib_files("re2", ("headers", "src")))))
grpc_public_headers = list(
sorted((set(list_lib_files("grpc", ("public_headers", ))) -
set(address_sorting_unwanted_files)) - set(event_engine_files)
| set(list_lib_files("re2", ("public_headers", )))))
grpc_private_headers = list(
sorted((set(list_lib_files("grpc", ("headers", ))) -
set(address_sorting_unwanted_files)) - set(event_engine_files)
| set(list_lib_files("re2", ("headers", )))))
grpc_abseil_specs = list_abseil_specs("grpc")
grpc_tests_abseil_specs = list(sorted(set(list_abseil_specs("end2end_tests")) - set(grpc_abseil_specs)))
@ -91,6 +107,7 @@
set(list_lib_files("end2end_tests", ("src", "headers")))
- set(grpc_private_files)
- set(address_sorting_unwanted_files)
- set(event_engine_files)
- set([
# Subprocess is not supported in tvOS and not needed by our tests.
"test/core/util/subprocess_posix.cc",

@ -1,8 +1,8 @@
%YAML 1.2
--- |
<%!
<%
# TODO (mxyan): Make this list from build.yaml
textual_headers = ["include/grpc/support/atm_gcc_atomic.h",
textual_headers = {"include/grpc/support/atm_gcc_atomic.h",
"include/grpc/support/atm_gcc_sync.h",
"include/grpc/support/atm_windows.h",
"include/grpc/support/sync_custom.h",
@ -16,28 +16,19 @@
"include/grpc/impl/codegen/atm_windows.h",
"include/grpc/impl/codegen/sync_custom.h",
"include/grpc/impl/codegen/sync_posix.h",
"include/grpc/impl/codegen/sync_windows.h"]
def grpc_public_headers_no_dir(libs):
out = []
for lib in libs:
if lib.name in ("grpc", "gpr"):
out += lib.get('public_headers', [])
out = [f for f in out if f not in textual_headers]
out = [hdr.split('/', 2)[2] for hdr in out]
return out
# Generate the list of platform-specific headers as textual headers so that
# they are not built when the module is built but only when they are named by
# an #include directive.
def grpc_public_textual_headers_no_dir(libs):
out = []
for lib in libs:
if lib.name in ("grpc", "gpr"):
out += lib.get('public_headers', [])
out = [f for f in out if f in textual_headers]
out = [hdr.split('/', 2)[2] for hdr in out]
return out
"include/grpc/impl/codegen/sync_windows.h"}
grpc_public_headers = {
file for lib in libs for file in lib.get('public_headers', [])
if lib.name in ("grpc", "gpr")
}
event_engine_files = {
file for file in grpc_public_headers if 'event_engine' in file
}
def un_dir(files):
return {f.split ('/', 2)[2] for f in files}
def header_lines(files):
return ('\n ').join('header "%s"' % f for f in files)
@ -48,9 +39,13 @@
framework module grpc {
umbrella header "grpc.h"
${header_lines(grpc_public_headers_no_dir(libs))}
${header_lines(
sorted(un_dir(grpc_public_headers - event_engine_files -
textual_headers)))}
${textual_header_lines(grpc_public_textual_headers_no_dir(libs))}
${textual_header_lines(
sorted(un_dir(grpc_public_headers.intersection(textual_headers) -
event_engine_files)))}
export *
module * { export * }

@ -27,6 +27,8 @@
return False
def is_cronet_header(hdr):
return "cronet" in hdr
def is_event_engine_header(hdr):
return "/event_engine/" in hdr
hdrs = set()
pfx = 'include/'
for lib in libs:
@ -34,6 +36,7 @@
for hdr in lib.get('public_headers', []):
if is_platform_header(hdr): continue
if is_cronet_header(hdr): continue
if is_event_engine_header(hdr): continue
assert(hdr[0:len(pfx)] == pfx)
hdrs.add(hdr[len(pfx):])
hdrs = sorted(list(hdrs))

@ -49,7 +49,7 @@ def showhelp() -> None:
def render_template(template: Template, context: Context) -> None:
"""Render the mako template with given context.
Prints an error template to indicate where and what in the template caused
the render failure.
"""

@ -48,9 +48,12 @@ class GuardValidator(object):
self.endif_re = re.compile(r'#endif // ([A-Z][A-Z_1-9]*)')
self.failed = False
def _is_c_core_header(self, fpath):
return 'include' in fpath and not ('grpc++' in fpath or 'grpcpp'
in fpath or 'event_engine' in fpath)
def fail(self, fpath, regexp, fcontents, match_txt, correct, fix):
c_core_header = 'include' in fpath and not ('grpc++' in fpath or
'grpcpp' in fpath)
c_core_header = self._is_c_core_header(fpath)
self.failed = True
invalid_guards_msg_template = (
'{0}: Missing preprocessor guards (RE {1}). '
@ -81,8 +84,7 @@ class GuardValidator(object):
return fcontents
def check(self, fpath, fix):
c_core_header = 'include' in fpath and not ('grpc++' in fpath or
'grpcpp' in fpath)
c_core_header = self._is_c_core_header(fpath)
valid_guard = build_valid_guard(fpath)
fcontents = load(fpath)

@ -881,6 +881,9 @@ include/grpc/byte_buffer.h \
include/grpc/byte_buffer_reader.h \
include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/channel_args.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \

@ -881,6 +881,9 @@ include/grpc/byte_buffer.h \
include/grpc/byte_buffer_reader.h \
include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/channel_args.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \
@ -1680,6 +1683,8 @@ src/core/lib/debug/stats_data.cc \
src/core/lib/debug/stats_data.h \
src/core/lib/debug/trace.cc \
src/core/lib/debug/trace.h \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/gpr/alloc.cc \
src/core/lib/gpr/alloc.h \
src/core/lib/gpr/arena.h \

@ -811,6 +811,9 @@ include/grpc/byte_buffer.h \
include/grpc/byte_buffer_reader.h \
include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/channel_args.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \

@ -811,6 +811,9 @@ include/grpc/byte_buffer.h \
include/grpc/byte_buffer_reader.h \
include/grpc/census.h \
include/grpc/compression.h \
include/grpc/event_engine/channel_args.h \
include/grpc/event_engine/event_engine.h \
include/grpc/event_engine/slice_allocator.h \
include/grpc/fork.h \
include/grpc/grpc.h \
include/grpc/grpc_posix.h \
@ -1517,6 +1520,8 @@ src/core/lib/debug/stats_data.cc \
src/core/lib/debug/stats_data.h \
src/core/lib/debug/trace.cc \
src/core/lib/debug/trace.h \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/gpr/README.md \
src/core/lib/gpr/alloc.cc \
src/core/lib/gpr/alloc.h \

Loading…
Cancel
Save