diff --git a/BUILD b/BUILD index 5efc6af32f5..ee73d392136 100644 --- a/BUILD +++ b/BUILD @@ -2779,44 +2779,6 @@ grpc_cc_library( ], ) -grpc_cc_library( - name = "posix_event_engine_endpoint", - srcs = [ - "src/core/lib/event_engine/posix_engine/posix_endpoint.cc", - ], - hdrs = [ - "src/core/lib/event_engine/posix_engine/posix_endpoint.h", - ], - external_deps = [ - "absl/base:core_headers", - "absl/container:flat_hash_map", - "absl/functional:any_invocable", - "absl/hash", - "absl/memory", - "absl/meta:type_traits", - "absl/status", - "absl/status:statusor", - "absl/strings", - "absl/types:optional", - ], - deps = [ - "event_engine_base_hdrs", - "experiments", - "gpr", - "iomgr_port", - "memory_quota", - "posix_event_engine_closure", - "posix_event_engine_event_poller", - "posix_event_engine_internal_errqueue", - "posix_event_engine_tcp_socket_utils", - "posix_event_engine_traced_buffer_list", - "ref_counted", - "ref_counted_ptr", - "resource_quota", - "useful", - ], -) - grpc_cc_library( name = "event_engine_utils", srcs = ["src/core/lib/event_engine/utils.cc"], diff --git a/CMakeLists.txt b/CMakeLists.txt index 7c73090b6fd..30b4d211fdd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1081,9 +1081,6 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx pipe_test) add_dependencies(buildtests_cxx poll_test) add_dependencies(buildtests_cxx port_sharing_end2end_test) - if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) - add_dependencies(buildtests_cxx posix_endpoint_test) - endif() add_dependencies(buildtests_cxx posix_event_engine_test) add_dependencies(buildtests_cxx promise_factory_test) add_dependencies(buildtests_cxx promise_map_test) @@ -9435,7 +9432,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc test/core/event_engine/posix/event_poller_posix_test.cc - test/core/event_engine/posix/posix_engine_test_utils.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc ) @@ -14509,60 +14505,6 @@ target_link_libraries(port_sharing_end2end_test ) -endif() -if(gRPC_BUILD_TESTS) -if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) - - add_executable(posix_endpoint_test - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc - src/core/lib/event_engine/posix_engine/ev_poll_posix.cc - src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc - src/core/lib/event_engine/posix_engine/internal_errqueue.cc - src/core/lib/event_engine/posix_engine/lockfree_event.cc - src/core/lib/event_engine/posix_engine/posix_endpoint.cc - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc - src/core/lib/event_engine/posix_engine/traced_buffer_list.cc - src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc - src/core/lib/iomgr/socket_mutator.cc - test/core/event_engine/posix/posix_endpoint_test.cc - test/core/event_engine/posix/posix_engine_test_utils.cc - test/core/event_engine/test_suite/event_engine_test.cc - test/core/event_engine/test_suite/event_engine_test_utils.cc - test/core/event_engine/test_suite/oracle_event_engine_posix.cc - third_party/googletest/googletest/src/gtest-all.cc - third_party/googletest/googlemock/src/gmock-all.cc - ) - - target_include_directories(posix_endpoint_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} - third_party/googletest/googletest/include - third_party/googletest/googletest - third_party/googletest/googlemock/include - third_party/googletest/googlemock - ${_gRPC_PROTO_GENS_DIR} - ) - - target_link_libraries(posix_endpoint_test - ${_gRPC_PROTOBUF_LIBRARIES} - ${_gRPC_ALLTARGETS_LIBRARIES} - absl::cleanup - grpc_test_util - ) - - -endif() endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 9c4e382d9a5..869c9eba8e1 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5738,7 +5738,6 @@ targets: - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h - - test/core/event_engine/posix/posix_engine_test_utils.h src: - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc - src/core/lib/event_engine/posix_engine/ev_poll_posix.cc @@ -5748,7 +5747,6 @@ targets: - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc - test/core/event_engine/posix/event_poller_posix_test.cc - - test/core/event_engine/posix/posix_engine_test_utils.cc deps: - grpc_test_util platforms: @@ -8270,56 +8268,6 @@ targets: - test/cpp/end2end/test_service_impl.cc deps: - grpc++_test_util -- name: posix_endpoint_test - gtest: true - build: test - language: c++ - headers: - - src/core/lib/event_engine/common_closures.h - - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h - - src/core/lib/event_engine/posix_engine/ev_poll_posix.h - - src/core/lib/event_engine/posix_engine/event_poller.h - - src/core/lib/event_engine/posix_engine/event_poller_posix_default.h - - src/core/lib/event_engine/posix_engine/internal_errqueue.h - - src/core/lib/event_engine/posix_engine/lockfree_event.h - - src/core/lib/event_engine/posix_engine/posix_endpoint.h - - src/core/lib/event_engine/posix_engine/posix_engine_closure.h - - src/core/lib/event_engine/posix_engine/tcp_socket_utils.h - - src/core/lib/event_engine/posix_engine/traced_buffer_list.h - - src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h - - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h - - src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h - - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h - - src/core/lib/iomgr/socket_mutator.h - - test/core/event_engine/posix/posix_engine_test_utils.h - - test/core/event_engine/test_suite/event_engine_test.h - - test/core/event_engine/test_suite/event_engine_test_utils.h - - test/core/event_engine/test_suite/oracle_event_engine_posix.h - src: - - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc - - src/core/lib/event_engine/posix_engine/ev_poll_posix.cc - - src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc - - src/core/lib/event_engine/posix_engine/internal_errqueue.cc - - src/core/lib/event_engine/posix_engine/lockfree_event.cc - - src/core/lib/event_engine/posix_engine/posix_endpoint.cc - - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc - - src/core/lib/event_engine/posix_engine/traced_buffer_list.cc - - src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc - - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc - - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc - - src/core/lib/iomgr/socket_mutator.cc - - test/core/event_engine/posix/posix_endpoint_test.cc - - test/core/event_engine/posix/posix_engine_test_utils.cc - - test/core/event_engine/test_suite/event_engine_test.cc - - test/core/event_engine/test_suite/event_engine_test_utils.cc - - test/core/event_engine/test_suite/oracle_event_engine_posix.cc - deps: - - absl/cleanup:cleanup - - grpc_test_util - platforms: - - linux - - posix - - mac - name: posix_event_engine_test gtest: true build: test diff --git a/include/grpc/event_engine/slice_buffer.h b/include/grpc/event_engine/slice_buffer.h index e3a75898b9a..3861717e7b6 100644 --- a/include/grpc/event_engine/slice_buffer.h +++ b/include/grpc/event_engine/slice_buffer.h @@ -26,7 +26,6 @@ #include "absl/utility/utility.h" #include -#include #include #include #include @@ -68,11 +67,6 @@ class SliceBuffer { return *this; } - /// Swap the contents of this SliceBuffer with the contents of another. - void Swap(SliceBuffer& other) { - grpc_slice_buffer_swap(&slice_buffer_, &other.slice_buffer_); - } - /// Appends a new slice into the SliceBuffer and makes an attempt to merge /// this slice with the last slice in the SliceBuffer. void Append(Slice slice); @@ -94,17 +88,6 @@ class SliceBuffer { grpc_slice_buffer_move_first_into_buffer(&slice_buffer_, n, dst); } - /// Removes/deletes the last n bytes in the SliceBuffer and add it to the - /// other SliceBuffer - void MoveLastNBytesIntoSliceBuffer(size_t n, SliceBuffer& other) { - grpc_slice_buffer_trim_end(&slice_buffer_, n, &other.slice_buffer_); - } - - /// Move the first n bytes of the SliceBuffer into the other SliceBuffer - void MoveFirstNBytesIntoSliceBuffer(size_t n, SliceBuffer& other) { - grpc_slice_buffer_move_first(&slice_buffer_, n, &other.slice_buffer_); - } - /// Removes and unrefs all slices in the SliceBuffer. void Clear() { grpc_slice_buffer_reset_and_unref(&slice_buffer_); } diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc index ac4072492d9..b9b15200f11 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc @@ -91,7 +91,7 @@ class Epoll1EventHandle : public EventHandle { pending_write_.store(false, std::memory_order_relaxed); pending_error_.store(false, std::memory_order_relaxed); } - Epoll1Poller* Poller() override { return poller_; } + Epoll1Poller* Poller() { return poller_; } bool SetPendingActions(bool pending_read, bool pending_write, bool pending_error) { // Another thread may be executing ExecutePendingActions() at this point diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h index f3a9268a8ec..d1becd525c1 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h @@ -57,7 +57,6 @@ class Epoll1Poller : public PosixEventPoller { void Kick() override; Scheduler* GetScheduler() { return scheduler_; } void Shutdown() override; - bool CanTrackErrors() const override { return true; } ~Epoll1Poller() override; private: diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc index add5c20c2c3..db6256ea46a 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc @@ -103,7 +103,7 @@ class PollEventHandle : public EventHandle { absl::MutexLock lock(&poller_->mu_); poller_->PollerHandlesListAddHandle(this); } - PollPoller* Poller() override { return poller_; } + PollPoller* Poller() { return poller_; } bool SetPendingActions(bool pending_read, bool pending_write) { pending_actions_ |= pending_read; if (pending_write) { diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h index b1797d38b6f..53557d9d96f 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h @@ -51,7 +51,6 @@ class PollPoller : public PosixEventPoller { void Kick() override; Scheduler* GetScheduler() { return scheduler_; } void Shutdown() override; - bool CanTrackErrors() const override { return false; } ~PollPoller() override; private: diff --git a/src/core/lib/event_engine/posix_engine/event_poller.h b/src/core/lib/event_engine/posix_engine/event_poller.h index a68c23e29d4..3a742a60b38 100644 --- a/src/core/lib/event_engine/posix_engine/event_poller.h +++ b/src/core/lib/event_engine/posix_engine/event_poller.h @@ -37,8 +37,6 @@ class Scheduler { virtual ~Scheduler() = default; }; -class PosixEventPoller; - class EventHandle { public: virtual int WrappedFd() = 0; @@ -81,8 +79,6 @@ class EventHandle { virtual void SetHasError() = 0; // Returns true if the handle has been shutdown. virtual bool IsHandleShutdown() = 0; - // Returns the poller which was used to create this handle. - virtual PosixEventPoller* Poller() = 0; virtual ~EventHandle() = default; }; @@ -91,7 +87,6 @@ class PosixEventPoller : public grpc_event_engine::experimental::Poller { // Return an opaque handle to perform actions on the provided file descriptor. virtual EventHandle* CreateHandle(int fd, absl::string_view name, bool track_err) = 0; - virtual bool CanTrackErrors() const = 0; virtual std::string Name() = 0; // Shuts down and deletes the poller. It is legal to call this function // only when no other poller method is in progress. For instance, it is diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc deleted file mode 100644 index da590c681a0..00000000000 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc +++ /dev/null @@ -1,1144 +0,0 @@ -// Copyright 2022 gRPC Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h" - -#include -#include - -#include -#include -#include -#include - -#include "absl/functional/any_invocable.h" -#include "absl/memory/memory.h" -#include "absl/status/status.h" -#include "absl/status/statusor.h" -#include "absl/strings/str_cat.h" -#include "absl/types/optional.h" - -#include -#include -#include -#include -#include - -#include "src/core/lib/event_engine/posix_engine/event_poller.h" -#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h" -#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" -#include "src/core/lib/experiments/experiments.h" -#include "src/core/lib/gpr/useful.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/resource_quota/resource_quota.h" - -#ifdef GRPC_POSIX_SOCKET_TCP -#ifdef GRPC_LINUX_ERRQUEUE -#include // IWYU pragma: keep -#include // IWYU pragma: keep -#endif -#include // IWYU pragma: keep - -#ifndef SOL_TCP -#define SOL_TCP IPPROTO_TCP -#endif - -#ifndef TCP_INQ -#define TCP_INQ 36 -#define TCP_CM_INQ TCP_INQ -#endif - -#ifdef GRPC_HAVE_MSG_NOSIGNAL -#define SENDMSG_FLAGS MSG_NOSIGNAL -#else -#define SENDMSG_FLAGS 0 -#endif - -// TCP zero copy sendmsg flag. -// NB: We define this here as a fallback in case we're using an older set of -// library headers that has not defined MSG_ZEROCOPY. Since this constant is -// part of the kernel, we are guaranteed it will never change/disagree so -// defining it here is safe. -#ifndef MSG_ZEROCOPY -#define MSG_ZEROCOPY 0x4000000 -#endif - -#define MAX_READ_IOVEC 64 - -namespace grpc_event_engine { -namespace posix_engine { - -namespace { - -using ::grpc_event_engine::experimental::EndpointConfig; -using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::Slice; -using ::grpc_event_engine::experimental::SliceBuffer; - -// A wrapper around sendmsg. It sends \a msg over \a fd and returns the number -// of bytes sent. -ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno, - int additional_flags = 0) { - ssize_t sent_length; - do { - sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags); - } while (sent_length < 0 && (*saved_errno = errno) == EINTR); - return sent_length; -} - -#ifdef GRPC_LINUX_ERRQUEUE -// Whether the cmsg received from error queue is of the IPv4 or IPv6 levels. -bool CmsgIsIpLevel(const cmsghdr& cmsg) { - return (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR) || - (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR); -} - -bool CmsgIsZeroCopy(const cmsghdr& cmsg) { - if (!CmsgIsIpLevel(cmsg)) { - return false; - } - auto serr = reinterpret_cast CMSG_DATA(&cmsg); - return serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY; -} -#endif // GRPC_LINUX_ERRQUEUE - -} // namespace - -#if defined(IOV_MAX) && IOV_MAX < 260 -#define MAX_WRITE_IOVEC IOV_MAX -#else -#define MAX_WRITE_IOVEC 260 -#endif -msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx, - size_t* unwind_byte_idx, - size_t* sending_length, - iovec* iov) { - msg_iovlen_type iov_size; - *unwind_slice_idx = out_offset_.slice_idx; - *unwind_byte_idx = out_offset_.byte_idx; - for (iov_size = 0; - out_offset_.slice_idx != buf_.Count() && iov_size != MAX_WRITE_IOVEC; - iov_size++) { - auto slice = buf_.RefSlice(out_offset_.slice_idx); - iov[iov_size].iov_base = - const_cast(slice.begin()) + out_offset_.byte_idx; - iov[iov_size].iov_len = slice.length() - out_offset_.byte_idx; - *sending_length += iov[iov_size].iov_len; - ++(out_offset_.slice_idx); - out_offset_.byte_idx = 0; - } - GPR_DEBUG_ASSERT(iov_size > 0); - return iov_size; -} - -void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length, - size_t actually_sent) { - size_t trailing = sending_length - actually_sent; - while (trailing > 0) { - size_t slice_length; - out_offset_.slice_idx--; - slice_length = buf_.RefSlice(out_offset_.slice_idx).length(); - if (slice_length > trailing) { - out_offset_.byte_idx = slice_length - trailing; - break; - } else { - trailing -= slice_length; - } - } -} - -void PosixEndpointImpl::AddToEstimate(size_t bytes) { - bytes_read_this_round_ += static_cast(bytes); -} - -void PosixEndpointImpl::FinishEstimate() { - // If we read >80% of the target buffer in one read loop, increase the size of - // the target buffer to either the amount read, or twice its previous value. - if (bytes_read_this_round_ > target_length_ * 0.8) { - target_length_ = std::max(2 * target_length_, bytes_read_this_round_); - } else { - target_length_ = 0.99 * target_length_ + 0.01 * bytes_read_this_round_; - } - bytes_read_this_round_ = 0; -} - -// Returns true if data available to read or error other than EAGAIN. -bool PosixEndpointImpl::TcpDoRead(absl::Status& status) { - struct msghdr msg; - struct iovec iov[MAX_READ_IOVEC]; - ssize_t read_bytes; - size_t total_read_bytes = 0; - size_t iov_len = std::min(MAX_READ_IOVEC, incoming_buffer_->Count()); -#ifdef GRPC_LINUX_ERRQUEUE - constexpr size_t cmsg_alloc_space = - CMSG_SPACE(sizeof(scm_timestamping)) + CMSG_SPACE(sizeof(int)); -#else - constexpr size_t cmsg_alloc_space = 24; // CMSG_SPACE(sizeof(int)) -#endif // GRPC_LINUX_ERRQUEUE - char cmsgbuf[cmsg_alloc_space]; - for (size_t i = 0; i < iov_len; i++) { - Slice slice = incoming_buffer_->RefSlice(i); - iov[i].iov_base = const_cast(slice.begin()); - iov[i].iov_len = slice.length(); - } - - GPR_ASSERT(incoming_buffer_->Length() != 0); - GPR_DEBUG_ASSERT(min_progress_size_ > 0); - - do { - // Assume there is something on the queue. If we receive TCP_INQ from - // kernel, we will update this value, otherwise, we have to assume there is - // always something to read until we get EAGAIN. - inq_ = 1; - - msg.msg_name = nullptr; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = static_cast(iov_len); - if (inq_capable_) { - msg.msg_control = cmsgbuf; - msg.msg_controllen = sizeof(cmsgbuf); - } else { - msg.msg_control = nullptr; - msg.msg_controllen = 0; - } - msg.msg_flags = 0; - - do { - read_bytes = recvmsg(fd_, &msg, 0); - } while (read_bytes < 0 && errno == EINTR); - - // We have read something in previous reads. We need to deliver those bytes - // to the upper layer. - if (read_bytes <= 0 && - total_read_bytes >= static_cast(min_progress_size_)) { - inq_ = 1; - break; - } - - if (read_bytes < 0) { - // NB: After calling call_read_cb a parallel call of the read handler may - // be running. - if (errno == EAGAIN) { - if (total_read_bytes > 0) { - break; - } - FinishEstimate(); - inq_ = 0; - return false; - } else { - incoming_buffer_->Clear(); - status = - absl::InternalError(absl::StrCat("recvmsg:", std::strerror(errno))); - return true; - } - } - if (read_bytes == 0) { - // 0 read size ==> end of stream - // - // We may have read something, i.e., total_read_bytes > 0, but since the - // connection is closed we will drop the data here, because we can't call - // the callback multiple times. - incoming_buffer_->Clear(); - status = absl::InternalError("Socket closed"); - return true; - } - - AddToEstimate(static_cast(read_bytes)); - GPR_DEBUG_ASSERT((size_t)read_bytes <= - incoming_buffer_->Length() - total_read_bytes); - -#ifdef GRPC_HAVE_TCP_INQ - if (inq_capable_) { - GPR_DEBUG_ASSERT(!(msg.msg_flags & MSG_CTRUNC)); - struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); - for (; cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - if (cmsg->cmsg_level == SOL_TCP && cmsg->cmsg_type == TCP_CM_INQ && - cmsg->cmsg_len == CMSG_LEN(sizeof(int))) { - inq_ = *reinterpret_cast(CMSG_DATA(cmsg)); - break; - } - } - } -#endif // GRPC_HAVE_TCP_INQ - - total_read_bytes += read_bytes; - if (inq_ == 0 || total_read_bytes == incoming_buffer_->Length()) { - break; - } - - // We had a partial read, and still have space to read more data. So, adjust - // IOVs and try to read more. - size_t remaining = read_bytes; - size_t j = 0; - for (size_t i = 0; i < iov_len; i++) { - if (remaining >= iov[i].iov_len) { - remaining -= iov[i].iov_len; - continue; - } - if (remaining > 0) { - iov[j].iov_base = static_cast(iov[i].iov_base) + remaining; - iov[j].iov_len = iov[i].iov_len - remaining; - remaining = 0; - } else { - iov[j].iov_base = iov[i].iov_base; - iov[j].iov_len = iov[i].iov_len; - } - ++j; - } - iov_len = j; - } while (true); - - if (inq_ == 0) { - FinishEstimate(); - } - - GPR_DEBUG_ASSERT(total_read_bytes > 0); - status = absl::OkStatus(); - if (frame_size_tuning_enabled_) { - // Update min progress size based on the total number of bytes read in - // this round. - min_progress_size_ -= total_read_bytes; - if (min_progress_size_ > 0) { - // There is still some bytes left to be read before we can signal - // the read as complete. Append the bytes read so far into - // last_read_buffer which serves as a staging buffer. Return false - // to indicate tcp_handle_read needs to be scheduled again. - incoming_buffer_->MoveFirstNBytesIntoSliceBuffer(total_read_bytes, - last_read_buffer_); - return false; - } else { - // The required number of bytes have been read. Append the bytes - // read in this round into last_read_buffer. Then swap last_read_buffer - // and incoming_buffer. Now incoming buffer contains all the bytes - // read since the start of the last tcp_read operation. last_read_buffer - // would contain any spare space left in the incoming buffer. This - // space will be used in the next tcp_read operation. - min_progress_size_ = 1; - incoming_buffer_->MoveFirstNBytesIntoSliceBuffer(total_read_bytes, - last_read_buffer_); - incoming_buffer_->Swap(last_read_buffer_); - return true; - } - } - if (total_read_bytes < incoming_buffer_->Length()) { - incoming_buffer_->MoveLastNBytesIntoSliceBuffer( - incoming_buffer_->Length() - total_read_bytes, last_read_buffer_); - // last_read_buffer_.Clear(); - } - return true; -} - -void PosixEndpointImpl::PerformReclamation() { - read_mu_.Lock(); - if (incoming_buffer_ != nullptr) { - incoming_buffer_->Clear(); - } - has_posted_reclaimer_ = false; - read_mu_.Unlock(); -} - -void PosixEndpointImpl::MaybePostReclaimer() { - if (!has_posted_reclaimer_) { - has_posted_reclaimer_ = true; - memory_owner_.PostReclaimer( - grpc_core::ReclamationPass::kBenign, - [this](absl::optional sweep) { - if (!sweep.has_value()) return; - PerformReclamation(); - }); - } -} - -void PosixEndpointImpl::UpdateRcvLowat() { - if (!grpc_core::IsTcpRcvLowatEnabled()) return; - - // TODO(ctiller): Check if supported by OS. - // TODO(ctiller): Allow some adjustments instead of hardcoding things. - - static constexpr int kRcvLowatMax = 16 * 1024 * 1024; - static constexpr int kRcvLowatThreshold = 16 * 1024; - - int remaining = std::min({static_cast(incoming_buffer_->Length()), - kRcvLowatMax, min_progress_size_}); - - // Setting SO_RCVLOWAT for small quantities does not save on CPU. - if (remaining < kRcvLowatThreshold) { - remaining = 0; - } - - // If zerocopy is off, wake shortly before the full RPC is here. More can - // show up partway through recvmsg() since it takes a while to copy data. - // So an early wakeup aids latency. - if (!tcp_zerocopy_send_ctx_->Enabled() && remaining > 0) { - remaining -= kRcvLowatThreshold; - } - - // We still do not know the RPC size. Do not set SO_RCVLOWAT. - if (set_rcvlowat_ <= 1 && remaining <= 1) return; - - // Previous value is still valid. No change needed in SO_RCVLOWAT. - if (set_rcvlowat_ == remaining) { - return; - } - auto result = sock_.SetSocketRcvLowat(remaining); - if (result.ok()) { - set_rcvlowat_ = *result; - } else { - gpr_log(GPR_ERROR, "%s", - absl::StrCat("ERROR in SO_RCVLOWAT: ", result.status().message()) - .c_str()); - } -} - -void PosixEndpointImpl::MaybeMakeReadSlices() { - if (grpc_core::IsTcpReadChunksEnabled()) { - static const int kBigAlloc = 64 * 1024; - static const int kSmallAlloc = 8 * 1024; - if (incoming_buffer_->Length() < static_cast(min_progress_size_)) { - size_t allocate_length = min_progress_size_; - const size_t target_length = static_cast(target_length_); - // If memory pressure is low and we think there will be more than - // min_progress_size bytes to read, allocate a bit more. - const bool low_memory_pressure = - memory_owner_.GetPressureInfo().pressure_control_value < 0.8; - if (low_memory_pressure && target_length > allocate_length) { - allocate_length = target_length; - } - int extra_wanted = - allocate_length - static_cast(incoming_buffer_->Length()); - if (extra_wanted >= - (low_memory_pressure ? kSmallAlloc * 3 / 2 : kBigAlloc)) { - while (extra_wanted > 0) { - extra_wanted -= kBigAlloc; - incoming_buffer_->AppendIndexed( - Slice(memory_owner_.MakeSlice(kBigAlloc))); - } - } else { - while (extra_wanted > 0) { - extra_wanted -= kSmallAlloc; - incoming_buffer_->AppendIndexed( - Slice(memory_owner_.MakeSlice(kSmallAlloc))); - } - } - MaybePostReclaimer(); - } - } else { - if (incoming_buffer_->Length() < static_cast(min_progress_size_) && - incoming_buffer_->Count() < MAX_READ_IOVEC) { - int target_length = - std::max(static_cast(target_length_), min_progress_size_); - int extra_wanted = - target_length - static_cast(incoming_buffer_->Length()); - int min_read_chunk_size = - std::max(min_read_chunk_size_, min_progress_size_); - int max_read_chunk_size = - std::max(max_read_chunk_size_, min_progress_size_); - incoming_buffer_->AppendIndexed( - Slice(memory_owner_.MakeSlice(grpc_core::MemoryRequest( - min_read_chunk_size, - grpc_core::Clamp(extra_wanted, min_read_chunk_size, - max_read_chunk_size))))); - MaybePostReclaimer(); - } - } -} - -void PosixEndpointImpl::HandleRead(absl::Status status) { - read_mu_.Lock(); - if (status.ok()) { - MaybeMakeReadSlices(); - if (!TcpDoRead(status)) { - // We've consumed the edge, request a new one. - read_mu_.Unlock(); - handle_->NotifyOnRead(on_read_); - return; - } - } else { - incoming_buffer_->Clear(); - last_read_buffer_.Clear(); - } - absl::AnyInvocable cb = std::move(read_cb_); - read_cb_ = nullptr; - incoming_buffer_ = nullptr; - read_mu_.Unlock(); - cb(status); - Unref(); -} - -void PosixEndpointImpl::Read(absl::AnyInvocable on_read, - SliceBuffer* buffer, - const EventEngine::Endpoint::ReadArgs* args) { - read_mu_.Lock(); - GPR_ASSERT(read_cb_ == nullptr); - read_cb_ = std::move(on_read); - incoming_buffer_ = buffer; - incoming_buffer_->Clear(); - incoming_buffer_->Swap(last_read_buffer_); - read_mu_.Unlock(); - if (args != nullptr && frame_size_tuning_enabled_) { - min_progress_size_ = args->read_hint_bytes; - } else { - min_progress_size_ = 1; - } - Ref().release(); - if (is_first_read_) { - // Endpoint read called for the very first time. Register read callback - // with the polling engine. - is_first_read_ = false; - handle_->NotifyOnRead(on_read_); - } else if (inq_ == 0) { - // Upper layer asked to read more but we know there is no pending data to - // read from previous reads. So, wait for POLLIN. - handle_->NotifyOnRead(on_read_); - } else { - on_read_->SetStatus(absl::OkStatus()); - engine_->Run(on_read_); - } -} - -#ifdef GRPC_LINUX_ERRQUEUE -TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord( - SliceBuffer& buf) { - TcpZerocopySendRecord* zerocopy_send_record = nullptr; - const bool use_zerocopy = - tcp_zerocopy_send_ctx_->Enabled() && - tcp_zerocopy_send_ctx_->ThresholdBytes() < buf.Length(); - if (use_zerocopy) { - zerocopy_send_record = tcp_zerocopy_send_ctx_->GetSendRecord(); - if (zerocopy_send_record == nullptr) { - ProcessErrors(); - zerocopy_send_record = tcp_zerocopy_send_ctx_->GetSendRecord(); - } - if (zerocopy_send_record != nullptr) { - zerocopy_send_record->PrepareForSends(buf); - GPR_DEBUG_ASSERT(buf.Count() == 0); - GPR_DEBUG_ASSERT(buf.Length() == 0); - outgoing_byte_idx_ = 0; - outgoing_buffer_ = nullptr; - } - } - return zerocopy_send_record; -} - -// For linux platforms, reads the socket's error queue and processes error -// messages from the queue. -bool PosixEndpointImpl::ProcessErrors() { - bool processed_err = false; - struct iovec iov; - iov.iov_base = nullptr; - iov.iov_len = 0; - struct msghdr msg; - msg.msg_name = nullptr; - msg.msg_namelen = 0; - msg.msg_iov = &iov; - msg.msg_iovlen = 0; - msg.msg_flags = 0; - // Allocate enough space so we don't need to keep increasing this as size of - // OPT_STATS increase. - constexpr size_t cmsg_alloc_space = - CMSG_SPACE(sizeof(scm_timestamping)) + - CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) + - CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t))); - // Allocate aligned space for cmsgs received along with timestamps. - union { - char rbuf[cmsg_alloc_space]; - struct cmsghdr align; - } aligned_buf; - msg.msg_control = aligned_buf.rbuf; - int r, saved_errno; - while (true) { - msg.msg_controllen = sizeof(aligned_buf.rbuf); - do { - r = recvmsg(fd_, &msg, MSG_ERRQUEUE); - saved_errno = errno; - } while (r < 0 && saved_errno == EINTR); - - if (r < 0 && saved_errno == EAGAIN) { - return processed_err; // No more errors to process - } else if (r < 0) { - return processed_err; - } - if (GPR_UNLIKELY((msg.msg_flags & MSG_CTRUNC) != 0)) { - gpr_log(GPR_ERROR, "Error message was truncated."); - } - - if (msg.msg_controllen == 0) { - // There was no control message found. It was probably spurious. - return processed_err; - } - bool seen = false; - for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; - cmsg = CMSG_NXTHDR(&msg, cmsg)) { - if (CmsgIsZeroCopy(*cmsg)) { - ProcessZerocopy(cmsg); - seen = true; - processed_err = true; - } else if (cmsg->cmsg_level == SOL_SOCKET && - cmsg->cmsg_type == SCM_TIMESTAMPING) { - cmsg = ProcessTimestamp(&msg, cmsg); - seen = true; - processed_err = true; - } else { - // Got a control message that is not a timestamp or zerocopy. Don't know - // how to handle this. - return processed_err; - } - } - if (!seen) { - return processed_err; - } - } -} - -void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() { - tcp_zerocopy_send_ctx_->Shutdown(); - while (!tcp_zerocopy_send_ctx_->AllSendRecordsEmpty()) { - ProcessErrors(); - } -} - -// Reads \a cmsg to process zerocopy control messages. -void PosixEndpointImpl::ProcessZerocopy(struct cmsghdr* cmsg) { - GPR_DEBUG_ASSERT(cmsg); - auto serr = reinterpret_cast(CMSG_DATA(cmsg)); - GPR_DEBUG_ASSERT(serr->ee_errno == 0); - GPR_DEBUG_ASSERT(serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY); - const uint32_t lo = serr->ee_info; - const uint32_t hi = serr->ee_data; - for (uint32_t seq = lo; seq <= hi; ++seq) { - // TODO(arjunroy): It's likely that lo and hi refer to zerocopy sequence - // numbers that are generated by a single call to grpc_endpoint_write; ie. - // we can batch the unref operation. So, check if record is the same for - // both; if so, batch the unref/put. - TcpZerocopySendRecord* record = - tcp_zerocopy_send_ctx_->ReleaseSendRecord(seq); - GPR_DEBUG_ASSERT(record); - UnrefMaybePutZerocopySendRecord(record); - } - if (tcp_zerocopy_send_ctx_->UpdateZeroCopyOptMemStateAfterFree()) { - handle_->SetWritable(); - } -} - -// Reads \a cmsg to derive timestamps from the control messages. If a valid -// timestamp is found, the traced buffer list is updated with this timestamp. -// The caller of this function should be looping on the control messages found -// in \a msg. \a cmsg should point to the control message that the caller wants -// processed. On return, a pointer to a control message is returned. On the next -// iteration, CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. -struct cmsghdr* PosixEndpointImpl::ProcessTimestamp(msghdr* msg, - struct cmsghdr* cmsg) { - auto next_cmsg = CMSG_NXTHDR(msg, cmsg); - cmsghdr* opt_stats = nullptr; - if (next_cmsg == nullptr) { - return cmsg; - } - - // Check if next_cmsg is an OPT_STATS msg. - if (next_cmsg->cmsg_level == SOL_SOCKET && - next_cmsg->cmsg_type == SCM_TIMESTAMPING_OPT_STATS) { - opt_stats = next_cmsg; - next_cmsg = CMSG_NXTHDR(msg, opt_stats); - if (next_cmsg == nullptr) { - return opt_stats; - } - } - - if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) || - !(next_cmsg->cmsg_type == IP_RECVERR || - next_cmsg->cmsg_type == IPV6_RECVERR)) { - return cmsg; - } - - auto tss = reinterpret_cast(CMSG_DATA(cmsg)); - auto serr = reinterpret_cast(CMSG_DATA(next_cmsg)); - if (serr->ee_errno != ENOMSG || - serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) { - gpr_log(GPR_ERROR, "Unexpected control message"); - return cmsg; - } - // The error handling can potentially be done on another thread so we need to - // protect the traced buffer list. A lock free list might be better. Using a - // simple mutex for now. - { - grpc_core::MutexLock lock(&traced_buffer_mu_); - traced_buffers_.ProcessTimestamp(serr, opt_stats, tss); - } - return next_cmsg; -} - -void PosixEndpointImpl::HandleError(absl::Status status) { - if (!status.ok() || - stop_error_notification_.load(std::memory_order_relaxed)) { - // We aren't going to register to hear on error anymore, so it is safe to - // unref. - Unref(); - return; - } - // We are still interested in collecting timestamps, so let's try reading - // them. - if (!ProcessErrors()) { - // This might not a timestamps error. Set the read and write closures to be - // ready. - handle_->SetReadable(); - handle_->SetWritable(); - } - handle_->NotifyOnError(on_error_); -} - -bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* msg, - size_t sending_length, - ssize_t* sent_length, - int* saved_errno, - int additional_flags) { - if (!socket_ts_enabled_) { - uint32_t opt = kTimestampingSocketOptions; - if (setsockopt(fd_, SOL_SOCKET, SO_TIMESTAMPING, static_cast(&opt), - sizeof(opt)) != 0) { - return false; - } - bytes_counter_ = -1; - socket_ts_enabled_ = true; - } - // Set control message to indicate that you want timestamps. - union { - char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))]; - struct cmsghdr align; - } u; - cmsghdr* cmsg = reinterpret_cast(u.cmsg_buf); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SO_TIMESTAMPING; - cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t)); - *reinterpret_cast(CMSG_DATA(cmsg)) = kTimestampingRecordingOptions; - msg->msg_control = u.cmsg_buf; - msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t)); - - // If there was an error on sendmsg the logic in tcp_flush will handle it. - ssize_t length = TcpSend(fd_, msg, saved_errno, additional_flags); - *sent_length = length; - // Only save timestamps if all the bytes were taken by sendmsg. - if (sending_length == static_cast(length)) { - traced_buffer_mu_.Lock(); - traced_buffers_.AddNewEntry(static_cast(bytes_counter_ + length), - fd_, outgoing_buffer_arg_); - traced_buffer_mu_.Unlock(); - outgoing_buffer_arg_ = nullptr; - } - return true; -} - -#else // GRPC_LINUX_ERRQUEUE -TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord( - SliceBuffer& /*buf*/) { - return nullptr; -} - -void PosixEndpointImpl::HandleError(absl::Status /*status*/) { - GPR_ASSERT(false && "Error handling not supported on this platform"); -} - -void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() {} - -bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* /*msg*/, - size_t /*sending_length*/, - ssize_t* /*sent_length*/, - int* /*saved_errno*/, - int /*additional_flags*/) { - GPR_ASSERT(false && "Write with timestamps not supported for this platform"); -} -#endif // GRPC_LINUX_ERRQUEUE - -void PosixEndpointImpl::UnrefMaybePutZerocopySendRecord( - TcpZerocopySendRecord* record) { - if (record->Unref()) { - tcp_zerocopy_send_ctx_->PutSendRecord(record); - } -} - -// If outgoing_buffer_arg is filled, shuts down the list early, so that any -// release operations needed can be performed on the arg. -void PosixEndpointImpl::TcpShutdownTracedBufferList() { - if (outgoing_buffer_arg_ != nullptr) { - traced_buffer_mu_.Lock(); - traced_buffers_.Shutdown(outgoing_buffer_arg_, - absl::InternalError("TracedBuffer list shutdown")); - traced_buffer_mu_.Unlock(); - outgoing_buffer_arg_ = nullptr; - } -} - -// returns true if done, false if pending; if returning true, *error is set -bool PosixEndpointImpl::DoFlushZerocopy(TcpZerocopySendRecord* record, - absl::Status& status) { - msg_iovlen_type iov_size; - ssize_t sent_length = 0; - size_t sending_length; - size_t unwind_slice_idx; - size_t unwind_byte_idx; - bool tried_sending_message; - int saved_errno; - msghdr msg; - status = absl::OkStatus(); - // iov consumes a large space. Keep it as the last item on the stack to - // improve locality. After all, we expect only the first elements of it - // being populated in most cases. - iovec iov[MAX_WRITE_IOVEC]; - while (true) { - sending_length = 0; - iov_size = record->PopulateIovs(&unwind_slice_idx, &unwind_byte_idx, - &sending_length, iov); - msg.msg_name = nullptr; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = iov_size; - msg.msg_flags = 0; - tried_sending_message = false; - // Before calling sendmsg (with or without timestamps): we - // take a single ref on the zerocopy send record. - tcp_zerocopy_send_ctx_->NoteSend(record); - saved_errno = 0; - if (outgoing_buffer_arg_ != nullptr) { - if (!ts_capable_ || - !WriteWithTimestamps(&msg, sending_length, &sent_length, &saved_errno, - MSG_ZEROCOPY)) { - // We could not set socket options to collect Fathom timestamps. - // Fallback on writing without timestamps. - ts_capable_ = false; - TcpShutdownTracedBufferList(); - } else { - tried_sending_message = true; - } - } - if (!tried_sending_message) { - msg.msg_control = nullptr; - msg.msg_controllen = 0; - sent_length = TcpSend(fd_, &msg, &saved_errno, MSG_ZEROCOPY); - } - if (tcp_zerocopy_send_ctx_->UpdateZeroCopyOptMemStateAfterSend( - saved_errno == ENOBUFS)) { - handle_->SetWritable(); - } - if (sent_length < 0) { - // If this particular send failed, drop ref taken earlier in this method. - tcp_zerocopy_send_ctx_->UndoSend(); - if (saved_errno == EAGAIN || saved_errno == ENOBUFS) { - record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx); - return false; - } else { - status = absl::InternalError( - absl::StrCat("sendmsg", std::strerror(saved_errno))); - TcpShutdownTracedBufferList(); - return true; - } - } - bytes_counter_ += sent_length; - record->UpdateOffsetForBytesSent(sending_length, - static_cast(sent_length)); - if (record->AllSlicesSent()) { - return true; - } - } -} - -bool PosixEndpointImpl::TcpFlushZerocopy(TcpZerocopySendRecord* record, - absl::Status& status) { - bool done = DoFlushZerocopy(record, status); - if (done) { - // Either we encountered an error, or we successfully sent all the bytes. - // In either case, we're done with this record. - UnrefMaybePutZerocopySendRecord(record); - } - return done; -} - -bool PosixEndpointImpl::TcpFlush(absl::Status& status) { - struct msghdr msg; - struct iovec iov[MAX_WRITE_IOVEC]; - msg_iovlen_type iov_size; - ssize_t sent_length = 0; - size_t sending_length; - size_t trailing; - size_t unwind_slice_idx; - size_t unwind_byte_idx; - int saved_errno; - status = absl::OkStatus(); - - // We always start at zero, because we eagerly unref and trim the slice - // buffer as we write - size_t outgoing_slice_idx = 0; - - while (true) { - sending_length = 0; - unwind_slice_idx = outgoing_slice_idx; - unwind_byte_idx = outgoing_byte_idx_; - for (iov_size = 0; outgoing_slice_idx != outgoing_buffer_->Count() && - iov_size != MAX_WRITE_IOVEC; - iov_size++) { - auto slice = outgoing_buffer_->RefSlice(outgoing_slice_idx); - iov[iov_size].iov_base = - const_cast(slice.begin()) + outgoing_byte_idx_; - iov[iov_size].iov_len = slice.length() - outgoing_byte_idx_; - sending_length += iov[iov_size].iov_len; - outgoing_slice_idx++; - outgoing_byte_idx_ = 0; - } - GPR_ASSERT(iov_size > 0); - - msg.msg_name = nullptr; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = iov_size; - msg.msg_flags = 0; - bool tried_sending_message = false; - saved_errno = 0; - if (outgoing_buffer_arg_ != nullptr) { - if (!ts_capable_ || !WriteWithTimestamps(&msg, sending_length, - &sent_length, &saved_errno, 0)) { - // We could not set socket options to collect Fathom timestamps. - // Fallback on writing without timestamps. - ts_capable_ = false; - TcpShutdownTracedBufferList(); - } else { - tried_sending_message = true; - } - } - if (!tried_sending_message) { - msg.msg_control = nullptr; - msg.msg_controllen = 0; - sent_length = TcpSend(fd_, &msg, &saved_errno); - } - - if (sent_length < 0) { - if (saved_errno == EAGAIN || saved_errno == ENOBUFS) { - outgoing_byte_idx_ = unwind_byte_idx; - // unref all and forget about all slices that have been written to this - // point - for (size_t idx = 0; idx < unwind_slice_idx; ++idx) { - outgoing_buffer_->TakeFirst(); - } - return false; - } else { - status = absl::InternalError( - absl::StrCat("sendmsg", std::strerror(saved_errno))); - outgoing_buffer_->Clear(); - TcpShutdownTracedBufferList(); - return true; - } - } - - GPR_ASSERT(outgoing_byte_idx_ == 0); - bytes_counter_ += sent_length; - trailing = sending_length - static_cast(sent_length); - while (trailing > 0) { - size_t slice_length; - outgoing_slice_idx--; - slice_length = outgoing_buffer_->RefSlice(outgoing_slice_idx).length(); - if (slice_length > trailing) { - outgoing_byte_idx_ = slice_length - trailing; - break; - } else { - trailing -= slice_length; - } - } - if (outgoing_slice_idx == outgoing_buffer_->Count()) { - outgoing_buffer_->Clear(); - return true; - } - } -} - -void PosixEndpointImpl::HandleWrite(absl::Status status) { - if (!status.ok()) { - absl::AnyInvocable cb_ = std::move(write_cb_); - write_cb_ = nullptr; - if (current_zerocopy_send_ != nullptr) { - UnrefMaybePutZerocopySendRecord(current_zerocopy_send_); - current_zerocopy_send_ = nullptr; - } - cb_(status); - Unref(); - return; - } - bool flush_result = current_zerocopy_send_ != nullptr - ? TcpFlushZerocopy(current_zerocopy_send_, status) - : TcpFlush(status); - if (!flush_result) { - GPR_DEBUG_ASSERT(status.ok()); - handle_->NotifyOnWrite(on_write_); - } else { - absl::AnyInvocable cb_ = std::move(write_cb_); - write_cb_ = nullptr; - current_zerocopy_send_ = nullptr; - cb_(status); - Unref(); - } -} - -void PosixEndpointImpl::Write( - absl::AnyInvocable on_writable, SliceBuffer* data, - const EventEngine::Endpoint::WriteArgs* args) { - absl::Status status = absl::OkStatus(); - TcpZerocopySendRecord* zerocopy_send_record = nullptr; - - GPR_ASSERT(write_cb_ == nullptr); - GPR_DEBUG_ASSERT(current_zerocopy_send_ == nullptr); - GPR_DEBUG_ASSERT(data != nullptr); - - if (data->Length() == 0) { - on_writable(handle_->IsHandleShutdown() ? absl::InternalError("EOF") - : status); - TcpShutdownTracedBufferList(); - return; - } - - zerocopy_send_record = TcpGetSendZerocopyRecord(*data); - if (zerocopy_send_record == nullptr) { - // Either not enough bytes, or couldn't allocate a zerocopy context. - outgoing_buffer_ = data; - outgoing_byte_idx_ = 0; - } - if (args != nullptr) { - outgoing_buffer_arg_ = args->google_specific; - } - if (outgoing_buffer_arg_) { - GPR_ASSERT(poller_->CanTrackErrors()); - } - - bool flush_result = zerocopy_send_record != nullptr - ? TcpFlushZerocopy(zerocopy_send_record, status) - : TcpFlush(status); - if (!flush_result) { - Ref().release(); - write_cb_ = std::move(on_writable); - current_zerocopy_send_ = zerocopy_send_record; - handle_->NotifyOnWrite(on_write_); - } else { - on_writable(status); - } -} - -void PosixEndpointImpl::MaybeShutdown(absl::Status why) { - if (poller_->CanTrackErrors()) { - ZerocopyDisableAndWaitForRemaining(); - stop_error_notification_.store(true, std::memory_order_release); - handle_->SetHasError(); - } - handle_->ShutdownHandle(why); - Unref(); -} - -PosixEndpointImpl ::~PosixEndpointImpl() { - handle_->OrphanHandle(on_done_, nullptr, ""); - delete on_read_; - delete on_write_; - delete on_error_; -} - -PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle, - PosixEngineClosure* on_done, - std::shared_ptr engine, - const PosixTcpOptions& options) - : sock_(PosixSocketWrapper(handle->WrappedFd())), - on_done_(on_done), - traced_buffers_(), - handle_(handle), - poller_(handle->Poller()), - engine_(engine) { - PosixSocketWrapper sock(handle->WrappedFd()); - fd_ = handle_->WrappedFd(); - GPR_ASSERT(options.resource_quota != nullptr); - memory_owner_ = options.resource_quota->memory_quota()->CreateMemoryOwner( - *sock.PeerAddressString()); - self_reservation_ = memory_owner_.MakeReservation(sizeof(PosixEndpointImpl)); - local_address_ = *sock.LocalAddress(); - peer_address_ = *sock.PeerAddress(); - target_length_ = static_cast(options.tcp_read_chunk_size); - bytes_read_this_round_ = 0; - min_read_chunk_size_ = options.tcp_min_read_chunk_size; - max_read_chunk_size_ = options.tcp_max_read_chunk_size; - bool zerocopy_enabled = - options.tcp_tx_zero_copy_enabled && poller_->CanTrackErrors(); -#ifdef GRPC_LINUX_ERRQUEUE - if (zerocopy_enabled) { - const int enable = 1; - auto err = - setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)); - if (err != 0) { - zerocopy_enabled = false; - gpr_log(GPR_ERROR, "Failed to set zerocopy options on the socket."); - } - } -#endif // GRPC_LINUX_ERRQUEUE - tcp_zerocopy_send_ctx_ = absl::make_unique( - zerocopy_enabled, options.tcp_tx_zerocopy_max_simultaneous_sends, - options.tcp_tx_zerocopy_send_bytes_threshold); - frame_size_tuning_enabled_ = grpc_core::IsTcpFrameSizeTuningEnabled(); -#ifdef GRPC_HAVE_TCP_INQ - int one = 1; - if (setsockopt(fd_, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) { - inq_capable_ = true; - } else { - gpr_log(GPR_DEBUG, "cannot set inq fd=%d errno=%d", fd_, errno); - inq_capable_ = false; - } -#else - inq_capable_ = false; -#endif // GRPC_HAVE_TCP_INQ - - on_read_ = PosixEngineClosure::ToPermanentClosure( - [this](absl::Status status) { HandleRead(std::move(status)); }); - on_write_ = PosixEngineClosure::ToPermanentClosure( - [this](absl::Status status) { HandleWrite(std::move(status)); }); - on_error_ = PosixEngineClosure::ToPermanentClosure( - [this](absl::Status status) { HandleError(std::move(status)); }); - - // Start being notified on errors if poller can track errors. - if (poller_->CanTrackErrors()) { - Ref().release(); - handle_->NotifyOnError(on_error_); - } -} - -std::unique_ptr CreatePosixEndpoint( - EventHandle* handle, PosixEngineClosure* on_shutdown, - std::shared_ptr engine, const EndpointConfig& config) { - GPR_DEBUG_ASSERT(handle != nullptr); - return absl::make_unique(handle, on_shutdown, - std::move(engine), config); -} - -} // namespace posix_engine -} // namespace grpc_event_engine - -#else // GRPC_POSIX_SOCKET_TCP - -namespace grpc_event_engine { -namespace posix_engine { - -using ::grpc_event_engine::experimental::EndpointConfig; -using ::grpc_event_engine::experimental::EventEngine; - -std::unique_ptr CreatePosixEndpoint( - EventHandle* /*handle*/, PosixEngineClosure* /*on_shutdown*/, - std::shared_ptr /*engine*/, const EndpointConfig& /*config*/) { - GPR_ASSERT(false && "Cannot create PosixEndpoint on this platform"); -} - -} // namespace posix_engine -} // namespace grpc_event_engine - -#endif // GRPC_POSIX_SOCKET_TCP diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.h b/src/core/lib/event_engine/posix_engine/posix_endpoint.h deleted file mode 100644 index e1eab2eaf34..00000000000 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.h +++ /dev/null @@ -1,675 +0,0 @@ -// Copyright 2022 gRPC Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H -#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H - -#include - -// IWYU pragma: no_include - -#include -#include -#include -#include -#include - -#include "absl/base/thread_annotations.h" -#include "absl/container/flat_hash_map.h" -#include "absl/functional/any_invocable.h" -#include "absl/hash/hash.h" -#include "absl/meta/type_traits.h" -#include "absl/status/status.h" - -#include -#include -#include -#include -#include -#include - -#include "src/core/lib/event_engine/posix_engine/event_poller.h" -#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" -#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" -#include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h" -#include "src/core/lib/gprpp/ref_counted.h" -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/port.h" -#include "src/core/lib/resource_quota/memory_quota.h" - -#ifdef GRPC_POSIX_SOCKET_TCP - -#include // IWYU pragma: keep -#include // IWYU pragma: keep - -#ifdef GRPC_MSG_IOVLEN_TYPE -typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type; -#else -typedef size_t msg_iovlen_type; -#endif - -#endif // GRPC_POSIX_SOCKET_TCP - -namespace grpc_event_engine { -namespace posix_engine { - -#ifdef GRPC_POSIX_SOCKET_TCP - -class TcpZerocopySendRecord { - public: - TcpZerocopySendRecord() { buf_.Clear(); }; - - ~TcpZerocopySendRecord() { DebugAssertEmpty(); } - - // TcpZerocopySendRecord contains a slice buffer holding the slices to be - // sent. Given the slices that we wish to send, and the current offset into - // the slice buffer (indicating which have already been sent), populate an - // iovec array that will be used for a zerocopy enabled sendmsg(). - // unwind_slice_idx - input/output parameter. It indicates the index of last - // slice whose contents were partially sent in the previous sendmsg. After - // this function returns, it gets updated to to a new offset - // depending on the number of bytes which are decided to be sent in the - // current sendmsg. - // unwind_byte_idx - input/output parameter. It indicates the byte offset - // within the last slice whose contents were partially sent in the previous - // sendmsg. After this function returns, it gets updated to a new offset - // depending on the number of bytes which are decided to be sent in the - // current sendmsg. - // sending_length - total number of bytes to be sent in the current sendmsg. - // iov - An iovec array containing the bytes to be sent in the current - // sendmsg. - // Returns: the number of entries in the iovec array. - // - msg_iovlen_type PopulateIovs(size_t* unwind_slice_idx, - size_t* unwind_byte_idx, size_t* sending_length, - iovec* iov); - - // A sendmsg() may not be able to send the bytes that we requested at this - // time, returning EAGAIN (possibly due to backpressure). In this case, - // unwind the offset into the slice buffer so we retry sending these bytes. - void UnwindIfThrottled(size_t unwind_slice_idx, size_t unwind_byte_idx) { - out_offset_.byte_idx = unwind_byte_idx; - out_offset_.slice_idx = unwind_slice_idx; - } - - // Update the offset into the slice buffer based on how much we wanted to sent - // vs. what sendmsg() actually sent (which may be lower, possibly due to - // backpressure). - void UpdateOffsetForBytesSent(size_t sending_length, size_t actually_sent); - - // Indicates whether all underlying data has been sent or not. - bool AllSlicesSent() { return out_offset_.slice_idx == buf_.Count(); } - - // Reset this structure for a new tcp_write() with zerocopy. - void PrepareForSends( - grpc_event_engine::experimental::SliceBuffer& slices_to_send) { - DebugAssertEmpty(); - out_offset_.slice_idx = 0; - out_offset_.byte_idx = 0; - buf_.Swap(slices_to_send); - Ref(); - } - - // References: 1 reference per sendmsg(), and 1 for the tcp_write(). - void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); } - - // Unref: called when we get an error queue notification for a sendmsg(), if a - // sendmsg() failed or when tcp_write() is done. - bool Unref() { - const intptr_t prior = ref_.fetch_sub(1, std::memory_order_acq_rel); - GPR_DEBUG_ASSERT(prior > 0); - if (prior == 1) { - AllSendsComplete(); - return true; - } - return false; - } - - private: - struct OutgoingOffset { - size_t slice_idx = 0; - size_t byte_idx = 0; - }; - - void DebugAssertEmpty() { - GPR_DEBUG_ASSERT(buf_.Count() == 0); - GPR_DEBUG_ASSERT(buf_.Length() == 0); - GPR_DEBUG_ASSERT(ref_.load(std::memory_order_relaxed) == 0); - } - - // When all sendmsg() calls associated with this tcp_write() have been - // completed (ie. we have received the notifications for each sequence number - // for each sendmsg()) and all reference counts have been dropped, drop our - // reference to the underlying data since we no longer need it. - void AllSendsComplete() { - GPR_DEBUG_ASSERT(ref_.load(std::memory_order_relaxed) == 0); - buf_.Clear(); - } - - grpc_event_engine::experimental::SliceBuffer buf_; - std::atomic ref_{0}; - OutgoingOffset out_offset_; -}; - -class TcpZerocopySendCtx { - public: - static constexpr int kDefaultMaxSends = 4; - static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; // 16KB - - explicit TcpZerocopySendCtx( - bool zerocopy_enabled, int max_sends = kDefaultMaxSends, - size_t send_bytes_threshold = kDefaultSendBytesThreshold) - : max_sends_(max_sends), - free_send_records_size_(max_sends), - threshold_bytes_(send_bytes_threshold) { - send_records_ = static_cast( - gpr_malloc(max_sends * sizeof(*send_records_))); - free_send_records_ = static_cast( - gpr_malloc(max_sends * sizeof(*free_send_records_))); - if (send_records_ == nullptr || free_send_records_ == nullptr) { - gpr_free(send_records_); - gpr_free(free_send_records_); - gpr_log(GPR_INFO, "Disabling TCP TX zerocopy due to memory pressure.\n"); - memory_limited_ = true; - enabled_ = false; - } else { - for (int idx = 0; idx < max_sends_; ++idx) { - new (send_records_ + idx) TcpZerocopySendRecord(); - free_send_records_[idx] = send_records_ + idx; - } - enabled_ = zerocopy_enabled; - } - } - - ~TcpZerocopySendCtx() { - if (send_records_ != nullptr) { - for (int idx = 0; idx < max_sends_; ++idx) { - send_records_[idx].~TcpZerocopySendRecord(); - } - } - gpr_free(send_records_); - gpr_free(free_send_records_); - } - - // True if we were unable to allocate the various bookkeeping structures at - // transport initialization time. If memory limited, we do not zerocopy. - bool MemoryLimited() const { return memory_limited_; } - - // TCP send zerocopy maintains an implicit sequence number for every - // successful sendmsg() with zerocopy enabled; the kernel later gives us an - // error queue notification with this sequence number indicating that the - // underlying data buffers that we sent can now be released. Once that - // notification is received, we can release the buffers associated with this - // zerocopy send record. Here, we associate the sequence number with the data - // buffers that were sent with the corresponding call to sendmsg(). - void NoteSend(TcpZerocopySendRecord* record) { - record->Ref(); - { - grpc_core::MutexLock lock(&mu_); - is_in_write_ = true; - AssociateSeqWithSendRecordLocked(last_send_, record); - } - ++last_send_; - } - - // If sendmsg() actually failed, though, we need to revert the sequence number - // that we speculatively bumped before calling sendmsg(). Note that we bump - // this sequence number and perform relevant bookkeeping (see: NoteSend()) - // *before* calling sendmsg() since, if we called it *after* sendmsg(), then - // there is a possible race with the release notification which could occur on - // another thread before we do the necessary bookkeeping. Hence, calling - // NoteSend() *before* sendmsg() and implementing an undo function is needed. - void UndoSend() { - --last_send_; - if (ReleaseSendRecord(last_send_)->Unref()) { - // We should still be holding the ref taken by tcp_write(). - GPR_DEBUG_ASSERT(0); - } - } - - // Simply associate this send record (and the underlying sent data buffers) - // with the implicit sequence number for this zerocopy sendmsg(). - void AssociateSeqWithSendRecordLocked(uint32_t seq, - TcpZerocopySendRecord* record) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { - ctx_lookup_.emplace(seq, record); - } - - // Get a send record for a send that we wish to do with zerocopy. - TcpZerocopySendRecord* GetSendRecord() { - grpc_core::MutexLock lock(&mu_); - return TryGetSendRecordLocked(); - } - - // A given send record corresponds to a single tcp_write() with zerocopy - // enabled. This can result in several sendmsg() calls to flush all of the - // data to wire. Each sendmsg() takes a reference on the - // TcpZerocopySendRecord, and corresponds to a single sequence number. - // ReleaseSendRecord releases a reference on TcpZerocopySendRecord for a - // single sequence number. This is called either when we receive the relevant - // error queue notification (saying that we can discard the underlying - // buffers for this sendmsg()) is received from the kernel - or, in case - // sendmsg() was unsuccessful to begin with. - TcpZerocopySendRecord* ReleaseSendRecord(uint32_t seq) { - grpc_core::MutexLock lock(&mu_); - return ReleaseSendRecordLocked(seq); - } - - // After all the references to a TcpZerocopySendRecord are released, we can - // add it back to the pool (of size max_sends_). Note that we can only have - // max_sends_ tcp_write() instances with zerocopy enabled in flight at the - // same time. - void PutSendRecord(TcpZerocopySendRecord* record) { - grpc_core::MutexLock lock(&mu_); - GPR_DEBUG_ASSERT(record >= send_records_ && - record < send_records_ + max_sends_); - PutSendRecordLocked(record); - } - - // Indicate that we are disposing of this zerocopy context. This indicator - // will prevent new zerocopy writes from being issued. - void Shutdown() { shutdown_.store(true, std::memory_order_release); } - - // Indicates that there are no inflight tcp_write() instances with zerocopy - // enabled. - bool AllSendRecordsEmpty() { - grpc_core::MutexLock lock(&mu_); - return free_send_records_size_ == max_sends_; - } - - bool Enabled() const { return enabled_; } - - // Only use zerocopy if we are sending at least this many bytes. The - // additional overhead of reading the error queue for notifications means that - // zerocopy is not useful for small transfers. - size_t ThresholdBytes() const { return threshold_bytes_; } - - // Expected to be called by handler reading messages from the err queue. - // It is used to indicate that some optmem memory is now available. It returns - // true to tell the caller to mark the file descriptor as immediately - // writable. - // - // OptMem (controlled by the kernel option optmem_max) refers to the memory - // allocated to the cmsg list maintained by the kernel that contains "extra" - // packet information like SCM_RIGHTS or IP_TTL. Increasing this option allows - // the kernel to allocate more memory as needed for more control messages that - // need to be sent for each socket connected. - // - // If a write is currently in progress on the socket (ie. we have issued a - // sendmsg() and are about to check its return value) then we set omem state - // to CHECK to make the sending thread know that some tcp_omem was - // concurrently freed even if sendmsg() returns ENOBUFS. In this case, since - // there is already an active send thread, we do not need to mark the - // socket writeable, so we return false. - // - // If there was no write in progress on the socket, and the socket was not - // marked as FULL, then we need not mark the socket writeable now that some - // tcp_omem memory is freed since it was not considered as blocked on - // tcp_omem to begin with. So in this case, return false. - // - // But, if a write was not in progress and the omem state was FULL, then we - // need to mark the socket writeable since it is no longer blocked by - // tcp_omem. In this case, return true. - // - // Please refer to the STATE TRANSITION DIAGRAM below for more details. - // - bool UpdateZeroCopyOptMemStateAfterFree() { - grpc_core::MutexLock lock(&mu_); - if (is_in_write_) { - zcopy_enobuf_state_ = OptMemState::kCheck; - return false; - } - GPR_DEBUG_ASSERT(zcopy_enobuf_state_ != OptMemState::kCheck); - if (zcopy_enobuf_state_ == OptMemState::kFull) { - // A previous sendmsg attempt was blocked by ENOBUFS. Return true to - // mark the fd as writable so the next write attempt could be made. - zcopy_enobuf_state_ = OptMemState::kOpen; - return true; - } else if (zcopy_enobuf_state_ == OptMemState::kOpen) { - // No need to mark the fd as writable because the previous write - // attempt did not encounter ENOBUFS. - return false; - } else { - // This state should never be reached because it implies that the previous - // state was CHECK and is_in_write is false. This means that after the - // previous sendmsg returned and set is_in_write to false, it did - // not update the z-copy change from CHECK to OPEN. - GPR_ASSERT(false && "OMem state error!"); - } - } - - // Expected to be called by the thread calling sendmsg after the syscall - // invocation. is complete. If an ENOBUF is seen, it checks if the error - // handler (Tx0cp completions) has already run and free'ed up some OMem. It - // returns true indicating that the write can be attempted again immediately. - // If ENOBUFS was seen but no Tx0cp completions have been received between the - // sendmsg() and us taking this lock, then tcp_omem is still full from our - // point of view. Therefore, we do not signal that the socket is writeable - // with respect to the availability of tcp_omem. Therefore the function - // returns false. This indicates that another write should not be attempted - // immediately and the calling thread should wait until the socket is writable - // again. If ENOBUFS was not seen, then again return false because the next - // write should be attempted only when the socket is writable again. - // - // Please refer to the STATE TRANSITION DIAGRAM below for more details. - // - bool UpdateZeroCopyOptMemStateAfterSend(bool seen_enobuf) { - grpc_core::MutexLock lock(&mu_); - is_in_write_ = false; - if (seen_enobuf) { - if (zcopy_enobuf_state_ == OptMemState::kCheck) { - zcopy_enobuf_state_ = OptMemState::kOpen; - return true; - } else { - zcopy_enobuf_state_ = OptMemState::kFull; - } - } else if (zcopy_enobuf_state_ != OptMemState::kOpen) { - zcopy_enobuf_state_ = OptMemState::kOpen; - } - return false; - } - - private: - // STATE TRANSITION DIAGRAM - // - // sendmsg succeeds Tx-zero copy succeeds and there is no active sendmsg - // ----<<--+ +------<<-------------------------------------+ - // | | | | - // | | v sendmsg returns ENOBUFS | - // +-----> OPEN ------------->>-------------------------> FULL - // ^ | - // | | - // | sendmsg completes | - // +----<<---------- CHECK <-------<<-------------+ - // Tx-zero copy succeeds and there is - // an active sendmsg - // - // OptMem (controlled by the kernel option optmem_max) refers to the memory - // allocated to the cmsg list maintained by the kernel that contains "extra" - // packet information like SCM_RIGHTS or IP_TTL. Increasing this option allows - // the kernel to allocate more memory as needed for more control messages that - // need to be sent for each socket connected. Each tx zero copy sendmsg has - // a corresponding entry added into the Optmem queue. The entry is popped - // from the Optmem queue when the zero copy send is complete. - enum class OptMemState : int8_t { - kOpen, // Everything is clear and omem is not full. - kFull, // The last sendmsg() has returned with an errno of ENOBUFS. - kCheck, // Error queue is read while is_in_write_ was true, so we should - // check this state after the sendmsg. - }; - - TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { - auto iter = ctx_lookup_.find(seq); - GPR_DEBUG_ASSERT(iter != ctx_lookup_.end()); - TcpZerocopySendRecord* record = iter->second; - ctx_lookup_.erase(iter); - return record; - } - - TcpZerocopySendRecord* TryGetSendRecordLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { - if (shutdown_.load(std::memory_order_acquire)) { - return nullptr; - } - if (free_send_records_size_ == 0) { - return nullptr; - } - free_send_records_size_--; - return free_send_records_[free_send_records_size_]; - } - - void PutSendRecordLocked(TcpZerocopySendRecord* record) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { - GPR_DEBUG_ASSERT(free_send_records_size_ < max_sends_); - free_send_records_[free_send_records_size_] = record; - free_send_records_size_++; - } - - TcpZerocopySendRecord* send_records_ ABSL_GUARDED_BY(mu_); - TcpZerocopySendRecord** free_send_records_ ABSL_GUARDED_BY(mu_); - int max_sends_; - int free_send_records_size_ ABSL_GUARDED_BY(mu_); - grpc_core::Mutex mu_; - uint32_t last_send_ = 0; - std::atomic shutdown_{false}; - bool enabled_ = false; - size_t threshold_bytes_ = kDefaultSendBytesThreshold; - absl::flat_hash_map ctx_lookup_ - ABSL_GUARDED_BY(mu_); - bool memory_limited_ = false; - bool is_in_write_ ABSL_GUARDED_BY(mu_) = false; - OptMemState zcopy_enobuf_state_ ABSL_GUARDED_BY(mu_) = OptMemState::kOpen; -}; - -class PosixEndpointImpl : public grpc_core::RefCounted { - public: - PosixEndpointImpl( - EventHandle* handle, PosixEngineClosure* on_done, - std::shared_ptr engine, - const PosixTcpOptions& options); - ~PosixEndpointImpl() override; - void Read( - absl::AnyInvocable on_read, - grpc_event_engine::experimental::SliceBuffer* buffer, - const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs* - args); - void Write( - absl::AnyInvocable on_writable, - grpc_event_engine::experimental::SliceBuffer* data, - const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs* - args); - const grpc_event_engine::experimental::EventEngine::ResolvedAddress& - GetPeerAddress() const { - return peer_address_; - } - const grpc_event_engine::experimental::EventEngine::ResolvedAddress& - GetLocalAddress() const { - return local_address_; - } - - void MaybeShutdown(absl::Status why); - - private: - void UpdateRcvLowat() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_); - void HandleWrite(absl::Status status); - void HandleError(absl::Status status); - void HandleRead(absl::Status status); - void MaybeMakeReadSlices() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_); - bool TcpDoRead(absl::Status& status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_); - void FinishEstimate(); - void AddToEstimate(size_t bytes); - void MaybePostReclaimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_); - void PerformReclamation() ABSL_LOCKS_EXCLUDED(read_mu_); - // Zero copy related helper methods. - TcpZerocopySendRecord* TcpGetSendZerocopyRecord( - grpc_event_engine::experimental::SliceBuffer& buf); - bool DoFlushZerocopy(TcpZerocopySendRecord* record, absl::Status& status); - bool TcpFlushZerocopy(TcpZerocopySendRecord* record, absl::Status& status); - bool TcpFlush(absl::Status& status); - void TcpShutdownTracedBufferList(); - void UnrefMaybePutZerocopySendRecord(TcpZerocopySendRecord* record); - void ZerocopyDisableAndWaitForRemaining(); - bool WriteWithTimestamps(struct msghdr* msg, size_t sending_length, - ssize_t* sent_length, int* saved_errno, - int additional_flags); -#ifdef GRPC_LINUX_ERRQUEUE - bool ProcessErrors(); - // Reads a cmsg to process zerocopy control messages. - void ProcessZerocopy(struct cmsghdr* cmsg); - // Reads a cmsg to derive timestamps from the control messages. - struct cmsghdr* ProcessTimestamp(msghdr* msg, struct cmsghdr* cmsg); -#endif // GRPC_LINUX_ERRQUEUE - grpc_core::Mutex read_mu_; - grpc_core::Mutex traced_buffer_mu_; - PosixSocketWrapper sock_; - int fd_; - bool is_first_read_ = true; - bool has_posted_reclaimer_ ABSL_GUARDED_BY(read_mu_) = false; - double target_length_; - int min_read_chunk_size_; - int max_read_chunk_size_; - int set_rcvlowat_ = 0; - double bytes_read_this_round_ = 0; - std::atomic ref_count_{1}; - - // garbage after the last read. - grpc_event_engine::experimental::SliceBuffer last_read_buffer_; - - grpc_event_engine::experimental::SliceBuffer* incoming_buffer_ - ABSL_GUARDED_BY(read_mu_) = nullptr; - // bytes pending on the socket from the last read. - int inq_ = 1; - // cache whether kernel supports inq. - bool inq_capable_ = false; - - grpc_event_engine::experimental::SliceBuffer* outgoing_buffer_ = nullptr; - // byte within outgoing_buffer's slices[0] to write next. - size_t outgoing_byte_idx_ = 0; - - PosixEngineClosure* on_read_ = nullptr; - PosixEngineClosure* on_write_ = nullptr; - PosixEngineClosure* on_error_ = nullptr; - PosixEngineClosure* on_done_ = nullptr; - absl::AnyInvocable read_cb_ ABSL_GUARDED_BY(read_mu_); - absl::AnyInvocable write_cb_; - - grpc_event_engine::experimental::EventEngine::ResolvedAddress peer_address_; - grpc_event_engine::experimental::EventEngine::ResolvedAddress local_address_; - - grpc_core::MemoryOwner memory_owner_; - grpc_core::MemoryAllocator::Reservation self_reservation_; - - void* outgoing_buffer_arg_ = nullptr; - - // A counter which starts at 0. It is initialized the first time the socket - // options for collecting timestamps are set, and is incremented with each - // byte sent. - int bytes_counter_ = -1; - // True if timestamping options are set on the socket. -#ifdef GRPC_LINUX_ERRQUEUE - bool socket_ts_enabled_ = false; -#endif // GRPC_LINUX_ERRQUEUE - // Cache whether we can set timestamping options - bool ts_capable_ = true; - // Set to 1 if we do not want to be notified on errors anymore. - std::atomic stop_error_notification_{false}; - std::unique_ptr tcp_zerocopy_send_ctx_; - TcpZerocopySendRecord* current_zerocopy_send_ = nullptr; - // If true, the size of buffers alloted for tcp reads will be based on the - // specified min_progress_size values conveyed by the upper layers. - bool frame_size_tuning_enabled_ = false; - // A hint from upper layers specifying the minimum number of bytes that need - // to be read to make meaningful progress. - int min_progress_size_ = 1; - TracedBufferList traced_buffers_ ABSL_GUARDED_BY(traced_buffer_mu_); - // The handle is owned by the PosixEndpointImpl object. - EventHandle* handle_; - PosixEventPoller* poller_; - std::shared_ptr engine_; -}; - -class PosixEndpoint - : public grpc_event_engine::experimental::EventEngine::Endpoint { - public: - PosixEndpoint( - EventHandle* handle, PosixEngineClosure* on_shutdown, - std::shared_ptr engine, - const grpc_event_engine::experimental::EndpointConfig& config) - : impl_(new PosixEndpointImpl(handle, on_shutdown, std::move(engine), - TcpOptionsFromEndpointConfig(config))) {} - - void Read( - absl::AnyInvocable on_read, - grpc_event_engine::experimental::SliceBuffer* buffer, - const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs* - args) override { - impl_->Read(std::move(on_read), buffer, args); - } - - void Write( - absl::AnyInvocable on_writable, - grpc_event_engine::experimental::SliceBuffer* data, - const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs* - args) override { - impl_->Write(std::move(on_writable), data, args); - } - - const grpc_event_engine::experimental::EventEngine::ResolvedAddress& - GetPeerAddress() const override { - return impl_->GetPeerAddress(); - } - const grpc_event_engine::experimental::EventEngine::ResolvedAddress& - GetLocalAddress() const override { - return impl_->GetLocalAddress(); - } - - ~PosixEndpoint() override { - impl_->MaybeShutdown(absl::InternalError("Endpoint closing")); - } - - private: - PosixEndpointImpl* impl_; -}; - -#else // GRPC_POSIX_SOCKET_TCP - -class PosixEndpoint - : public grpc_event_engine::experimental::EventEngine::Endpoint { - public: - PosixEndpoint() = default; - - void Read(absl::AnyInvocable /*on_read*/, - grpc_event_engine::experimental::SliceBuffer* /*buffer*/, - const grpc_event_engine::experimental::EventEngine::Endpoint:: - ReadArgs* /*args*/) override { - GPR_ASSERT(false && "PosixEndpoint::Read not supported on this platform"); - } - - void Write(absl::AnyInvocable /*on_writable*/, - grpc_event_engine::experimental::SliceBuffer* /*data*/, - const grpc_event_engine::experimental::EventEngine::Endpoint:: - WriteArgs* /*args*/) override { - GPR_ASSERT(false && "PosixEndpoint::Write not supported on this platform"); - } - - const grpc_event_engine::experimental::EventEngine::ResolvedAddress& - GetPeerAddress() const override { - GPR_ASSERT(false && - "PosixEndpoint::GetPeerAddress not supported on this platform"); - } - const grpc_event_engine::experimental::EventEngine::ResolvedAddress& - GetLocalAddress() const override { - GPR_ASSERT(false && - "PosixEndpoint::GetLocalAddress not supported on this platform"); - } - - ~PosixEndpoint() override = default; -}; - -#endif // GRPC_POSIX_SOCKET_TCP - -// Create a PosixEndpoint. -// A shared_ptr of the EventEngine is passed to the endpoint to ensure that -// the event engine is alive for the lifetime of the endpoint. The ownership -// of the EventHandle is transferred to the endpoint. -std::unique_ptr CreatePosixEndpoint( - EventHandle* handle, PosixEngineClosure* on_shutdown, - std::shared_ptr engine, - const grpc_event_engine::experimental::EndpointConfig& config); - -} // namespace posix_engine -} // namespace grpc_event_engine - -#endif // GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H diff --git a/src/core/lib/event_engine/posix_engine/traced_buffer_list.h b/src/core/lib/event_engine/posix_engine/traced_buffer_list.h index 6c3c0a83dba..02fd484219c 100644 --- a/src/core/lib/event_engine/posix_engine/traced_buffer_list.h +++ b/src/core/lib/event_engine/posix_engine/traced_buffer_list.h @@ -30,6 +30,8 @@ #include "src/core/lib/event_engine/posix_engine/internal_errqueue.h" #include "src/core/lib/iomgr/port.h" +// #undef GRPC_LINUX_ERRQUEUE + namespace grpc_event_engine { namespace posix_engine { diff --git a/test/core/event_engine/posix/BUILD b/test/core/event_engine/posix/BUILD index 0fcb0139080..24b8f487c27 100644 --- a/test/core/event_engine/posix/BUILD +++ b/test/core/event_engine/posix/BUILD @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") licenses(["notice"]) @@ -21,22 +21,6 @@ grpc_package( visibility = "public", ) # Useful for third party devs to test their io manager implementation. -grpc_cc_library( - name = "posix_engine_test_utils", - testonly = True, - srcs = ["posix_engine_test_utils.cc"], - hdrs = ["posix_engine_test_utils.h"], - tags = [ - "no_windows", - ], - visibility = ["//test:__subpackages__"], - deps = [ - "//:event_engine_common", - "//:posix_event_engine_event_poller", - "//test/core/util:grpc_test_util", - ], -) - grpc_cc_test( name = "timer_heap_test", srcs = ["timer_heap_test.cc"], @@ -94,7 +78,6 @@ grpc_cc_test( "//:posix_event_engine_closure", "//:posix_event_engine_event_poller", "//:posix_event_engine_poller_posix_default", - "//test/core/event_engine/posix:posix_engine_test_utils", "//test/core/util:grpc_test_util", ], ) @@ -165,29 +148,3 @@ grpc_cc_test( "//test/core/util:grpc_test_util", ], ) - -grpc_cc_test( - name = "posix_endpoint_test", - srcs = ["posix_endpoint_test.cc"], - external_deps = ["gtest"], - language = "C++", - tags = [ - "no_windows", - ], - uses_event_engine = True, - uses_polling = True, - deps = [ - "//:channel_args", - "//:common_event_engine_closures", - "//:event_engine_poller", - "//:posix_event_engine", - "//:posix_event_engine_closure", - "//:posix_event_engine_endpoint", - "//:posix_event_engine_event_poller", - "//:posix_event_engine_poller_posix_default", - "//test/core/event_engine/posix:posix_engine_test_utils", - "//test/core/event_engine/test_suite:conformance_test_base_lib", - "//test/core/event_engine/test_suite:oracle_event_engine_posix", - "//test/core/util:grpc_test_util", - ], -) diff --git a/test/core/event_engine/posix/event_poller_posix_test.cc b/test/core/event_engine/posix/event_poller_posix_test.cc index 4191105d8b7..9d8b30a9143 100644 --- a/test/core/event_engine/posix/event_poller_posix_test.cc +++ b/test/core/event_engine/posix/event_poller_posix_test.cc @@ -56,7 +56,6 @@ #include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/global_config.h" #include "src/core/lib/gprpp/notification.h" -#include "test/core/event_engine/posix/posix_engine_test_utils.h" #include "test/core/util/port.h" GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy); @@ -86,6 +85,21 @@ using namespace std::chrono_literals; namespace { +class TestScheduler : public Scheduler { + public: + explicit TestScheduler(experimental::EventEngine* engine) : engine_(engine) {} + void Run(experimental::EventEngine::Closure* closure) override { + engine_->Run(closure); + } + + void Run(absl::AnyInvocable cb) override { + engine_->Run(std::move(cb)); + } + + private: + experimental::EventEngine* engine_; +}; + absl::Status SetSocketSendBuf(int fd, int buffer_size_bytes) { return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes, sizeof(buffer_size_bytes)) diff --git a/test/core/event_engine/posix/posix_endpoint_test.cc b/test/core/event_engine/posix/posix_endpoint_test.cc deleted file mode 100644 index e512b69cae8..00000000000 --- a/test/core/event_engine/posix/posix_endpoint_test.cc +++ /dev/null @@ -1,341 +0,0 @@ -// Copyright 2022 gRPC Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h" - -#include -#include - -#include -#include -#include -#include -#include - -#include - -#include "absl/strings/str_cat.h" -#include "absl/strings/str_split.h" - -#include - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/event_engine/channel_args_endpoint_config.h" -#include "src/core/lib/event_engine/posix_engine/event_poller.h" -#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h" -#include "src/core/lib/event_engine/posix_engine/posix_engine.h" -#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" -#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" -#include "src/core/lib/gprpp/dual_ref_counted.h" -#include "src/core/lib/gprpp/global_config.h" -#include "src/core/lib/gprpp/notification.h" -#include "src/core/lib/iomgr/port.h" -#include "test/core/event_engine/posix/posix_engine_test_utils.h" -#include "test/core/event_engine/test_suite/event_engine_test_utils.h" -#include "test/core/event_engine/test_suite/oracle_event_engine_posix.h" -#include "test/core/util/port.h" - -GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy); - -namespace grpc_event_engine { -namespace posix_engine { - -namespace { - -using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; -using ::grpc_event_engine::experimental::EventEngine; -using ::grpc_event_engine::experimental::GetNextSendMessage; -using ::grpc_event_engine::experimental::Poller; -using ::grpc_event_engine::experimental::PosixEventEngine; -using ::grpc_event_engine::experimental::PosixOracleEventEngine; -using ::grpc_event_engine::experimental::URIToResolvedAddress; -using ::grpc_event_engine::experimental::WaitForSingleOwner; -using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint; -using Listener = ::grpc_event_engine::experimental::EventEngine::Listener; -using namespace std::chrono_literals; - -constexpr int kMinMessageSize = 1024; -constexpr int kNumConnections = 10; -constexpr int kNumExchangedMessages = 100; -std::atomic g_num_active_connections{0}; - -struct Connection { - std::unique_ptr client_endpoint; - std::unique_ptr server_endpoint; -}; - -std::list CreateConnectedEndpoints( - PosixEventPoller& poller, bool is_zero_copy_enabled, int num_connections, - std::shared_ptr posix_ee, - std::shared_ptr oracle_ee) { - std::list connections; - auto memory_quota = absl::make_unique("bar"); - std::string target_addr = absl::StrCat( - "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); - EventEngine::ResolvedAddress resolved_addr = - URIToResolvedAddress(target_addr); - std::unique_ptr server_endpoint; - grpc_core::Notification* server_signal = new grpc_core::Notification(); - - Listener::AcceptCallback accept_cb = - [&server_endpoint, &server_signal]( - std::unique_ptr ep, - grpc_core::MemoryAllocator /*memory_allocator*/) { - server_endpoint = std::move(ep); - server_signal->Notify(); - }; - grpc_core::ChannelArgs args; - auto quota = grpc_core::ResourceQuota::Default(); - args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); - if (is_zero_copy_enabled) { - args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED, 1); - args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD, - kMinMessageSize); - } - ChannelArgsEndpointConfig config(args); - auto listener = oracle_ee->CreateListener( - std::move(accept_cb), - [](absl::Status status) { ASSERT_TRUE(status.ok()); }, config, - absl::make_unique("foo")); - GPR_ASSERT(listener.ok()); - - EXPECT_TRUE((*listener)->Bind(resolved_addr).ok()); - EXPECT_TRUE((*listener)->Start().ok()); - - // Create client socket and connect to the target address. - for (int i = 0; i < num_connections; ++i) { - int client_fd = ConnectToServerOrDie(resolved_addr); - EventHandle* handle = - poller.CreateHandle(client_fd, "test", poller.CanTrackErrors()); - EXPECT_NE(handle, nullptr); - server_signal->WaitForNotification(); - EXPECT_NE(server_endpoint, nullptr); - ++g_num_active_connections; - connections.push_back(Connection{ - CreatePosixEndpoint(handle, - PosixEngineClosure::TestOnlyToClosure( - [&poller](absl::Status /*status*/) { - if (--g_num_active_connections == 0) { - poller.Kick(); - } - }), - posix_ee, config), - std::move(server_endpoint)}); - delete server_signal; - server_signal = new grpc_core::Notification(); - } - delete server_signal; - return connections; -} - -} // namespace - -std::string TestScenarioName(const ::testing::TestParamInfo& info) { - return absl::StrCat("is_zero_copy_enabled_", info.param); -} - -// A helper class to drive the polling of Fds. It repeatedly calls the Work(..) -// method on the poller to get pet pending events, then schedules another -// parallel Work(..) instantiation and processes these pending events. This -// continues until all Fds have orphaned themselves. -class Worker : public grpc_core::DualRefCounted { - public: - Worker(std::shared_ptr engine, PosixEventPoller* poller) - : engine_(std::move(engine)), poller_(poller) { - WeakRef().release(); - } - void Orphan() override { signal.Notify(); } - void Start() { - // Start executing Work(..). - engine_->Run([this]() { Work(); }); - } - - void Wait() { - signal.WaitForNotification(); - WeakUnref(); - } - - private: - void Work() { - auto result = poller_->Work(24h, [this]() { - // Schedule next work instantiation immediately and take a Ref for - // the next instantiation. - Ref().release(); - engine_->Run([this]() { Work(); }); - }); - ASSERT_TRUE(result == Poller::WorkResult::kOk || - result == Poller::WorkResult::kKicked); - // Corresponds to the Ref taken for the current instantiation. If the - // result was Poller::WorkResult::kKicked, then the next work instantiation - // would not have been scheduled and the poll_again callback would have - // been deleted. - Unref(); - } - std::shared_ptr engine_; - // The poller is not owned by the Worker. Rather it is owned by the test - // which creates the worker instance. - PosixEventPoller* poller_; - grpc_core::Notification signal; -}; - -class PosixEndpointTest : public ::testing::TestWithParam { - void SetUp() override { - oracle_ee_ = std::make_shared(); - posix_ee_ = std::make_shared(); - scheduler_ = - absl::make_unique( - posix_ee_.get()); - EXPECT_NE(scheduler_, nullptr); - poller_ = GetDefaultPoller(scheduler_.get()); - if (poller_ != nullptr) { - gpr_log(GPR_INFO, "Using poller: %s", poller_->Name().c_str()); - } - } - - void TearDown() override { - if (poller_ != nullptr) { - poller_->Shutdown(); - } - WaitForSingleOwner(std::move(posix_ee_)); - WaitForSingleOwner(std::move(oracle_ee_)); - } - - public: - TestScheduler* Scheduler() { return scheduler_.get(); } - - std::shared_ptr GetPosixEE() { return posix_ee_; } - - std::shared_ptr GetOracleEE() { return oracle_ee_; } - - PosixEventPoller* PosixPoller() { return poller_; } - - private: - PosixEventPoller* poller_; - std::unique_ptr scheduler_; - std::shared_ptr posix_ee_; - std::shared_ptr oracle_ee_; -}; - -TEST_P(PosixEndpointTest, ConnectExchangeBidiDataTransferTest) { - if (PosixPoller() == nullptr) { - return; - } - Worker* worker = new Worker(GetPosixEE(), PosixPoller()); - worker->Start(); - { - auto connections = CreateConnectedEndpoints(*PosixPoller(), GetParam(), 1, - GetPosixEE(), GetOracleEE()); - auto it = connections.begin(); - auto client_endpoint = std::move((*it).client_endpoint); - auto server_endpoint = std::move((*it).server_endpoint); - EXPECT_NE(client_endpoint, nullptr); - EXPECT_NE(server_endpoint, nullptr); - connections.erase(it); - - // Alternate message exchanges between client -- server and server -- - // client. - for (int i = 0; i < kNumExchangedMessages; i++) { - // Send from client to server and verify data read at the server. - ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), - client_endpoint.get(), - server_endpoint.get()) - .ok()); - // Send from server to client and verify data read at the client. - ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), - server_endpoint.get(), - client_endpoint.get()) - .ok()); - } - } - worker->Wait(); -} - -// Create N connections and exchange and verify random number of messages over -// each connection in parallel. -TEST_P(PosixEndpointTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { - if (PosixPoller() == nullptr) { - return; - } - Worker* worker = new Worker(GetPosixEE(), PosixPoller()); - worker->Start(); - auto connections = CreateConnectedEndpoints( - *PosixPoller(), GetParam(), kNumConnections, GetPosixEE(), GetOracleEE()); - std::vector threads; - // Create one thread for each connection. For each connection, create - // 2 more worker threads: to exchange and verify bi-directional data transfer. - threads.reserve(kNumConnections); - for (int i = 0; i < kNumConnections; i++) { - // For each connection, simulate a parallel bi-directional data transfer. - // All bi-directional transfers are run in parallel across all connections. - auto it = connections.begin(); - auto client_endpoint = std::move((*it).client_endpoint); - auto server_endpoint = std::move((*it).server_endpoint); - EXPECT_NE(client_endpoint, nullptr); - EXPECT_NE(server_endpoint, nullptr); - connections.erase(it); - threads.emplace_back([client_endpoint = std::move(client_endpoint), - server_endpoint = std::move(server_endpoint)]() { - std::vector workers; - workers.reserve(2); - auto worker = [client_endpoint = client_endpoint.get(), - server_endpoint = - server_endpoint.get()](bool client_to_server) { - for (int i = 0; i < kNumExchangedMessages; i++) { - // If client_to_server is true, send from client to server and - // verify data read at the server. Otherwise send data from server - // to client and verify data read at client. - if (client_to_server) { - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), - client_endpoint, server_endpoint) - .ok()); - } else { - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), - server_endpoint, client_endpoint) - .ok()); - } - } - }; - // worker[0] simulates a flow from client to server endpoint - workers.emplace_back([&worker]() { worker(true); }); - // worker[1] simulates a flow from server to client endpoint - workers.emplace_back([&worker]() { worker(false); }); - workers[0].join(); - workers[1].join(); - }); - } - for (auto& t : threads) { - t.join(); - } - worker->Wait(); -} - -// Test with zero copy enabled and disabled. -INSTANTIATE_TEST_SUITE_P(PosixEndpoint, PosixEndpointTest, - ::testing::ValuesIn({false, true}), &TestScenarioName); - -} // namespace posix_engine -} // namespace grpc_event_engine - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - grpc_core::UniquePtr poll_strategy = - GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy); - GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy); - auto strings = absl::StrSplit(poll_strategy.get(), ','); - if (std::find(strings.begin(), strings.end(), "none") != strings.end()) { - // Skip the test entirely if poll strategy is none. - return 0; - } - return RUN_ALL_TESTS(); -} diff --git a/test/core/event_engine/posix/posix_engine_test_utils.cc b/test/core/event_engine/posix/posix_engine_test_utils.cc deleted file mode 100644 index 861f9c212d3..00000000000 --- a/test/core/event_engine/posix/posix_engine_test_utils.cc +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2022 gRPC Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "test/core/event_engine/posix/posix_engine_test_utils.h" - -#include -#include - -namespace grpc_event_engine { -namespace posix_engine { - -using ResolvedAddress = - grpc_event_engine::experimental::EventEngine::ResolvedAddress; - -// Creates a client socket and blocks until it connects to the specified -// server address. The function abort fails upon encountering errors. -int ConnectToServerOrDie(const ResolvedAddress& server_address) { - int client_fd; - int one = 1; - int flags; - - client_fd = socket(AF_INET6, SOCK_STREAM, 0); - setsockopt(client_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); - // Make fd non-blocking. - flags = fcntl(client_fd, F_GETFL, 0); - fcntl(client_fd, F_SETFL, flags | O_NONBLOCK); - - if (connect(client_fd, const_cast(server_address.address()), - server_address.size()) == -1) { - if (errno == EINPROGRESS) { - struct pollfd pfd; - pfd.fd = client_fd; - pfd.events = POLLOUT; - pfd.revents = 0; - if (poll(&pfd, 1, -1) == -1) { - gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno); - abort(); - } - } else { - gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno); - abort(); - } - } - return client_fd; -} - -} // namespace posix_engine -} // namespace grpc_event_engine diff --git a/test/core/event_engine/posix/posix_engine_test_utils.h b/test/core/event_engine/posix/posix_engine_test_utils.h deleted file mode 100644 index 6d014da843d..00000000000 --- a/test/core/event_engine/posix/posix_engine_test_utils.h +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2022 gRPC Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -#include - -#include "src/core/lib/event_engine/posix_engine/event_poller.h" - -namespace grpc_event_engine { -namespace posix_engine { - -class TestScheduler : public Scheduler { - public: - explicit TestScheduler(grpc_event_engine::experimental::EventEngine* engine) - : engine_(engine) {} - void Run( - grpc_event_engine::experimental::EventEngine::Closure* closure) override { - engine_->Run(closure); - } - - void Run(absl::AnyInvocable cb) override { - engine_->Run(std::move(cb)); - } - - private: - grpc_event_engine::experimental::EventEngine* engine_; -}; - -// Creates a client socket and blocks until it connects to the specified -// server address. The function abort fails upon encountering errors. -int ConnectToServerOrDie( - const grpc_event_engine::experimental::EventEngine::ResolvedAddress& - server_address); - -} // namespace posix_engine -} // namespace grpc_event_engine \ No newline at end of file diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD index 408d82d87ae..504802c5837 100644 --- a/test/core/event_engine/test_suite/BUILD +++ b/test/core/event_engine/test_suite/BUILD @@ -124,7 +124,6 @@ grpc_cc_library( "cpu:10", "no_windows", ], - visibility = ["//test:__subpackages__"], deps = [ ":conformance_test_base_lib", "//:grpc", @@ -164,7 +163,6 @@ grpc_cc_library( ], hdrs = COMMON_HEADERS, external_deps = ["gtest"], - visibility = ["@grpc:public"], deps = [ "//:grpc", "//test/core/util:grpc_test_util", diff --git a/test/core/event_engine/test_suite/client_test.cc b/test/core/event_engine/test_suite/client_test.cc index dcb17ab6683..031766db1ec 100644 --- a/test/core/event_engine/test_suite/client_test.cc +++ b/test/core/event_engine/test_suite/client_test.cc @@ -45,10 +45,35 @@ using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::URIToResolvedAddress; using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint; using Listener = ::grpc_event_engine::experimental::EventEngine::Listener; -using ::grpc_event_engine::experimental::GetNextSendMessage; +constexpr int kMinMessageSize = 1024; +constexpr int kMaxMessageSize = 4096; constexpr int kNumExchangedMessages = 100; +// Returns a random message with bounded length. +std::string GetNextSendMessage() { + static const char alphanum[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + static std::random_device rd; + static std::seed_seq seed{rd()}; + static std::mt19937 gen(seed); + static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize); + static grpc_core::Mutex g_mu; + std::string tmp_s; + int len; + { + grpc_core::MutexLock lock(&g_mu); + len = dis(gen); + } + tmp_s.reserve(len); + for (int i = 0; i < len; ++i) { + tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)]; + } + return tmp_s; +} + } // namespace // Create a connection using the test EventEngine to a non-existent listener diff --git a/test/core/event_engine/test_suite/event_engine_test_utils.cc b/test/core/event_engine/test_suite/event_engine_test_utils.cc index 433a7256aef..a754339e26b 100644 --- a/test/core/event_engine/test_suite/event_engine_test_utils.cc +++ b/test/core/event_engine/test_suite/event_engine_test_utils.cc @@ -16,7 +16,6 @@ #include #include -#include #include #include @@ -45,43 +44,6 @@ using Listener = ::grpc_event_engine::experimental::EventEngine::Listener; namespace grpc_event_engine { namespace experimental { -namespace { - -constexpr int kMinMessageSize = 1024; -constexpr int kMaxMessageSize = 4096; - -} // namespace - -// Returns a random message with bounded length. -std::string GetNextSendMessage() { - static const char alphanum[] = - "0123456789" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz"; - static std::random_device rd; - static std::seed_seq seed{rd()}; - static std::mt19937 gen(seed); - static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize); - static grpc_core::Mutex g_mu; - std::string tmp_s; - int len; - { - grpc_core::MutexLock lock(&g_mu); - len = dis(gen); - } - tmp_s.reserve(len); - for (int i = 0; i < len; ++i) { - tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)]; - } - return tmp_s; -} - -void WaitForSingleOwner(std::shared_ptr&& engine) { - while (engine.use_count() > 1) { - absl::SleepFor(absl::Milliseconds(100)); - } -} - EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str) { grpc_resolved_address addr; absl::StatusOr uri = grpc_core::URI::Parse(address_str); @@ -117,34 +79,23 @@ absl::Status SendValidatePayload(std::string data, Endpoint* send_endpoint, grpc_core::Notification read_signal; grpc_core::Notification write_signal; SliceBuffer read_slice_buf; - SliceBuffer read_store_buf; SliceBuffer write_slice_buf; - read_slice_buf.Clear(); - write_slice_buf.Clear(); - read_store_buf.Clear(); - // std::cout << "SendValidatePayload ... " << std::endl; - // fflush(stdout); - AppendStringToSliceBuffer(&write_slice_buf, data); EventEngine::Endpoint::ReadArgs args = {num_bytes_written}; std::function read_cb; - read_cb = [receive_endpoint, &read_slice_buf, &read_store_buf, &read_cb, - &read_signal, &args](absl::Status status) { + read_cb = [receive_endpoint, &read_slice_buf, &read_cb, &read_signal, + &args](absl::Status status) { GPR_ASSERT(status.ok()); if (read_slice_buf.Length() == static_cast(args.read_hint_bytes)) { - read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(), - read_store_buf); read_signal.Notify(); return; } args.read_hint_bytes -= read_slice_buf.Length(); - read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(), - read_store_buf); - receive_endpoint->Read(read_cb, &read_slice_buf, &args); + receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args); }; // Start asynchronous reading at the receive_endpoint. - receive_endpoint->Read(read_cb, &read_slice_buf, &args); + receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args); // Start asynchronous writing at the send_endpoint. send_endpoint->Write( [&write_signal](absl::Status status) { @@ -155,10 +106,7 @@ absl::Status SendValidatePayload(std::string data, Endpoint* send_endpoint, write_signal.WaitForNotification(); read_signal.WaitForNotification(); // Check if data written == data read - std::string data_read = ExtractSliceBufferIntoString(&read_store_buf); - if (data != data_read) { - gpr_log(GPR_INFO, "Data written = %s", data.c_str()); - gpr_log(GPR_INFO, "Data read = %s", data_read.c_str()); + if (data != ExtractSliceBufferIntoString(&read_slice_buf)) { return absl::CancelledError("Data read != Data written"); } return absl::OkStatus(); diff --git a/test/core/event_engine/test_suite/event_engine_test_utils.h b/test/core/event_engine/test_suite/event_engine_test_utils.h index be34004f190..ab68aae3f88 100644 --- a/test/core/event_engine/test_suite/event_engine_test_utils.h +++ b/test/core/event_engine/test_suite/event_engine_test_utils.h @@ -42,18 +42,11 @@ std::string ExtractSliceBufferIntoString(SliceBuffer* buf); EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str); -// Returns a random message with bounded length. -std::string GetNextSendMessage(); - -// Waits until the use_count of the event engine shared_ptr has reached 1 -// and returns. -void WaitForSingleOwner(std::shared_ptr&& engine); - -// A helper method to exchange data between two endpoints. It is assumed -// that both endpoints are connected. The data (specified as a string) is -// written by the sender_endpoint and read by the receiver_endpoint. It -// returns OK status only if data written == data read. It also blocks the -// calling thread until said Write and Read operations are complete. +// A helper method to exchange data between two endpoints. It is assumed that +// both endpoints are connected. The data (specified as a string) is written by +// the sender_endpoint and read by the receiver_endpoint. It returns OK +// status only if data written == data read. It also blocks the calling thread +// until said Write and Read operations are complete. absl::Status SendValidatePayload(std::string data, EventEngine::Endpoint* send_endpoint, EventEngine::Endpoint* receive_endpoint); diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 98e73f9aa44..84bad94acd2 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -5191,28 +5191,6 @@ ], "uses_polling": true }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": true, - "language": "c++", - "name": "posix_endpoint_test", - "platforms": [ - "linux", - "mac", - "posix" - ], - "uses_polling": true - }, { "args": [], "benchmark": false,