Activities: Don't execute inline (#27341)

* Activities: Don't execute inline

* fix test
pull/27321/head
Craig Tiller 4 years ago committed by GitHub
parent 8545cfa1cb
commit c6a20601dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      src/core/lib/promise/activity.h
  2. 25
      test/core/promise/activity_test.cc
  3. 6
      test/core/promise/observable_test.cc

@ -310,12 +310,6 @@ class PromiseActivity final
// running on this thread, a note is taken of such and the activity is // running on this thread, a note is taken of such and the activity is
// repolled if it doesn't complete. // repolled if it doesn't complete.
void Wakeup() final { void Wakeup() final {
// If there's no active activity, we can just run inline.
if (!Activity::have_current()) {
Step();
WakeupComplete();
return;
}
// If there is an active activity, but hey it's us, flag that and we'll loop // If there is an active activity, but hey it's us, flag that and we'll loop
// in RunLoop (that's calling from above here!). // in RunLoop (that's calling from above here!).
if (Activity::is_current()) { if (Activity::is_current()) {
@ -432,6 +426,7 @@ ActivityPtr MakeActivity(Factory promise_factory,
} }
// A callback scheduler that simply crashes // A callback scheduler that simply crashes
// Useful for very limited tests
struct NoCallbackScheduler { struct NoCallbackScheduler {
template <typename F> template <typename F>
void operator()(F) { void operator()(F) {
@ -439,6 +434,14 @@ struct NoCallbackScheduler {
} }
}; };
// A callback scheduler that simply runs the callback
// Useful for unit testing, probably not so much for real systems due to lock
// ordering problems
class InlineCallbackScheduler {
public:
void operator()(std::function<void()> f) { f(); }
};
} // namespace grpc_core } // namespace grpc_core
#endif // GRPC_CORE_LIB_PROMISE_ACTIVITY_H #endif // GRPC_CORE_LIB_PROMISE_ACTIVITY_H

@ -153,7 +153,7 @@ TYPED_TEST(BarrierTest, Barrier) {
return absl::OkStatus(); return absl::OkStatus();
}); });
}, },
NoCallbackScheduler(), InlineCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Clearing the barrier should let the activity proceed to return a result. // Clearing the barrier should let the activity proceed to return a result.
EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(on_done, Call(absl::OkStatus()));
@ -165,7 +165,8 @@ TYPED_TEST(BarrierTest, BarrierPing) {
typename TestFixture::Type b2; typename TestFixture::Type b2;
StrictMock<MockFunction<void(absl::Status)>> on_done1; StrictMock<MockFunction<void(absl::Status)>> on_done1;
StrictMock<MockFunction<void(absl::Status)>> on_done2; StrictMock<MockFunction<void(absl::Status)>> on_done2;
MockCallbackScheduler scheduler; MockCallbackScheduler scheduler1;
MockCallbackScheduler scheduler2;
auto activity1 = MakeActivity( auto activity1 = MakeActivity(
[&b1, &b2] { [&b1, &b2] {
return Seq(b1.Wait(), [&b2](typename TestFixture::Type::Result) { return Seq(b1.Wait(), [&b2](typename TestFixture::Type::Result) {
@ -174,7 +175,7 @@ TYPED_TEST(BarrierTest, BarrierPing) {
return absl::OkStatus(); return absl::OkStatus();
}); });
}, },
[&scheduler](std::function<void()> f) { scheduler.Schedule(f); }, [&scheduler1](std::function<void()> f) { scheduler1.Schedule(f); },
[&on_done1](absl::Status status) { on_done1.Call(std::move(status)); }); [&on_done1](absl::Status status) { on_done1.Call(std::move(status)); });
auto activity2 = MakeActivity( auto activity2 = MakeActivity(
[&b2] { [&b2] {
@ -182,17 +183,21 @@ TYPED_TEST(BarrierTest, BarrierPing) {
return absl::OkStatus(); return absl::OkStatus();
}); });
}, },
[&scheduler](std::function<void()> f) { scheduler.Schedule(f); }, [&scheduler2](std::function<void()> f) { scheduler2.Schedule(f); },
[&on_done2](absl::Status status) { on_done2.Call(std::move(status)); }); [&on_done2](absl::Status status) { on_done2.Call(std::move(status)); });
EXPECT_CALL(on_done1, Call(absl::OkStatus()));
// Since barrier triggers inside activity1 promise, activity2 wakeup will be // Since barrier triggers inside activity1 promise, activity2 wakeup will be
// scheduled from a callback. // scheduled from a callback.
std::function<void()> cb; std::function<void()> cb1;
EXPECT_CALL(scheduler, Schedule(_)).WillOnce(SaveArg<0>(&cb)); std::function<void()> cb2;
EXPECT_CALL(scheduler1, Schedule(_)).WillOnce(SaveArg<0>(&cb1));
b1.Clear(); b1.Clear();
Mock::VerifyAndClearExpectations(&scheduler1);
EXPECT_CALL(on_done1, Call(absl::OkStatus()));
EXPECT_CALL(scheduler2, Schedule(_)).WillOnce(SaveArg<0>(&cb2));
cb1();
Mock::VerifyAndClearExpectations(&on_done1); Mock::VerifyAndClearExpectations(&on_done1);
EXPECT_CALL(on_done2, Call(absl::OkStatus())); EXPECT_CALL(on_done2, Call(absl::OkStatus()));
cb(); cb2();
} }
TYPED_TEST(BarrierTest, WakeSelf) { TYPED_TEST(BarrierTest, WakeSelf) {
@ -225,7 +230,7 @@ TYPED_TEST(BarrierTest, WakeAfterDestruction) {
return absl::OkStatus(); return absl::OkStatus();
}); });
}, },
NoCallbackScheduler(), InlineCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
} }
b.Clear(); b.Clear();
@ -246,7 +251,7 @@ TEST(ActivityTest, ForceWakeup) {
abort(); abort();
} }
}, },
NoCallbackScheduler(), InlineCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(on_done, Call(absl::OkStatus()));
activity->ForceWakeup(); activity->ForceWakeup();

@ -65,7 +65,7 @@ TEST(ObservableTest, CanPushAndGet) {
return i == 42 ? absl::OkStatus() : absl::UnknownError("expected 42"); return i == 42 ? absl::OkStatus() : absl::UnknownError("expected 42");
}); });
}, },
NoCallbackScheduler(), InlineCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(on_done, Call(absl::OkStatus()));
observable.Push(42); observable.Push(42);
@ -88,7 +88,7 @@ TEST(ObservableTest, CanNext) {
: absl::UnknownError("expected 1"); : absl::UnknownError("expected 1");
}); });
}, },
NoCallbackScheduler(), InlineCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
observable.Push(42); observable.Push(42);
EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(on_done, Call(absl::OkStatus()));
@ -112,7 +112,7 @@ TEST(ObservableTest, CanWatch) {
} }
}); });
}, },
NoCallbackScheduler(), InlineCallbackScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
observable.Push(1); observable.Push(1);
observable.Push(2); observable.Push(2);

Loading…
Cancel
Save