From 48f026d90ece794eb718d7749e0b54b83ef76feb Mon Sep 17 00:00:00 2001 From: Arjun Roy Date: Wed, 30 Oct 2019 17:40:53 -0700 Subject: [PATCH] gRPC TCP Transmit-side Zerocopy. Implements TCP Tx-side zerocopy. Must be explicitly enabled to use. For large RPCs (>= 16KiB) it reduces the amount of CPU time spent since it avoids a userspace to kernel data buffer copy. However, there is a tradeoff - the application must process a callback on the socket error queue placed by the kernel, informing the application that the data buffer can be freed since the kernel is done. The cost of processing the error queue means that we do not have an advantage for small RPCs. --- include/grpc/impl/codegen/grpc_types.h | 14 + .../lib/iomgr/socket_utils_common_posix.cc | 14 + src/core/lib/iomgr/socket_utils_posix.h | 12 + src/core/lib/iomgr/tcp_posix.cc | 663 ++++++++++++++++-- .../iomgr/tcp_server_utils_posix_common.cc | 8 + 5 files changed, 655 insertions(+), 56 deletions(-) diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 836441f8948..89fd15faf1a 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -323,6 +323,20 @@ typedef struct { "grpc.experimental.tcp_min_read_chunk_size" #define GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE \ "grpc.experimental.tcp_max_read_chunk_size" +/* TCP TX Zerocopy enable state: zero is disabled, non-zero is enabled. By + default, it is disabled. */ +#define GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED \ + "grpc.experimental.tcp_tx_zerocopy_enabled" +/* TCP TX Zerocopy send threshold: only zerocopy if >= this many bytes sent. By + default, this is set to 16KB. */ +#define GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD \ + "grpc.experimental.tcp_tx_zerocopy_send_bytes_threshold" +/* TCP TX Zerocopy max simultaneous sends: limit for maximum number of pending + calls to tcp_write() using zerocopy. A tcp_write() is considered pending + until the kernel performs the zerocopy-done callback for all sendmsg() calls + issued by the tcp_write(). By default, this is set to 4. */ +#define GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS \ + "grpc.experimental.tcp_tx_zerocopy_max_simultaneous_sends" /* Timeout in milliseconds to use for calls to the grpclb load balancer. If 0 or unset, the balancer calls will have no deadline. */ #define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms" diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index f46cbd51c88..3974ae7dec2 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -50,6 +50,20 @@ #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +/* set a socket to use zerocopy */ +grpc_error* grpc_set_socket_zerocopy(int fd) { +#ifdef GRPC_LINUX_ERRQUEUE + const int enable = 1; + auto err = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)); + if (err != 0) { + return GRPC_OS_ERROR(errno, "setsockopt(SO_ZEROCOPY)"); + } + return GRPC_ERROR_NONE; +#else + return GRPC_OS_ERROR(ENOSYS, "setsockopt(SO_ZEROCOPY)"); +#endif +} + /* set a socket to non blocking mode */ grpc_error* grpc_set_socket_nonblocking(int fd, int non_blocking) { int oldflags = fcntl(fd, F_GETFL, 0); diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index a708a7a0ed5..734d340a953 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -31,10 +31,22 @@ #include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_mutator.h" +#ifdef GRPC_LINUX_ERRQUEUE +#ifndef SO_ZEROCOPY +#define SO_ZEROCOPY 60 +#endif +#ifndef SO_EE_ORIGIN_ZEROCOPY +#define SO_EE_ORIGIN_ZEROCOPY 5 +#endif +#endif /* ifdef GRPC_LINUX_ERRQUEUE */ + /* a wrapper for accept or accept4 */ int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock, int cloexec); +/* set a socket to use zerocopy */ +grpc_error* grpc_set_socket_zerocopy(int fd); + /* set a socket to non blocking mode */ grpc_error* grpc_set_socket_nonblocking(int fd, int non_blocking); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 668a0c805e8..c96031183b3 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -49,9 +50,11 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -71,6 +74,15 @@ #define SENDMSG_FLAGS 0 #endif +// TCP zero copy sendmsg flag. +// NB: We define this here as a fallback in case we're using an older set of +// library headers that has not defined MSG_ZEROCOPY. Since this constant is +// part of the kernel, we are guaranteed it will never change/disagree so +// defining it here is safe. +#ifndef MSG_ZEROCOPY +#define MSG_ZEROCOPY 0x4000000 +#endif + #ifdef GRPC_MSG_IOVLEN_TYPE typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type; #else @@ -79,6 +91,264 @@ typedef size_t msg_iovlen_type; extern grpc_core::TraceFlag grpc_tcp_trace; +namespace grpc_core { + +class TcpZerocopySendRecord { + public: + TcpZerocopySendRecord() { grpc_slice_buffer_init(&buf_); } + + ~TcpZerocopySendRecord() { + AssertEmpty(); + grpc_slice_buffer_destroy_internal(&buf_); + } + + // Given the slices that we wish to send, and the current offset into the + // slice buffer (indicating which have already been sent), populate an iovec + // array that will be used for a zerocopy enabled sendmsg(). + msg_iovlen_type PopulateIovs(size_t* unwind_slice_idx, + size_t* unwind_byte_idx, size_t* sending_length, + iovec* iov); + + // A sendmsg() may not be able to send the bytes that we requested at this + // time, returning EAGAIN (possibly due to backpressure). In this case, + // unwind the offset into the slice buffer so we retry sending these bytes. + void UnwindIfThrottled(size_t unwind_slice_idx, size_t unwind_byte_idx) { + out_offset_.byte_idx = unwind_byte_idx; + out_offset_.slice_idx = unwind_slice_idx; + } + + // Update the offset into the slice buffer based on how much we wanted to sent + // vs. what sendmsg() actually sent (which may be lower, possibly due to + // backpressure). + void UpdateOffsetForBytesSent(size_t sending_length, size_t actually_sent); + + // Indicates whether all underlying data has been sent or not. + bool AllSlicesSent() { return out_offset_.slice_idx == buf_.count; } + + // Reset this structure for a new tcp_write() with zerocopy. + void PrepareForSends(grpc_slice_buffer* slices_to_send) { + AssertEmpty(); + out_offset_.slice_idx = 0; + out_offset_.byte_idx = 0; + grpc_slice_buffer_swap(slices_to_send, &buf_); + Ref(); + } + + // References: 1 reference per sendmsg(), and 1 for the tcp_write(). + void Ref() { ref_.FetchAdd(1, MemoryOrder::RELAXED); } + + // Unref: called when we get an error queue notification for a sendmsg(), if a + // sendmsg() failed or when tcp_write() is done. + bool Unref() { + const intptr_t prior = ref_.FetchSub(1, MemoryOrder::ACQ_REL); + GPR_DEBUG_ASSERT(prior > 0); + if (prior == 1) { + AllSendsComplete(); + return true; + } + return false; + } + + private: + struct OutgoingOffset { + size_t slice_idx = 0; + size_t byte_idx = 0; + }; + + void AssertEmpty() { + GPR_DEBUG_ASSERT(buf_.count == 0); + GPR_DEBUG_ASSERT(buf_.length == 0); + GPR_DEBUG_ASSERT(ref_.Load(MemoryOrder::RELAXED) == 0); + } + + // When all sendmsg() calls associated with this tcp_write() have been + // completed (ie. we have received the notifications for each sequence number + // for each sendmsg()) and all reference counts have been dropped, drop our + // reference to the underlying data since we no longer need it. + void AllSendsComplete() { + GPR_DEBUG_ASSERT(ref_.Load(MemoryOrder::RELAXED) == 0); + grpc_slice_buffer_reset_and_unref_internal(&buf_); + } + + grpc_slice_buffer buf_; + Atomic ref_; + OutgoingOffset out_offset_; +}; + +class TcpZerocopySendCtx { + public: + static constexpr int kDefaultMaxSends = 4; + static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; // 16KB + + TcpZerocopySendCtx(int max_sends = kDefaultMaxSends, + size_t send_bytes_threshold = kDefaultSendBytesThreshold) + : max_sends_(max_sends), + free_send_records_size_(max_sends), + threshold_bytes_(send_bytes_threshold) { + send_records_ = static_cast( + gpr_malloc(max_sends * sizeof(*send_records_))); + free_send_records_ = static_cast( + gpr_malloc(max_sends * sizeof(*free_send_records_))); + if (send_records_ == nullptr || free_send_records_ == nullptr) { + gpr_free(send_records_); + gpr_free(free_send_records_); + gpr_log(GPR_INFO, "Disabling TCP TX zerocopy due to memory pressure.\n"); + memory_limited_ = true; + } else { + for (int idx = 0; idx < max_sends_; ++idx) { + new (send_records_ + idx) TcpZerocopySendRecord(); + free_send_records_[idx] = send_records_ + idx; + } + } + } + + ~TcpZerocopySendCtx() { + if (send_records_ != nullptr) { + for (int idx = 0; idx < max_sends_; ++idx) { + send_records_[idx].~TcpZerocopySendRecord(); + } + } + gpr_free(send_records_); + gpr_free(free_send_records_); + } + + // True if we were unable to allocate the various bookkeeping structures at + // transport initialization time. If memory limited, we do not zerocopy. + bool memory_limited() const { return memory_limited_; } + + // TCP send zerocopy maintains an implicit sequence number for every + // successful sendmsg() with zerocopy enabled; the kernel later gives us an + // error queue notification with this sequence number indicating that the + // underlying data buffers that we sent can now be released. Once that + // notification is received, we can release the buffers associated with this + // zerocopy send record. Here, we associate the sequence number with the data + // buffers that were sent with the corresponding call to sendmsg(). + void NoteSend(TcpZerocopySendRecord* record) { + record->Ref(); + AssociateSeqWithSendRecord(last_send_, record); + ++last_send_; + } + + // If sendmsg() actually failed, though, we need to revert the sequence number + // that we speculatively bumped before calling sendmsg(). Note that we bump + // this sequence number and perform relevant bookkeeping (see: NoteSend()) + // *before* calling sendmsg() since, if we called it *after* sendmsg(), then + // there is a possible race with the release notification which could occur on + // another thread before we do the necessary bookkeeping. Hence, calling + // NoteSend() *before* sendmsg() and implementing an undo function is needed. + void UndoSend() { + --last_send_; + if (ReleaseSendRecord(last_send_)->Unref()) { + // We should still be holding the ref taken by tcp_write(). + GPR_DEBUG_ASSERT(0); + } + } + + // Simply associate this send record (and the underlying sent data buffers) + // with the implicit sequence number for this zerocopy sendmsg(). + void AssociateSeqWithSendRecord(uint32_t seq, TcpZerocopySendRecord* record) { + MutexLock guard(&lock_); + ctx_lookup_.emplace(seq, record); + } + + // Get a send record for a send that we wish to do with zerocopy. + TcpZerocopySendRecord* GetSendRecord() { + MutexLock guard(&lock_); + return TryGetSendRecordLocked(); + } + + // A given send record corresponds to a single tcp_write() with zerocopy + // enabled. This can result in several sendmsg() calls to flush all of the + // data to wire. Each sendmsg() takes a reference on the + // TcpZerocopySendRecord, and corresponds to a single sequence number. + // ReleaseSendRecord releases a reference on TcpZerocopySendRecord for a + // single sequence number. This is called either when we receive the relevant + // error queue notification (saying that we can discard the underlying + // buffers for this sendmsg()) is received from the kernel - or, in case + // sendmsg() was unsuccessful to begin with. + TcpZerocopySendRecord* ReleaseSendRecord(uint32_t seq) { + MutexLock guard(&lock_); + return ReleaseSendRecordLocked(seq); + } + + // After all the references to a TcpZerocopySendRecord are released, we can + // add it back to the pool (of size max_sends_). Note that we can only have + // max_sends_ tcp_write() instances with zerocopy enabled in flight at the + // same time. + void PutSendRecord(TcpZerocopySendRecord* record) { + GPR_DEBUG_ASSERT(record >= send_records_ && + record < send_records_ + max_sends_); + MutexLock guard(&lock_); + PutSendRecordLocked(record); + } + + // Indicate that we are disposing of this zerocopy context. This indicator + // will prevent new zerocopy writes from being issued. + void Shutdown() { shutdown_.Store(true, MemoryOrder::RELEASE); } + + // Indicates that there are no inflight tcp_write() instances with zerocopy + // enabled. + bool AllSendRecordsEmpty() { + MutexLock guard(&lock_); + return free_send_records_size_ == max_sends_; + } + + bool enabled() const { return enabled_; } + + void set_enabled(bool enabled) { + GPR_DEBUG_ASSERT(!enabled || !memory_limited()); + enabled_ = enabled; + } + + // Only use zerocopy if we are sending at least this many bytes. The + // additional overhead of reading the error queue for notifications means that + // zerocopy is not useful for small transfers. + size_t threshold_bytes() const { return threshold_bytes_; } + + private: + TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq) { + auto iter = ctx_lookup_.find(seq); + GPR_DEBUG_ASSERT(iter != ctx_lookup_.end()); + TcpZerocopySendRecord* record = iter->second; + ctx_lookup_.erase(iter); + return record; + } + + TcpZerocopySendRecord* TryGetSendRecordLocked() { + if (shutdown_.Load(MemoryOrder::ACQUIRE)) { + return nullptr; + } + if (free_send_records_size_ == 0) { + return nullptr; + } + free_send_records_size_--; + return free_send_records_[free_send_records_size_]; + } + + void PutSendRecordLocked(TcpZerocopySendRecord* record) { + GPR_DEBUG_ASSERT(free_send_records_size_ < max_sends_); + free_send_records_[free_send_records_size_] = record; + free_send_records_size_++; + } + + TcpZerocopySendRecord* send_records_; + TcpZerocopySendRecord** free_send_records_; + int max_sends_; + int free_send_records_size_; + Mutex lock_; + uint32_t last_send_ = 0; + Atomic shutdown_; + bool enabled_ = false; + size_t threshold_bytes_ = kDefaultSendBytesThreshold; + std::unordered_map ctx_lookup_; + bool memory_limited_ = false; +}; + +} // namespace grpc_core + +using grpc_core::TcpZerocopySendCtx; +using grpc_core::TcpZerocopySendRecord; + namespace { struct grpc_tcp { grpc_endpoint base; @@ -142,6 +412,8 @@ struct grpc_tcp { bool ts_capable; /* Cache whether we can set timestamping options */ gpr_atm stop_error_notification; /* Set to 1 if we do not want to be notified on errors anymore */ + TcpZerocopySendCtx tcp_zerocopy_send_ctx; + TcpZerocopySendRecord* current_zerocopy_send = nullptr; }; struct backup_poller { @@ -151,6 +423,8 @@ struct backup_poller { } // namespace +static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp); + #define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1)) static gpr_atm g_uncovered_notifications_pending; @@ -339,6 +613,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error); static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) { grpc_tcp* tcp = reinterpret_cast(ep); + ZerocopyDisableAndWaitForRemaining(tcp); grpc_fd_shutdown(tcp->em_fd, why); grpc_resource_user_shutdown(tcp->resource_user); } @@ -357,6 +632,7 @@ static void tcp_free(grpc_tcp* tcp) { gpr_mu_unlock(&tcp->tb_mu); tcp->outgoing_buffer_arg = nullptr; gpr_mu_destroy(&tcp->tb_mu); + tcp->tcp_zerocopy_send_ctx.~TcpZerocopySendCtx(); gpr_free(tcp); } @@ -390,6 +666,7 @@ static void tcp_destroy(grpc_endpoint* ep) { grpc_tcp* tcp = reinterpret_cast(ep); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { + ZerocopyDisableAndWaitForRemaining(tcp); gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } @@ -652,13 +929,13 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, /* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number * of bytes sent. */ -ssize_t tcp_send(int fd, const struct msghdr* msg) { +ssize_t tcp_send(int fd, const struct msghdr* msg, int additional_flags = 0) { GPR_TIMER_SCOPE("sendmsg", 1); ssize_t sent_length; do { /* TODO(klempner): Cork if this is a partial write */ GRPC_STATS_INC_SYSCALL_WRITE(); - sent_length = sendmsg(fd, msg, SENDMSG_FLAGS); + sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags); } while (sent_length < 0 && errno == EINTR); return sent_length; } @@ -671,16 +948,52 @@ ssize_t tcp_send(int fd, const struct msghdr* msg) { */ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length); + ssize_t* sent_length, + int additional_flags = 0); /** The callback function to be invoked when we get an error on the socket. */ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); +static TcpZerocopySendRecord* tcp_get_send_zerocopy_record( + grpc_tcp* tcp, grpc_slice_buffer* buf); + #ifdef GRPC_LINUX_ERRQUEUE +static bool process_errors(grpc_tcp* tcp); + +static TcpZerocopySendRecord* tcp_get_send_zerocopy_record( + grpc_tcp* tcp, grpc_slice_buffer* buf) { + TcpZerocopySendRecord* zerocopy_send_record = nullptr; + const bool use_zerocopy = + tcp->tcp_zerocopy_send_ctx.enabled() && + tcp->tcp_zerocopy_send_ctx.threshold_bytes() < buf->length; + if (use_zerocopy) { + zerocopy_send_record = tcp->tcp_zerocopy_send_ctx.GetSendRecord(); + if (zerocopy_send_record == nullptr) { + process_errors(tcp); + zerocopy_send_record = tcp->tcp_zerocopy_send_ctx.GetSendRecord(); + } + if (zerocopy_send_record != nullptr) { + zerocopy_send_record->PrepareForSends(buf); + GPR_DEBUG_ASSERT(buf->count == 0); + GPR_DEBUG_ASSERT(buf->length == 0); + tcp->outgoing_byte_idx = 0; + tcp->outgoing_buffer = nullptr; + } + } + return zerocopy_send_record; +} + +static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) { + tcp->tcp_zerocopy_send_ctx.Shutdown(); + while (!tcp->tcp_zerocopy_send_ctx.AllSendRecordsEmpty()) { + process_errors(tcp); + } +} static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length) { + ssize_t* sent_length, + int additional_flags) { if (!tcp->socket_ts_enabled) { uint32_t opt = grpc_core::kTimestampingSocketOptions; if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, @@ -708,7 +1021,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t)); /* If there was an error on sendmsg the logic in tcp_flush will handle it. */ - ssize_t length = tcp_send(tcp->fd, msg); + ssize_t length = tcp_send(tcp->fd, msg, additional_flags); *sent_length = length; /* Only save timestamps if all the bytes were taken by sendmsg. */ if (sending_length == static_cast(length)) { @@ -722,6 +1035,43 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, return true; } +static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp, + TcpZerocopySendRecord* record, + uint32_t seq, const char* tag); +// Reads \a cmsg to process zerocopy control messages. +static void process_zerocopy(grpc_tcp* tcp, struct cmsghdr* cmsg) { + GPR_DEBUG_ASSERT(cmsg); + auto serr = reinterpret_cast(CMSG_DATA(cmsg)); + GPR_DEBUG_ASSERT(serr->ee_errno == 0); + GPR_DEBUG_ASSERT(serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY); + const uint32_t lo = serr->ee_info; + const uint32_t hi = serr->ee_data; + for (uint32_t seq = lo; seq <= hi; ++seq) { + // TODO(arjunroy): It's likely that lo and hi refer to zerocopy sequence + // numbers that are generated by a single call to grpc_endpoint_write; ie. + // we can batch the unref operation. So, check if record is the same for + // both; if so, batch the unref/put. + TcpZerocopySendRecord* record = + tcp->tcp_zerocopy_send_ctx.ReleaseSendRecord(seq); + GPR_DEBUG_ASSERT(record); + UnrefMaybePutZerocopySendRecord(tcp, record, seq, "CALLBACK RCVD"); + } +} + +// Whether the cmsg received from error queue is of the IPv4 or IPv6 levels. +static bool CmsgIsIpLevel(const cmsghdr& cmsg) { + return (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR) || + (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR); +} + +static bool CmsgIsZeroCopy(const cmsghdr& cmsg) { + if (!CmsgIsIpLevel(cmsg)) { + return false; + } + auto serr = reinterpret_cast CMSG_DATA(&cmsg); + return serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY; +} + /** Reads \a cmsg to derive timestamps from the control messages. If a valid * timestamp is found, the traced buffer list is updated with this timestamp. * The caller of this function should be looping on the control messages found @@ -783,73 +1133,76 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, /** For linux platforms, reads the socket's error queue and processes error * messages from the queue. */ -static void process_errors(grpc_tcp* tcp) { +static bool process_errors(grpc_tcp* tcp) { + bool processed_err = false; + struct iovec iov; + iov.iov_base = nullptr; + iov.iov_len = 0; + struct msghdr msg; + msg.msg_name = nullptr; + msg.msg_namelen = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 0; + msg.msg_flags = 0; + /* Allocate enough space so we don't need to keep increasing this as size + * of OPT_STATS increase */ + constexpr size_t cmsg_alloc_space = + CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) + + CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) + + CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t))); + /* Allocate aligned space for cmsgs received along with timestamps */ + union { + char rbuf[cmsg_alloc_space]; + struct cmsghdr align; + } aligned_buf; + msg.msg_control = aligned_buf.rbuf; + msg.msg_controllen = sizeof(aligned_buf.rbuf); + int r, saved_errno; while (true) { - struct iovec iov; - iov.iov_base = nullptr; - iov.iov_len = 0; - struct msghdr msg; - msg.msg_name = nullptr; - msg.msg_namelen = 0; - msg.msg_iov = &iov; - msg.msg_iovlen = 0; - msg.msg_flags = 0; - - /* Allocate enough space so we don't need to keep increasing this as size - * of OPT_STATS increase */ - constexpr size_t cmsg_alloc_space = - CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) + - CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) + - CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t))); - /* Allocate aligned space for cmsgs received along with timestamps */ - union { - char rbuf[cmsg_alloc_space]; - struct cmsghdr align; - } aligned_buf; - memset(&aligned_buf, 0, sizeof(aligned_buf)); - - msg.msg_control = aligned_buf.rbuf; - msg.msg_controllen = sizeof(aligned_buf.rbuf); - - int r, saved_errno; do { r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE); saved_errno = errno; } while (r < 0 && saved_errno == EINTR); if (r == -1 && saved_errno == EAGAIN) { - return; /* No more errors to process */ + return processed_err; /* No more errors to process */ } if (r == -1) { - return; + return processed_err; } - if ((msg.msg_flags & MSG_CTRUNC) != 0) { + if (GPR_UNLIKELY((msg.msg_flags & MSG_CTRUNC) != 0)) { gpr_log(GPR_ERROR, "Error message was truncated."); } if (msg.msg_controllen == 0) { /* There was no control message found. It was probably spurious. */ - return; + return processed_err; } bool seen = false; for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - if (cmsg->cmsg_level != SOL_SOCKET || - cmsg->cmsg_type != SCM_TIMESTAMPING) { - /* Got a control message that is not a timestamp. Don't know how to - * handle this. */ + if (CmsgIsZeroCopy(*cmsg)) { + process_zerocopy(tcp, cmsg); + seen = true; + processed_err = true; + } else if (cmsg->cmsg_level == SOL_SOCKET && + cmsg->cmsg_type == SCM_TIMESTAMPING) { + cmsg = process_timestamp(tcp, &msg, cmsg); + seen = true; + processed_err = true; + } else { + /* Got a control message that is not a timestamp or zerocopy. Don't know + * how to handle this. */ if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "unknown control message cmsg_level:%d cmsg_type:%d", cmsg->cmsg_level, cmsg->cmsg_type); } - return; + return processed_err; } - cmsg = process_timestamp(tcp, &msg, cmsg); - seen = true; } if (!seen) { - return; + return processed_err; } } } @@ -870,18 +1223,28 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { /* We are still interested in collecting timestamps, so let's try reading * them. */ - process_errors(tcp); + bool processed = process_errors(tcp); /* This might not a timestamps error. Set the read and write closures to be * ready. */ - grpc_fd_set_readable(tcp->em_fd); - grpc_fd_set_writable(tcp->em_fd); + if (!processed) { + grpc_fd_set_readable(tcp->em_fd); + grpc_fd_set_writable(tcp->em_fd); + } grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); } #else /* GRPC_LINUX_ERRQUEUE */ +static TcpZerocopySendRecord* tcp_get_send_zerocopy_record( + grpc_tcp* tcp, grpc_slice_buffer* buf) { + return nullptr; +} + +static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) {} + static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/, size_t /*sending_length*/, - ssize_t* /*sent_length*/) { + ssize_t* /*sent_length*/, + int /*additional_flags*/) { gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform"); GPR_ASSERT(0); return false; @@ -907,12 +1270,138 @@ void tcp_shutdown_buffer_list(grpc_tcp* tcp) { } } -/* returns true if done, false if pending; if returning true, *error is set */ #if defined(IOV_MAX) && IOV_MAX < 1000 #define MAX_WRITE_IOVEC IOV_MAX #else #define MAX_WRITE_IOVEC 1000 #endif +msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx, + size_t* unwind_byte_idx, + size_t* sending_length, + iovec* iov) { + msg_iovlen_type iov_size; + *unwind_slice_idx = out_offset_.slice_idx; + *unwind_byte_idx = out_offset_.byte_idx; + for (iov_size = 0; + out_offset_.slice_idx != buf_.count && iov_size != MAX_WRITE_IOVEC; + iov_size++) { + iov[iov_size].iov_base = + GRPC_SLICE_START_PTR(buf_.slices[out_offset_.slice_idx]) + + out_offset_.byte_idx; + iov[iov_size].iov_len = + GRPC_SLICE_LENGTH(buf_.slices[out_offset_.slice_idx]) - + out_offset_.byte_idx; + *sending_length += iov[iov_size].iov_len; + ++(out_offset_.slice_idx); + out_offset_.byte_idx = 0; + } + GPR_DEBUG_ASSERT(iov_size > 0); + return iov_size; +} + +void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length, + size_t actually_sent) { + size_t trailing = sending_length - actually_sent; + while (trailing > 0) { + size_t slice_length; + out_offset_.slice_idx--; + slice_length = GRPC_SLICE_LENGTH(buf_.slices[out_offset_.slice_idx]); + if (slice_length > trailing) { + out_offset_.byte_idx = slice_length - trailing; + break; + } else { + trailing -= slice_length; + } + } +} + +// returns true if done, false if pending; if returning true, *error is set +static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, + grpc_error** error) { + struct msghdr msg; + struct iovec iov[MAX_WRITE_IOVEC]; + msg_iovlen_type iov_size; + ssize_t sent_length = 0; + size_t sending_length; + size_t unwind_slice_idx; + size_t unwind_byte_idx; + while (true) { + sending_length = 0; + iov_size = record->PopulateIovs(&unwind_slice_idx, &unwind_byte_idx, + &sending_length, iov); + msg.msg_name = nullptr; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iov_size; + msg.msg_flags = 0; + bool tried_sending_message = false; + // Before calling sendmsg (with or without timestamps): we + // take a single ref on the zerocopy send record. + tcp->tcp_zerocopy_send_ctx.NoteSend(record); + if (tcp->outgoing_buffer_arg != nullptr) { + if (!tcp->ts_capable || + !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, + MSG_ZEROCOPY)) { + /* We could not set socket options to collect Fathom timestamps. + * Fallback on writing without timestamps. */ + tcp->ts_capable = false; + tcp_shutdown_buffer_list(tcp); + } else { + tried_sending_message = true; + } + } + if (!tried_sending_message) { + msg.msg_control = nullptr; + msg.msg_controllen = 0; + GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); + GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); + sent_length = tcp_send(tcp->fd, &msg, MSG_ZEROCOPY); + } + if (sent_length < 0) { + // If this particular send failed, drop ref taken earlier in this method. + tcp->tcp_zerocopy_send_ctx.UndoSend(); + if (errno == EAGAIN) { + record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx); + return false; + } else if (errno == EPIPE) { + *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + tcp_shutdown_buffer_list(tcp); + return true; + } else { + *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + tcp_shutdown_buffer_list(tcp); + return true; + } + } + tcp->bytes_counter += sent_length; + record->UpdateOffsetForBytesSent(sending_length, + static_cast(sent_length)); + if (record->AllSlicesSent()) { + *error = GRPC_ERROR_NONE; + return true; + } + } +} + +static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp, + TcpZerocopySendRecord* record, + uint32_t seq, const char* tag) { + if (record->Unref()) { + tcp->tcp_zerocopy_send_ctx.PutSendRecord(record); + } +} + +static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, + grpc_error** error) { + bool done = do_tcp_flush_zerocopy(tcp, record, error); + if (done) { + // Either we encountered an error, or we successfully sent all the bytes. + // In either case, we're done with this record. + UnrefMaybePutZerocopySendRecord(tcp, record, 0, "flush_done"); + } + return done; +} + static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; @@ -927,7 +1416,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { // buffer as we write size_t outgoing_slice_idx = 0; - for (;;) { + while (true) { sending_length = 0; unwind_slice_idx = outgoing_slice_idx; unwind_byte_idx = tcp->outgoing_byte_idx; @@ -1027,12 +1516,21 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { if (error != GRPC_ERROR_NONE) { cb = tcp->write_cb; tcp->write_cb = nullptr; + if (tcp->current_zerocopy_send != nullptr) { + UnrefMaybePutZerocopySendRecord(tcp, tcp->current_zerocopy_send, 0, + "handle_write_err"); + tcp->current_zerocopy_send = nullptr; + } grpc_core::Closure::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error)); TCP_UNREF(tcp, "write"); return; } - if (!tcp_flush(tcp, &error)) { + bool flush_result = + tcp->current_zerocopy_send != nullptr + ? tcp_flush_zerocopy(tcp, tcp->current_zerocopy_send, &error) + : tcp_flush(tcp, &error); + if (!flush_result) { if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "write: delayed"); } @@ -1042,6 +1540,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { } else { cb = tcp->write_cb; tcp->write_cb = nullptr; + tcp->current_zerocopy_send = nullptr; if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); @@ -1057,6 +1556,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, GPR_TIMER_SCOPE("tcp_write", 0); grpc_tcp* tcp = reinterpret_cast(ep); grpc_error* error = GRPC_ERROR_NONE; + TcpZerocopySendRecord* zerocopy_send_record = nullptr; if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { size_t i; @@ -1073,8 +1573,8 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, } GPR_ASSERT(tcp->write_cb == nullptr); + GPR_DEBUG_ASSERT(tcp->current_zerocopy_send == nullptr); - tcp->outgoing_buffer_arg = arg; if (buf->length == 0) { grpc_core::Closure::Run( DEBUG_LOCATION, cb, @@ -1085,15 +1585,26 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, tcp_shutdown_buffer_list(tcp); return; } - tcp->outgoing_buffer = buf; - tcp->outgoing_byte_idx = 0; + + zerocopy_send_record = tcp_get_send_zerocopy_record(tcp, buf); + if (zerocopy_send_record == nullptr) { + // Either not enough bytes, or couldn't allocate a zerocopy context. + tcp->outgoing_buffer = buf; + tcp->outgoing_byte_idx = 0; + } + tcp->outgoing_buffer_arg = arg; if (arg) { GPR_ASSERT(grpc_event_engine_can_track_errors()); } - if (!tcp_flush(tcp, &error)) { + bool flush_result = + zerocopy_send_record != nullptr + ? tcp_flush_zerocopy(tcp, zerocopy_send_record, &error) + : tcp_flush(tcp, &error); + if (!flush_result) { TCP_REF(tcp, "write"); tcp->write_cb = cb; + tcp->current_zerocopy_send = zerocopy_send_record; if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "write: delayed"); } @@ -1121,6 +1632,7 @@ static void tcp_add_to_pollset_set(grpc_endpoint* ep, static void tcp_delete_from_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset_set) { grpc_tcp* tcp = reinterpret_cast(ep); + ZerocopyDisableAndWaitForRemaining(tcp); grpc_pollset_set_del_fd(pollset_set, tcp->em_fd); } @@ -1172,9 +1684,15 @@ static const grpc_endpoint_vtable vtable = {tcp_read, grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, const grpc_channel_args* channel_args, const char* peer_string) { + static constexpr bool kZerocpTxEnabledDefault = false; int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; int tcp_max_read_chunk_size = 4 * 1024 * 1024; int tcp_min_read_chunk_size = 256; + bool tcp_tx_zerocopy_enabled = kZerocpTxEnabledDefault; + int tcp_tx_zerocopy_send_bytes_thresh = + grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold; + int tcp_tx_zerocopy_max_simult_sends = + grpc_core::TcpZerocopySendCtx::kDefaultMaxSends; grpc_resource_quota* resource_quota = grpc_resource_quota_create(nullptr); if (channel_args != nullptr) { for (size_t i = 0; i < channel_args->num_args; i++) { @@ -1199,6 +1717,23 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, resource_quota = grpc_resource_quota_ref_internal(static_cast( channel_args->args[i].value.pointer.p)); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) { + tcp_tx_zerocopy_enabled = grpc_channel_arg_get_bool( + &channel_args->args[i], kZerocpTxEnabledDefault); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD)) { + grpc_integer_options options = { + grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold, 0, + INT_MAX}; + tcp_tx_zerocopy_send_bytes_thresh = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)) { + grpc_integer_options options = { + grpc_core::TcpZerocopySendCtx::kDefaultMaxSends, 0, INT_MAX}; + tcp_tx_zerocopy_max_simult_sends = + grpc_channel_arg_get_integer(&channel_args->args[i], options); } } } @@ -1215,6 +1750,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->fd = grpc_fd_wrapped_fd(em_fd); tcp->read_cb = nullptr; tcp->write_cb = nullptr; + tcp->current_zerocopy_send = nullptr; tcp->release_fd_cb = nullptr; tcp->release_fd = nullptr; tcp->incoming_buffer = nullptr; @@ -1228,6 +1764,20 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->socket_ts_enabled = false; tcp->ts_capable = true; tcp->outgoing_buffer_arg = nullptr; + new (&tcp->tcp_zerocopy_send_ctx) TcpZerocopySendCtx( + tcp_tx_zerocopy_max_simult_sends, tcp_tx_zerocopy_send_bytes_thresh); + if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) { +#ifdef GRPC_LINUX_ERRQUEUE + const int enable = 1; + auto err = + setsockopt(tcp->fd, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)); + if (err == 0) { + tcp->tcp_zerocopy_send_ctx.set_enabled(true); + } else { + gpr_log(GPR_ERROR, "Failed to set zerocopy options on the socket."); + } +#endif + } /* paired with unref in grpc_tcp_destroy */ new (&tcp->refcount) grpc_core::RefCount(1, &grpc_tcp_trace); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -1294,6 +1844,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { /* Stop errors notification. */ + ZerocopyDisableAndWaitForRemaining(tcp); gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index ee1cd5c1027..da18cc39c51 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -157,6 +157,14 @@ grpc_error* grpc_tcp_server_prepare_socket(grpc_tcp_server* s, int fd, if (err != GRPC_ERROR_NONE) goto error; } +#ifdef GRPC_LINUX_ERRQUEUE + err = grpc_set_socket_zerocopy(fd); + if (err != GRPC_ERROR_NONE) { + /* it's not fatal, so just log it. */ + gpr_log(GPR_DEBUG, "Node does not support SO_ZEROCOPY, continuing."); + GRPC_ERROR_UNREF(err); + } +#endif err = grpc_set_socket_nonblocking(fd, 1); if (err != GRPC_ERROR_NONE) goto error; err = grpc_set_socket_cloexec(fd, 1);