Revert "Revert "Temporarily require ExecCtx to be on the thread's stack for EventEngine (#29755)" (#29770)" (#29777)

This reverts commit c895272157.
pull/29780/head
AJ Heller 3 years ago committed by GitHub
parent 34920fcf8e
commit 3b3f952ef6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      src/core/lib/event_engine/iomgr_engine.cc
  2. 2
      src/core/lib/event_engine/iomgr_engine.h
  3. 1
      test/core/event_engine/test_suite/BUILD
  4. 3
      test/core/event_engine/test_suite/client_test.cc
  5. 3
      test/core/event_engine/test_suite/dns_test.cc
  6. 3
      test/core/event_engine/test_suite/server_test.cc
  7. 9
      test/core/event_engine/test_suite/timer_test.cc

@ -70,6 +70,7 @@ std::string HandleToString(EventEngine::TaskHandle handle) {
IomgrEventEngine::IomgrEventEngine() {}
IomgrEventEngine::~IomgrEventEngine() {
grpc_core::ExecCtx::Get()->Flush();
grpc_core::MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
for (auto handle : known_handles_) {
@ -83,7 +84,6 @@ IomgrEventEngine::~IomgrEventEngine() {
}
bool IomgrEventEngine::Cancel(EventEngine::TaskHandle handle) {
grpc_core::ExecCtx ctx;
grpc_core::MutexLock lock(&mu_);
if (!known_handles_.contains(handle)) return false;
auto* cd = reinterpret_cast<ClosureData*>(handle.keys[0]);
@ -114,7 +114,6 @@ EventEngine::TaskHandle IomgrEventEngine::RunAtInternal(
absl::Time when,
absl::variant<std::function<void()>, EventEngine::Closure*> cb) {
when = Clamp(when);
grpc_core::ExecCtx ctx;
auto* cd = new ClosureData;
cd->cb = std::move(cb);
cd->engine = this;

@ -39,6 +39,8 @@
namespace grpc_event_engine {
namespace experimental {
// An iomgr-based EventEngine implementation.
// All methods require an ExecCtx to already exist on the thread's stack.
class IomgrEventEngine final : public EventEngine {
public:
class IomgrEndpoint : public EventEngine::Endpoint {

@ -87,6 +87,7 @@ grpc_cc_library(
hdrs = COMMON_HEADERS,
external_deps = ["gtest"],
deps = [
"//:exec_ctx",
"//:grpc",
"//test/core/util:grpc_test_util",
],

@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
class EventEngineClientTest : public EventEngineTest {};
// TODO(hork): establish meaningful tests
TEST_F(EventEngineClientTest, TODO) {}
TEST_F(EventEngineClientTest, TODO) { grpc_core::ExecCtx exec_ctx; }

@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
class EventEngineDNSTest : public EventEngineTest {};
// TODO(hork): establish meaningful tests
TEST_F(EventEngineDNSTest, TODO) {}
TEST_F(EventEngineDNSTest, TODO) { grpc_core::ExecCtx exec_ctx; }

@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
class EventEngineServerTest : public EventEngineTest {};
// TODO(hork): establish meaningful tests
TEST_F(EventEngineServerTest, TODO) {}
TEST_F(EventEngineServerTest, TODO) { grpc_core::ExecCtx exec_ctx; }

@ -25,6 +25,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
using ::testing::ElementsAre;
@ -41,6 +42,7 @@ class EventEngineTimerTest : public EventEngineTest {
};
TEST_F(EventEngineTimerTest, ImmediateCallbackIsExecutedQuickly) {
grpc_core::ExecCtx exec_ctx;
auto engine = this->NewEventEngine();
grpc_core::MutexLock lock(&mu_);
engine->RunAt(absl::Now(), [this]() {
@ -53,12 +55,14 @@ TEST_F(EventEngineTimerTest, ImmediateCallbackIsExecutedQuickly) {
}
TEST_F(EventEngineTimerTest, SupportsCancellation) {
grpc_core::ExecCtx exec_ctx;
auto engine = this->NewEventEngine();
auto handle = engine->RunAt(absl::InfiniteFuture(), []() {});
ASSERT_TRUE(engine->Cancel(handle));
}
TEST_F(EventEngineTimerTest, CancelledCallbackIsNotExecuted) {
grpc_core::ExecCtx exec_ctx;
{
auto engine = this->NewEventEngine();
auto handle = engine->RunAt(absl::InfiniteFuture(), [this]() {
@ -73,6 +77,7 @@ TEST_F(EventEngineTimerTest, CancelledCallbackIsNotExecuted) {
}
TEST_F(EventEngineTimerTest, TimersRespectScheduleOrdering) {
grpc_core::ExecCtx exec_ctx;
// Note: this is a brittle test if the first call to `RunAt` takes longer than
// the second callback's wait time.
std::vector<uint8_t> ordered;
@ -102,6 +107,7 @@ TEST_F(EventEngineTimerTest, TimersRespectScheduleOrdering) {
}
TEST_F(EventEngineTimerTest, CancellingExecutedCallbackIsNoopAndReturnsFalse) {
grpc_core::ExecCtx exec_ctx;
auto engine = this->NewEventEngine();
grpc_core::MutexLock lock(&mu_);
auto handle = engine->RunAt(absl::Now(), [this]() {
@ -123,6 +129,7 @@ void EventEngineTimerTest::ScheduleCheckCB(absl::Time when,
// millis, absl::Time reports in nanos. This generic test will be hard-coded
// to the lowest common denominator until EventEngines can compare relative
// times with supported resolution.
grpc_core::ExecCtx exec_ctx;
int64_t now_millis = absl::ToUnixMillis(absl::Now());
int64_t when_millis = absl::ToUnixMillis(when);
EXPECT_LE(when_millis, now_millis);
@ -135,6 +142,7 @@ void EventEngineTimerTest::ScheduleCheckCB(absl::Time when,
}
TEST_F(EventEngineTimerTest, StressTestTimersNotCalledBeforeScheduled) {
grpc_core::ExecCtx exec_ctx;
auto engine = this->NewEventEngine();
constexpr int thread_count = 100;
constexpr int call_count_per_thread = 100;
@ -146,6 +154,7 @@ TEST_F(EventEngineTimerTest, StressTestTimersNotCalledBeforeScheduled) {
threads.reserve(thread_count);
for (int thread_n = 0; thread_n < thread_count; ++thread_n) {
threads.emplace_back([&]() {
grpc_core::ExecCtx exec_ctx;
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_real_distribution<> dis(timeout_min_seconds,

Loading…
Cancel
Save