Fix client idle filter (#27611)
* trial-change * compiles! * added test * Automated change: Fix sanity tests * update test * Fix slow idle detection * review feedback Co-authored-by: ctiller <ctiller@users.noreply.github.com>pull/27609/head
parent
da2e8ddc4e
commit
8d8d07139c
20 changed files with 427 additions and 190 deletions
@ -0,0 +1,96 @@ |
|||||||
|
// Copyright 2021 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_idle/idle_filter_state.h" |
||||||
|
|
||||||
|
#include <assert.h> |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
IdleFilterState::IdleFilterState(bool start_timer) |
||||||
|
: state_(start_timer ? kTimerStarted : 0) {} |
||||||
|
|
||||||
|
void IdleFilterState::IncreaseCallCount() { |
||||||
|
uintptr_t state = state_.load(std::memory_order_relaxed); |
||||||
|
uintptr_t new_state; |
||||||
|
do { |
||||||
|
// Increment the counter, and flag that there's been activity.
|
||||||
|
new_state = state; |
||||||
|
new_state |= kCallsStartedSinceLastTimerCheck; |
||||||
|
new_state += kCallIncrement; |
||||||
|
} while (!state_.compare_exchange_weak( |
||||||
|
state, new_state, std::memory_order_acq_rel, std::memory_order_relaxed)); |
||||||
|
} |
||||||
|
|
||||||
|
bool IdleFilterState::DecreaseCallCount() { |
||||||
|
uintptr_t state = state_.load(std::memory_order_relaxed); |
||||||
|
uintptr_t new_state; |
||||||
|
bool start_timer; |
||||||
|
do { |
||||||
|
start_timer = false; |
||||||
|
new_state = state; |
||||||
|
// Decrement call count (and assert there's at least one call outstanding!)
|
||||||
|
assert(new_state >= kCallIncrement); |
||||||
|
new_state -= kCallIncrement; |
||||||
|
// If that decrement reaches a call count of zero and we have not started a
|
||||||
|
// timer
|
||||||
|
if ((new_state >> kCallsInProgressShift) == 0 && |
||||||
|
(new_state & kTimerStarted) == 0) { |
||||||
|
// Flag that we will start a timer, and mark it started so nobody else
|
||||||
|
// does.
|
||||||
|
start_timer = true; |
||||||
|
new_state |= kTimerStarted; |
||||||
|
new_state &= ~kCallsInProgressShift; |
||||||
|
} |
||||||
|
} while (!state_.compare_exchange_weak( |
||||||
|
state, new_state, std::memory_order_acq_rel, std::memory_order_relaxed)); |
||||||
|
return start_timer; |
||||||
|
} |
||||||
|
|
||||||
|
bool IdleFilterState::CheckTimer() { |
||||||
|
uintptr_t state = state_.load(std::memory_order_relaxed); |
||||||
|
uintptr_t new_state; |
||||||
|
bool start_timer; |
||||||
|
do { |
||||||
|
if ((state >> kCallsInProgressShift) != 0) { |
||||||
|
// Still calls in progress: nothing needs updating, just return
|
||||||
|
// and keep the timer going!
|
||||||
|
return true; |
||||||
|
} |
||||||
|
new_state = state; |
||||||
|
bool is_active = false; |
||||||
|
if (new_state & kCallsStartedSinceLastTimerCheck) { |
||||||
|
// If any calls started since the last time we checked, then consider the
|
||||||
|
// channel still active and try again.
|
||||||
|
is_active = true; |
||||||
|
new_state &= ~kCallsStartedSinceLastTimerCheck; |
||||||
|
} |
||||||
|
if (is_active) { |
||||||
|
// If we are still active, we should signal that the timer should start
|
||||||
|
// again.
|
||||||
|
start_timer = true; |
||||||
|
} else { |
||||||
|
// Otherwise, we should not start the timer again, and we should signal
|
||||||
|
// that in the updated state.
|
||||||
|
start_timer = false; |
||||||
|
new_state &= ~kTimerStarted; |
||||||
|
} |
||||||
|
} while (!state_.compare_exchange_weak( |
||||||
|
state, new_state, std::memory_order_acq_rel, std::memory_order_relaxed)); |
||||||
|
return start_timer; |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,66 @@ |
|||||||
|
// Copyright 2021 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_IDLE_IDLE_FILTER_STATE_H |
||||||
|
#define GRPC_CORE_EXT_FILTERS_CLIENT_IDLE_IDLE_FILTER_STATE_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <atomic> |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
// State machine for the idle filter.
|
||||||
|
// Keeps track of how many calls are in progress, whether there is a timer
|
||||||
|
// started, and whether we've seen calls since the previous timer fired.
|
||||||
|
class IdleFilterState { |
||||||
|
public: |
||||||
|
explicit IdleFilterState(bool start_timer); |
||||||
|
~IdleFilterState() = default; |
||||||
|
|
||||||
|
IdleFilterState(const IdleFilterState&); |
||||||
|
IdleFilterState& operator=(const IdleFilterState&); |
||||||
|
|
||||||
|
// Increment the number of calls in progress.
|
||||||
|
void IncreaseCallCount(); |
||||||
|
|
||||||
|
// Decrement the number of calls in progress.
|
||||||
|
// Return true if we reached idle with no timer started.
|
||||||
|
GRPC_MUST_USE_RESULT bool DecreaseCallCount(); |
||||||
|
|
||||||
|
// Check if there's been any activity since the last timer check.
|
||||||
|
// If there was, reset the activity flag and return true to indicated that
|
||||||
|
// a new timer should be started.
|
||||||
|
// If there was not, reset the timer flag and return false - in this case
|
||||||
|
// we know that the channel is idle and has been for one full cycle.
|
||||||
|
GRPC_MUST_USE_RESULT bool CheckTimer(); |
||||||
|
|
||||||
|
private: |
||||||
|
// Bit in state_ indicating that the timer has been started.
|
||||||
|
static constexpr uintptr_t kTimerStarted = 1; |
||||||
|
// Bit in state_ indicating that we've seen a call start or stop since the
|
||||||
|
// last timer.
|
||||||
|
static constexpr uintptr_t kCallsStartedSinceLastTimerCheck = 2; |
||||||
|
// How much should we shift to get the number of calls in progress.
|
||||||
|
static constexpr uintptr_t kCallsInProgressShift = 2; |
||||||
|
// How much to increment/decrement the state_ when a call is started/stopped.
|
||||||
|
// Ensures we don't clobber the preceding bits.
|
||||||
|
static constexpr uintptr_t kCallIncrement = uintptr_t(1) |
||||||
|
<< kCallsInProgressShift; |
||||||
|
std::atomic<uintptr_t> state_; |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_IDLE_IDLE_FILTER_STATE_H
|
@ -0,0 +1,33 @@ |
|||||||
|
# Copyright 2021 gRPC authors. |
||||||
|
# |
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
# you may not use this file except in compliance with the License. |
||||||
|
# You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, software |
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
# See the License for the specific language governing permissions and |
||||||
|
# limitations under the License. |
||||||
|
|
||||||
|
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") |
||||||
|
|
||||||
|
licenses(["notice"]) |
||||||
|
|
||||||
|
grpc_package(name = "test/core/client_idle") |
||||||
|
|
||||||
|
grpc_cc_test( |
||||||
|
name = "idle_filter_state_test", |
||||||
|
srcs = ["idle_filter_state_test.cc"], |
||||||
|
external_deps = [ |
||||||
|
"gtest", |
||||||
|
], |
||||||
|
language = "c++", |
||||||
|
uses_polling = False, |
||||||
|
deps = [ |
||||||
|
"//:idle_filter_state", |
||||||
|
"//test/core/util:grpc_suppressions", |
||||||
|
], |
||||||
|
) |
@ -0,0 +1,109 @@ |
|||||||
|
// Copyright 2021 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_idle/idle_filter_state.h" |
||||||
|
|
||||||
|
#include <stdio.h> |
||||||
|
|
||||||
|
#include <chrono> |
||||||
|
#include <random> |
||||||
|
#include <thread> |
||||||
|
|
||||||
|
#include <gtest/gtest.h> |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
namespace testing { |
||||||
|
|
||||||
|
TEST(IdleFilterStateTest, IdlenessStartsTimer) { |
||||||
|
IdleFilterState s(false); |
||||||
|
s.IncreaseCallCount(); |
||||||
|
// First idle should start the timer
|
||||||
|
EXPECT_TRUE(s.DecreaseCallCount()); |
||||||
|
for (int i = 0; i < 10; i++) { |
||||||
|
// Next idle should not!
|
||||||
|
s.IncreaseCallCount(); |
||||||
|
EXPECT_FALSE(s.DecreaseCallCount()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
TEST(IdleFilterStateTest, TimerStopsAfterIdle) { |
||||||
|
IdleFilterState s(true); |
||||||
|
EXPECT_FALSE(s.CheckTimer()); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(IdleFilterStateTest, TimerKeepsGoingWithActivity) { |
||||||
|
IdleFilterState s(true); |
||||||
|
for (int i = 0; i < 10; i++) { |
||||||
|
s.IncreaseCallCount(); |
||||||
|
(void)s.DecreaseCallCount(); |
||||||
|
EXPECT_TRUE(s.CheckTimer()); |
||||||
|
} |
||||||
|
EXPECT_FALSE(s.CheckTimer()); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(IdleFilterStateTest, StressTest) { |
||||||
|
IdleFilterState s(false); |
||||||
|
std::atomic<bool> done{false}; |
||||||
|
int idle_polls = 0; |
||||||
|
int thread_jumps = 0; |
||||||
|
std::vector<std::thread> threads; |
||||||
|
for (int idx = 0; idx < 100; idx++) { |
||||||
|
std::thread t([&] { |
||||||
|
int ctr = 0; |
||||||
|
auto increase = [&] { |
||||||
|
s.IncreaseCallCount(); |
||||||
|
ctr++; |
||||||
|
}; |
||||||
|
auto decrease = [&] { |
||||||
|
ctr--; |
||||||
|
if (s.DecreaseCallCount()) { |
||||||
|
thread_jumps++; |
||||||
|
if (thread_jumps == 10) done.store(true, std::memory_order_relaxed); |
||||||
|
EXPECT_EQ(ctr, 0); |
||||||
|
do { |
||||||
|
idle_polls++; |
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
||||||
|
} while (s.CheckTimer()); |
||||||
|
} |
||||||
|
}; |
||||||
|
std::mt19937 g{std::random_device()()}; |
||||||
|
while (!done.load(std::memory_order_relaxed)) { |
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(500)); |
||||||
|
for (int i = 0; i < 100; i++) { |
||||||
|
if (g() & 1) { |
||||||
|
increase(); |
||||||
|
} else if (ctr > 0) { |
||||||
|
decrease(); |
||||||
|
} |
||||||
|
} |
||||||
|
while (ctr > 0) { |
||||||
|
decrease(); |
||||||
|
} |
||||||
|
} |
||||||
|
while (ctr > 0) { |
||||||
|
decrease(); |
||||||
|
} |
||||||
|
}); |
||||||
|
threads.emplace_back(std::move(t)); |
||||||
|
} |
||||||
|
for (auto& thread : threads) thread.join(); |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace testing
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
int main(int argc, char** argv) { |
||||||
|
::testing::InitGoogleTest(&argc, argv); |
||||||
|
return RUN_ALL_TESTS(); |
||||||
|
} |
Loading…
Reference in new issue