[event_engine+promises] EventEngine based wakeup scheduler for activities (#31926)

* eliminate activity* from wakers

* event engine activity wakeup scheduler

* Automated change: Fix sanity tests

* add exec ctx

* fix

* iwyu

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/32060/head
Craig Tiller 2 years ago committed by GitHub
parent bf9d7cb2fe
commit 9beb72836e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      CMakeLists.txt
  2. 11
      build_autogenerated.yaml
  3. 13
      src/core/BUILD
  4. 32
      src/core/lib/promise/activity.h
  5. 63
      src/core/lib/promise/event_engine_wakeup_scheduler.h
  6. 30
      src/core/lib/promise/exec_ctx_wakeup_scheduler.h
  7. 20
      test/core/promise/BUILD
  8. 70
      test/core/promise/event_engine_wakeup_scheduler_test.cc
  9. 22
      test/core/promise/promise_fuzzer.cc
  10. 33
      test/core/promise/test_wakeup_schedulers.h
  11. 24
      tools/run_tests/generated/tests.json

38
CMakeLists.txt generated

@ -936,6 +936,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx error_test)
add_dependencies(buildtests_cxx error_utils_test)
add_dependencies(buildtests_cxx evaluate_args_test)
add_dependencies(buildtests_cxx event_engine_wakeup_scheduler_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx event_poller_posix_test)
endif()
@ -10277,6 +10278,43 @@ target_link_libraries(evaluate_args_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(event_engine_wakeup_scheduler_test
test/core/promise/event_engine_wakeup_scheduler_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(event_engine_wakeup_scheduler_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(event_engine_wakeup_scheduler_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

@ -6599,6 +6599,17 @@ targets:
- test/core/util/tracer_util.cc
deps:
- grpc_test_util
- name: event_engine_wakeup_scheduler_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/promise/event_engine_wakeup_scheduler.h
src:
- test/core/promise/event_engine_wakeup_scheduler_test.cc
deps:
- grpc
uses_polling: false
- name: event_poller_posix_test
gtest: true
build: test

@ -731,6 +731,19 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "event_engine_wakeup_scheduler",
hdrs = [
"lib/promise/event_engine_wakeup_scheduler.h",
],
language = "c++",
deps = [
"//:event_engine_base_hdrs",
"//:exec_ctx",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "wait_set",
external_deps = [

@ -387,23 +387,32 @@ class FreestandingActivity : public Activity, private Wakeable {
};
// Implementation details for an Activity of an arbitrary type of promise.
// There should exist a static function:
// There should exist an inner template class `BoundScheduler` that provides
// the following interface:
// struct WakeupScheduler {
// template <typename ActivityType>
// void ScheduleWakeup(ActivityType* activity);
// class BoundScheduler {
// public:
// BoundScheduler(WakeupScheduler);
// void ScheduleWakeup();
// };
// };
// This function should arrange that activity->RunScheduledWakeup() be invoked
// at the earliest opportunity.
// The ScheduleWakeup function should arrange that
// static_cast<ActivityType*>(this)->RunScheduledWakeup() be invoked at the
// earliest opportunity.
// It can assume that activity will remain live until RunScheduledWakeup() is
// invoked, and that a given activity will not be concurrently scheduled again
// until its RunScheduledWakeup() has been invoked.
// We use private inheritance here as a way of getting private members for
// each of the contexts.
// We use private inheritance here as a way of getting private members for each
// of the contexts.
// TODO(ctiller): We can probably reconsider the private inheritance here
// when we move away from C++11 and have more powerful template features.
template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
class PromiseActivity final : public FreestandingActivity,
private ActivityContexts<Contexts...> {
class PromiseActivity final
: public FreestandingActivity,
public WakeupScheduler::template BoundScheduler<
PromiseActivity<F, WakeupScheduler, OnDone, Contexts...>>,
private ActivityContexts<Contexts...> {
public:
using Factory = OncePromiseFactory<void, F>;
using ResultType = typename Factory::Promise::Result;
@ -411,8 +420,9 @@ class PromiseActivity final : public FreestandingActivity,
PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
OnDone on_done, Contexts&&... contexts)
: FreestandingActivity(),
WakeupScheduler::template BoundScheduler<PromiseActivity>(
std::move(wakeup_scheduler)),
ActivityContexts<Contexts...>(std::forward<Contexts>(contexts)...),
wakeup_scheduler_(std::move(wakeup_scheduler)),
on_done_(std::move(on_done)) {
// Lock, construct an initial promise from the factory, and step it.
// This may hit a waiter, which could expose our this pointer to other
@ -482,7 +492,7 @@ class PromiseActivity final : public FreestandingActivity,
}
if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) {
// Can't safely run, so ask to run later.
wakeup_scheduler_.ScheduleWakeup(this);
this->ScheduleWakeup();
} else {
// Already a wakeup scheduled for later, drop ref.
WakeupComplete();
@ -564,8 +574,6 @@ class PromiseActivity final : public FreestandingActivity,
}
using Promise = typename Factory::Promise;
// Scheduler for wakeups
GPR_NO_UNIQUE_ADDRESS WakeupScheduler wakeup_scheduler_;
// Callback on completion of the promise.
GPR_NO_UNIQUE_ADDRESS OnDone on_done_;
// Has execution completed?

@ -0,0 +1,63 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_PROMISE_EVENT_ENGINE_WAKEUP_SCHEDULER_H
#define GRPC_CORE_LIB_PROMISE_EVENT_ENGINE_WAKEUP_SCHEDULER_H
#include <grpc/support/port_platform.h>
#include <memory>
#include <utility>
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
// A callback scheduler for activities that works by scheduling callbacks on the
// exec ctx.
class EventEngineWakeupScheduler {
public:
explicit EventEngineWakeupScheduler(
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine)
: event_engine_(std::move(event_engine)) {}
template <typename ActivityType>
class BoundScheduler
: public grpc_event_engine::experimental::EventEngine::Closure {
protected:
explicit BoundScheduler(EventEngineWakeupScheduler scheduler)
: event_engine_(std::move(scheduler.event_engine_)) {}
BoundScheduler(const BoundScheduler&) = delete;
BoundScheduler& operator=(const BoundScheduler&) = delete;
void ScheduleWakeup() { event_engine_->Run(this); }
void Run() final {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
static_cast<ActivityType*>(this)->RunScheduledWakeup();
}
private:
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
};
private:
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_PROMISE_EVENT_ENGINE_WAKEUP_SCHEDULER_H

@ -31,18 +31,24 @@ namespace grpc_core {
class ExecCtxWakeupScheduler {
public:
template <typename ActivityType>
void ScheduleWakeup(ActivityType* activity) {
GRPC_CLOSURE_INIT(
&closure_,
[](void* arg, grpc_error_handle) {
static_cast<ActivityType*>(arg)->RunScheduledWakeup();
},
activity, grpc_schedule_on_exec_ctx);
ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
}
private:
grpc_closure closure_;
class BoundScheduler {
protected:
explicit BoundScheduler(ExecCtxWakeupScheduler) {}
BoundScheduler(const BoundScheduler&) = delete;
BoundScheduler& operator=(const BoundScheduler&) = delete;
void ScheduleWakeup() {
GRPC_CLOSURE_INIT(
&closure_,
[](void* arg, grpc_error_handle) {
static_cast<ActivityType*>(arg)->RunScheduledWakeup();
},
static_cast<ActivityType*>(this), nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
}
private:
grpc_closure closure_;
};
};
} // namespace grpc_core

@ -435,6 +435,26 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "event_engine_wakeup_scheduler_test",
srcs = ["event_engine_wakeup_scheduler_test.cc"],
external_deps = [
"absl/status",
"gtest",
],
language = "c++",
tags = ["promise_test"],
uses_event_engine = False,
uses_polling = False,
deps = [
"//:grpc",
"//src/core:activity",
"//src/core:event_engine_wakeup_scheduler",
"//src/core:notification",
"//src/core:poll",
],
)
grpc_cc_test(
name = "sleep_test",
srcs = ["sleep_test.cc"],

@ -0,0 +1,70 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include <stdlib.h>
#include <memory>
#include "absl/status/status.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
TEST(EventEngineWakeupSchedulerTest, Works) {
int state = 0;
Notification done;
auto activity = MakeActivity(
[&state]() mutable -> Poll<absl::Status> {
++state;
switch (state) {
case 1:
return Pending();
case 2:
return absl::OkStatus();
default:
abort();
}
},
EventEngineWakeupScheduler(
grpc_event_engine::experimental::CreateEventEngine()),
[&done](absl::Status status) {
EXPECT_EQ(status, absl::OkStatus());
done.Notify();
});
EXPECT_EQ(state, 1);
EXPECT_FALSE(done.HasBeenNotified());
activity->ForceWakeup();
done.WaitForNotification();
EXPECT_EQ(state, 2);
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_init();
auto r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}

@ -121,13 +121,23 @@ class Fuzzer {
// Schedule wakeups against the fuzzer
struct Scheduler {
Fuzzer* fuzzer;
// Schedule a wakeup
template <typename ActivityType>
void ScheduleWakeup(ActivityType* activity) {
GPR_ASSERT(activity == fuzzer->activity_.get());
GPR_ASSERT(fuzzer->wakeup_ == nullptr);
fuzzer->wakeup_ = [activity]() { activity->RunScheduledWakeup(); };
}
class BoundScheduler {
public:
explicit BoundScheduler(Scheduler scheduler)
: fuzzer_(scheduler.fuzzer) {}
void ScheduleWakeup() {
GPR_ASSERT(static_cast<ActivityType*>(this) ==
fuzzer_->activity_.get());
GPR_ASSERT(fuzzer_->wakeup_ == nullptr);
fuzzer_->wakeup_ = [this]() {
static_cast<ActivityType*>(this)->RunScheduledWakeup();
};
}
private:
Fuzzer* fuzzer_;
};
};
// We know that if not already finished, the status when finished will be

@ -27,9 +27,11 @@ namespace grpc_core {
// Useful for very limited tests.
struct NoWakeupScheduler {
template <typename ActivityType>
void ScheduleWakeup(ActivityType*) {
abort();
}
class BoundScheduler {
public:
explicit BoundScheduler(NoWakeupScheduler) {}
void ScheduleWakeup() { abort(); }
};
};
// A wakeup scheduler that simply runs the callback immediately.
@ -37,9 +39,13 @@ struct NoWakeupScheduler {
// ordering problems.
struct InlineWakeupScheduler {
template <typename ActivityType>
void ScheduleWakeup(ActivityType* activity) {
activity->RunScheduledWakeup();
}
class BoundScheduler {
public:
explicit BoundScheduler(InlineWakeupScheduler) {}
void ScheduleWakeup() {
static_cast<ActivityType*>(this)->RunScheduledWakeup();
}
};
};
// Mock for something that can schedule callbacks.
@ -58,9 +64,18 @@ class MockCallbackScheduler {
struct UseMockCallbackScheduler {
MockCallbackScheduler* scheduler;
template <typename ActivityType>
void ScheduleWakeup(ActivityType* activity) {
scheduler->Schedule([activity] { activity->RunScheduledWakeup(); });
}
class BoundScheduler {
public:
explicit BoundScheduler(UseMockCallbackScheduler use_scheduler)
: scheduler(use_scheduler.scheduler) {}
void ScheduleWakeup() {
scheduler->Schedule(
[this] { static_cast<ActivityType*>(this)->RunScheduledWakeup(); });
}
private:
MockCallbackScheduler* scheduler;
};
};
} // namespace grpc_core

@ -2755,6 +2755,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "event_engine_wakeup_scheduler_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save