Tcp endpoint implementation for posix event engine. (#30559)

* convert tcp code to use EndpointConfig and successfully compile

* regenerate projects

* copying config options used in tcp code to create map backed endpoint configs

* fix minor build issues

* fix includes in tests

* fix some build and test issues

* modifying endpoint config definition from an abstract interface into a concrete implementation

* minor fix

* add unreachable code check

* fix some windows build issues

* fix more build issues

* adding a grpc_tcp_generic_options struct to hold data extracted from EndpointConfig

* code formatting

* simplify

* fix sanity checks

* fix windows issues

* some posix fixes

* disabling copy constructor and copy assignment operator for endpoint config

* fix sanity checks

* fix syntax error

* fix weird formatting

* fix build issue

* fix review comments

* reverting un-necessary change

* remove channel args completely from windows impl since it is unused

* remove unused headers and fix usage in endpoint_pair_windows.cc

* cleanup

* cleanup

* fix some initialization issues

* re-generate projects

* removing some files

* fix ifdef for windows

* fixing windows build issue due to improper header file include

* Automated change: Fix sanity tests

* start

* regenerate-projects

* delete some files

* Automated change: Fix sanity tests

* review comments

* add comment

* Migrating posix event pollers to use new event poller interface

* add inline attributes

* Automated change: Fix sanity tests

* remove ref/unref from iomgr engine closure and add custom closure types internal to the pollers

* updating time util usage

* use unique_ptrs

* update comments

* Automated change: Fix sanity tests

* review comments

* rename GetPointer to GetVoidPointer

* cleanup

* Automated change: Fix sanity tests

* removing EndpointConfig::Get method

* Automated change: Fix sanity tests

* cleanup

* fix

* fix review comments

* review comments

* cleanup

* update comments

* fix

* cleanup

* update comments

* Automated change: Fix sanity tests

* fix misleading comments

* bug fixes

* fix

* fix

* start

* revert some changes for bug fixes and allow spurious wakeups for poll based poller

* sanity

* fix

* minor fix

* review comments

* fix

* comment

* remove re-defined function

* fix review comments

* fix windows iocp build issue due to removed function

* change Milliseconds return type

* fix

* remove header

* regenerate projects

* fix sanity

* code

* fix sanity

* Automated change: Fix sanity tests

* Forking iomgr internal_errqueue defines for posix event engine

* fix BUILD file

* start

* update

* sanity

* regenerate_projects

* Automated change: Fix sanity tests

* rename

* review comments

* update

* review comments

* Automated change: Fix sanity tests

* fix BUILD

* add no_windows tag

* update

* update

* remove debug statements

* delete unused file

* update

* fix BUILD

* regenerate projects

* update

* fix BUILD

* update

* fix sanity

* minor additions to configure global tcp user timeout values

* cleanup

* fix build syntax

* change poller interface for posix event engine

* update event_poller_posix_test

* fix build error

* Add more unit tests

* windows ee changes

* update comment

* remove unused deps

* fix msan issue

* add comment

* adding rcv lowat helper to socket wrapper

* fix sanity

* review comments

* fix build issue

* fix sanity

* sanity

* iwyu sanity

* fix build

* merge conflict

* Automated change: Fix sanity tests

* update

* update

* fix macos issue

* fix msan and ubsan issues

* fix macos build issue

* comment

* review comments

* remove unused dep

* sanity

* merge conflict

* mac build issue

* regenerate projects

* build issue

* fix typo and address review comments

* review comments

* Automated change: Fix sanity tests

* update

* merge issue

* review comments

* sanity

* fix build issue

* review comments

* add thread annotations

* MacOS fix unused variable

* add some more #endif comments

* missing semi-colon

* Automated change: Fix sanity tests

* sanity

* sanity again

* sanity try again

* sanity again

* try again with no_include pragma

Co-authored-by: Vignesh2208 <Vignesh2208@users.noreply.github.com>
pull/31134/head
Vignesh Babu 2 years ago committed by GitHub
parent ba8af0157b
commit a81391d931
  1. BUILD | 38
  2. CMakeLists.txt | 58
  3. build_autogenerated.yaml | 52
  4. include/grpc/event_engine/slice_buffer.h | 17
  5. src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc | 2
  6. src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h | 1
  7. src/core/lib/event_engine/posix_engine/ev_poll_posix.cc | 2
  8. src/core/lib/event_engine/posix_engine/ev_poll_posix.h | 1
  9. src/core/lib/event_engine/posix_engine/event_poller.h | 5
  10. src/core/lib/event_engine/posix_engine/posix_endpoint.cc | 1144
  11. src/core/lib/event_engine/posix_engine/posix_endpoint.h | 675
  12. src/core/lib/event_engine/posix_engine/traced_buffer_list.h | 2
  13. test/core/event_engine/posix/BUILD | 45
  14. test/core/event_engine/posix/event_poller_posix_test.cc | 16
  15. test/core/event_engine/posix/posix_endpoint_test.cc | 341
  16. test/core/event_engine/posix/posix_engine_test_utils.cc | 59
  17. test/core/event_engine/posix/posix_engine_test_utils.h | 49
  18. test/core/event_engine/test_suite/BUILD | 2
  19. test/core/event_engine/test_suite/client_test.cc | 27
  20. test/core/event_engine/test_suite/event_engine_test_utils.cc | 62
  21. test/core/event_engine/test_suite/event_engine_test_utils.h | 17
  22. tools/run_tests/generated/tests.json | 22

38
BUILD

@ -2779,6 +2779,44 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "posix_event_engine_endpoint",
srcs = [
"src/core/lib/event_engine/posix_engine/posix_endpoint.cc",
],
hdrs = [
"src/core/lib/event_engine/posix_engine/posix_endpoint.h",
],
external_deps = [
"absl/base:core_headers",
"absl/container:flat_hash_map",
"absl/functional:any_invocable",
"absl/hash",
"absl/memory",
"absl/meta:type_traits",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
],
deps = [
"event_engine_base_hdrs",
"experiments",
"gpr",
"iomgr_port",
"memory_quota",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
"posix_event_engine_internal_errqueue",
"posix_event_engine_tcp_socket_utils",
"posix_event_engine_traced_buffer_list",
"ref_counted",
"ref_counted_ptr",
"resource_quota",
"useful",
],
)
grpc_cc_library(
name = "event_engine_utils",
srcs = ["src/core/lib/event_engine/utils.cc"],

58
CMakeLists.txt generated

@ -1081,6 +1081,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx pipe_test)
add_dependencies(buildtests_cxx poll_test)
add_dependencies(buildtests_cxx port_sharing_end2end_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_endpoint_test)
endif()
add_dependencies(buildtests_cxx posix_event_engine_test)
add_dependencies(buildtests_cxx promise_factory_test)
add_dependencies(buildtests_cxx promise_map_test)
@ -9432,6 +9435,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
test/core/event_engine/posix/event_poller_posix_test.cc
test/core/event_engine/posix/posix_engine_test_utils.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
@ -14505,6 +14509,60 @@ target_link_libraries(port_sharing_end2end_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(posix_endpoint_test
src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc
src/core/lib/event_engine/posix_engine/internal_errqueue.cc
src/core/lib/event_engine/posix_engine/lockfree_event.cc
src/core/lib/event_engine/posix_engine/posix_endpoint.cc
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
src/core/lib/iomgr/socket_mutator.cc
test/core/event_engine/posix/posix_endpoint_test.cc
test/core/event_engine/posix/posix_engine_test_utils.cc
test/core/event_engine/test_suite/event_engine_test.cc
test/core/event_engine/test_suite/event_engine_test_utils.cc
test/core/event_engine/test_suite/oracle_event_engine_posix.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(posix_endpoint_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(posix_endpoint_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::cleanup
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

52
build_autogenerated.yaml

@ -5738,6 +5738,7 @@ targets:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- test/core/event_engine/posix/posix_engine_test_utils.h
src:
- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
- src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
@ -5747,6 +5748,7 @@ targets:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
- test/core/event_engine/posix/event_poller_posix_test.cc
- test/core/event_engine/posix/posix_engine_test_utils.cc
deps:
- grpc_test_util
platforms:
@ -8268,6 +8270,56 @@ targets:
- test/cpp/end2end/test_service_impl.cc
deps:
- grpc++_test_util
- name: posix_endpoint_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
- src/core/lib/event_engine/posix_engine/ev_poll_posix.h
- src/core/lib/event_engine/posix_engine/event_poller.h
- src/core/lib/event_engine/posix_engine/event_poller_posix_default.h
- src/core/lib/event_engine/posix_engine/internal_errqueue.h
- src/core/lib/event_engine/posix_engine/lockfree_event.h
- src/core/lib/event_engine/posix_engine/posix_endpoint.h
- src/core/lib/event_engine/posix_engine/posix_engine_closure.h
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
- src/core/lib/event_engine/posix_engine/traced_buffer_list.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/iomgr/socket_mutator.h
- test/core/event_engine/posix/posix_engine_test_utils.h
- test/core/event_engine/test_suite/event_engine_test.h
- test/core/event_engine/test_suite/event_engine_test_utils.h
- test/core/event_engine/test_suite/oracle_event_engine_posix.h
src:
- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
- src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
- src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc
- src/core/lib/event_engine/posix_engine/internal_errqueue.cc
- src/core/lib/event_engine/posix_engine/lockfree_event.cc
- src/core/lib/event_engine/posix_engine/posix_endpoint.cc
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
- src/core/lib/iomgr/socket_mutator.cc
- test/core/event_engine/posix/posix_endpoint_test.cc
- test/core/event_engine/posix/posix_engine_test_utils.cc
- test/core/event_engine/test_suite/event_engine_test.cc
- test/core/event_engine/test_suite/event_engine_test_utils.cc
- test/core/event_engine/test_suite/oracle_event_engine_posix.cc
deps:
- absl/cleanup:cleanup
- grpc_test_util
platforms:
- linux
- posix
- mac
- name: posix_event_engine_test
gtest: true
build: test

17
include/grpc/event_engine/slice_buffer.h

@ -26,6 +26,7 @@
#include "absl/utility/utility.h"
#include <grpc/event_engine/slice.h>
#include <grpc/impl/codegen/slice.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
@ -67,6 +68,11 @@ class SliceBuffer {
return *this;
}
/// Swap the contents of this SliceBuffer with the contents of another.
void Swap(SliceBuffer& other) {
grpc_slice_buffer_swap(&slice_buffer_, &other.slice_buffer_);
}
/// Appends a new slice into the SliceBuffer and makes an attempt to merge
/// this slice with the last slice in the SliceBuffer.
void Append(Slice slice);
@ -88,6 +94,17 @@ class SliceBuffer {
grpc_slice_buffer_move_first_into_buffer(&slice_buffer_, n, dst);
}
/// Removes/deletes the last n bytes in the SliceBuffer and add it to the
/// other SliceBuffer
void MoveLastNBytesIntoSliceBuffer(size_t n, SliceBuffer& other) {
grpc_slice_buffer_trim_end(&slice_buffer_, n, &other.slice_buffer_);
}
/// Move the first n bytes of the SliceBuffer into the other SliceBuffer
void MoveFirstNBytesIntoSliceBuffer(size_t n, SliceBuffer& other) {
grpc_slice_buffer_move_first(&slice_buffer_, n, &other.slice_buffer_);
}
/// Removes and unrefs all slices in the SliceBuffer.
void Clear() { grpc_slice_buffer_reset_and_unref(&slice_buffer_); }
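
The three helpers added to `SliceBuffer` in this hunk (`Swap`, `MoveLastNBytesIntoSliceBuffer`, `MoveFirstNBytesIntoSliceBuffer`) forward to the C slice-buffer API. A minimal sketch of the behaviour they appear to provide, using a hypothetical `ToySliceBuffer` in which each `std::string` stands in for one slice; this is not gRPC code, and the order in which the real trim helper hands multi-slice tails to the destination is not visible in this diff, so the toy simply keeps the original byte order as an assumption:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical stand-in for SliceBuffer: one std::string models one slice.
class ToySliceBuffer {
 public:
  void Append(std::string slice) {
    if (!slice.empty()) slices_.push_back(std::move(slice));
  }
  size_t Count() const { return slices_.size(); }
  size_t Length() const {
    size_t total = 0;
    for (const auto& s : slices_) total += s.size();
    return total;
  }
  // Exchanges the contents of the two buffers without copying bytes.
  void Swap(ToySliceBuffer& other) { slices_.swap(other.slices_); }

  // Moves the first n bytes into `other`, splitting a slice if n ends
  // in the middle of it.
  void MoveFirstNBytesInto(size_t n, ToySliceBuffer& other) {
    assert(n <= Length());
    while (n > 0) {
      std::string& front = slices_.front();
      if (front.size() <= n) {
        n -= front.size();
        other.Append(std::move(front));
        slices_.pop_front();
      } else {
        other.Append(front.substr(0, n));
        front.erase(0, n);
        n = 0;
      }
    }
  }

  // Removes the last n bytes and hands them to `other`.
  // Assumption: the tail is delivered as one slice in original byte order.
  void MoveLastNBytesInto(size_t n, ToySliceBuffer& other) {
    assert(n <= Length());
    std::string tail;
    while (n > 0) {
      std::string& back = slices_.back();
      const size_t take = back.size() < n ? back.size() : n;
      tail.insert(0, back, back.size() - take, take);
      back.erase(back.size() - take);
      if (back.empty()) slices_.pop_back();
      n -= take;
    }
    other.Append(std::move(tail));
  }

  // Test helper: concatenates all slices.
  std::string Flatten() const {
    std::string out;
    for (const auto& s : slices_) out += s;
    return out;
  }

 private:
  std::deque<std::string> slices_;
};
```

In this toy, moving 7 bytes out of a buffer holding the slices "hello" and "world" leaves "rld" behind and produces two slices in the destination, because the split falls inside the second slice.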

2
src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc

@ -91,7 +91,7 @@ class Epoll1EventHandle : public EventHandle {
pending_write_.store(false, std::memory_order_relaxed);
pending_error_.store(false, std::memory_order_relaxed);
}
- Epoll1Poller* Poller() { return poller_; }
+ Epoll1Poller* Poller() override { return poller_; }
bool SetPendingActions(bool pending_read, bool pending_write,
bool pending_error) {
// Another thread may be executing ExecutePendingActions() at this point

1
src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h

@ -57,6 +57,7 @@ class Epoll1Poller : public PosixEventPoller {
void Kick() override;
Scheduler* GetScheduler() { return scheduler_; }
void Shutdown() override;
bool CanTrackErrors() const override { return true; }
~Epoll1Poller() override;
private:

2
src/core/lib/event_engine/posix_engine/ev_poll_posix.cc

@ -103,7 +103,7 @@ class PollEventHandle : public EventHandle {
absl::MutexLock lock(&poller_->mu_);
poller_->PollerHandlesListAddHandle(this);
}
- PollPoller* Poller() { return poller_; }
+ PollPoller* Poller() override { return poller_; }
bool SetPendingActions(bool pending_read, bool pending_write) {
pending_actions_ |= pending_read;
if (pending_write) {

1
src/core/lib/event_engine/posix_engine/ev_poll_posix.h

@ -51,6 +51,7 @@ class PollPoller : public PosixEventPoller {
void Kick() override;
Scheduler* GetScheduler() { return scheduler_; }
void Shutdown() override;
bool CanTrackErrors() const override { return false; }
~PollPoller() override;
private:
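
The poller hunks in this commit add two hooks: each handle's `Poller()` becomes an override of a base-class virtual, and each poller reports `CanTrackErrors()` (true for epoll1, false for poll). A minimal sketch of how a caller might combine the two, using hypothetical `Toy*` classes rather than the real ones; `ShouldUseErrorQueue` is an illustrative helper, not a function from this commit. It also shows why `override` is legal even though the handles return their concrete poller type: C++ allows covariant return types on virtual functions.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for PosixEventPoller.
class ToyPoller {
 public:
  virtual ~ToyPoller() = default;
  virtual bool CanTrackErrors() const = 0;
  virtual std::string Name() const = 0;
};

// Hypothetical stand-in for EventHandle.
class ToyEventHandle {
 public:
  virtual ~ToyEventHandle() = default;
  // Returns the poller which was used to create this handle.
  virtual ToyPoller* Poller() = 0;
};

class ToyEpollPoller final : public ToyPoller {
 public:
  bool CanTrackErrors() const override { return true; }
  std::string Name() const override { return "epoll1"; }
};

class ToyPollPoller final : public ToyPoller {
 public:
  bool CanTrackErrors() const override { return false; }
  std::string Name() const override { return "poll"; }
};

class ToyEpollHandle final : public ToyEventHandle {
 public:
  explicit ToyEpollHandle(ToyEpollPoller* poller) : poller_(poller) {}
  // Covariant return type: still a valid override of ToyPoller* Poller().
  ToyEpollPoller* Poller() override { return poller_; }

 private:
  ToyEpollPoller* poller_;
};

class ToyPollHandle final : public ToyEventHandle {
 public:
  explicit ToyPollHandle(ToyPollPoller* poller) : poller_(poller) {}
  ToyPollPoller* Poller() override { return poller_; }

 private:
  ToyPollPoller* poller_;
};

// Illustrative caller: features that rely on the socket error queue are
// only switched on when the handle's poller can deliver error events.
bool ShouldUseErrorQueue(ToyEventHandle& handle, bool feature_requested) {
  return feature_requested && handle.Poller()->CanTrackErrors();
}
```

Code that only holds a `ToyEventHandle*` can make this decision without knowing which poller created the handle, which is presumably why `Poller()` was lifted into the base interface.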

5
src/core/lib/event_engine/posix_engine/event_poller.h

@ -37,6 +37,8 @@ class Scheduler {
virtual ~Scheduler() = default;
};
class PosixEventPoller;
class EventHandle {
public:
virtual int WrappedFd() = 0;
@ -79,6 +81,8 @@ class EventHandle {
virtual void SetHasError() = 0;
// Returns true if the handle has been shutdown.
virtual bool IsHandleShutdown() = 0;
// Returns the poller which was used to create this handle.
virtual PosixEventPoller* Poller() = 0;
virtual ~EventHandle() = default;
};
@ -87,6 +91,7 @@ class PosixEventPoller : public grpc_event_engine::experimental::Poller {
// Return an opaque handle to perform actions on the provided file descriptor.
virtual EventHandle* CreateHandle(int fd, absl::string_view name,
bool track_err) = 0;
virtual bool CanTrackErrors() const = 0;
virtual std::string Name() = 0;
// Shuts down and deletes the poller. It is legal to call this function
// only when no other poller method is in progress. For instance, it is

1144
src/core/lib/event_engine/posix_engine/posix_endpoint.cc

File diff suppressed because it is too large

675
src/core/lib/event_engine/posix_engine/posix_endpoint.h

@ -0,0 +1,675 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H
#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H
#include <grpc/support/port_platform.h>
// IWYU pragma: no_include <bits/types/struct_iovec.h>
#include <atomic>
#include <cstdint>
#include <memory>
#include <new>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/hash/hash.h"
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include <sys/socket.h> // IWYU pragma: keep
#include <sys/types.h> // IWYU pragma: keep
#ifdef GRPC_MSG_IOVLEN_TYPE
typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
#else
typedef size_t msg_iovlen_type;
#endif
#endif // GRPC_POSIX_SOCKET_TCP
namespace grpc_event_engine {
namespace posix_engine {
#ifdef GRPC_POSIX_SOCKET_TCP
class TcpZerocopySendRecord {
public:
TcpZerocopySendRecord() { buf_.Clear(); }
~TcpZerocopySendRecord() { DebugAssertEmpty(); }
// TcpZerocopySendRecord contains a slice buffer holding the slices to be
// sent. Given the slices that we wish to send, and the current offset into
// the slice buffer (indicating which have already been sent), populate an
// iovec array that will be used for a zerocopy enabled sendmsg().
// unwind_slice_idx - input/output parameter. It indicates the index of last
// slice whose contents were partially sent in the previous sendmsg. After
// this function returns, it gets updated to a new offset
// depending on the number of bytes which are decided to be sent in the
// current sendmsg.
// unwind_byte_idx - input/output parameter. It indicates the byte offset
// within the last slice whose contents were partially sent in the previous
// sendmsg. After this function returns, it gets updated to a new offset
// depending on the number of bytes which are decided to be sent in the
// current sendmsg.
// sending_length - total number of bytes to be sent in the current sendmsg.
// iov - An iovec array containing the bytes to be sent in the current
// sendmsg.
// Returns: the number of entries in the iovec array.
//
msg_iovlen_type PopulateIovs(size_t* unwind_slice_idx,
size_t* unwind_byte_idx, size_t* sending_length,
iovec* iov);
// A sendmsg() may not be able to send the bytes that we requested at this
// time, returning EAGAIN (possibly due to backpressure). In this case,
// unwind the offset into the slice buffer so we retry sending these bytes.
void UnwindIfThrottled(size_t unwind_slice_idx, size_t unwind_byte_idx) {
out_offset_.byte_idx = unwind_byte_idx;
out_offset_.slice_idx = unwind_slice_idx;
}
// Update the offset into the slice buffer based on how much we wanted to send
// vs. what sendmsg() actually sent (which may be lower, possibly due to
// backpressure).
void UpdateOffsetForBytesSent(size_t sending_length, size_t actually_sent);
// Indicates whether all underlying data has been sent or not.
bool AllSlicesSent() { return out_offset_.slice_idx == buf_.Count(); }
// Reset this structure for a new tcp_write() with zerocopy.
void PrepareForSends(
grpc_event_engine::experimental::SliceBuffer& slices_to_send) {
DebugAssertEmpty();
out_offset_.slice_idx = 0;
out_offset_.byte_idx = 0;
buf_.Swap(slices_to_send);
Ref();
}
// References: 1 reference per sendmsg(), and 1 for the tcp_write().
void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); }
// Unref: called when we get an error queue notification for a sendmsg(), if a
// sendmsg() failed or when tcp_write() is done.
bool Unref() {
const intptr_t prior = ref_.fetch_sub(1, std::memory_order_acq_rel);
GPR_DEBUG_ASSERT(prior > 0);
if (prior == 1) {
AllSendsComplete();
return true;
}
return false;
}
private:
struct OutgoingOffset {
size_t slice_idx = 0;
size_t byte_idx = 0;
};
void DebugAssertEmpty() {
GPR_DEBUG_ASSERT(buf_.Count() == 0);
GPR_DEBUG_ASSERT(buf_.Length() == 0);
GPR_DEBUG_ASSERT(ref_.load(std::memory_order_relaxed) == 0);
}
// When all sendmsg() calls associated with this tcp_write() have been
// completed (ie. we have received the notifications for each sequence number
// for each sendmsg()) and all reference counts have been dropped, drop our
// reference to the underlying data since we no longer need it.
void AllSendsComplete() {
GPR_DEBUG_ASSERT(ref_.load(std::memory_order_relaxed) == 0);
buf_.Clear();
}
grpc_event_engine::experimental::SliceBuffer buf_;
std::atomic<intptr_t> ref_{0};
OutgoingOffset out_offset_;
};
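
The reference-counting contract described in the comments above (one reference for the tcp_write() itself, plus one per sendmsg()) can be sketched in isolation. This is a minimal illustration with a hypothetical `ToySendRecord`, not the class from the diff; `SimulateWrite` is an assumed helper that plays the role of the endpoint and of the error-queue handler on a single thread.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for TcpZerocopySendRecord's ref counting.
class ToySendRecord {
 public:
  // Called once per write: takes the reference owned by the write itself.
  void PrepareForSends() {
    assert(refs_.load(std::memory_order_relaxed) == 0);
    Ref();
  }
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
  // Returns true only for the call that drops the last reference.
  bool Unref() {
    const intptr_t prior = refs_.fetch_sub(1, std::memory_order_acq_rel);
    assert(prior > 0);
    return prior == 1;
  }

 private:
  std::atomic<intptr_t> refs_{0};
};

// Simulates one write that needed `num_sendmsg_calls` sendmsg() calls and
// returns how many Unref() calls reported "last reference dropped".
int SimulateWrite(int num_sendmsg_calls) {
  ToySendRecord record;
  record.PrepareForSends();
  for (int i = 0; i < num_sendmsg_calls; ++i) {
    record.Ref();  // one reference per sendmsg(), as in NoteSend()
  }
  int released = 0;
  for (int i = 0; i < num_sendmsg_calls; ++i) {
    if (record.Unref()) ++released;  // error queue notification arrives
  }
  if (record.Unref()) ++released;  // the write itself is done
  return released;
}
```

Whatever the order in which the notifications and the write completion arrive, exactly one `Unref()` returns true, which is the point at which the record can go back to the pool.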
class TcpZerocopySendCtx {
public:
static constexpr int kDefaultMaxSends = 4;
static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; // 16KB
explicit TcpZerocopySendCtx(
bool zerocopy_enabled, int max_sends = kDefaultMaxSends,
size_t send_bytes_threshold = kDefaultSendBytesThreshold)
: max_sends_(max_sends),
free_send_records_size_(max_sends),
threshold_bytes_(send_bytes_threshold) {
send_records_ = static_cast<TcpZerocopySendRecord*>(
gpr_malloc(max_sends * sizeof(*send_records_)));
free_send_records_ = static_cast<TcpZerocopySendRecord**>(
gpr_malloc(max_sends * sizeof(*free_send_records_)));
if (send_records_ == nullptr || free_send_records_ == nullptr) {
gpr_free(send_records_);
gpr_free(free_send_records_);
gpr_log(GPR_INFO, "Disabling TCP TX zerocopy due to memory pressure.\n");
memory_limited_ = true;
enabled_ = false;
} else {
for (int idx = 0; idx < max_sends_; ++idx) {
new (send_records_ + idx) TcpZerocopySendRecord();
free_send_records_[idx] = send_records_ + idx;
}
enabled_ = zerocopy_enabled;
}
}
~TcpZerocopySendCtx() {
if (send_records_ != nullptr) {
for (int idx = 0; idx < max_sends_; ++idx) {
send_records_[idx].~TcpZerocopySendRecord();
}
}
gpr_free(send_records_);
gpr_free(free_send_records_);
}
// True if we were unable to allocate the various bookkeeping structures at
// transport initialization time. If memory limited, we do not zerocopy.
bool MemoryLimited() const { return memory_limited_; }
// TCP send zerocopy maintains an implicit sequence number for every
// successful sendmsg() with zerocopy enabled; the kernel later gives us an
// error queue notification with this sequence number indicating that the
// underlying data buffers that we sent can now be released. Once that
// notification is received, we can release the buffers associated with this
// zerocopy send record. Here, we associate the sequence number with the data
// buffers that were sent with the corresponding call to sendmsg().
void NoteSend(TcpZerocopySendRecord* record) {
record->Ref();
{
grpc_core::MutexLock lock(&mu_);
is_in_write_ = true;
AssociateSeqWithSendRecordLocked(last_send_, record);
}
++last_send_;
}
// If sendmsg() actually failed, though, we need to revert the sequence number
// that we speculatively bumped before calling sendmsg(). Note that we bump
// this sequence number and perform relevant bookkeeping (see: NoteSend())
// *before* calling sendmsg() since, if we called it *after* sendmsg(), then
// there is a possible race with the release notification which could occur on
// another thread before we do the necessary bookkeeping. Hence, calling
// NoteSend() *before* sendmsg() and implementing an undo function is needed.
void UndoSend() {
--last_send_;
if (ReleaseSendRecord(last_send_)->Unref()) {
// We should still be holding the ref taken by tcp_write().
GPR_DEBUG_ASSERT(0);
}
}
// Simply associate this send record (and the underlying sent data buffers)
// with the implicit sequence number for this zerocopy sendmsg().
void AssociateSeqWithSendRecordLocked(uint32_t seq,
TcpZerocopySendRecord* record)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
ctx_lookup_.emplace(seq, record);
}
// Get a send record for a send that we wish to do with zerocopy.
TcpZerocopySendRecord* GetSendRecord() {
grpc_core::MutexLock lock(&mu_);
return TryGetSendRecordLocked();
}
// A given send record corresponds to a single tcp_write() with zerocopy
// enabled. This can result in several sendmsg() calls to flush all of the
// data to wire. Each sendmsg() takes a reference on the
// TcpZerocopySendRecord, and corresponds to a single sequence number.
// ReleaseSendRecord releases a reference on TcpZerocopySendRecord for a
// single sequence number. This is called either when the relevant error
// queue notification (saying that we can discard the underlying buffers for
// this sendmsg()) is received from the kernel, or when sendmsg() was
// unsuccessful to begin with.
TcpZerocopySendRecord* ReleaseSendRecord(uint32_t seq) {
grpc_core::MutexLock lock(&mu_);
return ReleaseSendRecordLocked(seq);
}
// After all the references to a TcpZerocopySendRecord are released, we can
// add it back to the pool (of size max_sends_). Note that we can only have
// max_sends_ tcp_write() instances with zerocopy enabled in flight at the
// same time.
void PutSendRecord(TcpZerocopySendRecord* record) {
grpc_core::MutexLock lock(&mu_);
GPR_DEBUG_ASSERT(record >= send_records_ &&
record < send_records_ + max_sends_);
PutSendRecordLocked(record);
}
// Indicate that we are disposing of this zerocopy context. This indicator
// will prevent new zerocopy writes from being issued.
void Shutdown() { shutdown_.store(true, std::memory_order_release); }
// Indicates that there are no inflight tcp_write() instances with zerocopy
// enabled.
bool AllSendRecordsEmpty() {
grpc_core::MutexLock lock(&mu_);
return free_send_records_size_ == max_sends_;
}
bool Enabled() const { return enabled_; }
// Only use zerocopy if we are sending at least this many bytes. The
// additional overhead of reading the error queue for notifications means that
// zerocopy is not useful for small transfers.
size_t ThresholdBytes() const { return threshold_bytes_; }
// Expected to be called by handler reading messages from the err queue.
// It is used to indicate that some optmem memory is now available. It returns
// true to tell the caller to mark the file descriptor as immediately
// writable.
//
// OptMem (controlled by the kernel option optmem_max) refers to the memory
// allocated to the cmsg list maintained by the kernel that contains "extra"
// packet information like SCM_RIGHTS or IP_TTL. Increasing this option allows
// the kernel to allocate more memory as needed for more control messages that
// need to be sent for each socket connected.
//
// If a write is currently in progress on the socket (ie. we have issued a
// sendmsg() and are about to check its return value) then we set omem state
// to CHECK to make the sending thread know that some tcp_omem was
// concurrently freed even if sendmsg() returns ENOBUFS. In this case, since
// there is already an active send thread, we do not need to mark the
// socket writeable, so we return false.
//
// If there was no write in progress on the socket, and the socket was not
// marked as FULL, then we need not mark the socket writeable now that some
// tcp_omem memory is freed since it was not considered as blocked on
// tcp_omem to begin with. So in this case, return false.
//
// But, if a write was not in progress and the omem state was FULL, then we
// need to mark the socket writeable since it is no longer blocked by
// tcp_omem. In this case, return true.
//
// Please refer to the STATE TRANSITION DIAGRAM below for more details.
//
bool UpdateZeroCopyOptMemStateAfterFree() {
grpc_core::MutexLock lock(&mu_);
if (is_in_write_) {
zcopy_enobuf_state_ = OptMemState::kCheck;
return false;
}
GPR_DEBUG_ASSERT(zcopy_enobuf_state_ != OptMemState::kCheck);
if (zcopy_enobuf_state_ == OptMemState::kFull) {
// A previous sendmsg attempt was blocked by ENOBUFS. Return true to
// mark the fd as writable so the next write attempt could be made.
zcopy_enobuf_state_ = OptMemState::kOpen;
return true;
} else if (zcopy_enobuf_state_ == OptMemState::kOpen) {
// No need to mark the fd as writable because the previous write
// attempt did not encounter ENOBUFS.
return false;
} else {
// This state should never be reached because it implies that the previous
// state was CHECK and is_in_write is false. This means that after the
// previous sendmsg returned and set is_in_write to false, it did
// not update the zerocopy OptMem state from CHECK to OPEN.
GPR_ASSERT(false && "OMem state error!");
}
}
// Expected to be called by the thread calling sendmsg after the syscall
// invocation is complete. If ENOBUFS is seen, it checks if the error
// handler (Tx0cp completions) has already run and free'ed up some OMem. It
// returns true indicating that the write can be attempted again immediately.
// If ENOBUFS was seen but no Tx0cp completions have been received between the
// sendmsg() and us taking this lock, then tcp_omem is still full from our
// point of view. Therefore, we do not signal that the socket is writeable
// with respect to the availability of tcp_omem. Therefore the function
// returns false. This indicates that another write should not be attempted
// immediately and the calling thread should wait until the socket is writable
// again. If ENOBUFS was not seen, then again return false because the next
// write should be attempted only when the socket is writable again.
//
// Please refer to the STATE TRANSITION DIAGRAM below for more details.
//
bool UpdateZeroCopyOptMemStateAfterSend(bool seen_enobuf) {
grpc_core::MutexLock lock(&mu_);
is_in_write_ = false;
if (seen_enobuf) {
if (zcopy_enobuf_state_ == OptMemState::kCheck) {
zcopy_enobuf_state_ = OptMemState::kOpen;
return true;
} else {
zcopy_enobuf_state_ = OptMemState::kFull;
}
} else if (zcopy_enobuf_state_ != OptMemState::kOpen) {
zcopy_enobuf_state_ = OptMemState::kOpen;
}
return false;
}
private:
// STATE TRANSITION DIAGRAM
//
// sendmsg succeeds Tx-zero copy succeeds and there is no active sendmsg
// ----<<--+ +------<<-------------------------------------+
// | | | |
// | | v sendmsg returns ENOBUFS |
// +-----> OPEN ------------->>-------------------------> FULL
// ^ |
// | |
// | sendmsg completes |
// +----<<---------- CHECK <-------<<-------------+
// Tx-zero copy succeeds and there is
// an active sendmsg
//
// OptMem (controlled by the kernel option optmem_max) refers to the memory
// allocated to the cmsg list maintained by the kernel that contains "extra"
// packet information like SCM_RIGHTS or IP_TTL. Increasing this option allows
// the kernel to allocate more memory as needed for more control messages that
// need to be sent for each socket connected. Each tx zero copy sendmsg has
// a corresponding entry added into the Optmem queue. The entry is popped
// from the Optmem queue when the zero copy send is complete.
enum class OptMemState : int8_t {
kOpen, // Everything is clear and omem is not full.
kFull, // The last sendmsg() has returned with an errno of ENOBUFS.
kCheck, // Error queue is read while is_in_write_ was true, so we should
// check this state after the sendmsg.
};
TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
auto iter = ctx_lookup_.find(seq);
GPR_DEBUG_ASSERT(iter != ctx_lookup_.end());
TcpZerocopySendRecord* record = iter->second;
ctx_lookup_.erase(iter);
return record;
}
TcpZerocopySendRecord* TryGetSendRecordLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
if (shutdown_.load(std::memory_order_acquire)) {
return nullptr;
}
if (free_send_records_size_ == 0) {
return nullptr;
}
free_send_records_size_--;
return free_send_records_[free_send_records_size_];
}
void PutSendRecordLocked(TcpZerocopySendRecord* record)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
GPR_DEBUG_ASSERT(free_send_records_size_ < max_sends_);
free_send_records_[free_send_records_size_] = record;
free_send_records_size_++;
}
TcpZerocopySendRecord* send_records_ ABSL_GUARDED_BY(mu_);
TcpZerocopySendRecord** free_send_records_ ABSL_GUARDED_BY(mu_);
int max_sends_;
int free_send_records_size_ ABSL_GUARDED_BY(mu_);
grpc_core::Mutex mu_;
uint32_t last_send_ = 0;
std::atomic<bool> shutdown_{false};
bool enabled_ = false;
size_t threshold_bytes_ = kDefaultSendBytesThreshold;
absl::flat_hash_map<uint32_t, TcpZerocopySendRecord*> ctx_lookup_
ABSL_GUARDED_BY(mu_);
bool memory_limited_ = false;
bool is_in_write_ ABSL_GUARDED_BY(mu_) = false;
OptMemState zcopy_enobuf_state_ ABSL_GUARDED_BY(mu_) = OptMemState::kOpen;
};
class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
public:
PosixEndpointImpl(
EventHandle* handle, PosixEngineClosure* on_done,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
const PosixTcpOptions& options);
~PosixEndpointImpl() override;
void Read(
absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer,
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
args);
void Write(
absl::AnyInvocable<void(absl::Status)> on_writable,
grpc_event_engine::experimental::SliceBuffer* data,
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
args);
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetPeerAddress() const {
return peer_address_;
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetLocalAddress() const {
return local_address_;
}
void MaybeShutdown(absl::Status why);
private:
void UpdateRcvLowat() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
void HandleWrite(absl::Status status);
void HandleError(absl::Status status);
void HandleRead(absl::Status status);
void MaybeMakeReadSlices() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
bool TcpDoRead(absl::Status& status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
void FinishEstimate();
void AddToEstimate(size_t bytes);
void MaybePostReclaimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
void PerformReclamation() ABSL_LOCKS_EXCLUDED(read_mu_);
// Zero copy related helper methods.
TcpZerocopySendRecord* TcpGetSendZerocopyRecord(
grpc_event_engine::experimental::SliceBuffer& buf);
bool DoFlushZerocopy(TcpZerocopySendRecord* record, absl::Status& status);
bool TcpFlushZerocopy(TcpZerocopySendRecord* record, absl::Status& status);
bool TcpFlush(absl::Status& status);
void TcpShutdownTracedBufferList();
void UnrefMaybePutZerocopySendRecord(TcpZerocopySendRecord* record);
void ZerocopyDisableAndWaitForRemaining();
bool WriteWithTimestamps(struct msghdr* msg, size_t sending_length,
ssize_t* sent_length, int* saved_errno,
int additional_flags);
#ifdef GRPC_LINUX_ERRQUEUE
bool ProcessErrors();
// Reads a cmsg to process zerocopy control messages.
void ProcessZerocopy(struct cmsghdr* cmsg);
// Reads a cmsg to derive timestamps from the control messages.
struct cmsghdr* ProcessTimestamp(msghdr* msg, struct cmsghdr* cmsg);
#endif // GRPC_LINUX_ERRQUEUE
grpc_core::Mutex read_mu_;
grpc_core::Mutex traced_buffer_mu_;
PosixSocketWrapper sock_;
int fd_;
bool is_first_read_ = true;
bool has_posted_reclaimer_ ABSL_GUARDED_BY(read_mu_) = false;
double target_length_;
int min_read_chunk_size_;
int max_read_chunk_size_;
int set_rcvlowat_ = 0;
double bytes_read_this_round_ = 0;
std::atomic<int> ref_count_{1};
// garbage after the last read.
grpc_event_engine::experimental::SliceBuffer last_read_buffer_;
grpc_event_engine::experimental::SliceBuffer* incoming_buffer_
ABSL_GUARDED_BY(read_mu_) = nullptr;
// bytes pending on the socket from the last read.
int inq_ = 1;
// cache whether kernel supports inq.
bool inq_capable_ = false;
grpc_event_engine::experimental::SliceBuffer* outgoing_buffer_ = nullptr;
// byte within outgoing_buffer's slices[0] to write next.
size_t outgoing_byte_idx_ = 0;
PosixEngineClosure* on_read_ = nullptr;
PosixEngineClosure* on_write_ = nullptr;
PosixEngineClosure* on_error_ = nullptr;
PosixEngineClosure* on_done_ = nullptr;
absl::AnyInvocable<void(absl::Status)> read_cb_ ABSL_GUARDED_BY(read_mu_);
absl::AnyInvocable<void(absl::Status)> write_cb_;
grpc_event_engine::experimental::EventEngine::ResolvedAddress peer_address_;
grpc_event_engine::experimental::EventEngine::ResolvedAddress local_address_;
grpc_core::MemoryOwner memory_owner_;
grpc_core::MemoryAllocator::Reservation self_reservation_;
void* outgoing_buffer_arg_ = nullptr;
// A counter which starts at 0. It is initialized the first time the socket
// options for collecting timestamps are set, and is incremented with each
// byte sent.
int bytes_counter_ = -1;
// True if timestamping options are set on the socket.
#ifdef GRPC_LINUX_ERRQUEUE
bool socket_ts_enabled_ = false;
#endif // GRPC_LINUX_ERRQUEUE
// Cache whether we can set timestamping options
bool ts_capable_ = true;
// Set to true if we do not want to be notified on errors anymore.
std::atomic<bool> stop_error_notification_{false};
std::unique_ptr<TcpZerocopySendCtx> tcp_zerocopy_send_ctx_;
TcpZerocopySendRecord* current_zerocopy_send_ = nullptr;
// If true, the size of buffers allotted for tcp reads will be based on the
// specified min_progress_size values conveyed by the upper layers.
bool frame_size_tuning_enabled_ = false;
// A hint from upper layers specifying the minimum number of bytes that need
// to be read to make meaningful progress.
int min_progress_size_ = 1;
TracedBufferList traced_buffers_ ABSL_GUARDED_BY(traced_buffer_mu_);
// The handle is owned by the PosixEndpointImpl object.
EventHandle* handle_;
PosixEventPoller* poller_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};
class PosixEndpoint
: public grpc_event_engine::experimental::EventEngine::Endpoint {
public:
PosixEndpoint(
EventHandle* handle, PosixEngineClosure* on_shutdown,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
const grpc_event_engine::experimental::EndpointConfig& config)
: impl_(new PosixEndpointImpl(handle, on_shutdown, std::move(engine),
TcpOptionsFromEndpointConfig(config))) {}
void Read(
absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer,
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
args) override {
impl_->Read(std::move(on_read), buffer, args);
}
void Write(
absl::AnyInvocable<void(absl::Status)> on_writable,
grpc_event_engine::experimental::SliceBuffer* data,
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
args) override {
impl_->Write(std::move(on_writable), data, args);
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetPeerAddress() const override {
return impl_->GetPeerAddress();
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetLocalAddress() const override {
return impl_->GetLocalAddress();
}
~PosixEndpoint() override {
impl_->MaybeShutdown(absl::InternalError("Endpoint closing"));
}
private:
PosixEndpointImpl* impl_;
};
#else // GRPC_POSIX_SOCKET_TCP
class PosixEndpoint
: public grpc_event_engine::experimental::EventEngine::Endpoint {
public:
PosixEndpoint() = default;
void Read(absl::AnyInvocable<void(absl::Status)> /*on_read*/,
grpc_event_engine::experimental::SliceBuffer* /*buffer*/,
const grpc_event_engine::experimental::EventEngine::Endpoint::
ReadArgs* /*args*/) override {
GPR_ASSERT(false && "PosixEndpoint::Read not supported on this platform");
}
void Write(absl::AnyInvocable<void(absl::Status)> /*on_writable*/,
grpc_event_engine::experimental::SliceBuffer* /*data*/,
const grpc_event_engine::experimental::EventEngine::Endpoint::
WriteArgs* /*args*/) override {
GPR_ASSERT(false && "PosixEndpoint::Write not supported on this platform");
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetPeerAddress() const override {
GPR_ASSERT(false &&
"PosixEndpoint::GetPeerAddress not supported on this platform");
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetLocalAddress() const override {
GPR_ASSERT(false &&
"PosixEndpoint::GetLocalAddress not supported on this platform");
}
~PosixEndpoint() override = default;
};
#endif // GRPC_POSIX_SOCKET_TCP
// Create a PosixEndpoint.
// A shared_ptr of the EventEngine is passed to the endpoint to ensure that
// the event engine is alive for the lifetime of the endpoint. The ownership
// of the EventHandle is transferred to the endpoint.
std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
EventHandle* handle, PosixEngineClosure* on_shutdown,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
const grpc_event_engine::experimental::EndpointConfig& config);
} // namespace posix_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H
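
Review note: the OptMem state handling documented on UpdateZeroCopyOptMemStateAfterSend and in the state transition diagram can be exercised in isolation. The following is a minimal sketch, not the gRPC class: `OptMemModel`, `NoteSendStart` and `AfterCompletion` are hypothetical names, and the completion-side transition is an assumption inferred from the diagram and the kCheck comment rather than taken from the code shown in this header.

```cpp
#include <cassert>
#include <mutex>

// The three states named in the header comment.
enum class OptMemState { kOpen, kFull, kCheck };

// Hypothetical stand-alone model of the OptMem bookkeeping.
class OptMemModel {
 public:
  // Marks that a sendmsg() call is about to be issued.
  void NoteSendStart() {
    std::lock_guard<std::mutex> lock(mu_);
    in_write_ = true;
  }

  // Same branches as UpdateZeroCopyOptMemStateAfterSend(): returns true only
  // when ENOBUFS was seen and a completion arrived while the send was active.
  bool AfterSend(bool seen_enobufs) {
    std::lock_guard<std::mutex> lock(mu_);
    in_write_ = false;
    if (seen_enobufs) {
      if (state_ == OptMemState::kCheck) {
        state_ = OptMemState::kOpen;
        return true;
      }
      state_ = OptMemState::kFull;
    } else {
      state_ = OptMemState::kOpen;
    }
    return false;
  }

  // Assumed completion-side transition (inferred from the diagram): returns
  // true when a writer blocked on ENOBUFS should be woken up.
  bool AfterCompletion() {
    std::lock_guard<std::mutex> lock(mu_);
    if (in_write_) {
      state_ = OptMemState::kCheck;
      return false;
    }
    if (state_ == OptMemState::kFull) {
      state_ = OptMemState::kOpen;
      return true;
    }
    return false;
  }

  OptMemState state() {
    std::lock_guard<std::mutex> lock(mu_);
    return state_;
  }

 private:
  std::mutex mu_;
  bool in_write_ = false;
  OptMemState state_ = OptMemState::kOpen;
};
```

In this model a completion that lands during an active send parks the state in kCheck, so the sender learns from AfterSend(true) that it may retry immediately instead of waiting for a writability event that might never come.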

@ -30,8 +30,6 @@
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
#include "src/core/lib/iomgr/port.h"
// #undef GRPC_LINUX_ERRQUEUE
namespace grpc_event_engine {
namespace posix_engine {

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
licenses(["notice"])
@ -21,6 +21,22 @@ grpc_package(
visibility = "public",
) # Useful for third party devs to test their io manager implementation.
grpc_cc_library(
name = "posix_engine_test_utils",
testonly = True,
srcs = ["posix_engine_test_utils.cc"],
hdrs = ["posix_engine_test_utils.h"],
tags = [
"no_windows",
],
visibility = ["//test:__subpackages__"],
deps = [
"//:event_engine_common",
"//:posix_event_engine_event_poller",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "timer_heap_test",
srcs = ["timer_heap_test.cc"],
@ -78,6 +94,7 @@ grpc_cc_test(
"//:posix_event_engine_closure",
"//:posix_event_engine_event_poller",
"//:posix_event_engine_poller_posix_default",
"//test/core/event_engine/posix:posix_engine_test_utils",
"//test/core/util:grpc_test_util",
],
)
@ -148,3 +165,29 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "posix_endpoint_test",
srcs = ["posix_endpoint_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = [
"no_windows",
],
uses_event_engine = True,
uses_polling = True,
deps = [
"//:channel_args",
"//:common_event_engine_closures",
"//:event_engine_poller",
"//:posix_event_engine",
"//:posix_event_engine_closure",
"//:posix_event_engine_endpoint",
"//:posix_event_engine_event_poller",
"//:posix_event_engine_poller_posix_default",
"//test/core/event_engine/posix:posix_engine_test_utils",
"//test/core/event_engine/test_suite:conformance_test_base_lib",
"//test/core/event_engine/test_suite:oracle_event_engine_posix",
"//test/core/util:grpc_test_util",
],
)

@ -56,6 +56,7 @@
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/global_config.h"
#include "src/core/lib/gprpp/notification.h"
#include "test/core/event_engine/posix/posix_engine_test_utils.h"
#include "test/core/util/port.h"
GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy);
@ -85,21 +86,6 @@ using namespace std::chrono_literals;
namespace {
class TestScheduler : public Scheduler {
public:
explicit TestScheduler(experimental::EventEngine* engine) : engine_(engine) {}
void Run(experimental::EventEngine::Closure* closure) override {
engine_->Run(closure);
}
void Run(absl::AnyInvocable<void()> cb) override {
engine_->Run(std::move(cb));
}
private:
experimental::EventEngine* engine_;
};
absl::Status SetSocketSendBuf(int fd, int buffer_size_bytes) {
return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
sizeof(buffer_size_bytes))

@ -0,0 +1,341 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
#include <fcntl.h>
#include <poll.h>
#include <chrono>
#include <memory>
#include <random>
#include <string>
#include <thread>
#include <gtest/gtest.h>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/global_config.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/port.h"
#include "test/core/event_engine/posix/posix_engine_test_utils.h"
#include "test/core/event_engine/test_suite/event_engine_test_utils.h"
#include "test/core/event_engine/test_suite/oracle_event_engine_posix.h"
#include "test/core/util/port.h"
GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy);
namespace grpc_event_engine {
namespace posix_engine {
namespace {
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetNextSendMessage;
using ::grpc_event_engine::experimental::Poller;
using ::grpc_event_engine::experimental::PosixEventEngine;
using ::grpc_event_engine::experimental::PosixOracleEventEngine;
using ::grpc_event_engine::experimental::URIToResolvedAddress;
using ::grpc_event_engine::experimental::WaitForSingleOwner;
using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
using namespace std::chrono_literals;
constexpr int kMinMessageSize = 1024;
constexpr int kNumConnections = 10;
constexpr int kNumExchangedMessages = 100;
std::atomic<int> g_num_active_connections{0};
struct Connection {
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
};
std::list<Connection> CreateConnectedEndpoints(
PosixEventPoller& poller, bool is_zero_copy_enabled, int num_connections,
std::shared_ptr<EventEngine> posix_ee,
std::shared_ptr<EventEngine> oracle_ee) {
std::list<Connection> connections;
auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
EventEngine::ResolvedAddress resolved_addr =
URIToResolvedAddress(target_addr);
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
grpc_core::Notification* server_signal = new grpc_core::Notification();
Listener::AcceptCallback accept_cb =
[&server_endpoint, &server_signal](
std::unique_ptr<Endpoint> ep,
grpc_core::MemoryAllocator /*memory_allocator*/) {
server_endpoint = std::move(ep);
server_signal->Notify();
};
grpc_core::ChannelArgs args;
auto quota = grpc_core::ResourceQuota::Default();
args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
if (is_zero_copy_enabled) {
args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED, 1);
args = args.Set(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD,
kMinMessageSize);
}
ChannelArgsEndpointConfig config(args);
auto listener = oracle_ee->CreateListener(
std::move(accept_cb),
[](absl::Status status) { ASSERT_TRUE(status.ok()); }, config,
absl::make_unique<grpc_core::MemoryQuota>("foo"));
GPR_ASSERT(listener.ok());
EXPECT_TRUE((*listener)->Bind(resolved_addr).ok());
EXPECT_TRUE((*listener)->Start().ok());
// Create client socket and connect to the target address.
for (int i = 0; i < num_connections; ++i) {
int client_fd = ConnectToServerOrDie(resolved_addr);
EventHandle* handle =
poller.CreateHandle(client_fd, "test", poller.CanTrackErrors());
EXPECT_NE(handle, nullptr);
server_signal->WaitForNotification();
EXPECT_NE(server_endpoint, nullptr);
++g_num_active_connections;
connections.push_back(Connection{
CreatePosixEndpoint(handle,
PosixEngineClosure::TestOnlyToClosure(
[&poller](absl::Status /*status*/) {
if (--g_num_active_connections == 0) {
poller.Kick();
}
}),
posix_ee, config),
std::move(server_endpoint)});
delete server_signal;
server_signal = new grpc_core::Notification();
}
delete server_signal;
return connections;
}
} // namespace
std::string TestScenarioName(const ::testing::TestParamInfo<bool>& info) {
return absl::StrCat("is_zero_copy_enabled_", info.param);
}
// A helper class to drive the polling of Fds. It repeatedly calls the Work(..)
// method on the poller to get pending events, then schedules another
// parallel Work(..) instantiation and processes these pending events. This
// continues until all Fds have orphaned themselves.
class Worker : public grpc_core::DualRefCounted<Worker> {
public:
Worker(std::shared_ptr<EventEngine> engine, PosixEventPoller* poller)
: engine_(std::move(engine)), poller_(poller) {
WeakRef().release();
}
void Orphan() override { signal.Notify(); }
void Start() {
// Start executing Work(..).
engine_->Run([this]() { Work(); });
}
void Wait() {
signal.WaitForNotification();
WeakUnref();
}
private:
void Work() {
auto result = poller_->Work(24h, [this]() {
// Schedule next work instantiation immediately and take a Ref for
// the next instantiation.
Ref().release();
engine_->Run([this]() { Work(); });
});
ASSERT_TRUE(result == Poller::WorkResult::kOk ||
result == Poller::WorkResult::kKicked);
// Corresponds to the Ref taken for the current instantiation. If the
// result was Poller::WorkResult::kKicked, then the next work instantiation
// would not have been scheduled and the poll_again callback would have
// been deleted.
Unref();
}
std::shared_ptr<EventEngine> engine_;
// The poller is not owned by the Worker. Rather it is owned by the test
// which creates the worker instance.
PosixEventPoller* poller_;
grpc_core::Notification signal;
};
class PosixEndpointTest : public ::testing::TestWithParam<bool> {
void SetUp() override {
oracle_ee_ = std::make_shared<PosixOracleEventEngine>();
posix_ee_ = std::make_shared<PosixEventEngine>();
scheduler_ =
absl::make_unique<grpc_event_engine::posix_engine::TestScheduler>(
posix_ee_.get());
EXPECT_NE(scheduler_, nullptr);
poller_ = GetDefaultPoller(scheduler_.get());
if (poller_ != nullptr) {
gpr_log(GPR_INFO, "Using poller: %s", poller_->Name().c_str());
}
}
void TearDown() override {
if (poller_ != nullptr) {
poller_->Shutdown();
}
WaitForSingleOwner(std::move(posix_ee_));
WaitForSingleOwner(std::move(oracle_ee_));
}
public:
TestScheduler* Scheduler() { return scheduler_.get(); }
std::shared_ptr<EventEngine> GetPosixEE() { return posix_ee_; }
std::shared_ptr<EventEngine> GetOracleEE() { return oracle_ee_; }
PosixEventPoller* PosixPoller() { return poller_; }
private:
PosixEventPoller* poller_;
std::unique_ptr<TestScheduler> scheduler_;
std::shared_ptr<EventEngine> posix_ee_;
std::shared_ptr<EventEngine> oracle_ee_;
};
TEST_P(PosixEndpointTest, ConnectExchangeBidiDataTransferTest) {
if (PosixPoller() == nullptr) {
return;
}
Worker* worker = new Worker(GetPosixEE(), PosixPoller());
worker->Start();
{
auto connections = CreateConnectedEndpoints(*PosixPoller(), GetParam(), 1,
GetPosixEE(), GetOracleEE());
auto it = connections.begin();
auto client_endpoint = std::move((*it).client_endpoint);
auto server_endpoint = std::move((*it).server_endpoint);
EXPECT_NE(client_endpoint, nullptr);
EXPECT_NE(server_endpoint, nullptr);
connections.erase(it);
// Alternate message exchanges between client -> server and server ->
// client.
for (int i = 0; i < kNumExchangedMessages; i++) {
// Send from client to server and verify data read at the server.
ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
client_endpoint.get(),
server_endpoint.get())
.ok());
// Send from server to client and verify data read at the client.
ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(),
server_endpoint.get(),
client_endpoint.get())
.ok());
}
}
worker->Wait();
}
// Create N connections, then exchange and verify messages of random length
// over each connection in parallel.
TEST_P(PosixEndpointTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
if (PosixPoller() == nullptr) {
return;
}
Worker* worker = new Worker(GetPosixEE(), PosixPoller());
worker->Start();
auto connections = CreateConnectedEndpoints(
*PosixPoller(), GetParam(), kNumConnections, GetPosixEE(), GetOracleEE());
std::vector<std::thread> threads;
// Create one thread for each connection. For each connection, create
// 2 more worker threads: to exchange and verify bi-directional data transfer.
threads.reserve(kNumConnections);
for (int i = 0; i < kNumConnections; i++) {
// For each connection, simulate a parallel bi-directional data transfer.
// All bi-directional transfers are run in parallel across all connections.
auto it = connections.begin();
auto client_endpoint = std::move((*it).client_endpoint);
auto server_endpoint = std::move((*it).server_endpoint);
EXPECT_NE(client_endpoint, nullptr);
EXPECT_NE(server_endpoint, nullptr);
connections.erase(it);
threads.emplace_back([client_endpoint = std::move(client_endpoint),
server_endpoint = std::move(server_endpoint)]() {
std::vector<std::thread> workers;
workers.reserve(2);
auto worker = [client_endpoint = client_endpoint.get(),
server_endpoint =
server_endpoint.get()](bool client_to_server) {
for (int i = 0; i < kNumExchangedMessages; i++) {
// If client_to_server is true, send from client to server and
// verify data read at the server. Otherwise send data from server
// to client and verify data read at client.
if (client_to_server) {
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
client_endpoint, server_endpoint)
.ok());
} else {
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
server_endpoint, client_endpoint)
.ok());
}
}
};
// worker[0] simulates a flow from client to server endpoint
workers.emplace_back([&worker]() { worker(true); });
// worker[1] simulates a flow from server to client endpoint
workers.emplace_back([&worker]() { worker(false); });
workers[0].join();
workers[1].join();
});
}
for (auto& t : threads) {
t.join();
}
worker->Wait();
}
// Test with zero copy enabled and disabled.
INSTANTIATE_TEST_SUITE_P(PosixEndpoint, PosixEndpointTest,
::testing::ValuesIn({false, true}), &TestScenarioName);
} // namespace posix_engine
} // namespace grpc_event_engine
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_core::UniquePtr<char> poll_strategy =
GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
auto strings = absl::StrSplit(poll_strategy.get(), ',');
if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
// Skip the test entirely if poll strategy is none.
return 0;
}
return RUN_ALL_TESTS();
}
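
Review note: main() skips the whole suite when the comma-separated poll strategy list contains "none". A minimal standard-library sketch of that check is below; `PollStrategyHasNone` is a hypothetical name, and the test itself uses absl::StrSplit rather than this helper.

```cpp
#include <sstream>
#include <string>

// Returns true if the comma-separated strategy list contains the exact
// token "none". Hypothetical helper for illustration only.
bool PollStrategyHasNone(const std::string& strategies) {
  std::istringstream in(strategies);
  std::string token;
  while (std::getline(in, token, ',')) {
    if (token == "none") return true;
  }
  return false;
}
```

Matching on whole tokens matters here: a substring search would also match a strategy whose name merely contains "none".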

@ -0,0 +1,59 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/event_engine/posix/posix_engine_test_utils.h"
#include <fcntl.h>
#include <poll.h>
namespace grpc_event_engine {
namespace posix_engine {
using ResolvedAddress =
grpc_event_engine::experimental::EventEngine::ResolvedAddress;
// Creates a client socket and blocks until it connects to the specified
// server address. The function aborts upon encountering errors.
int ConnectToServerOrDie(const ResolvedAddress& server_address) {
int client_fd;
int one = 1;
int flags;
client_fd = socket(AF_INET6, SOCK_STREAM, 0);
setsockopt(client_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
// Make fd non-blocking.
flags = fcntl(client_fd, F_GETFL, 0);
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
if (connect(client_fd, const_cast<struct sockaddr*>(server_address.address()),
server_address.size()) == -1) {
if (errno == EINPROGRESS) {
struct pollfd pfd;
pfd.fd = client_fd;
pfd.events = POLLOUT;
pfd.revents = 0;
if (poll(&pfd, 1, -1) == -1) {
gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
abort();
}
} else {
gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno);
abort();
}
}
return client_fd;
}
} // namespace posix_engine
} // namespace grpc_event_engine
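
Review note: ConnectToServerOrDie waits for POLLOUT after EINPROGRESS but does not read SO_ERROR afterwards, so a refused connection would still return a descriptor. The sketch below shows one way the non-blocking connect sequence could be completed, assuming plain POSIX sockets. `ListenOnLoopback` and `ConnectNonBlocking` are hypothetical helpers, and the sketch returns -1 instead of aborting.

```cpp
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cerrno>

// Hypothetical helper: opens a listening IPv4 loopback socket on an ephemeral
// port and reports the bound address through *addr. Returns -1 on failure.
int ListenOnLoopback(sockaddr_in* addr) {
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd < 0) return -1;
  *addr = sockaddr_in{};
  addr->sin_family = AF_INET;
  addr->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr->sin_port = 0;  // Let the kernel pick a free port.
  socklen_t len = sizeof(*addr);
  if (bind(fd, reinterpret_cast<sockaddr*>(addr), len) != 0 ||
      listen(fd, 1) != 0 ||
      getsockname(fd, reinterpret_cast<sockaddr*>(addr), &len) != 0) {
    close(fd);
    return -1;
  }
  return fd;
}

// Hypothetical variant of the connect helper: returns a connected
// non-blocking fd, or -1 on any error.
int ConnectNonBlocking(const sockaddr* addr, socklen_t len) {
  int fd = socket(addr->sa_family, SOCK_STREAM, 0);
  if (fd < 0) return -1;
  int flags = fcntl(fd, F_GETFL, 0);
  if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) {
    close(fd);
    return -1;
  }
  if (connect(fd, addr, len) != 0) {
    if (errno != EINPROGRESS) {
      close(fd);
      return -1;
    }
    // Wait until the socket becomes writable, retrying on EINTR.
    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLOUT;
    int rc;
    do {
      rc = poll(&pfd, 1, -1);
    } while (rc < 0 && errno == EINTR);
    // Writability alone does not prove success; SO_ERROR holds the result.
    int so_error = 0;
    socklen_t so_len = sizeof(so_error);
    if (rc <= 0 ||
        getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_len) != 0 ||
        so_error != 0) {
      close(fd);
      return -1;
    }
  }
  return fd;
}
```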

@ -0,0 +1,49 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <fcntl.h>
#include <poll.h>
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
namespace grpc_event_engine {
namespace posix_engine {
class TestScheduler : public Scheduler {
public:
explicit TestScheduler(grpc_event_engine::experimental::EventEngine* engine)
: engine_(engine) {}
void Run(
grpc_event_engine::experimental::EventEngine::Closure* closure) override {
engine_->Run(closure);
}
void Run(absl::AnyInvocable<void()> cb) override {
engine_->Run(std::move(cb));
}
private:
grpc_event_engine::experimental::EventEngine* engine_;
};
// Creates a client socket and blocks until it connects to the specified
// server address. The function aborts upon encountering errors.
int ConnectToServerOrDie(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
server_address);
} // namespace posix_engine
} // namespace grpc_event_engine

@ -124,6 +124,7 @@ grpc_cc_library(
"cpu:10",
"no_windows",
],
visibility = ["//test:__subpackages__"],
deps = [
":conformance_test_base_lib",
"//:grpc",
@ -163,6 +164,7 @@ grpc_cc_library(
],
hdrs = COMMON_HEADERS,
external_deps = ["gtest"],
visibility = ["@grpc:public"],
deps = [
"//:grpc",
"//test/core/util:grpc_test_util",

@ -45,35 +45,10 @@ using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::URIToResolvedAddress;
using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
using ::grpc_event_engine::experimental::GetNextSendMessage;
constexpr int kMinMessageSize = 1024;
constexpr int kMaxMessageSize = 4096;
constexpr int kNumExchangedMessages = 100;
// Returns a random message with bounded length.
std::string GetNextSendMessage() {
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
static std::random_device rd;
static std::seed_seq seed{rd()};
static std::mt19937 gen(seed);
static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize);
static grpc_core::Mutex g_mu;
std::string tmp_s;
int len;
{
grpc_core::MutexLock lock(&g_mu);
len = dis(gen);
}
tmp_s.reserve(len);
for (int i = 0; i < len; ++i) {
tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)];
}
return tmp_s;
}
} // namespace
// Create a connection using the test EventEngine to a non-existent listener

@ -16,6 +16,7 @@
#include <cstring>
#include <memory>
#include <random>
#include <string>
#include <utility>
@ -44,6 +45,43 @@ using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
namespace grpc_event_engine {
namespace experimental {
namespace {
constexpr int kMinMessageSize = 1024;
constexpr int kMaxMessageSize = 4096;
} // namespace
// Returns a random message with bounded length.
std::string GetNextSendMessage() {
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
static std::random_device rd;
static std::seed_seq seed{rd()};
static std::mt19937 gen(seed);
static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize);
static grpc_core::Mutex g_mu;
std::string tmp_s;
int len;
{
grpc_core::MutexLock lock(&g_mu);
len = dis(gen);
}
tmp_s.reserve(len);
for (int i = 0; i < len; ++i) {
tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)];
}
return tmp_s;
}
void WaitForSingleOwner(std::shared_ptr<EventEngine>&& engine) {
while (engine.use_count() > 1) {
absl::SleepFor(absl::Milliseconds(100));
}
}
EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str) {
grpc_resolved_address addr;
absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(address_str);
@ -79,23 +117,34 @@ absl::Status SendValidatePayload(std::string data, Endpoint* send_endpoint,
grpc_core::Notification read_signal;
grpc_core::Notification write_signal;
SliceBuffer read_slice_buf;
SliceBuffer read_store_buf;
SliceBuffer write_slice_buf;
read_slice_buf.Clear();
write_slice_buf.Clear();
read_store_buf.Clear();
// std::cout << "SendValidatePayload ... " << std::endl;
// fflush(stdout);
AppendStringToSliceBuffer(&write_slice_buf, data);
EventEngine::Endpoint::ReadArgs args = {num_bytes_written};
std::function<void(absl::Status)> read_cb;
read_cb = [receive_endpoint, &read_slice_buf, &read_cb, &read_signal,
&args](absl::Status status) {
read_cb = [receive_endpoint, &read_slice_buf, &read_store_buf, &read_cb,
&read_signal, &args](absl::Status status) {
GPR_ASSERT(status.ok());
if (read_slice_buf.Length() == static_cast<size_t>(args.read_hint_bytes)) {
read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
read_store_buf);
read_signal.Notify();
return;
}
args.read_hint_bytes -= read_slice_buf.Length();
receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args);
read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
read_store_buf);
receive_endpoint->Read(read_cb, &read_slice_buf, &args);
};
// Start asynchronous reading at the receive_endpoint.
receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args);
receive_endpoint->Read(read_cb, &read_slice_buf, &args);
// Start asynchronous writing at the send_endpoint.
send_endpoint->Write(
[&write_signal](absl::Status status) {
@@ -106,7 +155,10 @@ absl::Status SendValidatePayload(std::string data, Endpoint* send_endpoint,
write_signal.WaitForNotification();
read_signal.WaitForNotification();
// Check if data written == data read
if (data != ExtractSliceBufferIntoString(&read_slice_buf)) {
std::string data_read = ExtractSliceBufferIntoString(&read_store_buf);
if (data != data_read) {
gpr_log(GPR_INFO, "Data written = %s", data.c_str());
gpr_log(GPR_INFO, "Data read = %s", data_read.c_str());
return absl::CancelledError("Data read != Data written");
}
return absl::OkStatus();
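
The change above makes the read callback drain each partial read into a second buffer and re-arm itself until the expected byte count has arrived. A minimal standard-library sketch of that accumulate-and-re-arm pattern follows. `FakeEndpoint`, `ReadAll`, and the `bool` status are hypothetical stand-ins for illustration, not the EventEngine API, and the fake endpoint invokes its callback synchronously, whereas the real endpoint may complete reads asynchronously.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical endpoint: each Read() delivers at most `chunk` bytes and
// invokes the callback synchronously (an assumption made for brevity).
class FakeEndpoint {
 public:
  FakeEndpoint(std::string data, std::size_t chunk)
      : data_(std::move(data)), chunk_(chunk) {}

  void Read(const std::function<void(bool)>& on_read, std::string* buf,
            std::size_t hint) {
    std::size_t n = std::min({chunk_, hint, data_.size() - pos_});
    buf->assign(data_, pos_, n);
    pos_ += n;
    on_read(n > 0);  // report failure once no data is left
  }

 private:
  std::string data_;
  std::size_t chunk_;
  std::size_t pos_ = 0;
};

// Reads `total` bytes, moving every partial read from `scratch` into `store`
// before issuing the next Read(), so earlier bytes are never overwritten.
std::string ReadAll(FakeEndpoint* ep, std::size_t total) {
  std::string scratch;
  std::string store;
  std::size_t remaining = total;
  std::function<void(bool)> read_cb;
  read_cb = [&](bool ok) {
    if (!ok) return;
    store += scratch;  // drain the partial read
    if (scratch.size() == remaining) return;  // all expected bytes arrived
    remaining -= scratch.size();
    scratch.clear();
    ep->Read(read_cb, &scratch, remaining);  // re-arm with the smaller hint
  };
  ep->Read(read_cb, &scratch, remaining);
  return store;
}
```

Passing `read_cb` by reference rather than moving it keeps the callback valid for the next re-arm, which mirrors the switch from `std::move(read_cb)` to `read_cb` in the hunk above.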

@@ -42,11 +42,18 @@ std::string ExtractSliceBufferIntoString(SliceBuffer* buf);
EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str);
// A helper method to exchange data between two endpoints. It is assumed that
// both endpoints are connected. The data (specified as a string) is written by
// the sender_endpoint and read by the receiver_endpoint. It returns OK
// status only if data written == data read. It also blocks the calling thread
// until said Write and Read operations are complete.
// Returns a random message with bounded length.
std::string GetNextSendMessage();
// Waits until the use_count of the event engine shared_ptr has reached 1
// and returns.
void WaitForSingleOwner(std::shared_ptr<EventEngine>&& engine);
// A helper method to exchange data between two endpoints. It is assumed
// that both endpoints are connected. The data (specified as a string) is
// written by the sender_endpoint and read by the receiver_endpoint. It
// returns OK status only if data written == data read. It also blocks the
// calling thread until said Write and Read operations are complete.
absl::Status SendValidatePayload(std::string data,
EventEngine::Endpoint* send_endpoint,
EventEngine::Endpoint* receive_endpoint);
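
As a rough illustration of how a helper like `WaitForSingleOwner` behaves, the sketch below polls `use_count()` until a background thread drops its reference. `FakeEngine`, `WaitForSingleOwnerSketch`, and `RunWaitDemo` are hypothetical names, and `std::this_thread::sleep_for` stands in for `absl::SleepFor` so the example needs only the standard library.

```cpp
#include <cassert>
#include <chrono>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical stand-in for EventEngine; any shared_ptr-managed type works.
struct FakeEngine {};

// Polls until the caller's shared_ptr is the only remaining owner.
// Binding to an rvalue reference does not move the pointer, so the caller
// still owns `engine` after this returns.
void WaitForSingleOwnerSketch(std::shared_ptr<FakeEngine>&& engine) {
  while (engine.use_count() > 1) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}

// Returns the use_count observed once the worker has released its copy.
long RunWaitDemo() {
  auto engine = std::make_shared<FakeEngine>();
  std::thread worker([copy = engine]() mutable {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    copy.reset();  // drop the worker's reference
  });
  WaitForSingleOwnerSketch(std::move(engine));
  worker.join();
  return engine.use_count();
}
```

Polling `use_count()` is only an approximation under concurrency, which is acceptable in a test helper but not a substitute for explicit shutdown signalling.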

@@ -5191,6 +5191,28 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "posix_endpoint_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
