[tcp] Add channel argument for SO_RVCBUF size. (#32887)

Example usage:

```
ServerBuilder builder;
builder.AddChannelArgument(GRPC_ARG_TCP_RECEIVE_BUFFER_SIZE, 1024*1024);
```
pull/32896/head
AJ Heller 2 years ago committed by GitHub
parent abcd371740
commit c57c27ff90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      include/grpc/impl/grpc_types.h
  2. 7
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
  3. 3
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
  4. 3
      src/core/lib/iomgr/socket_utils_posix.cc
  5. 3
      src/core/lib/iomgr/socket_utils_posix.h
  6. 4
      src/core/lib/iomgr/tcp_client_posix.cc

@ -368,6 +368,8 @@ typedef struct {
issued by the tcp_write(). By default, this is set to 4. */
#define GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS \
"grpc.experimental.tcp_tx_zerocopy_max_simultaneous_sends"
/* Overrides the TCP socket recieve buffer size, SO_RCVBUF. */
#define GRPC_ARG_TCP_RECEIVE_BUFFER_SIZE "grpc.tcp_receive_buffer_size"
/* Timeout in milliseconds to use for calls to the grpclb load balancer.
If 0 or unset, the balancer calls will have no deadline. */
#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms"

@ -125,7 +125,9 @@ absl::Status PrepareTcpClientSocket(PosixSocketWrapper sock,
});
GRPC_RETURN_IF_ERROR(sock.SetSocketNonBlocking(1));
GRPC_RETURN_IF_ERROR(sock.SetSocketCloexec(1));
if (options.tcp_receive_buffer_size != options.kReadBufferSizeUnset) {
GRPC_RETURN_IF_ERROR(sock.SetSocketRcvBuf(options.tcp_receive_buffer_size));
}
if (reinterpret_cast<const sockaddr*>(addr.address())->sa_family != AF_UNIX) {
// If its not a unix socket address.
GRPC_RETURN_IF_ERROR(sock.SetSocketLowLatency(1));
@ -169,6 +171,9 @@ PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) {
options.tcp_tx_zerocopy_max_simultaneous_sends =
AdjustValue(PosixTcpOptions::kDefaultMaxSends, 0, INT_MAX,
config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS));
options.tcp_receive_buffer_size =
AdjustValue(PosixTcpOptions::kReadBufferSizeUnset, 0, INT_MAX,
config.GetInt(GRPC_ARG_TCP_RECEIVE_BUFFER_SIZE));
options.tcp_tx_zero_copy_enabled =
(AdjustValue(PosixTcpOptions::kZerocpTxEnabledDefault, 0, 1,
config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) != 0);

@ -58,11 +58,14 @@ struct PosixTcpOptions {
static constexpr int kMaxChunkSize = 32 * 1024 * 1024;
static constexpr int kDefaultMaxSends = 4;
static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;
// Let the system decide the proper buffer size.
static constexpr int kReadBufferSizeUnset = -1;
int tcp_read_chunk_size = kDefaultReadChunkSize;
int tcp_min_read_chunk_size = kDefaultMinReadChunksize;
int tcp_max_read_chunk_size = kDefaultMaxReadChunksize;
int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold;
int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends;
int tcp_receive_buffer_size = kReadBufferSizeUnset;
bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault;
int keep_alive_time_ms = 0;
int keep_alive_timeout_ms = 0;

@ -77,6 +77,9 @@ PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) {
options.tcp_tx_zerocopy_max_simultaneous_sends =
AdjustValue(PosixTcpOptions::kDefaultMaxSends, 0, INT_MAX,
config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS));
options.tcp_receive_buffer_size =
AdjustValue(PosixTcpOptions::kReadBufferSizeUnset, 0, INT_MAX,
config.GetInt(GRPC_ARG_TCP_RECEIVE_BUFFER_SIZE));
options.tcp_tx_zero_copy_enabled =
(AdjustValue(PosixTcpOptions::kZerocpTxEnabledDefault, 0, 1,
config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) != 0);

@ -49,11 +49,14 @@ struct PosixTcpOptions {
static constexpr int kMaxChunkSize = 32 * 1024 * 1024;
static constexpr int kDefaultMaxSends = 4;
static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;
// Let the system decide the proper buffer size.
static constexpr int kReadBufferSizeUnset = -1;
int tcp_read_chunk_size = kDefaultReadChunkSize;
int tcp_min_read_chunk_size = kDefaultMinReadChunksize;
int tcp_max_read_chunk_size = kDefaultMaxReadChunksize;
int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold;
int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends;
int tcp_receive_buffer_size = kReadBufferSizeUnset;
bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault;
int keep_alive_time_ms = 0;
int keep_alive_timeout_ms = 0;

@ -107,6 +107,10 @@ static grpc_error_handle prepare_socket(
if (!err.ok()) goto error;
err = grpc_set_socket_cloexec(fd, 1);
if (!err.ok()) goto error;
if (options.tcp_receive_buffer_size != options.kReadBufferSizeUnset) {
err = grpc_set_socket_rcvbuf(fd, options.tcp_receive_buffer_size);
if (!err.ok()) goto error;
}
if (!grpc_is_unix_socket(addr)) {
err = grpc_set_socket_low_latency(fd, 1);
if (!err.ok()) goto error;

Loading…
Cancel
Save