|
|
|
@ -51,28 +51,37 @@ using grpc_event_engine::experimental::WaitForSingleOwner; |
|
|
|
|
|
|
|
|
|
namespace grpc_core { |
|
|
|
|
namespace { |
|
|
|
|
TEST(WorkSerializerTest, NoOp) { WorkSerializer lock(GetDefaultEventEngine()); } |
|
|
|
|
TEST(WorkSerializerTest, NoOp) { |
|
|
|
|
auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine()); |
|
|
|
|
lock.reset(); |
|
|
|
|
WaitForSingleOwner(GetDefaultEventEngine()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST(WorkSerializerTest, ExecuteOneRun) { |
|
|
|
|
WorkSerializer lock(GetDefaultEventEngine()); |
|
|
|
|
auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine()); |
|
|
|
|
gpr_event done; |
|
|
|
|
gpr_event_init(&done); |
|
|
|
|
lock.Run([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
lock->Run([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != |
|
|
|
|
nullptr); |
|
|
|
|
lock.reset(); |
|
|
|
|
WaitForSingleOwner(GetDefaultEventEngine()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) { |
|
|
|
|
WorkSerializer lock(GetDefaultEventEngine()); |
|
|
|
|
auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine()); |
|
|
|
|
gpr_event done; |
|
|
|
|
gpr_event_init(&done); |
|
|
|
|
lock.Schedule([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
lock->Schedule( |
|
|
|
|
[&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
EXPECT_EQ(gpr_event_get(&done), nullptr); |
|
|
|
|
lock.DrainQueue(); |
|
|
|
|
lock->DrainQueue(); |
|
|
|
|
EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != |
|
|
|
|
nullptr); |
|
|
|
|
lock.reset(); |
|
|
|
|
WaitForSingleOwner(GetDefaultEventEngine()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class TestThread { |
|
|
|
@ -125,13 +134,15 @@ class TestThread { |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
TEST(WorkSerializerTest, ExecuteMany) { |
|
|
|
|
WorkSerializer lock(GetDefaultEventEngine()); |
|
|
|
|
auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine()); |
|
|
|
|
{ |
|
|
|
|
std::vector<std::unique_ptr<TestThread>> threads; |
|
|
|
|
for (size_t i = 0; i < 10; ++i) { |
|
|
|
|
threads.push_back(std::make_unique<TestThread>(&lock)); |
|
|
|
|
threads.push_back(std::make_unique<TestThread>(lock.get())); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
lock.reset(); |
|
|
|
|
WaitForSingleOwner(GetDefaultEventEngine()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class TestThreadScheduleAndDrain { |
|
|
|
@ -186,26 +197,31 @@ class TestThreadScheduleAndDrain { |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) { |
|
|
|
|
WorkSerializer lock(GetDefaultEventEngine()); |
|
|
|
|
auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine()); |
|
|
|
|
{ |
|
|
|
|
std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> threads; |
|
|
|
|
for (size_t i = 0; i < 10; ++i) { |
|
|
|
|
threads.push_back(std::make_unique<TestThreadScheduleAndDrain>(&lock)); |
|
|
|
|
threads.push_back( |
|
|
|
|
std::make_unique<TestThreadScheduleAndDrain>(lock.get())); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
lock.reset(); |
|
|
|
|
WaitForSingleOwner(GetDefaultEventEngine()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) { |
|
|
|
|
WorkSerializer lock(GetDefaultEventEngine()); |
|
|
|
|
auto lock = std::make_unique<WorkSerializer>(GetDefaultEventEngine()); |
|
|
|
|
{ |
|
|
|
|
std::vector<std::unique_ptr<TestThread>> run_threads; |
|
|
|
|
std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> schedule_threads; |
|
|
|
|
for (size_t i = 0; i < 10; ++i) { |
|
|
|
|
run_threads.push_back(std::make_unique<TestThread>(&lock)); |
|
|
|
|
run_threads.push_back(std::make_unique<TestThread>(lock.get())); |
|
|
|
|
schedule_threads.push_back( |
|
|
|
|
std::make_unique<TestThreadScheduleAndDrain>(&lock)); |
|
|
|
|
std::make_unique<TestThreadScheduleAndDrain>(lock.get())); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
lock.reset(); |
|
|
|
|
WaitForSingleOwner(GetDefaultEventEngine()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Tests that work serializers allow destruction from the last callback
|
|
|
|
@ -228,6 +244,7 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRace) { |
|
|
|
|
lock->Run([&]() { notification.Notify(); }, DEBUG_LOCATION); |
|
|
|
|
t1.join(); |
|
|
|
|
} |
|
|
|
|
WaitForSingleOwner(GetDefaultEventEngine()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Tests racy conditions when the last callback triggers work
|
|
|
|
@ -248,6 +265,7 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) { |
|
|
|
|
for (auto& thread : threads) { |
|
|
|
|
thread.join(); |
|
|
|
|
} |
|
|
|
|
WaitForSingleOwner(GetDefaultEventEngine()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST(WorkSerializerTest, MetricsWork) { |
|
|
|
@ -255,11 +273,11 @@ TEST(WorkSerializerTest, MetricsWork) { |
|
|
|
|
GTEST_SKIP() << "Work serializer dispatch experiment not enabled"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
WorkSerializer serializer(GetDefaultEventEngine()); |
|
|
|
|
auto serializer = std::make_unique<WorkSerializer>(GetDefaultEventEngine()); |
|
|
|
|
auto schedule_sleep = [&serializer](absl::Duration how_long) { |
|
|
|
|
ExecCtx exec_ctx; |
|
|
|
|
auto n = std::make_shared<Notification>(); |
|
|
|
|
serializer.Run( |
|
|
|
|
serializer->Run( |
|
|
|
|
[how_long, n]() { |
|
|
|
|
absl::SleepFor(how_long); |
|
|
|
|
n->Notify(); |
|
|
|
@ -349,6 +367,9 @@ TEST(WorkSerializerTest, MetricsWork) { |
|
|
|
|
.Percentile(0.5), |
|
|
|
|
diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs) |
|
|
|
|
.Percentile(0.5)); |
|
|
|
|
|
|
|
|
|
serializer.reset(); |
|
|
|
|
WaitForSingleOwner(GetDefaultEventEngine()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#ifndef NDEBUG |
|
|
|
@ -395,6 +416,9 @@ TEST(WorkSerializerTest, RunningInWorkSerializer) { |
|
|
|
|
work_serializer2->Run([&done2]() { done2.Notify(); }, DEBUG_LOCATION); |
|
|
|
|
done1.WaitForNotification(); |
|
|
|
|
done2.WaitForNotification(); |
|
|
|
|
work_serializer1.reset(); |
|
|
|
|
work_serializer2.reset(); |
|
|
|
|
WaitForSingleOwner(GetDefaultEventEngine()); |
|
|
|
|
} |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|