From 5e1d10ac9b7166e9cdeaf3b0950b0974f5189189 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Mon, 6 Jun 2022 22:53:17 +0000 Subject: [PATCH] Save ENOBUFS errno correctly in tcp_posix for subsequent handling --- src/core/lib/iomgr/tcp_posix.cc | 44 ++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 0526fc5370f..ed74a339012 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -945,14 +945,15 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, /* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number * of bytes sent. */ -ssize_t tcp_send(int fd, const struct msghdr* msg, int additional_flags = 0) { +ssize_t tcp_send(int fd, const struct msghdr* msg, int* saved_errno, + int additional_flags = 0) { GPR_TIMER_SCOPE("sendmsg", 1); ssize_t sent_length; do { /* TODO(klempner): Cork if this is a partial write */ GRPC_STATS_INC_SYSCALL_WRITE(); sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags); - } while (sent_length < 0 && errno == EINTR); + } while (sent_length < 0 && (*saved_errno = errno) == EINTR); return sent_length; } @@ -964,7 +965,7 @@ ssize_t tcp_send(int fd, const struct msghdr* msg, int additional_flags = 0) { */ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length, + ssize_t* sent_length, int* saved_errno, int additional_flags = 0); /** The callback function to be invoked when we get an error on the socket. */ @@ -1008,7 +1009,7 @@ static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) { static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length, + ssize_t* sent_length, int* saved_errno, int additional_flags) { if (!tcp->socket_ts_enabled) { uint32_t opt = grpc_core::kTimestampingSocketOptions; @@ -1037,7 +1038,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t)); /* If there was an error on sendmsg the logic in tcp_flush will handle it. */ - ssize_t length = tcp_send(tcp->fd, msg, additional_flags); + ssize_t length = tcp_send(tcp->fd, msg, saved_errno, additional_flags); *sent_length = length; /* Only save timestamps if all the bytes were taken by sendmsg. */ if (sending_length == static_cast(length)) { @@ -1262,6 +1263,7 @@ static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* /*tcp*/) {} static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/, size_t /*sending_length*/, ssize_t* /*sent_length*/, + int* /* saved_errno */, int /*additional_flags*/) { gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform"); GPR_ASSERT(0); @@ -1342,6 +1344,7 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, size_t unwind_slice_idx; size_t unwind_byte_idx; bool tried_sending_message; + int saved_errno; msghdr msg; // iov consumes a large space. Keep it as the last item on the stack to // improve locality. After all, we expect only the first elements of it being @@ -1360,10 +1363,11 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, // Before calling sendmsg (with or without timestamps): we // take a single ref on the zerocopy send record. tcp->tcp_zerocopy_send_ctx.NoteSend(record); + saved_errno = 0; if (tcp->outgoing_buffer_arg != nullptr) { if (!tcp->ts_capable || !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, - MSG_ZEROCOPY)) { + &saved_errno, MSG_ZEROCOPY)) { /* We could not set socket options to collect Fathom timestamps. * Fallback on writing without timestamps. */ tcp->ts_capable = false; @@ -1377,20 +1381,23 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, msg.msg_controllen = 0; GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); - sent_length = tcp_send(tcp->fd, &msg, MSG_ZEROCOPY); + sent_length = tcp_send(tcp->fd, &msg, &saved_errno, MSG_ZEROCOPY); } if (sent_length < 0) { // If this particular send failed, drop ref taken earlier in this method. tcp->tcp_zerocopy_send_ctx.UndoSend(); - if (errno == EAGAIN) { + if (saved_errno == EAGAIN) { record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx); return false; - } else if (errno == EPIPE) { - *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + } else if (saved_errno == EPIPE) { + *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp); tcp_shutdown_buffer_list(tcp); return true; } else { - *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + if (saved_errno == ENOBUFS) { + gpr_log(GPR_ERROR, "gRPC TCP Tx0cp: ENOBUFS encountered."); + } + *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp); tcp_shutdown_buffer_list(tcp); return true; } @@ -1434,6 +1441,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) { size_t trailing; size_t unwind_slice_idx; size_t unwind_byte_idx; + int saved_errno; // We always start at zero, because we eagerly unref and trim the slice // buffer as we write @@ -1465,9 +1473,11 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) { msg.msg_iovlen = iov_size; msg.msg_flags = 0; bool tried_sending_message = false; + saved_errno = 0; if (tcp->outgoing_buffer_arg != nullptr) { if (!tcp->ts_capable || - !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length)) { + !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, + &saved_errno)) { /* We could not set socket options to collect Fathom timestamps. * Fallback on writing without timestamps. */ tcp->ts_capable = false; @@ -1483,11 +1493,11 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) { GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); - sent_length = tcp_send(tcp->fd, &msg); + sent_length = tcp_send(tcp->fd, &msg, &saved_errno); } if (sent_length < 0) { - if (errno == EAGAIN) { + if (saved_errno == EAGAIN) { tcp->outgoing_byte_idx = unwind_byte_idx; // unref all and forget about all slices that have been written to this // point @@ -1495,13 +1505,13 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) { grpc_slice_buffer_remove_first(tcp->outgoing_buffer); } return false; - } else if (errno == EPIPE) { - *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + } else if (saved_errno == EPIPE) { + *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); tcp_shutdown_buffer_list(tcp); return true; } else { - *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + *error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); tcp_shutdown_buffer_list(tcp); return true;