Replace EventEngine::Promise with grpc_core::Notification (#31027)

* Replace EventEngine::Promise with grpc_core::Notification

* remove promise.h

* Automated change: Fix sanity tests

* fix windows

* fix iocp

* fix client_test

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
Branch: pull/31083/head
AJ Heller authored 2 years ago, committed by GitHub
Parent: 1ad4bbe78e
Commit: c0e5e35c7a
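For reviewers: the replacement applied throughout this diff is mechanical. A minimal sketch of the signal-only case, assuming only the Notify()/WaitForNotification() calls that appear in the hunks below (the surrounding function, thread, and includes are illustrative, not part of this change):

#include <thread>

#include "src/core/lib/gprpp/notification.h"

// Sketch: block until a callback running on another thread has fired.
void WaitForBackgroundWork() {
  grpc_core::Notification done;
  std::thread worker([&done] {
    // ... do the asynchronous work ...
    done.Notify();             // was: promise.Set(true)
  });
  done.WaitForNotification();  // was: ASSERT_TRUE(promise.Get())
  worker.join();
}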
20 changed files (changed line counts in parentheses):

  1. BUILD (1)
  2. CMakeLists.txt (5)
  3. build_autogenerated.yaml (8)
  4. gRPC-C++.podspec (2)
  5. gRPC-Core.podspec (2)
  6. grpc.gemspec (1)
  7. package.xml (1)
  8. src/core/lib/event_engine/promise.h (73)
  9. test/core/event_engine/BUILD (3)
  10. test/core/event_engine/common_closures_test.cc (29)
  11. test/core/event_engine/posix/event_poller_posix_test.cc (12)
  12. test/core/event_engine/test_suite/client_test.cc (71)
  13. test/core/event_engine/test_suite/event_engine_test_utils.cc (20)
  14. test/core/event_engine/test_suite/event_engine_test_utils.h (22)
  15. test/core/event_engine/test_suite/oracle_event_engine_posix.cc (34)
  16. test/core/event_engine/test_suite/oracle_event_engine_posix.h (15)
  17. test/core/event_engine/test_suite/timer_test.cc (6)
  18. test/core/event_engine/windows/iocp_test.cc (27)
  19. tools/doxygen/Doxyfile.c++.internal (1)
  20. tools/doxygen/Doxyfile.core.internal (1)

@ -3222,7 +3222,6 @@ grpc_cc_library(
"src/core/lib/debug/stats.h",
"src/core/lib/debug/stats_data.h",
"src/core/lib/event_engine/channel_args_endpoint_config.h",
"src/core/lib/event_engine/promise.h",
"src/core/lib/iomgr/block_annotate.h",
"src/core/lib/iomgr/buffer_list.h",
"src/core/lib/iomgr/call_combiner.h",

CMakeLists.txt (generated, 5 changed lines)

@ -8193,8 +8193,9 @@ target_include_directories(common_closures_test
target_link_libraries(common_closures_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc
grpc_test_util_unsecure
absl::any_invocable
absl::statusor
gpr
)

@ -741,7 +741,6 @@ libs:
- src/core/lib/event_engine/posix_engine/timer.h
- src/core/lib/event_engine/posix_engine/timer_heap.h
- src/core/lib/event_engine/posix_engine/timer_manager.h
- src/core/lib/event_engine/promise.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/thread_pool.h
- src/core/lib/event_engine/time_util.h
@ -1941,7 +1940,6 @@ libs:
- src/core/lib/event_engine/posix_engine/timer.h
- src/core/lib/event_engine/posix_engine/timer_heap.h
- src/core/lib/event_engine/posix_engine/timer_manager.h
- src/core/lib/event_engine/promise.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/thread_pool.h
- src/core/lib/event_engine/time_util.h
@ -5142,11 +5140,13 @@ targets:
language: c++
headers:
- src/core/lib/event_engine/common_closures.h
- src/core/lib/gprpp/notification.h
src:
- test/core/event_engine/common_closures_test.cc
deps:
- grpc
- grpc_test_util_unsecure
- absl/functional:any_invocable
- absl/status:statusor
- gpr
- name: completion_queue_threading_test
gtest: true
build: test

gRPC-C++.podspec (generated, 2 changed lines)

@ -692,7 +692,6 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/timer.h',
'src/core/lib/event_engine/posix_engine/timer_heap.h',
'src/core/lib/event_engine/posix_engine/timer_manager.h',
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',
@ -1556,7 +1555,6 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/timer.h',
'src/core/lib/event_engine/posix_engine/timer_heap.h',
'src/core/lib/event_engine/posix_engine/timer_manager.h',
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',

gRPC-Core.podspec (generated, 2 changed lines)

@ -1066,7 +1066,6 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/timer_heap.h',
'src/core/lib/event_engine/posix_engine/timer_manager.cc',
'src/core/lib/event_engine/posix_engine/timer_manager.h',
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
@ -2183,7 +2182,6 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/timer.h',
'src/core/lib/event_engine/posix_engine/timer_heap.h',
'src/core/lib/event_engine/posix_engine/timer_manager.h',
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',

grpc.gemspec (generated, 1 changed line)

@ -978,7 +978,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/posix_engine/timer_heap.h )
s.files += %w( src/core/lib/event_engine/posix_engine/timer_manager.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/timer_manager.h )
s.files += %w( src/core/lib/event_engine/promise.h )
s.files += %w( src/core/lib/event_engine/resolved_address.cc )
s.files += %w( src/core/lib/event_engine/slice.cc )
s.files += %w( src/core/lib/event_engine/slice_buffer.cc )

package.xml (generated, 1 changed line)

@ -960,7 +960,6 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/timer_heap.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/timer_manager.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/timer_manager.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/promise.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/resolved_address.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/slice.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/slice_buffer.cc" role="src" />

@ -1,73 +0,0 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_PROMISE_H
#define GRPC_CORE_LIB_EVENT_ENGINE_PROMISE_H
#include <grpc/support/port_platform.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
namespace experimental {
/// A minimal promise implementation.
///
/// This is light-duty, syntactical sugar around cv wait & signal, which is
/// useful in some cases.
/// TODO(ctiller): Find a new name for this type.
template <typename T>
class Promise {
public:
Promise() = default;
// Initialize a default value that will be returned if WaitWithTimeout times
// out
explicit Promise(T&& val) : val_(val) {}
// The getter will wait until the setter has been called, and will return the
// value passed during Set.
T& Get() {
grpc_core::MutexLock lock(&mu_);
while (!set_) {
cv_.Wait(&mu_);
}
return val_;
}
// This setter can only be called exactly once without a Reset.
// Will automatically unblock getters.
void Set(T&& val) {
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(!set_);
val_ = std::move(val);
set_ = true;
cv_.SignalAll();
}
// Can only be called after a set operation.
void Reset() {
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(set_);
set_ = false;
}
private:
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
T val_;
bool set_ = false;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_PROMISE_H
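The deleted header above also let a Promise carry a value between threads. Where the tests needed that (for example Promise<std::unique_ptr<EventEngine::Endpoint>>), the hunks below keep the value in a plain local and use a Notification purely for signaling. A condensed sketch of that pattern; Endpoint and Connect stand in for the EventEngine types and calls used in client_test.cc and are not literal code from this change:

std::unique_ptr<Endpoint> client_endpoint;
grpc_core::Notification client_signal;
Connect([&](absl::StatusOr<std::unique_ptr<Endpoint>> status) {
  if (status.ok()) client_endpoint = std::move(*status);
  client_signal.Notify();  // fire exactly once, on success or failure
});
client_signal.WaitForNotification();
// client_endpoint can now be read safely on the waiting thread.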

@ -28,8 +28,7 @@ grpc_cc_test(
deps = [
"//:common_event_engine_closures",
"//:gpr_platform",
"//:grpc",
"//test/core/util:grpc_test_util_unsecure",
"//:notification",
],
)

@ -18,42 +18,39 @@
#include <gtest/gtest.h>
#include "src/core/lib/event_engine/promise.h"
#include "test/core/util/test_config.h"
#include "src/core/lib/gprpp/notification.h"
using ::grpc_event_engine::experimental::AnyInvocableClosure;
using ::grpc_event_engine::experimental::Promise;
using ::grpc_event_engine::experimental::SelfDeletingClosure;
class AnyInvocableClosureTest : public testing::Test {};
TEST_F(AnyInvocableClosureTest, CallsItsFunction) {
Promise<bool> promise;
AnyInvocableClosure closure([&promise] { promise.Set(true); });
grpc_core::Notification signal;
AnyInvocableClosure closure([&signal] { signal.Notify(); });
closure.Run();
ASSERT_TRUE(promise.Get());
signal.WaitForNotification();
}
class SelfDeletingClosureTest : public testing::Test {};
TEST_F(SelfDeletingClosureTest, CallsItsFunction) {
Promise<bool> promise;
auto* closure =
SelfDeletingClosure::Create([&promise] { promise.Set(true); });
grpc_core::Notification signal;
auto* closure = SelfDeletingClosure::Create([&signal] { signal.Notify(); });
closure->Run();
ASSERT_TRUE(promise.Get());
signal.WaitForNotification();
// ASAN should catch if this closure is not deleted
}
TEST_F(SelfDeletingClosureTest, CallsItsFunctionAndIsDestroyed) {
Promise<bool> fn_called;
Promise<bool> destroyed;
grpc_core::Notification fn_called;
grpc_core::Notification destroyed;
auto* closure =
SelfDeletingClosure::Create([&fn_called] { fn_called.Set(true); },
[&destroyed] { destroyed.Set(true); });
SelfDeletingClosure::Create([&fn_called] { fn_called.Notify(); },
[&destroyed] { destroyed.Notify(); });
closure->Run();
ASSERT_TRUE(fn_called.Get());
ASSERT_TRUE(destroyed.Get());
fn_called.WaitForNotification();
destroyed.WaitForNotification();
}
int main(int argc, char** argv) {

@ -16,8 +16,6 @@
#include <ostream>
#include "absl/functional/any_invocable.h"
#include "absl/time/time.h"
#include "absl/types/variant.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
@ -55,10 +53,9 @@
#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
#include "src/core/lib/event_engine/promise.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/global_config.h"
#include "src/core/lib/gprpp/notification.h"
#include "test/core/util/port.h"
GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy);
@ -82,7 +79,6 @@ namespace grpc_event_engine {
namespace posix_engine {
using ::grpc_event_engine::experimental::Poller;
using ::grpc_event_engine::experimental::Promise;
using ::grpc_event_engine::experimental::SelfDeletingClosure;
using ::grpc_event_engine::posix_engine::PosixEventPoller;
using namespace std::chrono_literals;
@ -664,14 +660,14 @@ class Worker : public grpc_core::DualRefCounted<Worker> {
}
WeakRef().release();
}
void Orphan() override { promise.Set(true); }
void Orphan() override { signal.Notify(); }
void Start() {
// Start executing Work(..).
scheduler_->Run([this]() { Work(); });
}
void Wait() {
EXPECT_TRUE(promise.Get());
signal.WaitForNotification();
WeakUnref();
}
@ -693,7 +689,7 @@ class Worker : public grpc_core::DualRefCounted<Worker> {
}
Scheduler* scheduler_;
PosixEventPoller* poller_;
Promise<bool> promise;
grpc_core::Notification signal;
std::vector<WakeupFdHandle*> handles_;
};

@ -20,17 +20,16 @@
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/support/log.h>
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/promise.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
#include "test/core/event_engine/test_suite/event_engine_test_utils.h"
#include "test/core/util/port.h"
@ -43,7 +42,6 @@ namespace {
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::Promise;
using ::grpc_event_engine::experimental::URIToResolvedAddress;
using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
@ -83,7 +81,7 @@ std::string GetNextSendMessage() {
TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
grpc_core::ExecCtx ctx;
auto test_ee = this->NewEventEngine();
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
grpc_core::Notification signal;
auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
@ -91,17 +89,14 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
// listener.
ChannelArgsEndpointConfig config;
test_ee->Connect(
[&client_endpoint_promise](
absl::StatusOr<std::unique_ptr<Endpoint>> status) {
[&signal](absl::StatusOr<std::unique_ptr<Endpoint>> status) {
// Connect should fail.
EXPECT_FALSE(status.ok());
client_endpoint_promise.Set(nullptr);
signal.Notify();
},
URIToResolvedAddress(target_addr), config,
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
auto client_endpoint = std::move(client_endpoint_promise.Get());
EXPECT_EQ(client_endpoint, nullptr);
signal.WaitForNotification();
}
// Create a connection using the test EventEngine to a listener created
@ -115,14 +110,17 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise;
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
grpc_core::Notification client_signal;
grpc_core::Notification server_signal;
Listener::AcceptCallback accept_cb =
[&server_endpoint_promise](
[&server_endpoint, &server_signal](
std::unique_ptr<Endpoint> ep,
grpc_core::MemoryAllocator /*memory_allocator*/) {
server_endpoint_promise.Set(std::move(ep));
server_endpoint = std::move(ep);
server_signal.Notify();
};
ChannelArgsEndpointConfig config;
@ -137,21 +135,22 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
EXPECT_TRUE(listener->Start().ok());
test_ee->Connect(
[&client_endpoint_promise](
absl::StatusOr<std::unique_ptr<Endpoint>> status) {
[&client_endpoint,
&client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> status) {
if (!status.ok()) {
gpr_log(GPR_ERROR, "Connect failed: %s",
status.status().ToString().c_str());
client_endpoint_promise.Set(nullptr);
client_endpoint = nullptr;
} else {
client_endpoint_promise.Set(std::move(*status));
client_endpoint = std::move(*status);
}
client_signal.Notify();
},
URIToResolvedAddress(target_addr), config,
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
auto client_endpoint = std::move(client_endpoint_promise.Get());
auto server_endpoint = std::move(server_endpoint_promise.Get());
client_signal.WaitForNotification();
server_signal.WaitForNotification();
EXPECT_TRUE(client_endpoint != nullptr);
EXPECT_TRUE(server_endpoint != nullptr);
@ -178,17 +177,19 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
auto oracle_ee = this->NewOracleEventEngine();
auto test_ee = this->NewEventEngine();
auto memory_quota = absl::make_unique<grpc_core::MemoryQuota>("bar");
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise;
Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise;
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
// Notifications can only be fired once, so they are newed every loop
grpc_core::Notification* server_signal = new grpc_core::Notification();
std::vector<std::string> target_addrs;
std::vector<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
connections;
Listener::AcceptCallback accept_cb =
[&server_endpoint_promise](
[&server_endpoint, &server_signal](
std::unique_ptr<Endpoint> ep,
grpc_core::MemoryAllocator /*memory_allocator*/) {
server_endpoint_promise.Set(std::move(ep));
server_endpoint = std::move(ep);
server_signal->Notify();
};
ChannelArgsEndpointConfig config;
auto status = oracle_ee->CreateListener(
@ -208,35 +209,39 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
EXPECT_TRUE(listener->Start().ok());
absl::SleepFor(absl::Milliseconds(500));
for (int i = 0; i < kNumConnections; i++) {
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
grpc_core::Notification client_signal;
// Create a test EventEngine client endpoint and connect to a one of the
// addresses bound to the oracle listener. Verify that the connection
// succeeds.
ChannelArgsEndpointConfig config;
test_ee->Connect(
[&client_endpoint_promise](
absl::StatusOr<std::unique_ptr<Endpoint>> status) {
[&client_endpoint,
&client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> status) {
if (!status.ok()) {
gpr_log(GPR_ERROR, "Connect failed: %s",
status.status().ToString().c_str());
client_endpoint_promise.Set(nullptr);
client_endpoint = nullptr;
} else {
client_endpoint_promise.Set(std::move(*status));
client_endpoint = std::move(*status);
}
client_signal.Notify();
},
URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), config,
memory_quota->CreateMemoryAllocator(
absl::StrCat("conn-", std::to_string(i))),
24h);
auto client_endpoint = std::move(client_endpoint_promise.Get());
auto server_endpoint = std::move(server_endpoint_promise.Get());
client_signal.WaitForNotification();
server_signal->WaitForNotification();
EXPECT_TRUE(client_endpoint != nullptr);
EXPECT_TRUE(server_endpoint != nullptr);
connections.push_back(std::make_tuple(std::move(client_endpoint),
std::move(server_endpoint)));
client_endpoint_promise.Reset();
server_endpoint_promise.Reset();
delete server_signal;
server_signal = new grpc_core::Notification();
}
delete server_signal;
std::vector<std::thread> threads;
// Create one thread for each connection. For each connection, create

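As the comment in the hunk above notes, a Notification can only fire once (there is no equivalent of Promise::Reset()), so the multi-connection loop re-allocates the server-side Notification on every iteration with delete/new. The same re-arm could also be written with std::unique_ptr; this is a sketch only, not what the change does:

auto server_signal = absl::make_unique<grpc_core::Notification>();
for (int i = 0; i < kNumConnections; i++) {
  // ... start a connection whose accept callback calls server_signal->Notify() ...
  server_signal->WaitForNotification();
  server_signal = absl::make_unique<grpc_core::Notification>();  // re-arm for the next iteration
}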
@ -23,7 +23,6 @@
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
@ -35,6 +34,7 @@
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/uri/uri_parser.h"
@ -76,19 +76,19 @@ absl::Status SendValidatePayload(std::string data, Endpoint* send_endpoint,
Endpoint* receive_endpoint) {
GPR_ASSERT(receive_endpoint != nullptr && send_endpoint != nullptr);
int num_bytes_written = data.size();
Promise<bool> read_promise;
Promise<bool> write_promise;
grpc_core::Notification read_signal;
grpc_core::Notification write_signal;
SliceBuffer read_slice_buf;
SliceBuffer write_slice_buf;
AppendStringToSliceBuffer(&write_slice_buf, data);
EventEngine::Endpoint::ReadArgs args = {num_bytes_written};
std::function<void(absl::Status)> read_cb;
read_cb = [receive_endpoint, &read_slice_buf, &read_cb, &read_promise,
read_cb = [receive_endpoint, &read_slice_buf, &read_cb, &read_signal,
&args](absl::Status status) {
GPR_ASSERT(status.ok());
if (read_slice_buf.Length() == static_cast<size_t>(args.read_hint_bytes)) {
read_promise.Set(true);
read_signal.Notify();
return;
}
args.read_hint_bytes -= read_slice_buf.Length();
@ -98,15 +98,13 @@ absl::Status SendValidatePayload(std::string data, Endpoint* send_endpoint,
receive_endpoint->Read(std::move(read_cb), &read_slice_buf, &args);
// Start asynchronous writing at the send_endpoint.
send_endpoint->Write(
[&write_promise](absl::Status status) {
[&write_signal](absl::Status status) {
GPR_ASSERT(status.ok());
write_promise.Set(true);
write_signal.Notify();
},
&write_slice_buf, nullptr);
// Wait for async write to complete.
GPR_ASSERT(write_promise.Get() == true);
// Wait for async read to complete.
GPR_ASSERT(read_promise.Get() == true);
write_signal.WaitForNotification();
read_signal.WaitForNotification();
// Check if data written == data read
if (data != ExtractSliceBufferIntoString(&read_slice_buf)) {
return absl::CancelledError("Data read != Data written");

@ -27,7 +27,7 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include "src/core/lib/event_engine/promise.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/resource_quota/memory_quota.h"
using EventEngineFactory = std::function<
@ -89,26 +89,30 @@ class ConnectionManager {
void SetClientEndpoint(
std::unique_ptr<EventEngine::Endpoint>&& client_endpoint) {
client_endpoint_promise_.Set(std::move(client_endpoint));
client_endpoint_ = std::move(client_endpoint);
client_signal_.Notify();
}
void SetServerEndpoint(
std::unique_ptr<EventEngine::Endpoint>&& server_endpoint) {
server_endpoint_promise_.Set(std::move(server_endpoint));
server_endpoint_ = std::move(server_endpoint);
server_signal_.Notify();
}
std::unique_ptr<EventEngine::Endpoint> GetClientEndpoint() {
auto client_endpoint = std::move(client_endpoint_promise_.Get());
client_endpoint_promise_.Reset();
auto client_endpoint = std::move(client_endpoint_);
client_endpoint_.reset();
return client_endpoint;
}
std::unique_ptr<EventEngine::Endpoint> GetServerEndpoint() {
auto server_endpoint = std::move(server_endpoint_promise_.Get());
server_endpoint_promise_.Reset();
auto server_endpoint = std::move(server_endpoint_);
server_endpoint_.reset();
return server_endpoint;
}
private:
Promise<std::unique_ptr<EventEngine::Endpoint>> client_endpoint_promise_;
Promise<std::unique_ptr<EventEngine::Endpoint>> server_endpoint_promise_;
std::unique_ptr<EventEngine::Endpoint> client_endpoint_;
std::unique_ptr<EventEngine::Endpoint> server_endpoint_;
grpc_core::Notification client_signal_;
grpc_core::Notification server_signal_;
};
grpc_core::Mutex mu_;

@ -206,12 +206,14 @@ PosixOracleEndpoint::PosixOracleEndpoint(int socket_fd)
}
void PosixOracleEndpoint::Shutdown() {
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
if (std::exchange(is_shutdown_, true)) {
return;
}
read_ops_channel_.Set(ReadOperation());
write_ops_channel_.Set(WriteOperation());
read_ops_channel_ = ReadOperation();
read_op_signal_->Notify();
write_ops_channel_ = WriteOperation();
write_op_signal_->Notify();
read_ops_.Join();
write_ops_.Join();
}
@ -228,26 +230,31 @@ PosixOracleEndpoint::~PosixOracleEndpoint() {
void PosixOracleEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* args) {
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(buffer != nullptr);
int read_hint_bytes =
args != nullptr ? std::max(1, static_cast<int>(args->read_hint_bytes))
: 0;
read_ops_channel_.Set(
ReadOperation(read_hint_bytes, buffer, std::move(on_read)));
read_ops_channel_ =
ReadOperation(read_hint_bytes, buffer, std::move(on_read));
read_op_signal_->Notify();
}
void PosixOracleEndpoint::Write(
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
const WriteArgs* /*args*/) {
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(data != nullptr);
write_ops_channel_.Set(WriteOperation(data, std::move(on_writable)));
write_ops_channel_ = WriteOperation(data, std::move(on_writable));
write_op_signal_->Notify();
}
void PosixOracleEndpoint::ProcessReadOperations() {
gpr_log(GPR_INFO, "Starting thread to process read ops ...");
while (true) {
ReadOperation read_op = std::move(read_ops_channel_.Get());
read_ops_channel_.Reset();
read_op_signal_->WaitForNotification();
read_op_signal_ = absl::make_unique<grpc_core::Notification>();
auto read_op = std::exchange(read_ops_channel_, ReadOperation());
if (!read_op.IsValid()) {
read_op(std::string(), absl::CancelledError("Closed"));
break;
@ -266,8 +273,9 @@ void PosixOracleEndpoint::ProcessReadOperations() {
void PosixOracleEndpoint::ProcessWriteOperations() {
gpr_log(GPR_INFO, "Starting thread to process write ops ...");
while (true) {
WriteOperation write_op = std::move(write_ops_channel_.Get());
write_ops_channel_.Reset();
write_op_signal_->WaitForNotification();
write_op_signal_ = absl::make_unique<grpc_core::Notification>();
auto write_op = std::exchange(write_ops_channel_, WriteOperation());
if (!write_op.IsValid()) {
write_op(absl::CancelledError("Closed"));
break;
@ -296,7 +304,7 @@ PosixOracleListener::PosixOracleListener(
}
absl::Status PosixOracleListener::Start() {
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(!listener_fds_.empty());
if (std::exchange(is_started_, true)) {
return absl::InternalError("Cannot start listener more than once ...");
@ -312,7 +320,7 @@ absl::Status PosixOracleListener::Start() {
}
PosixOracleListener::~PosixOracleListener() {
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
if (!is_started_) {
serve_.Join();
return;
@ -373,7 +381,7 @@ void PosixOracleListener::HandleIncomingConnections() {
absl::StatusOr<int> PosixOracleListener::Bind(
const EventEngine::ResolvedAddress& addr) {
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
int new_socket;
int opt = -1;
grpc_resolved_address address = CreateGRPCResolvedAddress(addr);

@ -23,15 +23,12 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/time/time.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/promise.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "test/core/event_engine/test_suite/event_engine_test_utils.h"
namespace grpc_event_engine {
@ -106,11 +103,15 @@ class PosixOracleEndpoint : public EventEngine::Endpoint {
void ProcessReadOperations();
void ProcessWriteOperations();
mutable absl::Mutex mu_;
mutable grpc_core::Mutex mu_;
bool is_shutdown_ = false;
int socket_fd_;
Promise<ReadOperation> read_ops_channel_;
Promise<WriteOperation> write_ops_channel_;
ReadOperation read_ops_channel_;
WriteOperation write_ops_channel_;
std::unique_ptr<grpc_core::Notification> read_op_signal_{
new grpc_core::Notification()};
std::unique_ptr<grpc_core::Notification> write_op_signal_{
new grpc_core::Notification()};
grpc_core::Thread read_ops_ ABSL_GUARDED_BY(mu_);
grpc_core::Thread write_ops_ ABSL_GUARDED_BY(mu_);
};
@ -128,7 +129,7 @@ class PosixOracleListener : public EventEngine::Listener {
private:
void HandleIncomingConnections();
mutable absl::Mutex mu_;
mutable grpc_core::Mutex mu_;
EventEngine::Listener::AcceptCallback on_accept_;
absl::AnyInvocable<void(absl::Status)> on_shutdown_;
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory_;

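In the oracle endpoint, Promise doubled as a one-slot hand-off between the caller and the reader/writer threads. The replacement keeps the pending operation in a plain member (read_ops_channel_ / write_ops_channel_) and pairs it with a heap-allocated Notification that the worker swaps out after each wake-up, mirroring the old Get()-then-Reset() sequence. Condensed from the .cc hunks above; member names match the header, and the enclosing class and locking are elided:

// Producer side (PosixOracleEndpoint::Read, holding mu_):
read_ops_channel_ = ReadOperation(read_hint_bytes, buffer, std::move(on_read));
read_op_signal_->Notify();

// Consumer side (ProcessReadOperations thread):
read_op_signal_->WaitForNotification();
read_op_signal_ = absl::make_unique<grpc_core::Notification>();  // re-arm before consuming
auto read_op = std::exchange(read_ops_channel_, ReadOperation());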
@ -175,7 +175,9 @@ TEST_F(EventEngineTimerTest, StressTestTimersNotCalledBeforeScheduled) {
while (!signaled_) {
cv_.Wait(&mu_);
}
gpr_log(GPR_DEBUG, "failed timer count: %d of %d", failed_call_count.load(),
thread_count * call_count);
if (failed_call_count.load() != 0) {
gpr_log(GPR_DEBUG, "failed timer count: %d of %d", failed_call_count.load(),
thread_count * call_count);
}
ASSERT_EQ(0, failed_call_count.load());
}

@ -29,9 +29,9 @@
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/executor/threaded_executor.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/promise.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/win_socket.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/error.h"
#include "test/core/event_engine/windows/create_sockpair.h"
@ -41,7 +41,6 @@ using ::grpc_event_engine::experimental::CreateSockpair;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::IOCP;
using ::grpc_event_engine::experimental::Poller;
using ::grpc_event_engine::experimental::Promise;
using ::grpc_event_engine::experimental::SelfDeletingClosure;
using ::grpc_event_engine::experimental::ThreadedExecutor;
using ::grpc_event_engine::experimental::WinSocket;
@ -58,8 +57,8 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
static_cast<WinSocket*>(iocp.Watch(sockpair[0]));
WinSocket* wrapped_server_socket =
static_cast<WinSocket*>(iocp.Watch(sockpair[1]));
Promise<bool> read_called{false};
Promise<bool> write_called{false};
grpc_core::Notification read_called;
grpc_core::Notification write_called;
DWORD flags = 0;
AnyInvocableClosure* on_read;
AnyInvocableClosure* on_write;
@ -84,7 +83,7 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
gpr_log(GPR_DEBUG, "Notified on read");
EXPECT_GE(wrapped_client_socket->read_info()->bytes_transferred(), 10);
EXPECT_STREQ(read_wsabuf.buf, "hello!");
read_called.Set(true);
read_called.Notify();
});
wrapped_client_socket->NotifyOnRead(on_read);
}
@ -109,7 +108,7 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
}
on_write = new AnyInvocableClosure([&write_called] {
gpr_log(GPR_DEBUG, "Notified on write");
write_called.Set(true);
write_called.Notify();
});
wrapped_server_socket->NotifyOnWrite(on_write);
}
@ -126,8 +125,8 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
ASSERT_TRUE(cb_invoked);
// wait for the callbacks to run
ASSERT_TRUE(read_called.Get());
ASSERT_TRUE(write_called.Get());
read_called.WaitForNotification();
write_called.WaitForNotification();
delete on_read;
delete on_write;
@ -144,7 +143,7 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
WinSocket* wrapped_client_socket =
static_cast<WinSocket*>(iocp.Watch(sockpair[0]));
Promise<bool> read_called{false};
grpc_core::Notification read_called;
DWORD flags = 0;
AnyInvocableClosure* on_read;
{
@ -169,7 +168,7 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
gpr_log(GPR_DEBUG, "Notified on read");
EXPECT_GE(wrapped_client_socket->read_info()->bytes_transferred(), 10);
EXPECT_STREQ(read_wsabuf.buf, "hello!");
read_called.Set(true);
read_called.Notify();
});
}
{
@ -194,7 +193,7 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
// register the closure, which should trigger it immediately.
wrapped_client_socket->NotifyOnRead(on_read);
// wait for the callbacks to run
ASSERT_TRUE(read_called.Get());
read_called.WaitForNotification();
delete on_read;
wrapped_client_socket->MaybeShutdown(absl::OkStatus());
@ -204,14 +203,14 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
TEST_F(IOCPTest, KickWorks) {
ThreadedExecutor executor{2};
IOCP iocp(&executor);
Promise<bool> kicked{false};
grpc_core::Notification kicked;
executor.Run([&iocp, &kicked] {
bool cb_invoked = false;
Poller::WorkResult result = iocp.Work(
std::chrono::seconds(30), [&cb_invoked]() { cb_invoked = true; });
ASSERT_TRUE(result == Poller::WorkResult::kKicked);
ASSERT_FALSE(cb_invoked);
kicked.Set(true);
kicked.Notify();
});
executor.Run([&iocp] {
// give the worker thread a chance to start
@ -219,7 +218,7 @@ TEST_F(IOCPTest, KickWorks) {
iocp.Kick();
});
// wait for the callbacks to run
ASSERT_TRUE(kicked.Get());
kicked.WaitForNotification();
}
TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {

@ -1963,7 +1963,6 @@ src/core/lib/event_engine/posix_engine/timer_heap.cc \
src/core/lib/event_engine/posix_engine/timer_heap.h \
src/core/lib/event_engine/posix_engine/timer_manager.cc \
src/core/lib/event_engine/posix_engine/timer_manager.h \
src/core/lib/event_engine/promise.h \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \

@ -1751,7 +1751,6 @@ src/core/lib/event_engine/posix_engine/timer_heap.cc \
src/core/lib/event_engine/posix_engine/timer_heap.h \
src/core/lib/event_engine/posix_engine/timer_manager.cc \
src/core/lib/event_engine/posix_engine/timer_manager.h \
src/core/lib/event_engine/promise.h \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
