diff --git a/BUILD b/BUILD index 5d7e5bee491..e25db3b0fe4 100644 --- a/BUILD +++ b/BUILD @@ -1890,6 +1890,23 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "periodic_update", + srcs = [ + "src/core/lib/resource_quota/periodic_update.cc", + ], + hdrs = [ + "src/core/lib/resource_quota/periodic_update.h", + ], + tags = ["grpc-autodeps"], + deps = [ + "exec_ctx", + "gpr_platform", + "time", + "useful", + ], +) + grpc_cc_library( name = "arena", srcs = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 359845e3e08..c2cf5e5be6b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1092,6 +1092,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx out_of_bounds_bad_client_test) add_dependencies(buildtests_cxx overload_test) add_dependencies(buildtests_cxx parsed_metadata_test) + add_dependencies(buildtests_cxx periodic_update_test) add_dependencies(buildtests_cxx pid_controller_test) add_dependencies(buildtests_cxx pipe_test) add_dependencies(buildtests_cxx poll_test) @@ -13751,6 +13752,57 @@ target_link_libraries(parsed_metadata_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(periodic_update_test + src/core/ext/upb-generated/google/protobuf/any.upb.c + src/core/ext/upb-generated/google/rpc/status.upb.c + src/core/lib/debug/trace.cc + src/core/lib/gprpp/status_helper.cc + src/core/lib/gprpp/time.cc + src/core/lib/iomgr/combiner.cc + src/core/lib/iomgr/error.cc + src/core/lib/iomgr/exec_ctx.cc + src/core/lib/iomgr/executor.cc + src/core/lib/iomgr/iomgr_internal.cc + src/core/lib/resource_quota/periodic_update.cc + src/core/lib/slice/percent_encoding.cc + src/core/lib/slice/slice.cc + src/core/lib/slice/slice_refcount.cc + src/core/lib/slice/slice_string_helpers.cc + test/core/resource_quota/periodic_update_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(periodic_update_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(periodic_update_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + gpr + upb +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 6395befe385..63d9629a150 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -7171,6 +7171,52 @@ targets: - test/core/transport/parsed_metadata_test.cc deps: - grpc_test_util +- name: periodic_update_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/upb-generated/google/protobuf/any.upb.h + - src/core/ext/upb-generated/google/rpc/status.upb.h + - src/core/lib/debug/trace.h + - src/core/lib/gprpp/bitset.h + - src/core/lib/gprpp/status_helper.h + - src/core/lib/gprpp/time.h + - src/core/lib/iomgr/closure.h + - src/core/lib/iomgr/combiner.h + - src/core/lib/iomgr/error.h + - src/core/lib/iomgr/error_internal.h + - src/core/lib/iomgr/exec_ctx.h + - src/core/lib/iomgr/executor.h + - src/core/lib/iomgr/iomgr_internal.h + - src/core/lib/resource_quota/periodic_update.h + - src/core/lib/slice/percent_encoding.h + - src/core/lib/slice/slice.h + - src/core/lib/slice/slice_internal.h + - src/core/lib/slice/slice_refcount.h + - src/core/lib/slice/slice_refcount_base.h + - src/core/lib/slice/slice_string_helpers.h + src: + - src/core/ext/upb-generated/google/protobuf/any.upb.c + - src/core/ext/upb-generated/google/rpc/status.upb.c + - src/core/lib/debug/trace.cc + - src/core/lib/gprpp/status_helper.cc + - src/core/lib/gprpp/time.cc + - src/core/lib/iomgr/combiner.cc + - src/core/lib/iomgr/error.cc + - src/core/lib/iomgr/exec_ctx.cc + - src/core/lib/iomgr/executor.cc + - src/core/lib/iomgr/iomgr_internal.cc + - src/core/lib/resource_quota/periodic_update.cc + - src/core/lib/slice/percent_encoding.cc + - src/core/lib/slice/slice.cc + - src/core/lib/slice/slice_refcount.cc + - src/core/lib/slice/slice_string_helpers.cc + - test/core/resource_quota/periodic_update_test.cc + deps: + - gpr + - upb + uses_polling: false - name: pid_controller_test gtest: true build: test diff --git a/src/core/lib/resource_quota/periodic_update.cc b/src/core/lib/resource_quota/periodic_update.cc new file mode 100644 index 00000000000..deee08842fa --- /dev/null +++ b/src/core/lib/resource_quota/periodic_update.cc @@ -0,0 +1,72 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/resource_quota/periodic_update.h" + +#include + +#include "src/core/lib/gpr/useful.h" + +namespace grpc_core { + +bool PeriodicUpdate::MaybeEndPeriod() { + // updates_remaining_ just reached 0 and the thread calling this function was + // the decrementer that got us there. + // We can now safely mutate any non-atomic mutable variables (we've got a + // guarantee that no other thread will), and by the time this function returns + // we must store a postive number into updates_remaining_. + auto now = ExecCtx::Get()->Now(); + Duration time_so_far = now - period_start_; + if (time_so_far < period_) { + // At most double the number of updates remaining until the next period. + // At least try to estimate when we'll reach it. + int64_t better_guess; + if (time_so_far.millis() == 0) { + better_guess = expected_updates_per_period_ * 2; + } else { + // Determine a scaling factor that would have gotten us to the next + // period, but clamp between 1.01 (at least 1% increase in guesses) + // and 2.0 (at most doubling) - to avoid running completely out of + // control. + const double scale = + Clamp(period_.seconds() / time_so_far.seconds(), 1.01, 2.0); + better_guess = expected_updates_per_period_ * scale; + if (better_guess <= expected_updates_per_period_) { + better_guess = expected_updates_per_period_ + 1; + } + } + // Store the remainder left. Note that updates_remaining_ may have been + // decremented by another thread whilst we performed the above calculations: + // we simply discard those decrements. + updates_remaining_.store(better_guess - expected_updates_per_period_, + std::memory_order_release); + // Not quite done, return false, try for longer. + return false; + } + // Finished period, start a new one and return true. + // We try to predict how many update periods we'd need to cover the full time + // span, and we increase that by 1% to attempt to tend to not enter the above + // stanza. + expected_updates_per_period_ = + period_.seconds() * expected_updates_per_period_ / time_so_far.seconds(); + if (expected_updates_per_period_ < 1) expected_updates_per_period_ = 1; + period_start_ = now; + updates_remaining_.store(expected_updates_per_period_, + std::memory_order_release); + return true; +} + +} // namespace grpc_core diff --git a/src/core/lib/resource_quota/periodic_update.h b/src/core/lib/resource_quota/periodic_update.h new file mode 100644 index 00000000000..447164158ec --- /dev/null +++ b/src/core/lib/resource_quota/periodic_update.h @@ -0,0 +1,70 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_RESOURCE_QUOTA_PERIODIC_UPDATE_H +#define GRPC_CORE_LIB_RESOURCE_QUOTA_PERIODIC_UPDATE_H + +#include + +#include + +#include + +#include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +namespace grpc_core { + +// Lightweight timer-like mechanism for periodic updates. +// Fast path only decrements an atomic int64. +// Slow path runs corrections and estimates how many ticks are required to hit +// the target period. +// This is super inaccurate of course, but for places where we can't run timers, +// or places where continuous registration/unregistration would cause problems +// it can be quite useful. +class PeriodicUpdate { + public: + explicit PeriodicUpdate(Duration period) : period_(period) {} + + // Tick the update, return true if we think the period expired. + GRPC_MUST_USE_RESULT bool Tick() { + // Atomically decrement the remaining ticks counter. + // If we hit 0 our estimate of period length has expired. + // See the comment next to the data members for a description of thread + // safety. + if (updates_remaining_.fetch_sub(1, std::memory_order_acquire) == 1) { + return MaybeEndPeriod(); + } + return false; + } + + private: + GRPC_MUST_USE_RESULT bool MaybeEndPeriod(); + + // Thread safety: + // When updates_remaining_ reaches 0 the thread that decremented becomes + // responsible for updating any mutable variables and then setting + // updates_remaining_ to a value greater than zero. + // Whilst in this state other threads *may* decrement updates_remaining_, but + // this is fine because they'll observe an ignorable negative value. + + const Duration period_; + Timestamp period_start_ = ExecCtx::Get()->Now(); + int64_t expected_updates_per_period_ = 1; + std::atomic updates_remaining_{1}; +}; + +} // namespace grpc_core + +#endif // GRPC_CORE_LIB_RESOURCE_QUOTA_PERIODIC_UPDATE_H diff --git a/test/core/resource_quota/BUILD b/test/core/resource_quota/BUILD index 183f358c231..f0c5de22011 100644 --- a/test/core/resource_quota/BUILD +++ b/test/core/resource_quota/BUILD @@ -43,6 +43,21 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "periodic_update_test", + srcs = ["periodic_update_test.cc"], + external_deps = [ + "gtest", + ], + language = "c++", + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:periodic_update", + "//test/core/util:grpc_suppressions", + ], +) + grpc_cc_test( name = "thread_quota_test", srcs = ["thread_quota_test.cc"], diff --git a/test/core/resource_quota/periodic_update_test.cc b/test/core/resource_quota/periodic_update_test.cc new file mode 100644 index 00000000000..35bb89e921f --- /dev/null +++ b/test/core/resource_quota/periodic_update_test.cc @@ -0,0 +1,111 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/resource_quota/periodic_update.h" + +#include + +#include + +#include "absl/synchronization/notification.h" + +#include "src/core/lib/iomgr/exec_ctx.h" + +namespace grpc_core { +namespace testing { + +TEST(PeriodicUpdateTest, SimpleTest) { + std::unique_ptr upd; + Timestamp start; + // Create a periodic update that updates every second. + { + ExecCtx exec_ctx; + upd = absl::make_unique(Duration::Seconds(1)); + start = exec_ctx.Now(); + } + // Wait until the first period has elapsed. + while (true) { + ExecCtx exec_ctx; + if (upd->Tick()) break; + } + // Ensure that took at least 1 second. + { + ExecCtx exec_ctx; + EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(1)); + start = exec_ctx.Now(); + } + // Do ten more update cycles + for (int i = 0; i < 10; i++) { + while (true) { + ExecCtx exec_ctx; + if (upd->Tick()) break; + } + // Ensure the time taken was between 1 and 1.5 seconds - we make a little + // allowance for the presumed inaccuracy of this type. + { + ExecCtx exec_ctx; + EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(1)); + EXPECT_LE(exec_ctx.Now() - start, Duration::Milliseconds(1500)); + start = exec_ctx.Now(); + } + } +} + +TEST(PeriodicUpdate, ThreadTest) { + std::unique_ptr upd; + std::atomic count(0); + Timestamp start; + // Create a periodic update that updates every second. + { + ExecCtx exec_ctx; + upd = absl::make_unique(Duration::Seconds(1)); + start = exec_ctx.Now(); + } + // Run ten threads all updating the counter continuously, for a total of ten + // update cycles. + // This allows TSAN to catch threading issues. + std::vector threads; + for (size_t i = 0; i < 10; i++) { + threads.push_back(std::thread([&]() { + while (count.load() < 10) { + ExecCtx exec_ctx; + if (upd->Tick()) count.fetch_add(1); + } + })); + } + + // Finish all threads. + for (auto& th : threads) { + th.join(); + } + // Ensure our ten cycles took at least 10 seconds, and no more than 15. + { + ExecCtx exec_ctx; + EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(10)); + EXPECT_LE(exec_ctx.Now() - start, Duration::Seconds(15)); + } +} + +} // namespace testing +} // namespace grpc_core + +// Hook needed to run ExecCtx outside of iomgr. +void grpc_set_default_iomgr_platform() {} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + gpr_log_verbosity_init(); + gpr_time_init(); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 4e25b990c2a..0309447b4c6 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -5619,6 +5619,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "periodic_update_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,