diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 836441f8948..89fd15faf1a 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -323,6 +323,20 @@ typedef struct { "grpc.experimental.tcp_min_read_chunk_size" #define GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE \ "grpc.experimental.tcp_max_read_chunk_size" +/* TCP TX Zerocopy enable state: zero is disabled, non-zero is enabled. By + default, it is disabled. */ +#define GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED \ + "grpc.experimental.tcp_tx_zerocopy_enabled" +/* TCP TX Zerocopy send threshold: only zerocopy if >= this many bytes sent. By + default, this is set to 16KB. */ +#define GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD \ + "grpc.experimental.tcp_tx_zerocopy_send_bytes_threshold" +/* TCP TX Zerocopy max simultaneous sends: limit for maximum number of pending + calls to tcp_write() using zerocopy. A tcp_write() is considered pending + until the kernel performs the zerocopy-done callback for all sendmsg() calls + issued by the tcp_write(). By default, this is set to 4. */ +#define GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS \ + "grpc.experimental.tcp_tx_zerocopy_max_simultaneous_sends" /* Timeout in milliseconds to use for calls to the grpclb load balancer. If 0 or unset, the balancer calls will have no deadline. */ #define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms" diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index f46cbd51c88..3974ae7dec2 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -50,6 +50,20 @@ #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +/* set a socket to use zerocopy */ +grpc_error* grpc_set_socket_zerocopy(int fd) { +#ifdef GRPC_LINUX_ERRQUEUE + const int enable = 1; + auto err = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)); + if (err != 0) { + return GRPC_OS_ERROR(errno, "setsockopt(SO_ZEROCOPY)"); + } + return GRPC_ERROR_NONE; +#else + return GRPC_OS_ERROR(ENOSYS, "setsockopt(SO_ZEROCOPY)"); +#endif +} + /* set a socket to non blocking mode */ grpc_error* grpc_set_socket_nonblocking(int fd, int non_blocking) { int oldflags = fcntl(fd, F_GETFL, 0); diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index a708a7a0ed5..734d340a953 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -31,10 +31,22 @@ #include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_mutator.h" +#ifdef GRPC_LINUX_ERRQUEUE +#ifndef SO_ZEROCOPY +#define SO_ZEROCOPY 60 +#endif +#ifndef SO_EE_ORIGIN_ZEROCOPY +#define SO_EE_ORIGIN_ZEROCOPY 5 +#endif +#endif /* ifdef GRPC_LINUX_ERRQUEUE */ + /* a wrapper for accept or accept4 */ int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock, int cloexec); +/* set a socket to use zerocopy */ +grpc_error* grpc_set_socket_zerocopy(int fd); + /* set a socket to non blocking mode */ grpc_error* grpc_set_socket_nonblocking(int fd, int non_blocking); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 668a0c805e8..c96031183b3 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -49,9 +50,11 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -71,6 +74,15 @@ #define SENDMSG_FLAGS 0 #endif +// TCP zero copy sendmsg flag. +// NB: We define this here as a fallback in case we're using an older set of +// library headers that has not defined MSG_ZEROCOPY. Since this constant is +// part of the kernel, we are guaranteed it will never change/disagree so +// defining it here is safe. +#ifndef MSG_ZEROCOPY +#define MSG_ZEROCOPY 0x4000000 +#endif + #ifdef GRPC_MSG_IOVLEN_TYPE typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type; #else @@ -79,6 +91,264 @@ typedef size_t msg_iovlen_type; extern grpc_core::TraceFlag grpc_tcp_trace; +namespace grpc_core { + +class TcpZerocopySendRecord { + public: + TcpZerocopySendRecord() { grpc_slice_buffer_init(&buf_); } + + ~TcpZerocopySendRecord() { + AssertEmpty(); + grpc_slice_buffer_destroy_internal(&buf_); + } + + // Given the slices that we wish to send, and the current offset into the + // slice buffer (indicating which have already been sent), populate an iovec + // array that will be used for a zerocopy enabled sendmsg(). + msg_iovlen_type PopulateIovs(size_t* unwind_slice_idx, + size_t* unwind_byte_idx, size_t* sending_length, + iovec* iov); + + // A sendmsg() may not be able to send the bytes that we requested at this + // time, returning EAGAIN (possibly due to backpressure). In this case, + // unwind the offset into the slice buffer so we retry sending these bytes. + void UnwindIfThrottled(size_t unwind_slice_idx, size_t unwind_byte_idx) { + out_offset_.byte_idx = unwind_byte_idx; + out_offset_.slice_idx = unwind_slice_idx; + } + + // Update the offset into the slice buffer based on how much we wanted to sent + // vs. what sendmsg() actually sent (which may be lower, possibly due to + // backpressure). + void UpdateOffsetForBytesSent(size_t sending_length, size_t actually_sent); + + // Indicates whether all underlying data has been sent or not. + bool AllSlicesSent() { return out_offset_.slice_idx == buf_.count; } + + // Reset this structure for a new tcp_write() with zerocopy. + void PrepareForSends(grpc_slice_buffer* slices_to_send) { + AssertEmpty(); + out_offset_.slice_idx = 0; + out_offset_.byte_idx = 0; + grpc_slice_buffer_swap(slices_to_send, &buf_); + Ref(); + } + + // References: 1 reference per sendmsg(), and 1 for the tcp_write(). + void Ref() { ref_.FetchAdd(1, MemoryOrder::RELAXED); } + + // Unref: called when we get an error queue notification for a sendmsg(), if a + // sendmsg() failed or when tcp_write() is done. + bool Unref() { + const intptr_t prior = ref_.FetchSub(1, MemoryOrder::ACQ_REL); + GPR_DEBUG_ASSERT(prior > 0); + if (prior == 1) { + AllSendsComplete(); + return true; + } + return false; + } + + private: + struct OutgoingOffset { + size_t slice_idx = 0; + size_t byte_idx = 0; + }; + + void AssertEmpty() { + GPR_DEBUG_ASSERT(buf_.count == 0); + GPR_DEBUG_ASSERT(buf_.length == 0); + GPR_DEBUG_ASSERT(ref_.Load(MemoryOrder::RELAXED) == 0); + } + + // When all sendmsg() calls associated with this tcp_write() have been + // completed (ie. we have received the notifications for each sequence number + // for each sendmsg()) and all reference counts have been dropped, drop our + // reference to the underlying data since we no longer need it. + void AllSendsComplete() { + GPR_DEBUG_ASSERT(ref_.Load(MemoryOrder::RELAXED) == 0); + grpc_slice_buffer_reset_and_unref_internal(&buf_); + } + + grpc_slice_buffer buf_; + Atomic ref_; + OutgoingOffset out_offset_; +}; + +class TcpZerocopySendCtx { + public: + static constexpr int kDefaultMaxSends = 4; + static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; // 16KB + + TcpZerocopySendCtx(int max_sends = kDefaultMaxSends, + size_t send_bytes_threshold = kDefaultSendBytesThreshold) + : max_sends_(max_sends), + free_send_records_size_(max_sends), + threshold_bytes_(send_bytes_threshold) { + send_records_ = static_cast( + gpr_malloc(max_sends * sizeof(*send_records_))); + free_send_records_ = static_cast( + gpr_malloc(max_sends * sizeof(*free_send_records_))); + if (send_records_ == nullptr || free_send_records_ == nullptr) { + gpr_free(send_records_); + gpr_free(free_send_records_); + gpr_log(GPR_INFO, "Disabling TCP TX zerocopy due to memory pressure.\n"); + memory_limited_ = true; + } else { + for (int idx = 0; idx < max_sends_; ++idx) { + new (send_records_ + idx) TcpZerocopySendRecord(); + free_send_records_[idx] = send_records_ + idx; + } + } + } + + ~TcpZerocopySendCtx() { + if (send_records_ != nullptr) { + for (int idx = 0; idx < max_sends_; ++idx) { + send_records_[idx].~TcpZerocopySendRecord(); + } + } + gpr_free(send_records_); + gpr_free(free_send_records_); + } + + // True if we were unable to allocate the various bookkeeping structures at + // transport initialization time. If memory limited, we do not zerocopy. + bool memory_limited() const { return memory_limited_; } + + // TCP send zerocopy maintains an implicit sequence number for every + // successful sendmsg() with zerocopy enabled; the kernel later gives us an + // error queue notification with this sequence number indicating that the + // underlying data buffers that we sent can now be released. Once that + // notification is received, we can release the buffers associated with this + // zerocopy send record. Here, we associate the sequence number with the data + // buffers that were sent with the corresponding call to sendmsg(). + void NoteSend(TcpZerocopySendRecord* record) { + record->Ref(); + AssociateSeqWithSendRecord(last_send_, record); + ++last_send_; + } + + // If sendmsg() actually failed, though, we need to revert the sequence number + // that we speculatively bumped before calling sendmsg(). Note that we bump + // this sequence number and perform relevant bookkeeping (see: NoteSend()) + // *before* calling sendmsg() since, if we called it *after* sendmsg(), then + // there is a possible race with the release notification which could occur on + // another thread before we do the necessary bookkeeping. Hence, calling + // NoteSend() *before* sendmsg() and implementing an undo function is needed. + void UndoSend() { + --last_send_; + if (ReleaseSendRecord(last_send_)->Unref()) { + // We should still be holding the ref taken by tcp_write(). + GPR_DEBUG_ASSERT(0); + } + } + + // Simply associate this send record (and the underlying sent data buffers) + // with the implicit sequence number for this zerocopy sendmsg(). + void AssociateSeqWithSendRecord(uint32_t seq, TcpZerocopySendRecord* record) { + MutexLock guard(&lock_); + ctx_lookup_.emplace(seq, record); + } + + // Get a send record for a send that we wish to do with zerocopy. + TcpZerocopySendRecord* GetSendRecord() { + MutexLock guard(&lock_); + return TryGetSendRecordLocked(); + } + + // A given send record corresponds to a single tcp_write() with zerocopy + // enabled. This can result in several sendmsg() calls to flush all of the + // data to wire. Each sendmsg() takes a reference on the + // TcpZerocopySendRecord, and corresponds to a single sequence number. + // ReleaseSendRecord releases a reference on TcpZerocopySendRecord for a + // single sequence number. This is called either when we receive the relevant + // error queue notification (saying that we can discard the underlying + // buffers for this sendmsg()) is received from the kernel - or, in case + // sendmsg() was unsuccessful to begin with. + TcpZerocopySendRecord* ReleaseSendRecord(uint32_t seq) { + MutexLock guard(&lock_); + return ReleaseSendRecordLocked(seq); + } + + // After all the references to a TcpZerocopySendRecord are released, we can + // add it back to the pool (of size max_sends_). Note that we can only have + // max_sends_ tcp_write() instances with zerocopy enabled in flight at the + // same time. + void PutSendRecord(TcpZerocopySendRecord* record) { + GPR_DEBUG_ASSERT(record >= send_records_ && + record < send_records_ + max_sends_); + MutexLock guard(&lock_); + PutSendRecordLocked(record); + } + + // Indicate that we are disposing of this zerocopy context. This indicator + // will prevent new zerocopy writes from being issued. + void Shutdown() { shutdown_.Store(true, MemoryOrder::RELEASE); } + + // Indicates that there are no inflight tcp_write() instances with zerocopy + // enabled. + bool AllSendRecordsEmpty() { + MutexLock guard(&lock_); + return free_send_records_size_ == max_sends_; + } + + bool enabled() const { return enabled_; } + + void set_enabled(bool enabled) { + GPR_DEBUG_ASSERT(!enabled || !memory_limited()); + enabled_ = enabled; + } + + // Only use zerocopy if we are sending at least this many bytes. The + // additional overhead of reading the error queue for notifications means that + // zerocopy is not useful for small transfers. + size_t threshold_bytes() const { return threshold_bytes_; } + + private: + TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq) { + auto iter = ctx_lookup_.find(seq); + GPR_DEBUG_ASSERT(iter != ctx_lookup_.end()); + TcpZerocopySendRecord* record = iter->second; + ctx_lookup_.erase(iter); + return record; + } + + TcpZerocopySendRecord* TryGetSendRecordLocked() { + if (shutdown_.Load(MemoryOrder::ACQUIRE)) { + return nullptr; + } + if (free_send_records_size_ == 0) { + return nullptr; + } + free_send_records_size_--; + return free_send_records_[free_send_records_size_]; + } + + void PutSendRecordLocked(TcpZerocopySendRecord* record) { + GPR_DEBUG_ASSERT(free_send_records_size_ < max_sends_); + free_send_records_[free_send_records_size_] = record; + free_send_records_size_++; + } + + TcpZerocopySendRecord* send_records_; + TcpZerocopySendRecord** free_send_records_; + int max_sends_; + int free_send_records_size_; + Mutex lock_; + uint32_t last_send_ = 0; + Atomic shutdown_; + bool enabled_ = false; + size_t threshold_bytes_ = kDefaultSendBytesThreshold; + std::unordered_map ctx_lookup_; + bool memory_limited_ = false; +}; + +} // namespace grpc_core + +using grpc_core::TcpZerocopySendCtx; +using grpc_core::TcpZerocopySendRecord; + namespace { struct grpc_tcp { grpc_endpoint base; @@ -142,6 +412,8 @@ struct grpc_tcp { bool ts_capable; /* Cache whether we can set timestamping options */ gpr_atm stop_error_notification; /* Set to 1 if we do not want to be notified on errors anymore */ + TcpZerocopySendCtx tcp_zerocopy_send_ctx; + TcpZerocopySendRecord* current_zerocopy_send = nullptr; }; struct backup_poller { @@ -151,6 +423,8 @@ struct backup_poller { } // namespace +static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp); + #define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1)) static gpr_atm g_uncovered_notifications_pending; @@ -339,6 +613,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error); static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) { grpc_tcp* tcp = reinterpret_cast(ep); + ZerocopyDisableAndWaitForRemaining(tcp); grpc_fd_shutdown(tcp->em_fd, why); grpc_resource_user_shutdown(tcp->resource_user); } @@ -357,6 +632,7 @@ static void tcp_free(grpc_tcp* tcp) { gpr_mu_unlock(&tcp->tb_mu); tcp->outgoing_buffer_arg = nullptr; gpr_mu_destroy(&tcp->tb_mu); + tcp->tcp_zerocopy_send_ctx.~TcpZerocopySendCtx(); gpr_free(tcp); } @@ -390,6 +666,7 @@ static void tcp_destroy(grpc_endpoint* ep) { grpc_tcp* tcp = reinterpret_cast(ep); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { + ZerocopyDisableAndWaitForRemaining(tcp); gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } @@ -652,13 +929,13 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, /* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number * of bytes sent. */ -ssize_t tcp_send(int fd, const struct msghdr* msg) { +ssize_t tcp_send(int fd, const struct msghdr* msg, int additional_flags = 0) { GPR_TIMER_SCOPE("sendmsg", 1); ssize_t sent_length; do { /* TODO(klempner): Cork if this is a partial write */ GRPC_STATS_INC_SYSCALL_WRITE(); - sent_length = sendmsg(fd, msg, SENDMSG_FLAGS); + sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags); } while (sent_length < 0 && errno == EINTR); return sent_length; } @@ -671,16 +948,52 @@ ssize_t tcp_send(int fd, const struct msghdr* msg) { */ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length); + ssize_t* sent_length, + int additional_flags = 0); /** The callback function to be invoked when we get an error on the socket. */ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); +static TcpZerocopySendRecord* tcp_get_send_zerocopy_record( + grpc_tcp* tcp, grpc_slice_buffer* buf); + #ifdef GRPC_LINUX_ERRQUEUE +static bool process_errors(grpc_tcp* tcp); + +static TcpZerocopySendRecord* tcp_get_send_zerocopy_record( + grpc_tcp* tcp, grpc_slice_buffer* buf) { + TcpZerocopySendRecord* zerocopy_send_record = nullptr; + const bool use_zerocopy = + tcp->tcp_zerocopy_send_ctx.enabled() && + tcp->tcp_zerocopy_send_ctx.threshold_bytes() < buf->length; + if (use_zerocopy) { + zerocopy_send_record = tcp->tcp_zerocopy_send_ctx.GetSendRecord(); + if (zerocopy_send_record == nullptr) { + process_errors(tcp); + zerocopy_send_record = tcp->tcp_zerocopy_send_ctx.GetSendRecord(); + } + if (zerocopy_send_record != nullptr) { + zerocopy_send_record->PrepareForSends(buf); + GPR_DEBUG_ASSERT(buf->count == 0); + GPR_DEBUG_ASSERT(buf->length == 0); + tcp->outgoing_byte_idx = 0; + tcp->outgoing_buffer = nullptr; + } + } + return zerocopy_send_record; +} + +static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) { + tcp->tcp_zerocopy_send_ctx.Shutdown(); + while (!tcp->tcp_zerocopy_send_ctx.AllSendRecordsEmpty()) { + process_errors(tcp); + } +} static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length) { + ssize_t* sent_length, + int additional_flags) { if (!tcp->socket_ts_enabled) { uint32_t opt = grpc_core::kTimestampingSocketOptions; if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, @@ -708,7 +1021,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t)); /* If there was an error on sendmsg the logic in tcp_flush will handle it. */ - ssize_t length = tcp_send(tcp->fd, msg); + ssize_t length = tcp_send(tcp->fd, msg, additional_flags); *sent_length = length; /* Only save timestamps if all the bytes were taken by sendmsg. */ if (sending_length == static_cast(length)) { @@ -722,6 +1035,43 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, return true; } +static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp, + TcpZerocopySendRecord* record, + uint32_t seq, const char* tag); +// Reads \a cmsg to process zerocopy control messages. +static void process_zerocopy(grpc_tcp* tcp, struct cmsghdr* cmsg) { + GPR_DEBUG_ASSERT(cmsg); + auto serr = reinterpret_cast(CMSG_DATA(cmsg)); + GPR_DEBUG_ASSERT(serr->ee_errno == 0); + GPR_DEBUG_ASSERT(serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY); + const uint32_t lo = serr->ee_info; + const uint32_t hi = serr->ee_data; + for (uint32_t seq = lo; seq <= hi; ++seq) { + // TODO(arjunroy): It's likely that lo and hi refer to zerocopy sequence + // numbers that are generated by a single call to grpc_endpoint_write; ie. + // we can batch the unref operation. So, check if record is the same for + // both; if so, batch the unref/put. + TcpZerocopySendRecord* record = + tcp->tcp_zerocopy_send_ctx.ReleaseSendRecord(seq); + GPR_DEBUG_ASSERT(record); + UnrefMaybePutZerocopySendRecord(tcp, record, seq, "CALLBACK RCVD"); + } +} + +// Whether the cmsg received from error queue is of the IPv4 or IPv6 levels. +static bool CmsgIsIpLevel(const cmsghdr& cmsg) { + return (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR) || + (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR); +} + +static bool CmsgIsZeroCopy(const cmsghdr& cmsg) { + if (!CmsgIsIpLevel(cmsg)) { + return false; + } + auto serr = reinterpret_cast CMSG_DATA(&cmsg); + return serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY; +} + /** Reads \a cmsg to derive timestamps from the control messages. If a valid * timestamp is found, the traced buffer list is updated with this timestamp. * The caller of this function should be looping on the control messages found @@ -783,73 +1133,76 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, /** For linux platforms, reads the socket's error queue and processes error * messages from the queue. */ -static void process_errors(grpc_tcp* tcp) { +static bool process_errors(grpc_tcp* tcp) { + bool processed_err = false; + struct iovec iov; + iov.iov_base = nullptr; + iov.iov_len = 0; + struct msghdr msg; + msg.msg_name = nullptr; + msg.msg_namelen = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 0; + msg.msg_flags = 0; + /* Allocate enough space so we don't need to keep increasing this as size + * of OPT_STATS increase */ + constexpr size_t cmsg_alloc_space = + CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) + + CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) + + CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t))); + /* Allocate aligned space for cmsgs received along with timestamps */ + union { + char rbuf[cmsg_alloc_space]; + struct cmsghdr align; + } aligned_buf; + msg.msg_control = aligned_buf.rbuf; + msg.msg_controllen = sizeof(aligned_buf.rbuf); + int r, saved_errno; while (true) { - struct iovec iov; - iov.iov_base = nullptr; - iov.iov_len = 0; - struct msghdr msg; - msg.msg_name = nullptr; - msg.msg_namelen = 0; - msg.msg_iov = &iov; - msg.msg_iovlen = 0; - msg.msg_flags = 0; - - /* Allocate enough space so we don't need to keep increasing this as size - * of OPT_STATS increase */ - constexpr size_t cmsg_alloc_space = - CMSG_SPACE(sizeof(grpc_core::scm_timestamping)) + - CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) + - CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t))); - /* Allocate aligned space for cmsgs received along with timestamps */ - union { - char rbuf[cmsg_alloc_space]; - struct cmsghdr align; - } aligned_buf; - memset(&aligned_buf, 0, sizeof(aligned_buf)); - - msg.msg_control = aligned_buf.rbuf; - msg.msg_controllen = sizeof(aligned_buf.rbuf); - - int r, saved_errno; do { r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE); saved_errno = errno; } while (r < 0 && saved_errno == EINTR); if (r == -1 && saved_errno == EAGAIN) { - return; /* No more errors to process */ + return processed_err; /* No more errors to process */ } if (r == -1) { - return; + return processed_err; } - if ((msg.msg_flags & MSG_CTRUNC) != 0) { + if (GPR_UNLIKELY((msg.msg_flags & MSG_CTRUNC) != 0)) { gpr_log(GPR_ERROR, "Error message was truncated."); } if (msg.msg_controllen == 0) { /* There was no control message found. It was probably spurious. */ - return; + return processed_err; } bool seen = false; for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - if (cmsg->cmsg_level != SOL_SOCKET || - cmsg->cmsg_type != SCM_TIMESTAMPING) { - /* Got a control message that is not a timestamp. Don't know how to - * handle this. */ + if (CmsgIsZeroCopy(*cmsg)) { + process_zerocopy(tcp, cmsg); + seen = true; + processed_err = true; + } else if (cmsg->cmsg_level == SOL_SOCKET && + cmsg->cmsg_type == SCM_TIMESTAMPING) { + cmsg = process_timestamp(tcp, &msg, cmsg); + seen = true; + processed_err = true; + } else { + /* Got a control message that is not a timestamp or zerocopy. Don't know + * how to handle this. */ if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "unknown control message cmsg_level:%d cmsg_type:%d", cmsg->cmsg_level, cmsg->cmsg_type); } - return; + return processed_err; } - cmsg = process_timestamp(tcp, &msg, cmsg); - seen = true; } if (!seen) { - return; + return processed_err; } } } @@ -870,18 +1223,28 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { /* We are still interested in collecting timestamps, so let's try reading * them. */ - process_errors(tcp); + bool processed = process_errors(tcp); /* This might not a timestamps error. Set the read and write closures to be * ready. */ - grpc_fd_set_readable(tcp->em_fd); - grpc_fd_set_writable(tcp->em_fd); + if (!processed) { + grpc_fd_set_readable(tcp->em_fd); + grpc_fd_set_writable(tcp->em_fd); + } grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); } #else /* GRPC_LINUX_ERRQUEUE */ +static TcpZerocopySendRecord* tcp_get_send_zerocopy_record( + grpc_tcp* tcp, grpc_slice_buffer* buf) { + return nullptr; +} + +static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) {} + static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/, size_t /*sending_length*/, - ssize_t* /*sent_length*/) { + ssize_t* /*sent_length*/, + int /*additional_flags*/) { gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform"); GPR_ASSERT(0); return false; @@ -907,12 +1270,138 @@ void tcp_shutdown_buffer_list(grpc_tcp* tcp) { } } -/* returns true if done, false if pending; if returning true, *error is set */ #if defined(IOV_MAX) && IOV_MAX < 1000 #define MAX_WRITE_IOVEC IOV_MAX #else #define MAX_WRITE_IOVEC 1000 #endif +msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx, + size_t* unwind_byte_idx, + size_t* sending_length, + iovec* iov) { + msg_iovlen_type iov_size; + *unwind_slice_idx = out_offset_.slice_idx; + *unwind_byte_idx = out_offset_.byte_idx; + for (iov_size = 0; + out_offset_.slice_idx != buf_.count && iov_size != MAX_WRITE_IOVEC; + iov_size++) { + iov[iov_size].iov_base = + GRPC_SLICE_START_PTR(buf_.slices[out_offset_.slice_idx]) + + out_offset_.byte_idx; + iov[iov_size].iov_len = + GRPC_SLICE_LENGTH(buf_.slices[out_offset_.slice_idx]) - + out_offset_.byte_idx; + *sending_length += iov[iov_size].iov_len; + ++(out_offset_.slice_idx); + out_offset_.byte_idx = 0; + } + GPR_DEBUG_ASSERT(iov_size > 0); + return iov_size; +} + +void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length, + size_t actually_sent) { + size_t trailing = sending_length - actually_sent; + while (trailing > 0) { + size_t slice_length; + out_offset_.slice_idx--; + slice_length = GRPC_SLICE_LENGTH(buf_.slices[out_offset_.slice_idx]); + if (slice_length > trailing) { + out_offset_.byte_idx = slice_length - trailing; + break; + } else { + trailing -= slice_length; + } + } +} + +// returns true if done, false if pending; if returning true, *error is set +static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, + grpc_error** error) { + struct msghdr msg; + struct iovec iov[MAX_WRITE_IOVEC]; + msg_iovlen_type iov_size; + ssize_t sent_length = 0; + size_t sending_length; + size_t unwind_slice_idx; + size_t unwind_byte_idx; + while (true) { + sending_length = 0; + iov_size = record->PopulateIovs(&unwind_slice_idx, &unwind_byte_idx, + &sending_length, iov); + msg.msg_name = nullptr; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iov_size; + msg.msg_flags = 0; + bool tried_sending_message = false; + // Before calling sendmsg (with or without timestamps): we + // take a single ref on the zerocopy send record. + tcp->tcp_zerocopy_send_ctx.NoteSend(record); + if (tcp->outgoing_buffer_arg != nullptr) { + if (!tcp->ts_capable || + !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, + MSG_ZEROCOPY)) { + /* We could not set socket options to collect Fathom timestamps. + * Fallback on writing without timestamps. */ + tcp->ts_capable = false; + tcp_shutdown_buffer_list(tcp); + } else { + tried_sending_message = true; + } + } + if (!tried_sending_message) { + msg.msg_control = nullptr; + msg.msg_controllen = 0; + GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); + GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); + sent_length = tcp_send(tcp->fd, &msg, MSG_ZEROCOPY); + } + if (sent_length < 0) { + // If this particular send failed, drop ref taken earlier in this method. + tcp->tcp_zerocopy_send_ctx.UndoSend(); + if (errno == EAGAIN) { + record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx); + return false; + } else if (errno == EPIPE) { + *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + tcp_shutdown_buffer_list(tcp); + return true; + } else { + *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + tcp_shutdown_buffer_list(tcp); + return true; + } + } + tcp->bytes_counter += sent_length; + record->UpdateOffsetForBytesSent(sending_length, + static_cast(sent_length)); + if (record->AllSlicesSent()) { + *error = GRPC_ERROR_NONE; + return true; + } + } +} + +static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp, + TcpZerocopySendRecord* record, + uint32_t seq, const char* tag) { + if (record->Unref()) { + tcp->tcp_zerocopy_send_ctx.PutSendRecord(record); + } +} + +static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, + grpc_error** error) { + bool done = do_tcp_flush_zerocopy(tcp, record, error); + if (done) { + // Either we encountered an error, or we successfully sent all the bytes. + // In either case, we're done with this record. + UnrefMaybePutZerocopySendRecord(tcp, record, 0, "flush_done"); + } + return done; +} + static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; @@ -927,7 +1416,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { // buffer as we write size_t outgoing_slice_idx = 0; - for (;;) { + while (true) { sending_length = 0; unwind_slice_idx = outgoing_slice_idx; unwind_byte_idx = tcp->outgoing_byte_idx; @@ -1027,12 +1516,21 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { if (error != GRPC_ERROR_NONE) { cb = tcp->write_cb; tcp->write_cb = nullptr; + if (tcp->current_zerocopy_send != nullptr) { + UnrefMaybePutZerocopySendRecord(tcp, tcp->current_zerocopy_send, 0, + "handle_write_err"); + tcp->current_zerocopy_send = nullptr; + } grpc_core::Closure::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error)); TCP_UNREF(tcp, "write"); return; } - if (!tcp_flush(tcp, &error)) { + bool flush_result = + tcp->current_zerocopy_send != nullptr + ? tcp_flush_zerocopy(tcp, tcp->current_zerocopy_send, &error) + : tcp_flush(tcp, &error); + if (!flush_result) { if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "write: delayed"); } @@ -1042,6 +1540,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { } else { cb = tcp->write_cb; tcp->write_cb = nullptr; + tcp->current_zerocopy_send = nullptr; if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); @@ -1057,6 +1556,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, GPR_TIMER_SCOPE("tcp_write", 0); grpc_tcp* tcp = reinterpret_cast(ep); grpc_error* error = GRPC_ERROR_NONE; + TcpZerocopySendRecord* zerocopy_send_record = nullptr; if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { size_t i; @@ -1073,8 +1573,8 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, } GPR_ASSERT(tcp->write_cb == nullptr); + GPR_DEBUG_ASSERT(tcp->current_zerocopy_send == nullptr); - tcp->outgoing_buffer_arg = arg; if (buf->length == 0) { grpc_core::Closure::Run( DEBUG_LOCATION, cb, @@ -1085,15 +1585,26 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, tcp_shutdown_buffer_list(tcp); return; } - tcp->outgoing_buffer = buf; - tcp->outgoing_byte_idx = 0; + + zerocopy_send_record = tcp_get_send_zerocopy_record(tcp, buf); + if (zerocopy_send_record == nullptr) { + // Either not enough bytes, or couldn't allocate a zerocopy context. + tcp->outgoing_buffer = buf; + tcp->outgoing_byte_idx = 0; + } + tcp->outgoing_buffer_arg = arg; if (arg) { GPR_ASSERT(grpc_event_engine_can_track_errors()); } - if (!tcp_flush(tcp, &error)) { + bool flush_result = + zerocopy_send_record != nullptr + ? tcp_flush_zerocopy(tcp, zerocopy_send_record, &error) + : tcp_flush(tcp, &error); + if (!flush_result) { TCP_REF(tcp, "write"); tcp->write_cb = cb; + tcp->current_zerocopy_send = zerocopy_send_record; if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "write: delayed"); } @@ -1121,6 +1632,7 @@ static void tcp_add_to_pollset_set(grpc_endpoint* ep, static void tcp_delete_from_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset_set) { grpc_tcp* tcp = reinterpret_cast(ep); + ZerocopyDisableAndWaitForRemaining(tcp); grpc_pollset_set_del_fd(pollset_set, tcp->em_fd); } @@ -1172,9 +1684,15 @@ static const grpc_endpoint_vtable vtable = {tcp_read, grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, const grpc_channel_args* channel_args, const char* peer_string) { + static constexpr bool kZerocpTxEnabledDefault = false; int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; int tcp_max_read_chunk_size = 4 * 1024 * 1024; int tcp_min_read_chunk_size = 256; + bool tcp_tx_zerocopy_enabled = kZerocpTxEnabledDefault; + int tcp_tx_zerocopy_send_bytes_thresh = + grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold; + int tcp_tx_zerocopy_max_simult_sends = + grpc_core::TcpZerocopySendCtx::kDefaultMaxSends; grpc_resource_quota* resource_quota = grpc_resource_quota_create(nullptr); if (channel_args != nullptr) { for (size_t i = 0; i < channel_args->num_args; i++) { @@ -1199,6 +1717,23 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, resource_quota = grpc_resource_quota_ref_internal(static_cast( channel_args->args[i].value.pointer.p)); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) { + tcp_tx_zerocopy_enabled = grpc_channel_arg_get_bool( + &channel_args->args[i], kZerocpTxEnabledDefault); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD)) { + grpc_integer_options options = { + grpc_core::TcpZerocopySendCtx::kDefaultSendBytesThreshold, 0, + INT_MAX}; + tcp_tx_zerocopy_send_bytes_thresh = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)) { + grpc_integer_options options = { + grpc_core::TcpZerocopySendCtx::kDefaultMaxSends, 0, INT_MAX}; + tcp_tx_zerocopy_max_simult_sends = + grpc_channel_arg_get_integer(&channel_args->args[i], options); } } } @@ -1215,6 +1750,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->fd = grpc_fd_wrapped_fd(em_fd); tcp->read_cb = nullptr; tcp->write_cb = nullptr; + tcp->current_zerocopy_send = nullptr; tcp->release_fd_cb = nullptr; tcp->release_fd = nullptr; tcp->incoming_buffer = nullptr; @@ -1228,6 +1764,20 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->socket_ts_enabled = false; tcp->ts_capable = true; tcp->outgoing_buffer_arg = nullptr; + new (&tcp->tcp_zerocopy_send_ctx) TcpZerocopySendCtx( + tcp_tx_zerocopy_max_simult_sends, tcp_tx_zerocopy_send_bytes_thresh); + if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) { +#ifdef GRPC_LINUX_ERRQUEUE + const int enable = 1; + auto err = + setsockopt(tcp->fd, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)); + if (err == 0) { + tcp->tcp_zerocopy_send_ctx.set_enabled(true); + } else { + gpr_log(GPR_ERROR, "Failed to set zerocopy options on the socket."); + } +#endif + } /* paired with unref in grpc_tcp_destroy */ new (&tcp->refcount) grpc_core::RefCount(1, &grpc_tcp_trace); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -1294,6 +1844,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { /* Stop errors notification. */ + ZerocopyDisableAndWaitForRemaining(tcp); gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index ee1cd5c1027..da18cc39c51 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -157,6 +157,14 @@ grpc_error* grpc_tcp_server_prepare_socket(grpc_tcp_server* s, int fd, if (err != GRPC_ERROR_NONE) goto error; } +#ifdef GRPC_LINUX_ERRQUEUE + err = grpc_set_socket_zerocopy(fd); + if (err != GRPC_ERROR_NONE) { + /* it's not fatal, so just log it. */ + gpr_log(GPR_DEBUG, "Node does not support SO_ZEROCOPY, continuing."); + GRPC_ERROR_UNREF(err); + } +#endif err = grpc_set_socket_nonblocking(fd, 1); if (err != GRPC_ERROR_NONE) goto error; err = grpc_set_socket_cloexec(fd, 1);