Activities wakeup logic tweak (#27453)

* wakeup tweak

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/27378/head^2
Craig Tiller 3 years ago committed by GitHub
parent c5bed3f365
commit a419687f1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 77
      src/core/lib/promise/activity.h
  2. 35
      test/core/promise/activity_test.cc
  3. 2
      test/core/promise/for_each_test.cc
  4. 2
      test/core/promise/latch_test.cc
  5. 6
      test/core/promise/observable_test.cc
  6. 10
      test/core/promise/pipe_test.cc

@ -95,8 +95,6 @@ class Waker {
// Activity execution may be cancelled by simply deleting the activity. In such
// a case, if execution had not already finished, the done callback would be
// called with absl::CancelledError().
// Activity also takes a CallbackScheduler instance on which to schedule
// callbacks to itself in a lock-clean environment.
class Activity : private Wakeable {
public:
// Cancel execution of the underlying promise.
@ -280,17 +278,27 @@ class EnterContexts : public promise_detail::Context<Contexts>... {
};
// Implementation details for an Activity of an arbitrary type of promise.
template <class F, class CallbackScheduler, class OnDone, typename... Contexts>
// There should exist a static function:
// struct WakeupScheduler {
// template <typename ActivityType>
// static void ScheduleWakeup(WakeupScheduler*, ActivityType* activity);
// };
// This function should arrange that activity->RunScheduledWakeup() be invoked
// at the earliest opportunity.
// It can assume that activity will remain live until RunScheduledWakeup() is
// invoked, and that a given activity will not be concurrently scheduled again
// until its RunScheduledWakeup() has been invoked.
template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
class PromiseActivity final
: public Activity,
private promise_detail::ContextHolder<Contexts>... {
public:
using Factory = PromiseFactory<void, F>;
PromiseActivity(F promise_factory, CallbackScheduler callback_scheduler,
PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
OnDone on_done, Contexts... contexts)
: Activity(),
ContextHolder<Contexts>(std::move(contexts))...,
callback_scheduler_(std::move(callback_scheduler)),
wakeup_scheduler_(std::move(wakeup_scheduler)),
on_done_(std::move(on_done)) {
// Lock, construct an initial promise from the factory, and step it.
// This may hit a waiter, which could expose our this pointer to other
@ -332,6 +340,12 @@ class PromiseActivity final
}
}
void RunScheduledWakeup() {
GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_relaxed));
Step();
WakeupComplete();
}
private:
// Wakeup this activity. Arrange to poll the activity again at a convenient
// time: this could be inline if it's deemed safe, or it could be by passing
@ -346,11 +360,10 @@ class PromiseActivity final
WakeupComplete();
return;
}
// Can't safely run, so ask to run later.
callback_scheduler_([this]() {
this->Step();
this->WakeupComplete();
});
if (!wakeup_scheduled_.exchange(true, std::memory_order_relaxed)) {
// Can't safely run, so ask to run later.
wakeup_scheduler_.ScheduleWakeup(this);
}
}
// Drop a wakeup
@ -429,6 +442,14 @@ class PromiseActivity final
}
using Promise = typename Factory::Promise;
// Scheduler for wakeups
GPR_NO_UNIQUE_ADDRESS WakeupScheduler wakeup_scheduler_;
// Callback on completion of the promise.
GPR_NO_UNIQUE_ADDRESS OnDone on_done_;
// Has execution completed?
GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu_) = false;
// Is there a wakeup scheduled?
GPR_NO_UNIQUE_ADDRESS std::atomic<bool> wakeup_scheduled_{false};
// We wrap the promise in a union to allow control over the construction
// simultaneously with annotating mutex requirements and noting that the
// promise contained may not use any memory.
@ -438,45 +459,41 @@ class PromiseActivity final
GPR_NO_UNIQUE_ADDRESS Promise promise;
};
GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu_);
// Schedule callbacks on some external executor.
GPR_NO_UNIQUE_ADDRESS CallbackScheduler callback_scheduler_;
// Callback on completion of the promise.
GPR_NO_UNIQUE_ADDRESS OnDone on_done_;
// Has execution completed?
GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu_) = false;
};
} // namespace promise_detail
// Given a functor that returns a promise (a promise factory), a callback for
// completion, and a callback scheduler, construct an activity.
template <typename Factory, typename CallbackScheduler, typename OnDone,
template <typename Factory, typename WakeupScheduler, typename OnDone,
typename... Contexts>
ActivityPtr MakeActivity(Factory promise_factory,
CallbackScheduler callback_scheduler, OnDone on_done,
WakeupScheduler wakeup_scheduler, OnDone on_done,
Contexts... contexts) {
return ActivityPtr(
new promise_detail::PromiseActivity<Factory, CallbackScheduler, OnDone,
new promise_detail::PromiseActivity<Factory, WakeupScheduler, OnDone,
Contexts...>(
std::move(promise_factory), std::move(callback_scheduler),
std::move(promise_factory), std::move(wakeup_scheduler),
std::move(on_done), std::move(contexts)...));
}
// A callback scheduler that simply crashes
// Useful for very limited tests
struct NoCallbackScheduler {
template <typename F>
void operator()(F) {
// A wakeup scheduler that simply crashes.
// Useful for very limited tests.
struct NoWakeupScheduler {
template <typename ActivityType>
void ScheduleWakeup(ActivityType*) {
abort();
}
};
// A callback scheduler that simply runs the callback
// A wakeup scheduler that simply runs the callback immediately.
// Useful for unit testing, probably not so much for real systems due to lock
// ordering problems
class InlineCallbackScheduler {
public:
void operator()(std::function<void()> f) { f(); }
// ordering problems.
struct InlineWakeupScheduler {
template <typename ActivityType>
void ScheduleWakeup(ActivityType* activity) {
activity->RunScheduledWakeup();
}
};
} // namespace grpc_core

@ -35,6 +35,14 @@ class MockCallbackScheduler {
MOCK_METHOD(void, Schedule, (std::function<void()>));
};
struct UseMockCallbackScheduler {
MockCallbackScheduler* scheduler;
template <typename ActivityType>
void ScheduleWakeup(ActivityType* activity) {
scheduler->Schedule([activity] { activity->RunScheduledWakeup(); });
}
};
// A simple Barrier type: stalls progress until it is 'cleared'.
class Barrier {
public:
@ -101,7 +109,7 @@ TEST(ActivityTest, ImmediatelyCompleteWithSuccess) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[] { return [] { return absl::OkStatus(); }; }, NoCallbackScheduler(),
[] { return [] { return absl::OkStatus(); }; }, NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
@ -109,8 +117,7 @@ TEST(ActivityTest, ImmediatelyCompleteWithFailure) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::CancelledError()));
MakeActivity(
[] { return [] { return absl::CancelledError(); }; },
NoCallbackScheduler(),
[] { return [] { return absl::CancelledError(); }; }, NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
@ -119,7 +126,7 @@ TEST(ActivityTest, DropImmediately) {
EXPECT_CALL(on_done, Call(absl::CancelledError()));
MakeActivity(
[] { return []() -> Poll<absl::Status> { return Pending(); }; },
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
@ -127,7 +134,7 @@ TEST(ActivityTest, Cancel) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
auto activity = MakeActivity(
[] { return []() -> Poll<absl::Status> { return Pending(); }; },
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
EXPECT_CALL(on_done, Call(absl::CancelledError()));
activity->Cancel();
@ -153,7 +160,7 @@ TYPED_TEST(BarrierTest, Barrier) {
return absl::OkStatus();
});
},
InlineCallbackScheduler(),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Clearing the barrier should let the activity proceed to return a result.
EXPECT_CALL(on_done, Call(absl::OkStatus()));
@ -175,7 +182,7 @@ TYPED_TEST(BarrierTest, BarrierPing) {
return absl::OkStatus();
});
},
[&scheduler1](std::function<void()> f) { scheduler1.Schedule(f); },
UseMockCallbackScheduler{&scheduler1},
[&on_done1](absl::Status status) { on_done1.Call(std::move(status)); });
auto activity2 = MakeActivity(
[&b2] {
@ -183,7 +190,7 @@ TYPED_TEST(BarrierTest, BarrierPing) {
return absl::OkStatus();
});
},
[&scheduler2](std::function<void()> f) { scheduler2.Schedule(f); },
UseMockCallbackScheduler{&scheduler2},
[&on_done2](absl::Status status) { on_done2.Call(std::move(status)); });
// Since barrier triggers inside activity1 promise, activity2 wakeup will be
// scheduled from a callback.
@ -215,7 +222,7 @@ TYPED_TEST(BarrierTest, WakeSelf) {
return absl::OkStatus();
});
},
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
@ -230,7 +237,7 @@ TYPED_TEST(BarrierTest, WakeAfterDestruction) {
return absl::OkStatus();
});
},
InlineCallbackScheduler(),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
b.Clear();
@ -251,7 +258,7 @@ TEST(ActivityTest, ForceWakeup) {
abort();
}
},
InlineCallbackScheduler(),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
EXPECT_CALL(on_done, Call(absl::OkStatus()));
activity->ForceWakeup();
@ -272,7 +279,7 @@ TEST(ActivityTest, WithContext) {
*GetContext<TestContext>()->done = true;
return Immediate(absl::OkStatus());
},
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
TestContext{&done});
EXPECT_TRUE(done);
@ -296,7 +303,7 @@ TEST(ActivityTest, CanCancelDuringExecution) {
abort();
}
},
InlineCallbackScheduler(),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
EXPECT_CALL(on_done, Call(absl::CancelledError()));
@ -321,7 +328,7 @@ TEST(ActivityTest, CanCancelDuringSuccessfulExecution) {
abort();
}
},
InlineCallbackScheduler(),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
EXPECT_CALL(on_done, Call(absl::OkStatus()));

@ -57,7 +57,7 @@ TEST(ForEachTest, SendThriceWithPipe) {
})),
JustElem<1>());
},
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
Mock::VerifyAndClearExpectations(&on_done);
EXPECT_EQ(num_received, 3);

@ -41,7 +41,7 @@ TEST(LatchTest, Works) {
return absl::OkStatus();
});
},
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}

@ -65,7 +65,7 @@ TEST(ObservableTest, CanPushAndGet) {
return i == 42 ? absl::OkStatus() : absl::UnknownError("expected 42");
});
},
InlineCallbackScheduler(),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
EXPECT_CALL(on_done, Call(absl::OkStatus()));
observable.Push(42);
@ -88,7 +88,7 @@ TEST(ObservableTest, CanNext) {
: absl::UnknownError("expected 1");
});
},
InlineCallbackScheduler(),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
observable.Push(42);
EXPECT_CALL(on_done, Call(absl::OkStatus()));
@ -112,7 +112,7 @@ TEST(ObservableTest, CanWatch) {
}
});
},
InlineCallbackScheduler(),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
observable.Push(1);
observable.Push(2);

@ -44,7 +44,7 @@ TEST(PipeTest, CanSendAndReceive) {
return absl::OkStatus();
});
},
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
@ -64,7 +64,7 @@ TEST(PipeTest, CanReceiveAndSend) {
return absl::OkStatus();
});
},
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
@ -95,7 +95,7 @@ TEST(PipeTest, CanSeeClosedOnSend) {
return absl::OkStatus();
});
},
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
@ -125,7 +125,7 @@ TEST(PipeTest, CanSeeClosedOnReceive) {
return absl::OkStatus();
});
},
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
@ -167,7 +167,7 @@ TEST(PipeTest, CanFilter) {
return absl::OkStatus();
});
},
NoCallbackScheduler(),
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}

Loading…
Cancel
Save