diff --git a/BUILD b/BUILD index c78876178c0..3058c0f7335 100644 --- a/BUILD +++ b/BUILD @@ -1076,6 +1076,22 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "sleep", + srcs = [ + "src/core/lib/promise/sleep.cc", + ], + hdrs = [ + "src/core/lib/promise/sleep.h", + ], + deps = [ + "activity", + "gpr_platform", + "grpc_base", + "poll", + ], +) + grpc_cc_library( name = "promise", external_deps = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 74827b652cb..608e14536d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -981,6 +981,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx settings_timeout_test) add_dependencies(buildtests_cxx shutdown_test) add_dependencies(buildtests_cxx simple_request_bad_client_test) + add_dependencies(buildtests_cxx sleep_test) add_dependencies(buildtests_cxx smoke_test) add_dependencies(buildtests_cxx sockaddr_utils_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -15229,6 +15230,42 @@ target_link_libraries(simple_request_bad_client_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(sleep_test + src/core/lib/promise/sleep.cc + test/core/promise/sleep_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(sleep_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(sleep_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 53df1db6647..bb90834efac 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -7745,6 +7745,19 @@ targets: - test/core/end2end/cq_verifier.cc deps: - grpc_test_util +- name: sleep_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/promise/sleep.h + - test/core/promise/test_wakeup_schedulers.h + src: + - src/core/lib/promise/sleep.cc + - test/core/promise/sleep_test.cc + deps: + - grpc + uses_polling: false - name: smoke_test gtest: true build: test diff --git a/src/core/lib/promise/sleep.cc b/src/core/lib/promise/sleep.cc new file mode 100644 index 00000000000..df58d83f435 --- /dev/null +++ b/src/core/lib/promise/sleep.cc @@ -0,0 +1,74 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/promise/sleep.h" + +namespace grpc_core { + +Sleep::Sleep(grpc_millis deadline) : state_(new State(deadline)) { + GRPC_CLOSURE_INIT(&state_->on_timer, &OnTimer, state_, nullptr); +} + +Sleep::~Sleep() { + if (state_ == nullptr) return; + { + MutexLock lock(&state_->mu); + switch (state_->stage) { + case Stage::kInitial: + state_->Unref(); + break; + case Stage::kStarted: + grpc_timer_cancel(&state_->timer); + break; + case Stage::kDone: + break; + } + } + state_->Unref(); +} + +void Sleep::OnTimer(void* arg, grpc_error_handle) { + auto* state = static_cast(arg); + Waker waker; + { + MutexLock lock(&state->mu); + state->stage = Stage::kDone; + waker = std::move(state->waker); + } + waker.Wakeup(); + state->Unref(); +} + +Poll Sleep::operator()() { + MutexLock lock(&state_->mu); + switch (state_->stage) { + case Stage::kInitial: + if (state_->deadline <= ExecCtx::Get()->Now()) { + return absl::OkStatus(); + } + state_->stage = Stage::kStarted; + grpc_timer_init(&state_->timer, state_->deadline, &state_->on_timer); + break; + case Stage::kStarted: + break; + case Stage::kDone: + return absl::OkStatus(); + } + state_->waker = Activity::current()->MakeNonOwningWaker(); + return Pending{}; +} + +} // namespace grpc_core diff --git a/src/core/lib/promise/sleep.h b/src/core/lib/promise/sleep.h new file mode 100644 index 00000000000..8cf9b15fe94 --- /dev/null +++ b/src/core/lib/promise/sleep.h @@ -0,0 +1,66 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_PROMISE_SLEEP_H +#define GRPC_CORE_LIB_PROMISE_SLEEP_H + +#include + +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/poll.h" + +namespace grpc_core { + +// Promise that sleeps until a deadline and then finishes. +class Sleep { + public: + explicit Sleep(grpc_millis deadline); + ~Sleep(); + + Sleep(const Sleep&) = delete; + Sleep& operator=(const Sleep&) = delete; + Sleep(Sleep&& other) noexcept : state_(other.state_) { + other.state_ = nullptr; + } + Sleep& operator=(Sleep&& other) noexcept { + std::swap(state_, other.state_); + return *this; + } + + Poll operator()(); + + private: + static void OnTimer(void* arg, grpc_error_handle error); + + enum class Stage { kInitial, kStarted, kDone }; + struct State { + explicit State(grpc_millis deadline) : deadline(deadline) {} + RefCount refs{2}; + const grpc_millis deadline; + grpc_timer timer; + grpc_closure on_timer; + Mutex mu; + Stage stage ABSL_GUARDED_BY(mu) = Stage::kInitial; + Waker waker ABSL_GUARDED_BY(mu); + void Unref() { + if (refs.Unref()) delete this; + } + }; + State* state_; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_LIB_PROMISE_SLEEP_H diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD index dce78cbcb2d..6e6e98f903a 100644 --- a/test/core/promise/BUILD +++ b/test/core/promise/BUILD @@ -328,3 +328,21 @@ grpc_cc_test( "//test/core/util:grpc_suppressions", ], ) + +grpc_cc_test( + name = "sleep_test", + srcs = ["sleep_test.cc"], + external_deps = [ + "gtest", + "absl/synchronization", + ], + language = "c++", + uses_polling = False, + deps = [ + "test_wakeup_schedulers", + "//:activity", + "//:grpc", + "//:sleep", + "//test/core/util:grpc_suppressions", + ], +) diff --git a/test/core/promise/sleep_test.cc b/test/core/promise/sleep_test.cc new file mode 100644 index 00000000000..21e8e82d54a --- /dev/null +++ b/test/core/promise/sleep_test.cc @@ -0,0 +1,86 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/promise/sleep.h" + +#include + +#include +#include + +#include "absl/synchronization/notification.h" + +#include + +#include "src/core/lib/promise/race.h" +#include "src/core/lib/promise/seq.h" +#include "test/core/promise/test_wakeup_schedulers.h" + +namespace grpc_core { +namespace { + +TEST(Sleep, Zzzz) { + ExecCtx exec_ctx; + absl::Notification done; + grpc_millis done_time = ExecCtx::Get()->Now() + 1000; + // Sleep for one second then set done to true. + auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(), + [&done](absl::Status r) { + EXPECT_EQ(r, absl::OkStatus()); + done.Notify(); + }); + done.WaitForNotification(); + exec_ctx.InvalidateNow(); + EXPECT_GE(ExecCtx::Get()->Now(), done_time); +} + +TEST(Sleep, AlreadyDone) { + ExecCtx exec_ctx; + absl::Notification done; + grpc_millis done_time = ExecCtx::Get()->Now() - 1000; + // Sleep for no time at all then set done to true. + auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(), + [&done](absl::Status r) { + EXPECT_EQ(r, absl::OkStatus()); + done.Notify(); + }); + done.WaitForNotification(); +} + +TEST(Sleep, Cancel) { + ExecCtx exec_ctx; + absl::Notification done; + grpc_millis done_time = ExecCtx::Get()->Now() + 1000; + // Sleep for one second but race it to complete immediately + auto activity = MakeActivity( + Race(Sleep(done_time), [] { return absl::CancelledError(); }), + InlineWakeupScheduler(), [&done](absl::Status r) { + EXPECT_EQ(r, absl::CancelledError()); + done.Notify(); + }); + done.WaitForNotification(); + exec_ctx.InvalidateNow(); + EXPECT_LT(ExecCtx::Get()->Now(), done_time); +} + +} // namespace +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_init(); + int r = RUN_ALL_TESTS(); + grpc_shutdown(); + return r; +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 47c1fa162eb..9c609863098 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -6523,6 +6523,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "sleep_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,