[༺ EventEngine ༻] Specify requirements for Run* immediate execution (#32028)

* [༺ EventEngine༻ ] Specify requirements for Run* immediate execution

Also adds test suite tests that may catch non-conforming implementations
(Run called ~3200 times in case it's non-deterministic).

* fix

* verbiage; longer timeout

* fix int type comparison warning

* rm illegal term

* Automated change: Fix sanity tests

* disable tests

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/32034/head
AJ Heller 2 years ago committed by GitHub
parent 4806da6041
commit 5db6de8634
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      include/grpc/event_engine/event_engine.h
  2. 59
      test/core/event_engine/test_suite/timer_test.cc

@ -379,6 +379,11 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
///
/// \a Closures scheduled with \a Run cannot be cancelled. The \a closure will
/// not be deleted after it has been run, ownership remains with the caller.
///
/// Implementations must not execute the closure in the calling thread before
/// \a Run returns. For example, if the caller must release a lock before the
/// closure can proceed, running the closure immediately would cause a
/// deadlock.
virtual void Run(Closure* closure) = 0;
/// Asynchronously executes a task as soon as possible.
///
@ -389,12 +394,18 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// This version of \a Run may be less performant than the \a Closure version
/// in some scenarios. This overload is useful in situations where performance
/// is not a critical concern.
///
/// Implementations must not execute the closure in the calling thread before
/// \a Run returns.
virtual void Run(absl::AnyInvocable<void()> closure) = 0;
/// Synonymous with scheduling an alarm to run after duration \a when.
///
/// The \a closure will execute when time \a when arrives unless it has been
/// cancelled via the \a Cancel method. If cancelled, the closure will not be
/// run, nor will it be deleted. Ownership remains with the caller.
///
/// Implementations must not execute the closure in the calling thread before
/// \a RunAfter returns.
virtual TaskHandle RunAfter(Duration when, Closure* closure) = 0;
/// Synonymous with scheduling an alarm to run after duration \a when.
///
@ -407,6 +418,9 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// This version of \a RunAfter may be less performant than the \a Closure
/// version in some scenarios. This overload is useful in situations where
/// performance is not a critical concern.
///
/// Implementations must not execute the closure in the calling thread before
/// \a RunAfter returns.
virtual TaskHandle RunAfter(Duration when,
absl::AnyInvocable<void()> closure) = 0;
/// Request cancellation of a task.

@ -20,10 +20,13 @@
#include <random>
#include <ratio>
#include <thread>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/functional/bind_front.h"
#include "absl/functional/function_ref.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "gmock/gmock.h"
@ -188,3 +191,59 @@ TEST_F(EventEngineTimerTest, StressTestTimersNotCalledBeforeScheduled) {
}
ASSERT_EQ(0, failed_call_count.load());
}
// Common implementation for the Run and RunAfter test variants below
// Calls run_fn multiple times, and will get stuck if the implementation does a
// blocking inline execution of the closure. This test will timeout on failure.
void ImmediateRunTestInternal(
absl::FunctionRef<void(absl::AnyInvocable<void()>)> run_fn,
grpc_core::Mutex& mu, grpc_core::CondVar& cv) {
constexpr int num_concurrent_runs = 32;
constexpr int num_iterations = 100;
constexpr absl::Duration run_timeout = absl::Seconds(60);
std::atomic<int> waiters{0};
std::atomic<int> execution_count{0};
auto cb = [&mu, &cv, &run_timeout, &waiters, &execution_count]() {
waiters.fetch_add(1);
grpc_core::MutexLock lock(&mu);
EXPECT_FALSE(cv.WaitWithTimeout(&mu, run_timeout))
<< "callback timed out waiting.";
execution_count.fetch_add(1);
};
for (int i = 0; i < num_iterations; i++) {
waiters.store(0);
execution_count.store(0);
for (int run = 0; run < num_concurrent_runs; run++) {
run_fn(cb);
}
while (waiters.load() != num_concurrent_runs) {
absl::SleepFor(absl::Milliseconds(33));
}
cv.SignalAll();
while (execution_count.load() != num_concurrent_runs) {
absl::SleepFor(absl::Milliseconds(33));
}
}
}
// TODO(hork): re-enabled after either I've implemented XFAIL, or fixed the
// ThreadPool's behavior under backlog.
TEST_F(EventEngineTimerTest,
DISABLED_RunDoesNotImmediatelyExecuteInTheSameThread) {
auto engine = this->NewEventEngine();
ImmediateRunTestInternal(
[&engine](absl::AnyInvocable<void()> cb) { engine->Run(std::move(cb)); },
mu_, cv_);
}
// TODO(hork): re-enabled after either I've implemented XFAIL, or fixed the
// ThreadPool's behavior under backlog.
TEST_F(EventEngineTimerTest,
DISABLED_RunAfterDoesNotImmediatelyExecuteInTheSameThread) {
auto engine = this->NewEventEngine();
ImmediateRunTestInternal(
[&engine](absl::AnyInvocable<void()> cb) {
engine->RunAfter(std::chrono::seconds(0), std::move(cb));
},
mu_, cv_);
}

Loading…
Cancel
Save