Fixing some bugs in posix event engine (#31785)

* update

* regenerate projects

* fix sanity

* review comments

* fix

* add lock free event benchmark

* releasable mutex lock

* fix build isue

* update

* fix sanity

* regenerate projects

* iwyu

* Fix resolved address length related bugs in tcp_socket_utils and listener_utils

* iwyu

* cleanup src/core/lib/event_engine/tcp_socket_utils.cc

* update endpoint to use SliceCast into MutableSlice

* fix sanity

* review comments

* review comments

* regenerate projects

* fix

* rm unused variable

* fix
pull/32007/head^2
Vignesh Babu 2 years ago committed by GitHub
parent 353afc2dd9
commit 8bd2b9af52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      CMakeLists.txt
  2. 4
      build_autogenerated.yaml
  3. 4
      include/grpc/event_engine/slice_buffer.h
  4. 7
      src/core/BUILD
  5. 4
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
  6. 8
      src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
  7. 8
      src/core/lib/event_engine/posix_engine/lockfree_event.cc
  8. 96
      src/core/lib/event_engine/posix_engine/posix_endpoint.cc
  9. 1
      src/core/lib/event_engine/posix_engine/posix_endpoint.h
  10. 10
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  11. 4
      src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
  12. 29
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
  13. 115
      src/core/lib/event_engine/tcp_socket_utils.cc
  14. 6
      test/core/event_engine/posix/BUILD
  15. 46
      test/core/event_engine/posix/lock_free_event_test.cc
  16. 2
      test/core/event_engine/posix/posix_event_engine_connect_test.cc
  17. 4
      tools/run_tests/generated/tests.json

5
CMakeLists.txt generated

@ -1032,7 +1032,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx latch_test)
add_dependencies(buildtests_cxx lb_get_cpu_stats_test)
add_dependencies(buildtests_cxx lb_load_data_store_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx lock_free_event_test)
endif()
add_dependencies(buildtests_cxx log_test)
@ -14329,7 +14329,7 @@ target_link_libraries(lb_load_data_store_test
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(lock_free_event_test
test/core/event_engine/posix/lock_free_event_test.cc
@ -14361,6 +14361,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
${_gRPC_BENCHMARK_LIBRARIES}
grpc_test_util
)

@ -8881,11 +8881,13 @@ targets:
src:
- test/core/event_engine/posix/lock_free_event_test.cc
deps:
- benchmark
- grpc_test_util
benchmark: true
defaults: benchmark
platforms:
- linux
- posix
- mac
uses_polling: false
- name: log_test
gtest: true

@ -52,7 +52,7 @@ namespace experimental {
/// an experimental API.
class SliceBuffer {
public:
explicit SliceBuffer() { grpc_slice_buffer_init(&slice_buffer_); }
SliceBuffer() { grpc_slice_buffer_init(&slice_buffer_); }
SliceBuffer(const SliceBuffer& other) = delete;
SliceBuffer(SliceBuffer&& other) noexcept
: slice_buffer_(other.slice_buffer_) {
@ -131,7 +131,7 @@ class SliceBuffer {
}
/// The total number of bytes held by the SliceBuffer
size_t Length() { return slice_buffer_.length; }
size_t Length() const { return slice_buffer_.length; }
/// Return a pointer to the back raw grpc_slice_buffer
grpc_slice_buffer* c_slice_buffer() { return &slice_buffer_; }

@ -1572,9 +1572,11 @@ grpc_cc_library(
"posix_event_engine_lockfree_event",
"posix_event_engine_wakeup_fd_posix",
"posix_event_engine_wakeup_fd_posix_default",
"status_helper",
"strerror",
"//:event_engine_base_hdrs",
"//:gpr",
"//:grpc_public_hdrs",
],
)
@ -1604,10 +1606,12 @@ grpc_cc_library(
"posix_event_engine_event_poller",
"posix_event_engine_wakeup_fd_posix",
"posix_event_engine_wakeup_fd_posix_default",
"status_helper",
"strerror",
"time",
"//:event_engine_base_hdrs",
"//:gpr",
"//:grpc_public_hdrs",
],
)
@ -1685,6 +1689,7 @@ grpc_cc_library(
],
deps = [
"event_engine_common",
"event_engine_tcp_socket_utils",
"experiments",
"iomgr_port",
"load_file",
@ -1697,11 +1702,13 @@ grpc_cc_library(
"ref_counted",
"resource_quota",
"slice",
"status_helper",
"strerror",
"time",
"useful",
"//:event_engine_base_hdrs",
"//:gpr",
"//:grpc_public_hdrs",
"//:ref_counted_ptr",
],
)

@ -24,6 +24,7 @@
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@ -46,6 +47,7 @@
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/gprpp/sync.h"
@ -321,6 +323,8 @@ void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done,
// shutdown() syscall on that fd)
void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
bool releasing_fd) {
grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
if (read_closure_->SetShutdown(why)) {
if (!releasing_fd) {
shutdown(fd_, SHUT_RDWR);

@ -30,6 +30,7 @@
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@ -56,6 +57,7 @@
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/global_config.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
@ -372,6 +374,9 @@ void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
is_shutdown_ = true;
shutdown_error_ =
absl::Status(absl::StatusCode::kInternal, "FD Orphaned");
grpc_core::StatusSetInt(&shutdown_error_,
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
// signal read/write closed to OS so that future operations fail.
if (!released_) {
shutdown(fd_, SHUT_RDWR);
@ -447,6 +452,9 @@ void PollEventHandle::ShutdownHandle(absl::Status why) {
if (!is_shutdown_) {
is_shutdown_ = true;
shutdown_error_ = why;
grpc_core::StatusSetInt(&shutdown_error_,
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
// signal read/write closed to OS so that future operations fail.
shutdown(fd_, SHUT_RDWR);
SetReadyLocked(&read_closure_);

@ -86,7 +86,7 @@ void LockfreeEvent::DestroyEvent() {
// with post-deletion (see the note in the constructor) we want the bit
// pattern to prevent error retention in a deleted object
} while (!state_.compare_exchange_strong(curr, kShutdownBit,
std::memory_order_relaxed,
std::memory_order_acq_rel,
std::memory_order_relaxed));
}
@ -111,7 +111,7 @@ void LockfreeEvent::NotifyOn(PosixEngineClosure* closure) {
// barrier.
if (state_.compare_exchange_strong(
curr, reinterpret_cast<intptr_t>(closure),
std::memory_order_release, std::memory_order_relaxed)) {
std::memory_order_acq_rel, std::memory_order_relaxed)) {
return; // Successful. Return
}
@ -128,7 +128,7 @@ void LockfreeEvent::NotifyOn(PosixEngineClosure* closure) {
// closure when transitioning out of CLOSURE_NO_READY state (i.e there
// is no other code that needs to 'happen-after' this)
if (state_.compare_exchange_strong(curr, kClosureNotReady,
std::memory_order_relaxed,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
scheduler_->Run(closure);
return; // Successful. Return.
@ -230,7 +230,7 @@ void LockfreeEvent::SetReady() {
// No barrier required as we're transitioning to a state that does not
// involve a closure
if (state_.compare_exchange_strong(curr, kClosureReady,
std::memory_order_relaxed,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
return; // early out
}

@ -23,9 +23,9 @@
#include <cctype>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <string>
#include <type_traits>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
@ -33,18 +33,22 @@
#include "absl/strings/str_cat.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/internal/slice_cast.h>
#include <grpc/event_engine/memory_request.h>
#include <grpc/event_engine/slice.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/load_file.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/resource_quota/resource_quota.h"
@ -205,6 +209,16 @@ bool CmsgIsZeroCopy(const cmsghdr& cmsg) {
}
#endif // GRPC_LINUX_ERRQUEUE
absl::Status PosixOSError(int error_no, const char* call_name) {
absl::Status s = absl::UnknownError(grpc_core::StrError(error_no));
grpc_core::StatusSetInt(&s, grpc_core::StatusIntProperty::kErrorNo, error_no);
grpc_core::StatusSetStr(&s, grpc_core::StatusStrProperty::kOsError,
grpc_core::StrError(error_no));
grpc_core::StatusSetStr(&s, grpc_core::StatusStrProperty::kSyscall,
call_name);
return s;
}
} // namespace
#if defined(IOV_MAX) && IOV_MAX < 260
@ -222,9 +236,9 @@ msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
for (iov_size = 0;
out_offset_.slice_idx != buf_.Count() && iov_size != MAX_WRITE_IOVEC;
iov_size++) {
auto slice = buf_.RefSlice(out_offset_.slice_idx);
iov[iov_size].iov_base =
const_cast<uint8_t*>(slice.begin()) + out_offset_.byte_idx;
MutableSlice& slice = internal::SliceCast<MutableSlice>(
buf_.MutableSliceAt(out_offset_.slice_idx));
iov[iov_size].iov_base = slice.begin();
iov[iov_size].iov_len = slice.length() - out_offset_.byte_idx;
*sending_length += iov[iov_size].iov_len;
++(out_offset_.slice_idx);
@ -265,6 +279,19 @@ void PosixEndpointImpl::FinishEstimate() {
bytes_read_this_round_ = 0;
}
absl::Status PosixEndpointImpl::TcpAnnotateError(absl::Status src_error) {
auto peer_string = ResolvedAddressToNormalizedString(peer_address_);
grpc_core::StatusSetStr(&src_error,
grpc_core::StatusStrProperty::kTargetAddress,
peer_string.ok() ? *peer_string : "");
grpc_core::StatusSetInt(&src_error, grpc_core::StatusIntProperty::kFd,
handle_->WrappedFd());
grpc_core::StatusSetInt(&src_error, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
return src_error;
}
// Returns true if data available to read or error other than EAGAIN.
bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
struct msghdr msg;
@ -280,8 +307,9 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
#endif // GRPC_LINUX_ERRQUEUE
char cmsgbuf[cmsg_alloc_space];
for (size_t i = 0; i < iov_len; i++) {
Slice slice = incoming_buffer_->RefSlice(i);
iov[i].iov_base = const_cast<uint8_t*>(slice.begin());
MutableSlice& slice =
internal::SliceCast<MutableSlice>(incoming_buffer_->MutableSliceAt(i));
iov[i].iov_base = slice.begin();
iov[i].iov_len = slice.length();
}
@ -333,10 +361,10 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
// 0 read size ==> end of stream
incoming_buffer_->Clear();
if (read_bytes == 0) {
status = absl::InternalError("Socket closed");
status = TcpAnnotateError(absl::InternalError("Socket closed"));
} else {
status = absl::InternalError(
absl::StrCat("recvmsg:", grpc_core::StrError(errno)));
status = TcpAnnotateError(absl::InternalError(
absl::StrCat("recvmsg:", grpc_core::StrError(errno))));
}
return true;
}
@ -421,7 +449,6 @@ bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
if (total_read_bytes < incoming_buffer_->Length()) {
incoming_buffer_->MoveLastNBytesIntoSliceBuffer(
incoming_buffer_->Length() - total_read_bytes, last_read_buffer_);
// last_read_buffer_.Clear();
}
return true;
}
@ -546,6 +573,7 @@ void PosixEndpointImpl::HandleRead(absl::Status status) {
if (status.ok()) {
MaybeMakeReadSlices();
if (!TcpDoRead(status)) {
UpdateRcvLowat();
// We've consumed the edge, request a new one.
read_mu_.Unlock();
handle_->NotifyOnRead(on_read_);
@ -566,13 +594,12 @@ void PosixEndpointImpl::HandleRead(absl::Status status) {
void PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer,
const EventEngine::Endpoint::ReadArgs* args) {
read_mu_.Lock();
grpc_core::ReleasableMutexLock lock(&read_mu_);
GPR_ASSERT(read_cb_ == nullptr);
read_cb_ = std::move(on_read);
incoming_buffer_ = buffer;
incoming_buffer_->Clear();
incoming_buffer_->Swap(last_read_buffer_);
read_mu_.Unlock();
if (args != nullptr && grpc_core::IsTcpFrameSizeTuningEnabled()) {
min_progress_size_ = std::max(static_cast<int>(args->read_hint_bytes), 1);
} else {
@ -580,15 +607,20 @@ void PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
}
Ref().release();
if (is_first_read_) {
UpdateRcvLowat();
// Endpoint read called for the very first time. Register read callback
// with the polling engine.
is_first_read_ = false;
lock.Release();
handle_->NotifyOnRead(on_read_);
} else if (inq_ == 0) {
UpdateRcvLowat();
lock.Release();
// Upper layer asked to read more but we know there is no pending data to
// read from previous reads. So, wait for POLLIN.
handle_->NotifyOnRead(on_read_);
} else {
lock.Release();
on_read_->SetStatus(absl::OkStatus());
engine_->Run(on_read_);
}
@ -939,8 +971,7 @@ bool PosixEndpointImpl::DoFlushZerocopy(TcpZerocopySendRecord* record,
record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
return false;
} else {
status = absl::InternalError(
absl::StrCat("sendmsg", std::strerror(saved_errno)));
status = TcpAnnotateError(PosixOSError(saved_errno, "sendmsg"));
TcpShutdownTracedBufferList();
return true;
}
@ -988,10 +1019,11 @@ bool PosixEndpointImpl::TcpFlush(absl::Status& status) {
for (iov_size = 0; outgoing_slice_idx != outgoing_buffer_->Count() &&
iov_size != MAX_WRITE_IOVEC;
iov_size++) {
auto slice = outgoing_buffer_->RefSlice(outgoing_slice_idx);
iov[iov_size].iov_base =
const_cast<uint8_t*>(slice.begin()) + outgoing_byte_idx_;
MutableSlice& slice = internal::SliceCast<MutableSlice>(
outgoing_buffer_->MutableSliceAt(outgoing_slice_idx));
iov[iov_size].iov_base = slice.begin() + outgoing_byte_idx_;
iov[iov_size].iov_len = slice.length() - outgoing_byte_idx_;
sending_length += iov[iov_size].iov_len;
outgoing_slice_idx++;
outgoing_byte_idx_ = 0;
@ -1032,8 +1064,7 @@ bool PosixEndpointImpl::TcpFlush(absl::Status& status) {
}
return false;
} else {
status = absl::InternalError(
absl::StrCat("sendmsg", std::strerror(saved_errno)));
status = TcpAnnotateError(PosixOSError(saved_errno, "sendmsg"));
outgoing_buffer_->Clear();
TcpShutdownTracedBufferList();
return true;
@ -1099,8 +1130,9 @@ void PosixEndpointImpl::Write(
GPR_DEBUG_ASSERT(data != nullptr);
if (data->Length() == 0) {
on_writable(handle_->IsHandleShutdown() ? absl::InternalError("EOF")
: status);
on_writable(handle_->IsHandleShutdown()
? TcpAnnotateError(absl::InternalError("EOF"))
: status);
TcpShutdownTracedBufferList();
return;
}
@ -1127,7 +1159,12 @@ void PosixEndpointImpl::Write(
current_zerocopy_send_ = zerocopy_send_record;
handle_->NotifyOnWrite(on_write_);
} else {
on_writable(status);
// TODO(vigneshbabu): Consider eventually running this callback inline to
// avoid a thread hop. At the time of submission, it causes deadlocks which
// should be reolved after ExecCtx removal.
engine_->Run([on_writable = std::move(on_writable), status]() mutable {
on_writable(status);
});
}
}
@ -1137,6 +1174,8 @@ void PosixEndpointImpl::MaybeShutdown(absl::Status why) {
stop_error_notification_.store(true, std::memory_order_release);
handle_->SetHasError();
}
grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
handle_->ShutdownHandle(why);
Unref();
}
@ -1162,11 +1201,18 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle,
PosixSocketWrapper sock(handle->WrappedFd());
fd_ = handle_->WrappedFd();
GPR_ASSERT(options.resource_quota != nullptr);
auto peer_addr_string = sock.PeerAddressString();
memory_owner_ = options.resource_quota->memory_quota()->CreateMemoryOwner(
*sock.PeerAddressString());
peer_addr_string.ok() ? *peer_addr_string : "");
self_reservation_ = memory_owner_.MakeReservation(sizeof(PosixEndpointImpl));
local_address_ = *sock.LocalAddress();
peer_address_ = *sock.PeerAddress();
auto local_address = sock.LocalAddress();
if (local_address.ok()) {
local_address_ = *local_address;
}
auto peer_address = sock.PeerAddress();
if (peer_address.ok()) {
peer_address_ = *peer_address;
}
target_length_ = static_cast<double>(options.tcp_read_chunk_size);
bytes_read_this_round_ = 0;
min_read_chunk_size_ = options.tcp_min_read_chunk_size;

@ -513,6 +513,7 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
bool WriteWithTimestamps(struct msghdr* msg, size_t sending_length,
ssize_t* sent_length, int* saved_errno,
int additional_flags);
absl::Status TcpAnnotateError(absl::Status src_error);
#ifdef GRPC_LINUX_ERRQUEUE
bool ProcessErrors();
// Reads a cmsg to process zerocopy control messages.

@ -132,9 +132,8 @@ void AsyncConnect::OnWritable(absl::Status status)
fd = nullptr;
}
if (!status.ok()) {
ep = absl::CancelledError(
absl::StrCat("Failed to connect to remote host: ", resolved_addr_str_,
" with error: ", status.ToString()));
ep = absl::UnknownError(
absl::StrCat("Failed to connect to remote host: ", status.message()));
}
// Run the OnConnect callback asynchronously.
if (!connect_cancelled) {
@ -197,8 +196,7 @@ void AsyncConnect::OnWritable(absl::Status status)
return;
case ECONNREFUSED:
// This error shouldn't happen for anything other than connect().
status = absl::FailedPreconditionError(
absl::StrCat("connect: ", std::strerror(so_error)));
status = absl::FailedPreconditionError(std::strerror(so_error));
break;
default:
// We don't really know which syscall triggered the problem here, so
@ -220,7 +218,7 @@ EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal(
} while (err < 0 && errno == EINTR);
saved_errno = errno;
auto addr_uri = ResolvedAddressToNormalizedString(addr);
auto addr_uri = ResolvedAddressToURI(addr);
if (!addr_uri.ok()) {
Run([on_connect = std::move(on_connect),
ep = absl::FailedPreconditionError(absl::StrCat(

@ -26,6 +26,7 @@
#include "absl/cleanup/cleanup.h"
#include "absl/status/status.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
@ -214,7 +215,6 @@ absl::StatusOr<ListenerSocket> CreateAndPrepareListenerSocket(
} else {
socket.addr = addr;
}
GRPC_RETURN_IF_ERROR(PrepareSocket(options, socket));
GPR_ASSERT(socket.port > 0);
return socket;
@ -252,7 +252,7 @@ absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
} else {
continue;
}
memcpy(const_cast<sockaddr*>(addr.address()), ifa_it->ifa_addr, len);
addr = EventEngine::ResolvedAddress(ifa_it->ifa_addr, len);
ResolvedAddressSetPort(addr, requested_port);
std::string addr_str = *ResolvedAddressToString(addr);
gpr_log(GPR_DEBUG,

@ -159,10 +159,13 @@ PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) {
options.expand_wildcard_addrs =
(AdjustValue(0, 1, INT_MAX,
config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS)) != 0);
options.allow_reuse_port =
(AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_ALLOW_REUSEPORT)) !=
0);
options.allow_reuse_port = PosixSocketWrapper::IsSocketReusePortSupported();
auto allow_reuse_port_value = config.GetInt(GRPC_ARG_ALLOW_REUSEPORT);
if (allow_reuse_port_value.has_value()) {
options.allow_reuse_port =
(AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_ALLOW_REUSEPORT)) !=
0);
}
if (options.tcp_min_read_chunk_size > options.tcp_max_read_chunk_size) {
options.tcp_min_read_chunk_size = options.tcp_max_read_chunk_size;
}
@ -189,8 +192,9 @@ int Accept4(int sockfd,
grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
int nonblock, int cloexec) {
int fd, flags;
EventEngine::ResolvedAddress peer_addr;
socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
fd = accept(sockfd, const_cast<sockaddr*>(addr.address()), &len);
fd = accept(sockfd, const_cast<sockaddr*>(peer_addr.address()), &len);
if (fd >= 0) {
if (nonblock) {
flags = fcntl(fd, F_GETFL, 0);
@ -203,6 +207,7 @@ int Accept4(int sockfd,
if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) != 0) goto close_and_error;
}
}
addr = EventEngine::ResolvedAddress(peer_addr.address(), len);
return fd;
close_and_error:
@ -218,8 +223,12 @@ int Accept4(int sockfd,
int flags = 0;
flags |= nonblock ? SOCK_NONBLOCK : 0;
flags |= cloexec ? SOCK_CLOEXEC : 0;
EventEngine::ResolvedAddress peer_addr;
socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
return accept4(sockfd, const_cast<sockaddr*>(addr.address()), &len, flags);
int ret =
accept4(sockfd, const_cast<sockaddr*>(peer_addr.address()), &len, flags);
addr = EventEngine::ResolvedAddress(peer_addr.address(), len);
return ret;
}
#endif // GRPC_LINUX_SOCKETUTILS
@ -453,7 +462,9 @@ bool PosixSocketWrapper::IsSocketReusePortSupported() {
}
if (s >= 0) {
PosixSocketWrapper sock(s);
return sock.SetSocketReusePort(1).ok();
bool result = sock.SetSocketReusePort(1).ok();
close(sock.Fd());
return result;
} else {
return false;
}
@ -631,7 +642,7 @@ PosixSocketWrapper::LocalAddress() {
return absl::InternalError(
absl::StrCat("getsockname:", grpc_core::StrError(errno)));
}
return addr;
return EventEngine::ResolvedAddress(addr.address(), len);
}
absl::StatusOr<EventEngine::ResolvedAddress> PosixSocketWrapper::PeerAddress() {
@ -641,7 +652,7 @@ absl::StatusOr<EventEngine::ResolvedAddress> PosixSocketWrapper::PeerAddress() {
return absl::InternalError(
absl::StrCat("getpeername:", grpc_core::StrError(errno)));
}
return addr;
return EventEngine::ResolvedAddress(addr.address(), len);
}
absl::StatusOr<std::string> PosixSocketWrapper::LocalAddressString() {

@ -15,6 +15,8 @@
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
@ -66,10 +68,77 @@ absl::StatusOr<std::string> GetScheme(
case AF_UNIX:
return "unix";
default:
return absl::InvalidArgumentError(absl::StrFormat(
"Unknown scheme: %d", resolved_address.address()->sa_family));
return absl::InvalidArgumentError(
absl::StrFormat("Unknown sockaddr family: %d",
resolved_address.address()->sa_family));
}
}
#ifdef GRPC_HAVE_UNIX_SOCKET
absl::StatusOr<std::string> ResolvedAddrToUnixPathIfPossible(
const EventEngine::ResolvedAddress* resolved_addr) {
const sockaddr* addr = resolved_addr->address();
if (addr->sa_family != AF_UNIX) {
return absl::InvalidArgumentError(
absl::StrCat("Socket family is not AF_UNIX: ", addr->sa_family));
}
const sockaddr_un* unix_addr = reinterpret_cast<const sockaddr_un*>(addr);
#ifdef GPR_APPLE
int len = resolved_addr->size() - sizeof(unix_addr->sun_family) -
sizeof(unix_addr->sun_len) - 1;
#else
int len = resolved_addr->size() - sizeof(unix_addr->sun_family) - 1;
#endif
bool abstract = (len < 0 || unix_addr->sun_path[0] == '\0');
std::string path;
if (abstract) {
if (len >= 0) {
path = std::string(unix_addr->sun_path + 1, len);
}
path = absl::StrCat(std::string(1, '\0'), path);
} else {
size_t maxlen = sizeof(unix_addr->sun_path);
if (strnlen(unix_addr->sun_path, maxlen) == maxlen) {
return absl::InvalidArgumentError("UDS path is not null-terminated");
}
path = unix_addr->sun_path;
}
return path;
}
absl::StatusOr<std::string> ResolvedAddrToUriUnixIfPossible(
const EventEngine::ResolvedAddress* resolved_addr) {
auto path = ResolvedAddrToUnixPathIfPossible(resolved_addr);
GRPC_RETURN_IF_ERROR(path.status());
std::string scheme;
std::string path_string;
if (path->at(0) == '\0') {
scheme = "unix-abstract";
path_string = path->length() > 1 ? path->substr(1, std::string::npos) : "";
} else {
scheme = "unix";
path_string = std::move(*path);
}
absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Create(
std::move(scheme), /*authority=*/"", std::move(path_string),
/*query_parameter_pairs=*/{}, /*fragment=*/"");
if (!uri.ok()) return uri.status();
return uri->ToString();
}
#else
absl::StatusOr<std::string> ResolvedAddrToUnixPathIfPossible(
const EventEngine::ResolvedAddress* /*resolved_addr*/) {
return absl::InvalidArgumentError("Unix socket is not supported.");
}
absl::StatusOr<std::string> ResolvedAddrToUriUnixIfPossible(
const EventEngine::ResolvedAddress* /*resolved_addr*/) {
return absl::InvalidArgumentError("Unix socket is not supported.");
}
#endif
} // namespace
bool ResolvedAddressIsV4Mapped(
@ -88,7 +157,7 @@ bool ResolvedAddressIsV4Mapped(
sizeof(kV4MappedPrefix)) == 0) {
if (resolved_addr4_out != nullptr) {
// Normalize ::ffff:0.0.0.0/96 to IPv4.
memset(addr4_out, 0, sizeof(sockaddr_in));
memset(addr4_out, 0, EventEngine::ResolvedAddress::MAX_SIZE_BYTES);
addr4_out->sin_family = AF_INET;
// s6_addr32 would be nice, but it's non-standard.
memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4);
@ -223,10 +292,10 @@ absl::optional<int> ResolvedAddressIsWildcard(
absl::StatusOr<std::string> ResolvedAddressToNormalizedString(
const EventEngine::ResolvedAddress& resolved_addr) {
EventEngine::ResolvedAddress addr_normalized;
if (ResolvedAddressIsV4Mapped(resolved_addr, &addr_normalized)) {
return ResolvedAddressToString(addr_normalized);
if (!ResolvedAddressIsV4Mapped(resolved_addr, &addr_normalized)) {
addr_normalized = resolved_addr;
}
return ResolvedAddressToString(resolved_addr);
return ResolvedAddressToString(addr_normalized);
}
absl::StatusOr<std::string> ResolvedAddressToString(
@ -236,27 +305,7 @@ absl::StatusOr<std::string> ResolvedAddressToString(
std::string out;
#ifdef GRPC_HAVE_UNIX_SOCKET
if (addr->sa_family == AF_UNIX) {
const sockaddr_un* addr_un = reinterpret_cast<const sockaddr_un*>(addr);
bool abstract = addr_un->sun_path[0] == '\0';
if (abstract) {
#ifdef GPR_APPLE
int len = resolved_addr.size() - sizeof(addr_un->sun_family) -
sizeof(addr_un->sun_len);
#else
int len = resolved_addr.size() - sizeof(addr_un->sun_family);
#endif
if (len <= 0) {
return absl::InvalidArgumentError("Empty UDS abstract path");
}
out = std::string(addr_un->sun_path, len);
} else {
size_t maxlen = sizeof(addr_un->sun_path);
if (strnlen(addr_un->sun_path, maxlen) == maxlen) {
return absl::InvalidArgumentError("UDS path is not null-terminated");
}
out = std::string(addr_un->sun_path);
}
return out;
return ResolvedAddrToUnixPathIfPossible(&resolved_addr);
}
#endif // GRPC_HAVE_UNIX_SOCKET
@ -300,9 +349,17 @@ absl::StatusOr<std::string> ResolvedAddressToURI(
if (resolved_address.size() == 0) {
return absl::InvalidArgumentError("Empty address");
}
auto scheme = GetScheme(resolved_address);
EventEngine::ResolvedAddress addr = resolved_address;
EventEngine::ResolvedAddress addr_normalized;
if (ResolvedAddressIsV4Mapped(addr, &addr_normalized)) {
addr = addr_normalized;
}
auto scheme = GetScheme(addr);
GRPC_RETURN_IF_ERROR(scheme.status());
auto path = ResolvedAddressToString(resolved_address);
if (*scheme == "unix") {
return ResolvedAddrToUriUnixIfPossible(&addr);
}
auto path = ResolvedAddressToString(addr);
GRPC_RETURN_IF_ERROR(path.status());
absl::StatusOr<grpc_core::URI> uri =
grpc_core::URI::Create(*scheme, /*authority=*/"", std::move(path.value()),

@ -102,9 +102,13 @@ grpc_cc_test(
grpc_cc_test(
name = "lock_free_event_test",
srcs = ["lock_free_event_test.cc"],
external_deps = ["gtest"],
external_deps = [
"benchmark",
"gtest",
],
language = "C++",
tags = [
"no_mac",
"no_windows",
],
uses_event_engine = True,

@ -18,6 +18,8 @@
#include <utility>
#include <vector>
#include <benchmark/benchmark.h>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/time/time.h"
@ -151,17 +153,61 @@ TEST(LockFreeEventTest, MultiThreadedTest) {
event.DestroyEvent();
}
namespace {
// A trivial callback sceduler which inherits from the Scheduler interface but
// immediatey runs the callback/closure.
class BechmarkCallbackScheduler : public Scheduler {
public:
BechmarkCallbackScheduler() = default;
void Run(
grpc_event_engine::experimental::EventEngine::Closure* closure) override {
closure->Run();
}
void Run(absl::AnyInvocable<void()> cb) override { cb(); }
};
// A benchmark which repeatedly registers a NotifyOn callback and invokes the
// callback with SetReady. This benchmark is intended to measure the cost of
// NotifyOn and SetReady implementations of the lock free event.
void BM_LockFreeEvent(benchmark::State& state) {
BechmarkCallbackScheduler cb_scheduler;
LockfreeEvent event(&cb_scheduler);
event.InitEvent();
PosixEngineClosure* notify_on_closure =
PosixEngineClosure::ToPermanentClosure([](absl::Status /*status*/) {});
for (auto s : state) {
event.NotifyOn(notify_on_closure);
event.SetReady();
}
event.SetShutdown(absl::CancelledError("Shutting down"));
delete notify_on_closure;
event.DestroyEvent();
}
BENCHMARK(BM_LockFreeEvent)->ThreadRange(1, 64);
} // namespace
} // namespace experimental
} // namespace grpc_event_engine
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes.
namespace benchmark {
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
} // namespace benchmark
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
benchmark::Initialize(&argc, argv);
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();
g_scheduler = new TestScheduler(
grpc_event_engine::experimental::GetDefaultEventEngine());
int r = RUN_ALL_TESTS();
benchmark::RunTheBenchmarksNamespaced();
grpc_shutdown();
return r;
}

@ -161,7 +161,7 @@ TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) {
auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
posix_ee->Connect(
[&signal](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> status) {
EXPECT_EQ(status.status().code(), absl::StatusCode::kCancelled);
EXPECT_EQ(status.status().code(), absl::StatusCode::kUnknown);
signal.Notify();
},
URIToResolvedAddress(target_addr), config,

@ -4435,10 +4435,9 @@
},
{
"args": [],
"benchmark": false,
"benchmark": true,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
@ -4450,7 +4449,6 @@
"name": "lock_free_event_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": false

Loading…
Cancel
Save