diff --git a/include/grpc/impl/grpc_types.h b/include/grpc/impl/grpc_types.h index e3a268d6b6a..9a3c2d1bed2 100644 --- a/include/grpc/impl/grpc_types.h +++ b/include/grpc/impl/grpc_types.h @@ -368,6 +368,8 @@ typedef struct { issued by the tcp_write(). By default, this is set to 4. */ #define GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS \ "grpc.experimental.tcp_tx_zerocopy_max_simultaneous_sends" +/* Overrides the TCP socket recieve buffer size, SO_RCVBUF. */ +#define GRPC_ARG_TCP_RECEIVE_BUFFER_SIZE "grpc.tcp_receive_buffer_size" /* Timeout in milliseconds to use for calls to the grpclb load balancer. If 0 or unset, the balancer calls will have no deadline. */ #define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms" diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc index c1e4feb1820..4900a06e9a1 100644 --- a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc @@ -125,7 +125,9 @@ absl::Status PrepareTcpClientSocket(PosixSocketWrapper sock, }); GRPC_RETURN_IF_ERROR(sock.SetSocketNonBlocking(1)); GRPC_RETURN_IF_ERROR(sock.SetSocketCloexec(1)); - + if (options.tcp_receive_buffer_size != options.kReadBufferSizeUnset) { + GRPC_RETURN_IF_ERROR(sock.SetSocketRcvBuf(options.tcp_receive_buffer_size)); + } if (reinterpret_cast(addr.address())->sa_family != AF_UNIX) { // If its not a unix socket address. GRPC_RETURN_IF_ERROR(sock.SetSocketLowLatency(1)); @@ -169,6 +171,9 @@ PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) { options.tcp_tx_zerocopy_max_simultaneous_sends = AdjustValue(PosixTcpOptions::kDefaultMaxSends, 0, INT_MAX, config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)); + options.tcp_receive_buffer_size = + AdjustValue(PosixTcpOptions::kReadBufferSizeUnset, 0, INT_MAX, + config.GetInt(GRPC_ARG_TCP_RECEIVE_BUFFER_SIZE)); options.tcp_tx_zero_copy_enabled = (AdjustValue(PosixTcpOptions::kZerocpTxEnabledDefault, 0, 1, config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) != 0); diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h index a9059bf32db..3911777c4ff 100644 --- a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h @@ -58,11 +58,14 @@ struct PosixTcpOptions { static constexpr int kMaxChunkSize = 32 * 1024 * 1024; static constexpr int kDefaultMaxSends = 4; static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; + // Let the system decide the proper buffer size. + static constexpr int kReadBufferSizeUnset = -1; int tcp_read_chunk_size = kDefaultReadChunkSize; int tcp_min_read_chunk_size = kDefaultMinReadChunksize; int tcp_max_read_chunk_size = kDefaultMaxReadChunksize; int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold; int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends; + int tcp_receive_buffer_size = kReadBufferSizeUnset; bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault; int keep_alive_time_ms = 0; int keep_alive_timeout_ms = 0; diff --git a/src/core/lib/iomgr/socket_utils_posix.cc b/src/core/lib/iomgr/socket_utils_posix.cc index bd81dd76a2c..e20091bd672 100644 --- a/src/core/lib/iomgr/socket_utils_posix.cc +++ b/src/core/lib/iomgr/socket_utils_posix.cc @@ -77,6 +77,9 @@ PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) { options.tcp_tx_zerocopy_max_simultaneous_sends = AdjustValue(PosixTcpOptions::kDefaultMaxSends, 0, INT_MAX, config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)); + options.tcp_receive_buffer_size = + AdjustValue(PosixTcpOptions::kReadBufferSizeUnset, 0, INT_MAX, + config.GetInt(GRPC_ARG_TCP_RECEIVE_BUFFER_SIZE)); options.tcp_tx_zero_copy_enabled = (AdjustValue(PosixTcpOptions::kZerocpTxEnabledDefault, 0, 1, config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) != 0); diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index 40dc3c773a5..2caaee015b3 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -49,11 +49,14 @@ struct PosixTcpOptions { static constexpr int kMaxChunkSize = 32 * 1024 * 1024; static constexpr int kDefaultMaxSends = 4; static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; + // Let the system decide the proper buffer size. + static constexpr int kReadBufferSizeUnset = -1; int tcp_read_chunk_size = kDefaultReadChunkSize; int tcp_min_read_chunk_size = kDefaultMinReadChunksize; int tcp_max_read_chunk_size = kDefaultMaxReadChunksize; int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold; int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends; + int tcp_receive_buffer_size = kReadBufferSizeUnset; bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault; int keep_alive_time_ms = 0; int keep_alive_timeout_ms = 0; diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 5a7c77ae8fe..68918d3ce1d 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -107,6 +107,10 @@ static grpc_error_handle prepare_socket( if (!err.ok()) goto error; err = grpc_set_socket_cloexec(fd, 1); if (!err.ok()) goto error; + if (options.tcp_receive_buffer_size != options.kReadBufferSizeUnset) { + err = grpc_set_socket_rcvbuf(fd, options.tcp_receive_buffer_size); + if (!err.ok()) goto error; + } if (!grpc_is_unix_socket(addr)) { err = grpc_set_socket_low_latency(fd, 1); if (!err.ok()) goto error;