improvements

pull/36048/head
Craig Tiller 9 months ago
parent cd1813542a
commit daf6220d42
  1. 28
      src/core/lib/promise/party.cc
  2. 1
      src/core/lib/promise/party.h
  3. 27
      test/core/promise/party_test.cc

@ -39,7 +39,7 @@ namespace grpc_core {
namespace {
// TODO(ctiller): Once all activities are parties we can remove this.
thread_local Party* g_current_party_ = nullptr;
thread_local Party** g_current_party_run_next = nullptr;
} // namespace
///////////////////////////////////////////////////////////////////////////////
@ -219,21 +219,27 @@ void Party::RunLocked() {
// but instead add it to the end of the list of parties to run.
// This enables a fairly straightforward batching of work from a
// call to a transport (or back again).
if (g_current_party_ != nullptr) {
Party* after = g_current_party_;
while (after->run_next_ != nullptr) {
after = after->run_next_;
if (g_current_party_run_next != nullptr) {
if (*g_current_party_run_next == nullptr) {
*g_current_party_run_next = this;
} else {
// But if there's already a party queued, we're better off asking event
// engine to run it so we can spread load.
event_engine()->Run([this]() {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
RunLocked();
});
}
after->run_next_ = this;
return;
}
auto body = [this]() {
GPR_DEBUG_ASSERT(g_current_party_ == nullptr);
g_current_party_ = this;
GPR_DEBUG_ASSERT(g_current_party_run_next == nullptr);
Party* run_next = nullptr;
g_current_party_run_next = &run_next;
const bool done = RunParty();
GPR_DEBUG_ASSERT(g_current_party_ == this);
Party* run_next = std::exchange(run_next_, nullptr);
g_current_party_ = nullptr;
GPR_DEBUG_ASSERT(g_current_party_run_next == &run_next);
g_current_party_run_next = nullptr;
if (done) {
ScopedActivity activity(this);
PartyOver();

@ -628,7 +628,6 @@ class Party : public Activity, private Wakeable {
Arena* const arena_;
uint8_t currently_polling_ = kNotPolling;
Party* run_next_ = nullptr;
// All current participants, using a tagged format.
// If the lower bit is unset, then this is a Participant*.
// If the lower bit is set, then this is a ParticipantFactory*.

@ -788,8 +788,12 @@ TEST_F(PartyTest, ThreadStressTestWithInnerSpawn) {
TEST_F(PartyTest, NestedWakeup) {
auto party1 = MakeRefCounted<TestParty>();
auto party2 = MakeRefCounted<TestParty>();
auto party3 = MakeRefCounted<TestParty>();
int whats_going_on = 0;
Notification n;
Notification started2;
Notification done2;
Notification started3;
Notification notify_done;
party1->Spawn(
"p1",
[&]() {
@ -798,6 +802,8 @@ TEST_F(PartyTest, NestedWakeup) {
party2->Spawn(
"p2",
[&]() {
started2.Notify();
started3.WaitForNotification();
EXPECT_EQ(whats_going_on, 3);
whats_going_on = 4;
return Empty{};
@ -805,7 +811,22 @@ TEST_F(PartyTest, NestedWakeup) {
[&](Empty) {
EXPECT_EQ(whats_going_on, 4);
whats_going_on = 5;
n.Notify();
done2.Notify();
});
party3->Spawn(
"p3",
[&]() {
started2.WaitForNotification();
started3.Notify();
done2.WaitForNotification();
EXPECT_EQ(whats_going_on, 5);
whats_going_on = 6;
return Empty{};
},
[&](Empty) {
EXPECT_EQ(whats_going_on, 6);
whats_going_on = 7;
notify_done.Notify();
});
EXPECT_EQ(whats_going_on, 1);
whats_going_on = 2;
@ -815,7 +836,7 @@ TEST_F(PartyTest, NestedWakeup) {
EXPECT_EQ(whats_going_on, 2);
whats_going_on = 3;
});
n.WaitForNotification();
notify_done.WaitForNotification();
}
} // namespace grpc_core

Loading…
Cancel
Save