[event_engine] More robust ee::Promise<> implementation (#30977)

* fix promise

* iocp

* iwyu

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30983/head
Craig Tiller 2 years ago committed by GitHub
parent fb3c83906c
commit 14c3839749
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      src/core/lib/event_engine/promise.h
  2. 8
      test/core/event_engine/common_closures_test.cc
  3. 2
      test/core/event_engine/posix/event_poller_posix_test.cc
  4. 8
      test/core/event_engine/windows/iocp_test.cc

@ -15,8 +15,6 @@
#define GRPC_CORE_LIB_EVENT_ENGINE_PROMISE_H
#include <grpc/support/port_platform.h>
#include "absl/time/time.h"
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
@ -27,8 +25,8 @@ namespace experimental {
/// A minimal promise implementation.
///
/// This is light-duty, syntactical sugar around cv wait & signal, which is
/// useful in some cases. A more robust implementation is being worked on
/// separately.
/// useful in some cases.
/// TODO(ctiller): Find a new name for this type.
template <typename T>
class Promise {
public:
@ -38,13 +36,10 @@ class Promise {
explicit Promise(T&& val) : val_(val) {}
// The getter will wait until the setter has been called, and will return the
// value passed during Set.
T& Get() { return WaitWithTimeout(absl::Hours(1)); }
// The getter will wait with timeout until the setter has been called, and
// will return the value passed during Set.
T& WaitWithTimeout(absl::Duration d) {
T& Get() {
grpc_core::MutexLock lock(&mu_);
if (!set_) {
cv_.WaitWithTimeout(&mu_, d);
while (!set_) {
cv_.Wait(&mu_);
}
return val_;
}

@ -31,7 +31,7 @@ TEST_F(AnyInvocableClosureTest, CallsItsFunction) {
Promise<bool> promise;
AnyInvocableClosure closure([&promise] { promise.Set(true); });
closure.Run();
ASSERT_TRUE(promise.WaitWithTimeout(absl::Seconds(3)));
ASSERT_TRUE(promise.Get());
}
class SelfDeletingClosureTest : public testing::Test {};
@ -41,7 +41,7 @@ TEST_F(SelfDeletingClosureTest, CallsItsFunction) {
auto* closure =
SelfDeletingClosure::Create([&promise] { promise.Set(true); });
closure->Run();
ASSERT_TRUE(promise.WaitWithTimeout(absl::Seconds(3)));
ASSERT_TRUE(promise.Get());
// ASAN should catch if this closure is not deleted
}
@ -52,8 +52,8 @@ TEST_F(SelfDeletingClosureTest, CallsItsFunctionAndIsDestroyed) {
SelfDeletingClosure::Create([&fn_called] { fn_called.Set(true); },
[&destroyed] { destroyed.Set(true); });
closure->Run();
ASSERT_TRUE(fn_called.WaitWithTimeout(absl::Seconds(3)));
ASSERT_TRUE(destroyed.WaitWithTimeout(absl::Seconds(3)));
ASSERT_TRUE(fn_called.Get());
ASSERT_TRUE(destroyed.Get());
}
int main(int argc, char** argv) {

@ -680,7 +680,7 @@ class Worker : public grpc_core::DualRefCounted<Worker> {
}
void Wait() {
EXPECT_TRUE(promise.WaitWithTimeout(absl::Seconds(60)));
EXPECT_TRUE(promise.Get());
WeakUnref();
}

@ -126,8 +126,8 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
ASSERT_EQ(closures.size(), 1);
executor.Run(closures[0]);
// wait for the callbacks to run
ASSERT_TRUE(read_called.WaitWithTimeout(absl::Seconds(10)));
ASSERT_TRUE(write_called.WaitWithTimeout(absl::Seconds(10)));
ASSERT_TRUE(read_called.Get());
ASSERT_TRUE(write_called.Get());
delete on_read;
delete on_write;
@ -193,7 +193,7 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
// register the closure, which should trigger it immediately.
wrapped_client_socket->NotifyOnRead(on_read);
// wait for the callbacks to run
ASSERT_TRUE(read_called.WaitWithTimeout(absl::Seconds(10)));
ASSERT_TRUE(read_called.Get());
delete on_read;
wrapped_client_socket->MaybeShutdown(absl::OkStatus());
@ -215,7 +215,7 @@ TEST_F(IOCPTest, KickWorks) {
iocp.Kick();
});
// wait for the callbacks to run
ASSERT_TRUE(kicked.WaitWithTimeout(absl::Seconds(10)));
ASSERT_TRUE(kicked.Get());
}
TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {

Loading…
Cancel
Save