From 1e818c98bb5d429cc9f2618e3691b4c6f33338de Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 21 Aug 2023 12:38:26 -0700 Subject: [PATCH] [client_channel] ensure that subchannels are always destroyed inside the WorkSerializer (#34077) - add debug-only `WorkSerializer::RunningInWorkSerializer()` method and use it in client_channel to verify that subchannels are destroyed in the `WorkSerializer` - note: this mechanism uses `std::thread::id`, so I had to exclude work_serializer.cc from the cpp_banned_constructs check - fix `WorkSerializer::Run()` to unref the callback before releasing ownership of the `WorkSerializer`, so that any refs captured by the `std::function<>` will be released before releasing ownership - fix the WRR timer callback to hop into the `WorkSerializer` to release its ref to the picker, since that transitively releases refs to subchannels - fix subchannel connectivity state notifications to unref the watcher inside the `WorkSerializer`, since the watcher often transitively holds refs to subchannels --- .../filters/client_channel/client_channel.cc | 9 +++-- .../lb_policy/health_check_client.cc | 6 ++- .../lb_policy/oob_backend_metric.cc | 6 ++- .../weighted_round_robin.cc | 8 ++-- .../ext/filters/client_channel/subchannel.cc | 12 ++++-- .../ext/filters/client_channel/subchannel.h | 10 ++++- src/core/lib/gprpp/work_serializer.cc | 36 ++++++++++++++++++ src/core/lib/gprpp/work_serializer.h | 5 +++ test/core/gprpp/work_serializer_test.cc | 37 +++++++++++++++++++ .../run_tests/sanity/cpp_banned_constructs.sh | 4 +- 10 files changed, 115 insertions(+), 18 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 1bab8f80034..02b205a633c 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -513,6 +513,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { 
++it->second; } } + GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer()); chand_->subchannel_wrappers_.insert(this); } @@ -522,6 +523,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { "chand=%p: destroying subchannel wrapper %p for subchannel %p", chand_, this, subchannel_.get()); } + GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer()); chand_->subchannel_wrappers_.erase(this); if (chand_->channelz_node_ != nullptr) { auto* subchannel_node = subchannel_->channelz_node(); @@ -615,15 +617,16 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { DEBUG_LOCATION); } - void OnConnectivityStateChange(grpc_connectivity_state state, - const absl::Status& status) override { + void OnConnectivityStateChange( + RefCountedPtr self, + grpc_connectivity_state state, const absl::Status& status) override { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { gpr_log(GPR_INFO, "chand=%p: connectivity change for subchannel wrapper %p " "subchannel %p; hopping into work_serializer", parent_->chand_, parent_.get(), parent_->subchannel_.get()); } - Ref().release(); // ref owned by lambda + self.release(); // Held by callback. 
parent_->chand_->work_serializer_->Run( [this, state, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( *parent_->chand_->work_serializer_) { diff --git a/src/core/ext/filters/client_channel/lb_policy/health_check_client.cc b/src/core/ext/filters/client_channel/lb_policy/health_check_client.cc index 859fc4b78bd..d434f1b5287 100644 --- a/src/core/ext/filters/client_channel/lb_policy/health_check_client.cc +++ b/src/core/ext/filters/client_channel/lb_policy/health_check_client.cc @@ -311,9 +311,11 @@ class HealthProducer::ConnectivityWatcher explicit ConnectivityWatcher(WeakRefCountedPtr producer) : producer_(std::move(producer)) {} - void OnConnectivityStateChange(grpc_connectivity_state state, - const absl::Status& status) override { + void OnConnectivityStateChange( + RefCountedPtr self, + grpc_connectivity_state state, const absl::Status& status) override { producer_->OnConnectivityStateChange(state, status); + self.reset(); } grpc_pollset_set* interested_parties() override { diff --git a/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.cc b/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.cc index 3ed55f2e5e1..c0f4c18dc2b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.cc +++ b/src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.cc @@ -77,9 +77,11 @@ class OrcaProducer::ConnectivityWatcher grpc_pollset_set_destroy(interested_parties_); } - void OnConnectivityStateChange(grpc_connectivity_state state, - const absl::Status&) override { + void OnConnectivityStateChange( + RefCountedPtr self, + grpc_connectivity_state state, const absl::Status&) override { producer_->OnConnectivityStateChange(state); + self.reset(); } grpc_pollset_set* interested_parties() override { diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc index 3c64a6890ae..4d3a47c1d5b 100644 --- 
a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc @@ -608,7 +608,9 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { // Start timer. WeakRefCountedPtr self = WeakRef(); timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter( - config_->weight_update_period(), [self = std::move(self)]() mutable { + config_->weight_update_period(), + [self = std::move(self), + work_serializer = wrr_->work_serializer()]() mutable { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; { @@ -621,8 +623,8 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { self->BuildSchedulerAndStartTimerLocked(); } } - // Release ref before ExecCtx goes out of scope. - self.reset(); + // Release the picker ref inside the WorkSerializer. + work_serializer->Run([self = std::move(self)]() {}, DEBUG_LOCATION); }); } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 85da64014fa..80b5c7606a0 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -367,8 +367,10 @@ void Subchannel::ConnectivityStateWatcherList::NotifyLocked( grpc_connectivity_state state, const absl::Status& status) { for (const auto& p : watchers_) { subchannel_->work_serializer_.Schedule( - [watcher = p.second->Ref(), state, status]() { - watcher->OnConnectivityStateChange(state, status); + [watcher = p.second->Ref(), state, status]() mutable { + auto* watcher_ptr = watcher.get(); + watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, + status); }, DEBUG_LOCATION); } @@ -527,8 +529,10 @@ void Subchannel::WatchConnectivityState( grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties); } work_serializer_.Schedule( - [watcher = watcher->Ref(), state = state_, status = 
status_]() { - watcher->OnConnectivityStateChange(state, status); + [watcher = watcher->Ref(), state = state_, status = status_]() mutable { + auto* watcher_ptr = watcher.get(); + watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, + status); }, DEBUG_LOCATION); watcher_list_.AddWatcherLocked(std::move(watcher)); diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index e66d80fd139..4d46415d40a 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -173,8 +173,14 @@ class Subchannel : public DualRefCounted { // Invoked whenever the subchannel's connectivity state changes. // There will be only one invocation of this method on a given watcher // instance at any given time. - virtual void OnConnectivityStateChange(grpc_connectivity_state state, - const absl::Status& status) = 0; + // A ref to the watcher is passed in here so that the implementation + // can unref it in the appropriate synchronization context (e.g., + // inside a WorkSerializer). + // TODO(roth): Figure out a cleaner way to guarantee that the ref is + // released in the right context. 
+ virtual void OnConnectivityStateChange( + RefCountedPtr self, + grpc_connectivity_state state, const absl::Status& status) = 0; virtual grpc_pollset_set* interested_parties() = 0; }; diff --git a/src/core/lib/gprpp/work_serializer.cc b/src/core/lib/gprpp/work_serializer.cc index 56fa6cce427..fdade0ebba7 100644 --- a/src/core/lib/gprpp/work_serializer.cc +++ b/src/core/lib/gprpp/work_serializer.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -47,6 +48,12 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable { void DrainQueue(); void Orphan() override; +#ifndef NDEBUG + bool RunningInWorkSerializer() const { + return std::this_thread::get_id() == current_thread_; + } +#endif + private: struct CallbackWrapper { CallbackWrapper(std::function cb, const DebugLocation& loc) @@ -86,6 +93,9 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable { // orphaned. std::atomic refs_{MakeRefPair(0, 1)}; MultiProducerSingleConsumerQueue queue_; +#ifndef NDEBUG + std::thread::id current_thread_; +#endif }; void WorkSerializer::WorkSerializerImpl::Run(std::function callback, @@ -102,10 +112,17 @@ void WorkSerializer::WorkSerializerImpl::Run(std::function callback, GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0); if (GetOwners(prev_ref_pair) == 0) { // We took ownership of the WorkSerializer. Invoke callback and drain queue. +#ifndef NDEBUG + current_thread_ = std::this_thread::get_id(); +#endif if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { gpr_log(GPR_INFO, " Executing immediately"); } callback(); + // Delete the callback while still holding the WorkSerializer, so + // that any refs being held by the callback via lambda captures will + // be destroyed inside the WorkSerializer. 
+ callback = nullptr; DrainQueueOwned(); } else { // Another thread is holding the WorkSerializer, so decrement the ownership @@ -158,6 +175,9 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() { const uint64_t prev_ref_pair = refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel); if (GetOwners(prev_ref_pair) == 0) { +#ifndef NDEBUG + current_thread_ = std::this_thread::get_id(); +#endif // We took ownership of the WorkSerializer. Drain the queue. DrainQueueOwned(); } else { @@ -186,6 +206,12 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() { } if (GetSize(prev_ref_pair) == 2) { // Queue drained. Give up ownership but only if queue remains empty. +#ifndef NDEBUG + // Reset current_thread_ before giving up ownership to avoid TSAN + // race. If we don't wind up giving up ownership, we'll set this + // again below before we pull the next callback out of the queue. + current_thread_ = std::thread::id(); +#endif uint64_t expected = MakeRefPair(1, 1); if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1), std::memory_order_acq_rel)) { @@ -200,6 +226,10 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() { delete this; return; } +#ifndef NDEBUG + // Didn't wind up giving up ownership, so set current_thread_ again. + current_thread_ = std::this_thread::get_id(); +#endif } // There is at least one callback on the queue. Pop the callback from the // queue and execute it. 
@@ -244,4 +274,10 @@ void WorkSerializer::Schedule(std::function callback, void WorkSerializer::DrainQueue() { impl_->DrainQueue(); } +#ifndef NDEBUG +bool WorkSerializer::RunningInWorkSerializer() const { + return impl_->RunningInWorkSerializer(); +} +#endif + } // namespace grpc_core diff --git a/src/core/lib/gprpp/work_serializer.h b/src/core/lib/gprpp/work_serializer.h index 3b61f0cf556..271de349c44 100644 --- a/src/core/lib/gprpp/work_serializer.h +++ b/src/core/lib/gprpp/work_serializer.h @@ -75,6 +75,11 @@ class ABSL_LOCKABLE WorkSerializer { // Drains the queue of callbacks. void DrainQueue(); +#ifndef NDEBUG + // Returns true if the current thread is running in the WorkSerializer. + bool RunningInWorkSerializer() const; +#endif + private: class WorkSerializerImpl; diff --git a/test/core/gprpp/work_serializer_test.cc b/test/core/gprpp/work_serializer_test.cc index b73b725e788..e05ad9d8976 100644 --- a/test/core/gprpp/work_serializer_test.cc +++ b/test/core/gprpp/work_serializer_test.cc @@ -235,6 +235,43 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) { } } +#ifndef NDEBUG +TEST(WorkSerializerTest, RunningInWorkSerializer) { + grpc_core::WorkSerializer work_serializer1; + grpc_core::WorkSerializer work_serializer2; + EXPECT_FALSE(work_serializer1.RunningInWorkSerializer()); + EXPECT_FALSE(work_serializer2.RunningInWorkSerializer()); + work_serializer1.Run( + [&]() { + EXPECT_TRUE(work_serializer1.RunningInWorkSerializer()); + EXPECT_FALSE(work_serializer2.RunningInWorkSerializer()); + work_serializer2.Run( + [&]() { + EXPECT_TRUE(work_serializer1.RunningInWorkSerializer()); + EXPECT_TRUE(work_serializer2.RunningInWorkSerializer()); + }, + DEBUG_LOCATION); + }, + DEBUG_LOCATION); + EXPECT_FALSE(work_serializer1.RunningInWorkSerializer()); + EXPECT_FALSE(work_serializer2.RunningInWorkSerializer()); + work_serializer2.Run( + [&]() { + EXPECT_FALSE(work_serializer1.RunningInWorkSerializer()); + 
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer()); + work_serializer1.Run( + [&]() { + EXPECT_TRUE(work_serializer1.RunningInWorkSerializer()); + EXPECT_TRUE(work_serializer2.RunningInWorkSerializer()); + }, + DEBUG_LOCATION); + }, + DEBUG_LOCATION); + EXPECT_FALSE(work_serializer1.RunningInWorkSerializer()); + EXPECT_FALSE(work_serializer2.RunningInWorkSerializer()); +} +#endif + } // namespace int main(int argc, char** argv) { diff --git a/tools/run_tests/sanity/cpp_banned_constructs.sh b/tools/run_tests/sanity/cpp_banned_constructs.sh index a9bbcc74ccc..527d1b0b65e 100755 --- a/tools/run_tests/sanity/cpp_banned_constructs.sh +++ b/tools/run_tests/sanity/cpp_banned_constructs.sh @@ -26,7 +26,7 @@ cd "$(dirname "$0")/../../.." grep -EIrn \ 'std::(mutex|condition_variable|lock_guard|unique_lock|thread)' \ include/grpc include/grpcpp src/core src/cpp | \ - grep -Ev include/grpcpp/impl/sync.h | \ + grep -Ev 'include/grpcpp/impl/sync.h|src/core/lib/gprpp/work_serializer.cc' | \ diff - /dev/null # @@ -36,7 +36,7 @@ grep -EIrn \ grep -EIrn \ '^#include (||||||)' \ include/grpc include/grpcpp src/core src/cpp | \ - grep -Ev include/grpcpp/impl/sync.h | \ + grep -Ev 'include/grpcpp/impl/sync.h|src/core/lib/gprpp/work_serializer.cc' | \ diff - /dev/null #