Revert "Revert "Modifying iomgr tcp code to use event engine EndpointConfig instead of channel_args"" (#30509)

* Revert "Revert "Modifying iomgr tcp code to use event engine EndpointConfig instead of channel_args (#30028)" (#30495)"

This reverts commit 0b1a8c984b.

* bug fix

* bug fix

* fix sanity
pull/30736/head^2
Vignesh Babu 2 years ago committed by GitHub
parent 903e0490ea
commit 70dd34b438
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 16
      include/grpc/event_engine/endpoint_config.h
  3. 9
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  4. 53
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  5. 32
      src/core/lib/event_engine/channel_args_endpoint_config.cc
  6. 20
      src/core/lib/event_engine/channel_args_endpoint_config.h
  7. 24
      src/core/lib/iomgr/endpoint_pair_posix.cc
  8. 4
      src/core/lib/iomgr/endpoint_pair_windows.cc
  9. 46
      src/core/lib/iomgr/socket_utils_common_posix.cc
  10. 84
      src/core/lib/iomgr/socket_utils_posix.cc
  11. 104
      src/core/lib/iomgr/socket_utils_posix.h
  12. 13
      src/core/lib/iomgr/tcp_client.cc
  13. 22
      src/core/lib/iomgr/tcp_client.h
  14. 12
      src/core/lib/iomgr/tcp_client_cfstream.cc
  15. 62
      src/core/lib/iomgr/tcp_client_posix.cc
  16. 21
      src/core/lib/iomgr/tcp_client_posix.h
  17. 12
      src/core/lib/iomgr/tcp_client_windows.cc
  18. 82
      src/core/lib/iomgr/tcp_posix.cc
  19. 4
      src/core/lib/iomgr/tcp_posix.h
  20. 9
      src/core/lib/iomgr/tcp_server.cc
  21. 15
      src/core/lib/iomgr/tcp_server.h
  22. 45
      src/core/lib/iomgr/tcp_server_posix.cc
  23. 4
      src/core/lib/iomgr/tcp_server_utils_posix.h
  24. 6
      src/core/lib/iomgr/tcp_server_utils_posix_common.cc
  25. 13
      src/core/lib/iomgr/tcp_server_windows.cc
  26. 1
      src/core/lib/iomgr/tcp_windows.cc
  27. 1
      src/core/lib/iomgr/tcp_windows.h
  28. 9
      src/core/lib/resource_quota/api.cc
  29. 6
      src/core/lib/resource_quota/api.h
  30. 13
      src/core/lib/transport/tcp_connect_handshaker.cc
  31. 27
      test/core/end2end/fixtures/http_proxy_fixture.cc
  32. 11
      test/core/end2end/fuzzers/api_fuzzer.cc
  33. 17
      test/core/event_engine/endpoint_config_test.cc
  34. 32
      test/core/event_engine/test_suite/client_test.cc
  35. 9
      test/core/event_engine/test_suite/event_engine_test_utils.cc
  36. 25
      test/core/iomgr/ios/CFStreamTests/CFStreamClientTests.mm
  37. 13
      test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
  38. 22
      test/core/iomgr/tcp_client_posix_test.cc
  39. 51
      test/core/iomgr/tcp_posix_test.cc
  40. 50
      test/core/iomgr/tcp_server_posix_test.cc
  41. 10
      test/core/surface/concurrent_connectivity_test.cc
  42. 12
      test/core/transport/chttp2/settings_timeout_test.cc
  43. 10
      test/core/util/test_tcp_server.cc
  44. 1
      test/cpp/end2end/client_lb_end2end_test.cc
  45. 38
      test/cpp/end2end/connection_attempt_injector.cc
  46. 40
      test/cpp/end2end/connection_attempt_injector.h
  47. 2
      test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc
  48. 2
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -2255,6 +2255,7 @@ grpc_cc_library(
"absl/status",
"absl/status:statusor",
"absl/time",
"absl/types:optional",
"absl/functional:any_invocable",
],
tags = ["nofixdeps"],

@ -19,7 +19,7 @@
#include <string>
#include "absl/strings/string_view.h"
#include "absl/types/variant.h"
#include "absl/types/optional.h"
namespace grpc_event_engine {
namespace experimental {
@ -31,10 +31,16 @@ namespace experimental {
class EndpointConfig {
public:
virtual ~EndpointConfig() = default;
using Setting = absl::variant<absl::monostate, int, absl::string_view, void*>;
/// Returns the Setting for a specified key, or \a absl::monostate if there is
/// no such entry. Caller does not take ownership of the resulting value.
virtual Setting Get(absl::string_view key) const = 0;
// If the key points to an integer config, an integer value gets returned.
// Otherwise it returns an absl::nullopt_t
virtual absl::optional<int> GetInt(absl::string_view key) const = 0;
// If the key points to an string config, an string value gets returned.
// Otherwise it returns an absl::nullopt_t
virtual absl::optional<absl::string_view> GetString(
absl::string_view key) const = 0;
// If the key points to an void* config, a void* pointer value gets returned.
// Otherwise it returns nullptr
virtual void* GetVoidPointer(absl::string_view key) const = 0;
};
} // namespace experimental

@ -22,7 +22,6 @@
#include <stdint.h>
#include <memory>
#include <string>
#include <utility>
@ -50,6 +49,7 @@
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/unique_type_name.h"
@ -401,12 +401,13 @@ grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
.PreconditionChannelArgs(args)
.SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority")
.SetObject(creds->Ref());
auto c_final_args = final_args.ToC();
int flags = fcntl(fd, F_GETFL, 0);
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint* client = grpc_tcp_client_create_from_fd(
grpc_fd_create(fd, "client", true), c_final_args.get(), "fd-client");
grpc_endpoint* client = grpc_tcp_create_from_fd(
grpc_fd_create(fd, "client", true),
grpc_event_engine::experimental::ChannelArgsEndpointConfig(final_args),
"fd-client");
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);
GPR_ASSERT(transport);

@ -54,6 +54,7 @@
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -87,7 +88,7 @@
#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/iomgr/tcp_client_posix.h"
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
namespace grpc_core {
@ -155,7 +156,8 @@ class Chttp2ServerListener : public Server::ListenerInterface {
void Start(grpc_endpoint* endpoint, const ChannelArgs& args);
// Needed to be able to grab an external ref in ActiveConnection::Start()
// Needed to be able to grab an external ref in
// ActiveConnection::Start()
using InternallyRefCounted<HandshakingState>::Ref;
private:
@ -200,8 +202,8 @@ class Chttp2ServerListener : public Server::ListenerInterface {
// Set by HandshakingState before the handshaking begins and reset when
// handshaking is done.
OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
// Set by HandshakingState when handshaking is done and a valid transport is
// created.
// Set by HandshakingState when handshaking is done and a valid transport
// is created.
grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
grpc_closure on_close_;
grpc_timer drain_grace_timer_;
@ -227,11 +229,11 @@ class Chttp2ServerListener : public Server::ListenerInterface {
grpc_closure* destroy_done);
// The interface required by RefCountedPtr<> has been manually implemented
// here to take a ref on tcp_server_ instead. Note that, the handshaker needs
// tcp_server_ to exist for the lifetime of the handshake since it's needed by
// acceptor. Sharing refs between the listener and tcp_server_ is just an
// optimization to avoid taking additional refs on the listener, since
// TcpServerShutdownComplete already holds a ref to the listener.
// here to take a ref on tcp_server_ instead. Note that, the handshaker
// needs tcp_server_ to exist for the lifetime of the handshake since it's
// needed by acceptor. Sharing refs between the listener and tcp_server_ is
// just an optimization to avoid taking additional refs on the listener,
// since TcpServerShutdownComplete already holds a ref to the listener.
void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
void IncrementRefCount(const DebugLocation& /* location */,
const char* /* reason */) {
@ -344,8 +346,8 @@ void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
listener_->is_serving_ = false;
connections = std::move(listener_->connections_);
}
// Send GOAWAYs on the transports so that they disconnected when existing RPCs
// finish.
// Send GOAWAYs on the transports so that they disconnected when existing
// RPCs finish.
for (auto& connection : connections) {
connection.first->SendGoAway();
}
@ -486,9 +488,9 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
self->Ref().release(); // Held by OnReceiveSettings().
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
self, grpc_schedule_on_exec_ctx);
// If the listener has been configured with a config fetcher, we need
// to watch on the transport being closed so that we can an updated
// list of active connections.
// If the listener has been configured with a config fetcher, we
// need to watch on the transport being closed so that we can an
// updated list of active connections.
grpc_closure* on_close = nullptr;
if (self->connection_->listener_->config_fetcher_watcher_ !=
nullptr) {
@ -627,8 +629,8 @@ void Chttp2ServerListener::ActiveConnection::OnClose(
{
MutexLock listener_lock(&self->listener_->mu_);
MutexLock connection_lock(&self->mu_);
// The node was already deleted from the connections_ list if the connection
// is shutdown.
// The node was already deleted from the connections_ list if the
// connection is shutdown.
if (!self->shutdown_) {
auto it = self->listener_->connections_.find(self);
if (it != self->listener_->connections_.end()) {
@ -678,8 +680,10 @@ grpc_error_handle Chttp2ServerListener::Create(
grpc_error_handle error = GRPC_ERROR_NONE;
// Create Chttp2ServerListener.
listener = new Chttp2ServerListener(server, args, args_modifier);
error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
args.ToC().get(), &listener->tcp_server_);
error = grpc_tcp_server_create(
&listener->tcp_server_shutdown_complete_,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&listener->tcp_server_);
if (!GRPC_ERROR_IS_NONE(error)) return error;
if (server->config_fetcher() != nullptr) {
listener->resolved_address_ = *addr;
@ -724,9 +728,10 @@ grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
Chttp2ServerArgsModifier args_modifier) {
Chttp2ServerListener* listener =
new Chttp2ServerListener(server, args, args_modifier);
grpc_error_handle error =
grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
args.ToC().get(), &listener->tcp_server_);
grpc_error_handle error = grpc_tcp_server_create(
&listener->tcp_server_shutdown_complete_,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&listener->tcp_server_);
if (!GRPC_ERROR_IS_NONE(error)) {
delete listener;
return error;
@ -1072,8 +1077,10 @@ void grpc_server_add_channel_from_fd(grpc_server* server, int fd,
std::string name = absl::StrCat("fd:", fd);
auto memory_quota =
server_args.GetObject<grpc_core::ResourceQuota>()->memory_quota();
grpc_endpoint* server_endpoint = grpc_tcp_create(
grpc_fd_create(fd, name.c_str(), true), server_args.ToC().get(), name);
grpc_endpoint* server_endpoint = grpc_tcp_create_from_fd(
grpc_fd_create(fd, name.c_str(), true),
grpc_event_engine::experimental::ChannelArgsEndpointConfig(server_args),
name);
grpc_transport* transport = grpc_create_chttp2_transport(
server_args, server_endpoint, false /* is_client */
);

@ -15,33 +15,25 @@
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include <string>
#include "absl/types/variant.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "absl/types/optional.h"
#include "src/core/lib/channel/channel_args.h"
namespace grpc_event_engine {
namespace experimental {
EndpointConfig::Setting ChannelArgsEndpointConfig::Get(
absl::optional<int> ChannelArgsEndpointConfig::GetInt(
absl::string_view key) const {
const grpc_arg* arg = grpc_channel_args_find(args_, std::string(key).c_str());
if (arg == nullptr) {
return absl::monostate();
}
switch (arg->type) {
case GRPC_ARG_STRING:
return absl::string_view(arg->value.string);
case GRPC_ARG_INTEGER:
return arg->value.integer;
case GRPC_ARG_POINTER:
return arg->value.pointer.p;
}
GPR_UNREACHABLE_CODE(return absl::monostate());
return args_.GetInt(key);
}
absl::optional<absl::string_view> ChannelArgsEndpointConfig::GetString(
absl::string_view key) const {
return args_.GetString(key);
}
void* ChannelArgsEndpointConfig::GetVoidPointer(absl::string_view key) const {
return args_.GetVoidPointer(key);
}
} // namespace experimental

@ -17,24 +17,30 @@
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/channel/channel_args.h"
namespace grpc_event_engine {
namespace experimental {
/// A readonly \a EndpointConfig based on grpc_channel_args. This class does not
/// take ownership of the grpc_endpoint_args*, and instances of this class
/// should not be used after the underlying args are destroyed.
class ChannelArgsEndpointConfig : public EndpointConfig {
public:
explicit ChannelArgsEndpointConfig(const grpc_channel_args* args)
ChannelArgsEndpointConfig() = default;
explicit ChannelArgsEndpointConfig(const grpc_core::ChannelArgs& args)
: args_(args) {}
Setting Get(absl::string_view key) const override;
ChannelArgsEndpointConfig(const ChannelArgsEndpointConfig& config) = default;
ChannelArgsEndpointConfig& operator=(const ChannelArgsEndpointConfig& other) =
default;
absl::optional<int> GetInt(absl::string_view key) const override;
absl::optional<absl::string_view> GetString(
absl::string_view key) const override;
void* GetVoidPointer(absl::string_view key) const override;
private:
const grpc_channel_args* args_;
grpc_core::ChannelArgs args_;
};
} // namespace experimental

@ -35,6 +35,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
@ -60,17 +61,20 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
create_sockets(sv);
grpc_core::ExecCtx exec_ctx;
std::string final_name = absl::StrCat(name, ":client");
const grpc_channel_args* new_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args)
.ToC()
.release();
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name.c_str(), false),
new_args, "socketpair-server");
auto new_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
p.client = grpc_tcp_create(
grpc_fd_create(sv[1], final_name.c_str(), false),
TcpOptionsFromEndpointConfig(
grpc_event_engine::experimental::ChannelArgsEndpointConfig(new_args)),
"socketpair-server");
final_name = absl::StrCat(name, ":server");
p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name.c_str(), false),
new_args, "socketpair-client");
grpc_channel_args_destroy(new_args);
p.server = grpc_tcp_create(
grpc_fd_create(sv[0], final_name.c_str(), false),
TcpOptionsFromEndpointConfig(
grpc_event_engine::experimental::ChannelArgsEndpointConfig(new_args)),
"socketpair-client");
return p;
}

@ -77,9 +77,9 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
create_sockets(sv);
grpc_core::ExecCtx exec_ctx;
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"),
channel_args, "endpoint:server");
"endpoint:server");
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"),
channel_args, "endpoint:client");
"endpoint:client");
return p;
}

@ -43,12 +43,12 @@
#include <string>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/sockaddr.h"
@ -298,10 +298,9 @@ void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client) {
/* Set TCP_USER_TIMEOUT */
grpc_error_handle grpc_set_socket_tcp_user_timeout(
int fd, const grpc_channel_args* channel_args, bool is_client) {
int fd, const grpc_core::PosixTcpOptions& options, bool is_client) {
// Use conditionally-important parameter to avoid warning
(void)fd;
(void)channel_args;
(void)is_client;
extern grpc_core::TraceFlag grpc_tcp_trace;
if (g_socket_supports_tcp_user_timeout.load() >= 0) {
@ -314,29 +313,13 @@ grpc_error_handle grpc_set_socket_tcp_user_timeout(
enable = g_default_server_tcp_user_timeout_enabled;
timeout = g_default_server_tcp_user_timeout_ms;
}
if (channel_args) {
for (unsigned int i = 0; i < channel_args->num_args; i++) {
if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer(
&channel_args->args[i], grpc_integer_options{0, 1, INT_MAX});
/* Continue using default if value is 0 */
if (value == 0) {
continue;
}
/* Disable if value is INT_MAX */
enable = value != INT_MAX;
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
const int value = grpc_channel_arg_get_integer(
&channel_args->args[i], grpc_integer_options{0, 1, INT_MAX});
/* Continue using default if value is 0 */
if (value == 0) {
continue;
}
timeout = value;
}
}
int value = options.keep_alive_time_ms;
if (value > 0) {
enable = value != INT_MAX;
}
value = options.keep_alive_timeout_ms;
if (value > 0) {
timeout = value;
}
if (enable) {
int newval;
@ -398,16 +381,11 @@ grpc_error_handle grpc_set_socket_with_mutator(int fd, grpc_fd_usage usage,
}
grpc_error_handle grpc_apply_socket_mutator_in_args(
int fd, grpc_fd_usage usage, const grpc_channel_args* args) {
const grpc_arg* socket_mutator_arg =
grpc_channel_args_find(args, GRPC_ARG_SOCKET_MUTATOR);
if (socket_mutator_arg == nullptr) {
int fd, grpc_fd_usage usage, const grpc_core::PosixTcpOptions& options) {
if (options.socket_mutator == nullptr) {
return GRPC_ERROR_NONE;
}
GPR_DEBUG_ASSERT(socket_mutator_arg->type == GRPC_ARG_POINTER);
grpc_socket_mutator* mutator =
static_cast<grpc_socket_mutator*>(socket_mutator_arg->value.pointer.p);
return grpc_set_socket_with_mutator(fd, usage, mutator);
return grpc_set_socket_with_mutator(fd, usage, options.socket_mutator);
}
static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;

@ -18,18 +18,100 @@
#include <grpc/support/port_platform.h>
#include "absl/types/optional.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKETUTILS
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#endif
#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
using ::grpc_event_engine::experimental::EndpointConfig;
using ::grpc_core::PosixTcpOptions;
namespace {
int AdjustValue(int default_value, int min_value, int max_value,
absl::optional<int> actual_value) {
if (!actual_value.has_value() || *actual_value < min_value ||
*actual_value > max_value) {
return default_value;
}
return *actual_value;
}
} // namespace
PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) {
void* value;
PosixTcpOptions options;
options.tcp_read_chunk_size = AdjustValue(
PosixTcpOptions::kDefaultReadChunkSize, 1, PosixTcpOptions::kMaxChunkSize,
config.GetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE));
options.tcp_min_read_chunk_size =
AdjustValue(PosixTcpOptions::kDefaultMinReadChunksize, 1,
PosixTcpOptions::kMaxChunkSize,
config.GetInt(GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE));
options.tcp_max_read_chunk_size =
AdjustValue(PosixTcpOptions::kDefaultMaxReadChunksize, 1,
PosixTcpOptions::kMaxChunkSize,
config.GetInt(GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE));
options.tcp_tx_zerocopy_send_bytes_threshold =
AdjustValue(PosixTcpOptions::kDefaultSendBytesThreshold, 0, INT_MAX,
config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD));
options.tcp_tx_zerocopy_max_simultaneous_sends =
AdjustValue(PosixTcpOptions::kDefaultMaxSends, 0, INT_MAX,
config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS));
options.tcp_tx_zero_copy_enabled =
(AdjustValue(PosixTcpOptions::kZerocpTxEnabledDefault, 0, 1,
config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) != 0);
options.keep_alive_time_ms =
AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS));
options.keep_alive_timeout_ms =
AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS));
options.expand_wildcard_addrs =
(AdjustValue(0, 1, INT_MAX,
config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS)) != 0);
options.allow_reuse_port =
(AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_ALLOW_REUSEPORT)) !=
0);
if (options.tcp_min_read_chunk_size > options.tcp_max_read_chunk_size) {
options.tcp_min_read_chunk_size = options.tcp_max_read_chunk_size;
}
options.tcp_read_chunk_size = grpc_core::Clamp(
options.tcp_read_chunk_size, options.tcp_min_read_chunk_size,
options.tcp_max_read_chunk_size);
value = config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA);
if (value != nullptr) {
options.resource_quota =
reinterpret_cast<grpc_core::ResourceQuota*>(value)->Ref();
}
value = config.GetVoidPointer(GRPC_ARG_SOCKET_MUTATOR);
if (value != nullptr) {
options.socket_mutator =
grpc_socket_mutator_ref(static_cast<grpc_socket_mutator*>(value));
}
return options;
}
#endif /* GRPC_POSIX_SOCKET_TCP */
#ifdef GRPC_POSIX_SOCKETUTILS
int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock,
int cloexec) {

@ -21,15 +21,14 @@
#include <grpc/support/port_platform.h>
#include <sys/socket.h>
#include <unistd.h>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/socket_factory_posix.h"
#include "src/core/lib/iomgr/socket_mutator.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#ifdef GRPC_LINUX_ERRQUEUE
#ifndef SO_ZEROCOPY
@ -40,6 +39,98 @@
#endif
#endif /* ifdef GRPC_LINUX_ERRQUEUE */
namespace grpc_core {
struct PosixTcpOptions {
static constexpr int kDefaultReadChunkSize = 8192;
static constexpr int kDefaultMinReadChunksize = 256;
static constexpr int kDefaultMaxReadChunksize = 4 * 1024 * 1024;
static constexpr int kZerocpTxEnabledDefault = 0;
static constexpr int kMaxChunkSize = 32 * 1024 * 1024;
static constexpr int kDefaultMaxSends = 4;
static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;
int tcp_read_chunk_size = kDefaultReadChunkSize;
int tcp_min_read_chunk_size = kDefaultMinReadChunksize;
int tcp_max_read_chunk_size = kDefaultMaxReadChunksize;
int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold;
int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends;
bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault;
int keep_alive_time_ms = 0;
int keep_alive_timeout_ms = 0;
bool expand_wildcard_addrs = false;
bool allow_reuse_port = false;
RefCountedPtr<ResourceQuota> resource_quota;
struct grpc_socket_mutator* socket_mutator = nullptr;
PosixTcpOptions() = default;
// Move ctor
PosixTcpOptions(PosixTcpOptions&& other) noexcept {
socket_mutator = absl::exchange(other.socket_mutator, nullptr);
resource_quota = std::move(other.resource_quota);
CopyIntegerOptions(other);
}
// Move assignment
PosixTcpOptions& operator=(PosixTcpOptions&& other) noexcept {
if (socket_mutator != nullptr) {
grpc_socket_mutator_unref(socket_mutator);
}
socket_mutator = absl::exchange(other.socket_mutator, nullptr);
resource_quota = std::move(other.resource_quota);
CopyIntegerOptions(other);
return *this;
}
// Copy ctor
PosixTcpOptions(const PosixTcpOptions& other) {
if (other.socket_mutator != nullptr) {
socket_mutator = grpc_socket_mutator_ref(other.socket_mutator);
}
resource_quota = other.resource_quota;
CopyIntegerOptions(other);
}
// Copy assignment
PosixTcpOptions& operator=(const PosixTcpOptions& other) {
if (&other == this) {
return *this;
}
if (socket_mutator != nullptr) {
grpc_socket_mutator_unref(socket_mutator);
socket_mutator = nullptr;
}
if (other.socket_mutator != nullptr) {
socket_mutator = grpc_socket_mutator_ref(other.socket_mutator);
}
resource_quota = other.resource_quota;
CopyIntegerOptions(other);
return *this;
}
// Destructor.
~PosixTcpOptions() {
if (socket_mutator != nullptr) {
grpc_socket_mutator_unref(socket_mutator);
}
}
private:
void CopyIntegerOptions(const PosixTcpOptions& other) {
tcp_read_chunk_size = other.tcp_read_chunk_size;
tcp_min_read_chunk_size = other.tcp_min_read_chunk_size;
tcp_max_read_chunk_size = other.tcp_max_read_chunk_size;
tcp_tx_zerocopy_send_bytes_threshold =
other.tcp_tx_zerocopy_send_bytes_threshold;
tcp_tx_zerocopy_max_simultaneous_sends =
other.tcp_tx_zerocopy_max_simultaneous_sends;
tcp_tx_zero_copy_enabled = other.tcp_tx_zero_copy_enabled;
keep_alive_time_ms = other.keep_alive_time_ms;
keep_alive_timeout_ms = other.keep_alive_timeout_ms;
expand_wildcard_addrs = other.expand_wildcard_addrs;
allow_reuse_port = other.allow_reuse_port;
}
};
} // namespace grpc_core
grpc_core::PosixTcpOptions TcpOptionsFromEndpointConfig(
const grpc_event_engine::experimental::EndpointConfig& config);
/* a wrapper for accept or accept4 */
int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock,
int cloexec);
@ -70,7 +161,7 @@ void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client);
/* Set TCP_USER_TIMEOUT */
grpc_error_handle grpc_set_socket_tcp_user_timeout(
int fd, const grpc_channel_args* channel_args, bool is_client);
int fd, const grpc_core::PosixTcpOptions& options, bool is_client);
/* Returns true if this system can create AF_INET6 sockets bound to ::1.
The value is probed once, and cached for the life of the process.
@ -104,9 +195,10 @@ grpc_error_handle grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes);
grpc_error_handle grpc_set_socket_with_mutator(int fd, grpc_fd_usage usage,
grpc_socket_mutator* mutator);
/* Extracts the first socket mutator from args if any and applies on the fd. */
/* Extracts the first socket mutator from config if any and applies on the fd.
*/
grpc_error_handle grpc_apply_socket_mutator_in_args(
int fd, grpc_fd_usage usage, const grpc_channel_args* args);
int fd, grpc_fd_usage usage, const grpc_core::PosixTcpOptions& options);
/* An enum to keep track of IPv4/IPv6 socket modes.

@ -22,14 +22,13 @@
grpc_tcp_client_vtable* grpc_tcp_client_impl;
int64_t grpc_tcp_client_connect(grpc_closure* on_connect,
grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
int64_t grpc_tcp_client_connect(
grpc_closure* on_connect, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_event_engine::experimental::EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
return grpc_tcp_client_impl->connect(on_connect, endpoint, interested_parties,
channel_args, addr, deadline);
config, addr, deadline);
}
bool grpc_tcp_client_cancel_connect(int64_t connection_handle) {

@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/time.h>
@ -30,11 +31,11 @@
#include "src/core/lib/resource_quota/memory_quota.h"
typedef struct grpc_tcp_client_vtable {
int64_t (*connect)(grpc_closure* on_connect, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
int64_t (*connect)(
grpc_closure* on_connect, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_event_engine::experimental::EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline);
bool (*cancel_connect)(int64_t connection_handle);
} grpc_tcp_client_vtable;
@ -45,12 +46,11 @@ typedef struct grpc_tcp_client_vtable {
in this connection being established (in order to continue their work). It
returns a handle to the connect operation which can be used to cancel the
connection attempt. */
int64_t grpc_tcp_client_connect(grpc_closure* on_connect,
grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
int64_t grpc_tcp_client_connect(
grpc_closure* on_connect, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_event_engine::experimental::EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline);
// Returns true if a connect attempt corresponding to the provided handle
// is successfully cancelled. Otherwise it returns false. If the connect

@ -27,12 +27,12 @@
#include <netinet/in.h>
#include <string.h>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/iomgr/cfstream_handle.h"
#include "src/core/lib/iomgr/closure.h"
@ -149,11 +149,11 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr,
*port = grpc_sockaddr_get_port(addr);
}
static int64_t CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* resolved_addr,
grpc_core::Timestamp deadline) {
static int64_t CFStreamClientConnect(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_event_engine::experimental::EndpointConfig& /*config*/,
const grpc_resolved_address* resolved_addr, grpc_core::Timestamp deadline) {
auto addr_uri = grpc_sockaddr_to_uri(resolved_addr);
if (!addr_uri.ok()) {
grpc_error_handle error =

@ -35,7 +35,6 @@
#include <grpc/support/time.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
@ -51,6 +50,8 @@
extern grpc_core::TraceFlag grpc_tcp_trace;
using ::grpc_event_engine::experimental::EndpointConfig;
struct async_connect {
gpr_mu mu;
grpc_fd* fd;
@ -62,9 +63,9 @@ struct async_connect {
std::string addr_str;
grpc_endpoint** ep;
grpc_closure* closure;
grpc_channel_args* channel_args;
int64_t connection_handle;
bool connect_cancelled;
grpc_core::PosixTcpOptions options;
};
struct ConnectionShard {
@ -90,9 +91,9 @@ void grpc_tcp_client_global_init() {
gpr_once_init(&g_tcp_client_posix_init, do_tcp_client_global_init);
}
static grpc_error_handle prepare_socket(const grpc_resolved_address* addr,
int fd,
const grpc_channel_args* channel_args) {
static grpc_error_handle prepare_socket(
const grpc_resolved_address* addr, int fd,
const grpc_core::PosixTcpOptions& options) {
grpc_error_handle err = GRPC_ERROR_NONE;
GPR_ASSERT(fd >= 0);
@ -106,15 +107,14 @@ static grpc_error_handle prepare_socket(const grpc_resolved_address* addr,
if (!GRPC_ERROR_IS_NONE(err)) goto error;
err = grpc_set_socket_reuse_addr(fd, 1);
if (!GRPC_ERROR_IS_NONE(err)) goto error;
err = grpc_set_socket_tcp_user_timeout(fd, channel_args,
true /* is_client */);
err = grpc_set_socket_tcp_user_timeout(fd, options, true /* is_client */);
if (!GRPC_ERROR_IS_NONE(err)) goto error;
}
err = grpc_set_socket_no_sigpipe_if_possible(fd);
if (!GRPC_ERROR_IS_NONE(err)) goto error;
err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_CLIENT_CONNECTION_USAGE,
channel_args);
options);
if (!GRPC_ERROR_IS_NONE(err)) goto error;
goto done;
@ -143,15 +143,20 @@ static void tc_on_alarm(void* acp, grpc_error_handle error) {
gpr_mu_unlock(&ac->mu);
if (done) {
gpr_mu_destroy(&ac->mu);
grpc_channel_args_destroy(ac->channel_args);
delete ac;
}
}
grpc_endpoint* grpc_tcp_client_create_from_fd(
grpc_fd* fd, const grpc_channel_args* channel_args,
static grpc_endpoint* grpc_tcp_client_create_from_fd(
grpc_fd* fd, const grpc_core::PosixTcpOptions& options,
absl::string_view addr_str) {
return grpc_tcp_create(fd, options, addr_str);
}
grpc_endpoint* grpc_tcp_create_from_fd(
grpc_fd* fd, const grpc_event_engine::experimental::EndpointConfig& config,
absl::string_view addr_str) {
return grpc_tcp_create(fd, channel_args, addr_str);
return grpc_tcp_create(fd, TcpOptionsFromEndpointConfig(config), addr_str);
}
static void on_writable(void* acp, grpc_error_handle error) {
@ -207,7 +212,7 @@ static void on_writable(void* acp, grpc_error_handle error) {
switch (so_error) {
case 0:
grpc_pollset_set_del_fd(ac->interested_parties, fd);
*ep = grpc_tcp_client_create_from_fd(fd, ac->channel_args, ac->addr_str);
*ep = grpc_tcp_client_create_from_fd(fd, ac->options, ac->addr_str);
fd = nullptr;
break;
case ENOBUFS:
@ -269,7 +274,6 @@ finish:
// This is safe even outside the lock, because "done", the sentinel, is
// populated *inside* the lock.
gpr_mu_destroy(&ac->mu);
grpc_channel_args_destroy(ac->channel_args);
delete ac;
}
// Push async connect closure to the executor since this may actually be
@ -284,8 +288,9 @@ finish:
}
grpc_error_handle grpc_tcp_client_prepare_fd(
const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
grpc_resolved_address* mapped_addr, int* fd) {
const grpc_core::PosixTcpOptions& options,
const grpc_resolved_address* addr, grpc_resolved_address* mapped_addr,
int* fd) {
grpc_dualstack_mode dsmode;
grpc_error_handle error;
*fd = -1;
@ -306,8 +311,7 @@ grpc_error_handle grpc_tcp_client_prepare_fd(
memcpy(mapped_addr, addr, sizeof(*mapped_addr));
}
}
if ((error = prepare_socket(mapped_addr, *fd, channel_args)) !=
GRPC_ERROR_NONE) {
if ((error = prepare_socket(mapped_addr, *fd, options)) != GRPC_ERROR_NONE) {
return error;
}
return GRPC_ERROR_NONE;
@ -315,8 +319,9 @@ grpc_error_handle grpc_tcp_client_prepare_fd(
int64_t grpc_tcp_client_create_from_prepared_fd(
grpc_pollset_set* interested_parties, grpc_closure* closure, const int fd,
const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
grpc_core::Timestamp deadline, grpc_endpoint** ep) {
const grpc_core::PosixTcpOptions& options,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline,
grpc_endpoint** ep) {
int err;
do {
err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
@ -342,7 +347,7 @@ int64_t grpc_tcp_client_create_from_prepared_fd(
if (err >= 0) {
// Connection already succeded. Return 0 to discourage any cancellation
// attempts.
*ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_uri.value());
*ep = grpc_tcp_client_create_from_fd(fdobj, options, addr_uri.value());
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
return 0;
}
@ -371,7 +376,7 @@ int64_t grpc_tcp_client_create_from_prepared_fd(
ac->refs = 2;
GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
grpc_schedule_on_exec_ctx);
ac->channel_args = grpc_channel_args_copy(channel_args);
ac->options = options;
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
@ -395,21 +400,21 @@ int64_t grpc_tcp_client_create_from_prepared_fd(
static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const EndpointConfig& config,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
grpc_resolved_address mapped_addr;
grpc_core::PosixTcpOptions options(TcpOptionsFromEndpointConfig(config));
int fd = -1;
grpc_error_handle error;
*ep = nullptr;
if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
&fd)) != GRPC_ERROR_NONE) {
if ((error = grpc_tcp_client_prepare_fd(options, addr, &mapped_addr, &fd)) !=
GRPC_ERROR_NONE) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
return 0;
}
return grpc_tcp_client_create_from_prepared_fd(interested_parties, closure,
fd, channel_args, &mapped_addr,
deadline, ep);
return grpc_tcp_client_create_from_prepared_fd(
interested_parties, closure, fd, options, &mapped_addr, deadline, ep);
}
static bool tcp_cancel_connect(int64_t connection_handle) {
@ -458,7 +463,6 @@ static bool tcp_cancel_connect(int64_t connection_handle) {
// This is safe even outside the lock, because "done", the sentinel, is
// populated *inside* the lock.
gpr_mu_destroy(&ac->mu);
grpc_channel_args_destroy(ac->channel_args);
delete ac;
}
return connection_cancel_success;

@ -23,23 +23,24 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_client.h"
/* Create an endpoint from a connected grpc_fd.
fd: a connected FD. Ownership is taken.
channel_args: may contain custom settings for the endpoint
config: may contain custom settings for the endpoint
addr_str: destination address in printable format
slice_allocator: ownership is taken by client.
Returns: a new endpoint
*/
grpc_endpoint* grpc_tcp_client_create_from_fd(
grpc_fd* fd, const grpc_channel_args* channel_args,
grpc_endpoint* grpc_tcp_create_from_fd(
grpc_fd* fd, const grpc_event_engine::experimental::EndpointConfig& config,
absl::string_view addr_str);
/* Return a configured, unbound, unconnected TCP client fd.
channel_args: may contain custom settings for the fd
options: may contain custom settings for the fd
addr: the destination address
mapped_addr: out parameter. addr mapped to an address appropriate to the
type of socket FD created. For example, if addr is IPv4 and dual stack
@ -48,8 +49,9 @@ grpc_endpoint* grpc_tcp_client_create_from_fd(
Returns: error, if any. Out parameters are not set on error
*/
grpc_error_handle grpc_tcp_client_prepare_fd(
const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
grpc_resolved_address* mapped_addr, int* fd);
const grpc_core::PosixTcpOptions& options,
const grpc_resolved_address* addr, grpc_resolved_address* mapped_addr,
int* fd);
/* Connect a configured TCP client fd.
@ -57,13 +59,14 @@ grpc_error_handle grpc_tcp_client_prepare_fd(
connection being established (in order to continue their work
closure: called when complete. On success, *ep will be set.
fd: an FD returned from grpc_tcp_client_prepare_fd().
channel_args: may contain custom settings for the endpoint
options: may contain custom settings for the endpoint
deadline: connection deadline
ep: out parameter. Set before closure is called if successful
*/
int64_t grpc_tcp_client_create_from_prepared_fd(
grpc_pollset_set* interested_parties, grpc_closure* closure, const int fd,
const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
grpc_core::Timestamp deadline, grpc_endpoint** ep);
const grpc_core::PosixTcpOptions& options,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline,
grpc_endpoint** ep);
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */

@ -24,13 +24,13 @@
#ifdef GRPC_WINSOCK_SOCKET
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/log_windows.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_windows.h"
@ -38,8 +38,11 @@
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/tcp_windows.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/slice/slice_internal.h"
using ::grpc_event_engine::experimental::EndpointConfig;
struct async_connect {
grpc_closure* on_done;
gpr_mu mu;
@ -50,7 +53,6 @@ struct async_connect {
int refs;
grpc_closure on_connect;
grpc_endpoint** endpoint;
grpc_channel_args* channel_args;
};
static void async_connect_unlock_and_cleanup(async_connect* ac,
@ -58,7 +60,6 @@ static void async_connect_unlock_and_cleanup(async_connect* ac,
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
grpc_channel_args_destroy(ac->channel_args);
gpr_mu_destroy(&ac->mu);
delete ac;
}
@ -105,7 +106,7 @@ static void on_connect(void* acp, grpc_error_handle error) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
closesocket(socket->socket);
} else {
*ep = grpc_tcp_create(socket, ac->channel_args, ac->addr_name);
*ep = grpc_tcp_create(socket, ac->addr_name);
socket = nullptr;
}
} else {
@ -123,7 +124,7 @@ static void on_connect(void* acp, grpc_error_handle error) {
notification request for the connection, and one timeout alert. */
static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const EndpointConfig& config,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
SOCKET sock = INVALID_SOCKET;
@ -208,7 +209,6 @@ static int64_t tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
ac->refs = 2;
ac->addr_name = addr_uri.value();
ac->endpoint = endpoint;
ac->channel_args = grpc_channel_args_copy(channel_args);
GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);

@ -48,7 +48,6 @@
#include <grpc/support/time.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
@ -466,8 +465,12 @@ using grpc_core::TcpZerocopySendRecord;
namespace {
struct grpc_tcp {
grpc_tcp(int max_sends, size_t send_bytes_threshold)
: tcp_zerocopy_send_ctx(max_sends, send_bytes_threshold) {}
explicit grpc_tcp(const grpc_core::PosixTcpOptions& tcp_options)
: min_read_chunk_size(tcp_options.tcp_min_read_chunk_size),
max_read_chunk_size(tcp_options.tcp_max_read_chunk_size),
tcp_zerocopy_send_ctx(
tcp_options.tcp_tx_zerocopy_max_simultaneous_sends,
tcp_options.tcp_tx_zerocopy_send_bytes_threshold) {}
grpc_endpoint base;
grpc_fd* em_fd;
int fd;
@ -1891,72 +1894,16 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_get_fd,
tcp_can_track_err};
#define MAX_CHUNK_SIZE (32 * 1024 * 1024)
grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
const grpc_channel_args* channel_args,
const grpc_core::PosixTcpOptions& options,
absl::string_view peer_string) {
static constexpr bool kZerocpTxEnabledDefault = false;
int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
int tcp_max_read_chunk_size = 4 * 1024 * 1024;
int tcp_min_read_chunk_size = 256;
bool tcp_tx_zerocopy_enabled = kZerocpTxEnabledDefault;
int tcp_tx_zerocopy_send_bytes_thresh =
grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold;
int tcp_tx_zerocopy_max_simult_sends =
grpc_core::TcpZerocopySendCtx::kDefaultMaxSends;
if (channel_args != nullptr) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE};
tcp_read_chunk_size =
grpc_channel_arg_get_integer(&channel_args->args[i], options);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) {
grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE};
tcp_min_read_chunk_size =
grpc_channel_arg_get_integer(&channel_args->args[i], options);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) {
grpc_integer_options options = {tcp_read_chunk_size, 1, MAX_CHUNK_SIZE};
tcp_max_read_chunk_size =
grpc_channel_arg_get_integer(&channel_args->args[i], options);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) {
tcp_tx_zerocopy_enabled = grpc_channel_arg_get_bool(
&channel_args->args[i], kZerocpTxEnabledDefault);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD)) {
grpc_integer_options options = {
grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold, 0,
INT_MAX};
tcp_tx_zerocopy_send_bytes_thresh =
grpc_channel_arg_get_integer(&channel_args->args[i], options);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)) {
grpc_integer_options options = {
grpc_core::TcpZerocopySendCtx::kDefaultMaxSends, 0, INT_MAX};
tcp_tx_zerocopy_max_simult_sends =
grpc_channel_arg_get_integer(&channel_args->args[i], options);
}
}
}
if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) {
tcp_min_read_chunk_size = tcp_max_read_chunk_size;
}
tcp_read_chunk_size = grpc_core::Clamp(
tcp_read_chunk_size, tcp_min_read_chunk_size, tcp_max_read_chunk_size);
grpc_tcp* tcp = new grpc_tcp(tcp_tx_zerocopy_max_simult_sends,
tcp_tx_zerocopy_send_bytes_thresh);
grpc_tcp* tcp = new grpc_tcp(options);
tcp->base.vtable = &vtable;
tcp->peer_string = std::string(peer_string);
tcp->fd = grpc_fd_wrapped_fd(em_fd);
tcp->memory_owner = grpc_core::ResourceQuotaFromChannelArgs(channel_args)
->memory_quota()
->CreateMemoryOwner(peer_string);
GPR_ASSERT(options.resource_quota != nullptr);
tcp->memory_owner =
options.resource_quota->memory_quota()->CreateMemoryOwner(peer_string);
tcp->self_reservation = tcp->memory_owner.MakeReservation(sizeof(grpc_tcp));
grpc_resolved_address resolved_local_addr;
memset(&resolved_local_addr, 0, sizeof(resolved_local_addr));
@ -1975,9 +1922,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->current_zerocopy_send = nullptr;
tcp->release_fd_cb = nullptr;
tcp->release_fd = nullptr;
tcp->target_length = static_cast<double>(tcp_read_chunk_size);
tcp->min_read_chunk_size = tcp_min_read_chunk_size;
tcp->max_read_chunk_size = tcp_max_read_chunk_size;
tcp->target_length = static_cast<double>(options.tcp_read_chunk_size);
tcp->bytes_read_this_round = 0;
/* Will be set to false by the very first endpoint read function */
tcp->is_first_read = true;
@ -1987,7 +1932,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->outgoing_buffer_arg = nullptr;
tcp->frame_size_tuning_enabled = grpc_core::IsTcpFrameSizeTuningEnabled();
tcp->min_progress_size = 1;
if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) {
if (options.tcp_tx_zero_copy_enabled &&
!tcp->tcp_zerocopy_send_ctx.memory_limited()) {
#ifdef GRPC_LINUX_ERRQUEUE
const int enable = 1;
auto err =

@ -36,12 +36,14 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
extern grpc_core::TraceFlag grpc_tcp_trace;
/// Create a tcp endpoint given a file desciptor and a read slice size.
/// Takes ownership of \a fd. Takes ownership of the \a slice_allocator.
grpc_endpoint* grpc_tcp_create(grpc_fd* fd, const grpc_channel_args* args,
grpc_endpoint* grpc_tcp_create(grpc_fd* fd,
const grpc_core::PosixTcpOptions& options,
absl::string_view peer_string);
/// Return the tcp endpoint's fd, or -1 if this is not available. Does not

@ -22,10 +22,11 @@
grpc_tcp_server_vtable* grpc_tcp_server_impl;
grpc_error_handle grpc_tcp_server_create(grpc_closure* shutdown_complete,
const grpc_channel_args* args,
grpc_tcp_server** server) {
return grpc_tcp_server_impl->create(shutdown_complete, args, server);
grpc_error_handle grpc_tcp_server_create(
grpc_closure* shutdown_complete,
const grpc_event_engine::experimental::EndpointConfig& config,
grpc_tcp_server** server) {
return grpc_tcp_server_impl->create(shutdown_complete, config, server);
}
void grpc_tcp_server_start(grpc_tcp_server* server,

@ -23,6 +23,7 @@
#include <vector>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/grpc_types.h>
@ -63,9 +64,10 @@ class TcpServerFdHandler {
} // namespace grpc_core
typedef struct grpc_tcp_server_vtable {
grpc_error_handle (*create)(grpc_closure* shutdown_complete,
const grpc_channel_args* args,
grpc_tcp_server** server);
grpc_error_handle (*create)(
grpc_closure* shutdown_complete,
const grpc_event_engine::experimental::EndpointConfig& config,
grpc_tcp_server** server);
void (*start)(grpc_tcp_server* server,
const std::vector<grpc_pollset*>* pollsets,
grpc_tcp_server_cb on_accept_cb, void* cb_arg);
@ -86,9 +88,10 @@ typedef struct grpc_tcp_server_vtable {
If shutdown_complete is not NULL, it will be used by
grpc_tcp_server_unref() when the ref count reaches zero.
Takes ownership of the slice_allocator_factory. */
grpc_error_handle grpc_tcp_server_create(grpc_closure* shutdown_complete,
const grpc_channel_args* args,
grpc_tcp_server** server);
grpc_error_handle grpc_tcp_server_create(
grpc_closure* shutdown_complete,
const grpc_event_engine::experimental::EndpointConfig& config,
grpc_tcp_server** server);
/* Start listening to bound ports */
void grpc_tcp_server_start(grpc_tcp_server* server,

@ -43,13 +43,13 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -64,31 +64,21 @@
static std::atomic<int64_t> num_dropped_connections{0};
using ::grpc_event_engine::experimental::EndpointConfig;
static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
const grpc_channel_args* args,
const EndpointConfig& config,
grpc_tcp_server** server) {
grpc_tcp_server* s = new grpc_tcp_server;
s->so_reuseport = grpc_is_socket_reuse_port_supported();
s->expand_wildcard_addrs = false;
for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
s->so_reuseport = grpc_is_socket_reuse_port_supported() &&
(args->args[i].value.integer != 0);
} else {
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
} else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
s->expand_wildcard_addrs = (args->args[i].value.integer != 0);
} else {
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer");
}
}
auto value = config.GetInt(GRPC_ARG_ALLOW_REUSEPORT);
if (value.has_value()) {
s->so_reuseport = (grpc_is_socket_reuse_port_supported() && *value != 0);
}
value = config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
if (value.has_value()) {
s->expand_wildcard_addrs = (*value != 0);
}
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
@ -103,10 +93,10 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
s->head = nullptr;
s->tail = nullptr;
s->nports = 0;
s->channel_args = grpc_channel_args_copy(args);
s->options = TcpOptionsFromEndpointConfig(config);
s->fd_handler = nullptr;
s->memory_quota =
grpc_core::ResourceQuotaFromChannelArgs(args)->memory_quota();
GPR_ASSERT(s->options.resource_quota != nullptr);
s->memory_quota = s->options.resource_quota->memory_quota();
gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
*server = s;
return GRPC_ERROR_NONE;
@ -126,7 +116,6 @@ static void finish_shutdown(grpc_tcp_server* s) {
s->head = sp->next;
gpr_free(sp);
}
grpc_channel_args_destroy(s->channel_args);
delete s->fd_handler;
delete s;
}
@ -252,7 +241,7 @@ static void on_read(void* arg, grpc_error_handle err) {
(void)grpc_set_socket_no_sigpipe_if_possible(fd);
err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_CONNECTION_USAGE,
sp->server->channel_args);
sp->server->options);
if (!GRPC_ERROR_IS_NONE(err)) {
goto error;
}
@ -287,7 +276,7 @@ static void on_read(void* arg, grpc_error_handle err) {
acceptor->external_connection = false;
sp->server->on_accept_cb(
sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, sp->server->channel_args, addr_uri.value()),
grpc_tcp_create(fdobj, sp->server->options, addr_uri.value()),
read_notifier_pollset, acceptor);
}
@ -639,7 +628,7 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
acceptor->listener_fd = listener_fd;
acceptor->pending_data = buf;
s_->on_accept_cb(s_->on_accept_cb_arg,
grpc_tcp_create(fdobj, s_->channel_args, addr_uri.value()),
grpc_tcp_create(fdobj, s_->options, addr_uri.value()),
read_notifier_pollset, acceptor);
}

@ -90,8 +90,8 @@ struct grpc_tcp_server {
/* next pollset to assign a channel to */
gpr_atm next_pollset_to_assign = 0;
/* channel args for this server */
grpc_channel_args* channel_args = nullptr;
/* Contains config extracted from channel args for this server */
grpc_core::PosixTcpOptions options;
/* a handler for external connections, owned */
grpc_core::TcpServerFdHandler* fd_handler = nullptr;

@ -178,15 +178,15 @@ grpc_error_handle grpc_tcp_server_prepare_socket(
if (!GRPC_ERROR_IS_NONE(err)) goto error;
err = grpc_set_socket_reuse_addr(fd, 1);
if (!GRPC_ERROR_IS_NONE(err)) goto error;
err = grpc_set_socket_tcp_user_timeout(fd, s->channel_args,
false /* is_client */);
err =
grpc_set_socket_tcp_user_timeout(fd, s->options, false /* is_client */);
if (!GRPC_ERROR_IS_NONE(err)) goto error;
}
err = grpc_set_socket_no_sigpipe_if_possible(fd);
if (!GRPC_ERROR_IS_NONE(err)) goto error;
err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_LISTENER_USAGE,
s->channel_args);
s->options);
if (!GRPC_ERROR_IS_NONE(err)) goto error;
if (bind(fd, reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)),

@ -29,6 +29,7 @@
#include "absl/strings/str_cat.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/log_windows.h>
@ -37,7 +38,6 @@
#include <grpc/support/time.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/pollset_windows.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -45,10 +45,13 @@
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/tcp_windows.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/slice/slice_internal.h"
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
using ::grpc_event_engine::experimental::EndpointConfig;
/* one listening port */
typedef struct grpc_tcp_listener grpc_tcp_listener;
struct grpc_tcp_listener {
@ -94,17 +97,14 @@ struct grpc_tcp_server {
/* shutdown callback */
grpc_closure* shutdown_complete;
grpc_channel_args* channel_args;
};
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
const grpc_channel_args* args,
const EndpointConfig& config,
grpc_tcp_server** server) {
grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
s->channel_args = grpc_channel_args_copy(args);
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
@ -132,7 +132,6 @@ static void destroy_server(void* arg, grpc_error_handle error) {
grpc_winsocket_destroy(sp->socket);
gpr_free(sp);
}
grpc_channel_args_destroy(s->channel_args);
gpr_mu_destroy(&s->mu);
gpr_free(s);
}
@ -363,7 +362,7 @@ static void on_accept(void* arg, grpc_error_handle error) {
}
std::string fd_name = absl::StrCat("tcp_server:", peer_name_string);
ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name.c_str()),
sp->server->channel_args, peer_name_string);
peer_name_string);
} else {
closesocket(sock);
}

@ -505,7 +505,6 @@ static grpc_endpoint_vtable vtable = {win_read,
win_can_track_err};
grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
grpc_channel_args* channel_args,
absl::string_view peer_string) {
// TODO(jtattermusch): C++ize grpc_tcp and its dependencies (i.e. add
// constructors) to ensure proper initialization

@ -41,7 +41,6 @@
* Takes ownership of the handle.
*/
grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
grpc_channel_args* channel_args,
absl::string_view peer_string);
grpc_error_handle grpc_tcp_prepare_socket(SOCKET sock);

@ -44,6 +44,15 @@ ResourceQuotaRefPtr ResourceQuotaFromChannelArgs(
->Ref();
}
ResourceQuotaRefPtr ResourceQuotaFromEndpointConfig(
const grpc_event_engine::experimental::EndpointConfig& config) {
void* value = config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA);
if (value != nullptr) {
return reinterpret_cast<ResourceQuota*>(value)->Ref();
}
return nullptr;
}
ChannelArgs EnsureResourceQuotaInChannelArgs(const ChannelArgs& args) {
if (args.GetObject<ResourceQuota>() != nullptr) return args;
// If there's no existing quota, add it to the default one - shared between

@ -19,6 +19,7 @@
#include <stddef.h>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/config/core_configuration.h"
@ -36,6 +37,11 @@ constexpr size_t kResourceQuotaChannelSize = 34 * 1024;
// UB if not set.
ResourceQuotaRefPtr ResourceQuotaFromChannelArgs(const grpc_channel_args* args);
// Retrieve the resource quota from the EndpointConfig.
// Returns nullptr if not set.
ResourceQuotaRefPtr ResourceQuotaFromEndpointConfig(
const grpc_event_engine::experimental::EndpointConfig& config);
void RegisterResourceQuota(CoreConfiguration::Builder* builder);
} // namespace grpc_core

@ -35,6 +35,7 @@
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -154,9 +155,10 @@ void TCPConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
// we don't want to pass args->endpoint directly.
// Instead pass endpoint_ and swap this endpoint to
// args endpoint on success.
grpc_tcp_client_connect(&connected_, &endpoint_to_destroy_,
interested_parties_, args->args.ToC().get(), &addr_,
args->deadline);
grpc_tcp_client_connect(
&connected_, &endpoint_to_destroy_, interested_parties_,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args->args),
&addr_, args->deadline);
}
void TCPConnectHandshaker::Connected(void* arg, grpc_error_handle error) {
@ -179,8 +181,9 @@ void TCPConnectHandshaker::Connected(void* arg, grpc_error_handle error) {
self->shutdown_ = true;
self->FinishLocked(error);
} else {
// The on_handshake_done_ is already as part of shutdown when connecting
// So nothing to be done here other than unrefing the error.
// The on_handshake_done_ is already as part of shutdown when
// connecting So nothing to be done here other than unrefing the
// error.
GRPC_ERROR_UNREF(error);
}
return;

@ -22,7 +22,6 @@
#include <string.h>
#include <algorithm>
#include <memory>
#include <string>
#include <vector>
@ -40,6 +39,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/thd.h"
@ -557,11 +557,11 @@ static void on_read_request_done_locked(void* arg, grpc_error_handle error) {
grpc_schedule_on_exec_ctx);
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(&conn->on_server_connect_done, &conn->server_endpoint,
conn->pollset_set, args.get(), &(*addresses_or)[0],
deadline);
.PreconditionChannelArgs(nullptr);
grpc_tcp_client_connect(
&conn->on_server_connect_done, &conn->server_endpoint, conn->pollset_set,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&(*addresses_or)[0], deadline);
}
static void on_read_request_done(void* arg, grpc_error_handle error) {
@ -632,13 +632,14 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
proxy->proxy_name = grpc_core::JoinHostPort("localhost", proxy_port);
gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name.c_str());
// Create TCP server.
proxy->channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args)
.ToC()
.release();
grpc_error_handle error =
grpc_tcp_server_create(nullptr, proxy->channel_args, &proxy->server);
auto channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
proxy->channel_args = channel_args.ToC().release();
grpc_error_handle error = grpc_tcp_server_create(
nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(channel_args),
&proxy->server);
GPR_ASSERT(GRPC_ERROR_IS_NONE(error));
// Bind to port.
grpc_resolved_address resolved_addr;

@ -36,6 +36,7 @@
#include "absl/types/optional.h"
#include <grpc/byte_buffer.h>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
@ -312,11 +313,11 @@ static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx));
}
static int64_t my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* /*interested_parties*/,
const grpc_channel_args* /*channel_args*/,
const grpc_resolved_address* /*addr*/,
grpc_core::Timestamp deadline) {
static int64_t my_tcp_client_connect(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* /*interested_parties*/,
const grpc_event_engine::experimental::EndpointConfig& /*config*/,
const grpc_resolved_address* /*addr*/, grpc_core::Timestamp deadline) {
sched_connect(closure, ep, deadline.as_timespec(GPR_CLOCK_MONOTONIC));
return 0;
}

@ -26,16 +26,17 @@
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
TEST(EndpointConfigTest, CanSRetrieveValuesFromChannelArgs) {
grpc_arg arg = grpc_channel_arg_integer_create(const_cast<char*>("arst"), 3);
const grpc_channel_args args = {1, &arg};
ChannelArgsEndpointConfig config(&args);
EXPECT_EQ(absl::get<int>(config.Get("arst")), 3);
grpc_core::ChannelArgs args;
args = args.Set("arst", 3);
ChannelArgsEndpointConfig config(args);
EXPECT_EQ(*config.GetInt("arst"), 3);
}
TEST(EndpointConfigTest, ReturnsMonostateForMissingKeys) {
ChannelArgsEndpointConfig config(nullptr);
EXPECT_TRUE(
absl::holds_alternative<absl::monostate>(config.Get("nonexistent")));
TEST(EndpointConfigTest, ReturnsNoValueForMissingKeys) {
ChannelArgsEndpointConfig config;
EXPECT_TRUE(!config.GetInt("nonexistent").has_value());
EXPECT_TRUE(!config.GetString("nonexistent").has_value());
EXPECT_EQ(config.GetVoidPointer("nonexistent"), nullptr);
}
int main(int argc, char** argv) {

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <chrono>
#include <random>
#include <string>
#include <thread>
@ -36,6 +37,8 @@
class EventEngineClientTest : public EventEngineTest {};
using namespace std::chrono_literals;
namespace {
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
@ -44,7 +47,6 @@ using ::grpc_event_engine::experimental::Promise;
using ::grpc_event_engine::experimental::URIToResolvedAddress;
using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
using namespace std::chrono_literals;
constexpr int kMinMessageSize = 1024;
constexpr int kMaxMessageSize = 4096;
@ -82,9 +84,10 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
grpc_core::ExecCtx ctx;
auto test_ee = this->NewEventEngine();
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
// Create a test EventEngine client endpoint and connect to a non existent
// listener.
ChannelArgsEndpointConfig config;
test_ee->Connect(
[&client_endpoint_promise](
absl::StatusOr<std::unique_ptr<Endpoint>> status) {
@ -92,8 +95,7 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
EXPECT_FALSE(status.ok());
client_endpoint_promise.Set(nullptr);
},
URIToResolvedAddress("ipv6:[::1]:7000"),
ChannelArgsEndpointConfig(nullptr),
URIToResolvedAddress("ipv6:[::1]:7000"), config,
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
auto client_endpoint = std::move(client_endpoint_promise.Get());
@ -108,7 +110,7 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
grpc_core::ExecCtx ctx;
auto oracle_ee = this->NewOracleEventEngine();
auto test_ee = this->NewEventEngine();
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
@ -121,11 +123,11 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
server_endpoint_promise.Set(std::move(ep));
};
ChannelArgsEndpointConfig config;
auto status = oracle_ee->CreateListener(
std::move(accept_cb),
[](absl::Status status) { GPR_ASSERT(status.ok()); },
ChannelArgsEndpointConfig(nullptr),
std::make_unique<grpc_core::MemoryQuota>("foo"));
[](absl::Status status) { GPR_ASSERT(status.ok()); }, config,
absl::make_unique<grpc_core::MemoryQuota>("foo"));
EXPECT_TRUE(status.ok());
std::unique_ptr<Listener> listener = std::move(*status);
@ -143,7 +145,7 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
client_endpoint_promise.Set(std::move(*status));
}
},
URIToResolvedAddress(target_addr), ChannelArgsEndpointConfig(nullptr),
URIToResolvedAddress(target_addr), config,
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
auto client_endpoint = std::move(client_endpoint_promise.Get());
@ -173,7 +175,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
static constexpr int kNumConnections = 10; // M
auto oracle_ee = this->NewOracleEventEngine();
auto test_ee = this->NewEventEngine();
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise;
std::vector<std::string> target_addrs;
@ -186,11 +188,11 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
grpc_core::MemoryAllocator /*memory_allocator*/) {
server_endpoint_promise.Set(std::move(ep));
};
ChannelArgsEndpointConfig config;
auto status = oracle_ee->CreateListener(
std::move(accept_cb),
[](absl::Status status) { GPR_ASSERT(status.ok()); },
ChannelArgsEndpointConfig(nullptr),
std::make_unique<grpc_core::MemoryQuota>("foo"));
[](absl::Status status) { GPR_ASSERT(status.ok()); }, config,
absl::make_unique<grpc_core::MemoryQuota>("foo"));
EXPECT_TRUE(status.ok());
std::unique_ptr<Listener> listener = std::move(*status);
@ -207,6 +209,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
// Create a test EventEngine client endpoint and connect to a one of the
// addresses bound to the oracle listener. Verify that the connection
// succeeds.
ChannelArgsEndpointConfig config;
test_ee->Connect(
[&client_endpoint_promise](
absl::StatusOr<std::unique_ptr<Endpoint>> status) {
@ -218,8 +221,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
client_endpoint_promise.Set(std::move(*status));
}
},
URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
ChannelArgsEndpointConfig(nullptr),
URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), config,
memory_quota->CreateMemoryAllocator(
absl::StrCat("conn-", std::to_string(i))),
24h);

@ -137,11 +137,11 @@ absl::Status ConnectionManager::BindAndStartListener(
EventEngine* event_engine = listener_type_oracle ? oracle_event_engine_.get()
: test_event_engine_.get();
ChannelArgsEndpointConfig config;
auto status = event_engine->CreateListener(
std::move(accept_cb),
[](absl::Status status) { GPR_ASSERT(status.ok()); },
ChannelArgsEndpointConfig(nullptr),
std::make_unique<grpc_core::MemoryQuota>("foo"));
[](absl::Status status) { GPR_ASSERT(status.ok()); }, config,
absl::make_unique<grpc_core::MemoryQuota>("foo"));
if (!status.ok()) {
return status.status();
}
@ -174,6 +174,7 @@ ConnectionManager::CreateConnection(std::string target_addr,
absl::StrCat("connection-", std::to_string(num_processed_connections_++));
EventEngine* event_engine = client_type_oracle ? oracle_event_engine_.get()
: test_event_engine_.get();
ChannelArgsEndpointConfig config;
event_engine->Connect(
[this](absl::StatusOr<std::unique_ptr<Endpoint>> status) {
if (!status.ok()) {
@ -184,7 +185,7 @@ ConnectionManager::CreateConnection(std::string target_addr,
last_in_progress_connection_.SetClientEndpoint(std::move(*status));
}
},
URIToResolvedAddress(target_addr), ChannelArgsEndpointConfig(nullptr),
URIToResolvedAddress(target_addr), config,
memory_quota_->CreateMemoryAllocator(conn_name), timeout);
auto client_endpoint = last_in_progress_connection_.GetClientEndpoint();

@ -30,6 +30,7 @@
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_client.h"
@ -103,12 +104,12 @@ static void must_fail(void* arg, grpc_error_handle error) {
/* connect to it */
GPR_ASSERT(getsockname(svr_fd, (struct sockaddr*)addr, (socklen_t*)&resolved_addr->len) == 0);
GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(&done, &g_connecting, nullptr, args.get(), &*resolved_addr,
grpc_core::Timestamp::InfFuture());
auto args =
grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
nullptr);
grpc_tcp_client_connect(&done, &g_connecting, nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&*resolved_addr, grpc_core::Timestamp::InfFuture());
/* await the connection */
do {
@ -160,12 +161,12 @@ static void must_fail(void* arg, grpc_error_handle error) {
/* connect to a broken address */
GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(&done, &g_connecting, nullptr, args.get(), &*resolved_addr,
grpc_core::Timestamp::InfFuture());
auto args =
grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
nullptr);
grpc_tcp_client_connect(&done, &g_connecting, nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&*resolved_addr, grpc_core::Timestamp::InfFuture());
grpc_core::ExecCtx::Get()->Flush();

@ -32,6 +32,7 @@
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_client.h"
@ -125,12 +126,12 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
/* connect to it */
XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr->len), 0);
init_event_closure(&done, &connected_promise);
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(&done, &ep_, nullptr, args.get(), &*resolved_addr,
grpc_core::Timestamp::InfFuture());
auto args =
grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
nullptr);
grpc_tcp_client_connect(&done, &ep_, nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&*resolved_addr, grpc_core::Timestamp::InfFuture());
/* await the connection */
do {

@ -37,6 +37,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
@ -116,8 +117,9 @@ void test_succeeds(void) {
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
int64_t connection_handle = grpc_tcp_client_connect(
&done, &g_connecting, g_pollset_set, args.ToC().get(), &resolved_addr,
grpc_core::Timestamp::InfFuture());
&done, &g_connecting, g_pollset_set,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&resolved_addr, grpc_core::Timestamp::InfFuture());
/* await the connection */
do {
resolved_addr.len = static_cast<socklen_t>(sizeof(addr));
@ -169,8 +171,9 @@ void test_fails(void) {
/* connect to a broken address */
GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
int64_t connection_handle = grpc_tcp_client_connect(
&done, &g_connecting, g_pollset_set, nullptr, &resolved_addr,
grpc_core::Timestamp::InfFuture());
&done, &g_connecting, g_pollset_set,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(),
&resolved_addr, grpc_core::Timestamp::InfFuture());
gpr_mu_lock(g_mu);
/* wait for the connection callback to finish */
@ -232,8 +235,9 @@ void test_connect_cancellation_succeeds(void) {
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
int64_t connection_handle = grpc_tcp_client_connect(
&done, &g_connecting, g_pollset_set, args.ToC().get(), &resolved_addr,
grpc_core::Timestamp::InfFuture());
&done, &g_connecting, g_pollset_set,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&resolved_addr, grpc_core::Timestamp::InfFuture());
ASSERT_GT(connection_handle, 0);
ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), true);
close(svr_fd);
@ -257,8 +261,10 @@ void test_fails_bad_addr_no_leak(void) {
gpr_mu_unlock(g_mu);
// connect to an invalid address.
GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
grpc_tcp_client_connect(&done, &g_connecting, g_pollset_set, nullptr,
&resolved_addr, grpc_core::Timestamp::InfFuture());
grpc_tcp_client_connect(
&done, &g_connecting, g_pollset_set,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(),
&resolved_addr, grpc_core::Timestamp::InfFuture());
gpr_mu_lock(g_mu);
while (g_connections_complete == connections_complete_before) {
grpc_pollset_worker* worker = nullptr;

@ -16,6 +16,7 @@
*
*/
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/port.h"
@ -35,10 +36,12 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/iomgr/endpoint_tests.h"
@ -231,8 +234,12 @@ static void read_test(size_t num_bytes, size_t slice_size,
a[1].value.pointer.p = grpc_resource_quota_create("test");
a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
ep =
grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
ep = grpc_tcp_create(
grpc_fd_create(sv[1], "read_test", false),
TcpOptionsFromEndpointConfig(
grpc_event_engine::experimental::ChannelArgsEndpointConfig(
grpc_core::ChannelArgs::FromC(&args))),
"test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@ -291,8 +298,12 @@ static void large_read_test(size_t slice_size, int min_progress_size) {
a[1].value.pointer.p = grpc_resource_quota_create("test");
a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args,
"test");
ep = grpc_tcp_create(
grpc_fd_create(sv[1], "large_read_test", false),
TcpOptionsFromEndpointConfig(
grpc_event_engine::experimental::ChannelArgsEndpointConfig(
grpc_core::ChannelArgs::FromC(&args))),
"test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
written_bytes = fill_socket(sv[0]);
@ -461,8 +472,12 @@ static void write_test(size_t num_bytes, size_t slice_size,
a[1].value.pointer.p = grpc_resource_quota_create("test");
a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
&args, "test");
ep = grpc_tcp_create(
grpc_fd_create(sv[1], "write_test", collect_timestamps),
TcpOptionsFromEndpointConfig(
grpc_event_engine::experimental::ChannelArgsEndpointConfig(
grpc_core::ChannelArgs::FromC(&args))),
"test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
state.ep = ep;
@ -545,8 +560,12 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
a[1].value.pointer.p = grpc_resource_quota_create("test");
a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
ep =
grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
ep = grpc_tcp_create(
grpc_fd_create(sv[1], "read_test", false),
TcpOptionsFromEndpointConfig(
grpc_event_engine::experimental::ChannelArgsEndpointConfig(
grpc_core::ChannelArgs::FromC(&args))),
"test");
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
grpc_endpoint_add_to_pollset(ep, g_pollset);
@ -648,10 +667,18 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
a[1].value.pointer.p = grpc_resource_quota_create("test");
a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false),
&args, "test");
f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false),
&args, "test");
f.client_ep = grpc_tcp_create(
grpc_fd_create(sv[0], "fixture:client", false),
TcpOptionsFromEndpointConfig(
grpc_event_engine::experimental::ChannelArgsEndpointConfig(
grpc_core::ChannelArgs::FromC(&args))),
"test");
f.server_ep = grpc_tcp_create(
grpc_fd_create(sv[1], "fixture:server", false),
TcpOptionsFromEndpointConfig(
grpc_event_engine::experimental::ChannelArgsEndpointConfig(
grpc_core::ChannelArgs::FromC(&args))),
"test");
grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);
grpc_resource_quota_unref(

@ -43,6 +43,7 @@
#include <grpc/support/time.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr.h"
@ -169,9 +170,13 @@ static void test_no_op(void) {
grpc_tcp_server* s;
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
ASSERT_EQ(GRPC_ERROR_NONE, grpc_tcp_server_create(nullptr, args.get(), &s));
.PreconditionChannelArgs(nullptr);
ASSERT_EQ(
GRPC_ERROR_NONE,
grpc_tcp_server_create(
nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&s));
grpc_tcp_server_unref(s);
}
@ -180,9 +185,13 @@ static void test_no_op_with_start(void) {
grpc_tcp_server* s;
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
ASSERT_EQ(GRPC_ERROR_NONE, grpc_tcp_server_create(nullptr, args.get(), &s));
.PreconditionChannelArgs(nullptr);
ASSERT_EQ(
GRPC_ERROR_NONE,
grpc_tcp_server_create(
nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&s));
LOG_TEST("test_no_op_with_start");
std::vector<grpc_pollset*> empty_pollset;
grpc_tcp_server_start(s, &empty_pollset, on_connect, nullptr);
@ -197,9 +206,13 @@ static void test_no_op_with_port(void) {
grpc_tcp_server* s;
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
ASSERT_EQ(GRPC_ERROR_NONE, grpc_tcp_server_create(nullptr, args.get(), &s));
.PreconditionChannelArgs(nullptr);
ASSERT_EQ(
GRPC_ERROR_NONE,
grpc_tcp_server_create(
nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&s));
LOG_TEST("test_no_op_with_port");
memset(&resolved_addr, 0, sizeof(resolved_addr));
@ -221,9 +234,13 @@ static void test_no_op_with_port_and_start(void) {
grpc_tcp_server* s;
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
ASSERT_EQ(GRPC_ERROR_NONE, grpc_tcp_server_create(nullptr, args.get(), &s));
.PreconditionChannelArgs(nullptr);
ASSERT_EQ(
GRPC_ERROR_NONE,
grpc_tcp_server_create(
nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&s));
LOG_TEST("test_no_op_with_port_and_start");
int port = -1;
@ -321,10 +338,13 @@ static void test_connect(size_t num_connects,
const unsigned num_ports = 2;
auto new_channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(channel_args)
.ToC();
.PreconditionChannelArgs(channel_args);
ASSERT_EQ(GRPC_ERROR_NONE,
grpc_tcp_server_create(nullptr, new_channel_args.get(), &s));
grpc_tcp_server_create(
nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(
new_channel_args),
&s));
unsigned port_num;
server_weak_ref weak_ref;
server_weak_ref_init(&weak_ref);

@ -32,6 +32,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -140,10 +141,11 @@ void bad_server_thread(void* vargs) {
grpc_tcp_server* s;
auto channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_error_handle error =
grpc_tcp_server_create(nullptr, channel_args.get(), &s);
.PreconditionChannelArgs(nullptr);
grpc_error_handle error = grpc_tcp_server_create(
nullptr,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(channel_args),
&s);
ASSERT_TRUE(GRPC_ERROR_IS_NONE(error));
memset(&resolved_addr, 0, sizeof(resolved_addr));
addr->sa_family = GRPC_AF_INET;

@ -40,9 +40,9 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
@ -142,11 +142,11 @@ class Client {
EventState state;
auto args = CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(state.closure(), &endpoint_, pollset_set,
args.get(), addresses_or->data(),
ExecCtx::Get()->Now() + Duration::Seconds(1));
.PreconditionChannelArgs(nullptr);
grpc_tcp_client_connect(
state.closure(), &endpoint_, pollset_set,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
addresses_or->data(), ExecCtx::Get()->Now() + Duration::Seconds(1));
ASSERT_TRUE(PollUntilDone(&state, Timestamp::InfFuture()));
ASSERT_EQ(GRPC_ERROR_NONE, state.error());
grpc_pollset_set_destroy(pollset_set);

@ -22,7 +22,6 @@
#include <string.h>
#include <algorithm>
#include <memory>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@ -30,9 +29,9 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -76,10 +75,11 @@ void test_tcp_server_start(test_tcp_server* server, int port) {
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
.PreconditionChannelArgs(nullptr);
grpc_error_handle error = grpc_tcp_server_create(
&server->shutdown_complete, args.get(), &server->tcp_server);
&server->shutdown_complete,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
&server->tcp_server);
GPR_ASSERT(GRPC_ERROR_IS_NONE(error));
error =
grpc_tcp_server_add_port(server->tcp_server, &resolved_addr, &port_added);

@ -30,6 +30,7 @@
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>

@ -25,6 +25,8 @@
// defined in tcp_client.cc
extern grpc_tcp_client_vtable* grpc_tcp_client_impl;
using ::grpc_event_engine::experimental::EndpointConfig;
namespace grpc {
namespace testing {
@ -53,18 +55,18 @@ void ConnectionAttemptInjector::Init() {
int64_t ConnectionAttemptInjector::TcpConnect(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
grpc_pollset_set* interested_parties, const EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
grpc_core::MutexLock lock(g_mu);
// If there's no injector, use the original vtable.
if (g_injector == nullptr) {
g_original_vtable->connect(closure, ep, interested_parties, channel_args,
addr, deadline);
g_original_vtable->connect(closure, ep, interested_parties, config, addr,
deadline);
return 0;
}
// Otherwise, use the injector.
g_injector->HandleConnection(closure, ep, interested_parties, channel_args,
addr, deadline);
g_injector->HandleConnection(closure, ep, interested_parties, config, addr,
deadline);
return 0;
}
@ -111,7 +113,7 @@ void ConnectionAttemptInjector::SetDelay(grpc_core::Duration delay) {
void ConnectionAttemptInjector::HandleConnection(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
grpc_pollset_set* interested_parties, const EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): port=%d", port);
@ -128,7 +130,7 @@ void ConnectionAttemptInjector::HandleConnection(
hold, nullptr);
}
hold->queued_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
closure, ep, interested_parties, config, addr, deadline);
hold->start_cv_.Signal();
holds_.erase(it);
return;
@ -136,14 +138,14 @@ void ConnectionAttemptInjector::HandleConnection(
}
// Otherwise, if there's a configured delay, impose it.
if (delay_.has_value()) {
new InjectedDelay(*delay_, closure, ep, interested_parties, channel_args,
addr, deadline);
new InjectedDelay(*delay_, closure, ep, interested_parties, config, addr,
deadline);
return;
}
}
// Anything we're not holding or delaying should proceed normally.
g_original_vtable->connect(closure, ep, interested_parties, channel_args,
addr, deadline);
g_original_vtable->connect(closure, ep, interested_parties, config, addr,
deadline);
}
//
@ -152,25 +154,25 @@ void ConnectionAttemptInjector::HandleConnection(
ConnectionAttemptInjector::QueuedAttempt::QueuedAttempt(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
grpc_pollset_set* interested_parties, const EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline)
: closure_(closure),
endpoint_(ep),
interested_parties_(interested_parties),
channel_args_(grpc_channel_args_copy(channel_args)),
config_(*reinterpret_cast<const grpc_event_engine::experimental::
ChannelArgsEndpointConfig*>(&config)),
deadline_(deadline) {
memcpy(&address_, addr, sizeof(address_));
}
ConnectionAttemptInjector::QueuedAttempt::~QueuedAttempt() {
GPR_ASSERT(closure_ == nullptr);
grpc_channel_args_destroy(channel_args_);
}
void ConnectionAttemptInjector::QueuedAttempt::Resume() {
GPR_ASSERT(closure_ != nullptr);
g_original_vtable->connect(closure_, endpoint_, interested_parties_,
channel_args_, &address_, deadline_);
g_original_vtable->connect(closure_, endpoint_, interested_parties_, config_,
&address_, deadline_);
closure_ = nullptr;
}
@ -186,9 +188,9 @@ void ConnectionAttemptInjector::QueuedAttempt::Fail(grpc_error_handle error) {
ConnectionAttemptInjector::InjectedDelay::InjectedDelay(
grpc_core::Duration duration, grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
grpc_pollset_set* interested_parties, const EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline)
: attempt_(closure, ep, interested_parties, channel_args, addr, deadline) {
: attempt_(closure, ep, interested_parties, config, addr, deadline) {
GRPC_CLOSURE_INIT(&timer_callback_, TimerCallback, this, nullptr);
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
duration = std::min(duration, deadline - now);

@ -17,7 +17,7 @@
#include <memory>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/timer.h"
@ -123,7 +123,7 @@ class ConnectionAttemptInjector final {
public:
QueuedAttempt(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_event_engine::experimental::EndpointConfig& config,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
~QueuedAttempt();
@ -138,7 +138,7 @@ class ConnectionAttemptInjector final {
grpc_closure* closure_;
grpc_endpoint** endpoint_;
grpc_pollset_set* interested_parties_;
const grpc_channel_args* channel_args_;
grpc_event_engine::experimental::ChannelArgsEndpointConfig config_;
grpc_resolved_address address_;
grpc_core::Timestamp deadline_;
};
@ -150,7 +150,7 @@ class ConnectionAttemptInjector final {
InjectedDelay(grpc_core::Duration duration, grpc_closure* closure,
grpc_endpoint** ep, grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_event_engine::experimental::EndpointConfig& config,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
@ -163,25 +163,25 @@ class ConnectionAttemptInjector final {
};
// Invoked for every TCP connection attempt.
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
static void AttemptConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
void HandleConnection(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_event_engine::experimental::EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline);
static void AttemptConnection(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_event_engine::experimental::EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline);
// Replacement iomgr tcp_connect vtable functions that use the current
// ConnectionAttemptInjector object.
static int64_t TcpConnect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
static int64_t TcpConnect(
grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_event_engine::experimental::EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline);
static bool TcpConnectCancel(int64_t connection_handle);
std::vector<Hold*> holds_ ABSL_GUARDED_BY(&mu_);

@ -21,6 +21,8 @@
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include <grpc/event_engine/endpoint_config.h>
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"

@ -22,6 +22,8 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include <grpc/event_engine/endpoint_config.h>
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"

Loading…
Cancel
Save