Fix for a racy WorkSerializer shutdown (#28769)

* Fix for a racy WorkSerializer shutdown

* Reviewer comments

* Additional test

* Fix test compilation on cmake
reviewable/pr25586/r25^2
Yash Tibrewal 3 years ago committed by GitHub
parent b8d3a09092
commit 9ffd1a7b0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      src/core/lib/iomgr/work_serializer.cc
  2. 45
      test/core/iomgr/work_serializer_test.cc

@ -121,10 +121,9 @@ void WorkSerializer::WorkSerializerImpl::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
}
uint64_t prev_ref_pair =
const uint64_t prev_ref_pair =
refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
if (GetSize(prev_ref_pair) == 1) {
GPR_DEBUG_ASSERT(GetOwners(prev_ref_pair) == 0);
if (GetOwners(prev_ref_pair) == 0 && GetSize(prev_ref_pair) == 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Destroying");
}
@ -170,13 +169,19 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
return;
}
if (GetSize(prev_ref_pair) == 2) {
// Queue drained. Give up ownership but only if queue remains empty. Note
// that we are using relaxed memory order semantics for the load on
// failure since we don't care about that value.
// Queue drained. Give up ownership but only if queue remains empty.
uint64_t expected = MakeRefPair(1, 1);
if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
std::memory_order_acq_rel)) {
// Queue is drained.
return;
}
if (GetSize(expected) == 0) {
// WorkSerializer got orphaned while this was running
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Queue Drained. Destroying");
}
delete this;
return;
}
}

@ -19,10 +19,13 @@
#include "src/core/lib/iomgr/work_serializer.h"
#include <memory>
#include <thread>
#include <gtest/gtest.h>
#include "absl/memory/memory.h"
#include "absl/synchronization/barrier.h"
#include "absl/synchronization/notification.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@ -30,6 +33,7 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/executor.h"
#include "test/core/util/test_config.h"
namespace {
@ -190,6 +194,47 @@ TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
}
}
// Tests that work serializers allow destruction from the last callback
TEST(WorkSerializerTest, CallbackDestroysWorkSerializer) {
auto lock = std::make_shared<grpc_core::WorkSerializer>();
lock->Run([&]() { lock.reset(); }, DEBUG_LOCATION);
}
// Tests additional racy conditions when the last callback triggers work
// serializer destruction.
TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
for (int i = 0; i < 1000; ++i) {
auto lock = std::make_shared<grpc_core::WorkSerializer>();
absl::Notification notification;
std::thread t1([&]() {
notification.WaitForNotification();
lock.reset();
});
lock->Run([&]() { notification.Notify(); }, DEBUG_LOCATION);
t1.join();
}
}
// Tests racy conditions when the last callback triggers work
// serializer destruction.
TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
auto lock = std::make_shared<grpc_core::WorkSerializer>();
absl::Barrier barrier(51);
std::vector<std::thread> threads;
threads.reserve(50);
for (int i = 0; i < 50; ++i) {
threads.emplace_back([lock, &barrier]() mutable {
barrier.Block();
lock->Run([lock]() mutable { lock.reset(); }, DEBUG_LOCATION);
});
}
barrier.Block();
lock.reset();
for (auto& thread : threads) {
thread.join();
}
}
} // namespace
int main(int argc, char** argv) {

Loading…
Cancel
Save