From 9beb72836e528aeba00ad9c55698ad887a032737 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 9 Jan 2023 14:02:26 -0800 Subject: [PATCH] [event_engine+promises] EventEngine based wakeup scheduler for activities (#31926) * eliminate activity* from wakers * event engine activity wakeup scheduler * Automated change: Fix sanity tests * add exec ctx * fix * iwyu * fix Co-authored-by: ctiller --- CMakeLists.txt | 38 ++++++++++ build_autogenerated.yaml | 11 +++ src/core/BUILD | 13 ++++ src/core/lib/promise/activity.h | 32 +++++---- .../promise/event_engine_wakeup_scheduler.h | 63 +++++++++++++++++ .../lib/promise/exec_ctx_wakeup_scheduler.h | 30 ++++---- test/core/promise/BUILD | 20 ++++++ .../event_engine_wakeup_scheduler_test.cc | 70 +++++++++++++++++++ test/core/promise/promise_fuzzer.cc | 22 ++++-- test/core/promise/test_wakeup_schedulers.h | 33 ++++++--- tools/run_tests/generated/tests.json | 24 +++++++ 11 files changed, 317 insertions(+), 39 deletions(-) create mode 100644 src/core/lib/promise/event_engine_wakeup_scheduler.h create mode 100644 test/core/promise/event_engine_wakeup_scheduler_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 5304df29e86..1622f33b189 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -936,6 +936,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx error_test) add_dependencies(buildtests_cxx error_utils_test) add_dependencies(buildtests_cxx evaluate_args_test) + add_dependencies(buildtests_cxx event_engine_wakeup_scheduler_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx event_poller_posix_test) endif() @@ -10277,6 +10278,43 @@ target_link_libraries(evaluate_args_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(event_engine_wakeup_scheduler_test + test/core/promise/event_engine_wakeup_scheduler_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(event_engine_wakeup_scheduler_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(event_engine_wakeup_scheduler_test + ${_gRPC_BASELIB_LIBRARIES} + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ZLIB_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc +) + + endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index cf2034cfb71..3d22a961460 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -6599,6 +6599,17 @@ targets: - test/core/util/tracer_util.cc deps: - grpc_test_util +- name: event_engine_wakeup_scheduler_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/promise/event_engine_wakeup_scheduler.h + src: + - test/core/promise/event_engine_wakeup_scheduler_test.cc + deps: + - grpc + uses_polling: false - name: event_poller_posix_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index 6412a5108b4..ef9f183b285 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -731,6 +731,19 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "event_engine_wakeup_scheduler", + hdrs = [ + "lib/promise/event_engine_wakeup_scheduler.h", + ], + language = "c++", + deps = [ + "//:event_engine_base_hdrs", + "//:exec_ctx", + "//:gpr_platform", + ], +) + grpc_cc_library( name = "wait_set", external_deps = [ diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index d0825c59b55..9cffaf347da 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -387,23 +387,32 @@ class FreestandingActivity : public Activity, private Wakeable { }; // Implementation details for an Activity of an arbitrary type of promise. -// There should exist a static function: +// There should exist an inner template class `BoundScheduler` that provides +// the following interface: // struct WakeupScheduler { // template -// void ScheduleWakeup(ActivityType* activity); +// class BoundScheduler { +// public: +// BoundScheduler(WakeupScheduler); +// void ScheduleWakeup(); +// }; // }; -// This function should arrange that activity->RunScheduledWakeup() be invoked -// at the earliest opportunity. +// The ScheduleWakeup function should arrange that +// static_cast(this)->RunScheduledWakeup() be invoked at the +// earliest opportunity. // It can assume that activity will remain live until RunScheduledWakeup() is // invoked, and that a given activity will not be concurrently scheduled again // until its RunScheduledWakeup() has been invoked. -// We use private inheritance here as a way of getting private members for -// each of the contexts. +// We use private inheritance here as a way of getting private members for each +// of the contexts. // TODO(ctiller): We can probably reconsider the private inheritance here // when we move away from C++11 and have more powerful template features. template -class PromiseActivity final : public FreestandingActivity, - private ActivityContexts { +class PromiseActivity final + : public FreestandingActivity, + public WakeupScheduler::template BoundScheduler< + PromiseActivity>, + private ActivityContexts { public: using Factory = OncePromiseFactory; using ResultType = typename Factory::Promise::Result; @@ -411,8 +420,9 @@ class PromiseActivity final : public FreestandingActivity, PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler, OnDone on_done, Contexts&&... contexts) : FreestandingActivity(), + WakeupScheduler::template BoundScheduler( + std::move(wakeup_scheduler)), ActivityContexts(std::forward(contexts)...), - wakeup_scheduler_(std::move(wakeup_scheduler)), on_done_(std::move(on_done)) { // Lock, construct an initial promise from the factory, and step it. // This may hit a waiter, which could expose our this pointer to other @@ -482,7 +492,7 @@ class PromiseActivity final : public FreestandingActivity, } if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) { // Can't safely run, so ask to run later. - wakeup_scheduler_.ScheduleWakeup(this); + this->ScheduleWakeup(); } else { // Already a wakeup scheduled for later, drop ref. WakeupComplete(); @@ -564,8 +574,6 @@ class PromiseActivity final : public FreestandingActivity, } using Promise = typename Factory::Promise; - // Scheduler for wakeups - GPR_NO_UNIQUE_ADDRESS WakeupScheduler wakeup_scheduler_; // Callback on completion of the promise. GPR_NO_UNIQUE_ADDRESS OnDone on_done_; // Has execution completed? diff --git a/src/core/lib/promise/event_engine_wakeup_scheduler.h b/src/core/lib/promise/event_engine_wakeup_scheduler.h new file mode 100644 index 00000000000..61cf86d7387 --- /dev/null +++ b/src/core/lib/promise/event_engine_wakeup_scheduler.h @@ -0,0 +1,63 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_PROMISE_EVENT_ENGINE_WAKEUP_SCHEDULER_H +#define GRPC_CORE_LIB_PROMISE_EVENT_ENGINE_WAKEUP_SCHEDULER_H + +#include + +#include +#include + +#include + +#include "src/core/lib/iomgr/exec_ctx.h" + +namespace grpc_core { + +// A callback scheduler for activities that works by scheduling callbacks on the +// exec ctx. +class EventEngineWakeupScheduler { + public: + explicit EventEngineWakeupScheduler( + std::shared_ptr + event_engine) + : event_engine_(std::move(event_engine)) {} + + template + class BoundScheduler + : public grpc_event_engine::experimental::EventEngine::Closure { + protected: + explicit BoundScheduler(EventEngineWakeupScheduler scheduler) + : event_engine_(std::move(scheduler.event_engine_)) {} + BoundScheduler(const BoundScheduler&) = delete; + BoundScheduler& operator=(const BoundScheduler&) = delete; + void ScheduleWakeup() { event_engine_->Run(this); } + void Run() final { + ApplicationCallbackExecCtx app_exec_ctx; + ExecCtx exec_ctx; + static_cast(this)->RunScheduledWakeup(); + } + + private: + std::shared_ptr event_engine_; + }; + + private: + std::shared_ptr event_engine_; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_LIB_PROMISE_EVENT_ENGINE_WAKEUP_SCHEDULER_H diff --git a/src/core/lib/promise/exec_ctx_wakeup_scheduler.h b/src/core/lib/promise/exec_ctx_wakeup_scheduler.h index 2a3f7837234..85706243ec7 100644 --- a/src/core/lib/promise/exec_ctx_wakeup_scheduler.h +++ b/src/core/lib/promise/exec_ctx_wakeup_scheduler.h @@ -31,18 +31,24 @@ namespace grpc_core { class ExecCtxWakeupScheduler { public: template - void ScheduleWakeup(ActivityType* activity) { - GRPC_CLOSURE_INIT( - &closure_, - [](void* arg, grpc_error_handle) { - static_cast(arg)->RunScheduledWakeup(); - }, - activity, grpc_schedule_on_exec_ctx); - ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus()); - } - - private: - grpc_closure closure_; + class BoundScheduler { + protected: + explicit BoundScheduler(ExecCtxWakeupScheduler) {} + BoundScheduler(const BoundScheduler&) = delete; + BoundScheduler& operator=(const BoundScheduler&) = delete; + void ScheduleWakeup() { + GRPC_CLOSURE_INIT( + &closure_, + [](void* arg, grpc_error_handle) { + static_cast(arg)->RunScheduledWakeup(); + }, + static_cast(this), nullptr); + ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus()); + } + + private: + grpc_closure closure_; + }; }; } // namespace grpc_core diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD index 6bf965d0115..772ea391321 100644 --- a/test/core/promise/BUILD +++ b/test/core/promise/BUILD @@ -435,6 +435,26 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "event_engine_wakeup_scheduler_test", + srcs = ["event_engine_wakeup_scheduler_test.cc"], + external_deps = [ + "absl/status", + "gtest", + ], + language = "c++", + tags = ["promise_test"], + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:grpc", + "//src/core:activity", + "//src/core:event_engine_wakeup_scheduler", + "//src/core:notification", + "//src/core:poll", + ], +) + grpc_cc_test( name = "sleep_test", srcs = ["sleep_test.cc"], diff --git a/test/core/promise/event_engine_wakeup_scheduler_test.cc b/test/core/promise/event_engine_wakeup_scheduler_test.cc new file mode 100644 index 00000000000..306105d26ce --- /dev/null +++ b/test/core/promise/event_engine_wakeup_scheduler_test.cc @@ -0,0 +1,70 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" + +#include + +#include + +#include "absl/status/status.h" +#include "gtest/gtest.h" + +#include +#include + +#include "src/core/lib/gprpp/notification.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/poll.h" + +namespace grpc_core { + +TEST(EventEngineWakeupSchedulerTest, Works) { + int state = 0; + Notification done; + auto activity = MakeActivity( + [&state]() mutable -> Poll { + ++state; + switch (state) { + case 1: + return Pending(); + case 2: + return absl::OkStatus(); + default: + abort(); + } + }, + EventEngineWakeupScheduler( + grpc_event_engine::experimental::CreateEventEngine()), + [&done](absl::Status status) { + EXPECT_EQ(status, absl::OkStatus()); + done.Notify(); + }); + + EXPECT_EQ(state, 1); + EXPECT_FALSE(done.HasBeenNotified()); + activity->ForceWakeup(); + done.WaitForNotification(); + EXPECT_EQ(state, 2); +} + +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_init(); + auto r = RUN_ALL_TESTS(); + grpc_shutdown(); + return r; +} diff --git a/test/core/promise/promise_fuzzer.cc b/test/core/promise/promise_fuzzer.cc index d44e34990df..c777e948065 100644 --- a/test/core/promise/promise_fuzzer.cc +++ b/test/core/promise/promise_fuzzer.cc @@ -121,13 +121,23 @@ class Fuzzer { // Schedule wakeups against the fuzzer struct Scheduler { Fuzzer* fuzzer; - // Schedule a wakeup template - void ScheduleWakeup(ActivityType* activity) { - GPR_ASSERT(activity == fuzzer->activity_.get()); - GPR_ASSERT(fuzzer->wakeup_ == nullptr); - fuzzer->wakeup_ = [activity]() { activity->RunScheduledWakeup(); }; - } + class BoundScheduler { + public: + explicit BoundScheduler(Scheduler scheduler) + : fuzzer_(scheduler.fuzzer) {} + void ScheduleWakeup() { + GPR_ASSERT(static_cast(this) == + fuzzer_->activity_.get()); + GPR_ASSERT(fuzzer_->wakeup_ == nullptr); + fuzzer_->wakeup_ = [this]() { + static_cast(this)->RunScheduledWakeup(); + }; + } + + private: + Fuzzer* fuzzer_; + }; }; // We know that if not already finished, the status when finished will be diff --git a/test/core/promise/test_wakeup_schedulers.h b/test/core/promise/test_wakeup_schedulers.h index 55bc35719be..adba934c079 100644 --- a/test/core/promise/test_wakeup_schedulers.h +++ b/test/core/promise/test_wakeup_schedulers.h @@ -27,9 +27,11 @@ namespace grpc_core { // Useful for very limited tests. struct NoWakeupScheduler { template - void ScheduleWakeup(ActivityType*) { - abort(); - } + class BoundScheduler { + public: + explicit BoundScheduler(NoWakeupScheduler) {} + void ScheduleWakeup() { abort(); } + }; }; // A wakeup scheduler that simply runs the callback immediately. @@ -37,9 +39,13 @@ struct NoWakeupScheduler { // ordering problems. struct InlineWakeupScheduler { template - void ScheduleWakeup(ActivityType* activity) { - activity->RunScheduledWakeup(); - } + class BoundScheduler { + public: + explicit BoundScheduler(InlineWakeupScheduler) {} + void ScheduleWakeup() { + static_cast(this)->RunScheduledWakeup(); + } + }; }; // Mock for something that can schedule callbacks. @@ -58,9 +64,18 @@ class MockCallbackScheduler { struct UseMockCallbackScheduler { MockCallbackScheduler* scheduler; template - void ScheduleWakeup(ActivityType* activity) { - scheduler->Schedule([activity] { activity->RunScheduledWakeup(); }); - } + class BoundScheduler { + public: + explicit BoundScheduler(UseMockCallbackScheduler use_scheduler) + : scheduler(use_scheduler.scheduler) {} + void ScheduleWakeup() { + scheduler->Schedule( + [this] { static_cast(this)->RunScheduledWakeup(); }); + } + + private: + MockCallbackScheduler* scheduler; + }; }; } // namespace grpc_core diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 08372b3eec9..6b1126714fd 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2755,6 +2755,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "event_engine_wakeup_scheduler_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,