diff --git a/BUILD b/BUILD index b2e3b413be1..33461c9d86e 100644 --- a/BUILD +++ b/BUILD @@ -2255,6 +2255,7 @@ grpc_cc_library( "absl/status", "absl/status:statusor", "absl/time", + "absl/types:optional", "absl/functional:any_invocable", ], tags = ["nofixdeps"], diff --git a/include/grpc/event_engine/endpoint_config.h b/include/grpc/event_engine/endpoint_config.h index 6ca4b4f7b6c..68dae44caee 100644 --- a/include/grpc/event_engine/endpoint_config.h +++ b/include/grpc/event_engine/endpoint_config.h @@ -19,7 +19,7 @@ #include #include "absl/strings/string_view.h" -#include "absl/types/variant.h" +#include "absl/types/optional.h" namespace grpc_event_engine { namespace experimental { @@ -31,10 +31,16 @@ namespace experimental { class EndpointConfig { public: virtual ~EndpointConfig() = default; - using Setting = absl::variant; - /// Returns the Setting for a specified key, or \a absl::monostate if there is - /// no such entry. Caller does not take ownership of the resulting value. - virtual Setting Get(absl::string_view key) const = 0; + // If the key points to an integer config, an integer value gets returned. + // Otherwise it returns an absl::nullopt_t + virtual absl::optional GetInt(absl::string_view key) const = 0; + // If the key points to an string config, an string value gets returned. + // Otherwise it returns an absl::nullopt_t + virtual absl::optional GetString( + absl::string_view key) const = 0; + // If the key points to an void* config, a void* pointer value gets returned. + // Otherwise it returns nullptr + virtual void* GetVoidPointer(absl::string_view key) const = 0; }; } // namespace experimental diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 4b24573b493..65cfbd2b396 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -22,7 +22,6 @@ #include -#include #include #include @@ -50,6 +49,7 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/unique_type_name.h" @@ -401,12 +401,13 @@ grpc_channel* grpc_channel_create_from_fd(const char* target, int fd, .PreconditionChannelArgs(args) .SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority") .SetObject(creds->Ref()); - auto c_final_args = final_args.ToC(); int flags = fcntl(fd, F_GETFL, 0); GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); - grpc_endpoint* client = grpc_tcp_client_create_from_fd( - grpc_fd_create(fd, "client", true), c_final_args.get(), "fd-client"); + grpc_endpoint* client = grpc_tcp_create_from_fd( + grpc_fd_create(fd, "client", true), + grpc_event_engine::experimental::ChannelArgsEndpointConfig(final_args), + "fd-client"); grpc_transport* transport = grpc_create_chttp2_transport(final_args, client, true); GPR_ASSERT(transport); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 9ddd2b0256f..0049ab89c14 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -54,6 +54,7 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -87,7 +88,7 @@ #ifdef GPR_SUPPORT_CHANNELS_FROM_FD #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/tcp_posix.h" +#include "src/core/lib/iomgr/tcp_client_posix.h" #endif // GPR_SUPPORT_CHANNELS_FROM_FD namespace grpc_core { @@ -155,7 +156,8 @@ class Chttp2ServerListener : public Server::ListenerInterface { void Start(grpc_endpoint* endpoint, const ChannelArgs& args); - // Needed to be able to grab an external ref in ActiveConnection::Start() + // Needed to be able to grab an external ref in + // ActiveConnection::Start() using InternallyRefCounted::Ref; private: @@ -200,8 +202,8 @@ class Chttp2ServerListener : public Server::ListenerInterface { // Set by HandshakingState before the handshaking begins and reset when // handshaking is done. OrphanablePtr handshaking_state_ ABSL_GUARDED_BY(&mu_); - // Set by HandshakingState when handshaking is done and a valid transport is - // created. + // Set by HandshakingState when handshaking is done and a valid transport + // is created. grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr; grpc_closure on_close_; grpc_timer drain_grace_timer_; @@ -227,11 +229,11 @@ class Chttp2ServerListener : public Server::ListenerInterface { grpc_closure* destroy_done); // The interface required by RefCountedPtr<> has been manually implemented - // here to take a ref on tcp_server_ instead. Note that, the handshaker needs - // tcp_server_ to exist for the lifetime of the handshake since it's needed by - // acceptor. Sharing refs between the listener and tcp_server_ is just an - // optimization to avoid taking additional refs on the listener, since - // TcpServerShutdownComplete already holds a ref to the listener. + // here to take a ref on tcp_server_ instead. Note that, the handshaker + // needs tcp_server_ to exist for the lifetime of the handshake since it's + // needed by acceptor. Sharing refs between the listener and tcp_server_ is + // just an optimization to avoid taking additional refs on the listener, + // since TcpServerShutdownComplete already holds a ref to the listener. void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); } void IncrementRefCount(const DebugLocation& /* location */, const char* /* reason */) { @@ -344,8 +346,8 @@ void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() { listener_->is_serving_ = false; connections = std::move(listener_->connections_); } - // Send GOAWAYs on the transports so that they disconnected when existing RPCs - // finish. + // Send GOAWAYs on the transports so that they disconnected when existing + // RPCs finish. for (auto& connection : connections) { connection.first->SendGoAway(); } @@ -486,9 +488,9 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( self->Ref().release(); // Held by OnReceiveSettings(). GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self, grpc_schedule_on_exec_ctx); - // If the listener has been configured with a config fetcher, we need - // to watch on the transport being closed so that we can an updated - // list of active connections. + // If the listener has been configured with a config fetcher, we + // need to watch on the transport being closed so that we can an + // updated list of active connections. grpc_closure* on_close = nullptr; if (self->connection_->listener_->config_fetcher_watcher_ != nullptr) { @@ -627,8 +629,8 @@ void Chttp2ServerListener::ActiveConnection::OnClose( { MutexLock listener_lock(&self->listener_->mu_); MutexLock connection_lock(&self->mu_); - // The node was already deleted from the connections_ list if the connection - // is shutdown. + // The node was already deleted from the connections_ list if the + // connection is shutdown. if (!self->shutdown_) { auto it = self->listener_->connections_.find(self); if (it != self->listener_->connections_.end()) { @@ -678,8 +680,10 @@ grpc_error_handle Chttp2ServerListener::Create( grpc_error_handle error = GRPC_ERROR_NONE; // Create Chttp2ServerListener. listener = new Chttp2ServerListener(server, args, args_modifier); - error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_, - args.ToC().get(), &listener->tcp_server_); + error = grpc_tcp_server_create( + &listener->tcp_server_shutdown_complete_, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &listener->tcp_server_); if (!GRPC_ERROR_IS_NONE(error)) return error; if (server->config_fetcher() != nullptr) { listener->resolved_address_ = *addr; @@ -724,9 +728,10 @@ grpc_error_handle Chttp2ServerListener::CreateWithAcceptor( Chttp2ServerArgsModifier args_modifier) { Chttp2ServerListener* listener = new Chttp2ServerListener(server, args, args_modifier); - grpc_error_handle error = - grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_, - args.ToC().get(), &listener->tcp_server_); + grpc_error_handle error = grpc_tcp_server_create( + &listener->tcp_server_shutdown_complete_, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &listener->tcp_server_); if (!GRPC_ERROR_IS_NONE(error)) { delete listener; return error; @@ -1072,8 +1077,10 @@ void grpc_server_add_channel_from_fd(grpc_server* server, int fd, std::string name = absl::StrCat("fd:", fd); auto memory_quota = server_args.GetObject()->memory_quota(); - grpc_endpoint* server_endpoint = grpc_tcp_create( - grpc_fd_create(fd, name.c_str(), true), server_args.ToC().get(), name); + grpc_endpoint* server_endpoint = grpc_tcp_create_from_fd( + grpc_fd_create(fd, name.c_str(), true), + grpc_event_engine::experimental::ChannelArgsEndpointConfig(server_args), + name); grpc_transport* transport = grpc_create_chttp2_transport( server_args, server_endpoint, false /* is_client */ ); diff --git a/src/core/lib/event_engine/channel_args_endpoint_config.cc b/src/core/lib/event_engine/channel_args_endpoint_config.cc index cdbd508d733..41630a85ef2 100644 --- a/src/core/lib/event_engine/channel_args_endpoint_config.cc +++ b/src/core/lib/event_engine/channel_args_endpoint_config.cc @@ -15,33 +15,25 @@ #include "src/core/lib/event_engine/channel_args_endpoint_config.h" -#include - -#include "absl/types/variant.h" - -#include -#include +#include "absl/types/optional.h" #include "src/core/lib/channel/channel_args.h" namespace grpc_event_engine { namespace experimental { -EndpointConfig::Setting ChannelArgsEndpointConfig::Get( +absl::optional ChannelArgsEndpointConfig::GetInt( absl::string_view key) const { - const grpc_arg* arg = grpc_channel_args_find(args_, std::string(key).c_str()); - if (arg == nullptr) { - return absl::monostate(); - } - switch (arg->type) { - case GRPC_ARG_STRING: - return absl::string_view(arg->value.string); - case GRPC_ARG_INTEGER: - return arg->value.integer; - case GRPC_ARG_POINTER: - return arg->value.pointer.p; - } - GPR_UNREACHABLE_CODE(return absl::monostate()); + return args_.GetInt(key); +} + +absl::optional ChannelArgsEndpointConfig::GetString( + absl::string_view key) const { + return args_.GetString(key); +} + +void* ChannelArgsEndpointConfig::GetVoidPointer(absl::string_view key) const { + return args_.GetVoidPointer(key); } } // namespace experimental diff --git a/src/core/lib/event_engine/channel_args_endpoint_config.h b/src/core/lib/event_engine/channel_args_endpoint_config.h index 2efb1cac528..c449e3440f6 100644 --- a/src/core/lib/event_engine/channel_args_endpoint_config.h +++ b/src/core/lib/event_engine/channel_args_endpoint_config.h @@ -17,24 +17,30 @@ #include #include "absl/strings/string_view.h" +#include "absl/types/optional.h" #include -#include + +#include "src/core/lib/channel/channel_args.h" namespace grpc_event_engine { namespace experimental { -/// A readonly \a EndpointConfig based on grpc_channel_args. This class does not -/// take ownership of the grpc_endpoint_args*, and instances of this class -/// should not be used after the underlying args are destroyed. class ChannelArgsEndpointConfig : public EndpointConfig { public: - explicit ChannelArgsEndpointConfig(const grpc_channel_args* args) + ChannelArgsEndpointConfig() = default; + explicit ChannelArgsEndpointConfig(const grpc_core::ChannelArgs& args) : args_(args) {} - Setting Get(absl::string_view key) const override; + ChannelArgsEndpointConfig(const ChannelArgsEndpointConfig& config) = default; + ChannelArgsEndpointConfig& operator=(const ChannelArgsEndpointConfig& other) = + default; + absl::optional GetInt(absl::string_view key) const override; + absl::optional GetString( + absl::string_view key) const override; + void* GetVoidPointer(absl::string_view key) const override; private: - const grpc_channel_args* args_; + grpc_core::ChannelArgs args_; }; } // namespace experimental diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc index 812d36bb8ec..cc721015fdb 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.cc +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc @@ -35,6 +35,7 @@ #include #include +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/endpoint_pair.h" #include "src/core/lib/iomgr/socket_utils_posix.h" @@ -60,17 +61,20 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, create_sockets(sv); grpc_core::ExecCtx exec_ctx; std::string final_name = absl::StrCat(name, ":client"); - const grpc_channel_args* new_args = grpc_core::CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(args) - .ToC() - .release(); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name.c_str(), false), - new_args, "socketpair-server"); + auto new_args = grpc_core::CoreConfiguration::Get() + .channel_args_preconditioning() + .PreconditionChannelArgs(args); + p.client = grpc_tcp_create( + grpc_fd_create(sv[1], final_name.c_str(), false), + TcpOptionsFromEndpointConfig( + grpc_event_engine::experimental::ChannelArgsEndpointConfig(new_args)), + "socketpair-server"); final_name = absl::StrCat(name, ":server"); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name.c_str(), false), - new_args, "socketpair-client"); - grpc_channel_args_destroy(new_args); + p.server = grpc_tcp_create( + grpc_fd_create(sv[0], final_name.c_str(), false), + TcpOptionsFromEndpointConfig( + grpc_event_engine::experimental::ChannelArgsEndpointConfig(new_args)), + "socketpair-client"); return p; } diff --git a/src/core/lib/iomgr/endpoint_pair_windows.cc b/src/core/lib/iomgr/endpoint_pair_windows.cc index 386057b8951..7ea1dcb3d5d 100644 --- a/src/core/lib/iomgr/endpoint_pair_windows.cc +++ b/src/core/lib/iomgr/endpoint_pair_windows.cc @@ -77,9 +77,9 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( create_sockets(sv); grpc_core::ExecCtx exec_ctx; p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), - channel_args, "endpoint:server"); + "endpoint:server"); p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), - channel_args, "endpoint:client"); + "endpoint:client"); return p; } diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index 03d946de2eb..f32dc0fd501 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -43,12 +43,12 @@ #include +#include #include #include #include #include "src/core/lib/address_utils/sockaddr_utils.h" -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/sockaddr.h" @@ -298,10 +298,9 @@ void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client) { /* Set TCP_USER_TIMEOUT */ grpc_error_handle grpc_set_socket_tcp_user_timeout( - int fd, const grpc_channel_args* channel_args, bool is_client) { + int fd, const grpc_core::PosixTcpOptions& options, bool is_client) { // Use conditionally-important parameter to avoid warning (void)fd; - (void)channel_args; (void)is_client; extern grpc_core::TraceFlag grpc_tcp_trace; if (g_socket_supports_tcp_user_timeout.load() >= 0) { @@ -314,29 +313,13 @@ grpc_error_handle grpc_set_socket_tcp_user_timeout( enable = g_default_server_tcp_user_timeout_enabled; timeout = g_default_server_tcp_user_timeout_ms; } - if (channel_args) { - for (unsigned int i = 0; i < channel_args->num_args; i++) { - if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { - const int value = grpc_channel_arg_get_integer( - &channel_args->args[i], grpc_integer_options{0, 1, INT_MAX}); - /* Continue using default if value is 0 */ - if (value == 0) { - continue; - } - /* Disable if value is INT_MAX */ - enable = value != INT_MAX; - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { - const int value = grpc_channel_arg_get_integer( - &channel_args->args[i], grpc_integer_options{0, 1, INT_MAX}); - /* Continue using default if value is 0 */ - if (value == 0) { - continue; - } - timeout = value; - } - } + int value = options.keep_alive_time_ms; + if (value > 0) { + enable = value != INT_MAX; + } + value = options.keep_alive_timeout_ms; + if (value > 0) { + timeout = value; } if (enable) { int newval; @@ -398,16 +381,11 @@ grpc_error_handle grpc_set_socket_with_mutator(int fd, grpc_fd_usage usage, } grpc_error_handle grpc_apply_socket_mutator_in_args( - int fd, grpc_fd_usage usage, const grpc_channel_args* args) { - const grpc_arg* socket_mutator_arg = - grpc_channel_args_find(args, GRPC_ARG_SOCKET_MUTATOR); - if (socket_mutator_arg == nullptr) { + int fd, grpc_fd_usage usage, const grpc_core::PosixTcpOptions& options) { + if (options.socket_mutator == nullptr) { return GRPC_ERROR_NONE; } - GPR_DEBUG_ASSERT(socket_mutator_arg->type == GRPC_ARG_POINTER); - grpc_socket_mutator* mutator = - static_cast(socket_mutator_arg->value.pointer.p); - return grpc_set_socket_with_mutator(fd, usage, mutator); + return grpc_set_socket_with_mutator(fd, usage, options.socket_mutator); } static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT; diff --git a/src/core/lib/iomgr/socket_utils_posix.cc b/src/core/lib/iomgr/socket_utils_posix.cc index 333e60db782..66f7d109a67 100644 --- a/src/core/lib/iomgr/socket_utils_posix.cc +++ b/src/core/lib/iomgr/socket_utils_posix.cc @@ -18,18 +18,100 @@ #include +#include "absl/types/optional.h" + #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKETUTILS - #include #include #include +#include #include #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/socket_utils_posix.h" +#endif + +#ifdef GRPC_POSIX_SOCKET_TCP + +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" + +using ::grpc_event_engine::experimental::EndpointConfig; + +using ::grpc_core::PosixTcpOptions; + +namespace { + +int AdjustValue(int default_value, int min_value, int max_value, + absl::optional actual_value) { + if (!actual_value.has_value() || *actual_value < min_value || + *actual_value > max_value) { + return default_value; + } + return *actual_value; +} +} // namespace + +PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) { + void* value; + PosixTcpOptions options; + options.tcp_read_chunk_size = AdjustValue( + PosixTcpOptions::kDefaultReadChunkSize, 1, PosixTcpOptions::kMaxChunkSize, + config.GetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE)); + options.tcp_min_read_chunk_size = + AdjustValue(PosixTcpOptions::kDefaultMinReadChunksize, 1, + PosixTcpOptions::kMaxChunkSize, + config.GetInt(GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)); + options.tcp_max_read_chunk_size = + AdjustValue(PosixTcpOptions::kDefaultMaxReadChunksize, 1, + PosixTcpOptions::kMaxChunkSize, + config.GetInt(GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)); + options.tcp_tx_zerocopy_send_bytes_threshold = + AdjustValue(PosixTcpOptions::kDefaultSendBytesThreshold, 0, INT_MAX, + config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD)); + options.tcp_tx_zerocopy_max_simultaneous_sends = + AdjustValue(PosixTcpOptions::kDefaultMaxSends, 0, INT_MAX, + config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)); + options.tcp_tx_zero_copy_enabled = + (AdjustValue(PosixTcpOptions::kZerocpTxEnabledDefault, 0, 1, + config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) != 0); + options.keep_alive_time_ms = + AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS)); + options.keep_alive_timeout_ms = + AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)); + options.expand_wildcard_addrs = + (AdjustValue(0, 1, INT_MAX, + config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS)) != 0); + options.allow_reuse_port = + (AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_ALLOW_REUSEPORT)) != + 0); + + if (options.tcp_min_read_chunk_size > options.tcp_max_read_chunk_size) { + options.tcp_min_read_chunk_size = options.tcp_max_read_chunk_size; + } + options.tcp_read_chunk_size = grpc_core::Clamp( + options.tcp_read_chunk_size, options.tcp_min_read_chunk_size, + options.tcp_max_read_chunk_size); + + value = config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA); + if (value != nullptr) { + options.resource_quota = + reinterpret_cast(value)->Ref(); + } + value = config.GetVoidPointer(GRPC_ARG_SOCKET_MUTATOR); + if (value != nullptr) { + options.socket_mutator = + grpc_socket_mutator_ref(static_cast(value)); + } + return options; +} + +#endif /* GRPC_POSIX_SOCKET_TCP */ + +#ifdef GRPC_POSIX_SOCKETUTILS int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock, int cloexec) { diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index d79a4a15ed8..27c3b284edb 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -21,15 +21,14 @@ #include -#include -#include - +#include #include #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_mutator.h" +#include "src/core/lib/resource_quota/resource_quota.h" #ifdef GRPC_LINUX_ERRQUEUE #ifndef SO_ZEROCOPY @@ -40,6 +39,98 @@ #endif #endif /* ifdef GRPC_LINUX_ERRQUEUE */ +namespace grpc_core { + +struct PosixTcpOptions { + static constexpr int kDefaultReadChunkSize = 8192; + static constexpr int kDefaultMinReadChunksize = 256; + static constexpr int kDefaultMaxReadChunksize = 4 * 1024 * 1024; + static constexpr int kZerocpTxEnabledDefault = 0; + static constexpr int kMaxChunkSize = 32 * 1024 * 1024; + static constexpr int kDefaultMaxSends = 4; + static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; + int tcp_read_chunk_size = kDefaultReadChunkSize; + int tcp_min_read_chunk_size = kDefaultMinReadChunksize; + int tcp_max_read_chunk_size = kDefaultMaxReadChunksize; + int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold; + int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends; + bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault; + int keep_alive_time_ms = 0; + int keep_alive_timeout_ms = 0; + bool expand_wildcard_addrs = false; + bool allow_reuse_port = false; + RefCountedPtr resource_quota; + struct grpc_socket_mutator* socket_mutator = nullptr; + PosixTcpOptions() = default; + // Move ctor + PosixTcpOptions(PosixTcpOptions&& other) noexcept { + socket_mutator = absl::exchange(other.socket_mutator, nullptr); + resource_quota = std::move(other.resource_quota); + CopyIntegerOptions(other); + } + // Move assignment + PosixTcpOptions& operator=(PosixTcpOptions&& other) noexcept { + if (socket_mutator != nullptr) { + grpc_socket_mutator_unref(socket_mutator); + } + socket_mutator = absl::exchange(other.socket_mutator, nullptr); + resource_quota = std::move(other.resource_quota); + CopyIntegerOptions(other); + return *this; + } + // Copy ctor + PosixTcpOptions(const PosixTcpOptions& other) { + if (other.socket_mutator != nullptr) { + socket_mutator = grpc_socket_mutator_ref(other.socket_mutator); + } + resource_quota = other.resource_quota; + CopyIntegerOptions(other); + } + // Copy assignment + PosixTcpOptions& operator=(const PosixTcpOptions& other) { + if (&other == this) { + return *this; + } + if (socket_mutator != nullptr) { + grpc_socket_mutator_unref(socket_mutator); + socket_mutator = nullptr; + } + if (other.socket_mutator != nullptr) { + socket_mutator = grpc_socket_mutator_ref(other.socket_mutator); + } + resource_quota = other.resource_quota; + CopyIntegerOptions(other); + return *this; + } + // Destructor. + ~PosixTcpOptions() { + if (socket_mutator != nullptr) { + grpc_socket_mutator_unref(socket_mutator); + } + } + + private: + void CopyIntegerOptions(const PosixTcpOptions& other) { + tcp_read_chunk_size = other.tcp_read_chunk_size; + tcp_min_read_chunk_size = other.tcp_min_read_chunk_size; + tcp_max_read_chunk_size = other.tcp_max_read_chunk_size; + tcp_tx_zerocopy_send_bytes_threshold = + other.tcp_tx_zerocopy_send_bytes_threshold; + tcp_tx_zerocopy_max_simultaneous_sends = + other.tcp_tx_zerocopy_max_simultaneous_sends; + tcp_tx_zero_copy_enabled = other.tcp_tx_zero_copy_enabled; + keep_alive_time_ms = other.keep_alive_time_ms; + keep_alive_timeout_ms = other.keep_alive_timeout_ms; + expand_wildcard_addrs = other.expand_wildcard_addrs; + allow_reuse_port = other.allow_reuse_port; + } +}; + +} // namespace grpc_core + +grpc_core::PosixTcpOptions TcpOptionsFromEndpointConfig( + const grpc_event_engine::experimental::EndpointConfig& config); + /* a wrapper for accept or accept4 */ int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock, int cloexec); @@ -70,7 +161,7 @@ void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client); /* Set TCP_USER_TIMEOUT */ grpc_error_handle grpc_set_socket_tcp_user_timeout( - int fd, const grpc_channel_args* channel_args, bool is_client); + int fd, const grpc_core::PosixTcpOptions& options, bool is_client); /* Returns true if this system can create AF_INET6 sockets bound to ::1. The value is probed once, and cached for the life of the process. @@ -104,9 +195,10 @@ grpc_error_handle grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes); grpc_error_handle grpc_set_socket_with_mutator(int fd, grpc_fd_usage usage, grpc_socket_mutator* mutator); -/* Extracts the first socket mutator from args if any and applies on the fd. */ +/* Extracts the first socket mutator from config if any and applies on the fd. + */ grpc_error_handle grpc_apply_socket_mutator_in_args( - int fd, grpc_fd_usage usage, const grpc_channel_args* args); + int fd, grpc_fd_usage usage, const grpc_core::PosixTcpOptions& options); /* An enum to keep track of IPv4/IPv6 socket modes. diff --git a/src/core/lib/iomgr/tcp_client.cc b/src/core/lib/iomgr/tcp_client.cc index aba610cee70..05511a4192c 100644 --- a/src/core/lib/iomgr/tcp_client.cc +++ b/src/core/lib/iomgr/tcp_client.cc @@ -22,14 +22,13 @@ grpc_tcp_client_vtable* grpc_tcp_client_impl; -int64_t grpc_tcp_client_connect(grpc_closure* on_connect, - grpc_endpoint** endpoint, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) { +int64_t grpc_tcp_client_connect( + grpc_closure* on_connect, grpc_endpoint** endpoint, + grpc_pollset_set* interested_parties, + const grpc_event_engine::experimental::EndpointConfig& config, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { return grpc_tcp_client_impl->connect(on_connect, endpoint, interested_parties, - channel_args, addr, deadline); + config, addr, deadline); } bool grpc_tcp_client_cancel_connect(int64_t connection_handle) { diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 24235cde277..12edeb97b69 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -21,6 +21,7 @@ #include +#include #include #include @@ -30,11 +31,11 @@ #include "src/core/lib/resource_quota/memory_quota.h" typedef struct grpc_tcp_client_vtable { - int64_t (*connect)(grpc_closure* on_connect, grpc_endpoint** endpoint, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline); + int64_t (*connect)( + grpc_closure* on_connect, grpc_endpoint** endpoint, + grpc_pollset_set* interested_parties, + const grpc_event_engine::experimental::EndpointConfig& config, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline); bool (*cancel_connect)(int64_t connection_handle); } grpc_tcp_client_vtable; @@ -45,12 +46,11 @@ typedef struct grpc_tcp_client_vtable { in this connection being established (in order to continue their work). It returns a handle to the connect operation which can be used to cancel the connection attempt. */ -int64_t grpc_tcp_client_connect(grpc_closure* on_connect, - grpc_endpoint** endpoint, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline); +int64_t grpc_tcp_client_connect( + grpc_closure* on_connect, grpc_endpoint** endpoint, + grpc_pollset_set* interested_parties, + const grpc_event_engine::experimental::EndpointConfig& config, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline); // Returns true if a connect attempt corresponding to the provided handle // is successfully cancelled. Otherwise it returns false. If the connect diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc index de42f01245e..4cd23173ef8 100644 --- a/src/core/lib/iomgr/tcp_client_cfstream.cc +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -27,12 +27,12 @@ #include #include +#include #include #include #include #include "src/core/lib/address_utils/sockaddr_utils.h" -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/iomgr/cfstream_handle.h" #include "src/core/lib/iomgr/closure.h" @@ -149,11 +149,11 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr, *port = grpc_sockaddr_get_port(addr); } -static int64_t CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* resolved_addr, - grpc_core::Timestamp deadline) { +static int64_t CFStreamClientConnect( + grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_event_engine::experimental::EndpointConfig& /*config*/, + const grpc_resolved_address* resolved_addr, grpc_core::Timestamp deadline) { auto addr_uri = grpc_sockaddr_to_uri(resolved_addr); if (!addr_uri.ok()) { grpc_error_handle error = diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 849a1d903ea..e11e7790337 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -35,7 +35,6 @@ #include #include "src/core/lib/address_utils/sockaddr_utils.h" -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" @@ -51,6 +50,8 @@ extern grpc_core::TraceFlag grpc_tcp_trace; +using ::grpc_event_engine::experimental::EndpointConfig; + struct async_connect { gpr_mu mu; grpc_fd* fd; @@ -62,9 +63,9 @@ struct async_connect { std::string addr_str; grpc_endpoint** ep; grpc_closure* closure; - grpc_channel_args* channel_args; int64_t connection_handle; bool connect_cancelled; + grpc_core::PosixTcpOptions options; }; struct ConnectionShard { @@ -90,9 +91,9 @@ void grpc_tcp_client_global_init() { gpr_once_init(&g_tcp_client_posix_init, do_tcp_client_global_init); } -static grpc_error_handle prepare_socket(const grpc_resolved_address* addr, - int fd, - const grpc_channel_args* channel_args) { +static grpc_error_handle prepare_socket( + const grpc_resolved_address* addr, int fd, + const grpc_core::PosixTcpOptions& options) { grpc_error_handle err = GRPC_ERROR_NONE; GPR_ASSERT(fd >= 0); @@ -106,15 +107,14 @@ static grpc_error_handle prepare_socket(const grpc_resolved_address* addr, if (!GRPC_ERROR_IS_NONE(err)) goto error; err = grpc_set_socket_reuse_addr(fd, 1); if (!GRPC_ERROR_IS_NONE(err)) goto error; - err = grpc_set_socket_tcp_user_timeout(fd, channel_args, - true /* is_client */); + err = grpc_set_socket_tcp_user_timeout(fd, options, true /* is_client */); if (!GRPC_ERROR_IS_NONE(err)) goto error; } err = grpc_set_socket_no_sigpipe_if_possible(fd); if (!GRPC_ERROR_IS_NONE(err)) goto error; err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_CLIENT_CONNECTION_USAGE, - channel_args); + options); if (!GRPC_ERROR_IS_NONE(err)) goto error; goto done; @@ -143,15 +143,20 @@ static void tc_on_alarm(void* acp, grpc_error_handle error) { gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); - grpc_channel_args_destroy(ac->channel_args); delete ac; } } -grpc_endpoint* grpc_tcp_client_create_from_fd( - grpc_fd* fd, const grpc_channel_args* channel_args, +static grpc_endpoint* grpc_tcp_client_create_from_fd( + grpc_fd* fd, const grpc_core::PosixTcpOptions& options, + absl::string_view addr_str) { + return grpc_tcp_create(fd, options, addr_str); +} + +grpc_endpoint* grpc_tcp_create_from_fd( + grpc_fd* fd, const grpc_event_engine::experimental::EndpointConfig& config, absl::string_view addr_str) { - return grpc_tcp_create(fd, channel_args, addr_str); + return grpc_tcp_create(fd, TcpOptionsFromEndpointConfig(config), addr_str); } static void on_writable(void* acp, grpc_error_handle error) { @@ -207,7 +212,7 @@ static void on_writable(void* acp, grpc_error_handle error) { switch (so_error) { case 0: grpc_pollset_set_del_fd(ac->interested_parties, fd); - *ep = grpc_tcp_client_create_from_fd(fd, ac->channel_args, ac->addr_str); + *ep = grpc_tcp_client_create_from_fd(fd, ac->options, ac->addr_str); fd = nullptr; break; case ENOBUFS: @@ -269,7 +274,6 @@ finish: // This is safe even outside the lock, because "done", the sentinel, is // populated *inside* the lock. gpr_mu_destroy(&ac->mu); - grpc_channel_args_destroy(ac->channel_args); delete ac; } // Push async connect closure to the executor since this may actually be @@ -284,8 +288,9 @@ finish: } grpc_error_handle grpc_tcp_client_prepare_fd( - const grpc_channel_args* channel_args, const grpc_resolved_address* addr, - grpc_resolved_address* mapped_addr, int* fd) { + const grpc_core::PosixTcpOptions& options, + const grpc_resolved_address* addr, grpc_resolved_address* mapped_addr, + int* fd) { grpc_dualstack_mode dsmode; grpc_error_handle error; *fd = -1; @@ -306,8 +311,7 @@ grpc_error_handle grpc_tcp_client_prepare_fd( memcpy(mapped_addr, addr, sizeof(*mapped_addr)); } } - if ((error = prepare_socket(mapped_addr, *fd, channel_args)) != - GRPC_ERROR_NONE) { + if ((error = prepare_socket(mapped_addr, *fd, options)) != GRPC_ERROR_NONE) { return error; } return GRPC_ERROR_NONE; @@ -315,8 +319,9 @@ grpc_error_handle grpc_tcp_client_prepare_fd( int64_t grpc_tcp_client_create_from_prepared_fd( grpc_pollset_set* interested_parties, grpc_closure* closure, const int fd, - const grpc_channel_args* channel_args, const grpc_resolved_address* addr, - grpc_core::Timestamp deadline, grpc_endpoint** ep) { + const grpc_core::PosixTcpOptions& options, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline, + grpc_endpoint** ep) { int err; do { err = connect(fd, reinterpret_cast(addr->addr), @@ -342,7 +347,7 @@ int64_t grpc_tcp_client_create_from_prepared_fd( if (err >= 0) { // Connection already succeded. Return 0 to discourage any cancellation // attempts. - *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_uri.value()); + *ep = grpc_tcp_client_create_from_fd(fdobj, options, addr_uri.value()); grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); return 0; } @@ -371,7 +376,7 @@ int64_t grpc_tcp_client_create_from_prepared_fd( ac->refs = 2; GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac, grpc_schedule_on_exec_ctx); - ac->channel_args = grpc_channel_args_copy(channel_args); + ac->options = options; if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p", @@ -395,21 +400,21 @@ int64_t grpc_tcp_client_create_from_prepared_fd( static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, + const EndpointConfig& config, const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { grpc_resolved_address mapped_addr; + grpc_core::PosixTcpOptions options(TcpOptionsFromEndpointConfig(config)); int fd = -1; grpc_error_handle error; *ep = nullptr; - if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr, - &fd)) != GRPC_ERROR_NONE) { + if ((error = grpc_tcp_client_prepare_fd(options, addr, &mapped_addr, &fd)) != + GRPC_ERROR_NONE) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error); return 0; } - return grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, - fd, channel_args, &mapped_addr, - deadline, ep); + return grpc_tcp_client_create_from_prepared_fd( + interested_parties, closure, fd, options, &mapped_addr, deadline, ep); } static bool tcp_cancel_connect(int64_t connection_handle) { @@ -458,7 +463,6 @@ static bool tcp_cancel_connect(int64_t connection_handle) { // This is safe even outside the lock, because "done", the sentinel, is // populated *inside* the lock. gpr_mu_destroy(&ac->mu); - grpc_channel_args_destroy(ac->channel_args); delete ac; } return connection_cancel_success; diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h index ddcb6e0691d..ab2dd76f8b5 100644 --- a/src/core/lib/iomgr/tcp_client_posix.h +++ b/src/core/lib/iomgr/tcp_client_posix.h @@ -23,23 +23,24 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_client.h" /* Create an endpoint from a connected grpc_fd. fd: a connected FD. Ownership is taken. - channel_args: may contain custom settings for the endpoint + config: may contain custom settings for the endpoint addr_str: destination address in printable format slice_allocator: ownership is taken by client. Returns: a new endpoint */ -grpc_endpoint* grpc_tcp_client_create_from_fd( - grpc_fd* fd, const grpc_channel_args* channel_args, +grpc_endpoint* grpc_tcp_create_from_fd( + grpc_fd* fd, const grpc_event_engine::experimental::EndpointConfig& config, absl::string_view addr_str); /* Return a configured, unbound, unconnected TCP client fd. - channel_args: may contain custom settings for the fd + options: may contain custom settings for the fd addr: the destination address mapped_addr: out parameter. addr mapped to an address appropriate to the type of socket FD created. For example, if addr is IPv4 and dual stack @@ -48,8 +49,9 @@ grpc_endpoint* grpc_tcp_client_create_from_fd( Returns: error, if any. Out parameters are not set on error */ grpc_error_handle grpc_tcp_client_prepare_fd( - const grpc_channel_args* channel_args, const grpc_resolved_address* addr, - grpc_resolved_address* mapped_addr, int* fd); + const grpc_core::PosixTcpOptions& options, + const grpc_resolved_address* addr, grpc_resolved_address* mapped_addr, + int* fd); /* Connect a configured TCP client fd. @@ -57,13 +59,14 @@ grpc_error_handle grpc_tcp_client_prepare_fd( connection being established (in order to continue their work closure: called when complete. On success, *ep will be set. fd: an FD returned from grpc_tcp_client_prepare_fd(). - channel_args: may contain custom settings for the endpoint + options: may contain custom settings for the endpoint deadline: connection deadline ep: out parameter. Set before closure is called if successful */ int64_t grpc_tcp_client_create_from_prepared_fd( grpc_pollset_set* interested_parties, grpc_closure* closure, const int fd, - const grpc_channel_args* channel_args, const grpc_resolved_address* addr, - grpc_core::Timestamp deadline, grpc_endpoint** ep); + const grpc_core::PosixTcpOptions& options, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline, + grpc_endpoint** ep); #endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_client_windows.cc b/src/core/lib/iomgr/tcp_client_windows.cc index f419c075ccd..495b475d201 100644 --- a/src/core/lib/iomgr/tcp_client_windows.cc +++ b/src/core/lib/iomgr/tcp_client_windows.cc @@ -24,13 +24,13 @@ #ifdef GRPC_WINSOCK_SOCKET +#include #include #include #include #include #include "src/core/lib/address_utils/sockaddr_utils.h" -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_windows.h" @@ -38,8 +38,11 @@ #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/tcp_windows.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/resource_quota/api.h" #include "src/core/lib/slice/slice_internal.h" +using ::grpc_event_engine::experimental::EndpointConfig; + struct async_connect { grpc_closure* on_done; gpr_mu mu; @@ -50,7 +53,6 @@ struct async_connect { int refs; grpc_closure on_connect; grpc_endpoint** endpoint; - grpc_channel_args* channel_args; }; static void async_connect_unlock_and_cleanup(async_connect* ac, @@ -58,7 +60,6 @@ static void async_connect_unlock_and_cleanup(async_connect* ac, int done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (done) { - grpc_channel_args_destroy(ac->channel_args); gpr_mu_destroy(&ac->mu); delete ac; } @@ -105,7 +106,7 @@ static void on_connect(void* acp, grpc_error_handle error) { error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); closesocket(socket->socket); } else { - *ep = grpc_tcp_create(socket, ac->channel_args, ac->addr_name); + *ep = grpc_tcp_create(socket, ac->addr_name); socket = nullptr; } } else { @@ -123,7 +124,7 @@ static void on_connect(void* acp, grpc_error_handle error) { notification request for the connection, and one timeout alert. */ static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, + const EndpointConfig& config, const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { SOCKET sock = INVALID_SOCKET; @@ -208,7 +209,6 @@ static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint, ac->refs = 2; ac->addr_name = addr_uri.value(); ac->endpoint = endpoint; - ac->channel_args = grpc_channel_args_copy(channel_args); GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 7af5244c250..cf7e1bf3cac 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -48,7 +48,6 @@ #include #include "src/core/lib/address_utils/sockaddr_utils.h" -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/experiments/experiments.h" @@ -466,8 +465,12 @@ using grpc_core::TcpZerocopySendRecord; namespace { struct grpc_tcp { - grpc_tcp(int max_sends, size_t send_bytes_threshold) - : tcp_zerocopy_send_ctx(max_sends, send_bytes_threshold) {} + explicit grpc_tcp(const grpc_core::PosixTcpOptions& tcp_options) + : min_read_chunk_size(tcp_options.tcp_min_read_chunk_size), + max_read_chunk_size(tcp_options.tcp_max_read_chunk_size), + tcp_zerocopy_send_ctx( + tcp_options.tcp_tx_zerocopy_max_simultaneous_sends, + tcp_options.tcp_tx_zerocopy_send_bytes_threshold) {} grpc_endpoint base; grpc_fd* em_fd; int fd; @@ -1891,72 +1894,16 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_get_fd, tcp_can_track_err}; -#define MAX_CHUNK_SIZE (32 * 1024 * 1024) - grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, - const grpc_channel_args* channel_args, + const grpc_core::PosixTcpOptions& options, absl::string_view peer_string) { - static constexpr bool kZerocpTxEnabledDefault = false; - int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; - int tcp_max_read_chunk_size = 4 * 1024 * 1024; - int tcp_min_read_chunk_size = 256; - bool tcp_tx_zerocopy_enabled = kZerocpTxEnabledDefault; - int tcp_tx_zerocopy_send_bytes_thresh = - grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold; - int tcp_tx_zerocopy_max_simult_sends = - grpc_core::TcpZerocopySendCtx::kDefaultMaxSends; - if (channel_args != nullptr) { - for (size_t i = 0; i < channel_args->num_args; i++) { - if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { - grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE}; - tcp_read_chunk_size = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) { - grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE}; - tcp_min_read_chunk_size = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) { - grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE}; - tcp_max_read_chunk_size = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) { - tcp_tx_zerocopy_enabled = grpc_channel_arg_get_bool( - &channel_args->args[i], kZerocpTxEnabledDefault); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD)) { - grpc_integer_options options = { - grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold, 0, - INT_MAX}; - tcp_tx_zerocopy_send_bytes_thresh = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)) { - grpc_integer_options options = { - grpc_core::TcpZerocopySendCtx::kDefaultMaxSends, 0, INT_MAX}; - tcp_tx_zerocopy_max_simult_sends = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - } - } - } - - if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) { - tcp_min_read_chunk_size = tcp_max_read_chunk_size; - } - tcp_read_chunk_size = grpc_core::Clamp( - tcp_read_chunk_size, tcp_min_read_chunk_size, tcp_max_read_chunk_size); - - grpc_tcp* tcp = new grpc_tcp(tcp_tx_zerocopy_max_simult_sends, - tcp_tx_zerocopy_send_bytes_thresh); + grpc_tcp* tcp = new grpc_tcp(options); tcp->base.vtable = &vtable; tcp->peer_string = std::string(peer_string); tcp->fd = grpc_fd_wrapped_fd(em_fd); - tcp->memory_owner = grpc_core::ResourceQuotaFromChannelArgs(channel_args) - ->memory_quota() - ->CreateMemoryOwner(peer_string); + GPR_ASSERT(options.resource_quota != nullptr); + tcp->memory_owner = + options.resource_quota->memory_quota()->CreateMemoryOwner(peer_string); tcp->self_reservation = tcp->memory_owner.MakeReservation(sizeof(grpc_tcp)); grpc_resolved_address resolved_local_addr; memset(&resolved_local_addr, 0, sizeof(resolved_local_addr)); @@ -1975,9 +1922,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->current_zerocopy_send = nullptr; tcp->release_fd_cb = nullptr; tcp->release_fd = nullptr; - tcp->target_length = static_cast(tcp_read_chunk_size); - tcp->min_read_chunk_size = tcp_min_read_chunk_size; - tcp->max_read_chunk_size = tcp_max_read_chunk_size; + tcp->target_length = static_cast(options.tcp_read_chunk_size); tcp->bytes_read_this_round = 0; /* Will be set to false by the very first endpoint read function */ tcp->is_first_read = true; @@ -1987,7 +1932,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->outgoing_buffer_arg = nullptr; tcp->frame_size_tuning_enabled = grpc_core::IsTcpFrameSizeTuningEnabled(); tcp->min_progress_size = 1; - if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) { + if (options.tcp_tx_zero_copy_enabled && + !tcp->tcp_zerocopy_send_ctx.memory_limited()) { #ifdef GRPC_LINUX_ERRQUEUE const int enable = 1; auto err = diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index d657539bc04..01b6ed39879 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -36,12 +36,14 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/port.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" extern grpc_core::TraceFlag grpc_tcp_trace; /// Create a tcp endpoint given a file desciptor and a read slice size. /// Takes ownership of \a fd. Takes ownership of the \a slice_allocator. -grpc_endpoint* grpc_tcp_create(grpc_fd* fd, const grpc_channel_args* args, +grpc_endpoint* grpc_tcp_create(grpc_fd* fd, + const grpc_core::PosixTcpOptions& options, absl::string_view peer_string); /// Return the tcp endpoint's fd, or -1 if this is not available. Does not diff --git a/src/core/lib/iomgr/tcp_server.cc b/src/core/lib/iomgr/tcp_server.cc index 8d6fefbbb20..0c762c559fb 100644 --- a/src/core/lib/iomgr/tcp_server.cc +++ b/src/core/lib/iomgr/tcp_server.cc @@ -22,10 +22,11 @@ grpc_tcp_server_vtable* grpc_tcp_server_impl; -grpc_error_handle grpc_tcp_server_create(grpc_closure* shutdown_complete, - const grpc_channel_args* args, - grpc_tcp_server** server) { - return grpc_tcp_server_impl->create(shutdown_complete, args, server); +grpc_error_handle grpc_tcp_server_create( + grpc_closure* shutdown_complete, + const grpc_event_engine::experimental::EndpointConfig& config, + grpc_tcp_server** server) { + return grpc_tcp_server_impl->create(shutdown_complete, config, server); } void grpc_tcp_server_start(grpc_tcp_server* server, diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 9c88428021f..93fdc30b199 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -23,6 +23,7 @@ #include +#include #include #include @@ -63,9 +64,10 @@ class TcpServerFdHandler { } // namespace grpc_core typedef struct grpc_tcp_server_vtable { - grpc_error_handle (*create)(grpc_closure* shutdown_complete, - const grpc_channel_args* args, - grpc_tcp_server** server); + grpc_error_handle (*create)( + grpc_closure* shutdown_complete, + const grpc_event_engine::experimental::EndpointConfig& config, + grpc_tcp_server** server); void (*start)(grpc_tcp_server* server, const std::vector* pollsets, grpc_tcp_server_cb on_accept_cb, void* cb_arg); @@ -86,9 +88,10 @@ typedef struct grpc_tcp_server_vtable { If shutdown_complete is not NULL, it will be used by grpc_tcp_server_unref() when the ref count reaches zero. Takes ownership of the slice_allocator_factory. */ -grpc_error_handle grpc_tcp_server_create(grpc_closure* shutdown_complete, - const grpc_channel_args* args, - grpc_tcp_server** server); +grpc_error_handle grpc_tcp_server_create( + grpc_closure* shutdown_complete, + const grpc_event_engine::experimental::EndpointConfig& config, + grpc_tcp_server** server); /* Start listening to bound ports */ void grpc_tcp_server_start(grpc_tcp_server* server, diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 7ea6b3f4e98..d43113fb037 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -43,13 +43,13 @@ #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" +#include #include #include #include #include #include "src/core/lib/address_utils/sockaddr_utils.h" -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -64,31 +64,21 @@ static std::atomic num_dropped_connections{0}; +using ::grpc_event_engine::experimental::EndpointConfig; + static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, - const grpc_channel_args* args, + const EndpointConfig& config, grpc_tcp_server** server) { grpc_tcp_server* s = new grpc_tcp_server; s->so_reuseport = grpc_is_socket_reuse_port_supported(); s->expand_wildcard_addrs = false; - for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) { - if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { - if (args->args[i].type == GRPC_ARG_INTEGER) { - s->so_reuseport = grpc_is_socket_reuse_port_supported() && - (args->args[i].value.integer != 0); - } else { - gpr_free(s); - return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT - " must be an integer"); - } - } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) { - if (args->args[i].type == GRPC_ARG_INTEGER) { - s->expand_wildcard_addrs = (args->args[i].value.integer != 0); - } else { - gpr_free(s); - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer"); - } - } + auto value = config.GetInt(GRPC_ARG_ALLOW_REUSEPORT); + if (value.has_value()) { + s->so_reuseport = (grpc_is_socket_reuse_port_supported() && *value != 0); + } + value = config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS); + if (value.has_value()) { + s->expand_wildcard_addrs = (*value != 0); } gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); @@ -103,10 +93,10 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, s->head = nullptr; s->tail = nullptr; s->nports = 0; - s->channel_args = grpc_channel_args_copy(args); + s->options = TcpOptionsFromEndpointConfig(config); s->fd_handler = nullptr; - s->memory_quota = - grpc_core::ResourceQuotaFromChannelArgs(args)->memory_quota(); + GPR_ASSERT(s->options.resource_quota != nullptr); + s->memory_quota = s->options.resource_quota->memory_quota(); gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0); *server = s; return GRPC_ERROR_NONE; @@ -126,7 +116,6 @@ static void finish_shutdown(grpc_tcp_server* s) { s->head = sp->next; gpr_free(sp); } - grpc_channel_args_destroy(s->channel_args); delete s->fd_handler; delete s; } @@ -252,7 +241,7 @@ static void on_read(void* arg, grpc_error_handle err) { (void)grpc_set_socket_no_sigpipe_if_possible(fd); err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_CONNECTION_USAGE, - sp->server->channel_args); + sp->server->options); if (!GRPC_ERROR_IS_NONE(err)) { goto error; } @@ -287,7 +276,7 @@ static void on_read(void* arg, grpc_error_handle err) { acceptor->external_connection = false; sp->server->on_accept_cb( sp->server->on_accept_cb_arg, - grpc_tcp_create(fdobj, sp->server->channel_args, addr_uri.value()), + grpc_tcp_create(fdobj, sp->server->options, addr_uri.value()), read_notifier_pollset, acceptor); } @@ -639,7 +628,7 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler { acceptor->listener_fd = listener_fd; acceptor->pending_data = buf; s_->on_accept_cb(s_->on_accept_cb_arg, - grpc_tcp_create(fdobj, s_->channel_args, addr_uri.value()), + grpc_tcp_create(fdobj, s_->options, addr_uri.value()), read_notifier_pollset, acceptor); } diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index 79527c24613..94faa2c17ea 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -90,8 +90,8 @@ struct grpc_tcp_server { /* next pollset to assign a channel to */ gpr_atm next_pollset_to_assign = 0; - /* channel args for this server */ - grpc_channel_args* channel_args = nullptr; + /* Contains config extracted from channel args for this server */ + grpc_core::PosixTcpOptions options; /* a handler for external connections, owned */ grpc_core::TcpServerFdHandler* fd_handler = nullptr; diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 549d01bff3b..73a6b943ec5 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -178,15 +178,15 @@ grpc_error_handle grpc_tcp_server_prepare_socket( if (!GRPC_ERROR_IS_NONE(err)) goto error; err = grpc_set_socket_reuse_addr(fd, 1); if (!GRPC_ERROR_IS_NONE(err)) goto error; - err = grpc_set_socket_tcp_user_timeout(fd, s->channel_args, - false /* is_client */); + err = + grpc_set_socket_tcp_user_timeout(fd, s->options, false /* is_client */); if (!GRPC_ERROR_IS_NONE(err)) goto error; } err = grpc_set_socket_no_sigpipe_if_possible(fd); if (!GRPC_ERROR_IS_NONE(err)) goto error; err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_LISTENER_USAGE, - s->channel_args); + s->options); if (!GRPC_ERROR_IS_NONE(err)) goto error; if (bind(fd, reinterpret_cast(const_cast(addr->addr)), diff --git a/src/core/lib/iomgr/tcp_server_windows.cc b/src/core/lib/iomgr/tcp_server_windows.cc index 88036260070..aea53ccd0a0 100644 --- a/src/core/lib/iomgr/tcp_server_windows.cc +++ b/src/core/lib/iomgr/tcp_server_windows.cc @@ -29,6 +29,7 @@ #include "absl/strings/str_cat.h" +#include #include #include #include @@ -37,7 +38,6 @@ #include #include "src/core/lib/address_utils/sockaddr_utils.h" -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/pollset_windows.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -45,10 +45,13 @@ #include "src/core/lib/iomgr/socket_windows.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/iomgr/tcp_windows.h" +#include "src/core/lib/resource_quota/api.h" #include "src/core/lib/slice/slice_internal.h" #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 +using ::grpc_event_engine::experimental::EndpointConfig; + /* one listening port */ typedef struct grpc_tcp_listener grpc_tcp_listener; struct grpc_tcp_listener { @@ -94,17 +97,14 @@ struct grpc_tcp_server { /* shutdown callback */ grpc_closure* shutdown_complete; - - grpc_channel_args* channel_args; }; /* Public function. Allocates the proper data structures to hold a grpc_tcp_server. */ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete, - const grpc_channel_args* args, + const EndpointConfig& config, grpc_tcp_server** server) { grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server)); - s->channel_args = grpc_channel_args_copy(args); gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); s->active_ports = 0; @@ -132,7 +132,6 @@ static void destroy_server(void* arg, grpc_error_handle error) { grpc_winsocket_destroy(sp->socket); gpr_free(sp); } - grpc_channel_args_destroy(s->channel_args); gpr_mu_destroy(&s->mu); gpr_free(s); } @@ -363,7 +362,7 @@ static void on_accept(void* arg, grpc_error_handle error) { } std::string fd_name = absl::StrCat("tcp_server:", peer_name_string); ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name.c_str()), - sp->server->channel_args, peer_name_string); + peer_name_string); } else { closesocket(sock); } diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index 3fefaa0b3d0..19b21ab5bdd 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -505,7 +505,6 @@ static grpc_endpoint_vtable vtable = {win_read, win_can_track_err}; grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket, - grpc_channel_args* channel_args, absl::string_view peer_string) { // TODO(jtattermusch): C++ize grpc_tcp and its dependencies (i.e. add // constructors) to ensure proper initialization diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h index 9c6ad526850..6916a8ffd17 100644 --- a/src/core/lib/iomgr/tcp_windows.h +++ b/src/core/lib/iomgr/tcp_windows.h @@ -41,7 +41,6 @@ * Takes ownership of the handle. */ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket, - grpc_channel_args* channel_args, absl::string_view peer_string); grpc_error_handle grpc_tcp_prepare_socket(SOCKET sock); diff --git a/src/core/lib/resource_quota/api.cc b/src/core/lib/resource_quota/api.cc index e90fd8de970..34c24c10f4f 100644 --- a/src/core/lib/resource_quota/api.cc +++ b/src/core/lib/resource_quota/api.cc @@ -44,6 +44,15 @@ ResourceQuotaRefPtr ResourceQuotaFromChannelArgs( ->Ref(); } +ResourceQuotaRefPtr ResourceQuotaFromEndpointConfig( + const grpc_event_engine::experimental::EndpointConfig& config) { + void* value = config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA); + if (value != nullptr) { + return reinterpret_cast(value)->Ref(); + } + return nullptr; +} + ChannelArgs EnsureResourceQuotaInChannelArgs(const ChannelArgs& args) { if (args.GetObject() != nullptr) return args; // If there's no existing quota, add it to the default one - shared between diff --git a/src/core/lib/resource_quota/api.h b/src/core/lib/resource_quota/api.h index c287091b952..ee9cee3662f 100644 --- a/src/core/lib/resource_quota/api.h +++ b/src/core/lib/resource_quota/api.h @@ -19,6 +19,7 @@ #include +#include #include #include "src/core/lib/config/core_configuration.h" @@ -36,6 +37,11 @@ constexpr size_t kResourceQuotaChannelSize = 34 * 1024; // UB if not set. ResourceQuotaRefPtr ResourceQuotaFromChannelArgs(const grpc_channel_args* args); +// Retrieve the resource quota from the EndpointConfig. +// Returns nullptr if not set. +ResourceQuotaRefPtr ResourceQuotaFromEndpointConfig( + const grpc_event_engine::experimental::EndpointConfig& config); + void RegisterResourceQuota(CoreConfiguration::Builder* builder); } // namespace grpc_core diff --git a/src/core/lib/transport/tcp_connect_handshaker.cc b/src/core/lib/transport/tcp_connect_handshaker.cc index 5705fbffb79..708f591d06c 100644 --- a/src/core/lib/transport/tcp_connect_handshaker.cc +++ b/src/core/lib/transport/tcp_connect_handshaker.cc @@ -35,6 +35,7 @@ #include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" @@ -154,9 +155,10 @@ void TCPConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/, // we don't want to pass args->endpoint directly. // Instead pass endpoint_ and swap this endpoint to // args endpoint on success. - grpc_tcp_client_connect(&connected_, &endpoint_to_destroy_, - interested_parties_, args->args.ToC().get(), &addr_, - args->deadline); + grpc_tcp_client_connect( + &connected_, &endpoint_to_destroy_, interested_parties_, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args->args), + &addr_, args->deadline); } void TCPConnectHandshaker::Connected(void* arg, grpc_error_handle error) { @@ -179,8 +181,9 @@ void TCPConnectHandshaker::Connected(void* arg, grpc_error_handle error) { self->shutdown_ = true; self->FinishLocked(error); } else { - // The on_handshake_done_ is already as part of shutdown when connecting - // So nothing to be done here other than unrefing the error. + // The on_handshake_done_ is already as part of shutdown when + // connecting So nothing to be done here other than unrefing the + // error. GRPC_ERROR_UNREF(error); } return; diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index bbc1ac2fbb9..23acaa5058c 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -40,6 +39,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/thd.h" @@ -557,11 +557,11 @@ static void on_read_request_done_locked(void* arg, grpc_error_handle error) { grpc_schedule_on_exec_ctx); auto args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); - grpc_tcp_client_connect(&conn->on_server_connect_done, &conn->server_endpoint, - conn->pollset_set, args.get(), &(*addresses_or)[0], - deadline); + .PreconditionChannelArgs(nullptr); + grpc_tcp_client_connect( + &conn->on_server_connect_done, &conn->server_endpoint, conn->pollset_set, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &(*addresses_or)[0], deadline); } static void on_read_request_done(void* arg, grpc_error_handle error) { @@ -632,13 +632,14 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( proxy->proxy_name = grpc_core::JoinHostPort("localhost", proxy_port); gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name.c_str()); // Create TCP server. - proxy->channel_args = grpc_core::CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(args) - .ToC() - .release(); - grpc_error_handle error = - grpc_tcp_server_create(nullptr, proxy->channel_args, &proxy->server); + auto channel_args = grpc_core::CoreConfiguration::Get() + .channel_args_preconditioning() + .PreconditionChannelArgs(args); + proxy->channel_args = channel_args.ToC().release(); + grpc_error_handle error = grpc_tcp_server_create( + nullptr, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(channel_args), + &proxy->server); GPR_ASSERT(GRPC_ERROR_IS_NONE(error)); // Bind to port. grpc_resolved_address resolved_addr; diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 9f83bdc4492..1d166ee14be 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -36,6 +36,7 @@ #include "absl/types/optional.h" #include +#include #include #include #include @@ -312,11 +313,11 @@ static void sched_connect(grpc_closure* closure, grpc_endpoint** ep, GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx)); } -static int64_t my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* /*interested_parties*/, - const grpc_channel_args* /*channel_args*/, - const grpc_resolved_address* /*addr*/, - grpc_core::Timestamp deadline) { +static int64_t my_tcp_client_connect( + grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* /*interested_parties*/, + const grpc_event_engine::experimental::EndpointConfig& /*config*/, + const grpc_resolved_address* /*addr*/, grpc_core::Timestamp deadline) { sched_connect(closure, ep, deadline.as_timespec(GPR_CLOCK_MONOTONIC)); return 0; } diff --git a/test/core/event_engine/endpoint_config_test.cc b/test/core/event_engine/endpoint_config_test.cc index c2f52dd8971..c80aba84531 100644 --- a/test/core/event_engine/endpoint_config_test.cc +++ b/test/core/event_engine/endpoint_config_test.cc @@ -26,16 +26,17 @@ using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; TEST(EndpointConfigTest, CanSRetrieveValuesFromChannelArgs) { - grpc_arg arg = grpc_channel_arg_integer_create(const_cast("arst"), 3); - const grpc_channel_args args = {1, &arg}; - ChannelArgsEndpointConfig config(&args); - EXPECT_EQ(absl::get(config.Get("arst")), 3); + grpc_core::ChannelArgs args; + args = args.Set("arst", 3); + ChannelArgsEndpointConfig config(args); + EXPECT_EQ(*config.GetInt("arst"), 3); } -TEST(EndpointConfigTest, ReturnsMonostateForMissingKeys) { - ChannelArgsEndpointConfig config(nullptr); - EXPECT_TRUE( - absl::holds_alternative(config.Get("nonexistent"))); +TEST(EndpointConfigTest, ReturnsNoValueForMissingKeys) { + ChannelArgsEndpointConfig config; + EXPECT_TRUE(!config.GetInt("nonexistent").has_value()); + EXPECT_TRUE(!config.GetString("nonexistent").has_value()); + EXPECT_EQ(config.GetVoidPointer("nonexistent"), nullptr); } int main(int argc, char** argv) { diff --git a/test/core/event_engine/test_suite/client_test.cc b/test/core/event_engine/test_suite/client_test.cc index b0c6f8bae87..86990f5a726 100644 --- a/test/core/event_engine/test_suite/client_test.cc +++ b/test/core/event_engine/test_suite/client_test.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -36,6 +37,8 @@ class EventEngineClientTest : public EventEngineTest {}; +using namespace std::chrono_literals; + namespace { using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; @@ -44,7 +47,6 @@ using ::grpc_event_engine::experimental::Promise; using ::grpc_event_engine::experimental::URIToResolvedAddress; using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint; using Listener = ::grpc_event_engine::experimental::EventEngine::Listener; -using namespace std::chrono_literals; constexpr int kMinMessageSize = 1024; constexpr int kMaxMessageSize = 4096; @@ -82,9 +84,10 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) { grpc_core::ExecCtx ctx; auto test_ee = this->NewEventEngine(); Promise> client_endpoint_promise; - auto memory_quota = std::make_unique("bar"); + auto memory_quota = absl::make_unique("bar"); // Create a test EventEngine client endpoint and connect to a non existent // listener. + ChannelArgsEndpointConfig config; test_ee->Connect( [&client_endpoint_promise]( absl::StatusOr> status) { @@ -92,8 +95,7 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) { EXPECT_FALSE(status.ok()); client_endpoint_promise.Set(nullptr); }, - URIToResolvedAddress("ipv6:[::1]:7000"), - ChannelArgsEndpointConfig(nullptr), + URIToResolvedAddress("ipv6:[::1]:7000"), config, memory_quota->CreateMemoryAllocator("conn-1"), 24h); auto client_endpoint = std::move(client_endpoint_promise.Get()); @@ -108,7 +110,7 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { grpc_core::ExecCtx ctx; auto oracle_ee = this->NewOracleEventEngine(); auto test_ee = this->NewEventEngine(); - auto memory_quota = std::make_unique("bar"); + auto memory_quota = absl::make_unique("bar"); std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); Promise> client_endpoint_promise; @@ -121,11 +123,11 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { server_endpoint_promise.Set(std::move(ep)); }; + ChannelArgsEndpointConfig config; auto status = oracle_ee->CreateListener( std::move(accept_cb), - [](absl::Status status) { GPR_ASSERT(status.ok()); }, - ChannelArgsEndpointConfig(nullptr), - std::make_unique("foo")); + [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, + absl::make_unique("foo")); EXPECT_TRUE(status.ok()); std::unique_ptr listener = std::move(*status); @@ -143,7 +145,7 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { client_endpoint_promise.Set(std::move(*status)); } }, - URIToResolvedAddress(target_addr), ChannelArgsEndpointConfig(nullptr), + URIToResolvedAddress(target_addr), config, memory_quota->CreateMemoryAllocator("conn-1"), 24h); auto client_endpoint = std::move(client_endpoint_promise.Get()); @@ -173,7 +175,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { static constexpr int kNumConnections = 10; // M auto oracle_ee = this->NewOracleEventEngine(); auto test_ee = this->NewEventEngine(); - auto memory_quota = std::make_unique("bar"); + auto memory_quota = absl::make_unique("bar"); Promise> client_endpoint_promise; Promise> server_endpoint_promise; std::vector target_addrs; @@ -186,11 +188,11 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { grpc_core::MemoryAllocator /*memory_allocator*/) { server_endpoint_promise.Set(std::move(ep)); }; + ChannelArgsEndpointConfig config; auto status = oracle_ee->CreateListener( std::move(accept_cb), - [](absl::Status status) { GPR_ASSERT(status.ok()); }, - ChannelArgsEndpointConfig(nullptr), - std::make_unique("foo")); + [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, + absl::make_unique("foo")); EXPECT_TRUE(status.ok()); std::unique_ptr listener = std::move(*status); @@ -207,6 +209,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { // Create a test EventEngine client endpoint and connect to a one of the // addresses bound to the oracle listener. Verify that the connection // succeeds. + ChannelArgsEndpointConfig config; test_ee->Connect( [&client_endpoint_promise]( absl::StatusOr> status) { @@ -218,8 +221,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { client_endpoint_promise.Set(std::move(*status)); } }, - URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), - ChannelArgsEndpointConfig(nullptr), + URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), config, memory_quota->CreateMemoryAllocator( absl::StrCat("conn-", std::to_string(i))), 24h); diff --git a/test/core/event_engine/test_suite/event_engine_test_utils.cc b/test/core/event_engine/test_suite/event_engine_test_utils.cc index b7ccbce48e6..a35730dafea 100644 --- a/test/core/event_engine/test_suite/event_engine_test_utils.cc +++ b/test/core/event_engine/test_suite/event_engine_test_utils.cc @@ -137,11 +137,11 @@ absl::Status ConnectionManager::BindAndStartListener( EventEngine* event_engine = listener_type_oracle ? oracle_event_engine_.get() : test_event_engine_.get(); + ChannelArgsEndpointConfig config; auto status = event_engine->CreateListener( std::move(accept_cb), - [](absl::Status status) { GPR_ASSERT(status.ok()); }, - ChannelArgsEndpointConfig(nullptr), - std::make_unique("foo")); + [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, + absl::make_unique("foo")); if (!status.ok()) { return status.status(); } @@ -174,6 +174,7 @@ ConnectionManager::CreateConnection(std::string target_addr, absl::StrCat("connection-", std::to_string(num_processed_connections_++)); EventEngine* event_engine = client_type_oracle ? oracle_event_engine_.get() : test_event_engine_.get(); + ChannelArgsEndpointConfig config; event_engine->Connect( [this](absl::StatusOr> status) { if (!status.ok()) { @@ -184,7 +185,7 @@ ConnectionManager::CreateConnection(std::string target_addr, last_in_progress_connection_.SetClientEndpoint(std::move(*status)); } }, - URIToResolvedAddress(target_addr), ChannelArgsEndpointConfig(nullptr), + URIToResolvedAddress(target_addr), config, memory_quota_->CreateMemoryAllocator(conn_name), timeout); auto client_endpoint = last_in_progress_connection_.GetClientEndpoint(); diff --git a/test/core/iomgr/ios/CFStreamTests/CFStreamClientTests.mm b/test/core/iomgr/ios/CFStreamTests/CFStreamClientTests.mm index 226cb956939..8cc2e05cbd5 100644 --- a/test/core/iomgr/ios/CFStreamTests/CFStreamClientTests.mm +++ b/test/core/iomgr/ios/CFStreamTests/CFStreamClientTests.mm @@ -30,6 +30,7 @@ #include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/tcp_client.h" @@ -103,12 +104,12 @@ static void must_fail(void* arg, grpc_error_handle error) { /* connect to it */ GPR_ASSERT(getsockname(svr_fd, (struct sockaddr*)addr, (socklen_t*)&resolved_addr->len) == 0); GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx); - auto args = grpc_core::CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); - grpc_tcp_client_connect(&done, &g_connecting, nullptr, args.get(), &*resolved_addr, - grpc_core::Timestamp::InfFuture()); + auto args = + grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs( + nullptr); + grpc_tcp_client_connect(&done, &g_connecting, nullptr, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &*resolved_addr, grpc_core::Timestamp::InfFuture()); /* await the connection */ do { @@ -160,12 +161,12 @@ static void must_fail(void* arg, grpc_error_handle error) { /* connect to a broken address */ GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx); - auto args = grpc_core::CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); - grpc_tcp_client_connect(&done, &g_connecting, nullptr, args.get(), &*resolved_addr, - grpc_core::Timestamp::InfFuture()); + auto args = + grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs( + nullptr); + grpc_tcp_client_connect(&done, &g_connecting, nullptr, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &*resolved_addr, grpc_core::Timestamp::InfFuture()); grpc_core::ExecCtx::Get()->Flush(); diff --git a/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm b/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm index 6d94118183f..95f3814aba3 100644 --- a/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm +++ b/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm @@ -32,6 +32,7 @@ #include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/tcp_client.h" @@ -125,12 +126,12 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch /* connect to it */ XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr->len), 0); init_event_closure(&done, &connected_promise); - auto args = grpc_core::CoreConfiguration::Get() - .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); - grpc_tcp_client_connect(&done, &ep_, nullptr, args.get(), &*resolved_addr, - grpc_core::Timestamp::InfFuture()); + auto args = + grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs( + nullptr); + grpc_tcp_client_connect(&done, &ep_, nullptr, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &*resolved_addr, grpc_core::Timestamp::InfFuture()); /* await the connection */ do { diff --git a/test/core/iomgr/tcp_client_posix_test.cc b/test/core/iomgr/tcp_client_posix_test.cc index 2af0a4bfa96..ac20f03ae67 100644 --- a/test/core/iomgr/tcp_client_posix_test.cc +++ b/test/core/iomgr/tcp_client_posix_test.cc @@ -37,6 +37,7 @@ #include #include +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/socket_utils_posix.h" @@ -116,8 +117,9 @@ void test_succeeds(void) { .channel_args_preconditioning() .PreconditionChannelArgs(nullptr); int64_t connection_handle = grpc_tcp_client_connect( - &done, &g_connecting, g_pollset_set, args.ToC().get(), &resolved_addr, - grpc_core::Timestamp::InfFuture()); + &done, &g_connecting, g_pollset_set, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &resolved_addr, grpc_core::Timestamp::InfFuture()); /* await the connection */ do { resolved_addr.len = static_cast(sizeof(addr)); @@ -169,8 +171,9 @@ void test_fails(void) { /* connect to a broken address */ GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx); int64_t connection_handle = grpc_tcp_client_connect( - &done, &g_connecting, g_pollset_set, nullptr, &resolved_addr, - grpc_core::Timestamp::InfFuture()); + &done, &g_connecting, g_pollset_set, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(), + &resolved_addr, grpc_core::Timestamp::InfFuture()); gpr_mu_lock(g_mu); /* wait for the connection callback to finish */ @@ -232,8 +235,9 @@ void test_connect_cancellation_succeeds(void) { .channel_args_preconditioning() .PreconditionChannelArgs(nullptr); int64_t connection_handle = grpc_tcp_client_connect( - &done, &g_connecting, g_pollset_set, args.ToC().get(), &resolved_addr, - grpc_core::Timestamp::InfFuture()); + &done, &g_connecting, g_pollset_set, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &resolved_addr, grpc_core::Timestamp::InfFuture()); ASSERT_GT(connection_handle, 0); ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), true); close(svr_fd); @@ -257,8 +261,10 @@ void test_fails_bad_addr_no_leak(void) { gpr_mu_unlock(g_mu); // connect to an invalid address. GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx); - grpc_tcp_client_connect(&done, &g_connecting, g_pollset_set, nullptr, - &resolved_addr, grpc_core::Timestamp::InfFuture()); + grpc_tcp_client_connect( + &done, &g_connecting, g_pollset_set, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(), + &resolved_addr, grpc_core::Timestamp::InfFuture()); gpr_mu_lock(g_mu); while (g_connections_complete == connections_complete_before) { grpc_pollset_worker* worker = nullptr; diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index 89a634a07ac..69e200f087a 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -16,6 +16,7 @@ * */ +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/port.h" @@ -35,10 +36,12 @@ #include #include +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/sockaddr_posix.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" #include "src/core/lib/slice/slice_internal.h" #include "test/core/iomgr/endpoint_tests.h" @@ -231,8 +234,12 @@ static void read_test(size_t num_bytes, size_t slice_size, a[1].value.pointer.p = grpc_resource_quota_create("test"); a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable(); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = - grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test"); + ep = grpc_tcp_create( + grpc_fd_create(sv[1], "read_test", false), + TcpOptionsFromEndpointConfig( + grpc_event_engine::experimental::ChannelArgsEndpointConfig( + grpc_core::ChannelArgs::FromC(&args))), + "test"); grpc_endpoint_add_to_pollset(ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); @@ -291,8 +298,12 @@ static void large_read_test(size_t slice_size, int min_progress_size) { a[1].value.pointer.p = grpc_resource_quota_create("test"); a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable(); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args, - "test"); + ep = grpc_tcp_create( + grpc_fd_create(sv[1], "large_read_test", false), + TcpOptionsFromEndpointConfig( + grpc_event_engine::experimental::ChannelArgsEndpointConfig( + grpc_core::ChannelArgs::FromC(&args))), + "test"); grpc_endpoint_add_to_pollset(ep, g_pollset); written_bytes = fill_socket(sv[0]); @@ -461,8 +472,12 @@ static void write_test(size_t num_bytes, size_t slice_size, a[1].value.pointer.p = grpc_resource_quota_create("test"); a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable(); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps), - &args, "test"); + ep = grpc_tcp_create( + grpc_fd_create(sv[1], "write_test", collect_timestamps), + TcpOptionsFromEndpointConfig( + grpc_event_engine::experimental::ChannelArgsEndpointConfig( + grpc_core::ChannelArgs::FromC(&args))), + "test"); grpc_endpoint_add_to_pollset(ep, g_pollset); state.ep = ep; @@ -545,8 +560,12 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { a[1].value.pointer.p = grpc_resource_quota_create("test"); a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable(); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = - grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test"); + ep = grpc_tcp_create( + grpc_fd_create(sv[1], "read_test", false), + TcpOptionsFromEndpointConfig( + grpc_event_engine::experimental::ChannelArgsEndpointConfig( + grpc_core::ChannelArgs::FromC(&args))), + "test"); GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); grpc_endpoint_add_to_pollset(ep, g_pollset); @@ -648,10 +667,18 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( a[1].value.pointer.p = grpc_resource_quota_create("test"); a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable(); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false), - &args, "test"); - f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false), - &args, "test"); + f.client_ep = grpc_tcp_create( + grpc_fd_create(sv[0], "fixture:client", false), + TcpOptionsFromEndpointConfig( + grpc_event_engine::experimental::ChannelArgsEndpointConfig( + grpc_core::ChannelArgs::FromC(&args))), + "test"); + f.server_ep = grpc_tcp_create( + grpc_fd_create(sv[1], "fixture:server", false), + TcpOptionsFromEndpointConfig( + grpc_event_engine::experimental::ChannelArgsEndpointConfig( + grpc_core::ChannelArgs::FromC(&args))), + "test"); grpc_endpoint_add_to_pollset(f.client_ep, g_pollset); grpc_endpoint_add_to_pollset(f.server_ep, g_pollset); grpc_resource_quota_unref( diff --git a/test/core/iomgr/tcp_server_posix_test.cc b/test/core/iomgr/tcp_server_posix_test.cc index f65a950cebd..6bb83b3d979 100644 --- a/test/core/iomgr/tcp_server_posix_test.cc +++ b/test/core/iomgr/tcp_server_posix_test.cc @@ -43,6 +43,7 @@ #include #include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/iomgr.h" @@ -169,9 +170,13 @@ static void test_no_op(void) { grpc_tcp_server* s; auto args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); - ASSERT_EQ(GRPC_ERROR_NONE, grpc_tcp_server_create(nullptr, args.get(), &s)); + .PreconditionChannelArgs(nullptr); + ASSERT_EQ( + GRPC_ERROR_NONE, + grpc_tcp_server_create( + nullptr, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &s)); grpc_tcp_server_unref(s); } @@ -180,9 +185,13 @@ static void test_no_op_with_start(void) { grpc_tcp_server* s; auto args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); - ASSERT_EQ(GRPC_ERROR_NONE, grpc_tcp_server_create(nullptr, args.get(), &s)); + .PreconditionChannelArgs(nullptr); + ASSERT_EQ( + GRPC_ERROR_NONE, + grpc_tcp_server_create( + nullptr, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &s)); LOG_TEST("test_no_op_with_start"); std::vector empty_pollset; grpc_tcp_server_start(s, &empty_pollset, on_connect, nullptr); @@ -197,9 +206,13 @@ static void test_no_op_with_port(void) { grpc_tcp_server* s; auto args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); - ASSERT_EQ(GRPC_ERROR_NONE, grpc_tcp_server_create(nullptr, args.get(), &s)); + .PreconditionChannelArgs(nullptr); + ASSERT_EQ( + GRPC_ERROR_NONE, + grpc_tcp_server_create( + nullptr, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &s)); LOG_TEST("test_no_op_with_port"); memset(&resolved_addr, 0, sizeof(resolved_addr)); @@ -221,9 +234,13 @@ static void test_no_op_with_port_and_start(void) { grpc_tcp_server* s; auto args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); - ASSERT_EQ(GRPC_ERROR_NONE, grpc_tcp_server_create(nullptr, args.get(), &s)); + .PreconditionChannelArgs(nullptr); + ASSERT_EQ( + GRPC_ERROR_NONE, + grpc_tcp_server_create( + nullptr, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &s)); LOG_TEST("test_no_op_with_port_and_start"); int port = -1; @@ -321,10 +338,13 @@ static void test_connect(size_t num_connects, const unsigned num_ports = 2; auto new_channel_args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() - .PreconditionChannelArgs(channel_args) - .ToC(); + .PreconditionChannelArgs(channel_args); ASSERT_EQ(GRPC_ERROR_NONE, - grpc_tcp_server_create(nullptr, new_channel_args.get(), &s)); + grpc_tcp_server_create( + nullptr, + grpc_event_engine::experimental::ChannelArgsEndpointConfig( + new_channel_args), + &s)); unsigned port_num; server_weak_ref weak_ref; server_weak_ref_init(&weak_ref); diff --git a/test/core/surface/concurrent_connectivity_test.cc b/test/core/surface/concurrent_connectivity_test.cc index d60badf1ce9..d63a3a49a50 100644 --- a/test/core/surface/concurrent_connectivity_test.cc +++ b/test/core/surface/concurrent_connectivity_test.cc @@ -32,6 +32,7 @@ #include #include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -140,10 +141,11 @@ void bad_server_thread(void* vargs) { grpc_tcp_server* s; auto channel_args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); - grpc_error_handle error = - grpc_tcp_server_create(nullptr, channel_args.get(), &s); + .PreconditionChannelArgs(nullptr); + grpc_error_handle error = grpc_tcp_server_create( + nullptr, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(channel_args), + &s); ASSERT_TRUE(GRPC_ERROR_IS_NONE(error)); memset(&resolved_addr, 0, sizeof(resolved_addr)); addr->sa_family = GRPC_AF_INET; diff --git a/test/core/transport/chttp2/settings_timeout_test.cc b/test/core/transport/chttp2/settings_timeout_test.cc index 79be4d2f16e..3d3fd152062 100644 --- a/test/core/transport/chttp2/settings_timeout_test.cc +++ b/test/core/transport/chttp2/settings_timeout_test.cc @@ -40,9 +40,9 @@ #include #include -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" @@ -142,11 +142,11 @@ class Client { EventState state; auto args = CoreConfiguration::Get() .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); - grpc_tcp_client_connect(state.closure(), &endpoint_, pollset_set, - args.get(), addresses_or->data(), - ExecCtx::Get()->Now() + Duration::Seconds(1)); + .PreconditionChannelArgs(nullptr); + grpc_tcp_client_connect( + state.closure(), &endpoint_, pollset_set, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + addresses_or->data(), ExecCtx::Get()->Now() + Duration::Seconds(1)); ASSERT_TRUE(PollUntilDone(&state, Timestamp::InfFuture())); ASSERT_EQ(GRPC_ERROR_NONE, state.error()); grpc_pollset_set_destroy(pollset_set); diff --git a/test/core/util/test_tcp_server.cc b/test/core/util/test_tcp_server.cc index 126f1a27b94..468e9f3e7de 100644 --- a/test/core/util/test_tcp_server.cc +++ b/test/core/util/test_tcp_server.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -30,9 +29,9 @@ #include #include -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -76,10 +75,11 @@ void test_tcp_server_start(test_tcp_server* server, int port) { auto args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() - .PreconditionChannelArgs(nullptr) - .ToC(); + .PreconditionChannelArgs(nullptr); grpc_error_handle error = grpc_tcp_server_create( - &server->shutdown_complete, args.get(), &server->tcp_server); + &server->shutdown_complete, + grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), + &server->tcp_server); GPR_ASSERT(GRPC_ERROR_IS_NONE(error)); error = grpc_tcp_server_add_port(server->tcp_server, &resolved_addr, &port_added); diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index e7c74ebacd0..e24e2b5afef 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -30,6 +30,7 @@ #include "absl/strings/str_join.h" #include "absl/strings/string_view.h" +#include #include #include #include diff --git a/test/cpp/end2end/connection_attempt_injector.cc b/test/cpp/end2end/connection_attempt_injector.cc index 30d045ade03..61f430b6618 100644 --- a/test/cpp/end2end/connection_attempt_injector.cc +++ b/test/cpp/end2end/connection_attempt_injector.cc @@ -25,6 +25,8 @@ // defined in tcp_client.cc extern grpc_tcp_client_vtable* grpc_tcp_client_impl; +using ::grpc_event_engine::experimental::EndpointConfig; + namespace grpc { namespace testing { @@ -53,18 +55,18 @@ void ConnectionAttemptInjector::Init() { int64_t ConnectionAttemptInjector::TcpConnect( grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, + grpc_pollset_set* interested_parties, const EndpointConfig& config, const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { grpc_core::MutexLock lock(g_mu); // If there's no injector, use the original vtable. if (g_injector == nullptr) { - g_original_vtable->connect(closure, ep, interested_parties, channel_args, - addr, deadline); + g_original_vtable->connect(closure, ep, interested_parties, config, addr, + deadline); return 0; } // Otherwise, use the injector. - g_injector->HandleConnection(closure, ep, interested_parties, channel_args, - addr, deadline); + g_injector->HandleConnection(closure, ep, interested_parties, config, addr, + deadline); return 0; } @@ -111,7 +113,7 @@ void ConnectionAttemptInjector::SetDelay(grpc_core::Duration delay) { void ConnectionAttemptInjector::HandleConnection( grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, + grpc_pollset_set* interested_parties, const EndpointConfig& config, const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { const int port = grpc_sockaddr_get_port(addr); gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port); @@ -128,7 +130,7 @@ void ConnectionAttemptInjector::HandleConnection( hold, nullptr); } hold->queued_attempt_ = absl::make_unique( - closure, ep, interested_parties, channel_args, addr, deadline); + closure, ep, interested_parties, config, addr, deadline); hold->start_cv_.Signal(); holds_.erase(it); return; @@ -136,14 +138,14 @@ void ConnectionAttemptInjector::HandleConnection( } // Otherwise, if there's a configured delay, impose it. if (delay_.has_value()) { - new InjectedDelay(*delay_, closure, ep, interested_parties, channel_args, - addr, deadline); + new InjectedDelay(*delay_, closure, ep, interested_parties, config, addr, + deadline); return; } } // Anything we're not holding or delaying should proceed normally. - g_original_vtable->connect(closure, ep, interested_parties, channel_args, - addr, deadline); + g_original_vtable->connect(closure, ep, interested_parties, config, addr, + deadline); } // @@ -152,25 +154,25 @@ void ConnectionAttemptInjector::HandleConnection( ConnectionAttemptInjector::QueuedAttempt::QueuedAttempt( grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, + grpc_pollset_set* interested_parties, const EndpointConfig& config, const grpc_resolved_address* addr, grpc_core::Timestamp deadline) : closure_(closure), endpoint_(ep), interested_parties_(interested_parties), - channel_args_(grpc_channel_args_copy(channel_args)), + config_(*reinterpret_cast(&config)), deadline_(deadline) { memcpy(&address_, addr, sizeof(address_)); } ConnectionAttemptInjector::QueuedAttempt::~QueuedAttempt() { GPR_ASSERT(closure_ == nullptr); - grpc_channel_args_destroy(channel_args_); } void ConnectionAttemptInjector::QueuedAttempt::Resume() { GPR_ASSERT(closure_ != nullptr); - g_original_vtable->connect(closure_, endpoint_, interested_parties_, - channel_args_, &address_, deadline_); + g_original_vtable->connect(closure_, endpoint_, interested_parties_, config_, + &address_, deadline_); closure_ = nullptr; } @@ -186,9 +188,9 @@ void ConnectionAttemptInjector::QueuedAttempt::Fail(grpc_error_handle error) { ConnectionAttemptInjector::InjectedDelay::InjectedDelay( grpc_core::Duration duration, grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, + grpc_pollset_set* interested_parties, const EndpointConfig& config, const grpc_resolved_address* addr, grpc_core::Timestamp deadline) - : attempt_(closure, ep, interested_parties, channel_args, addr, deadline) { + : attempt_(closure, ep, interested_parties, config, addr, deadline) { GRPC_CLOSURE_INIT(&timer_callback_, TimerCallback, this, nullptr); grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now(); duration = std::min(duration, deadline - now); diff --git a/test/cpp/end2end/connection_attempt_injector.h b/test/cpp/end2end/connection_attempt_injector.h index 36e6dcf88aa..2d2289597c6 100644 --- a/test/cpp/end2end/connection_attempt_injector.h +++ b/test/cpp/end2end/connection_attempt_injector.h @@ -17,7 +17,7 @@ #include -#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/timer.h" @@ -123,7 +123,7 @@ class ConnectionAttemptInjector final { public: QueuedAttempt(grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, + const grpc_event_engine::experimental::EndpointConfig& config, const grpc_resolved_address* addr, grpc_core::Timestamp deadline); ~QueuedAttempt(); @@ -138,7 +138,7 @@ class ConnectionAttemptInjector final { grpc_closure* closure_; grpc_endpoint** endpoint_; grpc_pollset_set* interested_parties_; - const grpc_channel_args* channel_args_; + grpc_event_engine::experimental::ChannelArgsEndpointConfig config_; grpc_resolved_address address_; grpc_core::Timestamp deadline_; }; @@ -150,7 +150,7 @@ class ConnectionAttemptInjector final { InjectedDelay(grpc_core::Duration duration, grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, + const grpc_event_engine::experimental::EndpointConfig& config, const grpc_resolved_address* addr, grpc_core::Timestamp deadline); @@ -163,25 +163,25 @@ class ConnectionAttemptInjector final { }; // Invoked for every TCP connection attempt. - void HandleConnection(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline); - - static void AttemptConnection(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline); + void HandleConnection( + grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_event_engine::experimental::EndpointConfig& config, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline); + + static void AttemptConnection( + grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_event_engine::experimental::EndpointConfig& config, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline); // Replacement iomgr tcp_connect vtable functions that use the current // ConnectionAttemptInjector object. - static int64_t TcpConnect(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline); + static int64_t TcpConnect( + grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_event_engine::experimental::EndpointConfig& config, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline); static bool TcpConnectCancel(int64_t connection_handle); std::vector holds_ ABSL_GUARDED_BY(&mu_); diff --git a/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc index 165f7c776e7..194ea5bc048 100644 --- a/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc @@ -21,6 +21,8 @@ #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" +#include + #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" diff --git a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc index e7f4a3f604a..59083d1cbb7 100644 --- a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc @@ -22,6 +22,8 @@ #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" +#include + #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"