mirror of https://github.com/grpc/grpc.git
The C based gRPC (C++, Python, Ruby, Objective-C, PHP, C#)
https://grpc.io/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
242 lines
6.7 KiB
242 lines
6.7 KiB
// Copyright 2024 gRPC authors. |
|
// |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
#include "src/core/lib/promise/observable.h" |
|
|
|
#include <cstdint> |
|
#include <limits> |
|
#include <thread> |
|
#include <vector> |
|
|
|
#include "absl/strings/str_join.h" |
|
#include "gmock/gmock.h" |
|
#include "gtest/gtest.h" |
|
|
|
#include "src/core/lib/gprpp/notification.h" |
|
#include "src/core/lib/promise/loop.h" |
|
#include "src/core/lib/promise/map.h" |
|
|
|
using testing::Mock; |
|
using testing::StrictMock; |
|
|
|
namespace grpc_core { |
|
namespace { |
|
|
|
class MockActivity : public Activity, public Wakeable { |
|
public: |
|
MOCK_METHOD(void, WakeupRequested, ()); |
|
|
|
void ForceImmediateRepoll(WakeupMask) override { WakeupRequested(); } |
|
void Orphan() override {} |
|
Waker MakeOwningWaker() override { return Waker(this, 0); } |
|
Waker MakeNonOwningWaker() override { return Waker(this, 0); } |
|
void Wakeup(WakeupMask) override { WakeupRequested(); } |
|
void WakeupAsync(WakeupMask) override { WakeupRequested(); } |
|
void Drop(WakeupMask) override {} |
|
std::string DebugTag() const override { return "MockActivity"; } |
|
std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); } |
|
|
|
void Activate() { |
|
if (scoped_activity_ != nullptr) return; |
|
scoped_activity_ = std::make_unique<ScopedActivity>(this); |
|
} |
|
|
|
void Deactivate() { scoped_activity_.reset(); } |
|
|
|
private: |
|
std::unique_ptr<ScopedActivity> scoped_activity_; |
|
}; |
|
|
|
MATCHER(IsPending, "") { |
|
if (arg.ready()) { |
|
*result_listener << "is ready"; |
|
return false; |
|
} |
|
return true; |
|
} |
|
|
|
MATCHER(IsReady, "") { |
|
if (arg.pending()) { |
|
*result_listener << "is pending"; |
|
return false; |
|
} |
|
return true; |
|
} |
|
|
|
MATCHER_P(IsReady, value, "") { |
|
if (arg.pending()) { |
|
*result_listener << "is pending"; |
|
return false; |
|
} |
|
if (arg.value() != value) { |
|
*result_listener << "is " << ::testing::PrintToString(arg.value()); |
|
return false; |
|
} |
|
return true; |
|
} |
|
|
|
TEST(ObservableTest, ImmediateNext) { |
|
Observable<int> observable(1); |
|
auto next = observable.Next(0); |
|
EXPECT_THAT(next(), IsReady(1)); |
|
} |
|
|
|
TEST(ObservableTest, SetBecomesImmediateNext1) { |
|
Observable<int> observable(0); |
|
auto next = observable.Next(0); |
|
observable.Set(1); |
|
EXPECT_THAT(next(), IsReady(1)); |
|
} |
|
|
|
TEST(ObservableTest, SetBecomesImmediateNext2) { |
|
Observable<int> observable(0); |
|
observable.Set(1); |
|
auto next = observable.Next(0); |
|
EXPECT_THAT(next(), IsReady(1)); |
|
} |
|
|
|
TEST(ObservableTest, SameValueGetsPending) { |
|
StrictMock<MockActivity> activity; |
|
activity.Activate(); |
|
Observable<int> observable(1); |
|
auto next = observable.Next(1); |
|
EXPECT_THAT(next(), IsPending()); |
|
EXPECT_THAT(next(), IsPending()); |
|
EXPECT_THAT(next(), IsPending()); |
|
EXPECT_THAT(next(), IsPending()); |
|
} |
|
|
|
TEST(ObservableTest, ChangeValueWakesUp) { |
|
StrictMock<MockActivity> activity; |
|
activity.Activate(); |
|
Observable<int> observable(1); |
|
auto next = observable.Next(1); |
|
EXPECT_THAT(next(), IsPending()); |
|
EXPECT_CALL(activity, WakeupRequested()); |
|
observable.Set(2); |
|
Mock::VerifyAndClearExpectations(&activity); |
|
EXPECT_THAT(next(), IsReady(2)); |
|
} |
|
|
|
TEST(ObservableTest, NextWhen) { |
|
StrictMock<MockActivity> activity; |
|
activity.Activate(); |
|
Observable<int> observable(1); |
|
auto next = observable.NextWhen([](int i) { return i == 3; }); |
|
EXPECT_THAT(next(), IsPending()); |
|
EXPECT_CALL(activity, WakeupRequested()); |
|
observable.Set(2); |
|
EXPECT_THAT(next(), IsPending()); |
|
EXPECT_CALL(activity, WakeupRequested()); |
|
observable.Set(3); |
|
Mock::VerifyAndClearExpectations(&activity); |
|
EXPECT_THAT(next(), IsReady(3)); |
|
} |
|
|
|
TEST(ObservableTest, MultipleActivitiesWakeUp) { |
|
StrictMock<MockActivity> activity1; |
|
StrictMock<MockActivity> activity2; |
|
Observable<int> observable(1); |
|
auto next1 = observable.Next(1); |
|
auto next2 = observable.Next(1); |
|
{ |
|
activity1.Activate(); |
|
EXPECT_THAT(next1(), IsPending()); |
|
} |
|
{ |
|
activity2.Activate(); |
|
EXPECT_THAT(next2(), IsPending()); |
|
} |
|
EXPECT_CALL(activity1, WakeupRequested()); |
|
EXPECT_CALL(activity2, WakeupRequested()); |
|
observable.Set(2); |
|
Mock::VerifyAndClearExpectations(&activity1); |
|
Mock::VerifyAndClearExpectations(&activity2); |
|
EXPECT_THAT(next1(), IsReady(2)); |
|
EXPECT_THAT(next2(), IsReady(2)); |
|
} |
|
|
|
class ThreadWakeupScheduler { |
|
public: |
|
template <typename ActivityType> |
|
class BoundScheduler { |
|
public: |
|
explicit BoundScheduler(ThreadWakeupScheduler) {} |
|
void ScheduleWakeup() { |
|
std::thread t( |
|
[this] { static_cast<ActivityType*>(this)->RunScheduledWakeup(); }); |
|
t.detach(); |
|
} |
|
}; |
|
}; |
|
|
|
TEST(ObservableTest, Stress) { |
|
static constexpr uint64_t kEnd = std::numeric_limits<uint64_t>::max(); |
|
std::vector<uint64_t> values1; |
|
std::vector<uint64_t> values2; |
|
uint64_t current1 = 0; |
|
uint64_t current2 = 0; |
|
Notification done1; |
|
Notification done2; |
|
Observable<uint64_t> observable(0); |
|
auto activity1 = MakeActivity( |
|
Loop([&observable, ¤t1, &values1] { |
|
return Map( |
|
observable.Next(current1), |
|
[&values1, ¤t1](uint64_t value) -> LoopCtl<absl::Status> { |
|
values1.push_back(value); |
|
current1 = value; |
|
if (value == kEnd) return absl::OkStatus(); |
|
return Continue{}; |
|
}); |
|
}), |
|
ThreadWakeupScheduler(), [&done1](absl::Status status) { |
|
EXPECT_TRUE(status.ok()) << status.ToString(); |
|
done1.Notify(); |
|
}); |
|
auto activity2 = MakeActivity( |
|
Loop([&observable, ¤t2, &values2] { |
|
return Map( |
|
observable.Next(current2), |
|
[&values2, ¤t2](uint64_t value) -> LoopCtl<absl::Status> { |
|
values2.push_back(value); |
|
current2 = value; |
|
if (value == kEnd) return absl::OkStatus(); |
|
return Continue{}; |
|
}); |
|
}), |
|
ThreadWakeupScheduler(), [&done2](absl::Status status) { |
|
EXPECT_TRUE(status.ok()) << status.ToString(); |
|
done2.Notify(); |
|
}); |
|
for (uint64_t i = 0; i < 1000000; i++) { |
|
observable.Set(i); |
|
} |
|
observable.Set(kEnd); |
|
done1.WaitForNotification(); |
|
done2.WaitForNotification(); |
|
ASSERT_GE(values1.size(), 1); |
|
ASSERT_GE(values2.size(), 1); |
|
EXPECT_EQ(values1.back(), kEnd); |
|
EXPECT_EQ(values2.back(), kEnd); |
|
} |
|
|
|
} // namespace |
|
} // namespace grpc_core |
|
|
|
int main(int argc, char** argv) { |
|
gpr_log_verbosity_init(); |
|
::testing::InitGoogleTest(&argc, argv); |
|
return RUN_ALL_TESTS(); |
|
}
|
|
|