[resource_quota] Periodic update tracker (#29853)

* [resource_quota] Periodic update tracker

Periodic update type that tries to do updates at some measurable
timescale without polling the current time every tick, instead trying to
approximate how many ticks will be required to fill the desired
duration.

The intent here is that we use this in RQ to donate memory back
periodically whilst keeping timer checks off the fast path.

* fix

* Automated change: Fix sanity tests

* review feedback

* fix

* fix

* review feedback

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/29950/head
Craig Tiller 3 years ago committed by GitHub
parent dd07751c03
commit 720f9d65c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      BUILD
  2. 52
      CMakeLists.txt
  3. 46
      build_autogenerated.yaml
  4. 72
      src/core/lib/resource_quota/periodic_update.cc
  5. 70
      src/core/lib/resource_quota/periodic_update.h
  6. 15
      test/core/resource_quota/BUILD
  7. 111
      test/core/resource_quota/periodic_update_test.cc
  8. 24
      tools/run_tests/generated/tests.json

17
BUILD

@ -1890,6 +1890,23 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "periodic_update",
srcs = [
"src/core/lib/resource_quota/periodic_update.cc",
],
hdrs = [
"src/core/lib/resource_quota/periodic_update.h",
],
tags = ["grpc-autodeps"],
deps = [
"exec_ctx",
"gpr_platform",
"time",
"useful",
],
)
grpc_cc_library(
name = "arena",
srcs = [

52
CMakeLists.txt generated

@ -1092,6 +1092,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx out_of_bounds_bad_client_test)
add_dependencies(buildtests_cxx overload_test)
add_dependencies(buildtests_cxx parsed_metadata_test)
add_dependencies(buildtests_cxx periodic_update_test)
add_dependencies(buildtests_cxx pid_controller_test)
add_dependencies(buildtests_cxx pipe_test)
add_dependencies(buildtests_cxx poll_test)
@ -13751,6 +13752,57 @@ target_link_libraries(parsed_metadata_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(periodic_update_test
src/core/ext/upb-generated/google/protobuf/any.upb.c
src/core/ext/upb-generated/google/rpc/status.upb.c
src/core/lib/debug/trace.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/resource_quota/periodic_update.cc
src/core/lib/slice/percent_encoding.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_refcount.cc
src/core/lib/slice/slice_string_helpers.cc
test/core/resource_quota/periodic_update_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(periodic_update_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(periodic_update_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
gpr
upb
)
endif()
if(gRPC_BUILD_TESTS)

@ -7171,6 +7171,52 @@ targets:
- test/core/transport/parsed_metadata_test.cc
deps:
- grpc_test_util
- name: periodic_update_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/upb-generated/google/protobuf/any.upb.h
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/debug/trace.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/time.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_internal.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/resource_quota/periodic_update.h
- src/core/lib/slice/percent_encoding.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_refcount_base.h
- src/core/lib/slice/slice_string_helpers.h
src:
- src/core/ext/upb-generated/google/protobuf/any.upb.c
- src/core/ext/upb-generated/google/rpc/status.upb.c
- src/core/lib/debug/trace.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/resource_quota/periodic_update.cc
- src/core/lib/slice/percent_encoding.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_refcount.cc
- src/core/lib/slice/slice_string_helpers.cc
- test/core/resource_quota/periodic_update_test.cc
deps:
- gpr
- upb
uses_polling: false
- name: pid_controller_test
gtest: true
build: test

@ -0,0 +1,72 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/resource_quota/periodic_update.h"
#include <atomic>
#include "src/core/lib/gpr/useful.h"
namespace grpc_core {
bool PeriodicUpdate::MaybeEndPeriod() {
// updates_remaining_ just reached 0 and the thread calling this function was
// the decrementer that got us there.
// We can now safely mutate any non-atomic mutable variables (we've got a
// guarantee that no other thread will), and by the time this function returns
// we must store a postive number into updates_remaining_.
auto now = ExecCtx::Get()->Now();
Duration time_so_far = now - period_start_;
if (time_so_far < period_) {
// At most double the number of updates remaining until the next period.
// At least try to estimate when we'll reach it.
int64_t better_guess;
if (time_so_far.millis() == 0) {
better_guess = expected_updates_per_period_ * 2;
} else {
// Determine a scaling factor that would have gotten us to the next
// period, but clamp between 1.01 (at least 1% increase in guesses)
// and 2.0 (at most doubling) - to avoid running completely out of
// control.
const double scale =
Clamp(period_.seconds() / time_so_far.seconds(), 1.01, 2.0);
better_guess = expected_updates_per_period_ * scale;
if (better_guess <= expected_updates_per_period_) {
better_guess = expected_updates_per_period_ + 1;
}
}
// Store the remainder left. Note that updates_remaining_ may have been
// decremented by another thread whilst we performed the above calculations:
// we simply discard those decrements.
updates_remaining_.store(better_guess - expected_updates_per_period_,
std::memory_order_release);
// Not quite done, return false, try for longer.
return false;
}
// Finished period, start a new one and return true.
// We try to predict how many update periods we'd need to cover the full time
// span, and we increase that by 1% to attempt to tend to not enter the above
// stanza.
expected_updates_per_period_ =
period_.seconds() * expected_updates_per_period_ / time_so_far.seconds();
if (expected_updates_per_period_ < 1) expected_updates_per_period_ = 1;
period_start_ = now;
updates_remaining_.store(expected_updates_per_period_,
std::memory_order_release);
return true;
}
} // namespace grpc_core

@ -0,0 +1,70 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_RESOURCE_QUOTA_PERIODIC_UPDATE_H
#define GRPC_CORE_LIB_RESOURCE_QUOTA_PERIODIC_UPDATE_H
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <atomic>
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
// Lightweight timer-like mechanism for periodic updates.
// Fast path only decrements an atomic int64.
// Slow path runs corrections and estimates how many ticks are required to hit
// the target period.
// This is super inaccurate of course, but for places where we can't run timers,
// or places where continuous registration/unregistration would cause problems
// it can be quite useful.
class PeriodicUpdate {
public:
explicit PeriodicUpdate(Duration period) : period_(period) {}
// Tick the update, return true if we think the period expired.
GRPC_MUST_USE_RESULT bool Tick() {
// Atomically decrement the remaining ticks counter.
// If we hit 0 our estimate of period length has expired.
// See the comment next to the data members for a description of thread
// safety.
if (updates_remaining_.fetch_sub(1, std::memory_order_acquire) == 1) {
return MaybeEndPeriod();
}
return false;
}
private:
GRPC_MUST_USE_RESULT bool MaybeEndPeriod();
// Thread safety:
// When updates_remaining_ reaches 0 the thread that decremented becomes
// responsible for updating any mutable variables and then setting
// updates_remaining_ to a value greater than zero.
// Whilst in this state other threads *may* decrement updates_remaining_, but
// this is fine because they'll observe an ignorable negative value.
const Duration period_;
Timestamp period_start_ = ExecCtx::Get()->Now();
int64_t expected_updates_per_period_ = 1;
std::atomic<int64_t> updates_remaining_{1};
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_RESOURCE_QUOTA_PERIODIC_UPDATE_H

@ -43,6 +43,21 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "periodic_update_test",
srcs = ["periodic_update_test.cc"],
external_deps = [
"gtest",
],
language = "c++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:periodic_update",
"//test/core/util:grpc_suppressions",
],
)
grpc_cc_test(
name = "thread_quota_test",
srcs = ["thread_quota_test.cc"],

@ -0,0 +1,111 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/resource_quota/periodic_update.h"
#include <thread>
#include <gtest/gtest.h>
#include "absl/synchronization/notification.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
namespace testing {
TEST(PeriodicUpdateTest, SimpleTest) {
std::unique_ptr<PeriodicUpdate> upd;
Timestamp start;
// Create a periodic update that updates every second.
{
ExecCtx exec_ctx;
upd = absl::make_unique<PeriodicUpdate>(Duration::Seconds(1));
start = exec_ctx.Now();
}
// Wait until the first period has elapsed.
while (true) {
ExecCtx exec_ctx;
if (upd->Tick()) break;
}
// Ensure that took at least 1 second.
{
ExecCtx exec_ctx;
EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(1));
start = exec_ctx.Now();
}
// Do ten more update cycles
for (int i = 0; i < 10; i++) {
while (true) {
ExecCtx exec_ctx;
if (upd->Tick()) break;
}
// Ensure the time taken was between 1 and 1.5 seconds - we make a little
// allowance for the presumed inaccuracy of this type.
{
ExecCtx exec_ctx;
EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(1));
EXPECT_LE(exec_ctx.Now() - start, Duration::Milliseconds(1500));
start = exec_ctx.Now();
}
}
}
TEST(PeriodicUpdate, ThreadTest) {
std::unique_ptr<PeriodicUpdate> upd;
std::atomic<int> count(0);
Timestamp start;
// Create a periodic update that updates every second.
{
ExecCtx exec_ctx;
upd = absl::make_unique<PeriodicUpdate>(Duration::Seconds(1));
start = exec_ctx.Now();
}
// Run ten threads all updating the counter continuously, for a total of ten
// update cycles.
// This allows TSAN to catch threading issues.
std::vector<std::thread> threads;
for (size_t i = 0; i < 10; i++) {
threads.push_back(std::thread([&]() {
while (count.load() < 10) {
ExecCtx exec_ctx;
if (upd->Tick()) count.fetch_add(1);
}
}));
}
// Finish all threads.
for (auto& th : threads) {
th.join();
}
// Ensure our ten cycles took at least 10 seconds, and no more than 15.
{
ExecCtx exec_ctx;
EXPECT_GE(exec_ctx.Now() - start, Duration::Seconds(10));
EXPECT_LE(exec_ctx.Now() - start, Duration::Seconds(15));
}
}
} // namespace testing
} // namespace grpc_core
// Hook needed to run ExecCtx outside of iomgr.
void grpc_set_default_iomgr_platform() {}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
gpr_log_verbosity_init();
gpr_time_init();
return RUN_ALL_TESTS();
}

@ -5619,6 +5619,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "periodic_update_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save