[work-serializer] deflake tests with dispatching workserializer

PiperOrigin-RevId: 606292107
pull/35869/head
Yousuk Seung 10 months ago committed by Copybara-Service
parent 41606054c2
commit 3459e5c7b8
  1. 9
      test/core/gprpp/work_serializer_test.cc
  2. 11
      test/core/xds/xds_client_test.cc
  3. 6
      test/core/xds/xds_transport_fake.cc
  4. 13
      test/core/xds/xds_transport_fake.h

@ -74,9 +74,11 @@ TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
gpr_event done; gpr_event done;
gpr_event_init(&done); gpr_event_init(&done);
lock->Schedule( lock->Schedule(
[&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); }, [&done]() {
EXPECT_EQ(gpr_event_get(&done), nullptr);
gpr_event_set(&done, reinterpret_cast<void*>(1));
},
DEBUG_LOCATION); DEBUG_LOCATION);
EXPECT_EQ(gpr_event_get(&done), nullptr);
lock->DrainQueue(); lock->DrainQueue();
EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
nullptr); nullptr);
@ -288,6 +290,9 @@ TEST(WorkSerializerTest, MetricsWork) {
auto before = global_stats().Collect(); auto before = global_stats().Collect();
auto stats_diff_from = [&before](absl::AnyInvocable<void()> f) { auto stats_diff_from = [&before](absl::AnyInvocable<void()> f) {
f(); f();
// Insert a pause for the work serialier to update the stats. Reading stats
// here can still race with the work serializer's update attempt.
gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
auto after = global_stats().Collect(); auto after = global_stats().Collect();
auto diff = after->Diff(*before); auto diff = after->Diff(*before);
before = std::move(after); before = std::move(after);

@ -2707,6 +2707,7 @@ TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
} }
TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) { TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
const absl::Duration timeout = absl::Seconds(5) * grpc_test_slowdown_factor();
InitXdsClient(); InitXdsClient();
// Start watches for "foo1" and "foo2". // Start watches for "foo1" and "foo2".
auto watcher1 = StartFooWatch("foo1"); auto watcher1 = StartFooWatch("foo1");
@ -2759,11 +2760,11 @@ TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
/*version_info=*/"1", /*response_nonce=*/"A", /*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(), /*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"}); /*resource_names=*/{"foo1", "foo2"});
EXPECT_EQ(stream->reads_started(), 1); EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout));
resource1->read_delay_handle.reset(); resource1->read_delay_handle.reset();
EXPECT_EQ(stream->reads_started(), 1); EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout));
resource2->read_delay_handle.reset(); resource2->read_delay_handle.reset();
EXPECT_EQ(stream->reads_started(), 2); EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout));
resource1 = watcher1->WaitForNextResourceAndHandle(); resource1 = watcher1->WaitForNextResourceAndHandle();
ASSERT_NE(resource1, absl::nullopt); ASSERT_NE(resource1, absl::nullopt);
EXPECT_EQ(resource1->resource->name, "foo1"); EXPECT_EQ(resource1->resource->name, "foo1");
@ -2776,9 +2777,9 @@ TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
/*version_info=*/"2", /*response_nonce=*/"B", /*version_info=*/"2", /*response_nonce=*/"B",
/*error_detail=*/absl::OkStatus(), /*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1", "foo2"}); /*resource_names=*/{"foo1", "foo2"});
EXPECT_EQ(stream->reads_started(), 2); EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout));
resource1->read_delay_handle.reset(); resource1->read_delay_handle.reset();
EXPECT_EQ(stream->reads_started(), 3); EXPECT_TRUE(stream->WaitForReadsStarted(3, timeout));
// Cancel watch. // Cancel watch.
CancelFooWatch(watcher1.get(), "foo1"); CancelFooWatch(watcher1.get(), "foo1");
request = WaitForRequest(stream.get()); request = WaitForRequest(stream.get());

@ -81,7 +81,7 @@ void FakeXdsTransportFactory::FakeStreamingCall::SendMessage(
MutexLock lock(&mu_); MutexLock lock(&mu_);
GPR_ASSERT(!orphaned_); GPR_ASSERT(!orphaned_);
from_client_messages_.push_back(std::move(payload)); from_client_messages_.push_back(std::move(payload));
cv_.Signal(); cv_client_msg_.Signal();
if (transport_->auto_complete_messages_from_client()) { if (transport_->auto_complete_messages_from_client()) {
CompleteSendMessageFromClientLocked(/*ok=*/true); CompleteSendMessageFromClientLocked(/*ok=*/true);
} }
@ -97,7 +97,8 @@ FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient(
absl::Duration timeout) { absl::Duration timeout) {
MutexLock lock(&mu_); MutexLock lock(&mu_);
while (from_client_messages_.empty()) { while (from_client_messages_.empty()) {
if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) { if (cv_client_msg_.WaitWithTimeout(&mu_,
timeout * grpc_test_slowdown_factor())) {
return absl::nullopt; return absl::nullopt;
} }
} }
@ -133,6 +134,7 @@ void FakeXdsTransportFactory::FakeStreamingCall::StartRecvMessage() {
} }
++reads_started_; ++reads_started_;
++num_pending_reads_; ++num_pending_reads_;
cv_reads_started_.SignalAll();
if (!to_client_messages_.empty()) { if (!to_client_messages_.empty()) {
// Dispatch pending message (if there's one) on a separate thread to avoid // Dispatch pending message (if there's one) on a separate thread to avoid
// recursion // recursion

@ -88,9 +88,15 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
bool Orphaned(); bool Orphaned();
size_t reads_started() { bool WaitForReadsStarted(size_t expected, absl::Duration timeout) {
MutexLock lock(&mu_); MutexLock lock(&mu_);
return reads_started_; const absl::Time deadline = absl::Now() + timeout;
do {
if (reads_started_ == expected) {
return true;
}
} while (!cv_reads_started_.WaitWithDeadline(&mu_, deadline));
return false;
} }
private: private:
@ -122,7 +128,8 @@ class FakeXdsTransportFactory : public XdsTransportFactory {
const char* method_; const char* method_;
Mutex mu_; Mutex mu_;
CondVar cv_; CondVar cv_reads_started_;
CondVar cv_client_msg_;
RefCountedPtr<RefCountedEventHandler> event_handler_ ABSL_GUARDED_BY(&mu_); RefCountedPtr<RefCountedEventHandler> event_handler_ ABSL_GUARDED_BY(&mu_);
std::deque<std::string> from_client_messages_ ABSL_GUARDED_BY(&mu_); std::deque<std::string> from_client_messages_ ABSL_GUARDED_BY(&mu_);
bool status_sent_ ABSL_GUARDED_BY(&mu_) = false; bool status_sent_ ABSL_GUARDED_BY(&mu_) = false;

Loading…
Cancel
Save