|
|
|
@ -945,14 +945,15 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, |
|
|
|
|
|
|
|
|
|
/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
|
|
|
|
|
* of bytes sent. */ |
|
|
|
|
ssize_t tcp_send(int fd, const struct msghdr* msg, int additional_flags = 0) { |
|
|
|
|
ssize_t tcp_send(int fd, const struct msghdr* msg, int* saved_errno, |
|
|
|
|
int additional_flags = 0) { |
|
|
|
|
GPR_TIMER_SCOPE("sendmsg", 1); |
|
|
|
|
ssize_t sent_length; |
|
|
|
|
do { |
|
|
|
|
/* TODO(klempner): Cork if this is a partial write */ |
|
|
|
|
GRPC_STATS_INC_SYSCALL_WRITE(); |
|
|
|
|
sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags); |
|
|
|
|
} while (sent_length < 0 && errno == EINTR); |
|
|
|
|
} while (sent_length < 0 && (*saved_errno = errno) == EINTR); |
|
|
|
|
return sent_length; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -964,7 +965,7 @@ ssize_t tcp_send(int fd, const struct msghdr* msg, int additional_flags = 0) { |
|
|
|
|
*/ |
|
|
|
|
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, |
|
|
|
|
size_t sending_length, |
|
|
|
|
ssize_t* sent_length, |
|
|
|
|
ssize_t* sent_length, int* saved_errno, |
|
|
|
|
int additional_flags = 0); |
|
|
|
|
|
|
|
|
|
/** The callback function to be invoked when we get an error on the socket. */ |
|
|
|
@ -1008,7 +1009,7 @@ static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) { |
|
|
|
|
|
|
|
|
|
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, |
|
|
|
|
size_t sending_length, |
|
|
|
|
ssize_t* sent_length, |
|
|
|
|
ssize_t* sent_length, int* saved_errno, |
|
|
|
|
int additional_flags) { |
|
|
|
|
if (!tcp->socket_ts_enabled) { |
|
|
|
|
uint32_t opt = grpc_core::kTimestampingSocketOptions; |
|
|
|
@ -1037,7 +1038,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, |
|
|
|
|
msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t)); |
|
|
|
|
|
|
|
|
|
/* If there was an error on sendmsg the logic in tcp_flush will handle it. */ |
|
|
|
|
ssize_t length = tcp_send(tcp->fd, msg, additional_flags); |
|
|
|
|
ssize_t length = tcp_send(tcp->fd, msg, saved_errno, additional_flags); |
|
|
|
|
*sent_length = length; |
|
|
|
|
/* Only save timestamps if all the bytes were taken by sendmsg. */ |
|
|
|
|
if (sending_length == static_cast<size_t>(length)) { |
|
|
|
@ -1262,6 +1263,7 @@ static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* /*tcp*/) {} |
|
|
|
|
static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/, |
|
|
|
|
size_t /*sending_length*/, |
|
|
|
|
ssize_t* /*sent_length*/, |
|
|
|
|
int* /* saved_errno */, |
|
|
|
|
int /*additional_flags*/) { |
|
|
|
|
gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform"); |
|
|
|
|
GPR_ASSERT(0); |
|
|
|
@ -1342,6 +1344,7 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, |
|
|
|
|
size_t unwind_slice_idx; |
|
|
|
|
size_t unwind_byte_idx; |
|
|
|
|
bool tried_sending_message; |
|
|
|
|
int saved_errno; |
|
|
|
|
msghdr msg; |
|
|
|
|
// iov consumes a large space. Keep it as the last item on the stack to
|
|
|
|
|
// improve locality. After all, we expect only the first elements of it being
|
|
|
|
@ -1360,10 +1363,11 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, |
|
|
|
|
// Before calling sendmsg (with or without timestamps): we
|
|
|
|
|
// take a single ref on the zerocopy send record.
|
|
|
|
|
tcp->tcp_zerocopy_send_ctx.NoteSend(record); |
|
|
|
|
saved_errno = 0; |
|
|
|
|
if (tcp->outgoing_buffer_arg != nullptr) { |
|
|
|
|
if (!tcp->ts_capable || |
|
|
|
|
!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, |
|
|
|
|
MSG_ZEROCOPY)) { |
|
|
|
|
&saved_errno, MSG_ZEROCOPY)) { |
|
|
|
|
/* We could not set socket options to collect Fathom timestamps.
|
|
|
|
|
* Fallback on writing without timestamps. */ |
|
|
|
|
tcp->ts_capable = false; |
|
|
|
@ -1377,20 +1381,23 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, |
|
|
|
|
msg.msg_controllen = 0; |
|
|
|
|
GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); |
|
|
|
|
GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); |
|
|
|
|
sent_length = tcp_send(tcp->fd, &msg, MSG_ZEROCOPY); |
|
|
|
|
sent_length = tcp_send(tcp->fd, &msg, &saved_errno, MSG_ZEROCOPY); |
|
|
|
|
} |
|
|
|
|
if (sent_length < 0) { |
|
|
|
|
// If this particular send failed, drop ref taken earlier in this method.
|
|
|
|
|
tcp->tcp_zerocopy_send_ctx.UndoSend(); |
|
|
|
|
if (errno == EAGAIN) { |
|
|
|
|
if (saved_errno == EAGAIN) { |
|
|
|
|
record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx); |
|
|
|
|
return false; |
|
|
|
|
} else if (errno == EPIPE) { |
|
|
|
|
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); |
|
|
|
|
} else if (saved_errno == EPIPE) { |
|
|
|
|
*error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp); |
|
|
|
|
tcp_shutdown_buffer_list(tcp); |
|
|
|
|
return true; |
|
|
|
|
} else { |
|
|
|
|
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); |
|
|
|
|
if (saved_errno == ENOBUFS) { |
|
|
|
|
gpr_log(GPR_ERROR, "gRPC TCP Tx0cp: ENOBUFS encountered."); |
|
|
|
|
} |
|
|
|
|
*error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp); |
|
|
|
|
tcp_shutdown_buffer_list(tcp); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
@ -1434,6 +1441,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) { |
|
|
|
|
size_t trailing; |
|
|
|
|
size_t unwind_slice_idx; |
|
|
|
|
size_t unwind_byte_idx; |
|
|
|
|
int saved_errno; |
|
|
|
|
|
|
|
|
|
// We always start at zero, because we eagerly unref and trim the slice
|
|
|
|
|
// buffer as we write
|
|
|
|
@ -1465,9 +1473,11 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) { |
|
|
|
|
msg.msg_iovlen = iov_size; |
|
|
|
|
msg.msg_flags = 0; |
|
|
|
|
bool tried_sending_message = false; |
|
|
|
|
saved_errno = 0; |
|
|
|
|
if (tcp->outgoing_buffer_arg != nullptr) { |
|
|
|
|
if (!tcp->ts_capable || |
|
|
|
|
!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length)) { |
|
|
|
|
!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, |
|
|
|
|
&saved_errno)) { |
|
|
|
|
/* We could not set socket options to collect Fathom timestamps.
|
|
|
|
|
* Fallback on writing without timestamps. */ |
|
|
|
|
tcp->ts_capable = false; |
|
|
|
@ -1483,11 +1493,11 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) { |
|
|
|
|
GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); |
|
|
|
|
GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); |
|
|
|
|
|
|
|
|
|
sent_length = tcp_send(tcp->fd, &msg); |
|
|
|
|
sent_length = tcp_send(tcp->fd, &msg, &saved_errno); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (sent_length < 0) { |
|
|
|
|
if (errno == EAGAIN) { |
|
|
|
|
if (saved_errno == EAGAIN) { |
|
|
|
|
tcp->outgoing_byte_idx = unwind_byte_idx; |
|
|
|
|
// unref all and forget about all slices that have been written to this
|
|
|
|
|
// point
|
|
|
|
@ -1495,13 +1505,13 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) { |
|
|
|
|
grpc_slice_buffer_remove_first(tcp->outgoing_buffer); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} else if (errno == EPIPE) { |
|
|
|
|
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); |
|
|
|
|
} else if (saved_errno == EPIPE) { |
|
|
|
|
*error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp); |
|
|
|
|
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); |
|
|
|
|
tcp_shutdown_buffer_list(tcp); |
|
|
|
|
return true; |
|
|
|
|
} else { |
|
|
|
|
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); |
|
|
|
|
*error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp); |
|
|
|
|
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); |
|
|
|
|
tcp_shutdown_buffer_list(tcp); |
|
|
|
|
return true; |
|
|
|
|