Revert "Revert "Creating a posix oracle event engine and a suite of event engine client tests"" (#30060)

* Revert "Revert "Creating a posix oracle event engine and a suite of event engine client tests (#29714)" (#30042)"

This reverts commit 1630efd8ab.

* fix typos
pull/30088/head
Vignesh Babu 2 years ago committed by GitHub
parent 870fe8624f
commit c03388853c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 46
      CMakeLists.txt
  3. 27
      build_autogenerated.yaml
  4. 2
      gRPC-C++.podspec
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 1
      package.xml
  8. 69
      src/core/lib/event_engine/promise.h
  9. 43
      test/core/event_engine/test_suite/BUILD
  10. 266
      test/core/event_engine/test_suite/client_test.cc
  11. 13
      test/core/event_engine/test_suite/event_engine_test.cc
  12. 41
      test/core/event_engine/test_suite/event_engine_test.h
  13. 205
      test/core/event_engine/test_suite/event_engine_test_utils.cc
  14. 126
      test/core/event_engine/test_suite/event_engine_test_utils.h
  15. 10
      test/core/event_engine/test_suite/fuzzing_event_engine_test.cc
  16. 13
      test/core/event_engine/test_suite/iomgr_event_engine_test.cc
  17. 473
      test/core/event_engine/test_suite/oracle_event_engine_posix.cc
  18. 194
      test/core/event_engine/test_suite/oracle_event_engine_posix.h
  19. 29
      test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc
  20. 1
      tools/doxygen/Doxyfile.c++.internal
  21. 1
      tools/doxygen/Doxyfile.core.internal
  22. 22
      tools/run_tests/generated/tests.json

@ -2623,6 +2623,7 @@ grpc_cc_library(
"src/core/lib/debug/stats.h",
"src/core/lib/debug/stats_data.h",
"src/core/lib/event_engine/channel_args_endpoint_config.h",
"src/core/lib/event_engine/promise.h",
"src/core/lib/iomgr/block_annotate.h",
"src/core/lib/iomgr/buffer_list.h",
"src/core/lib/iomgr/call_combiner.h",

46
CMakeLists.txt generated

@ -1091,6 +1091,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx mock_test)
add_dependencies(buildtests_cxx nonblocking_test)
add_dependencies(buildtests_cxx observable_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx oracle_event_engine_posix_test)
endif()
add_dependencies(buildtests_cxx orca_service_end2end_test)
add_dependencies(buildtests_cxx orphanable_test)
add_dependencies(buildtests_cxx out_of_bounds_bad_client_test)
@ -10861,6 +10864,7 @@ add_executable(fuzzing_event_engine_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
test/core/event_engine/test_suite/event_engine_test.cc
test/core/event_engine/test_suite/event_engine_test_utils.cc
test/core/event_engine/test_suite/fuzzing_event_engine_test.cc
test/core/event_engine/test_suite/timer_test.cc
third_party/googletest/googletest/src/gtest-all.cc
@ -12662,6 +12666,7 @@ if(gRPC_BUILD_TESTS)
add_executable(iomgr_event_engine_test
test/core/event_engine/test_suite/event_engine_test.cc
test/core/event_engine/test_suite/event_engine_test_utils.cc
test/core/event_engine/test_suite/iomgr_event_engine_test.cc
test/core/event_engine/test_suite/timer_test.cc
third_party/googletest/googletest/src/gtest-all.cc
@ -13592,6 +13597,47 @@ target_link_libraries(observable_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(oracle_event_engine_posix_test
test/core/event_engine/test_suite/client_test.cc
test/core/event_engine/test_suite/event_engine_test.cc
test/core/event_engine/test_suite/event_engine_test_utils.cc
test/core/event_engine/test_suite/oracle_event_engine_posix.cc
test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(oracle_event_engine_posix_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(oracle_event_engine_posix_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

@ -750,6 +750,7 @@ libs:
- src/core/lib/event_engine/iomgr_engine/timer.h
- src/core/lib/event_engine/iomgr_engine/timer_heap.h
- src/core/lib/event_engine/iomgr_engine/timer_manager.h
- src/core/lib/event_engine/promise.h
- src/core/lib/event_engine/trace.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
@ -1969,6 +1970,7 @@ libs:
- src/core/lib/event_engine/iomgr_engine/timer.h
- src/core/lib/event_engine/iomgr_engine/timer_heap.h
- src/core/lib/event_engine/iomgr_engine/timer_manager.h
- src/core/lib/event_engine/promise.h
- src/core/lib/event_engine/trace.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
@ -6008,10 +6010,12 @@ targets:
headers:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/event_engine/test_suite/event_engine_test.h
- test/core/event_engine/test_suite/event_engine_test_utils.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
- test/core/event_engine/test_suite/event_engine_test.cc
- test/core/event_engine/test_suite/event_engine_test_utils.cc
- test/core/event_engine/test_suite/fuzzing_event_engine_test.cc
- test/core/event_engine/test_suite/timer_test.cc
deps:
@ -6568,8 +6572,10 @@ targets:
language: c++
headers:
- test/core/event_engine/test_suite/event_engine_test.h
- test/core/event_engine/test_suite/event_engine_test_utils.h
src:
- test/core/event_engine/test_suite/event_engine_test.cc
- test/core/event_engine/test_suite/event_engine_test_utils.cc
- test/core/event_engine/test_suite/iomgr_event_engine_test.cc
- test/core/event_engine/test_suite/timer_test.cc
deps:
@ -7087,6 +7093,27 @@ targets:
- absl/types:variant
- absl/utility:utility
uses_polling: false
- name: oracle_event_engine_posix_test
gtest: true
build: test
language: c++
headers:
- test/core/event_engine/test_suite/event_engine_test.h
- test/core/event_engine/test_suite/event_engine_test_utils.h
- test/core/event_engine/test_suite/oracle_event_engine_posix.h
src:
- test/core/event_engine/test_suite/client_test.cc
- test/core/event_engine/test_suite/event_engine_test.cc
- test/core/event_engine/test_suite/event_engine_test_utils.cc
- test/core/event_engine/test_suite/oracle_event_engine_posix.cc
- test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc
deps:
- grpc_test_util
platforms:
- linux
- posix
- mac
uses_polling: false
- name: orca_service_end2end_test
gtest: true
build: test

2
gRPC-C++.podspec generated

@ -680,6 +680,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/iomgr_engine/timer.h',
'src/core/lib/event_engine/iomgr_engine/timer_heap.h',
'src/core/lib/event_engine/iomgr_engine/timer_manager.h',
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/gpr/alloc.h',
'src/core/lib/gpr/env.h',
@ -1506,6 +1507,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/iomgr_engine/timer.h',
'src/core/lib/event_engine/iomgr_engine/timer_heap.h',
'src/core/lib/event_engine/iomgr_engine/timer_manager.h',
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/gpr/alloc.h',
'src/core/lib/gpr/env.h',

2
gRPC-Core.podspec generated

@ -1052,6 +1052,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/iomgr_engine/timer_manager.cc',
'src/core/lib/event_engine/iomgr_engine/timer_manager.h',
'src/core/lib/event_engine/memory_allocator.cc',
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
@ -2113,6 +2114,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/iomgr_engine/timer.h',
'src/core/lib/event_engine/iomgr_engine/timer_heap.h',
'src/core/lib/event_engine/iomgr_engine/timer_manager.h',
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/gpr/alloc.h',
'src/core/lib/gpr/env.h',

1
grpc.gemspec generated

@ -966,6 +966,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/iomgr_engine/timer_manager.cc )
s.files += %w( src/core/lib/event_engine/iomgr_engine/timer_manager.h )
s.files += %w( src/core/lib/event_engine/memory_allocator.cc )
s.files += %w( src/core/lib/event_engine/promise.h )
s.files += %w( src/core/lib/event_engine/resolved_address.cc )
s.files += %w( src/core/lib/event_engine/slice.cc )
s.files += %w( src/core/lib/event_engine/slice_buffer.cc )

1
package.xml generated

@ -948,6 +948,7 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/iomgr_engine/timer_manager.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/iomgr_engine/timer_manager.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/memory_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/promise.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/resolved_address.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/slice.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/slice_buffer.cc" role="src" />

@ -0,0 +1,69 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_PROMISE_H
#define GRPC_CORE_LIB_EVENT_ENGINE_PROMISE_H
#include <grpc/support/port_platform.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
namespace experimental {
/// A minimal promise implementation.
///
/// This is light-duty, syntactical sugar around cv wait & signal, which is
/// useful in some cases. A more robust implementation is being worked on
/// separately.
template <typename T>
class Promise {
public:
// The getter will wait until the setter has been called, and will return the
// value passed during Set.
T& Get() {
grpc_core::MutexLock lock(&mu_);
if (!set_) {
cv_.Wait(&mu_);
}
return val_;
}
// This setter can only be called exactly once without a Reset.
// Will automatically unblock getters.
void Set(T&& val) {
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(!set_);
val_ = std::move(val);
set_ = true;
cv_.SignalAll();
}
// Can only be called after a set operation.
void Reset() {
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(set_);
set_ = false;
}
private:
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
T val_;
bool set_ = false;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_PROMISE_H

@ -18,7 +18,10 @@ licenses(["notice"])
grpc_package(name = "test/core/event_engine/test_suite")
COMMON_HEADERS = ["event_engine_test.h"]
COMMON_HEADERS = [
"event_engine_test.h",
"event_engine_test_utils.h",
]
grpc_cc_library(
name = "timer",
@ -90,10 +93,46 @@ grpc_cc_test(
# -- Internal targets --
grpc_cc_library(
name = "oracle_event_engine_posix",
testonly = True,
srcs = ["oracle_event_engine_posix.cc"],
hdrs = ["oracle_event_engine_posix.h"],
tags = [
"no_windows",
],
deps = [
":conformance_test_base_lib",
"//:grpc",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "oracle_event_engine_posix_test",
srcs = ["oracle_event_engine_posix_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = [
"no_test_ios",
"no_windows",
],
uses_polling = False,
deps = [
":client",
":oracle_event_engine_posix",
"//:grpc",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_library(
name = "conformance_test_base_lib",
testonly = True,
srcs = ["event_engine_test.cc"],
srcs = [
"event_engine_test.cc",
"event_engine_test_utils.cc",
],
hdrs = COMMON_HEADERS,
external_deps = ["gtest"],
deps = [

@ -12,10 +12,272 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <random>
#include <string>
#include <thread>
#include <vector>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/support/log.h>
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/promise.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
#include "test/core/event_engine/test_suite/event_engine_test_utils.h"
class EventEngineClientTest : public EventEngineTest {};
// TODO(hork): establish meaningful tests
TEST_F(EventEngineClientTest, TODO) { grpc_core::ExecCtx exec_ctx; }
namespace {
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::Promise;
using ::grpc_event_engine::experimental::URIToResolvedAddress;
using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
using namespace std::chrono_literals;
constexpr int kMinMessageSize = 1024;
constexpr int kMaxMessageSize = 4096;
constexpr int kNumExchangedMessages = 100;
// Returns a random message with bounded length.
std::string GetNextSendMessage() {
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
static std::random_device rd;
static std::seed_seq seed{rd()};
static std::mt19937 gen(seed);
static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize);
static grpc_core::Mutex g_mu;
std::string tmp_s;
int len;
{
grpc_core::MutexLock lock(&g_mu);
len = dis(gen);
}
tmp_s.reserve(len);
for (int i = 0; i < len; ++i) {
tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)];
}
return tmp_s;
}
} // namespace
// Create a connection using the test EventEngine to a non-existent listener
// and verify that the connection fails.
TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
grpc_core::ExecCtx ctx;
auto test_ee = this->NewEventEngine();
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
// Create a test EventEngine client endpoint and connect to a non existent
// listener.
test_ee->Connect(
[&client_endpoint_promise](
absl::StatusOr<std::unique_ptr<Endpoint>> status) {
// Connect should fail.
EXPECT_FALSE(status.ok());
client_endpoint_promise.Set(nullptr);
},
URIToResolvedAddress("ipv6:[::1]:7000"),
ChannelArgsEndpointConfig(nullptr),
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
auto client_endpoint = std::move(client_endpoint_promise.Get());
EXPECT_EQ(client_endpoint, nullptr);
}
// Create a connection using the test EventEngine to a listener created
// by the oracle EventEngine and exchange bi-di data over the connection.
// For each data transfer, verify that data written at one end of the stream
// equals data read at the other end of the stream.
TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
grpc_core::ExecCtx ctx;
auto oracle_ee = this->NewOracleEventEngine();
auto test_ee = this->NewEventEngine();
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = "ipv6:[::1]:7000";
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise;
Listener::AcceptCallback accept_cb =
[&server_endpoint_promise](
std::unique_ptr<Endpoint> ep,
grpc_core::MemoryAllocator /*memory_allocator*/) {
server_endpoint_promise.Set(std::move(ep));
};
auto status = oracle_ee->CreateListener(
std::move(accept_cb),
[](absl::Status status) { GPR_ASSERT(status.ok()); },
ChannelArgsEndpointConfig(nullptr),
std::make_unique<grpc_core::MemoryQuota>("foo"));
EXPECT_TRUE(status.ok());
std::unique_ptr<Listener> listener = std::move(*status);
EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok());
EXPECT_TRUE(listener->Start().ok());
test_ee->Connect(
[&client_endpoint_promise](
absl::StatusOr<std::unique_ptr<Endpoint>> status) {
if (!status.ok()) {
gpr_log(GPR_ERROR, "Connect failed: %s",
status.status().ToString().c_str());
client_endpoint_promise.Set(nullptr);
} else {
client_endpoint_promise.Set(std::move(*status));
}
},
URIToResolvedAddress(target_addr), ChannelArgsEndpointConfig(nullptr),
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
auto client_endpoint = std::move(client_endpoint_promise.Get());
auto server_endpoint = std::move(server_endpoint_promise.Get());
EXPECT_TRUE(client_endpoint != nullptr);
EXPECT_TRUE(server_endpoint != nullptr);
// Alternate message exchanges between client -- server and server -- client.
for (int i = 0; i < kNumExchangedMessages; i++) {
// Send from client to server and verify data read at the server.
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(),
server_endpoint.get())
.ok());
// Send from server to client and verify data read at the client.
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(),
client_endpoint.get())
.ok());
}
}
// Create 1 listener bound to N IPv6 addresses and M connections where M > N and
// exchange and verify random number of messages over each connection.
TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
grpc_core::ExecCtx ctx;
static constexpr int kStartPortNumber = 7000;
static constexpr int kNumListenerAddresses = 10; // N
static constexpr int kNumConnections = 100; // M
auto oracle_ee = this->NewOracleEventEngine();
auto test_ee = this->NewEventEngine();
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise;
std::vector<std::string> target_addrs;
std::vector<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
connections;
Listener::AcceptCallback accept_cb =
[&server_endpoint_promise](
std::unique_ptr<Endpoint> ep,
grpc_core::MemoryAllocator /*memory_allocator*/) {
server_endpoint_promise.Set(std::move(ep));
};
auto status = oracle_ee->CreateListener(
std::move(accept_cb),
[](absl::Status status) { GPR_ASSERT(status.ok()); },
ChannelArgsEndpointConfig(nullptr),
std::make_unique<grpc_core::MemoryQuota>("foo"));
EXPECT_TRUE(status.ok());
std::unique_ptr<Listener> listener = std::move(*status);
target_addrs.reserve(kNumListenerAddresses);
for (int i = 0; i < kNumListenerAddresses; i++) {
std::string target_addr =
absl::StrCat("ipv6:[::1]:", std::to_string(kStartPortNumber + i));
EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok());
target_addrs.push_back(target_addr);
}
EXPECT_TRUE(listener->Start().ok());
absl::SleepFor(absl::Milliseconds(500));
for (int i = 0; i < kNumConnections; i++) {
// Create a test EventEngine client endpoint and connect to a one of the
// addresses bound to the oracle listener. Verify that the connection
// succeeds.
test_ee->Connect(
[&client_endpoint_promise](
absl::StatusOr<std::unique_ptr<Endpoint>> status) {
if (!status.ok()) {
gpr_log(GPR_ERROR, "Connect failed: %s",
status.status().ToString().c_str());
client_endpoint_promise.Set(nullptr);
} else {
client_endpoint_promise.Set(std::move(*status));
}
},
URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
ChannelArgsEndpointConfig(nullptr),
memory_quota->CreateMemoryAllocator(
absl::StrCat("conn-", std::to_string(i))),
24h);
auto client_endpoint = std::move(client_endpoint_promise.Get());
auto server_endpoint = std::move(server_endpoint_promise.Get());
EXPECT_TRUE(client_endpoint != nullptr);
EXPECT_TRUE(server_endpoint != nullptr);
connections.push_back(std::make_tuple(std::move(client_endpoint),
std::move(server_endpoint)));
client_endpoint_promise.Reset();
server_endpoint_promise.Reset();
}
std::vector<std::thread> threads;
// Create one thread for each connection. For each connection, create
// 2 more worker threads: to exchange and verify bi-directional data transfer.
threads.reserve(kNumConnections);
for (int i = 0; i < kNumConnections; i++) {
// For each connection, simulate a parallel bi-directional data transfer.
// All bi-directional transfers are run in parallel across all connections.
// Each bi-directional data transfer uses a random number of messages.
threads.emplace_back([client_endpoint =
std::move(std::get<0>(connections[i])),
server_endpoint =
std::move(std::get<1>(connections[i]))]() {
std::vector<std::thread> workers;
workers.reserve(2);
auto worker = [client_endpoint = client_endpoint.get(),
server_endpoint =
server_endpoint.get()](bool client_to_server) {
grpc_core::ExecCtx ctx;
for (int i = 0; i < kNumExchangedMessages; i++) {
// If client_to_server is true, send from client to server and
// verify data read at the server. Otherwise send data from server
// to client and verify data read at client.
if (client_to_server) {
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
client_endpoint, server_endpoint)
.ok());
} else {
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
server_endpoint, client_endpoint)
.ok());
}
}
};
// worker[0] simulates a flow from client to server endpoint
workers.emplace_back([&worker]() { worker(true); });
// worker[1] simulates a flow from server to client endpoint
workers.emplace_back([&worker]() { worker(false); });
workers[0].join();
workers[1].join();
});
}
for (auto& t : threads) {
t.join();
}
}
// TODO(vigneshbabu): Add more tests which create listeners bound to a mix
// Ipv6 and other type of addresses (UDS) in the same test.

@ -20,9 +20,16 @@
std::function<std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>*
g_ee_factory = nullptr;
void SetEventEngineFactory(
std::function<std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>*
g_oracle_ee_factory = nullptr;
void SetEventEngineFactories(
std::function<
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>
factory,
std::function<
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>
factory) {
testing::AddGlobalTestEnvironment(new EventEngineTestEnvironment(factory));
oracle_ee_factory) {
testing::AddGlobalTestEnvironment(
new EventEngineTestEnvironment(factory, oracle_ee_factory));
}

@ -24,22 +24,37 @@ extern std::function<
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>*
g_ee_factory;
extern std::function<
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>*
g_oracle_ee_factory;
// Manages the lifetime of the global EventEngine factory.
class EventEngineTestEnvironment : public testing::Environment {
public:
explicit EventEngineTestEnvironment(
EventEngineTestEnvironment(
std::function<
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>
factory)
: factory_(factory) {}
factory,
std::function<
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>
oracle_factory)
: factory_(factory), oracle_factory_(oracle_factory) {}
void SetUp() override { g_ee_factory = &factory_; }
void SetUp() override {
g_ee_factory = &factory_;
g_oracle_ee_factory = &oracle_factory_;
}
void TearDown() override { g_ee_factory = nullptr; }
void TearDown() override {
g_ee_factory = nullptr;
g_oracle_ee_factory = nullptr;
}
private:
std::function<std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>
factory_;
std::function<std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>
oracle_factory_;
};
class EventEngineTest : public testing::Test {
@ -49,12 +64,22 @@ class EventEngineTest : public testing::Test {
GPR_ASSERT(g_ee_factory != nullptr);
return (*g_ee_factory)();
}
std::unique_ptr<grpc_event_engine::experimental::EventEngine>
NewOracleEventEngine() {
GPR_ASSERT(g_oracle_ee_factory != nullptr);
return (*g_oracle_ee_factory)();
}
};
// Set a custom factory for the EventEngine test suite.
void SetEventEngineFactory(
// Set a custom factory for the EventEngine test suite. An optional oracle
// EventEngine can additionally be specified here.
void SetEventEngineFactories(
std::function<
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>
ee_factory,
std::function<
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>
factory);
oracle_ee_factory);
#endif // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_EVENT_ENGINE_TEST_H

@ -0,0 +1,205 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/event_engine/test_suite/event_engine_test_utils.h"
#include <cstring>
#include <memory>
#include <string>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/uri/uri_parser.h"
using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
namespace grpc_event_engine {
namespace experimental {
EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str) {
grpc_resolved_address addr;
absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(address_str);
if (!uri.ok()) {
gpr_log(GPR_ERROR, "Failed to parse. Error: %s",
uri.status().ToString().c_str());
GPR_ASSERT(uri.ok());
}
GPR_ASSERT(grpc_parse_uri(*uri, &addr));
return EventEngine::ResolvedAddress(
reinterpret_cast<const sockaddr*>(addr.addr), addr.len);
}
void AppendStringToSliceBuffer(SliceBuffer* buf, std::string data) {
buf->Append(Slice::FromCopiedString(data));
}
std::string ExtractSliceBufferIntoString(SliceBuffer* buf) {
if (!buf->Length()) {
return std::string();
}
std::string tmp(buf->Length(), '\0');
char* bytes = const_cast<char*>(tmp.c_str());
grpc_slice_buffer_move_first_into_buffer(buf->RawSliceBuffer(), buf->Length(),
bytes);
return tmp;
}
absl::Status SendValidatePayload(std::string data, Endpoint* send_endpoint,
Endpoint* receive_endpoint) {
GPR_ASSERT(receive_endpoint != nullptr && send_endpoint != nullptr);
int num_bytes_written = data.size();
Promise<bool> read_promise;
Promise<bool> write_promise;
SliceBuffer read_slice_buf;
SliceBuffer write_slice_buf;
AppendStringToSliceBuffer(&write_slice_buf, data);
EventEngine::Endpoint::ReadArgs args = {num_bytes_written};
std::function<void(absl::Status)> read_cb;
read_cb = [receive_endpoint, &read_slice_buf, &read_cb, &read_promise,
&args](absl::Status status) {
GPR_ASSERT(status.ok());
if (read_slice_buf.Length() == static_cast<size_t>(args.read_hint_bytes)) {
read_promise.Set(true);
return;
}
args.read_hint_bytes -= read_slice_buf.Length();
receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args);
};
// Start asynchronous reading at the receive_endpoint.
receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args);
// Start asynchronous writing at the send_endpoint.
send_endpoint->Write(
[&write_promise](absl::Status status) {
GPR_ASSERT(status.ok());
write_promise.Set(true);
},
&write_slice_buf, nullptr);
// Wait for async write to complete.
GPR_ASSERT(write_promise.Get() == true);
// Wait for async read to complete.
GPR_ASSERT(read_promise.Get() == true);
// Check if data written == data read
if (data != ExtractSliceBufferIntoString(&read_slice_buf)) {
return absl::CancelledError("Data read != Data written");
}
return absl::OkStatus();
}
absl::Status ConnectionManager::BindAndStartListener(
std::vector<std::string> addrs, bool listener_type_oracle) {
grpc_core::MutexLock lock(&mu_);
if (addrs.empty()) {
return absl::InvalidArgumentError(
"Atleast one bind address must be specified");
}
for (auto& addr : addrs) {
if (listeners_.find(addr) != listeners_.end()) {
// There is already a listener at this address. Return error.
return absl::AlreadyExistsError(
absl::StrCat("Listener already existis for address: ", addr));
}
}
Listener::AcceptCallback accept_cb =
[this](std::unique_ptr<Endpoint> ep,
MemoryAllocator /*memory_allocator*/) {
last_in_progress_connection_.SetServerEndpoint(std::move(ep));
};
EventEngine* event_engine = listener_type_oracle ? oracle_event_engine_.get()
: test_event_engine_.get();
auto status = event_engine->CreateListener(
std::move(accept_cb),
[](absl::Status status) { GPR_ASSERT(status.ok()); },
ChannelArgsEndpointConfig(nullptr),
std::make_unique<grpc_core::MemoryQuota>("foo"));
if (!status.ok()) {
return status.status();
}
std::shared_ptr<Listener> listener((*status).release());
for (auto& addr : addrs) {
auto bind_status = listener->Bind(URIToResolvedAddress(addr));
if (!bind_status.ok()) {
gpr_log(GPR_ERROR, "Binding listener failed: %s",
bind_status.status().ToString().c_str());
return bind_status.status();
}
}
GPR_ASSERT(listener->Start().ok());
// Insert same listener pointer for all bind addresses after the listener
// has started successfully.
for (auto& addr : addrs) {
listeners_.insert(std::make_pair(addr, listener));
}
return absl::OkStatus();
}
absl::StatusOr<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
ConnectionManager::CreateConnection(std::string target_addr,
EventEngine::Duration timeout,
bool client_type_oracle) {
// Only allow one CreateConnection call to proceed at a time.
grpc_core::MutexLock lock(&mu_);
std::string conn_name =
absl::StrCat("connection-", std::to_string(num_processed_connections_++));
EventEngine* event_engine = client_type_oracle ? oracle_event_engine_.get()
: test_event_engine_.get();
event_engine->Connect(
[this](absl::StatusOr<std::unique_ptr<Endpoint>> status) {
if (!status.ok()) {
gpr_log(GPR_ERROR, "Connect failed: %s",
status.status().ToString().c_str());
last_in_progress_connection_.SetClientEndpoint(nullptr);
} else {
last_in_progress_connection_.SetClientEndpoint(std::move(*status));
}
},
URIToResolvedAddress(target_addr), ChannelArgsEndpointConfig(nullptr),
memory_quota_->CreateMemoryAllocator(conn_name), timeout);
auto client_endpoint = last_in_progress_connection_.GetClientEndpoint();
if (client_endpoint != nullptr &&
listeners_.find(target_addr) != listeners_.end()) {
// There is a listener for the specified address. Wait until it
// creates a ServerEndpoint after accepting the connection.
auto server_endpoint = last_in_progress_connection_.GetServerEndpoint();
GPR_ASSERT(server_endpoint != nullptr);
// Set last_in_progress_connection_ to nullptr
return std::make_tuple(std::move(client_endpoint),
std::move(server_endpoint));
}
return absl::CancelledError("Failed to create connection.");
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,126 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_EVENT_ENGINE_TEST_UTILS_H_
#define GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_EVENT_ENGINE_TEST_UTILS_H_
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include "src/core/lib/event_engine/promise.h"
#include "src/core/lib/resource_quota/memory_quota.h"
using EventEngineFactory = std::function<
std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>;
namespace grpc_event_engine {
namespace experimental {
void AppendStringToSliceBuffer(SliceBuffer* buf, std::string data);
std::string ExtractSliceBufferIntoString(SliceBuffer* buf);
EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str);
// A helper method to exchange data between two endpoints. It is assumed that
// both endpoints are connected. The data (specified as a string) is written by
// the sender_endpoint and read by the receiver_endpoint. It returns OK
// status only if data written == data read. It also blocks the calling thread
// until said Write and Read operations are complete.
absl::Status SendValidatePayload(std::string data,
EventEngine::Endpoint* send_endpoint,
EventEngine::Endpoint* receive_endpoint);
// A helper class to create clients/listeners and connections between them.
// The clients and listeners can be created by the oracle event engine
// or the event engine under test. The class provides handles into the
// connections that are created. Inidividual tests can test expected behavior by
// exchanging arbitrary data over these connections.
class ConnectionManager {
public:
ConnectionManager(std::unique_ptr<EventEngine> test_event_engine,
std::unique_ptr<EventEngine> oracle_event_engine)
: memory_quota_(std::make_unique<grpc_core::MemoryQuota>("foo")),
test_event_engine_(std::move(test_event_engine)),
oracle_event_engine_(std::move(oracle_event_engine)) {}
~ConnectionManager() = default;
// It creates and starts a listener bound to all the specified list of
// addresses. If successful, return OK status. The type of the listener is
// determined by the 2nd argument.
absl::Status BindAndStartListener(std::vector<std::string> addrs,
bool listener_type_oracle = true);
// If connection is successful, returns a tuple containing:
// 1. a pointer to the client side endpoint of the connection.
// 2. a pointer to the server side endpoint of the connection.
// If un-successful it returns a non-OK status containing the error
// encountered.
absl::StatusOr<std::tuple<std::unique_ptr<EventEngine::Endpoint>,
std::unique_ptr<EventEngine::Endpoint>>>
CreateConnection(std::string target_addr, EventEngine::Duration timeout,
bool client_type_oracle);
private:
class Connection {
public:
Connection() = default;
~Connection() = default;
void SetClientEndpoint(
std::unique_ptr<EventEngine::Endpoint>&& client_endpoint) {
client_endpoint_promise_.Set(std::move(client_endpoint));
}
void SetServerEndpoint(
std::unique_ptr<EventEngine::Endpoint>&& server_endpoint) {
server_endpoint_promise_.Set(std::move(server_endpoint));
}
std::unique_ptr<EventEngine::Endpoint> GetClientEndpoint() {
auto client_endpoint = std::move(client_endpoint_promise_.Get());
client_endpoint_promise_.Reset();
return client_endpoint;
}
std::unique_ptr<EventEngine::Endpoint> GetServerEndpoint() {
auto server_endpoint = std::move(server_endpoint_promise_.Get());
server_endpoint_promise_.Reset();
return server_endpoint;
}
private:
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise_;
Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise_;
};
grpc_core::Mutex mu_;
std::unique_ptr<grpc_core::MemoryQuota> memory_quota_;
int num_processed_connections_ = 0;
Connection last_in_progress_connection_;
std::map<std::string, std::shared_ptr<EventEngine::Listener>> listeners_;
std::unique_ptr<EventEngine> test_event_engine_;
std::unique_ptr<EventEngine> oracle_event_engine_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_EVENT_ENGINE_TEST_UTILS_H_

@ -61,9 +61,11 @@ class ThreadedFuzzingEventEngine : public FuzzingEventEngine {
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
SetEventEngineFactory([]() {
return absl::make_unique<
grpc_event_engine::experimental::ThreadedFuzzingEventEngine>();
});
SetEventEngineFactories(
[]() {
return absl::make_unique<
grpc_event_engine::experimental::ThreadedFuzzingEventEngine>();
},
nullptr);
return RUN_ALL_TESTS();
}

@ -20,10 +20,11 @@
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
SetEventEngineFactory([]() {
return absl::make_unique<
grpc_event_engine::experimental::IomgrEventEngine>();
});
auto result = RUN_ALL_TESTS();
return result;
SetEventEngineFactories(
[]() {
return absl::make_unique<
grpc_event_engine::experimental::IomgrEventEngine>();
},
nullptr);
return RUN_ALL_TESTS();
}

@ -0,0 +1,473 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/event_engine/test_suite/oracle_event_engine_posix.h"
#include <poll.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <memory>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/iomgr/resolved_address.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
const char* kStopMessage = "STOP";
grpc_resolved_address CreateGRPCResolvedAddress(
const EventEngine::ResolvedAddress& ra) {
grpc_resolved_address grpc_addr;
memcpy(grpc_addr.addr, ra.address(), ra.size());
grpc_addr.len = ra.size();
return grpc_addr;
}
// Blocks until poll(2) indicates that one of the fds has pending I/O
// the deadline is reached whichever comes first. Returns an OK
// status a valid I/O event is available for at least one of the fds, a Status
// with canonical code DEADLINE_EXCEEDED if the deadline expired and a non-OK
// Status if any other error occurred.
absl::Status PollFds(struct pollfd* pfds, int nfds, absl::Duration timeout) {
int rv;
while (true) {
if (timeout != absl::InfiniteDuration()) {
rv = poll(pfds, nfds,
static_cast<int>(absl::ToInt64Milliseconds(timeout)));
} else {
rv = poll(pfds, nfds, /* timeout = */ -1);
}
const int saved_errno = errno;
errno = saved_errno;
if (rv >= 0 || errno != EINTR) {
break;
}
}
if (rv < 0) {
return absl::UnknownError(std::strerror(errno));
}
if (rv == 0) {
return absl::CancelledError("Deadline exceeded");
}
return absl::OkStatus();
}
absl::Status BlockUntilReadable(int fd) {
struct pollfd pfd;
pfd.fd = fd;
pfd.events = POLLIN;
pfd.revents = 0;
return PollFds(&pfd, 1, absl::InfiniteDuration());
}
absl::Status BlockUntilWritableWithTimeout(int fd, absl::Duration timeout) {
struct pollfd pfd;
pfd.fd = fd;
pfd.events = POLLOUT;
pfd.revents = 0;
return PollFds(&pfd, 1, timeout);
}
absl::Status BlockUntilWritable(int fd) {
return BlockUntilWritableWithTimeout(fd, absl::InfiniteDuration());
}
// Tries to read upto num_expected_bytes from the socket. It returns early if
// specified data is not yet available.
std::string TryReadBytes(int sockfd, int& saved_errno, int num_expected_bytes) {
int ret = 0;
static constexpr int kDefaultNumExpectedBytes = 1024;
if (num_expected_bytes <= 0) {
num_expected_bytes = kDefaultNumExpectedBytes;
}
std::string read_data = std::string(num_expected_bytes, '\0');
char* buffer = const_cast<char*>(read_data.c_str());
int pending_bytes = num_expected_bytes;
do {
errno = 0;
ret = read(sockfd, buffer + num_expected_bytes - pending_bytes,
pending_bytes);
if (ret > 0) {
pending_bytes -= ret;
}
} while (pending_bytes > 0 && ((ret > 0) || (ret < 0 && errno == EINTR)));
saved_errno = errno;
return read_data.substr(0, num_expected_bytes - pending_bytes);
}
// Blocks calling thread until the specified number of bytes have been
// read from the provided socket or it encounters an unrecoverable error. It
// puts the read bytes into a string and returns the string. If it encounters an
// error, it returns an empty string and updates saved_errno with the
// appropriate errno.
std::string ReadBytes(int sockfd, int& saved_errno, int num_expected_bytes) {
std::string read_data;
do {
saved_errno = 0;
read_data += TryReadBytes(sockfd, saved_errno,
num_expected_bytes - read_data.length());
if (saved_errno == EAGAIN &&
read_data.length() < static_cast<size_t>(num_expected_bytes)) {
GPR_ASSERT(BlockUntilReadable(sockfd).ok());
} else if (saved_errno != 0 && num_expected_bytes > 0) {
read_data.clear();
break;
}
} while (read_data.length() < static_cast<size_t>(num_expected_bytes));
return read_data;
}
// Tries to write the specified bytes over the socket. It returns the number of
// bytes actually written.
int TryWriteBytes(int sockfd, int& saved_errno, std::string write_bytes) {
int ret = 0;
int pending_bytes = write_bytes.length();
do {
errno = 0;
ret = write(sockfd,
write_bytes.c_str() + write_bytes.length() - pending_bytes,
pending_bytes);
if (ret > 0) {
pending_bytes -= ret;
}
} while (pending_bytes > 0 && ((ret > 0) || (ret < 0 && errno == EINTR)));
saved_errno = errno;
return write_bytes.length() - pending_bytes;
}
// Blocks calling thread until the specified number of bytes have been
// written over the provided socket or it encounters an unrecoverable error. The
// bytes to write are specified as a string. If it encounters an error, it
// returns an empty string and updates saved_errno with the appropriate errno
// and returns a value less than zero.
int WriteBytes(int sockfd, int& saved_errno, std::string write_bytes) {
int ret = 0;
int original_write_length = write_bytes.length();
do {
saved_errno = 0;
ret = TryWriteBytes(sockfd, saved_errno, write_bytes);
if (saved_errno == EAGAIN && ret < static_cast<int>(write_bytes.length())) {
GPR_ASSERT(ret >= 0);
GPR_ASSERT(BlockUntilWritable(sockfd).ok());
} else if (saved_errno != 0) {
GPR_ASSERT(ret < 0);
return ret;
}
write_bytes = write_bytes.substr(ret, std::string::npos);
} while (write_bytes.length() > 0);
return original_write_length;
}
} // namespace
PosixOracleEndpoint::PosixOracleEndpoint(int socket_fd)
: socket_fd_(socket_fd) {
read_ops_ = grpc_core::Thread(
"read_ops_thread",
[](void* arg) {
static_cast<PosixOracleEndpoint*>(arg)->ProcessReadOperations();
},
this);
write_ops_ = grpc_core::Thread(
"write_ops_thread",
[](void* arg) {
static_cast<PosixOracleEndpoint*>(arg)->ProcessWriteOperations();
},
this);
read_ops_.Start();
write_ops_.Start();
}
void PosixOracleEndpoint::Shutdown() {
absl::MutexLock lock(&mu_);
if (absl::exchange(is_shutdown_, true)) {
return;
}
read_ops_channel_.Set(ReadOperation());
write_ops_channel_.Set(WriteOperation());
read_ops_.Join();
write_ops_.Join();
}
std::unique_ptr<PosixOracleEndpoint> PosixOracleEndpoint::Create(
int socket_fd) {
return std::make_unique<PosixOracleEndpoint>(socket_fd);
}
PosixOracleEndpoint::~PosixOracleEndpoint() {
Shutdown();
close(socket_fd_);
}
void PosixOracleEndpoint::Read(std::function<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* args) {
GPR_ASSERT(buffer != nullptr);
int read_hint_bytes =
args != nullptr ? std::max(1, static_cast<int>(args->read_hint_bytes))
: 0;
read_ops_channel_.Set(
ReadOperation(read_hint_bytes, buffer, std::move(on_read)));
}
void PosixOracleEndpoint::Write(std::function<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* /*args*/) {
GPR_ASSERT(data != nullptr);
write_ops_channel_.Set(WriteOperation(data, std::move(on_writable)));
}
void PosixOracleEndpoint::ProcessReadOperations() {
gpr_log(GPR_INFO, "Starting thread to process read ops ...");
while (true) {
ReadOperation read_op;
read_op = read_ops_channel_.Get();
read_ops_channel_.Reset();
if (!read_op.IsValid()) {
read_op(std::string(), absl::CancelledError("Closed"));
break;
}
int saved_errno;
std::string read_data =
ReadBytes(socket_fd_, saved_errno, read_op.GetNumBytesToRead());
read_op(read_data, read_data.empty() ? absl::CancelledError(absl::StrCat(
"Read failed with error = ",
std::strerror(saved_errno)))
: absl::OkStatus());
}
gpr_log(GPR_INFO, "Shutting down read ops thread ...");
}
void PosixOracleEndpoint::ProcessWriteOperations() {
gpr_log(GPR_INFO, "Starting thread to process write ops ...");
while (true) {
WriteOperation write_op;
write_op = write_ops_channel_.Get();
write_ops_channel_.Reset();
if (!write_op.IsValid()) {
write_op(absl::CancelledError("Closed"));
break;
}
int saved_errno;
int ret = WriteBytes(socket_fd_, saved_errno, write_op.GetBytesToWrite());
write_op(
ret < 0 ? absl::CancelledError(absl::StrCat(
"Write failed with error = ", std::strerror(saved_errno)))
: absl::OkStatus());
}
gpr_log(GPR_INFO, "Shutting down write ops thread ...");
}
PosixOracleListener::PosixOracleListener(
EventEngine::Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
: on_accept_(std::move(on_accept)),
on_shutdown_(std::move(on_shutdown)),
memory_allocator_factory_(std::move(memory_allocator_factory)) {
if (pipe(pipefd_) == -1) {
gpr_log(GPR_ERROR, "Error creating pipe: %s", std::strerror(errno));
abort();
}
}
absl::Status PosixOracleListener::Start() {
absl::MutexLock lock(&mu_);
GPR_ASSERT(!listener_fds_.empty());
if (absl::exchange(is_started_, true)) {
return absl::InternalError("Cannot start listener more than once ...");
}
serve_ = grpc_core::Thread(
"accept_thread",
[](void* arg) {
static_cast<PosixOracleListener*>(arg)->HandleIncomingConnections();
},
this);
serve_.Start();
return absl::OkStatus();
}
PosixOracleListener::~PosixOracleListener() {
absl::MutexLock lock(&mu_);
if (!is_started_) {
serve_.Join();
return;
}
for (int i = 0; i < static_cast<int>(listener_fds_.size()); i++) {
shutdown(listener_fds_[i], SHUT_RDWR);
}
// Send a STOP message over the pipe.
write(pipefd_[1], kStopMessage, strlen(kStopMessage));
serve_.Join();
on_shutdown_(absl::OkStatus());
}
void PosixOracleListener::HandleIncomingConnections() {
gpr_log(GPR_INFO, "Starting accept thread ...");
GPR_ASSERT(!listener_fds_.empty());
int nfds = listener_fds_.size();
// Add one extra file descriptor to poll the pipe fd.
++nfds;
struct pollfd* pfds =
static_cast<struct pollfd*>(gpr_malloc(sizeof(struct pollfd) * nfds));
memset(pfds, 0, sizeof(struct pollfd) * nfds);
while (true) {
for (int i = 0; i < nfds; i++) {
pfds[i].fd = i == nfds - 1 ? pipefd_[0] : listener_fds_[i];
pfds[i].events = POLLIN;
pfds[i].revents = 0;
}
if (!PollFds(pfds, nfds, absl::InfiniteDuration()).ok()) {
break;
}
int saved_errno = 0;
if ((pfds[nfds - 1].revents & POLLIN) &&
ReadBytes(pipefd_[0], saved_errno, strlen(kStopMessage)) ==
std::string(kStopMessage)) {
break;
}
for (int i = 0; i < nfds - 1; i++) {
if (!(pfds[i].revents & POLLIN)) {
continue;
}
// pfds[i].fd has a readable event.
int client_sock_fd = accept(pfds[i].fd, nullptr, nullptr);
if (client_sock_fd < 0) {
gpr_log(GPR_ERROR,
"Error accepting new connection: %s. Ignoring connection "
"attempt ...",
std::strerror(errno));
continue;
}
on_accept_(PosixOracleEndpoint::Create(client_sock_fd),
memory_allocator_factory_->CreateMemoryAllocator("test"));
}
}
gpr_log(GPR_INFO, "Shutting down accept thread ...");
gpr_free(pfds);
}
absl::StatusOr<int> PosixOracleListener::Bind(
const EventEngine::ResolvedAddress& addr) {
absl::MutexLock lock(&mu_);
int new_socket;
int opt = -1;
grpc_resolved_address address = CreateGRPCResolvedAddress(addr);
const char* scheme = grpc_sockaddr_get_uri_scheme(&address);
if (scheme == nullptr || strcmp(scheme, "ipv6") != 0) {
return absl::UnimplementedError(
"Unsupported bind address type. Only IPV6 addresses are supported "
"currently by the PosixOracleListener ...");
}
// Creating a new socket file descriptor.
if ((new_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) {
return absl::UnknownError(
absl::StrCat("Error creating socket: ", std::strerror(errno)));
}
// MacOS biulds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same
// setsockopt syscall. So they are set separately one after the other.
if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
return absl::UnknownError(
absl::StrCat("Error setsockopt(SO_REUSEADDR): ", std::strerror(errno)));
}
if (setsockopt(new_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
return absl::UnknownError(
absl::StrCat("Error setsockopt(SO_REUSEPORT): ", std::strerror(errno)));
}
// Forcefully bind the new socket.
if (bind(new_socket, reinterpret_cast<const struct sockaddr*>(addr.address()),
address.len) < 0) {
return absl::UnknownError(
absl::StrCat("Error bind: ", std::strerror(errno)));
}
// Set the new socket to listen for one active connection at a time.
if (listen(new_socket, 1) < 0) {
return absl::UnknownError(
absl::StrCat("Error listen: ", std::strerror(errno)));
}
listener_fds_.push_back(new_socket);
return 0;
}
// PosixOracleEventEngine implements blocking connect. It blocks the calling
// thread until either connect succeeds or fails with timeout.
EventEngine::ConnectionHandle PosixOracleEventEngine::Connect(
OnConnectCallback on_connect, const ResolvedAddress& addr,
const EndpointConfig& /*args*/, MemoryAllocator /*memory_allocator*/,
EventEngine::Duration timeout) {
int client_sock_fd;
absl::Time deadline = absl::Now() + absl::FromChrono(timeout);
grpc_resolved_address address = CreateGRPCResolvedAddress(addr);
const char* scheme = grpc_sockaddr_get_uri_scheme(&address);
if (scheme == nullptr || strcmp(scheme, "ipv6") != 0) {
on_connect(
absl::CancelledError("Unsupported bind address type. Only ipv6 "
"addresses are currently supported."));
return {};
}
if ((client_sock_fd = socket(AF_INET6, SOCK_STREAM, 0)) < 0) {
on_connect(absl::CancelledError(absl::StrCat(
"Connect failed: socket creation error: ", std::strerror(errno))));
return {};
}
int err;
int num_retries = 0;
static constexpr int kMaxRetries = 5;
do {
err = connect(client_sock_fd, const_cast<struct sockaddr*>(addr.address()),
address.len);
if (err < 0 && (errno == EINPROGRESS || errno == EWOULDBLOCK)) {
auto status = BlockUntilWritableWithTimeout(
client_sock_fd,
std::max(deadline - absl::Now(), absl::ZeroDuration()));
if (!status.ok()) {
on_connect(status);
return {};
}
} else if (err < 0) {
if (errno != ECONNREFUSED || ++num_retries > kMaxRetries) {
on_connect(absl::CancelledError("Connect failed."));
return {};
}
// If ECONNREFUSED && num_retries < kMaxRetries, wait a while and try
// again.
absl::SleepFor(absl::Milliseconds(100));
}
} while (err < 0 && absl::Now() < deadline);
if (err < 0 && absl::Now() >= deadline) {
on_connect(absl::CancelledError("Deadline exceeded"));
} else {
on_connect(PosixOracleEndpoint::Create(client_sock_fd));
}
return {};
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,194 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_ORACLE_EVENT_ENGINE_POSIX_H_
#define GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_ORACLE_EVENT_ENGINE_POSIX_H_
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/time/time.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/promise.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "test/core/event_engine/test_suite/event_engine_test_utils.h"
namespace grpc_event_engine {
namespace experimental {
class PosixOracleEndpoint : public EventEngine::Endpoint {
public:
explicit PosixOracleEndpoint(int socket_fd);
static std::unique_ptr<PosixOracleEndpoint> Create(int socket_fd);
~PosixOracleEndpoint() override;
void Read(std::function<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs* args) override;
void Write(std::function<void(absl::Status)> on_writable, SliceBuffer* data,
const WriteArgs* args) override;
void Shutdown();
EventEngine::ResolvedAddress& GetPeerAddress() const override {
GPR_ASSERT(false && "unimplemented");
}
EventEngine::ResolvedAddress& GetLocalAddress() const override {
GPR_ASSERT(false && "unimplemented");
}
private:
// An internal helper class definition of Read operations to be performed
// by the TCPServerEndpoint.
class ReadOperation {
public:
ReadOperation()
: num_bytes_to_read_(-1), buffer_(nullptr), on_complete_(nullptr) {}
ReadOperation(int num_bytes_to_read, SliceBuffer* buffer,
std::function<void(absl::Status)>&& on_complete)
: num_bytes_to_read_(num_bytes_to_read),
buffer_(buffer),
on_complete_(std::move(on_complete)) {}
bool IsValid() { return num_bytes_to_read_ >= 0 && buffer_ != nullptr; }
int GetNumBytesToRead() const { return num_bytes_to_read_; }
void operator()(std::string read_data, absl::Status status) {
if (on_complete_ != nullptr) {
AppendStringToSliceBuffer(absl::exchange(buffer_, nullptr), read_data);
absl::exchange(on_complete_, nullptr)(status);
}
}
private:
int num_bytes_to_read_;
SliceBuffer* buffer_;
std::function<void(absl::Status)> on_complete_;
};
// An internal helper class definition of Write operations to be performed
// by the TCPServerEndpoint.
class WriteOperation {
public:
WriteOperation() : bytes_to_write_(std::string()), on_complete_(nullptr) {}
WriteOperation(SliceBuffer* buffer,
std::function<void(absl::Status)>&& on_complete)
: bytes_to_write_(ExtractSliceBufferIntoString(buffer)),
on_complete_(std::move(on_complete)) {}
bool IsValid() { return bytes_to_write_.length() > 0; }
std::string GetBytesToWrite() const { return bytes_to_write_; }
void operator()(absl::Status status) {
if (on_complete_ != nullptr) {
absl::exchange(on_complete_, nullptr)(status);
}
}
private:
std::string bytes_to_write_;
std::function<void(absl::Status)> on_complete_;
};
void ProcessReadOperations();
void ProcessWriteOperations();
mutable absl::Mutex mu_;
bool is_shutdown_ = false;
int socket_fd_;
Promise<ReadOperation> read_ops_channel_;
Promise<WriteOperation> write_ops_channel_;
grpc_core::Thread read_ops_ ABSL_GUARDED_BY(mu_);
grpc_core::Thread write_ops_ ABSL_GUARDED_BY(mu_);
};
class PosixOracleListener : public EventEngine::Listener {
public:
PosixOracleListener(
EventEngine::Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory);
~PosixOracleListener() override;
absl::StatusOr<int> Bind(const EventEngine::ResolvedAddress& addr) override;
absl::Status Start() override;
private:
void HandleIncomingConnections();
mutable absl::Mutex mu_;
EventEngine::Listener::AcceptCallback on_accept_;
std::function<void(absl::Status)> on_shutdown_;
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory_;
grpc_core::Thread serve_;
int pipefd_[2];
bool is_started_ = false;
std::vector<int> listener_fds_;
};
// A posix based oracle event engine.
class PosixOracleEventEngine final : public EventEngine {
public:
PosixOracleEventEngine() = default;
~PosixOracleEventEngine() override = default;
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
const EndpointConfig& /*config*/,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
override {
return std::make_unique<PosixOracleListener>(
std::move(on_accept), std::move(on_shutdown),
std::move(memory_allocator_factory));
}
ConnectionHandle Connect(OnConnectCallback on_connect,
const ResolvedAddress& addr,
const EndpointConfig& args,
MemoryAllocator memory_allocator,
EventEngine::Duration timeout) override;
bool CancelConnect(ConnectionHandle /*handle*/) override {
GPR_ASSERT(false && "unimplemented");
}
bool IsWorkerThread() override { return false; };
std::unique_ptr<DNSResolver> GetDNSResolver(
const DNSResolver::ResolverOptions& /*options*/) override {
GPR_ASSERT(false && "unimplemented");
}
void Run(Closure* /*closure*/) override {
GPR_ASSERT(false && "unimplemented");
}
void Run(std::function<void()> /*closure*/) override {
GPR_ASSERT(false && "unimplemented");
}
TaskHandle RunAfter(EventEngine::Duration /*duration*/,
Closure* /*closure*/) override {
GPR_ASSERT(false && "unimplemented");
}
TaskHandle RunAfter(EventEngine::Duration /*duration*/,
std::function<void()> /*closure*/) override {
GPR_ASSERT(false && "unimplemented");
}
bool Cancel(TaskHandle /*handle*/) override {
GPR_ASSERT(false && "unimplemented");
}
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_ORACLE_EVENT_ENGINE_POSIX_H_

@ -0,0 +1,29 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/event_engine/test_suite/oracle_event_engine_posix.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
#include "test/core/util/test_config.h"
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
auto ee_factory = []() {
return absl::make_unique<
grpc_event_engine::experimental::PosixOracleEventEngine>();
};
SetEventEngineFactories(/*ee_factory=*/ee_factory,
/*oracle_ee_factory=*/ee_factory);
return RUN_ALL_TESTS();
}

@ -1949,6 +1949,7 @@ src/core/lib/event_engine/iomgr_engine/timer_heap.h \
src/core/lib/event_engine/iomgr_engine/timer_manager.cc \
src/core/lib/event_engine/iomgr_engine/timer_manager.h \
src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/promise.h \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \

@ -1740,6 +1740,7 @@ src/core/lib/event_engine/iomgr_engine/timer_heap.h \
src/core/lib/event_engine/iomgr_engine/timer_manager.cc \
src/core/lib/event_engine/iomgr_engine/timer_manager.h \
src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/promise.h \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \

@ -5523,6 +5523,28 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "oracle_event_engine_posix_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save