diff --git a/test/core/gprpp/work_serializer_test.cc b/test/core/gprpp/work_serializer_test.cc index 918c845e153..3f8be452338 100644 --- a/test/core/gprpp/work_serializer_test.cc +++ b/test/core/gprpp/work_serializer_test.cc @@ -74,9 +74,11 @@ TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) { gpr_event done; gpr_event_init(&done); lock->Schedule( - [&done]() { gpr_event_set(&done, reinterpret_cast(1)); }, + [&done]() { + EXPECT_EQ(gpr_event_get(&done), nullptr); + gpr_event_set(&done, reinterpret_cast(1)); + }, DEBUG_LOCATION); - EXPECT_EQ(gpr_event_get(&done), nullptr); lock->DrainQueue(); EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != nullptr); @@ -288,6 +290,9 @@ TEST(WorkSerializerTest, MetricsWork) { auto before = global_stats().Collect(); auto stats_diff_from = [&before](absl::AnyInvocable f) { f(); + // Insert a pause for the work serialier to update the stats. Reading stats + // here can still race with the work serializer's update attempt. + gpr_sleep_until(grpc_timeout_seconds_to_deadline(1)); auto after = global_stats().Collect(); auto diff = after->Diff(*before); before = std::move(after); diff --git a/test/core/xds/xds_client_test.cc b/test/core/xds/xds_client_test.cc index 3f9e2bb9615..7a699bcfcb0 100644 --- a/test/core/xds/xds_client_test.cc +++ b/test/core/xds/xds_client_test.cc @@ -2707,6 +2707,7 @@ TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) { } TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) { + const absl::Duration timeout = absl::Seconds(5) * grpc_test_slowdown_factor(); InitXdsClient(); // Start watches for "foo1" and "foo2". auto watcher1 = StartFooWatch("foo1"); @@ -2759,11 +2760,11 @@ TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) { /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); - EXPECT_EQ(stream->reads_started(), 1); + EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout)); resource1->read_delay_handle.reset(); - EXPECT_EQ(stream->reads_started(), 1); + EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout)); resource2->read_delay_handle.reset(); - EXPECT_EQ(stream->reads_started(), 2); + EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout)); resource1 = watcher1->WaitForNextResourceAndHandle(); ASSERT_NE(resource1, absl::nullopt); EXPECT_EQ(resource1->resource->name, "foo1"); @@ -2776,9 +2777,9 @@ TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) { /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); - EXPECT_EQ(stream->reads_started(), 2); + EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout)); resource1->read_delay_handle.reset(); - EXPECT_EQ(stream->reads_started(), 3); + EXPECT_TRUE(stream->WaitForReadsStarted(3, timeout)); // Cancel watch. CancelFooWatch(watcher1.get(), "foo1"); request = WaitForRequest(stream.get()); diff --git a/test/core/xds/xds_transport_fake.cc b/test/core/xds/xds_transport_fake.cc index bc1134f08b6..1153d870151 100644 --- a/test/core/xds/xds_transport_fake.cc +++ b/test/core/xds/xds_transport_fake.cc @@ -81,7 +81,7 @@ void FakeXdsTransportFactory::FakeStreamingCall::SendMessage( MutexLock lock(&mu_); GPR_ASSERT(!orphaned_); from_client_messages_.push_back(std::move(payload)); - cv_.Signal(); + cv_client_msg_.Signal(); if (transport_->auto_complete_messages_from_client()) { CompleteSendMessageFromClientLocked(/*ok=*/true); } @@ -97,7 +97,8 @@ FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient( absl::Duration timeout) { MutexLock lock(&mu_); while (from_client_messages_.empty()) { - if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) { + if (cv_client_msg_.WaitWithTimeout(&mu_, + timeout * grpc_test_slowdown_factor())) { return absl::nullopt; } } @@ -133,6 +134,7 @@ void FakeXdsTransportFactory::FakeStreamingCall::StartRecvMessage() { } ++reads_started_; ++num_pending_reads_; + cv_reads_started_.SignalAll(); if (!to_client_messages_.empty()) { // Dispatch pending message (if there's one) on a separate thread to avoid // recursion diff --git a/test/core/xds/xds_transport_fake.h b/test/core/xds/xds_transport_fake.h index c9b24a536ba..a3f50caae25 100644 --- a/test/core/xds/xds_transport_fake.h +++ b/test/core/xds/xds_transport_fake.h @@ -88,9 +88,15 @@ class FakeXdsTransportFactory : public XdsTransportFactory { bool Orphaned(); - size_t reads_started() { + bool WaitForReadsStarted(size_t expected, absl::Duration timeout) { MutexLock lock(&mu_); - return reads_started_; + const absl::Time deadline = absl::Now() + timeout; + do { + if (reads_started_ == expected) { + return true; + } + } while (!cv_reads_started_.WaitWithDeadline(&mu_, deadline)); + return false; } private: @@ -122,7 +128,8 @@ class FakeXdsTransportFactory : public XdsTransportFactory { const char* method_; Mutex mu_; - CondVar cv_; + CondVar cv_reads_started_; + CondVar cv_client_msg_; RefCountedPtr event_handler_ ABSL_GUARDED_BY(&mu_); std::deque from_client_messages_ ABSL_GUARDED_BY(&mu_); bool status_sent_ ABSL_GUARDED_BY(&mu_) = false;